# Imports 

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Setup

In [2]:
spark = (
    SparkSession.builder.appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9a78bf29-87a7-4361-8a34-91370917daa9;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 278ms :: artifacts dl 16ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	io.delta#delta-core_2.12;2.1.0 from central in [default]
	io.delta#delta-storage;2.1.0 

23/04/13 05:55:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minio")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minio123")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://172.18.0.2:9000")

In [4]:
from delta.tables import *

# Load DL 

In [5]:
dl = DeltaTable.forPath(spark, "s3a://lake/taxis")

23/04/13 06:10:05 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

# Read data

In [16]:
df = spark.read.option("header", "true").csv("s3a://incoming/yellow_tripdata_sample_2019_03_new_schema.csv")

In [17]:
df = df.withColumn("source_filename", F.input_file_name())

In [18]:
df.count()

1

In [19]:
df.show(1, vertical=True)

-RECORD 0-----------------------------------
 vendor_id           | 2                    
 pickup_datetime     | 2019-02-28 00:39:50  
 dropoff_datetime    | 2019-02-28 00:44:23  
 passenger_count     | 5                    
 pickup_location_id  | 79                   
 dropoff_location_id | 148                  
 fare_amount         | 5.0                  
 stars               | 5                    
 source_filename     | s3a://incoming/ye... 



In [20]:
df.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- source_filename: string (nullable = false)



# Append to DL 

In [21]:
# Enable automatic schema evolution
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") 

DataFrame[key: string, value: string]

In [22]:
# new_data.write.option("mergeSchema", "true").mode("append").save(path)

In [23]:
(
    dl
    .alias("lake")
    .merge(df.alias("incoming"),"lake.source_filename = incoming.source_filename")
    .whenNotMatchedInsertAll()
    .execute()
)

                                                                                

23/04/13 06:15:51 WARN MergeIntoCommand: Merge source has SQLMetric(id: 841, name: Some(number of source rows), value: 1) rows in initial scan but SQLMetric(id: 842, name: Some(number of source rows (during repeated scan)), value: 0) rows in second scan


                                                                                

In [24]:
dl.history().show(truncate=False, vertical=True)

                                                                                

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 2                                                                                                                                                                                                                                                                                                              
 timestamp           | 2023-04-13 06:15:52                                                                                                                                                                                                                                                                                            
 userId            

# Read again DL 

In [25]:
df = spark.read.format("delta").load("s3a://lake/taxis")

In [26]:
df.count()

                                                                                

21

In [27]:
df.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- source_filename: string (nullable = true)
 |-- stars: string (nullable = true)

