
# Yelp Data Transformation - Overview


## Objective

The transformation process aims to:

- **Improve Data Quality**: Handle missing values, normalize formats, and ensure integrity.
- **Standardize Data**: Convert date formats, structure text fields, and normalize attributes.
- **Enhance Features**: Extract insights from review lengths and sentiment scores.
- **Optimize Performance**: Reduce storage costs and improve query speed.
- **Ensure Scalability**: Support distributed processing for large-scale datasets.
- **Enable Monitoring**: Implement logging to track errors and measure efficiency.
- **Use Delta Lake**: Convert Parquet to Delta for reliability, ACID transactions, and optimized queries.


## Parquet to Delta Conversion

### **Objective:**
Converting Parquet files to **Delta format** improves **query performance, reliability, and incremental processing.**

### **Steps:**

##### **1. Load Parquet Data**
```python
parquet_df = spark.read.parquet("/data/yelp/parquet/business")
```
##### **2. Write as Delta Table**
```python
parquet_df.write.format("delta").mode("overwrite").save("/data/yelp/delta/business")
```
##### **3. Register Delta Table**
```python
spark.sql("CREATE TABLE yelp_analytics.dim_business USING DELTA LOCATION '/data/yelp/delta/business'")
```

<a href="https://imgbb.com/"><img src="https://i.ibb.co/kdMhfbw/flow-transformation.png" alt="flow-transformation" border="0"></a><br /><a target='_blank' href='https://imgbb.com/'>transformation</a><br />
## **Handling Peak and Off-Peak Load Considerations**
```python
if load_condition == "peak":
    spark.conf.set("spark.sql.shuffle.partitions", "500")
    print(f"⚡ Peak Load for {table_name} ({file_size_mb:.2f} MB). Adjusting resources.")
else:
    spark.conf.set("spark.sql.shuffle.partitions", "100")
    print(f"🌙 Off-Peak Load for {table_name} ({file_size_mb:.2f} MB). Optimizing resources.")
```
**Explanation:**
- Dynamically **adjusts computing power** based on **file size**.
- **Peak load**: Uses **500 partitions** to process large files efficiently.
- **Off-peak**: Uses **100 partitions** to conserve resources.



## Init Funtion for Transformation in Pyspark

In [0]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit, col
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("IncrementalELT").getOrCreate()

# Base Parquet directory (update as needed)
parquet_base_path = "/data/yelp/parquet/"

# Function to get Parquet size (in MB)
def get_parquet_size(path):
    try:
        hadoop_conf = spark._jsc.hadoopConfiguration()
        path_obj = spark._jvm.org.apache.hadoop.fs.Path(path)
        fs = path_obj.getFileSystem(hadoop_conf)
        size_bytes = fs.getContentSummary(path_obj).getLength()
        return size_bytes / (1024 * 1024)  # Convert bytes to MB
    except Exception as e:
        print(f"⚠️ Warning: Unable to retrieve size for {path}. {e}")
        return 0

# Table-specific peak threshold (1GB per table)
PEAK_THRESHOLD_MB = 1000  

# Logging function to track ELT execution
def log_elt_process(table_name, status, exec_time, rows_affected=0, error_message=None, method_used="DataFrame", size="unknown"):
    new_log_id = spark.sql("SELECT COALESCE(MAX(log_id), 0) + 1 FROM config_db.elt_process_log").collect()[0][0]
    current_ts = datetime.now()
    log_sql = f"""
        INSERT INTO config_db.elt_process_log 
        (log_id, process_name, target_table, start_time, end_time, execution_time_seconds, size, rows_affected, method_used, status, error_message) 
        VALUES ({new_log_id}, 'Incremental Load', '{table_name}', '{current_ts}', '{current_ts}', {exec_time}, '{size}', {rows_affected}, '{method_used}', '{status}', {f'"{error_message}"' if error_message else 'NULL'})
    """
    print(f"""{status} {method_used} {exec_time}""")
    if status== "Failed": print(log_sql)
    spark.sql(log_sql)

def get_readable_size(size_bytes):
    """Convert bytes to a human-readable MB format."""
    return f"{size_bytes / (1024*1024):.2f} MB"


### **Business Dimension (`dim_business`)**
```python
source_df = (spark.read.parquet("/data/yelp/parquet/business")
                  .select("business_id", "name", "address", "city", "state", "postal_code", "stars", "review_count", "is_open")
                  .withColumn("effective_date", current_date()))
```
**Explanation:**
- Loads business details including location, rating, and status.
- Adds `effective_date` for historical tracking.


