In [1]:
import os
import random
import shutil
import string

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col,
    from_json,
    max as max_,
    row_number,
    sum as sum_,
    window,
    year,
)
from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType

# Create a Spark session.
# The MinIO endpoint and credentials are read from the environment so that
# secrets can be supplied without editing the notebook; when a variable is
# unset, the value that was previously hard-coded here is used.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minio123")

# Jars passed to Spark through `spark.jars`; joined with commas below.
SPARK_JARS = [
    "/opt/spark/jars/hadoop-aws-3.3.4.jar",
    "/opt/spark/jars/s3-2.18.41.jar",
    "/opt/spark/jars/aws-java-sdk-1.12.367.jar",
    "/opt/spark/jars/delta-core_2.12-2.4.0.jar",
    "/opt/spark/jars/delta-storage-2.4.0.jar",
    "/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.4.0.jar",
    "/opt/spark/jars/kafka-clients-3.3.2.jar",
    "/opt/spark/jars/aws-java-sdk-bundle-1.12.367.jar",
    "/opt/spark/jars/spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar",
    "/opt/spark/jars/commons-pool2-2.11.1.jar",
    "/opt/spark/jars/spark-token-provider-kafka-0-10_2.12-3.4.0.jar",
]

spark = (
    SparkSession.builder.appName("Optimized PySpark with MinIO, Delta, and Hive")
    .master("spark://spark-master:7077")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.jars", ",".join(SPARK_JARS))
    # S3A filesystem settings pointing at the MinIO endpoint
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Delta catalog and SQL extension
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.warehouse.dir", "s3a://warehouse/")
    .config("spark.eventLog.enabled", "false")
    .enableHiveSupport()
    .getOrCreate()
)
spark

/opt/spark/bin/load-spark-env.sh: line 68: ps: command not found
24/10/14 06:26:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/14 06:26:33 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [2]:
# List the tables visible to this Spark session.
spark.sql("SHOW TABLES").show()

                                                                                

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default| business|      false|
|  default|  checkin|      false|
|  default|   review|      false|
|  default|      tip|      false|
|  default|     user|      false|
+---------+---------+-----------+



In [3]:
# Options passed to the Kafka streaming source
# (tuned for a single broker, 1 partition, RF = 1).
kafka_options = {
    # Kafka broker address
    "kafka.bootstrap.servers": "kafka:9092",
    # Topic to subscribe to
    "subscribe": "delta-pipeline",
    # Begin consuming at the latest offset
    "startingOffsets": "latest",
    # Upper bound on messages per micro-batch (kept small for a small partition count)
    "maxOffsetsPerTrigger": "500",
    # Single partition
    "minPartitions": "1",
    # Do not fail the query when data loss is detected
    "failOnDataLoss": "false",
}


def random_checkpoint_dir():
    """
    Build a checkpoint location in MinIO with a random suffix.

    Returns:
        A string of the form ``s3a://deltalake/checkpoints/<suffix>`` where
        ``<suffix>`` is 10 characters drawn (with replacement) from lowercase
        ASCII letters and digits.
    """
    alphabet = string.ascii_lowercase + string.digits
    suffix = "".join(random.choices(alphabet, k=10))
    return f"s3a://deltalake/checkpoints/{suffix}"


def stop_all_streams():
    """
    Stop every streaming query that is currently active on the Spark session.

    The name of each query is printed before it is stopped.
    """
    active_queries = spark.streams.active
    for query in active_queries:
        print(f"Stopping {query.name}")
        query.stop()


# Bronze layer: Kafka -> Delta ingestion
def bronze_ingest_data(bronze_path):
    """
    Start a streaming query that reads JSON messages from Kafka and appends
    the parsed rows to the bronze Delta table.

    Args:
        bronze_path: Location of the Delta table the stream writes to.

    Returns:
        The streaming query handle returned by ``start``.
    """
    # Layout of the JSON payload in the Kafka message value; every field is nullable.
    field_specs = [
        ("review_id", StringType),
        ("user_id", StringType),
        ("business_id", StringType),
        ("stars", DoubleType),
        ("useful", LongType),
        ("funny", LongType),
        ("cool", LongType),
        ("text", StringType),
        ("date", StringType),
    ]
    review_schema = StructType(
        [StructField(field_name, field_type(), True) for field_name, field_type in field_specs]
    )

    # Kafka source configured through the shared `kafka_options`
    kafka_stream = spark.readStream.format("kafka").options(**kafka_options).load()

    # Parse the message value as JSON, flatten the struct, and keep the Kafka timestamp
    payload = from_json(col("value").cast("string"), review_schema).alias("data")
    flattened = kafka_stream.select(payload, "timestamp").select("data.*", "timestamp")
    parsed_reviews = flattened.withColumn("date", col("date").cast("date"))

    # Append to the bronze Delta table using a freshly generated checkpoint location
    checkpoint_location = random_checkpoint_dir()
    bronze_writer = parsed_reviews.writeStream.format("delta").outputMode("append")
    bronze_writer = bronze_writer.option("checkpointLocation", checkpoint_location)

    return bronze_writer.start(bronze_path)

