# Initialize the SparkSession

Run the cells below to get a SparkSession.

In [2]:
import findspark
findspark.init()
import pyspark

In [3]:
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [4]:
from delta.tables import DeltaTable
import pyspark.sql.functions as f

# Store data in delta format

The data we use is the COVID-19 daily report data from Johns Hopkins University. It has already been downloaded to `/home/jovyan/COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/` as CSV files, each named after its date in American format (mm-dd-yyyy). For now we will only load the April 2021 data.
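
The glob `04-*-2021.csv` in the load cell below selects only the April 2021 files because of the mm-dd-yyyy naming. A minimal pure-Python sketch of that selection, assuming a hypothetical list of file names and using `fnmatch` as a stand-in for Hadoop's glob matching (the two agree for a simple `*` inside a single file name):

```python
from datetime import datetime
from fnmatch import fnmatch
from pathlib import Path

# Hypothetical file names in the mm-dd-yyyy.csv format.
filenames = [
    "03-31-2021.csv", "04-01-2021.csv", "04-15-2021.csv",
    "04-30-2021.csv", "05-01-2021.csv", "04-01-2020.csv",
]

# Same pattern as in the spark.read...load() call.
pattern = "04-*-2021.csv"
selected = [name for name in filenames if fnmatch(name, pattern)]

# Parse the American-style date (month-day-year) from each selected file name.
dates = [datetime.strptime(Path(name).stem, "%m-%d-%Y").date() for name in selected]

print(selected)  # ['04-01-2021.csv', '04-15-2021.csv', '04-30-2021.csv']
```

Because the month comes first in the file name, a single `*` in the day position is enough to pick one month of one year.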

After loading the CSV data, write it in Delta format. Use `/tmp/deltalake/` as the output path to avoid permission issues.

In [5]:
df = spark.read.format("csv").options(header='True', inferSchema='True', delimiter=',').load("/home/jovyan/COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/04-*-2021.csv")
df.show()

+----+------+--------------------+-------------------+-------------------+---------+----------+---------+------+---------+------+--------------------+------------------+-------------------+
|FIPS|Admin2|      Province_State|     Country_Region|        Last_Update|      Lat|     Long_|Confirmed|Deaths|Recovered|Active|        Combined_Key|     Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------------+-------------------+-------------------+---------+----------+---------+------+---------+------+--------------------+------------------+-------------------+
|null|  null|                null|        Afghanistan|2021-04-04 04:20:49| 33.93911| 67.709953|    56595|  2496|    51802|  2297|         Afghanistan|145.38251193391469|  4.410283593957064|
|null|  null|                null|            Albania|2021-04-04 04:20:49|  41.1533|   20.1683|   126183|  2256|    93173| 30754|             Albania|4384.7035930224465| 1.7878795083331351|
|null|  null|                null|            Alge

In [7]:
df.write.format("delta").save("/tmp/deltalake/")

# Read data in delta format

After storing the data in Delta format, you can read it back. Try showing the first few rows.

In [5]:
df = spark.read.format("delta").load("/tmp/deltalake/")
df.show(5)

+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|        Last_Update|     Lat|    Long_|Confirmed|Deaths|Recovered|Active|Combined_Key|     Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|null|  null|          null|   Afghanistan|2021-04-04 04:20:49|33.93911|67.709953|    56595|  2496|    51802|  2297| Afghanistan|145.38251193391469|  4.410283593957064|
|null|  null|          null|       Albania|2021-04-04 04:20:49| 41.1533|  20.1683|   126183|  2256|    93173| 30754|     Albania|4384.7035930224465| 1.7878795083331351|
|null|  null|          null|       Algeria|2021-04-04 04:20:49| 28.0339|   1.6596|   117524|  3102|    81813| 32609|     Algeria|268.00730828682913|  2.639

# Clean up the data

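We use the `FIPS`, `Admin2`, `Province_State`, `Country_Region` and `Last_Update` columns as the unique key of this dataset. To make later steps easier, we want to clean up the nulls in the `FIPS`, `Admin2` and `Province_State` columns: in SQL, `NULL = NULL` does not evaluate to true, so rows with a null in a key column would never match on that key, for example during a merge.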
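
Why the nulls matter: in SQL, `NULL = NULL` evaluates to NULL (unknown) rather than true, and a merge condition only matches a row when it is true. The sketch below models that three-valued comparison in plain Python; it is a conceptual model, not Spark code. `null_safe_equals` mirrors Spark SQL's null-safe `<=>` operator, which would be an alternative to replacing the nulls.

```python
from typing import Optional


def sql_equals(a, b) -> Optional[bool]:
    """Model of SQL `a = b`: the result is NULL (None here) if either side is NULL."""
    if a is None or b is None:
        return None
    return a == b


def null_safe_equals(a, b) -> bool:
    """Model of Spark SQL's null-safe `a <=> b`: NULL <=> NULL is true."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


# Key as (FIPS, Admin2, Province_State, Country_Region, Last_Update), before cleaning.
existing_key = (None, None, None, "Afghanistan", "2021-04-08 04:21:13")
incoming_key = (None, None, None, "Afghanistan", "2021-04-08 04:21:13")

# An AND of comparisons is only true when every comparison is exactly true.
matches_plain = all(sql_equals(a, b) is True for a, b in zip(existing_key, incoming_key))
matches_null_safe = all(null_safe_equals(a, b) for a, b in zip(existing_key, incoming_key))

print(matches_plain)      # False: NULL = NULL is not true
print(matches_null_safe)  # True
```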

Do this with the `update` method of the `DeltaTable` class. Replace nulls in string columns with an empty string (`''`) and in numeric columns with `0`.

Hint: you can check for nulls with the `isNull()` method of `f.col()`.

- [Documentation of DeltaTable](https://docs.delta.io/latest/api/python/index.html).
- [Documentation of Spark SQL Functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions).

In [22]:
df.dtypes

[('FIPS', 'int'),
 ('Admin2', 'string'),
 ('Province_State', 'string'),
 ('Country_Region', 'string'),
 ('Last_Update', 'string'),
 ('Lat', 'double'),
 ('Long_', 'double'),
 ('Confirmed', 'int'),
 ('Deaths', 'int'),
 ('Recovered', 'int'),
 ('Active', 'int'),
 ('Combined_Key', 'string'),
 ('Incident_Rate', 'double'),
 ('Case_Fatality_Ratio', 'double')]

In [6]:
deltaTable = DeltaTable.forPath(spark, '/tmp/deltalake')

# The values in the `set` dict are Spark SQL expression strings:
# "0" is the integer literal 0 and "''" is the empty-string literal.
deltaTable.update(f.col("FIPS").isNull(), {"FIPS": "0"})
deltaTable.update(f.col("Admin2").isNull(), {"Admin2": "''"})
deltaTable.update(f.col("Province_State").isNull(), {"Province_State": "''"})
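
Rather than writing each `update` call by hand, the replacement rule (strings become `''`, numbers become `0`) could be derived from the column types. A minimal sketch, assuming the types from the `df.dtypes` output above; `null_fill_expr` and `build_update_sets` are hypothetical helpers, and only the numeric type names listed in the code are handled:

```python
# Types of the key columns, copied from the df.dtypes output above.
dtypes = [("FIPS", "int"), ("Admin2", "string"), ("Province_State", "string"),
          ("Country_Region", "string"), ("Last_Update", "string")]

# Spark type names treated as numeric in this sketch.
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def null_fill_expr(spark_type: str) -> str:
    """Return the SQL literal that replaces NULL for a column of this type."""
    if spark_type == "string":
        return "''"  # SQL empty-string literal
    if spark_type in NUMERIC_TYPES or spark_type.startswith("decimal"):
        return "0"
    raise ValueError(f"no fill rule for type {spark_type!r}")


def build_update_sets(dtypes, columns):
    """Map each column to a `set` dict suitable for DeltaTable.update."""
    type_of = dict(dtypes)
    return {col: {col: null_fill_expr(type_of[col])} for col in columns}


update_sets = build_update_sets(dtypes, ["FIPS", "Admin2", "Province_State"])
print(update_sets)
# {'FIPS': {'FIPS': '0'}, 'Admin2': {'Admin2': "''"}, 'Province_State': {'Province_State': "''"}}
```

Looping over `update_sets` and calling `deltaTable.update(f.col(col).isNull(), assignment)` for each entry should be equivalent to the three calls in the cell above, while keeping the type rule in one place.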

In [10]:
df.show(5)

+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|        Last_Update|     Lat|    Long_|Confirmed|Deaths|Recovered|Active|Combined_Key|     Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|   0|      |              |   Afghanistan|2021-04-04 04:20:49|33.93911|67.709953|    56595|  2496|    51802|  2297| Afghanistan|145.38251193391469|  4.410283593957064|
|   0|      |              |       Albania|2021-04-04 04:20:49| 41.1533|  20.1683|   126183|  2256|    93173| 30754|     Albania|4384.7035930224465| 1.7878795083331351|
|   0|      |              |       Algeria|2021-04-04 04:20:49| 28.0339|   1.6596|   117524|  3102|    81813| 32609|     Algeria|268.00730828682913|  2.639

Delta Lake keeps a history of your data. You can inspect it with the code below.

In [25]:
table_history_df = deltaTable.history() 
table_history_df.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      4|2021-04-08 17:58:52|  null|    null|    MERGE|[predicate -> (((...|null|    null|     null|          3|          null|        false|[numTargetRowsCop...|        null|
|      3|2021-04-08 17:31:28|  null|    null|   UPDATE|[predicate -> isn...|null|    null|     null|          2|          null|        false|[numRemovedFiles ...|        null|
|      2|2021-04-08 17:31:22|  null|    null|   UPDATE|[predicate -> isn...|null|    null|     null|          1|        

# Upsert new data

The `Confirmed` cases for Flevoland in the Netherlands turned out to be incorrect. We've received a new file with the fix; it's stored in `/home/jovyan/delta-lake-workshop/files/04-07-2021.csv`.

- Clean the new data with the same rules you used for the existing data in the Delta table (replace nulls with `''` or `0`).
- Upsert the new data into the Delta table with a merge. You can find an example [here](https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge). Remember which columns make up the key.
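
Conceptually, a merge with `whenMatchedUpdateAll()` and `whenNotMatchedInsertAll()` replaces every target row whose key matches a source row and inserts the source rows that match nothing. The plain-Python model below illustrates those semantics on rows stored as dicts; it is not how Delta executes a merge, `upsert` and `flevoland_row` are hypothetical helpers, and the Flevoland values in it are made up.

```python
KEY_COLUMNS = ("FIPS", "Admin2", "Province_State", "Country_Region", "Last_Update")


def upsert(target, updates, key_columns=KEY_COLUMNS):
    """Model of MERGE ... whenMatchedUpdateAll() ... whenNotMatchedInsertAll().

    Rows are dicts; a new list is returned and the inputs are left unchanged.
    Assumes each key appears at most once in `updates`.
    """
    def key(row):
        return tuple(row[col] for col in key_columns)

    incoming = {key(row): row for row in updates}
    matched = set()
    merged = []
    for row in target:
        k = key(row)
        if k in incoming:
            # Matched: every column takes the value from the source row.
            merged.append(dict(incoming[k]))
            matched.add(k)
        else:
            merged.append(dict(row))
    # Not matched: insert the source rows whose key was not found in the target.
    merged.extend(dict(row) for k, row in incoming.items() if k not in matched)
    return merged


def flevoland_row(last_update, confirmed):
    # Hypothetical row; the Confirmed values used below are made up.
    return {"FIPS": 0, "Admin2": "", "Province_State": "Flevoland",
            "Country_Region": "Netherlands", "Last_Update": last_update,
            "Confirmed": confirmed}


target = [flevoland_row("2021-04-04 04:20:49", 100),
          flevoland_row("2021-04-08 04:21:13", 200)]
updates = [flevoland_row("2021-04-08 04:21:13", 250),   # corrected value
           flevoland_row("2021-04-09 04:20:00", 300)]   # hypothetical new row

result = upsert(target, updates)
print([(r["Last_Update"], r["Confirmed"]) for r in result])
# [('2021-04-04 04:20:49', 100), ('2021-04-08 04:21:13', 250), ('2021-04-09 04:20:00', 300)]
```

Note that the match is on the full key: if the corrected row had a different `Last_Update`, it would be inserted as a new row instead of replacing the old one.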

In [15]:
updates_df = spark.read.format("csv").options(header='True', inferSchema='True', delimiter=',').load("/home/jovyan/delta-lake-workshop/files/04-07-2021.csv")
updates_df.show(5)

+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|        Last_Update|     Lat|    Long_|Confirmed|Deaths|Recovered|Active|Combined_Key|     Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|null|  null|          null|   Afghanistan|2021-04-08 04:21:13|33.93911|67.709953|    56873|  2512|    51940|  2421| Afghanistan| 146.0966446014229|  4.416858614808433|
|null|  null|          null|       Albania|2021-04-08 04:21:13| 41.1533|  20.1683|   127192|  2291|    95600| 29301|     Albania| 4419.765098339009|  1.801213912824706|
|null|  null|          null|       Algeria|2021-04-08 04:21:13| 28.0339|   1.6596|   118004|  3116|    82192| 32696|     Algeria| 269.1019230717044| 2.6405

In [17]:
# Apply the same null-replacement rules as in the Delta table.
updates_df = (
    updates_df
    .withColumn('FIPS',
                f.when(f.col('FIPS').isNull(), 0).otherwise(f.col('FIPS')))
    .withColumn('Admin2',
                f.when(f.col('Admin2').isNull(), '').otherwise(f.col('Admin2')))
    .withColumn('Province_State',
                f.when(f.col('Province_State').isNull(), '').otherwise(f.col('Province_State')))
)
updates_df.show(5)

+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|        Last_Update|     Lat|    Long_|Confirmed|Deaths|Recovered|Active|Combined_Key|     Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|   0|      |              |   Afghanistan|2021-04-08 04:21:13|33.93911|67.709953|    56873|  2512|    51940|  2421| Afghanistan| 146.0966446014229|  4.416858614808433|
|   0|      |              |       Albania|2021-04-08 04:21:13| 41.1533|  20.1683|   127192|  2291|    95600| 29301|     Albania| 4419.765098339009|  1.801213912824706|
|   0|      |              |       Algeria|2021-04-08 04:21:13| 28.0339|   1.6596|   118004|  3116|    82192| 32696|     Algeria| 269.1019230717044| 2.6405

In [19]:
# Upsert: rows whose full key matches are updated, all other rows are inserted.
(
    deltaTable.alias("covid19")
    .merge(
        updates_df.alias("updates"),
        "covid19.FIPS = updates.FIPS AND "
        "covid19.Admin2 = updates.Admin2 AND "
        "covid19.Province_State = updates.Province_State AND "
        "covid19.Country_Region = updates.Country_Region AND "
        "covid19.Last_Update = updates.Last_Update",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
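
Writing the five key comparisons by hand is easy to get wrong. A small sketch that builds the same condition string from a list of key columns; `merge_condition` is a hypothetical helper, and its optional `null_safe` flag switches to Spark SQL's null-safe `<=>` operator:

```python
KEY_COLUMNS = ["FIPS", "Admin2", "Province_State", "Country_Region", "Last_Update"]


def merge_condition(target_alias, source_alias, key_columns, null_safe=False):
    """Build a SQL merge condition that compares every key column."""
    op = "<=>" if null_safe else "="
    return " AND ".join(
        f"{target_alias}.{col} {op} {source_alias}.{col}" for col in key_columns
    )


condition = merge_condition("covid19", "updates", KEY_COLUMNS)
print(condition)
```

The resulting string could be passed as the condition argument of `deltaTable.alias("covid19").merge(updates_df.alias("updates"), condition)`, so the key definition lives in a single list.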

# Time travel
In a data warehouse (or lakehouse) you often want to be able to time travel, so you can see what the world looked like at a certain moment in time. This feature is very useful for data rollbacks, auditing, and reproducing reports and analytics at any point in time.

Because Delta Lake keeps the table history, time travel is supported out of the box. You can find examples of how to query an older version [here](https://docs.delta.io/latest/delta-batch.html#query-an-older-snapshot-of-a-table-time-travel).
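
Delta Lake does not copy the whole table for every version: its transaction log (`_delta_log`) records which data files make up each version, and old files stay on disk until they are vacuumed. The toy model below ignores that storage detail and simply keeps full snapshots, to illustrate what `versionAsOf` and `history()` give you; `VersionedTable` is hypothetical and its values are made up.

```python
import copy


class VersionedTable:
    """Toy model of a table that keeps every committed version as a full snapshot."""

    def __init__(self, rows):
        self._snapshots = [copy.deepcopy(rows)]  # version 0
        self._operations = ["WRITE"]

    @property
    def version(self):
        """The latest committed version number."""
        return len(self._snapshots) - 1

    def commit(self, operation, rows):
        """Store a new snapshot; earlier versions remain readable."""
        self._snapshots.append(copy.deepcopy(rows))
        self._operations.append(operation)
        return self.version

    def as_of(self, version):
        """Return the rows at `version`, similar in spirit to option("versionAsOf", version)."""
        if not 0 <= version <= self.version:
            raise ValueError(f"version {version} does not exist")
        return copy.deepcopy(self._snapshots[version])

    def history(self):
        """(version, operation) pairs, newest first, like DeltaTable.history()."""
        return list(reversed(list(enumerate(self._operations))))


# Hypothetical values, not taken from the dataset.
table = VersionedTable([{"Province_State": "Flevoland", "Confirmed": 200}])
table.commit("MERGE", [{"Province_State": "Flevoland", "Confirmed": 250}])

print(table.as_of(0))              # [{'Province_State': 'Flevoland', 'Confirmed': 200}]
print(table.as_of(table.version))  # [{'Province_State': 'Flevoland', 'Confirmed': 250}]
print(table.history())             # [(1, 'MERGE'), (0, 'WRITE')]
```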

In [27]:
# Version 3 is the last version before the MERGE (version 4 in the history above).
pre_merge_df = spark.read.format("delta").option("versionAsOf", 3).load("/tmp/deltalake/")
pre_merge_df.filter(pre_merge_df["Province_State"] == 'Flevoland').orderBy(pre_merge_df['Last_Update']).show()

+----+------+--------------+--------------+-------------------+---------+--------+---------+------+---------+------+--------------------+------------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|        Last_Update|      Lat|   Long_|Confirmed|Deaths|Recovered|Active|        Combined_Key|     Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+---------+--------+---------+------+---------+------+--------------------+------------------+-------------------+
|   0|      |     Flevoland|   Netherlands|2021-04-02 04:20:36|52.550383|5.515162|    29289|   226|        0| 29063|Flevoland, Nether...|6923.7697419277065| 0.7716207449895865|
|   0|      |     Flevoland|   Netherlands|2021-04-03 04:20:44|52.550383|5.515162|    29415|   226|        0| 29189|Flevoland, Nether...| 6953.555497244818| 0.7683154852966174|
|   0|      |     Flevoland|   Netherlands|2021-04-04 04:20:49|52.550383|5.515162|    29554|   226|        0| 29328

In [28]:
df.filter(df["Province_State"]=='Flevoland').orderBy(df['Last_Update']).show()

+----+------+--------------+--------------+-------------------+---------+--------+---------+------+---------+------+--------------------+------------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|        Last_Update|      Lat|   Long_|Confirmed|Deaths|Recovered|Active|        Combined_Key|     Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+---------+--------+---------+------+---------+------+--------------------+------------------+-------------------+
|   0|      |     Flevoland|   Netherlands|2021-04-02 04:20:36|52.550383|5.515162|    29289|   226|        0| 29063|Flevoland, Nether...|6923.7697419277065| 0.7716207449895865|
|   0|      |     Flevoland|   Netherlands|2021-04-03 04:20:44|52.550383|5.515162|    29415|   226|        0| 29189|Flevoland, Nether...| 6953.555497244818| 0.7683154852966174|
|   0|      |     Flevoland|   Netherlands|2021-04-04 04:20:49|52.550383|5.515162|    29554|   226|        0| 29328

# Advanced Delta Lake: schema updates

We didn't expect you to get this far already, but if you're up for it, you can take it one step further.

Remember that we only put the April 2021 data in our Delta Lake? The reason is that the dataset has had quite a bit of schema drift over the past year or so. Delta Lake can deal with this, either manually (meh) or automatically (yay). Check out the examples [here](https://docs.delta.io/latest/delta-batch.html#update-table-schema).
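
With automatic schema evolution (for example the `mergeSchema` write option described in the linked docs), columns that appear only in the new data are added to the table, and existing rows read as NULL for them. The toy model below sketches that idea in plain Python; it rejects any type change, whereas Delta's actual rules allow some, and both schemas and the row value are hypothetical.

```python
def merge_schemas(existing, incoming):
    """Toy model of automatic schema evolution.

    Schemas are lists of (column, type) pairs. Columns only in `incoming` are
    appended at the end; a column present in both with a different type is rejected.
    """
    merged = list(existing)
    known = dict(existing)
    for name, dtype in incoming:
        if name not in known:
            merged.append((name, dtype))
            known[name] = dtype
        elif known[name] != dtype:
            raise TypeError(f"type conflict for {name!r}: {known[name]} vs {dtype}")
    return merged


def conform(rows, schema):
    """Give every row all columns of `schema`, using None (NULL) for missing ones."""
    return [{name: row.get(name) for name, _ in schema} for row in rows]


# Hypothetical schemas: the newer files carry one extra column.
old_schema = [("Country_Region", "string"), ("Confirmed", "int")]
new_schema = [("Country_Region", "string"), ("Confirmed", "int"), ("Incident_Rate", "double")]

schema = merge_schemas(old_schema, new_schema)
old_rows = conform([{"Country_Region": "Albania", "Confirmed": 1}], schema)  # made-up value

print(schema)    # [('Country_Region', 'string'), ('Confirmed', 'int'), ('Incident_Rate', 'double')]
print(old_rows)  # [{'Country_Region': 'Albania', 'Confirmed': 1, 'Incident_Rate': None}]
```

Under this model a renamed column simply shows up as a new column next to the old one, so renames would still need manual handling.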

We haven't tried this feature yet, so we're curious what your solution is.