In [0]:

# ---------------------------
# Incremental Load for dim_business
# ---------------------------
try:
    table_name = "dim_business"
    start = datetime.now()
    source_path = parquet_base_path + "business"
    
    # Get Parquet file size for the table
    file_size_mb = get_parquet_size(source_path)
    
    # Determine load condition based on table size
    load_condition = "peak" if file_size_mb > PEAK_THRESHOLD_MB else "off-peak"

    # Adjust Spark settings dynamically
    if load_condition == "peak":
        spark.conf.set("spark.sql.shuffle.partitions", "500")
        print(f"⚡ Peak Load for {table_name} ({file_size_mb:.2f} MB). Adjusting resources.")
    else:
        spark.conf.set("spark.sql.shuffle.partitions", "100")
        print(f"🌙 Off-Peak Load for {table_name} ({file_size_mb:.2f} MB). Optimizing resources.")

    # Read Parquet data
    source_df = spark.read.parquet(source_path).selectExpr(
        "business_id", "name", "address", "city", "state", "postal_code", "latitude", "longitude",
        "is_open", "categories", "review_count", "stars", "current_date() as effective_date"
    )
    rows_count = source_df.count()

    # Merge with Delta Table
    target = DeltaTable.forName(spark, "yelp_analytics.dim_business")
    merge_cond = "target.business_id = source.business_id AND target.current_flag = true"
    update_cond = """target.name <> source.name OR target.address <> source.address OR
                      target.city <> source.city OR target.state <> source.state OR
                      target.postal_code <> source.postal_code OR target.latitude <> source.latitude OR
                      target.longitude <> source.longitude OR target.is_open <> source.is_open OR
                      target.categories <> source.categories OR target.review_count <> source.review_count OR
                      target.stars <> source.stars"""
    target.alias("target").merge(
        source_df.alias("source"), merge_cond
    ).whenMatchedUpdate(
        condition=update_cond,
        set={"expiry_date": "current_date()", "current_flag": "false"}
    ).whenNotMatchedInsert(
        values={
            "business_id": "source.business_id",
            "name": "source.name",
            "address": "source.address",
            "city": "source.city",
            "state": "source.state",
            "postal_code": "source.postal_code",
            "latitude": "source.latitude",
            "longitude": "source.longitude",
            "is_open": "source.is_open",
            "categories": "source.categories",
            "review_count": "source.review_count",
            "stars": "source.stars",
            "effective_date": "source.effective_date",
            "expiry_date": lit("9999-12-31").cast("date"),
            "current_flag": lit(True)
        }
    ).execute()

    exec_time = (datetime.now() - start).total_seconds()
    
    # Log with Peak/Off-Peak information
    log_elt_process(table_name, "Success", exec_time, rows_affected=rows_count, 
                    size=f"{file_size_mb:.2f} MB", method_used=f"DataFrame ({load_condition.capitalize()})")
except Exception as e:
    exec_time = (datetime.now() - start).total_seconds()
    log_elt_process(table_name, "Failed", exec_time, rows_affected=0, size="unknown", 
                    error_message=str(e), method_used=f"DataFrame ({load_condition.capitalize()})")


🌙 Off-Peak Load for dim_business (17.17 MB). Optimizing resources.

        INSERT INTO config_db.elt_process_log
        (log_id, process_name, target_table, start_time, end_time, execution_time_seconds, size, rows_affected, method_used, status, error_message)
        VALUES (14, 'Incremental Load', 'dim_business', '2025-02-24 21:42:12.056553', '2025-02-24 21:42:12.056553', 0, '17.17 MB', 150346, 'DataFrame (Off-peak)', 'Success', "20.747179")
    



### **User Dimension (`dim_user`)**
```python
source_df = (spark.read.parquet("/data/yelp/parquet/user")
                  .select("user_id", "name", "yelping_since", "review_count", "average_stars", "fans", "cool", "funny", "useful", "elite")
                  .withColumn("effective_date", current_date())
                  .withColumn("cool", col("cool").cast("integer"))
                  .withColumn("funny", col("funny").cast("integer"))
                  .withColumn("useful", col("useful").cast("integer")))
```
**Explanation:**
- Reads the `user` data from Parquet files.
- Selects only relevant attributes.
- Converts numerical attributes to proper types.
- Adds `effective_date` to track changes.