def silver_process(bronze_path, silver_path):
    """
    Stream rows from the bronze Delta table into the silver Delta table,
    replacing null values in selected columns with defaults.

    Args:
        bronze_path: Location of the bronze Delta table to read from.
        silver_path: Location of the silver Delta table to append to.

    Returns:
        The streaming query handle returned by ``start``.
    """
    # Imported here so the function does not rely on some other notebook cell
    # having imported `when` before it is called.
    from pyspark.sql.functions import when

    def _default_if_null(column_name, default):
        # Same column name in and out; nulls are replaced by `default`.
        source = col(column_name)
        return when(source.isNull(), default).otherwise(source).alias(column_name)

    # Read data from the bronze Delta table (streaming)
    bronze_stream = spark.readStream.format("delta").load(bronze_path)

    checkpoint_dir = random_checkpoint_dir()

    # Keep only the columns carried forward to the silver layer
    ratings = bronze_stream.select(
        "user_id", "business_id", "stars", "date", "cool", "funny", "useful", "timestamp"
    )

    # NOTE(review): the default for `date` is a string literal, so the resulting
    # column type may depend on Spark's type coercion rather than staying a date.
    # Confirm against the existing silver table schema before changing it.
    ratings_cleaned = ratings.select(
        col("user_id"),
        col("business_id"),
        _default_if_null("stars", 0),
        _default_if_null("date", "1970-01-01"),
        _default_if_null("cool", 0),
        _default_if_null("funny", 0),
        _default_if_null("useful", 0),
        col("timestamp"),
    )

    # Append the cleaned rows to the silver Delta table
    silver_query = (
        ratings_cleaned.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
        .start(silver_path)
    )

    return silver_query


def gold_process(silver_path, gold_path):
    """
    Stream the silver table, enrich each review with business and user
    attributes, and append the result to the gold Delta table.

    Args:
        silver_path: Location of the silver Delta table to read from.
        gold_path: Location of the gold Delta table to append to.

    Returns:
        The streaming query handle returned by ``start``.
    """
    # Streaming read of the silver table, with a `year` column derived from `date`
    silver_stream = spark.readStream.format("delta").load(silver_path)
    silver_with_year = silver_stream.withColumn("year", year(col("date")))

    # Lookup tables resolved by name through the session catalog
    business_table = spark.table("business")
    user_table = spark.table("user")

    # Alias each side so identically named columns can be told apart
    reviews = silver_with_year.alias("reviews")
    businesses = business_table.alias("business")
    users = user_table.alias("user")

    # Left joins: every review row is kept even without a matching business or user
    with_business = reviews.join(
        businesses, reviews["business_id"] == businesses["business_id"], "left"
    )
    enriched = with_business.join(users, reviews["user_id"] == users["user_id"], "left")

    # Output columns, each taken explicitly from the side it belongs to
    gold_columns = [
        reviews["user_id"],
        reviews["business_id"],
        reviews["stars"].alias("review_stars"),
        reviews["date"],
        reviews["cool"],
        reviews["funny"],
        reviews["useful"],
        reviews["year"],
        businesses["address"],
        businesses["city"],
        businesses["state"],
        businesses["name"].alias("business_name"),
        users["name"].alias("user_name"),
        reviews["timestamp"],
    ]
    gold_rows = enriched.select(*gold_columns)

    # Append to the gold Delta table using a freshly generated checkpoint location
    checkpoint_location = random_checkpoint_dir()
    gold_writer = (
        gold_rows.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
    )

    return gold_writer.start(gold_path)

In [4]:
# Delta table locations for the bronze, silver, and gold layers
bronze_path, silver_path, gold_path = (
    "s3a://deltalake/bronze",
    "s3a://deltalake/silver",
    "s3a://deltalake/gold",
)

In [5]:
# Start the Kafka -> bronze streaming query and keep its handle.
bronze_query = bronze_ingest_data(bronze_path)

24/10/14 06:27:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/10/14 06:27:50 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/10/14 06:27:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [7]:
# `when` is referenced inside silver_process; this import makes it available
# in the notebook namespace.
from pyspark.sql.functions import when
# Start the bronze -> silver streaming query and keep its handle.
silver_query = silver_process(bronze_path, silver_path)

24/10/14 06:29:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

In [8]:
# Start the silver -> gold streaming query and keep its handle.
gold_query = gold_process(silver_path, gold_path)

24/10/14 06:29:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[Stage 337:>               (0 + 4) / 25][Stage 339:>                (0 + 0) / 4]

In [None]:
# Block until any of the streaming queries terminates.
# Awaiting the queries one after another would keep this cell blocked on the
# bronze query even if the silver or gold query had already failed;
# awaitAnyTermination() returns (or raises the failing query's exception) as
# soon as any query on this session stops.
# Note: if a query on this session has already terminated, this returns
# immediately; call spark.streams.resetTerminated() first to wait again.
spark.streams.awaitAnyTermination()

                                                                                

In [None]:
# Stop every streaming query that is still active on this session.
stop_all_streams()

In [None]:
# Shut down the Spark session.
spark.stop()