# Yelp ETL Pipeline

- **Medallion Architecture**: The Yelp data pipeline follows the Medallion Architecture, which consists of three stages:
  - **Bronze**: Raw ingestion (store data as-is, minimal transformations).
  - **Silver**: Cleaned and enriched data.
  - **Gold**: Aggregated, business-ready data.
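
As a rough sketch of how the three layers map to storage, the helper below builds per-layer table paths. The directory names `data/bronze`, `data/silver`, and `data/gold` match the paths used later in this notebook; `LAYER_ROOTS` and `layer_path` are hypothetical names introduced only for illustration.

```python
import os

# Layer roots as used in this notebook; the dict itself is a hypothetical convenience.
LAYER_ROOTS = {
    "bronze": "data/bronze",  # raw data stored as-is in Parquet
    "silver": "data/silver",  # cleaned and typed data
    "gold": "data/gold",      # aggregated, business-ready data
}


def layer_path(layer: str, table: str) -> str:
    """Return the storage path of `table` inside the given Medallion layer."""
    if layer not in LAYER_ROOTS:
        raise ValueError(f"Unknown layer: {layer!r}")
    return os.path.join(LAYER_ROOTS[layer], table)


print(layer_path("silver", "review"))
```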


In [1]:
# Importing all dependencies

from pyspark.sql import SparkSession
import os
import logging
from pyspark.sql.functions import current_timestamp, lit


In [2]:
# Create the Spark session

spark = SparkSession.builder \
    .appName("YelpDataPipeline") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "64") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.autoBroadcastJoinThreshold", 200 * 1024 * 1024) \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/28 14:30:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. Bronze Layer
The following cells implement the **Bronze layer** of the Medallion pipeline: the Yelp JSON files are ingested and stored as Parquet files.

The class `RawDataLoader` encapsulates the logic for:
- Reading the Yelp JSON files
- Adding ingestion metadata (source file and timestamp)
- Writing the data as Parquet into the Bronze layer
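
Before starting a long ingestion run, it can help to confirm that every expected input file is present. The sketch below is one possible pre-flight check, assuming the `yelp_academic_dataset_<name>.json` naming used by the loader; `missing_raw_files` is a hypothetical helper and not part of `RawDataLoader`.

```python
import os
from typing import List


def missing_raw_files(raw_path: str, datasets: List[str]) -> List[str]:
    """Return the expected Yelp JSON paths that do not exist under raw_path."""
    expected = [
        os.path.join(raw_path, f"yelp_academic_dataset_{name}.json")
        for name in datasets
    ]
    return [path for path in expected if not os.path.isfile(path)]


# An empty list means every input file was found.
missing = missing_raw_files("data/raw", ["business", "review", "user", "checkin", "tip"])
print("Missing raw files:", missing)
```

Running this check first surfaces a missing file immediately instead of partway through the load.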

In [3]:
class RawDataLoader:
    """
    Loads the raw data from the JSON files defined in the config file into the Bronze layer as Parquet files.

    """
    def __init__(self, spark: SparkSession, raw_path: str, bronze_path: str):
        
        self.spark = spark
        self.raw_path = raw_path
        self.bronze_path = bronze_path
        self.logger = logging.getLogger(__name__)

    def load_json_to_bronze(self, dataset_name: str):
        """Load JSON and save as Parquet in bronze layer

        """
        try:
            json_file = os.path.join(self.raw_path, f"yelp_academic_dataset_{dataset_name}.json")
            bronze_table = os.path.join(self.bronze_path, dataset_name)

            self.logger.info(f"Loading {dataset_name} from {json_file}")

            # Read the JSON file (Spark infers the schema)
            df = self.spark.read.json(json_file)

            # Add metadata columns to track the ingestion timestamp and the source file.
            df = df.withColumn("ingestion_timestamp", current_timestamp()) \
                .withColumn("source_file", lit(json_file))

            # Write to Bronze as Parquet. No modification, filtering, or cleaning is performed at this step.
            df.write.mode("overwrite").parquet(bronze_table)

            # Count from the written Parquet files rather than re-scanning the JSON source.
            row_count = self.spark.read.parquet(bronze_table).count()
            self.logger.info(f"Loaded {row_count} rows to bronze/{dataset_name}")
            return row_count

        except Exception as e:
            self.logger.error(f"Failed to load {dataset_name}: {str(e)}")
            raise

    def load_all_datasets(self, datasets: list):
        """Load all the files specified in config file. The datasets(json files) of the Yelp dataset are defined in
           the config.yaml file

        """
        results = {}
        for dataset in datasets:
            results[dataset] = self.load_json_to_bronze(dataset)
        return results

In [4]:
# Define paths 
raw_path = "data/raw"   # path of the raw json files 
bronze_path = "data/bronze" # destination path of the ingested bronze data 

loader = RawDataLoader(spark, raw_path, bronze_path)

# Example: Load all datasets
datasets = ["business", "review", "user", "checkin", "tip"]
results = loader.load_all_datasets(datasets)
print("Bronze layer load results:", results)


25/11/28 14:30:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Bronze layer load results: {'business': 150346, 'review': 6990280, 'user': 1987897, 'checkin': 131930, 'tip': 908915}


In [5]:
bronze_business = spark.read.parquet(os.path.join(bronze_path, "business"))
# bronze_business.printSchema()
bronze_business.show(5)


+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------+----------+-----------+------------------+-----------+------------+-----+-----+--------------------+--------------------+
|             address|          attributes|         business_id|          categories|        city|               hours|is_open|  latitude|  longitude|              name|postal_code|review_count|stars|state| ingestion_timestamp|         source_file|
+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------+----------+-----------+------------------+-----------+------------+-----+-----+--------------------+--------------------+
|  5223 S Macdill Ave|{NULL, NULL, NULL...|LS49tIPKSRgWekZOP...|Tobacco Shops, Sh...|       Tampa|{22:0-20:0, 22:0-...|      0|27.8885924|-82.4942119|Ta Francisco Cigar|      33611|           6|  4.5|   FL|2025-11-28 14:30:...|data/raw/yelp_aca...|
|451

## 2. Silver Layer
The Silver layer cleans and transforms the raw Bronze data.
The main steps in the **Silver layer** are:
- Select essential columns
- Standardize data types
- Filter invalid/missing records
- Deduplicate rows



The class `DataCleaner` encapsulates the cleaning logic for each Yelp dataset:
- **Business**: Standardize data types, keep essential fields, drop the nested `attributes` column, validate stars, deduplicate.
  - Optimization: At the end of the Silver layer, the Silver business DataFrame is cached because it is used multiple times in the Gold layer.
- **Review**: Standardize data types, standardize dates, filter invalid ratings, deduplicate.
- **Checkin**: Standardize data types, explode comma-separated dates, convert to timestamp/date, deduplicate.
- **User**: Standardize data types, filter nulls, deduplicate.
- **Tip**: Standardize data types, filter null values.

For more details check the docstrings of each class method.
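
The check-in cleaning is the least obvious of these steps, because each raw record holds many timestamps in one comma-separated string. The plain-Python sketch below mirrors the intended logic (split on `", "`, parse, drop unparseable values, deduplicate) on a single record without needing Spark. `explode_checkins` is a hypothetical helper for illustration, and skipping unparseable tokens assumes Spark returns NULL for them, which holds when ANSI mode is off.

```python
from datetime import datetime
from typing import List, Optional, Tuple

CHECKIN_FORMAT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of "yyyy-MM-dd HH:mm:ss"


def explode_checkins(business_id: Optional[str], date_field: Optional[str]) -> List[Tuple]:
    """Return sorted, deduplicated (business_id, timestamp, date) rows for one raw record."""
    if business_id is None or date_field is None:
        return []
    rows = set()
    for token in date_field.split(", "):
        try:
            ts = datetime.strptime(token, CHECKIN_FORMAT)
        except ValueError:
            continue  # assumed to match the NULL-then-filter behaviour of the Spark version
        rows.add((business_id.strip(" "), ts, ts.date()))
    return sorted(rows)


sample = "2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-04-26 19:49:16"
for row in explode_checkins(" abc ", sample):
    print(row)
```

The duplicated first timestamp collapses into one row, which corresponds to deduplicating on `business_id` and `checkin_timestamp`.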


In [6]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, trim, regexp_replace, to_timestamp, to_date, explode, split
import logging


class DataCleaner:
    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.logger = logging.getLogger(__name__)

    def clean_business(self, df: DataFrame) -> DataFrame:
        """Clean the business dataset (original source: yelp_academic_dataset_business.json)       
            - Keep the columns needed for downstream analytics and standardize their data types.
            - Drop the attributes column: it is deeply nested, has no consistent structure,
              and would need separate processing.
            - Filter out rows with null critical fields or stars outside the valid range, then deduplicate.
            
        """
        self.logger.info("Cleaning business data")
        # selecting required columns and casting them to a suitable data type.
        column_selected_df = df.select(
            trim(col("business_id")).alias("business_id").cast("string"),
            trim(col("name")).alias("business_name").cast("string"),
            col("address").cast("string"),
            col("city").cast("string"),
            col("state").cast("string"),
            col("postal_code").cast("string"),
            col("latitude").cast("double"),
            col("longitude").cast("double"),
            col("stars").cast("double"),
            col("review_count").cast("int"),
            col("is_open").cast("int"),
            col("categories")
        )

        filtered_df = column_selected_df.filter(
            # Remove records with missing critical fields
            col("business_id").isNotNull() &
            col("business_name").isNotNull() &
            col("stars").isNotNull() &
            (col("stars").between(0, 5))  # Valid star range
        )

        deduplicated_df = filtered_df.dropDuplicates(["business_id"])

        return deduplicated_df

    def clean_review(self, df: DataFrame) -> DataFrame:
        """ Clean review dataset source: yelp_academic_dataset_review.json
            - Select the required columns, trim leading/trailing spaces from the ID columns, and standardize data types.
            - Filter out rows with null IDs or stars outside the valid range.
            - Remove duplicate reviews.
        """

        self.logger.info("Cleaning review data")

        column_selection_df =  df.select(
            trim(col("review_id")).alias("review_id").cast("string"),
            trim(col("user_id")).alias("user_id").cast("string"),
            trim(col("business_id")).alias("business_id").cast("string"),
            col("stars").cast("double"),
            col("useful").cast("int"),
            col("funny").cast("int"),
            col("cool").cast("int"),
            to_date(to_timestamp(col("date"), "yyyy-MM-dd HH:mm:ss")).alias("review_date"),  # parse the timestamp string, keep only the date
            col("text").cast("string").alias("review_text")
        )


        filtered_df = column_selection_df.filter(
            col("review_id").isNotNull() &
            col("user_id").isNotNull() &
            col("business_id").isNotNull() &
            col("stars").isNotNull() &
            (col("stars").between(0, 5))
        )

        deduplicated_df = filtered_df.dropDuplicates(["review_id"])

        return deduplicated_df

    def clean_checkin(self, df: DataFrame) -> DataFrame:
        """Clean checkin dataset  source: yelp_academic_dataset_checkin.json
            - Explode the comma-separated date column into one row per check-in.
            - Derive the checkin_timestamp and checkin_date columns.
            - Filter out null values and deduplicate the check-in data.
            
        """
        self.logger.info("Cleaning checkin data")


        # Explode comma-separated dates
        checkin_explode_df = df.select(
            col("business_id"),
            explode(split(col("date"), ", ")).alias("checkin_time")
        )

        checkin_column_format_df = (
            checkin_explode_df
            .withColumn("checkin_timestamp", to_timestamp(col("checkin_time"), "yyyy-MM-dd HH:mm:ss"))
            .withColumn("checkin_date", to_date(col("checkin_timestamp")))
        )

        checkin_selected_df = checkin_column_format_df.select(
            trim(col("business_id")).alias("business_id").cast("string"),
            col("checkin_timestamp"),
            col("checkin_date")
        )

        checkin_filtered_df = checkin_selected_df.filter(
            col("business_id").isNotNull() &
            col("checkin_timestamp").isNotNull()
        )

        checkin_deduplicated_df = checkin_filtered_df.dropDuplicates(["business_id", "checkin_timestamp"])

        return checkin_deduplicated_df

    def clean_user(self, df: DataFrame) -> DataFrame:
        """Clean user dataset  source: yelp_academic_dataset_user.json
            - Trim values and standardize data types
            - Select required columns
            - Check for nulls and deduplicate

        """
        self.logger.info("Cleaning user data")

        user_selected_df = df.select(
            trim(col("user_id")).alias("user_id").cast("string"),
            trim(col("name")).cast("string").alias("user_name"),
            col("review_count").cast("int"),
            to_date(to_timestamp(col("yelping_since"), "yyyy-MM-dd HH:mm:ss")).alias("yelping_since"),
            col("useful").cast("int"),
            col("funny").cast("int"),
            col("cool").cast("int"),
            col("fans").cast("int"),
            col("average_stars").cast("double")
        )

        user_filtered_df = user_selected_df.filter(     # TODO: add more filters
            col("user_id").isNotNull()
        )

        user_deduplicated_df = user_filtered_df.dropDuplicates(["user_id"])

        return user_deduplicated_df

    def clean_tip(self, df: DataFrame) -> DataFrame:
        """Clean tip dataset  source: yelp_academic_dataset_tip.json
            - Trim and standardize data types
            - Select required columns
            - Check for null values
            
        """
        self.logger.info("Cleaning tip data")

        tip_selected_df = df.select(
            trim(col("user_id")).alias("user_id").cast("string"),
            trim(col("business_id")).alias("business_id").cast("string"),
            col("text").cast("string").alias("tip_text"),
            to_date(to_timestamp(col("date"), "yyyy-MM-dd HH:mm:ss")).alias("tip_date"),
            col("compliment_count").cast("int")
        )

        tip_filtered_df = tip_selected_df.filter(
            col("user_id").isNotNull() &
            col("business_id").isNotNull() &
            col("tip_text").isNotNull()
        )

        # No deduplication here: the same user can tip the same business multiple times, and there is no tip_id column.

        return tip_filtered_df

In [7]:
# Demonstrating the cleaning step for bronze data
bronze_business = spark.read.parquet("data/bronze/business")
bronze_checkin = spark.read.parquet("data/bronze/checkin")
bronze_review = spark.read.parquet("data/bronze/review")
bronze_tip = spark.read.parquet("data/bronze/tip")
bronze_user = spark.read.parquet("data/bronze/user")

cleaner = DataCleaner(spark)
silver_business = cleaner.clean_business(bronze_business)
silver_business.write.mode("overwrite").parquet("data/silver/business")

silver_checkin = cleaner.clean_checkin(bronze_checkin)
silver_checkin.write.mode("overwrite").parquet("data/silver/checkin")

silver_review = cleaner.clean_review(bronze_review)
silver_review.write.mode("overwrite").parquet("data/silver/review")

silver_tip = cleaner.clean_tip(bronze_tip)
silver_tip.write.mode("overwrite").parquet("data/silver/tip")

silver_user = cleaner.clean_user(bronze_user)
silver_user.write.mode("overwrite").parquet("data/silver/user")

# printing silver business data
print(f"Rows after cleaning: {silver_business.count()}")
silver_business.show(5, truncate=False)

                                                                                

Rows after cleaning: 150346
+----------------------+---------------------------+---------------------------+--------------+-----+-----------+----------+-----------+-----+------------+-------+------------------------------------------------------------------+
|business_id           |business_name              |address                    |city          |state|postal_code|latitude  |longitude  |stars|review_count|is_open|categories                                                        |
+----------------------+---------------------------+---------------------------+--------------+-----+-----------+----------+-----------+-----+------------+-------+------------------------------------------------------------------+
|--30_8IhuyMHbSOcNWd6DQ|Action Karate              |2235 York Rd               |Jamison       |PA   |18929      |40.2553619|-75.0883992|3.5  |9           |1      |Trainers, Active Life, Fitness & Instruction, Karate, Martial Arts|
|--OS_I7dnABrXvRCCuWOGQ|Lens Auto Body & Paintin

## 3. Gold Layer

The **Gold layer** aggregates the Silver layer data to derive business insights. The following aggregations are performed:
- Weekly star ratings per business
- Check-ins per business compared with the overall star rating
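
Weekly grouping depends on how week boundaries are defined. Spark's `weekofyear` follows ISO 8601 and `date_trunc("week", ...)` returns the Monday of the week, while `year` returns the calendar year, so the three fields can disagree around New Year. The plain-Python sketch below mirrors those definitions with the standard library to make the edge case visible; `week_fields` is a hypothetical helper, not part of the pipeline.

```python
from datetime import date, timedelta


def week_fields(d: date) -> dict:
    """Mirror the week columns derived from review_date, using the standard library."""
    iso_year, iso_week, _ = d.isocalendar()
    return {
        "review_week": d - timedelta(days=d.weekday()),  # Monday of the week
        "review_year": d.year,                           # calendar year
        "review_week_number": iso_week,                  # ISO 8601 week number
        "iso_year": iso_year,                            # ISO year, shown for comparison only
    }


# Both dates fall in calendar year 2024 and ISO week 1, but in different weeks.
for d in (date(2024, 1, 1), date(2024, 12, 30)):
    print(d, week_fields(d))
```

Both dates yield `review_year` 2024 and `review_week_number` 1, yet their `review_week` values differ (2024-01-01 and 2024-12-30). Keeping `review_week` in the grouping key prevents such weeks from being merged.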

The class `DataAggregator` encapsulates aggregation logic for the Gold layer:

- Method `aggregate_weekly_stars(reviews_df, business_df)`: receives the `review` and `business` data from the Silver layer. The aggregation logic is:
    - The reviews data is enriched with the columns `review_week`, `review_year`, and `review_week_number`, all derived from `review_date`.
    - The reviews data is grouped by (`business_id`, `review_year`, `review_week_number`, `review_week`), and the weekly average stars, review count, and average useful/funny/cool votes are computed.
    - The business data is left-joined with the weekly aggregates so that each row carries the business attributes; businesses without reviews receive default values.
    - Optimization:
      - `spark.sql.autoBroadcastJoinThreshold` is set to 200 MB so that small DataFrames such as the business DataFrame are broadcast during joins.
      - The `weekly_stars` table is partitioned by `review_week` so that queries filtering on a week read only the matching partitions.

- Method `aggregate_checkins_vs_stars(checkin_df, business_df)`: receives the `checkin` and `business` data from the Silver layer. The aggregation logic is:
    - Total check-ins per business are derived by grouping the check-in data by `business_id` and counting the check-ins.
    - The business DataFrame, which carries the overall star rating, is left-joined with the aggregated check-ins so that check-ins can be compared with the overall star rating.
    - The additional attributes `checkins_per_review` and `star_category` are derived.
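
The two derived attributes can be cross-checked on single values without Spark. The sketch below mirrors the thresholds used for `star_category` (4.0 and 2.5) and the ratio behind `checkins_per_review`. The function names are hypothetical, and returning `None` for a zero `review_count` assumes Spark's behaviour with ANSI mode off, where division by zero yields NULL.

```python
from typing import Optional


def star_category(stars: float) -> str:
    """Bucket an overall star rating using the same thresholds as the Gold layer."""
    if stars >= 4.0:
        return "High"
    if stars >= 2.5:
        return "Medium"
    return "Low"


def checkins_per_review(total_checkins: Optional[int], review_count: int) -> Optional[float]:
    """Check-ins divided by reviews, rounded to 2 decimals; missing check-ins count as 0."""
    total = total_checkins or 0
    if review_count == 0:
        return None  # assumption: mirrors NULL from division by zero in non-ANSI Spark
    return round(total / review_count, 2)


print(star_category(3.5), checkins_per_review(7, 3))
```

Python's `round` and Spark's `round` can differ on exact ties, since Spark rounds half up, so this sketch is only suitable for spot checks.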

In [8]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, count, avg, date_trunc, coalesce, lit,
    weekofyear, year, round as spark_round, when, sum as spark_sum
)
from pyspark.sql.types import TimestampType

import logging


class DataAggregator:
    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.logger = logging.getLogger(__name__)

    def aggregate_weekly_stars(self, reviews_df: DataFrame, business_df: DataFrame) -> DataFrame:
        """
        Aggregate stars per business on a weekly basis.

        """
        self.logger.info("Aggregating weekly stars per business")

        # derive weekly aggregations on review data.
        weekly_agg = reviews_df.withColumn(
            "review_week", date_trunc("week", col("review_date"))
        ).withColumn(
            "review_year", year(col("review_date"))
        ).withColumn(
            "review_week_number", weekofyear(col("review_date"))
        ).groupBy(
            # review_week keeps year-boundary weeks apart: late-December dates can share a (year, ISO week 1) pair with early January.
            "business_id", "review_year", "review_week_number", "review_week"
        ).agg(
            spark_round(avg("stars"), 2).alias("avg_stars_weekly"),
            count("review_id").alias("review_count_weekly"),
            spark_round(avg("useful"), 2).alias("avg_useful"),
            spark_round(avg("funny"), 2).alias("avg_funny"),
            spark_round(avg("cool"), 2).alias("avg_cool")
        )
        print(f"rows of review agg: {weekly_agg.count()}")

        # define defaults to use in coalesce()
        default_week_number = 0
        default_year = 9999
        default_review_week = lit('9999-01-01 00:00:00').cast(TimestampType())
        # Join with business info
        result = (business_df.join(weekly_agg, "business_id", how="left"
                                   ).select(
            "business_id",
            "business_name",
            "city",
            "state",
            col("stars").alias("overall_stars"),
            coalesce('review_week', default_review_week).alias('review_week'),
            coalesce('review_year', lit(default_year)).alias('review_year'),
            coalesce('review_week_number', lit(default_week_number)).alias('review_week_number'),
            coalesce('avg_stars_weekly', lit(0)).alias('avg_stars_weekly'),
            coalesce('review_count_weekly', lit(0)).alias('review_count_weekly'),
            coalesce('avg_useful', lit(0)).alias('avg_useful'),
            coalesce('avg_funny', lit(0)).alias('avg_funny'),
            coalesce('avg_cool', lit(0)).alias('avg_cool')
        ).orderBy("business_id", "review_year", "review_week_number"))

        return result

    def aggregate_checkins_vs_stars(
            self, checkin_df: DataFrame, business_df: DataFrame) -> DataFrame:
        """
        Aggregate checkins per business compared to overall star rating

        """
        self.logger.info("Aggregating check-ins vs star ratings")

        # Count check-ins per business
        checkin_counts = checkin_df.groupBy("business_id").agg(
            count("checkin_timestamp").alias("total_checkins")
        )

        # Join with business ratings
        result = business_df.join(
            checkin_counts,
            "business_id",
            "left"
        ).select(
            "business_id",
            "business_name",
            "city",
            "state",
            "stars",
            "review_count",
            when(col("total_checkins").isNull(), 0)
            .otherwise(col("total_checkins")).alias("total_checkins")
        ).withColumn(
            "checkins_per_review",
            spark_round(col("total_checkins") / col("review_count"), 2)
        ).withColumn(
            "star_category",      # adding an additional attribute based on star category
            when(col("stars") >= 4.0, "High")
            .when(col("stars") >= 2.5, "Medium")
            .otherwise("Low")
        )

        return result.orderBy(col("total_checkins").desc())



In [9]:
# Example: Load Silver datasets
silver_reviews = spark.read.parquet("data/silver/review")
silver_business = spark.read.parquet("data/silver/business")
silver_checkin = spark.read.parquet("data/silver/checkin")

aggregator = DataAggregator(spark)

# Weekly stars aggregation
print(f"rows of silver_reviews:{silver_reviews.count()}. Rows of business: {silver_business.count()}")
weekly_stars = aggregator.aggregate_weekly_stars(silver_reviews, silver_business)
weekly_stars.write.partitionBy("review_week").mode("overwrite").parquet("data/gold/weekly_stars")
print(f"Rows of weekly_stars: {weekly_stars.count()}")
print("Weekly Stars Aggregation:")
weekly_stars.filter(col('review_count_weekly')>1).show(5, truncate=False)

# Check-ins vs stars aggregation
print(f"Rows of silver_checkin: {silver_checkin.count()}")
checkins_vs_stars = aggregator.aggregate_checkins_vs_stars(silver_checkin, silver_business)
checkins_vs_stars.write.mode("overwrite").parquet("data/gold/checkins_vs_stars")
print(f"Rows of checkin_vs_stars aggregated: {checkins_vs_stars.count()}")
print("Check-ins vs Stars Aggregation:")
checkins_vs_stars.show(5, truncate=False)

rows of silver_reviews:6990280. Rows of business: 150346


                                                                                

rows of review agg: 5138585


                                                                                

Rows of weekly_stars: 5138585
Weekly Stars Aggregation:


                                                                                

+----------------------+-----------------------------------------------+---------------+-----+-------------+-------------------+-----------+------------------+----------------+-------------------+----------+---------+--------+
|business_id           |business_name                                  |city           |state|overall_stars|review_week        |review_year|review_week_number|avg_stars_weekly|review_count_weekly|avg_useful|avg_funny|avg_cool|
+----------------------+-----------------------------------------------+---------------+-----+-------------+-------------------+-----------+------------------+----------------+-------------------+----------+---------+--------+
|---kPU91CF4Lq2-WlRu9Lw|Frankie's Raw Bar                              |New Port Richey|FL   |4.5          |2021-11-22 00:00:00|2021       |47                |4.67            |3                  |0.0       |0.0      |0.0     |
|--0iUa4sNDFiZFrAdIWhZQ|Pupuseria Y Restaurant Melba                   |Clementon      |NJ  

In [10]:
# Sample analytical query: the top 10 businesses in a given year, ranked by average weekly stars and then by review count.

top_businesses = (
    weekly_stars.filter(col("review_year") == 2020)
           .groupBy("business_id", "business_name", "city", "state")
           .agg(
               spark_round(avg("avg_stars_weekly"), 2).alias("avg_stars_2020"),
               spark_sum("review_count_weekly").alias("total_reviews")
           )
           .orderBy(col("avg_stars_2020").desc(), col("total_reviews").desc())
           .limit(10)
)

top_businesses.show(truncate=False)

+----------------------+------------------------------------------+-------------+-----+--------------+-------------+
|business_id           |business_name                             |city         |state|avg_stars_2020|total_reviews|
+----------------------+------------------------------------------+-------------+-----+--------------+-------------+
|DboqYyH-S8pV6WxaF9Plow|Cal Coast Adventures                      |Santa Barbara|CA   |5.0           |72           |
|gqOmu_puGr6VY0IGRLHtSA|Michael Richardson - Bay Equity Home Loans|Reno         |NV   |5.0           |54           |
|S6AP1meHAC0RxJXW26fq1g|Philly Foodworks                          |Philadelphia |PA   |5.0           |52           |
|aAXo_vX8YG10kytAM9SC9Q|Ice Dreammm Shop                          |Lutz         |FL   |5.0           |51           |
|U6gikR4uhRl4zU8q3j02oA|The Artist Haus                           |Philadelphia |PA   |5.0           |50           |
|oy1al7IJ75ZH_Fh9ho8zwg|Flow Pros                               