In [0]:
# ---------------------------
# Incremental Load for dim_user (SCD Type 2)
# ---------------------------
try:
    table_name = "dim_user"
    start = datetime.now()
    source_path = f"{parquet_base_path}user"

    # Determine Load Condition (Peak or Off-Peak)
    file_size_mb = get_parquet_size(source_path)
    load_condition = "peak" if file_size_mb > PEAK_THRESHOLD_MB else "off-peak"

    # Adjust Spark settings dynamically
    if load_condition == "peak":
        spark.conf.set("spark.sql.shuffle.partitions", "500")
        print(f"⚡ Peak Load for {table_name} ({file_size_mb:.2f} MB). Adjusting resources.")
    else:
        spark.conf.set("spark.sql.shuffle.partitions", "100")
        print(f"🌙 Off-Peak Load for {table_name} ({file_size_mb:.2f} MB). Optimizing resources.")

    # Read new data
    source_df = (spark.read.parquet(source_path)
                      .selectExpr(
                          "user_id", "name", "yelping_since", "review_count", "average_stars",
                          "fans", "cool", "funny", "useful", "elite"
                      )
                      .withColumn("cool", col("cool").cast("integer"))
                      .withColumn("funny", col("funny").cast("integer"))
                      .withColumn("useful", col("useful").cast("integer"))
                      .withColumn("effective_date", current_date())  # Set current date as `effective_date`
                )

    rows_count = source_df.count()

    # Reference Delta Table
    target = DeltaTable.forName(spark, f"yelp_analytics.{table_name}")
    merge_cond = "target.user_id = source.user_id AND target.current_flag = true"

    # Define Update Conditions (Detect Changes)
    update_cond = """target.name <> source.name OR
                      target.yelping_since <> source.yelping_since OR
                      target.review_count <> source.review_count OR
                      target.average_stars <> source.average_stars OR
                      target.fans <> source.fans OR
                      target.cool <> source.cool OR
                      target.funny <> source.funny OR
                      target.useful <> source.useful OR
                      target.elite <> source.elite"""

    # Merge Operation with SCD Type 2 Logic
    target.alias("target").merge(
        source_df.alias("source"), merge_cond
    ).whenMatchedUpdate(
        condition=update_cond,
        set={"expiry_date": "current_date()", "current_flag": "false"}  # Expire old record
    ).whenNotMatchedInsert(
        values={
            "user_id": "source.user_id",
            "name": "source.name",
            "yelping_since": "source.yelping_since",
            "review_count": "source.review_count",
            "average_stars": "source.average_stars",
            "fans": "source.fans",
            "cool": "source.cool",
            "funny": "source.funny",
            "useful": "source.useful",
            "elite": "source.elite",
            "effective_date": "source.effective_date",
            "expiry_date": lit("9999-12-31").cast("date"),  # Default future expiry
            "current_flag": lit(True)
        }
    ).execute()

    exec_time = (datetime.now() - start).total_seconds()

    # Log ELT Execution
    log_elt_process(table_name, "Success", exec_time, rows_affected=rows_count, 
                    size=f"{file_size_mb:.2f} MB", method_used=f"DataFrame ({load_condition})")

except Exception as e:
    exec_time = (datetime.now() - start).total_seconds()
    log_elt_process(table_name, "Failed", exec_time, rows_affected=0, size="unknown", 
                    error_message=str(e), method_used=f"DataFrame ({load_condition})")


⚡ Peak Load for dim_user (2496.88 MB). Adjusting resources.

        INSERT INTO config_db.elt_process_log
        (log_id, process_name, target_table, start_time, end_time, execution_time_seconds, size, rows_affected, method_used, status, error_message)
        VALUES (13, 'Incremental Load', 'dim_user', '2025-02-24 21:05:46.611369', '2025-02-24 21:05:46.611369', 0, '2496.88 MB', 1987897, 'DataFrame (peak)', 'Success', "33.03772")
    


### **Review Fact Table (`fact_review`)**
```python
source_df = (spark.read.parquet("/data/yelp/parquet/review")
                  .select("review_id", "user_id", "business_id", "stars", "date", "cool", "funny", "useful")
                  .withColumn("review_date", col("date").cast("date"))
                  .withColumn("cool", col("cool").cast("integer"))
                  .withColumn("funny", col("funny").cast("integer"))
                  .withColumn("useful", col("useful").cast("integer")))
```
**Explanation:**
- Loads the review data and casts columns for correct data types.
- Adds `review_date` for time-based analytics.


