In [1]:
# Config for notebook - optional
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

import pandas as pd
pd.set_option('display.max_colwidth',100)

In [2]:
# Delta lake docs
# https://docs.delta.io/latest/delta-intro.html

In [3]:
import pyspark

# Instantiate spark session with delta lake
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

from delta.tables import *

In [7]:
# Read in data in to dataframe

df = spark.read.option("header",True).csv('../data/train.csv')

df.count()

df.show(5)

913000

+----------+-----+----+-----+
|      date|store|item|sales|
+----------+-----+----+-----+
|2013-01-01|    1|   1|   13|
|2013-01-02|    1|   1|   11|
|2013-01-03|    1|   1|   14|
|2013-01-04|    1|   1|   13|
|2013-01-05|    1|   1|   10|
+----------+-----+----+-----+
only showing top 5 rows



In [12]:
# Write as delta table.
df.write.format("delta").save("/home/jovyan/work/lake")

In [9]:
# Restart kernel and readin data from delta lake

delta_df = spark.read.format("delta").load("../lake")

type(delta_df)

delta_df.count()

pyspark.sql.dataframe.DataFrame

913000

In [20]:
# Create a new data frame with a new column
new_df = df.withColumn('col_n', df.store + df.item)

new_df.show()

In [26]:
# Write this back to delta lake. Note: without overwrite and mergescehma we will get an error as Delta lake wont allow us to overwrite data with new column

new_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("../lake")

In [10]:
# Lets check Delta table's meta data
deltaTable = DeltaTable.forPath(spark, "../lake")

In [11]:
# As we can see below, Delta lake has history of our initial write and then changes made subsequently.
deltaTable.history().toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,1,2021-04-07 12:40:19.961,,,WRITE,"{'mode': 'Overwrite', 'partitionBy': '[]'}",,,,0.0,,False,"{'numOutputRows': '913000', 'numOutputBytes': '1776421', 'numFiles': '4'}",
1,0,2021-04-07 12:22:02.304,,,WRITE,"{'mode': 'ErrorIfExists', 'partitionBy': '[]'}",,,,,,True,"{'numOutputRows': '913000', 'numOutputBytes': '1773875', 'numFiles': '4'}",


In [12]:
# Below are all parquet files related to above changes
!ls ../lake

_delta_log
part-00000-457fb9cf-fd52-423d-9af9-c2922d0c85fe-c000.snappy.parquet
part-00000-bb692ea7-87a1-4af0-b7f3-95189f10bdcf-c000.snappy.parquet
part-00001-225c2da3-6d10-4561-a7eb-66d51c4e0720-c000.snappy.parquet
part-00001-a6373976-cac8-45d4-ab3d-120c40bd7222-c000.snappy.parquet
part-00002-2236d358-9a8e-4d96-bb24-38caecae18d6-c000.snappy.parquet
part-00002-712810b1-3c68-4e17-9536-246c20c10d4b-c000.snappy.parquet
part-00003-376097a8-e45b-4d0e-a3fd-7940325b6c65-c000.snappy.parquet
part-00003-7557dfdc-4c47-42d3-9801-4c80cafa6c21-c000.snappy.parquet


In [13]:
# Restart kernel and now we can read data as of version we are interested in.
old_table = spark.read.format("delta").option("versionAsOf", 0).load("../lake")
print((old_table.count(), len(old_table.columns)))
old_table.limit(5).toPandas()

(913000, 4)


Unnamed: 0,date,store,item,sales
0,2013-01-01,1,1,13
1,2013-01-02,1,1,11
2,2013-01-03,1,1,14
3,2013-01-04,1,1,13
4,2013-01-05,1,1,10


In [14]:
new_table = spark.read.format("delta").option("versionAsOf", 1).load("../lake")
print((new_table.count(), len(new_table.columns)))
new_table.limit(5).toPandas()

(913000, 5)


Unnamed: 0,date,store,item,sales,col_n
0,2013-01-01,1,1,13,2.0
1,2013-01-02,1,1,11,2.0
2,2013-01-03,1,1,14,2.0
3,2013-01-04,1,1,13,2.0
4,2013-01-05,1,1,10,2.0
