In [13]:
from pyspark.sql import SparkSession
import os

In [2]:
spark = SparkSession.builder.appName("SparkDeltaTable") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

from delta.tables import *

In [3]:
spark

***
## Create a Database & Table

In [4]:
%load_ext sparksql_magic

In [33]:
%%sparksql
SHOW DATABASES;

0
namespace
dbacademy
default
deltatable


In [34]:
%%sparksql
SHOW TABLES;

0,1,2
database,tableName,isTemporary


***
## Drop SQL Tables

In [10]:
%%sparksql
DROP TABLE src_pq;

In [12]:
%%sparksql
SHOW TABLES;

0,1,2
database,tableName,isTemporary


In [14]:
%%sparksql
CREATE DATABASE deltatable;

In [35]:
%%sparksql
USE deltatable;

In [36]:
%%sparksql
SHOW TABLES;

0,1,2
database,tableName,isTemporary


***
## List environment

In [37]:
sparkPath = "/spark-warehouse/"

In [38]:
os.listdir(sparkPath)

[]

***

In [26]:
from pyspark.sql import SQLContext
spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy")
spark.sql(f"USE dbacademy")
health_tracker = f"/dbacademy/DLRS/healthtracker/"

In [27]:
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [None]:
# %%sh
# wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json
# wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2.json
# wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2_late.json
# wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_3.json

In [28]:
%%sh
ls data

health_tracker_data_2020_1.json
health_tracker_data_2020_2.json
health_tracker_data_2020_2_late.json
health_tracker_data_2020_3.json
kv1.txt
uk-macroeconomic-data.csv


In [29]:
file_path = "data/health_tracker_data_2020_1.json"
 
health_tracker_data_2020_1_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [30]:
health_tracker_data_2020_1_df.show(5)

+---------+-------------+--------------+-----------+
|device_id|    heartrate|          name|       time|
+---------+-------------+--------------+-----------+
|        0|52.8139067501|Deborah Powell|1.5778368E9|
|        0|53.9078900098|Deborah Powell|1.5778404E9|
|        0|52.7129593616|Deborah Powell| 1.577844E9|
|        0|52.2880422685|Deborah Powell|1.5778476E9|
|        0|52.5156095386|Deborah Powell|1.5778512E9|
+---------+-------------+--------------+-----------+
only showing top 5 rows



In [None]:
# import matplotlib.pyplot as plt
# plt.plot(health_tracker_data_2020_1_df.device_id)

## Step 2: Create a Parquet Table

We will perform data engineering on the data with the following transformations:

    Use the from_unixtime Spark SQL function to transform the unix timestamp into a time string

    Cast the time column to type timestamp to replace the column time

    Cast the time column to type date to create the column dte

    Select the columns in the order in which we would like them to be writte

As this is a process that we will perform on each dataset as it is loaded we compose a function to perform the necessary transformations. This function, process_health_tracker_data, can be reused each time.

In [31]:
from pyspark.sql.functions import col, from_unixtime
 
