In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import from_json, col, translate



In [2]:
spark = SparkSession.builder \
            .appName("ProcessBronzeToSilver") \
            .getOrCreate()



:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9efb71ba-c2ea-4fa2-a057-1b369ea3096c;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.lub

In [3]:
topic = "finance.broker.transactions.customers"

In [4]:
schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_username", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("customer_gender", StringType(), True),
    StructField("customer_address", StringType(), True),
    StructField("customer_purchase_price", StringType(), True),
    StructField("customer_country", StringType(), True),
    StructField("createdAt", StringType(), True)
])

## Flatten columns from Dataframe

In [5]:
def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []
    while len(stack) > 0:
        parents, df = stack.pop()
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        columns.extend(flat_cols)
        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
    return nested_df.select(columns)

In [6]:
customer_stream_df = spark.readStream \
                          .format("delta") \
                          .option("path", f'/home/jovyan/work/datalake/bronze/{topic}') \
                          .load()

                                                                                

In [7]:
customer_stream_df = customer_stream_df.withColumn("events", from_json(col("value").cast(StringType()), schema))

In [8]:
customer_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- events: struct (nullable = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- customer_username: string (nullable = true)
 |    |-- customer_name: string (nullable = true)
 |    |-- customer_gender: string (nullable = true)
 |    |-- customer_address: string (nullable = true)
 |    |-- customer_purchase_price: string (nullable = true)
 |    |-- customer_country: string (nullable = true)
 |    |-- createdAt: str

In [9]:
customer_stream_df_flattened = flatten_df(customer_stream_df)

In [10]:
customer_stream_df_flattened = customer_stream_df_flattened.select(col("events_customer_id").alias("customer_id"), col("events_customer_username").alias("customer_username"), 
                                                                  col("events_customer_name").alias("customer_name"), col("events_customer_gender").alias("customer_gender"),
                                                                  col("events_customer_address").alias("customer_address"), translate(col("events_customer_purchase_price"), "$", "").alias("customer_purchase_price").cast(FloatType()),
                                                                  col("events_customer_country").alias("customer_country"), col("year"), 
                                                                  col("month"), col("day"))

In [11]:
customer_stream_df_flattened.writeStream \
                            .format("delta") \
                            .option("path", f'/home/jovyan/work/datalake/silver/{topic}') \
                            .partitionBy("year", "month", "day") \
                            .option("checkpointLocation", f'/home/jovyan/work/checkpoint/{topic}-silver') \
                            .start().awaitTermination()

                                                                                

KeyboardInterrupt: 