# Building Data Lakes with Spark

In [3]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.utils import AnalysisException

from delta import *

In [23]:
### Session setup adapted from the Delta Lake getting-started guide so that
### the notebook can be run interactively:
### https://delta.io/learn/getting-started

builder = (
    SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [7]:
### Configure source data path.
### The default is the original author's local copy of the dataset; set the
### LOAN_RISKS_PARQUET environment variable to point at your own copy instead
### of editing this cell.
sourcePath = (
    os.environ.get("LOAN_RISKS_PARQUET")
    or "C:/Users/sean.cornillie/Education/LearningSparkV2/Spark_Dev/datasets/loan-risks.snappy.parquet"
)

### Configure Delta Lake path (override with the LOANS_DELTA_PATH environment variable)
deltaPath = os.environ.get("LOANS_DELTA_PATH") or "/tmp/loans_delta"

In [8]:
### Create the Delta Lake table with the loans data.
### mode("overwrite") lets this cell succeed when a table already exists at
### deltaPath from an earlier run; the default save mode raises an error in
### that case, which would stop the notebook from being re-run from the top.
(spark.read.format("parquet")
    .load(sourcePath)
    .write.format("delta")
    .mode("overwrite")
    .save(deltaPath))

In [9]:
### Register the Delta table as a temporary view named loans_delta
loans_df = spark.read.format("delta").load(deltaPath)
loans_df.createOrReplaceTempView("loans_delta")

In [10]:
### The view can now be queried with ordinary Spark SQL, e.g. a row count
count_query = """SELECT count(*)
             FROM loans_delta"""
spark.sql(count_query).show()

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [11]:
### Show five rows from the view to get a feel for the columns
preview_query = """SELECT *
             FROM loans_delta
             LIMIT 5"""
spark.sql(preview_query).show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+



### Enforcing Schema on Write to Prevent Data Corruption

In [13]:
### Two dummy loan rows that carry an additional 'closed' column
cols = ["loan_id", "funded_amnt", "paid_amnt", "addr_state", "closed"]

items = [(1111111, 1000, 1000.0, "TX", True),
         (2222222, 2000, 0.0, "CA", False)]

In [14]:
### This write is EXPECTED to fail with a schema mismatch (as intended by this exercise):
### loanUpdates carries the extra 'closed' column defined in the dummy data.
### funded_amnt is cast to int so that only the extra column differs from the table schema.
loanUpdates = (spark.createDataFrame(items, cols)
                  .withColumn("funded_amnt", col("funded_amnt").cast("int")))

### Catch and print the expected error instead of letting it propagate,
### so that the remaining cells still execute under "Restart & Run All".
try:
    loanUpdates.write.format("delta").mode("append").save(deltaPath)
except AnalysisException as schema_error:
    print(schema_error)

AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: ca68afb8-3b0b-4326-835c-1978c753eb2e).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)


Data schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)
-- closed: boolean (nullable = true)

         

### Updating Data within a Delta Lake table
Let's say that all of the loans assigned to addr_state = 'OR' should have been assigned to 'WA'. <br/>
If the data were in a parquet table, then we'd need to jump through some hoops to accomplish this. <br/>
With Delta Lake, however, we can simply use the DeltaTable API:

In [16]:
### Get a DeltaTable handle for the table at deltaPath, then fix the state in a single call.
### The new value is given as a SQL expression string, hence the inner quotes around WA.
deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.update(
    condition="addr_state = 'OR'",
    set={"addr_state": "'WA'"},
)

### Deleting Data
We need to be able to delete data on all loans that have been fully paid off (for example, to comply with the EU GDPR).

In [18]:
### Again we can just use the built-in API. A SQL DELETE statement can be used directly for the same effect.
### A loan is fully paid off once the amount paid reaches the amount funded, so the
### predicate is paid_amnt >= funded_amnt. The reverse comparison would instead
### remove the loans that still have an outstanding balance.
deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.delete("paid_amnt >= funded_amnt")

### Auditing changes with operation history

In [19]:
### Display the table's operation history
history_df = deltaTable.history()
history_df.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      2|2023-01-23 13:11:...|  null|    null|   DELETE|{predicate -> ["(...|null|    null|     null|          1|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.3....|
|      1|2023-01-23 13:09:...|  null|    null|   UPDATE|{predicate -> (ad...|null|    null|     null|          0|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.3....|
|      0|2

### Querying previous snapshots with Time Travel

In [21]:
### Not executed here; kept for reference. These are two ways to read an earlier snapshot: by timestamp or by version number.
#(spark.read
#    .format("delta")
#    .option("timestampAsOf", "2020-01-01")
#    .load(deltaPath))
#
#(spark.read
#    .format("delta")
#    .option("versionAsOf", "4")
#    .load(deltaPath))

In [22]:
### Stop the Spark session created at the top of the notebook
spark.stop()