In [1]:
import findspark

# Make the local Spark installation importable as `pyspark`.
findspark.init()
# Returns the Spark home path; not shown because it is not the cell's last expression.
findspark.find()

# Import only the names this cell uses instead of `import *`, which would pull
# all of IPython.display's public names into the notebook's global namespace.
from IPython.display import HTML, display

# Stop <pre> blocks (where Spark's show() tables render) from wrapping wide rows.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Local Spark session with Delta Lake enabled:
#  - spark.jars.packages pulls in the delta-spark package;
#  - the SQL extension and catalog settings let Spark SQL work with Delta
#    tables (CREATE TABLE ... USING DELTA, MERGE, time travel, ...).
sessionConfigs = {
    "spark.dynamicAllocation.enabled": "false",
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.0.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

sessionBuilder = SparkSession.builder.appName("DeltaLakeApp").master("local[4]")
for configKey, configValue in sessionConfigs.items():
    sessionBuilder = sessionBuilder.config(configKey, configValue)

spark = sessionBuilder.getOrCreate()
sc = spark.sparkContext

spark

In [3]:
# Create schema for Yellow Taxi data
# Explicit schema for the Yellow Taxi CSV. Without one (and with the default
# inferSchema=false), every column would be read as a string.
# Field order must match the CSV column order: with the default
# enforceSchema=true, Spark applies the schema by position and ignores the
# header names.
yellowTaxiSchema = StructType([
    StructField("VendorId"               , IntegerType()   , True),
    StructField("PickupTime"             , TimestampType() , True),
    StructField("DropTime"               , TimestampType() , True),
    StructField("PassengerCount"         , DoubleType()    , True),
    StructField("TripDistance"           , DoubleType()    , True),
    StructField("RateCodeId"             , DoubleType()    , True),
    StructField("StoreAndFwdFlag"        , StringType()    , True),
    StructField("PickupLocationId"       , IntegerType()   , True),
    StructField("DropLocationId"         , IntegerType()   , True),
    StructField("PaymentType"            , IntegerType()   , True),
    StructField("FareAmount"             , DoubleType()    , True),
    StructField("Extra"                  , DoubleType()    , True),
    StructField("MtaTax"                 , DoubleType()    , True),
    StructField("TipAmount"              , DoubleType()    , True),
    StructField("TollsAmount"            , DoubleType()    , True),
    StructField("ImprovementSurcharge"   , DoubleType()    , True),
    StructField("TotalAmount"            , DoubleType()    , True),
    StructField("CongestionSurcharge"    , DoubleType()    , True),
    StructField("AirportFee"             , DoubleType()    , True),
])


# Raw string: this Windows path contains "\D" and "\Y", which are invalid
# Python escape sequences (DeprecationWarning, and SyntaxWarning from Python
# 3.12). The path value itself is unchanged.
yellowTaxiPath = r"C:\DataFiles\YellowTaxis_202210.csv"

# Read the Yellow Taxis file
yellowTaxiDF = (
    spark
        .read
        .option("header", "true")
        .schema(yellowTaxiSchema)
        .csv(yellowTaxiPath)
)


# Print schema of DataFrame
yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PassengerCount: double (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- RateCodeId: double (nullable = true)
 |-- StoreAndFwdFlag: string (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- FareAmount: double (nullable = true)
 |-- Extra: double (nullable = true)
 |-- MtaTax: double (nullable = true)
 |-- TipAmount: double (nullable = true)
 |-- TollsAmount: double (nullable = true)
 |-- ImprovementSurcharge: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- CongestionSurcharge: double (nullable = true)
 |-- AirportFee: double (nullable = true)



### Create database in metastore

In [4]:
createDatabaseSql = """

CREATE DATABASE IF NOT EXISTS TaxisDB

"""

# IF NOT EXISTS makes this cell safe to re-run.
spark.sql(createDatabaseSql)

DataFrame[]

### Parquet format: Save DataFrame as a Table

In [5]:
# Save the raw DataFrame as a Parquet-backed table, partitioned by VendorId.
# The explicit "path" option puts the files at that location instead of the
# default warehouse directory.
# Raw string: the backslashes in this Windows path are not Python escapes.
parquetTablePath = r"C:\DataFiles\Output\YellowTaxis.parquet"

(
    yellowTaxiDF
        .write
        .mode("overwrite")
        .partitionBy("VendorId")
        .format("parquet")
        .option("path", parquetTablePath)
        .saveAsTable("TaxisDB.YellowTaxisParquet")
)

### Delta format: Save DataFrame as a Table

In [6]:
# If you have already run this command and want to start from the beginning,
# delete the output folder from the file system first.

# NOTE(review): nothing from the `delta` Python package is used in this cell.
from delta import *

# Raw string: the backslashes in this Windows path are not Python escapes.
# NOTE(review): this root is C:\DataFiles, whereas the later CREATE TABLE uses
# LOCATION "C:/SparkCourse/DataFiles/..." -- confirm which root is intended.
deltaTablePath = r"C:\DataFiles\Output\YellowTaxis.delta"

(
    yellowTaxiDF
        .write
        .mode("overwrite")
        .partitionBy("VendorId")
        .format("delta")
        .option("path", deltaTablePath)
        .saveAsTable("TaxisDB.YellowTaxis")
)

In [7]:
# Row count of the Delta table that was just written.
countQuery = """

SELECT COUNT(*)
FROM TaxisDB.YellowTaxis

"""
countDF = spark.sql(countQuery)
countDF.show()

+--------+
|count(1)|
+--------+
| 3675412|
+--------+



### Audit History of Delta Table

This shows the transaction log (commit history) of the Delta table.

In [8]:
# Each row of DESCRIBE HISTORY is one commit in the table's transaction log.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

+-------+-----------------------+------+--------+---------------------------------+----------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation                        |operationParameters                                                                     |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                     |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------------------------------+----------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-------------------------------------------------

### Overwrite data in Delta table

In [9]:
# Overwrite the data by re-running the same write; the history in the next cell
# shows the additional commit.
# Raw string: the backslashes in this Windows path are not Python escapes.
deltaTablePath = r"C:\DataFiles\Output\YellowTaxis.delta"

(
    yellowTaxiDF
        .write
        .mode("overwrite")
        .partitionBy("VendorId")
        .format("delta")
        .option("path", deltaTablePath)
        .saveAsTable("TaxisDB.YellowTaxis")
)

In [10]:
# Inspect the commit log again after the overwrite.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

+-------+-----------------------+------+--------+---------------------------------+----------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation                        |operationParameters                                                                     |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                     |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------------------------------+----------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-------------------------------------------------

In [11]:
dropTableSql = """

DROP TABLE TaxisDB.YellowTaxis

"""

# Remove the table definition so it can be recreated with explicit DDL.
# NOTE(review): the table was saved with an explicit "path" option, so this
# presumably drops only the metastore entry and leaves the files on disk.
spark.sql(dropTableSql)

DataFrame[]

### Create Table Definition

Create the table with a DDL command first, and add the data later.

In [12]:
# Define the table schema up front with DDL; data is added in later cells.
# NOTE(review): LOCATION is under C:/SparkCourse/DataFiles, while the earlier
# saveAsTable cells wrote under C:\DataFiles -- confirm the intended root.
# This is a plain CREATE TABLE with a fixed LOCATION, so on a re-run that
# location may already hold Delta files from a previous run; delete the folder
# first for a clean start.
createTableDdl = """

CREATE TABLE TaxisDB.YellowTaxis
(
    VendorId                INT               COMMENT 'Vendor providing the ride',
    
    PickupTime              TIMESTAMP,
    DropTime                TIMESTAMP,
    
    PickupLocationId        INT               NOT NULL,
    DropLocationId          INT,
    
    PassengerCount          DOUBLE,
    TripDistance            DOUBLE,
    
    RateCodeId              DOUBLE,
    StoreAndFwdFlag         STRING,
    PaymentType             INT,
    
    FareAmount              DOUBLE,
    Extra                   DOUBLE,
    MtaTax                  DOUBLE,
    TipAmount               DOUBLE,
    TollsAmount             DOUBLE,
    ImprovementSurcharge    DOUBLE,
    TotalAmount             DOUBLE,
    CongestionSurcharge     DOUBLE,
    AirportFee              DOUBLE
)

USING DELTA                  -- default is Parquet

LOCATION "C:/SparkCourse/DataFiles/Output/YellowTaxis.delta/"

PARTITIONED BY (VendorId)    -- optional

COMMENT 'This table stores ride information for Yellow Taxis'

"""
spark.sql(createTableDdl)

DataFrame[]

In [13]:
describeQuery = """

DESCRIBE TABLE EXTENDED TaxisDB.YellowTaxis

"""

# Up to 50 rows: the column list followed by the extended table metadata.
spark.sql(describeQuery).show(50, truncate=False)

+----------------------------+-------------------------------------------------------+-------------------------+
|col_name                    |data_type                                              |comment                  |
+----------------------------+-------------------------------------------------------+-------------------------+
|VendorId                    |int                                                    |Vendor providing the ride|
|PickupTime                  |timestamp                                              |NULL                     |
|DropTime                    |timestamp                                              |NULL                     |
|PickupLocationId            |int                                                    |NULL                     |
|DropLocationId              |int                                                    |NULL                     |
|PassengerCount              |double                                                 |NULL      

### Options to Add Data to Delta Table

#### Option 1: Insert command

Use a standard SQL INSERT command to add data to the table.

In [14]:
# Positional insert: the column list is commented out, so VALUES must follow
# the column order declared in the CREATE TABLE statement.
# The 'Z' suffix marks the timestamps as UTC; the SELECT output shows them
# shifted into the session time zone.
insertSql = """

INSERT INTO TaxisDB.YellowTaxis

-- (VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId, PassengerCount, TripDistance, RateCodeId, StoreAndFwdFlag, PaymentType, FareAmount, Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurcharge, TotalAmount, CongestionSurcharge, AirportFee)

VALUES (3, '2022-12-01T00:00:00.000Z', '2022-12-01T00:15:34.000Z', 170, 140, 1.0, 2.9, 1.0, '1', 1, 13.0, 0.5, 0.5, 1.0, 0.0, 0.3, 15.3, 0.0, 0.0)

"""
spark.sql(insertSql)

DataFrame[]

In [15]:
# Show every row currently in the table.
selectAllQuery = """

SELECT * FROM TaxisDB.YellowTaxis

"""
spark.sql(selectAllQuery).show(truncate=False)

+--------+-------------------+-------------------+----------------+--------------+--------------+------------+----------+---------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+
|VendorId|PickupTime         |DropTime           |PickupLocationId|DropLocationId|PassengerCount|TripDistance|RateCodeId|StoreAndFwdFlag|PaymentType|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|TotalAmount|CongestionSurcharge|AirportFee|
+--------+-------------------+-------------------+----------------+--------------+--------------+------------+----------+---------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+
|3       |2022-12-01 05:30:00|2022-12-01 05:45:34|170             |140           |1.0           |2.9         |1.0       |1              |1          |13.0      |0.5  |0.5   |1.0      |0.0        |0.3                 |15.3

In [16]:
# Commit log after the INSERT.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

+-------+-----------------------+------+--------+------------+---------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                                                                                    |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+------------+---------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-----

#### Option 2: Append a DataFrame

Read the data as a DataFrame and append it to the Delta table using PySpark.

In [17]:
# Extract new records from Storage/Data Lake.
# Raw string: the backslashes in this Windows path are not meant as Python
# escape sequences ("\S", "\D", ... are invalid escapes).
# TODO(review): the recorded run failed with PATH_NOT_FOUND for this file, and
# the main input file is read from under C:\DataFiles -- confirm the location.
yellowTaxisAppendPath = r"C:\SparkCourse\DataFiles\Raw\YellowTaxis_append.csv"

yellowTaxisAppendDF = (
    spark
        .read
        .option("header", "true")
        .schema(yellowTaxiSchema)
        .csv(yellowTaxisAppendPath)
)

yellowTaxisAppendDF.show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/C:/SparkCourse/DataFiles/Raw/YellowTaxis_append.csv.

In [None]:
# Append to the data lake in Delta format by writing straight to the storage
# path. For the rows to appear in TaxisDB.YellowTaxis, this must be the same
# directory as the table's LOCATION
# ("C:/SparkCourse/DataFiles/Output/YellowTaxis.delta/" in the CREATE TABLE).
# Raw string: the backslashes in this Windows path are not Python escapes.
deltaLocation = r"C:\SparkCourse\DataFiles\Output\YellowTaxis.delta"

(
    yellowTaxisAppendDF
        .write
        .mode("append")
        .partitionBy("VendorId")
        .format("delta")
        .save(deltaLocation)
)

In [None]:
# Table contents after the append.
allRowsQuery = """

SELECT * 
FROM TaxisDB.YellowTaxis

"""
allRowsDF = spark.sql(allRowsQuery)
allRowsDF.show()

In [None]:
# Commit log after the append.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

In [None]:
# Append the full Yellow Taxi DataFrame to the Delta table's storage path.
# This path must be the same directory as the table's LOCATION
# ("C:/SparkCourse/DataFiles/Output/YellowTaxis.delta/" in the CREATE TABLE).
# Raw string: the backslashes in this Windows path are not Python escapes.
deltaLocation = r"C:\SparkCourse\DataFiles\Output\YellowTaxis.delta"

(
    yellowTaxiDF
        .write
        .mode("append")
        .partitionBy("VendorId")
        .format("delta")
        .save(deltaLocation)
)

In [None]:
# Row count after appending the full data set.
countQuery = """

SELECT COUNT(1)
FROM TaxisDB.YellowTaxis

"""
countDF = spark.sql(countQuery)
countDF.show()

### Check files holding the data

In [None]:
# INPUT_FILE_NAME() reports the data file each row was read from, which shows
# how the VendorId partitioning lays the rows out on disk.
fileLineageQuery = """

SELECT INPUT_FILE_NAME()
     
     , VendorId
     , PickupLocationId
     , PassengerCount
     
FROM TaxisDB.YellowTaxis

WHERE VendorId = 3

"""
spark.sql(fileLineageQuery).show(truncate=False)

### UPDATE command

In [None]:
# Baseline: passenger counts for vendor 3 rides picked up at location 249.
passengerCountQuery = """

SELECT VendorId
     , PickupLocationId
     , PassengerCount

FROM TaxisDB.YellowTaxis

WHERE VendorId = 3
    AND PickupLocationId = 249

"""
passengerCountDF = spark.sql(passengerCountQuery)
passengerCountDF.show()

In [None]:
updateSql = """

UPDATE TaxisDB.YellowTaxis

SET PassengerCount = 2

WHERE VendorId = 3
    AND PickupLocationId = 249

"""

# Set PassengerCount = 2 on every matching row, then print the result
# DataFrame returned by the UPDATE.
updateResult = spark.sql(updateSql)
updateResult.show()

In [None]:
# Same rows as the baseline check, after the UPDATE.
passengerCountQuery = """

SELECT VendorId
     , PickupLocationId
     , PassengerCount

FROM TaxisDB.YellowTaxis

WHERE VendorId = 3
    AND PickupLocationId = 249

"""
passengerCountDF = spark.sql(passengerCountQuery)
passengerCountDF.show()

In [None]:
# Commit log after the UPDATE.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

### DELETE command

In [18]:
# Check whether any vendor 3 ride picked up at location 151 exists.
recordQuery = """

SELECT VendorId
     , PickupLocationId
     , PassengerCount

FROM TaxisDB.YellowTaxis

WHERE VendorId = 3
    AND PickupLocationId = 151

"""
recordDF = spark.sql(recordQuery)
recordDF.show()

+--------+----------------+--------------+
|VendorId|PickupLocationId|PassengerCount|
+--------+----------------+--------------+
+--------+----------------+--------------+



In [19]:
deleteSql = """

DELETE FROM TaxisDB.YellowTaxis

WHERE VendorId = 3
    AND PickupLocationId = 151

"""

# Delete the matching rows; the result reports num_affected_rows.
deleteResult = spark.sql(deleteSql)
deleteResult.show()

+-----------------+
|num_affected_rows|
+-----------------+
|                0|
+-----------------+



In [None]:
# Confirm that no matching rows remain after the DELETE.
recordQuery = """

SELECT VendorId
     , PickupLocationId
     , PassengerCount

FROM TaxisDB.YellowTaxis

WHERE VendorId = 3
    AND PickupLocationId = 151

"""
recordDF = spark.sql(recordQuery)
recordDF.show()

In [None]:
# Commit log after the DELETE.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

### MERGE command

In [None]:
# Extract changed records from Storage/Data Lake.
# Raw string: the backslashes in this Windows path are not meant as Python
# escape sequences ("\S", "\D", ... are invalid escapes).
# NOTE(review): same C:\SparkCourse\DataFiles\Raw folder as the append file,
# which was not found in the recorded run -- confirm the location.
yellowTaxiChangesPath = r"C:\SparkCourse\DataFiles\Raw\YellowTaxis_changes.csv"

yellowTaxiChangesDF = (
    spark
        .read
        .option("header", "true")
        .schema(yellowTaxiSchema)
        .csv(yellowTaxiChangesPath)
)

yellowTaxiChangesDF.show()

In [None]:
# Expose the changes to SQL (including the MERGE below) under a view name.
changesViewName = "YellowTaxiChanges"
yellowTaxiChangesDF.createOrReplaceTempView(changesViewName)

In [None]:
# Preview the changes view.
changesQuery = """

SELECT *
FROM YellowTaxiChanges

"""
spark.sql(changesQuery).show()

In [None]:
# Upsert the changes view into the target table: update PaymentType on matches,
# and insert unmatched source rows picked up on or after 2022-03-01.
# NOTE(review): the join uses only VendorId and PickupLocationId. One source row
# therefore updates every target ride sharing that vendor and pickup location.
# Also, if two source rows share the same pair, Delta is expected to reject the
# MERGE (multiple source rows matching one target row). Confirm this key is intended.
mergeSql = """

MERGE INTO TaxisDB.YellowTaxis tgt

    USING YellowTaxiChanges    src

        ON    tgt.VendorId          =  src.VendorId
          AND tgt.PickupLocationId  =  src.PickupLocationId
  
-- Update row if join conditions match
WHEN MATCHED
      
      THEN  
          UPDATE SET    tgt.PaymentType = src.PaymentType   
          
                                                      -- Use 'UPDATE SET *' to update all columns

-- Insert row if row is not present in target table
WHEN NOT MATCHED 
      AND PickupTime >= '2022-03-01'

      THEN 
          INSERT (VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId, PassengerCount, TripDistance, 
                  RateCodeId, StoreAndFwdFlag, PaymentType, FareAmount, Extra, MtaTax, TipAmount, TollsAmount, 
                  ImprovementSurcharge, TotalAmount, CongestionSurcharge, AirportFee)
          
          VALUES (VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId, PassengerCount, TripDistance,
                  RateCodeId, StoreAndFwdFlag, PaymentType, FareAmount, Extra, MtaTax, TipAmount, TollsAmount, 
                  ImprovementSurcharge, TotalAmount, CongestionSurcharge, AirportFee)
                                                                         


"""
mergeResult = spark.sql(mergeSql)
mergeResult.show()

### Constraints on Delta Table

#### NOT NULL constraint

Drop the NOT NULL constraint that was defined on PickupLocationId when the table was created.

<i>- Define NOT NULL constraint on column to avoid insertion of NULL values </i> <br/>

In [None]:
# PickupLocationId was declared NOT NULL in the CREATE TABLE statement; remove
# that constraint so the column accepts NULLs.
spark.sql("""

ALTER TABLE TaxisDB.YellowTaxis

      CHANGE COLUMN PickupLocationId DROP NOT NULL

""").show()


# At the time of recording, open issue https://github.com/delta-io/delta/issues/831
# prevented setting an existing column back to NOT NULL after table creation,
# so the reverse statement is kept below for reference only:

# ALTER TABLE TaxisDB.YellowTaxis
#      CHANGE COLUMN PickupLocationId SET NOT NULL

In [None]:
# Commit log after the column change.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

#### CHECK constraint

A CHECK constraint enforces a condition that every row in the table must satisfy.

In [None]:
# Try a strict rule first: at most 5 passengers per ride.
# NOTE(review): Delta validates existing rows when a CHECK constraint is added,
# so this likely fails on this data (the next cell uses the looser
# "<= 9 OR IS NULL" rule). Confirm that the failure is the intended demo.
strictCheckSql = """

ALTER TABLE TaxisDB.YellowTaxis

    ADD CONSTRAINT PassengerCountCheck CHECK (PassengerCount <= 5)

"""
spark.sql(strictCheckSql).show()

In [None]:
# Allow up to 9 passengers; the explicit IS NULL branch lets rows without a
# passenger count pass the check.
passengerCheckSql = """

ALTER TABLE TaxisDB.YellowTaxis

    ADD CONSTRAINT PassengerCheck CHECK (PassengerCount <= 9 OR PassengerCount IS NULL)

"""
spark.sql(passengerCheckSql)

In [None]:
# Try to insert a ride with PassengerCount = 20. With the PassengerCheck
# constraint (PassengerCount <= 9 OR PassengerCount IS NULL) in place, Delta is
# expected to reject this row; the failure is the point of the demo.
#
# The optional column list is commented out on every line: a partially
# commented list leaves stray identifiers after the table name and turns the
# statement into a SQL syntax error. VALUES are therefore matched by position
# to the column order declared in the CREATE TABLE statement.
spark.sql("""

INSERT INTO TaxisDB.YellowTaxis

-- (VendorId, PickupTime, DropTime, PickupLocationId,
--  DropLocationId, PassengerCount, TripDistance, RateCodeId,
--  StoreAndFwdFlag, PaymentType, FareAmount, Extra, MtaTax, TipAmount,
--  TollsAmount, ImprovementSurcharge, TotalAmount, CongestionSurcharge, AirportFee)

VALUES (3, '2022-12-01T00:00:00.000Z', '2022-12-01T00:15:34.000Z', 170, 140, 

        20,  -- PassengerCount

        2.9, 1.0, '1', 1, 13.0, 0.5, 0.5, 1.0, 0.0, 0.3, 15.3, 0.0, 0.0)

""")

In [None]:
# Remove the PassengerCheck constraint again.
dropCheckSql = """

ALTER TABLE TaxisDB.YellowTaxis

    DROP CONSTRAINT PassengerCheck

"""
spark.sql(dropCheckSql)

In [None]:
# Commit log after the constraint changes.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

### Update a record

In [None]:
# Passenger counts for vendor 3 rides from location 1, before the update.
passengerCountQuery = """

SELECT PassengerCount

FROM TaxisDB.YellowTaxis

WHERE VendorId = 3
    AND PickupLocationId = 1

"""
passengerCountDF = spark.sql(passengerCountQuery)
passengerCountDF.show()

In [None]:
updateSql = """

UPDATE TaxisDB.YellowTaxis

SET PassengerCount = 1

WHERE VendorId = 3
    AND PickupLocationId = 1

"""

# This UPDATE creates the version that the time-travel cells look back from.
updateResult = spark.sql(updateSql)
updateResult.show()

In [None]:
# Commit log after the second UPDATE.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

### Time Travel: Access using Version Number

In [None]:
# Check at the initial version: version 0 is the first commit in the table
# history (presumably the empty table from CREATE TABLE, so this may return
# no rows).
versionZeroQuery = """

SELECT PassengerCount

FROM TaxisDB.YellowTaxis        VERSION AS OF 0

WHERE VendorId = 3
    AND PickupLocationId = 1

"""
spark.sql(versionZeroQuery).show(truncate=False)

In [None]:
# Check at the version just before the latest commit.
# The version number comes from the table history instead of being hard-coded:
# version numbers depend on how many commits earlier cells produced, and on
# whether the notebook was run before against the same location.
latestVersion = (
    spark.sql("DESCRIBE HISTORY TaxisDB.YellowTaxis")
        .selectExpr("max(version)")
        .first()[0]
)
priorVersion = latestVersion - 1

spark.sql(f"""

SELECT PassengerCount

FROM TaxisDB.YellowTaxis        VERSION AS OF {priorVersion}

WHERE VendorId = 3
    AND PickupLocationId = 1

""").show(truncate=False)

### Time Travel: Access using Timestamp

In [None]:
spark.sql("""

SELECT PassengerCount

FROM TaxisDB.YellowTaxis        TIMESTAMP AS OF '2023-03-12 22:39:00'

WHERE VendorId = 3
    AND PickupLocationId = 1

""").show(truncate=False)

### Restore Table to older version

In [None]:
# Restore the table to its state just before the most recent UPDATE.
# The target version is looked up in the history instead of being hard-coded.
# Anchoring on the last UPDATE (rather than "latest - 1") keeps the cell
# idempotent: re-running it does not step back over the RESTORE commit itself.
lastUpdateVersion = (
    spark.sql("DESCRIBE HISTORY TaxisDB.YellowTaxis")
        .where("operation = 'UPDATE'")
        .selectExpr("max(version)")
        .first()[0]
)
if lastUpdateVersion is None:
    raise ValueError("No UPDATE commit found in the history of TaxisDB.YellowTaxis")
restoreVersion = lastUpdateVersion - 1

spark.sql(f"""

RESTORE TABLE TaxisDB.YellowTaxis    TO VERSION AS OF {restoreVersion}

""").show(truncate=False)

In [None]:
# Commit log after the RESTORE.
historyQuery = """

DESCRIBE HISTORY TaxisDB.YellowTaxis

"""
historyDF = spark.sql(historyQuery)
historyDF.show(truncate=False)

In [None]:
# Passenger counts for vendor 3 rides from location 1, after the restore.
passengerCountQuery = """

SELECT PassengerCount

FROM TaxisDB.YellowTaxis

WHERE VendorId = 3
    AND PickupLocationId = 1

"""
passengerCountDF = spark.sql(passengerCountQuery)
passengerCountDF.show()