def process_health_tracker_data(dataframe):
    return (
        dataframe
        .withColumn("time", from_unixtime("time"))
        .withColumnRenamed("device_id", "p_device_id")
        .withColumn("time", col("time").cast("timestamp"))
        .withColumn("dte", col("time").cast("date"))
        .withColumn("p_device_id", col("p_device_id").cast("integer"))
        .select("dte", "time", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)

## Step 3: Write the Files to the processed directory

Note that we are partitioning the data by device id.

In [41]:
(processedDF.write
 .mode("overwrite")
 .format("parquet")
 .partitionBy("p_device_id")
 .saveAsTable("/tmp/processed"))

ParseException: 
extraneous input '/' expecting {'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', DATABASES, 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'IF', 'IGNORE', 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 'MATCHED', 'MERGE', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'PLACING', 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 'QUERY', 'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SCHEMA', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 'SUBSTRING', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TIME', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 'WINDOW', 'WITH', 'ZONE', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 0)

== SQL ==
/tmp/processed
^^^


In [10]:
%load_ext sparksql_magic

In [11]:
%%sparksql
SHOW DATABASES;

0
namespace
dbacademy
default


In [12]:
%%sparksql
SHOW TABLES;

0,1,2
database,tableName,isTemporary
dbacademy,processed,False


## Step 4: Register the table in the metastore

Next, use Spark SQL to register the table in the metastore. We specify the table format as parquet and we refer to the location where we wrote the parquet files.

In [13]:
%%sparksql
DROP TABLE IF EXISTS pq_table

In [14]:
%%sparksql
SHOW DATABASES;

0
namespace
dbacademy
default


In [15]:
%%sparksql
SHOW TABLES

0,1,2
database,tableName,isTemporary
dbacademy,processed,False


In [16]:
%%sparksql
CREATE TABLE pq_table
USING PARQUET
LOCATION "processed"

In [17]:
%%sparksql
SHOW TABLES;

0,1,2
database,tableName,isTemporary
dbacademy,pq_table,False
dbacademy,processed,False


In [18]:
%%sparksql
SELECT * FROM processed LIMIT 5

0,1,2,3,4
dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01 00:00:00,47.5378557652,Kristin Vasser,1
2020-01-01,2020-01-01 01:00:00,48.3496970512,Kristin Vasser,1
2020-01-01,2020-01-01 02:00:00,49.1212033115,Kristin Vasser,1
2020-01-01,2020-01-01 03:00:00,47.9982802854,Kristin Vasser,1
2020-01-01,2020-01-01 04:00:00,47.841083408,Kristin Vasser,1


In [19]:
spark.read.table("processed").show(5)

+----------+-------------------+-------------+--------------+-----------+
|       dte|               time|    heartrate|          name|p_device_id|
+----------+-------------------+-------------+--------------+-----------+
|2020-01-01|2020-01-01 00:00:00|47.5378557652|Kristin Vasser|          1|
|2020-01-01|2020-01-01 01:00:00|48.3496970512|Kristin Vasser|          1|
|2020-01-01|2020-01-01 02:00:00|49.1212033115|Kristin Vasser|          1|
|2020-01-01|2020-01-01 03:00:00|47.9982802854|Kristin Vasser|          1|
|2020-01-01|2020-01-01 04:00:00| 47.841083408|Kristin Vasser|          1|
+----------+-------------------+-------------+--------------+-----------+
only showing top 5 rows



In [20]:
%%sparksql
DESCRIBE DETAIL processed

0,1,2,3,4,5,6,7,8,9,10,11,12
format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
parquet,,dbacademy.processed,,file:/opt/workspace/spark-warehouse/dbacademy.db/processed,2021-04-30 15:36:22.517000,,['p_device_id'],,,{},,


## Convert the files to Delta Files

In [23]:
from delta.tables import DeltaTable

parquet_table = f"parquet.`spark-warehouse/dbacademy.db/processed`"
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)

JavaObject id=o119

In [29]:
%%sparksql
CREATE TABLE health_tracker_processed_delta
USING DELTA
LOCATION "spark-warehouse/dbacademy.db/spark-warehouse/dbacademy.db/processed"

AnalysisException: 
You are trying to create an external table `dbacademy`.`health_tracker_processed_delta`
from `spark-warehouse/dbacademy.db/spark-warehouse/dbacademy.db/processed` using Delta Lake, but the schema is not specified when the
input path is empty.

To learn more about Delta, see https://docs.delta.io/latest/index.html
       

In [26]:
%%sparksql
SELECT * FROM health_tracker_processed LIMIT 5

AnalysisException: `dbacademy`.`health_tracker_processed` is not a Delta table.

In [27]:
%%sparksql
DESCRIBE DETAIL health_tracker_processed

0,1,2,3,4,5,6,7,8,9,10,11,12
format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,,dbacademy.health_tracker_processed,,file:/opt/workspace/spark-warehouse/dbacademy.db/spark-warehouse/dbacademy.db/processed,2021-04-30 15:42:57.019000,,[],,,{},,


### %%sparksql
SHOW TABLES

***

## Create a table

In [None]:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

## Read data

In [None]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

## Create a Parquet Table

In [None]:
from pyspark.sql.functions import col, from_unixtime
 
def process_health_tracker_data(dataframe):
    return (
        dataframe
        .withColumn("time", from_unixtime("time"))
        .withColumnRenamed("device_id", "p_device_id")
        .withColumn("time", col("time").cast("timestamp"))
        .withColumn("dte", col("time").cast("date"))
        .withColumn("p_device_id", col("p_device_id").cast("integer"))
        .select("dte", "time", "heartrate", "name", "p_device_id")
    )

## Conditional update without overwrite

In [None]:
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

In [None]:
spark.stop