In [0]:

# ---------------------------
# Incremental Load for fact_review
# ---------------------------
try:
    table_name = "fact_review"
    start = datetime.now()
    source_path = parquet_base_path + "review"
    
    file_size_mb = get_parquet_size(source_path)
    load_condition = "peak" if file_size_mb > PEAK_THRESHOLD_MB else "off-peak"

    if load_condition == "peak":
        spark.conf.set("spark.sql.shuffle.partitions", "500")
        print(f"⚡ Peak Load for {table_name} ({file_size_mb:.2f} MB). Adjusting resources.")
    else:
        spark.conf.set("spark.sql.shuffle.partitions", "100")
        print(f"🌙 Off-Peak Load for {table_name} ({file_size_mb:.2f} MB). Optimizing resources.")

    max_date = spark.sql("SELECT COALESCE(MAX(review_date), '1900-01-01') as max_date FROM yelp_analytics.fact_review").collect()[0][0]

    source_df = (spark.read.parquet(source_path)
                      .filter(col("date") > lit(max_date))
                      .withColumn("review_date", col("date").cast("date"))
                      .withColumn("cool", col("cool").cast("integer"))
                      .withColumn("funny", col("funny").cast("integer"))
                      .withColumn("useful", col("useful").cast("integer"))
                 )
    source_df = source_df.select("review_id", "business_id", "user_id", "review_date", "stars", "cool", "funny", "useful")
    
    rows_count = source_df.count()
    
    source_df.write.format("delta").mode("append").saveAsTable("yelp_analytics.fact_review")
    exec_time = (datetime.now() - start).total_seconds()

    log_elt_process(table_name, "Success", exec_time, rows_affected=rows_count, size=f"{file_size_mb:.2f} MB",
                    method_used=f"DataFrame ({load_condition.capitalize()})")
except Exception as e:
    exec_time = (datetime.now() - start).total_seconds()
    log_elt_process(table_name, "Failed", exec_time, rows_affected=0, size="unknown", 
                    error_message=str(e), method_used=f"DataFrame ({load_condition.capitalize()})")

print("🎉 Incremental ELT process completed successfully!")


⚡ Peak Load for fact_review (2848.79 MB). Adjusting resources.
Success DataFrame (Peak) 86.702651
🎉 Incremental ELT process completed successfully!


### **Check-in Fact Table (`fact_checkin`)**
```python
source_df = (spark.read.parquet("/data/yelp/parquet/checkin")
                  .select("business_id", col("date").cast("date").alias("checkin_date")))
```
**Explanation:**
- Extracts business check-ins and timestamps them for trend analysis.


In [0]:

# ---------------------------
# Incremental Load for fact_checkin
# ---------------------------
try:
    start = datetime.now()
    table_name = "fact_checkin"
    source_path = "/data/yelp/parquet/checkin"

    file_size_mb = get_parquet_size(source_path) / (1024 * 1024)
    load_condition = "peak" if file_size_mb > PEAK_THRESHOLD_MB else "off-peak"

    # **Adjust Spark settings dynamically based on load condition**
    if load_condition == "peak":
        spark.conf.set("spark.sql.shuffle.partitions", "500")
        print(f"⚡ Peak Load for {table_name} ({file_size_mb:.2f} MB). Adjusting resources.")
    else:
        spark.conf.set("spark.sql.shuffle.partitions", "100")
        print(f"🌙 Off-Peak Load for {table_name} ({file_size_mb:.2f} MB). Optimizing resources.")

    max_date = spark.sql("SELECT COALESCE(MAX(checkin_date), '1900-01-01') as max_date FROM yelp_analytics.fact_checkin").collect()[0][0]
    source_df = spark.read.parquet(source_path).filter(col("date") > lit(max_date))

    # **Ensure schema consistency**
    source_df = source_df.select("business_id", col("date").cast("date").alias("checkin_date"))

    rows_count = source_df.count()
    file_size = get_readable_size(get_parquet_size(source_path))

    # **Enable schema auto-merge**
    source_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("yelp_analytics.fact_checkin")

    exec_time = (datetime.now() - start).total_seconds()
    log_elt_process(table_name, "Success", exec_time, rows_affected=rows_count, size=file_size, method_used=f"DataFrame ({load_condition})")
