# Delta Lake demo!

In [None]:
from delta import *
import pyspark
from pyspark.sql import SparkSession

# Delta Lake release whose Maven artifact Spark downloads when the session starts.
# The artifact name pins Scala 2.12. NOTE(review): this must match the Spark/Scala
# build and the installed delta-spark pip package — confirm when upgrading.
delta_spark_version = "2.1.0"
spark_jars_packages = "io.delta:delta-core_2.12:" + delta_spark_version

# Session settings, applied to the builder in exactly this order.
_session_settings = {
    # NOTE(review): with master local[*] the executors run inside the driver JVM,
    # so spark.executor.memory presumably has no effect here — confirm.
    "spark.executor.memory": "2g",
    "spark.driver.memory": "2g",
    "spark.jars.packages": spark_jars_packages,
    # Overwrites replace only the partitions present in the data being written.
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    # Register Delta's SQL extension and catalog with this session.
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    # Enable Delta's automatic schema merging (schema evolution) on writes.
    "spark.databricks.delta.schema.autoMerge.enabled": "true",
    # Disable the retention safety check so VACUUM may use a retention shorter than
    # the default (this notebook vacuums with 0 hours).
    "spark.databricks.delta.retentionDurationCheck.enabled": "false",
    # NOTE(review): as far as we can tell this only makes OPTIMIZE merge small files
    # with repartition(1) instead of coalesce(1); it does not compact files
    # automatically — confirm against the Delta version in use.
    "spark.databricks.delta.optimize.repartition.enabled": "true",
}

session_builder = SparkSession.builder.master("local[*]").appName("PySparkLocal")
for _key, _value in _session_settings.items():
    session_builder = session_builder.config(_key, _value)
spark = session_builder.getOrCreate()

# Check how many records we have

In [None]:
# Load the Delta table from disk and count its rows.
deltaDf = spark.read.load('../Deltalake/crypto', format="delta")
deltaDf.count()

In [None]:
# Number of distinct isoDate values in the table.
deltaDf.select('isoDate').dropDuplicates().count()

In [None]:
import pandas as pd

# Pull a 100-row sample to the driver as a pandas DataFrame and preview it.
sample_sdf = deltaDf.limit(100)
pandaDf = sample_sdf.toPandas()
pandaDf.head()

In [None]:
from pyspark.sql.functions import avg

# Mean price over every row of the table.
price_avg = deltaDf.agg(avg(deltaDf["price"]))
price_avg.show()

# What happens when we count again?

In [None]:
# Same action on the same DataFrame object as the first count.
rows_now = deltaDf.count()
rows_now

In [None]:
# Distinct isoDate values again, using the same DataFrame object.
dates_now = deltaDf.select('isoDate').distinct()
dates_now.count()

In [None]:
# Mean price again, on the same DataFrame object.
deltaDf.agg(avg(deltaDf['price'])).show()

# Let's see the table's history!

In [None]:
from delta.tables import DeltaTable

# Handle on the Delta table stored at this path.
deltaTable = DeltaTable.forPath(spark, '../Deltalake/crypto')

# One row per commit. Converted to pandas only so the notebook renders it as a table.
# Uses its own name instead of rebinding an imported helper such as IPython's `pretty`.
history_pdf = deltaTable.history().toPandas()
display(history_pdf)

# Let's query the table as of an earlier point in time (time travel)!

In [None]:
# Time travel: read the table as it was at a fixed timestamp.
# NOTE(review): this timestamp is tied to the author's table history; on another copy
# of the table it may predate the first commit and fail — adjust before running.
asOfDf = (
    spark.read.format("delta")
    .option("timestampAsOf", "2022-10-14 15:56:00")
    .load("../Deltalake/crypto")
)
asOfDf.select("isoDate").dropDuplicates().count()

In [None]:
# Mean price in the time-travelled snapshot, for comparison with the current table.
asOfDf.agg(avg(asOfDf['price'])).show()

# Let's see restore in action!

