# Working with Delta on EMR and Athena

## Add Delta Support to Spark

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "io.delta:delta-core_2.12:2.1.1",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
    }
}

## Create a SparkSession
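We build the session explicitly with `SparkSession.builder...getOrCreate()` instead of relying on EMR's implicit _SparkSession_ creation, which has proven unreliable. `getOrCreate()` returns the already-running session if there is one, so this cell is safe to re-run.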

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Build the session explicitly instead of relying on the implicit one.
# getOrCreate() hands back an already-running session if there is one.
# Startup can be slow.
session_builder = SparkSession.builder.appName("Export delta to Athena")
spark = session_builder.getOrCreate()
spark

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1667912611103_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f5e48514610>

## Read listings parquet

In [5]:
# Source Parquet data for the listings.
# NOTE(review): 's3://...' is a placeholder -- set it to the real bucket/prefix.
LISTINGS_INPUT_PARQUET = 's3://...'

# Root paths of the Delta tables written by this notebook.
# The symlink manifests for Athena are generated underneath these paths.
LISTINGS_DELTA_LOCATION = 's3://athena-delta/delta/listings.delta'
HOSTS_DELTA_LOCATION = 's3://athena-delta/delta/hosts.delta'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Load the raw listings; the Delta table below is created from this DataFrame.
# Uses LISTINGS_INPUT_PARQUET from the config cell (INPUT_PARQUET was never defined).
df = spark.read.parquet(LISTINGS_INPUT_PARQUET)
df.printSchema()

In [None]:
# getNumPartitions() counts Spark (in-memory) partitions, not Delta partition
# columns: the write below uses no partitionBy, so the table is unpartitioned.
# Each Spark partition is typically written out as its own data file.
print(f'The DataFrame has {df.rdd.getNumPartitions()} Spark partitions; '
      'the Delta write will produce roughly one data file per partition.')

## Write into Delta

In [None]:
# Mode 'overwrite' makes this cell re-runnable. The default 'errorifexists' mode
# fails once the table exists. It also matches how the hosts table is written
# further down. Delta keeps prior versions in its log, so earlier snapshots stay
# reachable via time travel (until vacuumed).
df.write.format('delta').mode('overwrite').save(LISTINGS_DELTA_LOCATION)

In [None]:
def q(s: str) -> "pyspark.sql.DataFrame":
    """Run one Spark SQL statement and return the resulting DataFrame.

    Thin shorthand for ``spark.sql`` so the query cells below stay terse.
    """
    result = spark.sql(s)
    return result

## Register listings as a table and execute a few queries 

In [None]:
# Drop any stale metastore entry first so the cell can be re-run.
# Then register the Delta data written above as `my_table`.
# LISTINGS_DELTA_LOCATION replaces the undefined DELTA_LOCATION.
q("DROP TABLE IF EXISTS my_table")
q(f"CREATE TABLE my_table USING DELTA LOCATION '{LISTINGS_DELTA_LOCATION}'")

In [None]:
# Columns plus extended table metadata; truncate=False keeps long values readable.
q("DESCRIBE EXTENDED my_table").show(truncate=False)

In [None]:
# Quick preview of the registered table (show() prints a limited number of rows).
q("SELECT * FROM my_table").show()

In [None]:
# Keep only private rooms and overwrite the table with them.
# The overwrite becomes a new table version (see DESCRIBE HISTORY below), so
# the earlier full listing remains reachable via time travel.
# LISTINGS_DELTA_LOCATION replaces the undefined DELTA_LOCATION.
prDF = q("SELECT * FROM my_table WHERE room_type='Private room'")
prDF.write.format('delta').mode('overwrite').save(LISTINGS_DELTA_LOCATION)

In [None]:
# Commit log of the Delta table, one row per table version.
q("DESCRIBE HISTORY my_table").show()

In [None]:
# Same history, narrowed to the columns that explain what each commit did.
hist_df = q("DESCRIBE HISTORY my_table")
(hist_df
    .select('version', 'timestamp', 'operation', 'operationParameters')
    .show(truncate=False))

In [None]:
# Listings requiring stays longer than a year -- the rows the next cell deletes.
q("SELECT * FROM my_table WHERE minimum_nights > 365").show()

In [None]:
# Remove those rows; the DELETE shows up as a new entry in DESCRIBE HISTORY.
q("DELETE FROM my_table WHERE minimum_nights > 365")

In [None]:
# Listings with a minimum stay below one night -- fixed by the UPDATE that follows.
q("SELECT * FROM my_table WHERE minimum_nights < 1").show()

In [None]:
# Clamp those minimum stays up to one night, in place.
q("UPDATE my_table set minimum_nights = 1 WHERE minimum_nights < 1")

In [None]:
# History again: the DELETE and UPDATE above now appear as their own versions.
hist_df = q("DESCRIBE HISTORY my_table")
(hist_df
    .select('version', 'timestamp', 'operation', 'operationParameters')
    .show(truncate=False))

In [None]:
# Time travel: read the table as of version 0, i.e. before the later changes.
q("SELECT * FROM my_table VERSION AS OF 0").show()

In [None]:
from delta import DeltaTable

# Athena reads Delta tables through a symlink manifest listing the current data
# files. The manifest reflects the table version at generation time, so it is
# regenerated here after the writes above.
# LISTINGS_DELTA_LOCATION replaces the undefined DELTA_LOCATION.
deltaTable = DeltaTable.forPath(spark, path=LISTINGS_DELTA_LOCATION)
deltaTable.generate("symlink_format_manifest")

print(f'Generated Manifest file at {LISTINGS_DELTA_LOCATION}/_symlink_format_manifest')

## Load hosts.csv and save it as Delta

In [4]:
# Read hosts with an explicit schema (no inference pass).
# is_superhost and the timestamp columns are deliberately kept as strings.
hosts = (
    spark.read
    .schema("id INT, name STRING, is_superhost STRING, created_at STRING, updated_at STRING")
    .option("header", True)
    .csv('s3://nordquant/athena/airbnb/hosts/hosts.csv')
)
hosts.show()
hosts.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------+------------+-------------------+-------------------+
|   id|          name|is_superhost|         created_at|         updated_at|
+-----+--------------+------------+-------------------+-------------------+
| 1581|       Annette|           f|2014-01-05 16:12:45|2014-01-05 16:12:45|
| 2164|         Lulah|           t|2013-07-31 23:29:31|2013-07-31 23:29:31|
| 2217|           Ion|           t|2017-10-17 05:20:28|2017-10-17 05:20:28|
| 3718|        Britta|           f|2009-06-05 21:34:42|2009-06-05 21:34:42|
|11622|         Maria|           f|2021-10-24 02:42:09|2021-10-24 02:42:09|
|12360|       Michael|           t|2017-08-27 12:08:43|2017-08-27 12:08:43|
|12424|       Mariana|           f|2021-11-08 17:27:31|2021-11-08 17:27:31|
|15115|     Christian|           f|2015-01-03 17:53:53|2015-01-03 17:53:53|
|17391|    BrightRoom|           t|2009-08-12 12:30:30|2009-08-12 12:30:30|
|22901|        Hannes|           f|2013-07-06 11:09:47|2013-07-06 11:09:47|
|32299|     

In [6]:
# Overwrite so the cell is re-runnable.
# overwriteSchema allows the re-run even if the hosts schema has changed.
(hosts.write
    .format("delta")
    .mode("overwrite")
    .option('overwriteSchema', True)
    .save(HOSTS_DELTA_LOCATION))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
from delta import DeltaTable  # repeated so this section runs without the listings cells

# Regenerate the symlink manifest Athena reads, now that hosts has been (re)written.
deltaTable = DeltaTable.forPath(spark, path=HOSTS_DELTA_LOCATION)
deltaTable.generate("symlink_format_manifest")
print(f'Generated Manifest file at {HOSTS_DELTA_LOCATION}/_symlink_format_manifest')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Generated Manifest file at s3://athena-delta/delta/hosts.delta/_symlink_format_manifest