except Exception as e:
    exec_time = (datetime.now() - start).total_seconds()
    log_elt_process(table_name, "Failed", exec_time, rows_affected=rows_count if 'rows_count' in locals() else 0,
                    size=file_size if 'file_size' in locals() else "unknown", error_message=str(e), method_used=f"DataFrame ({load_condition})")



🌙 Off-Peak Load for fact_checkin (0.00 MB). Optimizing resources.
Success DataFrame (off-peak) 12.5958


### **Tip Fact Table (`fact_tip`)**
```python
source_df = (spark.read.parquet("/data/yelp/parquet/tip")
                  .select("business_id", "user_id", col("date").cast("date").alias("tip_date"), "text", col("compliment_count").cast("integer")))
```
**Explanation:**
- Processes user tips with timestamps for insights on feedback trends.



In [0]:
# ---------------------------
# Incremental Load for fact_tip
# ---------------------------
try:
    start = datetime.now()
    table_name = "fact_tip"
    source_path = "/data/yelp/parquet/tip"

    file_size_mb = get_parquet_size(source_path) / (1024 * 1024)
    load_condition = "peak" if file_size_mb > PEAK_THRESHOLD_MB else "off-peak"

    # **Adjust Spark settings dynamically based on load condition**
    if load_condition == "peak":
        spark.conf.set("spark.sql.shuffle.partitions", "500")
        print(f"⚡ Peak Load for {table_name} ({file_size_mb:.2f} MB). Adjusting resources.")
    else:
        spark.conf.set("spark.sql.shuffle.partitions", "100")
        print(f"🌙 Off-Peak Load for {table_name} ({file_size_mb:.2f} MB). Optimizing resources.")

    max_date = spark.sql("SELECT COALESCE(MAX(tip_date), '1900-01-01') as max_date FROM yelp_analytics.fact_tip").collect()[0][0]
    source_df = spark.read.parquet(source_path).filter(col("date") > lit(max_date))

    # **Ensure schema consistency**
    source_df = source_df.select(
        "business_id", "user_id",
        col("date").cast("date").alias("tip_date"),
        "text", col("compliment_count").cast("integer")
    )

    rows_count = source_df.count()
    file_size = get_readable_size(get_parquet_size(source_path))

    # **Enable schema auto-merge**
    source_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("yelp_analytics.fact_tip")

    exec_time = (datetime.now() - start).total_seconds()
    log_elt_process(table_name, "Success", exec_time, rows_affected=rows_count, size=file_size, method_used=f"DataFrame ({load_condition})")
except Exception as e:
    exec_time = (datetime.now() - start).total_seconds()
    log_elt_process(table_name, "Failed", exec_time, rows_affected=rows_count if 'rows_count' in locals() else 0,
                    size=file_size if 'file_size' in locals() else "unknown", error_message=str(e), method_used=f"DataFrame ({load_condition})")

print("🎉 Incremental ELT process completed.")


🌙 Off-Peak Load for fact_tip (0.00 MB). Optimizing resources.
Success DataFrame (off-peak) 19.012576
🎉 Incremental ELT process completed.


### **Rising Star Business - SCD Type 3 Implementation**

#### **Objective:**
The **Rising Star Business** transformation follows **Slowly Changing Dimension (SCD) Type 3**, which tracks both historical and current business performance trends. This allows comparisons of past and present performance without losing previous data.

#### **Transformation Steps:**

##### **Step 1: Compute Business Rating Trends**
```python
fact_review_df = spark.table("yelp_analytics.fact_review")

review_agg_df = fact_review_df.withColumn("review_year", year(col("review_date")))\
    .groupBy("business_id", "review_year")\
    .agg(count("*").alias("review_count"), avg("stars").alias("avg_rating"))
```
**Explanation:**
- Extracts **yearly review trends** per business.
- Computes **review count** and **average rating**.

##### **Step 2: Identify Rising Stars**
```python
curr_df = review_agg_df.alias("curr")
prev_df = review_agg_df.alias("prev")

rising_stars_df = curr_df.join(
    prev_df, 
    (col("curr.business_id") == col("prev.business_id")) & (col("curr.review_year") == col("prev.review_year") + 1)
).where(
    (col("curr.review_count") >= 10) & (col("curr.avg_rating") >= col("prev.avg_rating") + 1)
)
```
**Explanation:**
- Joins **consecutive year data** to identify businesses with improving ratings.
- Filters businesses with **at least 10 reviews** and a **1-star improvement**.

