In [1]:
%%configure -f
{
    "conf": {
        "spark.jars": "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1666706558378_0004,pyspark,idle,Link,Link,


In [2]:
from pyspark.sql.types import (
    StringType,
    StructField,
    StructType,
    IntegerType,
    DoubleType,
    DateType,
    BooleanType,
    TimestampType
)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1666706558378_0007,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Define Customer schema

In [3]:
customer_schema = StructType([
        StructField('customer_id', StringType(), False),
        StructField('first_name', StringType(), True),
        StructField('last_name', StringType(), True),
        StructField('city', StringType(), True),
        StructField('country', StringType(), True),
        StructField('eff_start_date', DateType(), True),
        StructField('eff_end_date', DateType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('is_current', BooleanType(), True),
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create Customer records

In [4]:
from pyspark.sql.functions import udf
import time

random_udf = udf(lambda: str(int(time.time() * 1000000)), StringType()) 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
from datetime import datetime

customer_df = spark.createDataFrame([('1', 'John', 'Smith', 'London', 'UK', datetime.strptime('2020-09-27', '%Y-%m-%d'), datetime.strptime('2999-12-31', '%Y-%m-%d'), datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
                       ('2', 'Susan', 'Chas', 'Seattle', 'US', datetime.strptime('2020-10-14', '%Y-%m-%d'), datetime.strptime('2999-12-31', '%Y-%m-%d'), datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), True)], customer_schema)

customer_sk_df = customer_df.withColumn("customer_sk", random_udf())
customer_sk_df.cache()
customer_sk_df.show(5, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|city   |country|eff_start_date|eff_end_date|timestamp          |is_current|customer_sk     |
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|1          |John      |Smith    |London |UK     |2020-09-27    |2999-12-31  |2020-12-08 09:15:32|true      |1667041876044190|
|2          |Susan     |Chas     |Seattle|US     |2020-10-14    |2999-12-31  |2020-12-08 09:15:32|true      |1667041876947966|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+

In [6]:
S3_BUCKET_NAME="646297494209-us-east-1-modern-data-lake-storage"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
customers_table_path = 's3://646297494209-us-east-1-modern-data-lake-storage/customers/'
customers_table_name = 'customers_table'

record_key = "customer_sk"
partition_key = "country"

hudi_options = {'hoodie.insert.shuffle.parallelism':'2',
               'hoodie.upsert.shuffle.parallelism':'2',
               'hoodie.delete.shuffle.parallelism':'2',
               'hoodie.bulkinsert.shuffle.parallelism':'2',
               'hoodie.datasource.hive_sync.enable':'false',
               'hoodie.datasource.hive_sync.assume_date_partitioning':'true'
}

customer_sk_df.write.format('org.apache.hudi')\
                             .options(**hudi_options)\
                             .option('hoodie.table.name',customers_table_name)\
                             .option('hoodie.datasource.write.recordkey.field', record_key)\
                             .option('hoodie.datasource.write.partitionpath.field',partition_key)\
                             .option('hoodie.datasource.write.precombine.field','timestamp')\
                             .option('hoodie.datasource.write.operation', 'insert')\
                             .option('hoodie.datasource.hive_sync.table',customers_table_name)\
                             .mode('Overwrite')\
                             .save(customers_table_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
customer_sk_df = spark. \
  read. \
  format("hudi"). \
  load(customers_table_path)

customer_sk_df.createOrReplaceTempView(customers_table_name)

spark.sql('select customer_id,'
          'first_name, '
          'last_name, '
          'city, '
          'country, '
          'eff_start_date, '
          'eff_end_date, '
          'customer_sk, '
          'is_current '
          'from ' + customers_table_name).show(3, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|customer_id|first_name|last_name|city   |country|eff_start_date|eff_end_date|customer_sk     |is_current|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|2          |Susan     |Chas     |Seattle|US     |2020-10-14    |2999-12-31  |1667041876947966|true      |
|1          |John      |Smith    |London |UK     |2020-09-27    |2999-12-31  |1667041876044190|true      |
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+

## Define Sales schema, create Sales records

In [9]:
from pyspark.sql.functions import to_timestamp

sales_schema = StructType([
        StructField('item_id', StringType(), True),
        StructField('quantity', IntegerType(), True),
        StructField('price', DoubleType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('customer_id', StringType(), True)
    ])

sales_df = spark.createDataFrame([('100', 25, 123.46, datetime.strptime('2020-11-17 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                       ('101', 300, 123.46, datetime.strptime('2020-10-28 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                      ('102', 5, 1038.0, datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), '2')], sales_schema)

sales_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+------+-------------------+-----------+
|item_id|quantity| price|          timestamp|customer_id|
+-------+--------+------+-------------------+-----------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|
|    101|     300|123.46|2020-10-28 09:15:32|          1|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|
+-------+--------+------+-------------------+-----------+

## Identify Customer surrogate key

In [10]:
from pyspark.sql.functions import when

join_cond = [sales_df.customer_id == customer_sk_df.customer_id,
             sales_df.timestamp >= customer_sk_df.eff_start_date,
             sales_df.timestamp < customer_sk_df.eff_end_date]

sales_with_cust_sk_df = (sales_df
                          .join(customer_sk_df, join_cond, 'leftouter')
                          .select(sales_df['*'],
                            when(customer_sk_df.customer_sk.isNull(), '-1')
                                  .otherwise(customer_sk_df.customer_sk)
                                  .alias("customer_sk") )
                       )

sales_with_cust_sk_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity| price|          timestamp|customer_id|     customer_sk|
+-------+--------+------+-------------------+-----------+----------------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|1667041876044190|
|    101|     300|123.46|2020-10-28 09:15:32|          1|1667041876044190|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|1667041876947966|
+-------+--------+------+-------------------+-----------+----------------+

## Write Sales records

In [11]:
sales_table_path = 's3://646297494209-us-east-1-modern-data-lake-storage/sales/'
sales_table_name = 'sales_table'

sales_with_cust_sk_df.write.format('parquet')\
                             .mode('Overwrite')\
                             .save(sales_table_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read Sales records

In [12]:
sales_with_cust_sk_df = spark. \
  read. \
  load(sales_table_path)

sales_with_cust_sk_df.createOrReplaceTempView(sales_table_name)

spark.sql('select item_id, '
          'quantity,'
          'price,'
          'timestamp,'
          'customer_id,'
          'customer_sk from ' + sales_table_name).show(5, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity|price |timestamp          |customer_id|customer_sk     |
+-------+--------+------+-------------------+-----------+----------------+
|100    |25      |123.46|2020-11-17 09:15:32|1          |1667041876044190|
|101    |300     |123.46|2020-10-28 09:15:32|1          |1667041876044190|
|102    |5       |1038.0|2020-12-08 09:15:32|2          |1667041876947966|
+-------+--------+------+-------------------+-----------+----------------+

## Sales by Country

In [13]:
spark.sql(
    'SELECT ct.country, '
    'SUM(st.quantity) as sales_quantity,'
    'COUNT(*) as count_sales '
    'FROM sales_table st '
    'INNER JOIN customers_table ct on st.customer_sk = ct.customer_sk '
    'group by ct.country').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------+-----------+
|country|sales_quantity|count_sales|
+-------+--------------+-----------+
|     US|             5|          1|
|     UK|           325|          2|
+-------+--------------+-----------+

## New/Updated Customer records

In [14]:
new_customer_df = spark.createDataFrame([('3', 'Bastian', 'Back', 
                    'Berlin', 'GE',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'), 
                    datetime.strptime('2020-12-09 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
                    ('2', 'Susan', 'Chas',
                    'Paris', 'FR',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'), 
                    datetime.strptime('2020-12-09 10:15:32', '%Y-%m-%d %H:%M:%S'), True)],
                customer_schema)

new_customer_df = new_customer_df.withColumn("customer_sk", random_udf())
new_customer_df.cache()
new_customer_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|  city|country|eff_start_date|eff_end_date|          timestamp|is_current|     customer_sk|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|          3|   Bastian|     Back|Berlin|     GE|    2022-10-29|  2999-12-31|2020-12-09 09:15:32|      true|1667042573856711|
|          2|     Susan|     Chas| Paris|     FR|    2022-10-29|  2999-12-31|2020-12-09 10:15:32|      true|1667042574223601|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+

## Update Customers

In [16]:
from pyspark.sql.functions import lit

join_cond = [customer_sk_df.customer_id == new_customer_df.customer_id, customer_sk_df.is_current == True]

## Find customer records to update
customers_to_update_df = (customer_sk_df
                          .join(new_customer_df, join_cond)
                          .select(customer_sk_df.customer_id,
                                  customer_sk_df.first_name,
                                  customer_sk_df.last_name,
                                  customer_sk_df.city,
                                  customer_sk_df.country,
                                  customer_sk_df.eff_start_date,
                                  new_customer_df.eff_start_date.alias("eff_end_date"),
                                  customer_sk_df.customer_sk,
                                  customer_sk_df.timestamp)
                          .withColumn('is_current', lit(False))
                         )


## Union with new customer records
merged_customers_df = new_customer_df.unionByName(customers_to_update_df)

record_key = 'customer_sk'
partition_key = "country"

# Upsert
merged_customers_df.write.format('org.apache.hudi')\
                    .options(**hudi_options)\
                    .option('hoodie.table.name',customers_table_name)\
                    .option('hoodie.datasource.write.recordkey.field', record_key)\
                    .option('hoodie.datasource.write.partitionpath.field',partition_key)\
                    .option('hoodie.datasource.write.precombine.field','timestamp')\
                    .option('hoodie.datasource.write.operation', 'upsert')\
                    .option('hoodie.datasource.hive_sync.table',customers_table_name)\
                    .mode('append')\
                    .save(customers_table_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
customer_sk_df = spark. \
  read. \
  format("hudi"). \
  load(customers_table_path)

customer_sk_df.createOrReplaceTempView(customers_table_name)

spark.sql('select customer_id,'
          'first_name, '
          'last_name, '
          'city, '
          'country, '
          'eff_start_date, '
          'eff_end_date, '
          'customer_sk, '
          'is_current '
          'from ' + customers_table_name).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|     customer_sk|is_current|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+
|          3|   Bastian|     Back| Berlin|     GE|    2022-10-29|  2999-12-31|1667042573856711|      true|
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2022-10-29|1667041876947966|     false|
|          2|     Susan|     Chas|  Paris|     FR|    2022-10-29|  2999-12-31|1667042574223601|      true|
|          1|      John|    Smith| London|     UK|    2020-09-27|  2999-12-31|1667041876044190|      true|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+

In [18]:
sales_df = spark.createDataFrame([('103', 250, 12.3, datetime.strptime(datetime.today().strftime('%Y-%m-%d')+' 12:15:42', '%Y-%m-%d %H:%M:%S'), '2'),
                                       ('104', 3, 1021.0, datetime.strptime(datetime.today().strftime('%Y-%m-%d')+' 06:35:32', '%Y-%m-%d %H:%M:%S'), '2')], sales_schema)

sales_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+------+-------------------+-----------+
|item_id|quantity| price|          timestamp|customer_id|
+-------+--------+------+-------------------+-----------+
|    103|     250|  12.3|2022-10-29 12:15:42|          2|
|    104|       3|1021.0|2022-10-29 06:35:32|          2|
+-------+--------+------+-------------------+-----------+

In [19]:
from pyspark.sql.functions import when

join_cond = [sales_df.customer_id == customer_sk_df.customer_id, sales_df.timestamp >= customer_sk_df.eff_start_date, sales_df.timestamp < customer_sk_df.eff_end_date]


sales_with_cust_sk_df = (sales_df
                          .join(customer_sk_df, join_cond, 'leftouter')
                          .select(sales_df['*'],
                            when(customer_sk_df.customer_sk.isNull(), '-1').otherwise(customer_sk_df.customer_sk).alias("customer_sk") )
                         )

sales_with_cust_sk_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity| price|          timestamp|customer_id|     customer_sk|
+-------+--------+------+-------------------+-----------+----------------+
|    103|     250|  12.3|2022-10-29 12:15:42|          2|1667042574223601|
|    104|       3|1021.0|2022-10-29 06:35:32|          2|1667042574223601|
+-------+--------+------+-------------------+-----------+----------------+

In [20]:
sales_table_path = 's3://646297494209-us-east-1-modern-data-lake-storage/sales/'
sales_table_name = 'sales_table'

sales_with_cust_sk_df.write.format('parquet')\
                             .mode('Append')\
                             .save(sales_table_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
sales_with_cust_sk_df = spark. \
  read. \
  load(sales_table_path)

sales_with_cust_sk_df.createOrReplaceTempView(sales_table_name)

spark.sql("select item_id, quantity, price, timestamp, customer_id, customer_sk from " + sales_table_name).show(5, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity|price |timestamp          |customer_id|customer_sk     |
+-------+--------+------+-------------------+-----------+----------------+
|100    |25      |123.46|2020-11-17 09:15:32|1          |1667041876044190|
|103    |250     |12.3  |2022-10-29 12:15:42|2          |1667042574223601|
|101    |300     |123.46|2020-10-28 09:15:32|1          |1667041876044190|
|104    |3       |1021.0|2022-10-29 06:35:32|2          |1667042574223601|
|102    |5       |1038.0|2020-12-08 09:15:32|2          |1667041876947966|
+-------+--------+------+-------------------+-----------+----------------+

In [22]:
spark.sql(
    'SELECT ct.country, SUM(st.quantity) as sales_quantity, COUNT(*) as count_sales '
    'FROM sales_table st '
    'INNER JOIN customers_table ct on st.customer_sk = ct.customer_sk group by ct.country').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------+-----------+
|country|sales_quantity|count_sales|
+-------+--------------+-----------+
|     US|             5|          1|
|     UK|           325|          2|
|     FR|           253|          2|
+-------+--------------+-----------+