In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.driver.memory", "8g")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/pee.tankulrat/Develop/TW/talk/delta-lake/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/pee.tankulrat/.ivy2/cache
The jars for the packages stored in: /Users/pee.tankulrat/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-69ad70be-3d9d-48fb-9f9f-5aba38144695;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 99ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default    

24/03/21 14:36:28 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
eCommerce = spark.read.option("header", "true").csv("data/eCommerce/*.csv")

## It is slow to work on non-columnar files

In [3]:
eCommerce.count()

                                                                                

109950743

In [4]:
# TODO: Make this idempotent
eCommerce.write.format("delta").mode('overwrite').save("./data/delta/raw/eCommerce")

24/03/21 14:38:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
eComDelta = spark.read.format("delta").load("./data/delta/raw/eCommerce")
eComDelta.count()

109950743

In [7]:
eComDelta.show()

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-25 13:38:...|      view|  28717850|2053013565639492569|       apparel.shoes|    NULL| 177.35|562154821|3fcba72e-a3f7-450...|
|2019-10-25 13:38:...|  purchase|   1004856|2053013555631882655|electronics.smart...| samsung| 131.51|512711994|3b99facc-1331-4b2...|
|2019-10-25 13:38:...|      view|   1307236|2053013558920217191|  computers.notebook|  lenovo| 320.96|544749446|058ba4d7-23e0-4e6...|
|2019-10-25 13:38:...|      view|   4700630|2053013560899928785|auto.accessories....|    NULL|  30.86|558665584|50e91e76-5259-472...|
|2019-10-25 13:38:...|      view|   8200231|205301355541377883

## Let's discard NULL records

In [9]:
refined = eComDelta.filter(eComDelta.brand.isNotNull())
refined.write.format("delta").mode('overwrite').save("./data/delta/rfn/eCommerce")

                                                                                

In [10]:
refined.select('event_type').distinct().collect()

                                                                                

[Row(event_type='purchase'), Row(event_type='view'), Row(event_type='cart')]

In [11]:
dysonPurchaseDf = refined.filter(refined.brand == "dyson").filter(
    refined.event_type == "purchase"
)
dysonPurchaseDf.count()

                                                                                

423

## Do people spend more, if they are buying Dyson's product?

In [12]:
dysonPurchaseDf.select('price').summary().show()



+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|               423|
|   mean| 559.8790070921985|
| stddev|148.92210018071378|
|    min|            307.83|
|    25%|            478.75|
|    50%|            509.64|
|    75%|            694.74|
|    max|            846.80|
+-------+------------------+



                                                                                

In [13]:
(
    eComDelta
    .filter(eComDelta.brand.isNotNull())
    .filter(eComDelta.brand != "dyson")
    .filter(eComDelta.event_type == "purchase")
    .select("price")
    .summary()
    .show()
)



+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|           1527878|
|   mean|317.66230296529335|
| stddev| 351.7339956988872|
|    min|              0.77|
|    25%|             91.38|
|    50%|            180.16|
|    75%|            386.08|
|    max|            999.77|
+-------+------------------+



                                                                                