##### **Step 3: Merge Data into SCD Type 3 Table**
```python
target_table = DeltaTable.forName(spark, "yelp_analytics.datamart_rising_star_businesses")
merge_condition = "target.business_id = source.business_id"

target_table.alias("target").merge(
    rising_stars_df.alias("source"),
    merge_condition
).whenMatchedUpdate(
    set={
        "previous_avg_rating": "target.current_avg_rating",
        "current_avg_rating": "source.avg_rating",
        "review_count": "source.review_count",
        "last_update_date": "current_date()"
    }
).whenNotMatchedInsert(
    values={
        "business_id": "source.business_id",
        "previous_avg_rating": "NULL",
        "current_avg_rating": "source.avg_rating",
        "review_count": "source.review_count",
        "last_update_date": "current_date()"
    }
).execute()
```
**Explanation:**
- Uses **SCD Type 3**, where `previous_avg_rating` stores the last known value.
- **On update:** Moves `current_avg_rating` to `previous_avg_rating` and updates with the new value.
- **On insert:** Sets `previous_avg_rating` as NULL (first-time entry).


In [0]:
# Function to get table size in MB
def get_table_size(table_name):
    try:
        size_bytes = spark.sql(f"DESCRIBE DETAIL {table_name}").select("sizeInBytes").collect()[0][0]
        size_mb = size_bytes / (1024 * 1024)  # Convert bytes to MB
        return size_mb
    except Exception as e:
        print(f"⚠️ Warning: Unable to retrieve size for {table_name}. {e}")
        return 0  # Default to 0MB if error occurs

# Peak Threshold (1GB per table)
PEAK_THRESHOLD_MB = 1000  

# Enhanced logging function
def log_elt_process(table_name, status, error_message=None, execution_time_seconds=0, size="unknown", rows_affected=0, method_used="SQL"):
    log_id = spark.sql("SELECT COALESCE(MAX(log_id), 0) + 1 FROM config_db.elt_process_log").collect()[0][0]
    current_time = datetime.now()
    sql_log = f"""
        INSERT INTO config_db.elt_process_log
        (log_id, process_name, target_table, start_time, end_time, execution_time_seconds, size, rows_affected, method_used, status, error_message)
        VALUES ({log_id}, 'Incremental Load', '{table_name}', '{current_time}', '{current_time}', {execution_time_seconds}, '{size}', {rows_affected}, '{method_used}', '{status}', {f'"{error_message}"' if error_message else 'NULL'})
    """
    print(sql_log)
    spark.sql(sql_log)

