In [1]:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.remote("sc://delta-connect-server").getOrCreate()

In [2]:
type(spark_session)

pyspark.sql.connect.session.SparkSession

In [3]:
from pyspark.sql.functions import col, to_date, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType
schema = (StructType([
    StructField("event_time", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("product_id", IntegerType(), False),
    StructField("category_id", LongType(), False),
    StructField("category_code", StringType(), False),
    StructField("brand", StringType(), False),
    StructField("price", FloatType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("user_session", StringType(), False),
]))

In [5]:
schema

StructType([StructField('event_time', StringType(), False), StructField('event_type', StringType(), False), StructField('product_id', IntegerType(), False), StructField('category_id', LongType(), False), StructField('category_code', StringType(), False), StructField('brand', StringType(), False), StructField('price', FloatType(), False), StructField('user_id', IntegerType(), False), StructField('user_session', StringType(), False)])

In [6]:
dataset_dir = '/opt/spark/work-dir/datasets/ecomm_raw/'

In [7]:
datasets = ['2019-Oct.csv', '2019-Nov.csv']

In [8]:
october_data = datasets[0]
november_data = datasets[1]

In [9]:
ecomm_df = (
    spark.read.format("csv")
    .option("header", True)
    .schema(schema)
    .load(f"{dataset_dir}/{october_data}")
)

In [10]:
ecomm_df.show()

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|205301355563188265

In [10]:
ecomm_df.count()

42448764

In [11]:
ecomm_nov_df = (
  spark.read.format("csv")
    .option("header", True)
    .schema(schema)
    .load(f"{dataset_dir}/{november_data}")
)

In [12]:
ecomm_nov_df.show()

+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|2019-11-01 00:00:...|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:...|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:...|      view|  17302664|2053013553853497655|                NULL|   creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:...|      view|   3601530|2053013563810775923|appliances.kitche...|      lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:...|      view|   1004775|2053013555631882655|elect

In [13]:
ecomm_nov_df.count()

67501979

## Run this Block for each Month of Data
> Note: To get the full experience, run the following cell twice (once per referenced dataframe):
> - `target_df = ecomm_df`
> - `target_df = ecomm_nov_df`


In [15]:
from pyspark.sql.functions import to_date, to_timestamp
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

sink_dir = 'sm' if datasets[1].endswith('-sm.csv') else 'lg'

target_df = ecomm_df
#target_df = ecomm_nov_df

(target_df
   .withColumn("event_time", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss z"))
   .withColumn("event_date", to_date(col("event_time")))
   .write
   .format("parquet")
   .partitionBy("event_date")
   .mode("append")
   .save(f"{dataset_dir}/parquet/{sink_dir}")
)

## Read the Parquet in and Generate new a Delta Lake table
> This is testing that spark-connect can write to Delta (the promise of delta-connect) :)

```
`spark-connect-server`: is where the action happens

This means the "files" are actually being read on the remote server. So locally, you literally are 'asking' if the operation can be completed server-side. I'm saying this because it isn't immediately apparent "where" each operation runs. This was a learning curve (took testing things out).

In [16]:
delta_path = f"{dataset_dir}/delta"
dl_unmanaged_table = "ecomm"

# we'll be reading from the remote location
source_parquet_dir = f"{dataset_dir}/parquet/{sink_dir}"

# create a new dataframe reader here
source_parquet = (spark.read
 .format("parquet")
 .load(source_parquet_dir)
)


In [17]:
## Now for the Delta part. Let's create an empty Delta table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {dl_unmanaged_table} (
        event_time TIMESTAMP,
        event_type STRING,
        product_id INTEGER,
        category_id BIGINT,
        category_code STRING,
        brand STRING,
        price FLOAT,
        user_id INTEGER,
        user_session STRING,
        event_date DATE
    ) USING DELTA
    LOCATION '{delta_path}/{dl_unmanaged_table}'
    PARTITIONED BY (event_date)
    TBLPROPERTIES('delta.logRetentionDuration'='interval 28 days');
   """)

DataFrame[]

In [18]:
## proof is in the pudding as they say...
(source_parquet
 .write
 .format("delta")
 .option("path", f"{delta_path}/{dl_unmanaged_table}")
 .mode("append")
 .save()
)

In [24]:
spark.sql(
f"""
select * from delta.`{delta_path}/{dl_unmanaged_table}` 
where event_date BETWEEN DATE('2019-10-06') AND DATE('2019-10-16')
ORDER BY event_date ASC
""").show(1000)

+-------------------+----------+----------+-------------------+--------------------+------------+-------+---------+--------------------+----------+
|         event_time|event_type|product_id|        category_id|       category_code|       brand|  price|  user_id|        user_session|event_date|
+-------------------+----------+----------+-------------------+--------------------+------------+-------+---------+--------------------+----------+
|2019-10-06 16:40:19|      view|  26401638|2053013563651392361|                NULL|        NULL| 188.94|514588156|1b8af203-62d9-417...|2019-10-06|
|2019-10-06 05:49:02|      view|   1004833|2053013555631882655|electronics.smart...|     samsung| 173.96|557247327|36e33bf9-43c8-450...|2019-10-06|
|2019-10-06 12:30:27|      view|  13300618|2053013557166998015|                NULL|      askona| 200.52|544224224|604ff497-7041-40b...|2019-10-06|
|2019-10-06 05:49:14|      view|   4700569|2053013560899928785|auto.accessories....|        ibox|   56.6|5131838