In [None]:
from delta.tables import *
from pyspark.sql.functions import *

# NOTE(review): this star import rebinds Python builtins such as max, min, sum, round
# and abs in the notebook namespace for every later cell, and nothing in this cell
# uses it — consider importing only the functions actually needed.

# Delete one day of data; Delta records the delete as a new table version,
# which the restore cell further down rolls back.
table_path = '../Deltalake/crypto'
deltaTable = DeltaTable.forPath(spark, table_path)
deltaTable.delete(condition="isoDate == '2021-03-15'")

In [None]:
# Rows for the deleted date; presumably empty now, if deltaDf reads the latest
# table version on each action.
deltaDf.where(deltaDf['isoDate'] == '2021-03-15').show()

In [None]:
# Commit history again; the delete should appear as the newest entry.
history_sdf = deltaTable.history()
pretty = history_sdf.toPandas()
display(pretty)

In [None]:
from pyspark.sql import functions as F

# Restore the table to the version just before the most recent DELETE commit
# (the delete issued a few cells above). The version is looked up in the commit
# history rather than hard-coded, so this also works on tables whose history
# differs from the author's.
last_delete_version = (
    deltaTable.history()
    .where(F.col("operation") == "DELETE")
    .agg(F.max("version"))
    .first()[0]
)
if last_delete_version is None:
    raise ValueError("No DELETE commit found in the table history; nothing to restore.")
deltaTable.restoreToVersion(last_delete_version - 1)

In [None]:
# Same query after the restore: the rows for this date should be back.
restored_rows = deltaDf.filter(deltaDf['isoDate'] == '2021-03-15')
restored_rows.show()

# Behind the scenes

In [None]:
# Table-level details (location, files, size, ...) rendered as a pandas table.
detail_sdf = deltaTable.detail()
pretty = detail_sdf.toPandas()
display(pretty)

In [None]:
import multiprocessing
import math

# Compact the table: rewrite all of its rows into `numFiles` output partitions
# without changing the rows themselves.
cpu_count = multiprocessing.cpu_count()
numDates = deltaDf.select('isoDate').distinct().count()

# NOTE(review): the stated intent was "2 days per CPU core", but
# cpu_count * numDates / 2 exceeds numDates whenever cpu_count > 2, i.e. it asks
# for more output partitions than there are distinct dates. Confirm the intended
# target before reusing this.
# max(1, ...) guards the small cases (one date on one core, or an empty table)
# where the formula floors to 0 and repartition(0) would raise.
numFiles = max(1, math.floor(cpu_count * numDates / 2))

path = '../Deltalake/crypto'

# dataChange=false tells Delta this commit only rearranges existing rows, so
# readers of the table are not meant to treat it as new data.
(
    spark.read.format("delta").load(path)
    .repartition(numFiles)
    .write
    .option("dataChange", "false")
    .format("delta")
    .mode("overwrite")
    .save(path)
)

In [None]:
# Table details again, after the compaction cell, for comparison with the earlier output.
detail_after = deltaTable.detail()
pretty = detail_after.toPandas()
display(pretty)

In [None]:
# VACUUM with a 0-hour retention: delete every data file no longer referenced by the
# current table version. Only permitted because the session sets
# spark.databricks.delta.retentionDurationCheck.enabled to false. Older versions
# (time travel, restore) may no longer be readable afterwards.
deltaTable.vacuum(retentionHours=0)

# You can always go back to Parquet

In [None]:
# Write the table's current rows to the relative path 'crypto' as plain Parquet
# (no Delta transaction log), replacing anything already there.
deltaDf.write.parquet('crypto', mode="overwrite")

# Extras section

In [None]:
# OPTIMIZE ... ZORDER BY isBuyerMaker: co-locate rows with similar values for ad-hoc
# queries. Kept only as an extra; it is not useful for this dataset.
optimize_builder = deltaTable.optimize()
optimize_builder.executeZOrderBy("isBuyerMaker")