try:
    start = datetime.now()
    table_name = "fact_review"

    # Get the size of the `fact_review` table
    table_size_mb = get_table_size("yelp_analytics.fact_review")

    # Determine Peak/Off-Peak Mode
    load_condition = "peak" if table_size_mb > PEAK_THRESHOLD_MB else "off-peak"

    # Adjust Spark settings dynamically
    if load_condition == "peak":
        spark.conf.set("spark.sql.shuffle.partitions", "500")  # Allowed runtime change
        print(f"⚡ Peak Load for {table_name} ({table_size_mb:.2f} MB). High parallelism applied.")
    else:
        spark.conf.set("spark.sql.shuffle.partitions", "100")  # Allowed runtime change
        print(f"🌙 Off-Peak Load for {table_name} ({table_size_mb:.2f} MB). Optimizing resource usage.")

    # Step 1: Read fact_review data from the datamart table
    fact_review_df = spark.table("yelp_analytics.fact_review")

    # Step 2: Compute review_agg by grouping reviews per business per year
    review_agg_df = fact_review_df.withColumn("review_year", year(col("review_date")))\
        .groupBy("business_id", "review_year")\
        .agg(count("*").alias("review_count"), avg("stars").alias("avg_rating"))

    # Step 3: Self-join review_agg to find rising stars (consecutive years with improvement)
    curr_df = review_agg_df.alias("curr")
    prev_df = review_agg_df.alias("prev")

    rising_stars_df = curr_df.join(
        prev_df, 
        (col("curr.business_id") == col("prev.business_id")) & (col("curr.review_year") == col("prev.review_year") + 1)
    ).where(
        (col("curr.review_count") >= 10) & (col("curr.avg_rating") >= col("prev.avg_rating") + 1)
    ).select(
        col("curr.business_id"),
        col("curr.review_year").alias("current_year"),
        col("prev.review_year").alias("previous_year"),
        col("curr.review_count").alias("current_review_count"),
        col("curr.avg_rating").alias("current_avg"),
        col("prev.review_count").alias("prior_review_count"),
        col("prev.avg_rating").alias("prior_avg"),
        (col("curr.avg_rating") - col("prev.avg_rating")).alias("rating_improvement"),
        concat(lit("Rising Star "), col("prev.review_year").cast("string"), lit("-"), col("curr.review_year").cast("string")).alias("rising_star_label")
    )

    # Step 4: Join with the business dimension to retrieve the business name
    dim_business_df = spark.table("yelp_analytics.dim_business").select("business_id", "name")

    source_df = rising_stars_df.join(dim_business_df, "business_id", "left")\
        .select(
            "business_id",
            col("name"),
            lit("YoY").alias("period_type"),
            to_date(concat(col("current_year").cast("string"), lit("-01-01"))).alias("current_period"),
            to_date(concat(col("previous_year").cast("string"), lit("-01-01"))).alias("previous_period"),
            array(col("rising_star_label")).alias("rising_star_labels"),
            col("current_review_count"),
            col("current_avg").alias("current_avg_stars"),
            col("rating_improvement").alias("current_rating_improvement"),
            col("prior_review_count"),
            col("prior_avg").alias("prior_avg_stars"),
            lit(None).alias("prior_rating_improvement"),
            current_date().alias("last_update_date")
        )

    # Step 5: Merge the source DataFrame into the target Delta table
    target_table = DeltaTable.forName(spark, "yelp_analytics.datamart_rising_star_businesses")
    merge_condition = "target.business_id = source.business_id AND target.current_period = source.current_period"

    target_table.alias("target").merge(
        source_df.alias("source"),
        merge_condition
    ).whenMatchedUpdate(
        set={
            "name": "source.name",
            "period_type": "source.period_type",
            "previous_period": "source.previous_period",
            "rising_star_labels": "source.rising_star_labels",
            "current_review_count": "source.current_review_count",
            "current_avg_stars": "source.current_avg_stars",
            "current_rating_improvement": "source.current_rating_improvement",
            "prior_review_count": "source.prior_review_count",
            "prior_avg_stars": "source.prior_avg_stars",
            "prior_rating_improvement": "source.prior_rating_improvement",
            "last_update_date": "source.last_update_date"
        }
    ).whenNotMatchedInsert(
        values={
            "business_id": "source.business_id",
            "name": "source.name",
            "period_type": "source.period_type",
            "current_period": "source.current_period",
            "previous_period": "source.previous_period",
            "rising_star_labels": "source.rising_star_labels",
            "current_review_count": "source.current_review_count",
            "current_avg_stars": "source.current_avg_stars",
            "current_rating_improvement": "source.current_rating_improvement",
            "prior_review_count": "source.prior_review_count",
            "prior_avg_stars": "source.prior_avg_stars",
            "prior_rating_improvement": "source.prior_rating_improvement",
            "last_update_date": "source.last_update_date"
        }
    ).execute()

    end = datetime.now()
    exec_time = (end - start).total_seconds()

    log_elt_process("datamart_rising_star_businesses", "Success", execution_time_seconds=exec_time, 
                    size=f"{table_size_mb:.2f} MB", rows_affected=0, method_used=f"DataFrame ({load_condition.capitalize()})")

except Exception as e:
    end = datetime.now()
    exec_time = (end - start).total_seconds()
    log_elt_process("datamart_rising_star_businesses", "Failed", str(e), execution_time_seconds=exec_time, 
                    size=f"{table_size_mb:.2f} MB", rows_affected=0, method_used=f"DataFrame ({load_condition.capitalize()})")
    raise e


🌙 Off-Peak Load for fact_review (356.17 MB). Optimizing resource usage.

        INSERT INTO config_db.elt_process_log
        (log_id, process_name, target_table, start_time, end_time, execution_time_seconds, size, rows_affected, method_used, status, error_message)
        VALUES (12, 'Incremental Load', 'datamart_rising_star_businesses', '2025-02-24 19:39:29.136763', '2025-02-24 19:39:29.136763', 34.92769, '356.17 MB', 0, 'DataFrame (Off-peak)', 'Success', NULL)
    
