In [1]:
import findspark
# Bootstrap step: this must run before the pyspark imports in the next cell.
# NOTE(review): per findspark's docs, init() locates the local Spark install
# (e.g. via SPARK_HOME) and puts pyspark on sys.path — confirm for this environment.
findspark.init()

In [2]:
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os

In [3]:
# Pull in the Delta Lake package via spark-submit's --packages flag. PySpark reads
# this variable when it launches the JVM, so it has to be set before getOrCreate().
os.environ.update(PYSPARK_SUBMIT_ARGS='--packages io.delta:delta-core_2.12:0.7.0 pyspark-shell')

In [4]:
# Create (or reuse) the session with Delta Lake's SQL extension and catalog enabled,
# so Delta tables can be used from both the DataFrame API and SQL.
spark = (
    SparkSession.builder
    .appName('loan-risks')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [5]:
# Delta Lake table location.
# NOTE(review): relative to the kernel's working directory; use an absolute path
# (e.g. '/tmp/lakehouse') if that was the intent.
delta_path = 'tmp/lakehouse'

# Load the raw loans parquet file and (re)create the Delta table from it.
# coalesce(1) collapses the data to one partition, so the initial write is one file.
loans_raw = spark.read.parquet('loan-risks.snappy.parquet')
loans_raw.coalesce(1).write.save(delta_path, format='delta', mode='overwrite')

In [6]:
# Expose the Delta table to SQL under the temporary view name `loans_delta`.
loans_delta_df = spark.read.load(delta_path, format='delta')
loans_delta_df.createOrReplaceTempView("loans_delta")

In [7]:
# Sanity check: how many rows landed in the Delta table.
row_count_query = 'SELECT COUNT(*) FROM loans_delta'
spark.sql(row_count_query).show()

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [7]:
# Two new loan records. `closed` is a column the existing table does not have,
# which is why the append in the next cell enables schema merging.
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [
    (1111111, 1000, 1000.0, 'TX', True),
    (2222222, 2000, 0.0, 'CA', False),
]

# Python ints are inferred as bigint; funded_amnt is narrowed to int to match the table.
loans_update_raw = spark.createDataFrame(items, cols)
loans_update = loans_update_raw.withColumn("funded_amnt", col("funded_amnt").astype("int"))

In [8]:
# Append the new rows; mergeSchema lets the write add the new `closed` column
# (existing rows read it back as null).
loans_update.write.save(delta_path, format='delta', mode='append', mergeSchema="true")

In [11]:
# Peek at the first ten rows of the view, now including the merged `closed` column.
spark.table('loans_delta').limit(10).show()

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  null|
|      1|       1000|   361.19|        WA|  null|
|      2|       1000|   176.26|        TX|  null|
|      3|       1000|   1000.0|        OK|  null|
|      4|       1000|   249.98|        PA|  null|
|      5|       1000|    408.6|        CA|  null|
|      6|       1000|   1000.0|        MD|  null|
|      7|       1000|   168.81|        OH|  null|
|      8|       1000|   193.64|        TX|  null|
|      9|       1000|   218.83|        CT|  null|
+-------+-----------+---------+----------+------+



In [9]:
# Updating data to fix errors.
# Get a programmatic handle on the Delta table at delta_path. The update, delete
# and merge cells below all operate through this handle.
# Import only the name that is used, instead of a mid-notebook `import *`.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, delta_path)

In [10]:
# Fix mis-entered state codes: rows recorded as 'OR' are rewritten to 'WA'.
# The value in `set` is a SQL expression, hence the quoted literal.
deltaTable.update(condition="addr_state='OR'", set={"addr_state": "'WA'"})

In [14]:
# Delete loans that have been fully paid off (paid amount has reached the funded amount).
# The previous predicate, "funded_amnt >= paid_amnt", was inverted: it matched every
# loan whose paid amount did not exceed its funded amount. That is every outstanding
# loan (e.g. 182.22 paid of 1000 funded), not just the paid-off ones.
deltaTable.delete("paid_amnt >= funded_amnt")

In [12]:
# Upsert loans_update into the Delta table keyed on loan_id. A matching target row
# is overwritten with all source columns; a source id with no match is inserted.
merge_builder = deltaTable.alias('t').merge(
    source=loans_update.alias('s'),
    condition='t.loan_id=s.loan_id',
)
merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [17]:
# Full commit log of the table; display the ten most recent versions.
full_history = deltaTable.history()
full_history.show(n=10)

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|     11|2021-07-30 11:10:...|  null|    null|    MERGE|[predicate -> (t....|null|    null|     null|         10|          null|        false|[numTargetRowsCop...|        null|
|     10|2021-07-30 11:07:...|  null|    null|   UPDATE|[predicate -> (ad...|null|    null|     null|          9|          null|        false|[numRemovedFiles ...|        null|
|      9|2021-07-30 11:06:...|  null|    null|    WRITE|[mode -> Append, ...|null|    null|     null|          8|  

In [18]:
# Last three commits, narrowed to the columns that explain what changed.
recent_history = deltaTable.history(3)
history_cols = ["version", "timestamp", "operation", "operationParameters"]
recent_history.select(*history_cols).show(truncate=False)

+-------+-----------------------+---------+------------------------------------------+
|version|timestamp              |operation|operationParameters                       |
+-------+-----------------------+---------+------------------------------------------+
|11     |2021-07-30 11:10:32.255|MERGE    |[predicate -> (t.`loan_id` = s.`loan_id`)]|
|10     |2021-07-30 11:07:15.74 |UPDATE   |[predicate -> (addr_state#1269 = OR)]     |
|9      |2021-07-30 11:06:54.407|WRITE    |[mode -> Append, partitionBy -> []]       |
+-------+-----------------------+---------+------------------------------------------+



In [22]:
# Time travel: read the table as it was at the given timestamp.
# NOTE(review): a date-only value means midnight (2021-07-30 00:00). If the table was
# first written later that same day, this is before creation, not after — confirm.
loans_as_of = spark.read.load(delta_path, format="delta", timestampAsOf="2021-07-30")
loans_as_of

DataFrame[loan_id: bigint, funded_amnt: int, paid_amnt: double, addr_state: string, closed: boolean]