In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, expr, rand, when, count, col

In [4]:
spark = SparkSession.builder.appName("DeltaLake Transaction Logs") \
    .master("local[3]") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

# Main class for programmatically intereacting with Delta Tables.
from delta.tables import *

In [5]:
spark.conf.set("spark.sql.shuffle.partitions", 4)
spark.conf.set("spark.default.parallelism", 4)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")

In [21]:
%run Helpers.ipynb

ERROR:root:File `'Helpers.ipynb.py'` not found.


In [4]:
# Source data
# Change paths to match your environment!

inputPath    = "/home/oliver/DeltaLake/training/online_retail_dataset/CSV_DATA/"
sourceData   = inputPath + "online-retail-dataset.csv"

# Base location for all saved data
basePath     = "/home/oliver/DeltaLake/training/online_retail_dataset" 

# Path for Parquet formatted data
parquetPath  = basePath + "/parquet/online_retail_data"

# Path for Delta formatted data
deltaPath    = basePath + "/delta/online_retail_data"
deltaLogPath = deltaPath + "/_delta_log"

# Clean up from last run.
! rm -Rf $deltaPath 2>/dev/null
print("Deleted path " + deltaPath)

! rm -Rf $parquetPath 2>/dev/null
print("Deleted path " + parquetPath)

Deleted path /home/oliver/DeltaLake/training/online_retail_dataset/delta/online_retail_data
Deleted path /home/oliver/DeltaLake/training/online_retail_dataset/parquet/online_retail_data


In [5]:
import os.path

file_exists = os.path.isfile(f'{sourceData}')
 
if not file_exists:
    print("-> Downloading dataset.")
    os.system(f'curl https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/all/online-retail-dataset.csv -o {sourceData}')
    file_exists = os.path.isfile(f'{sourceData}')
    
if file_exists:
    print("-> Dataset is present.\n")
    
    fileSize = ! du -m "$sourceData" | cut -f1 # Posix compliant
    print(f"File [{sourceData}] is {fileSize} MB in size.")

-> Dataset is present.

File [/home/oliver/DeltaLake/training/online_retail_dataset/CSV_DATA/online-retail-dataset.csv] is ['43'] MB in size.


In [6]:
! head -n 5 $sourceData 2>/dev/null

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom


In [6]:
schemaDDL = """InvoiceNo Integer, StockCode String, Description String, Quantity Integer, 
               InvoiceDate String, UnitPrice Double, CustomerID Integer, Country String """


# You could also use the StructType method.
# Libraries needed to define schemas
# from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

#inputSchema = StructType([
#  StructField("InvoiceNo", IntegerType(), True),
#  StructField("StockCode", StringType(), True),
#  StructField("Description", StringType(), True),
#  StructField("Quantity", IntegerType(), True),
#  StructField("InvoiceDate", StringType(), True),
#  StructField("UnitPrice", DoubleType(), True),
#  StructField("CustomerID", IntegerType(), True),
#  StructField("Country", StringType(), True)
#])


In [7]:
# Create retail sales data dataframe

rawSalesDataDF = (
    spark.read
    .format("csv")
    .option("header","true")
    .schema(schemaDDL)
    .load(sourceData)
)

# Count rows and partitions
rowCount = rawSalesDataDF.count() 
partCount = rawSalesDataDF.rdd.getNumPartitions()

print(f'Row Count: {rowCount} Partition Count: {partCount}')

Row Count: 541909 Partition Count: 3


In [8]:
print("Columns with null values")
rawSalesDataDF.select([count(when(col(c).isNull(), c)).alias(c) for c in rawSalesDataDF.columns]).show()

Columns with null values
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|     9291|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [9]:
# Remove rows where important columns are null. In our case: InvoiceNo and CustomerID

cleanSalesDataDF = rawSalesDataDF.where(col("InvoiceNo").isNotNull() & col("CustomerID").isNotNull())
cleanSalesDataCount = cleanSalesDataDF.count()
# POO cleanSalesDataDF = cleanSalesDataDF.where(col("CustomerID").isNotNull())

# All rows with null values should be gone
print("null values")
cleanSalesDataDF.select([count(when(col(c).isNull(), c)).alias(c) for c in rawSalesDataDF.columns]).show()

print(f' RowsRemoved: {rowCount-cleanSalesDataCount}\n Final Row Count: {cleanSalesDataCount}')



null values
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+

 RowsRemoved: 143985
 Final Row Count: 397924


In [10]:
# Define new dataframe based on cleansed data but only use a subset of the data to make things run faster

# Random sample of 25%, with seed and without replacement
retailSalesData1 = cleanSalesDataDF.sample(withReplacement=False, fraction=.25, seed=75)

# Count rows and partitions
rowCount = retailSalesData1.count() 
partCount = retailSalesData1.rdd.getNumPartitions()

print(f'Row Count: {rowCount} Partition Count: {partCount}')

Row Count: 99057 Partition Count: 3


In [11]:
# Peek at the dataframe

retailSalesData1.show(3, truncate = False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |21730    |GLASS STAR FROSTED T-LIGHT HOLDER  |6       |12/1/2010 8:26|4.25     |17850     |United Kingdom|
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
only showing top 3 rows



In [12]:
# Create database to hold demo objects
spark.sql("CREATE DATABASE IF NOT EXISTS deltademo COMMENT 'Esta es una DB demo para uso del formato Delta' LOCATION '/home/oliver/DeltaLake/training/online_retail_dataset/DATABASE'")
spark.sql("SHOW DATABASES").show()

# Current DB should be deltademo
spark.sql("USE deltademo")
spark.sql("SELECT CURRENT_DATABASE()").show()
spark.sql("DESCRIBE DATABASE deltademo").show(truncate = False)

# Clean-up from last run
spark.sql("DROP TABLE IF EXISTS SalesParquetFormat")
spark.sql("DROP TABLE IF EXISTS SalesDeltaFormat")
spark.sql("DROP TABLE IF EXISTS tbl_CheckpointFile")
spark.sql("SHOW TABLES").show(truncate= False)

+---------+
|namespace|
+---------+
|  default|
|deltademo|
+---------+

+------------------+
|current_database()|
+------------------+
|         deltademo|
+------------------+

+-------------------------+-------------------------------------------------------------------+
|database_description_item|database_description_value                                         |
+-------------------------+-------------------------------------------------------------------+
|Database Name            |deltademo                                                          |
|Comment                  |Esta es una DB demo para uso del formato Delta                     |
|Location                 |file:/home/oliver/DeltaLake/training/online_retail_dataset/DATABASE|
|Owner                    |oliver                                                             |
+-------------------------+-------------------------------------------------------------------+

+--------+---------+-----------+
|database|tableName

In [21]:
#spark.sql("DROP DATABASE IF EXISTS deltademo")
#spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [13]:
# Save data as a table in Parquet format

retailSalesData1.write.saveAsTable('SalesParquetFormat', format='parquet', mode='overwrite',path=parquetPath)

In [14]:
# Let's peek into the catalog and verify that our table was created

spark.catalog.listTables()

# SQL method - not as informative
# spark.sql("show tables").show()


[Table(name='salesparquetformat', database='deltademo', description=None, tableType='EXTERNAL', isTemporary=False)]

In [15]:
# Files and size on disk
import glob
import os
import datetime


def get_printable_size(byte_size):
    """
    A bit is the smallest unit, it's either 0 or 1
    1 byte = 1 octet = 8 bits
    1 kB = 1 kilobyte = 1000 bytes = 10^3 bytes
    1 KiB = 1 kibibyte = 1024 bytes = 2^10 bytes
    1 KB = 1 kibibyte OR kilobyte ~= 1024 bytes ~= 2^10 bytes (it usually means 1024 bytes but sometimes it's 1000... ask the sysadmin ;) )
    1 kb = 1 kilobits = 1000 bits (this notation should not be used, as it is very confusing)
    1 ko = 1 kilooctet = 1000 octets = 1000 bytes = 1 kB
    Also Kb seems to be a mix of KB and kb, again it depends on context.
    In linux, a byte (B) is composed by a sequence of bits (b). One byte has 256 possible values.
    More info : http://www.linfo.org/byte.html
    """
    BASE_SIZE = 1024.00
    MEASURE = ["B", "KB", "MB", "GB", "TB", "PB"]

    def _fix_size(size, size_index):
        if not size:
            return "0"
        elif size_index == 0:
            return str(size)
        else:
            return "{:.0f}".format(size)

    current_size = byte_size
    size_index = 0

    while current_size >= BASE_SIZE and len(MEASURE) != size_index:
        current_size = current_size / BASE_SIZE
        size_index = size_index + 1

    size = _fix_size(current_size, size_index)
    measure = MEASURE[size_index]
    return size + measure
    
def files_in_dir(dir_path, file_ext):
    files = glob.glob(f'{dir_path}/*.{file_ext}')
    files.sort(key=os.path.getmtime)
    
    for f in files:
        x = os.path.getmtime(f)
        x = datetime.datetime.fromtimestamp(x).strftime("%Y-%m-%d %H:%M:%S")
        print(f'{get_printable_size(os.path.getsize(f)):8} {x}  {os.path.basename(f)}')

    numFiles = os.popen(f'ls -p {dir_path}/*.{file_ext} | egrep -v /$ | wc -l').read().strip()
    totalSize = os.popen(f'! du -sh {dir_path} | cut -f1').read().strip()
    
    print("")
    print(f"Number of file/s: {numFiles} | Total size: {totalSize}")

files_in_dir(parquetPath, "parquet")

279KB    2020-11-11 22:28:52  part-00002-687c91cd-1dab-4ce0-abaf-f1d1c63e08b5-c000.snappy.parquet
399KB    2020-11-11 22:28:52  part-00001-687c91cd-1dab-4ce0-abaf-f1d1c63e08b5-c000.snappy.parquet
379KB    2020-11-11 22:28:52  part-00000-687c91cd-1dab-4ce0-abaf-f1d1c63e08b5-c000.snappy.parquet

Number of file/s: 3 | Total size: 1.1M


In [16]:
spark.sql("DESCRIBE EXTENDED SalesParquetFormat").show(100,truncate = False)

+----------------------------+-------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                            |comment|
+----------------------------+-------------------------------------------------------------------------------------+-------+
|InvoiceNo                   |int                                                                                  |null   |
|StockCode                   |string                                                                               |null   |
|Description                 |string                                                                               |null   |
|Quantity                    |int                                                                                  |null   |
|InvoiceDate                 |string                                                                               |null   |


In [17]:
# Use Spark SQL to query the newly created table

spark.sql("SELECT * FROM SalesParquetFormat;").show(3, truncate = False)

# You can directly query the directory too.
# spark.sql(f"SELECT * FROM parquet.`{parquetPath}` limit 5 ").show(truncate = False)

+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|554055   |22469    |HEART OF WICKER SMALL              |40      |5/20/2011 15:39|1.45     |18198     |United Kingdom|
|554057   |18097C   |WHITE TALL PORCELAIN T-LIGHT HOLDER|288     |5/20/2011 15:52|1.95     |17867     |United Kingdom|
|554057   |72351B   |SET/6 PINK  BUTTERFLY T-LIGHTS     |1       |5/20/2011 15:52|2.1      |17867     |United Kingdom|
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
only showing top 3 rows



In [18]:
# Add one row of data to the table
# Parquet being immutable necessitates the creation of an additional Parquet file

spark.sql("""
            INSERT INTO SalesParquetFormat
            VALUES(963316, 2291, "WORLD'S BEST JAM MAKING SET", 5 , "08/13/2011 07:58", 1.45, 15358, "United Kingdom")
          """)

DataFrame[]

In [19]:
files_in_dir(parquetPath, "parquet")

279KB    2020-11-11 22:28:52  part-00002-687c91cd-1dab-4ce0-abaf-f1d1c63e08b5-c000.snappy.parquet
399KB    2020-11-11 22:28:52  part-00001-687c91cd-1dab-4ce0-abaf-f1d1c63e08b5-c000.snappy.parquet
379KB    2020-11-11 22:28:52  part-00000-687c91cd-1dab-4ce0-abaf-f1d1c63e08b5-c000.snappy.parquet
2KB      2020-11-11 22:30:36  part-00000-3409adc9-47a2-491e-850e-4e3e7137057c-c000.snappy.parquet

Number of file/s: 4 | Total size: 1.1M


In [21]:
# Let's have a peek at the new file

spark.read.load(f"{parquetPath}/part-00000-3409adc9-47a2-491e-850e-4e3e7137057c-c000.snappy.parquet").show(truncate = False)

+---------+---------+---------------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+---------+---------+---------------------------+--------+----------------+---------+----------+--------------+
|963316   |2291     |WORLD'S BEST JAM MAKING SET|5       |08/13/2011 07:58|1.45     |15358     |United Kingdom|
+---------+---------+---------------------------+--------+----------------+---------+----------+--------------+



In [22]:
# Save retailSalesData1 to Delta

retailSalesData1.write.mode("overwrite").format("delta").save(deltaPath)#aqui solo creó nuevos datos en formato DELTA en el path 'deltaPath'
#pero no hizo una nueva tabla

# Query delta directory directly
spark.sql(f"SELECT * FROM delta.`{deltaPath}` LIMIT 3 ").show(truncate = False)

+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|554055   |22469    |HEART OF WICKER SMALL              |40      |5/20/2011 15:39|1.45     |18198     |United Kingdom|
|554057   |18097C   |WHITE TALL PORCELAIN T-LIGHT HOLDER|288     |5/20/2011 15:52|1.95     |17867     |United Kingdom|
|554057   |72351B   |SET/6 PINK  BUTTERFLY T-LIGHTS     |1       |5/20/2011 15:52|2.1      |17867     |United Kingdom|
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+



DataFrame[database: string, tableName: string, isTemporary: boolean]

In [24]:
# Create variable for a path based Delta table

deltaTable = DeltaTable.forPath(spark, deltaPath) # Aquí solo crea una variable para poder accesar y monitorear los cambios a los archivos DELTA en 'deltaPath'

In [26]:
print("####### HISTORY ########")

# Observe history of actions taken on a Delta table

history = deltaTable.history().select('version', 'timestamp', 'operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed('version', 'ver')
history.show(truncate = False)

####### HISTORY ########
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+
|ver|timestamp          |operation|operationParameters                   |operationMetrics                                                  |
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+
|0  |2020-11-11 22:32:17|WRITE    |[mode -> Overwrite, partitionBy -> []]|[numFiles -> 3, numOutputBytes -> 1083137, numOutputRows -> 99057]|
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+



In [27]:
# files and size on disk
# Notice the sub-directory "_delta_log"

! ls -thl $deltaPath

total 1.1M
drwxrwxr-x 2 oliver oliver 4.0K Nov 11 22:32 _delta_log
-rw-r--r-- 1 oliver oliver 400K Nov 11 22:32 part-00001-2088d53e-b24c-43c0-9062-c46c06918ac9-c000.snappy.parquet
-rw-r--r-- 1 oliver oliver 380K Nov 11 22:32 part-00000-ed1583ac-9038-4ca2-ac09-485e334d4d80-c000.snappy.parquet
-rw-r--r-- 1 oliver oliver 280K Nov 11 22:32 part-00002-405a1d18-b93c-4062-9069-07fdfdafd884-c000.snappy.parquet


In [28]:
# # files and size on disk
# We can see that parquet files were added but now there is a trx log

files_in_dir(deltaLogPath, "json")

2KB      2020-11-11 22:32:17  00000000000000000000.json

Number of file/s: 1 | Total size: 8.0K


In [27]:
# Let's have a peek inside the trx log

#spark.read.format("json").load(deltaLogPath + "/00000000000000000000.json").collect()
spark.read.format("json").load(deltaLogPath + "/00000000000000000000.json").collect()

[Row(add=None, commitInfo=Row(isBlindAppend=False, operation='WRITE', operationMetrics=Row(numFiles='3', numOutputBytes='1083137', numOutputRows='99057'), operationParameters=Row(mode='Overwrite', partitionBy='[]'), timestamp=1605132570093), metaData=None, protocol=None),
 Row(add=None, commitInfo=None, metaData=None, protocol=Row(minReaderVersion=1, minWriterVersion=2)),
 Row(add=None, commitInfo=None, metaData=Row(createdTime=1605132568722, format=Row(provider='parquet'), id='9395c7c5-4308-4c77-aec1-7464fc8fe70f', partitionColumns=[], schemaString='{"type":"struct","fields":[{"name":"InvoiceNo","type":"integer","nullable":true,"metadata":{}},{"name":"StockCode","type":"string","nullable":true,"metadata":{}},{"name":"Description","type":"string","nullable":true,"metadata":{}},{"name":"Quantity","type":"integer","nullable":true,"metadata":{}},{"name":"InvoiceDate","type":"string","nullable":true,"metadata":{}},{"name":"UnitPrice","type":"double","nullable":true,"metadata":{}},{"name":"

In [29]:
# Create a new dataframe with fraction of original data.
# Random sample of 25%, with seed and without replacement

retailSalesData2 = cleanSalesDataDF.sample(withReplacement=False, fraction=.25, seed=31)
retailSalesData2.count()

99463

In [32]:
# Add to our Delta Lake table by appending retailSalesData2

retailSalesData2.write.mode("append").format("delta").save(deltaPath) # solo agregó más datos a los DELTA files pero todavía no ha creado una tabla

In [33]:
print("####### HISTORY ########")

# Observe history of actions taken on a Delta table
# Reference for full history schema: https://docs.delta.io/latest/delta-utility.html

history = deltaTable.history().select('version', 'timestamp', 'operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed("version", 'ver')
history.show(truncate = False)

####### HISTORY ########
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+
|ver|timestamp          |operation|operationParameters                   |operationMetrics                                                  |
+---+-------------------+---------+--------------------------------------+------------------------------------------------------------------+
|2  |2020-11-11 22:36:42|WRITE    |[mode -> Append, partitionBy -> []]   |[numFiles -> 3, numOutputBytes -> 1082711, numOutputRows -> 99463]|
|1  |2020-11-11 22:35:49|WRITE    |[mode -> Append, partitionBy -> []]   |[numFiles -> 3, numOutputBytes -> 1082711, numOutputRows -> 99463]|
|0  |2020-11-11 22:32:17|WRITE    |[mode -> Overwrite, partitionBy -> []]|[numFiles -> 3, numOutputBytes -> 1083137, numOutputRows -> 99057]|
+---+-------------------+---------+--------------------------------------+-------------------------------------------------

In [31]:
# Data Files and size on disk

files_in_dir(deltaPath, "parquet")

279KB    2020-11-11 16:09:29  part-00002-4f72cf15-cb0b-414d-921d-9aa9abb594fb-c000.snappy.parquet
399KB    2020-11-11 16:09:29  part-00001-28fd7101-7f97-4028-83c5-5dcdd9b20754-c000.snappy.parquet
379KB    2020-11-11 16:09:29  part-00000-e09d4d65-b48f-48ff-bb11-32ec31c62321-c000.snappy.parquet
281KB    2020-11-11 16:09:59  part-00002-e86f84af-b8c7-46c1-9339-e4e13871b47f-c000.snappy.parquet
379KB    2020-11-11 16:09:59  part-00000-6862b1f2-21bf-426b-926c-e8c0d3859485-c000.snappy.parquet
398KB    2020-11-11 16:09:59  part-00001-baff5b51-e2f0-4120-b7b6-56d621641096-c000.snappy.parquet

Number of file/s: 6 | Total size: 2.2M


In [32]:
# Transaction logs and size on disk

files_in_dir(deltaLogPath, "json")

2KB      2020-11-11 16:09:30  00000000000000000000.json
765B     2020-11-11 16:09:59  00000000000000000001.json

Number of file/s: 2 | Total size: 12K


In [33]:
# Peek inside the new transaction log

#logDF = spark.read.format("json").load(deltaLogPath + "/00000000000000000001.json")
#logDF.collect()

logDF = spark.read.format("json").load(deltaLogPath + "/00000000000000000001.json")
logDF.collect()

[Row(add=None, commitInfo=Row(isBlindAppend=True, operation='WRITE', operationMetrics=Row(numFiles='3', numOutputBytes='1082711', numOutputRows='99463'), operationParameters=Row(mode='Append', partitionBy='[]'), readVersion=0, timestamp=1605132599453)),
 Row(add=Row(dataChange=True, modificationTime=1605132599000, path='part-00000-6862b1f2-21bf-426b-926c-e8c0d3859485-c000.snappy.parquet', size=388155), commitInfo=None),
 Row(add=Row(dataChange=True, modificationTime=1605132599000, path='part-00001-baff5b51-e2f0-4120-b7b6-56d621641096-c000.snappy.parquet', size=407069), commitInfo=None),
 Row(add=Row(dataChange=True, modificationTime=1605132599000, path='part-00002-e86f84af-b8c7-46c1-9339-e4e13871b47f-c000.snappy.parquet', size=287487), commitInfo=None)]

In [35]:
# Create SQL table to make life easier
# Stick with SQL from here on out, where possible.
# En esta parte crea una nueva tabla utilizando los archivos que ya están presentes en el directorio 'delta'

spark.sql("""
          DROP TABLE IF EXISTS SalesDeltaFormat
          """)

spark.sql("""
          CREATE TABLE SalesDeltaFormat
          USING DELTA
          LOCATION '{}'
          """.format(deltaPath))

DataFrame[]

In [36]:
# Let's peek into the catalog and verify that our table was created.
#spark.catalog.listTables()

spark.catalog.listTables()

[Table(name='salesdeltaformat', database='deltademo', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='salesparquetformat', database='deltademo', description=None, tableType='EXTERNAL', isTemporary=False)]

In [36]:
spark.sql("DESCRIBE EXTENDED SalesDeltaFormat").show(100, truncate = False)

+----------------------------+-----------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                          |comment|
+----------------------------+-----------------------------------------------------------------------------------+-------+
|InvoiceNo                   |int                                                                                |       |
|StockCode                   |string                                                                             |       |
|Description                 |string                                                                             |       |
|Quantity                    |int                                                                                |       |
|InvoiceDate                 |string                                                                             |       |
|UnitPrice      

In [37]:
# Let's find a Invoice with only 1 count and use it to test DML.


oneRandomInvoice = spark.sql(""" SELECT InvoiceNo, count(*)
                                 FROM SalesDeltaFormat
                                 GROUP BY InvoiceNo
                                 ORDER BY 2 ASC
                                 LIMIT 1
                             """).collect()[0][0]

print(f"Random Invoice # => {oneRandomInvoice}")

Random Invoice # => 554207


In [38]:
# Before DML (insert)

spark.sql(f"""
                SELECT SUBSTRING(input_file_name(), -67, 67 ) AS Filename,
                *
                FROM SalesDeltaFormat
                WHERE InvoiceNo = {oneRandomInvoice}
           """).show(truncate = False)

+-------------------------------------------------------------------+---------+---------+-----------------------------+--------+---------------+---------+----------+--------------+
|Filename                                                           |InvoiceNo|StockCode|Description                  |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+-------------------------------------------------------------------+---------+---------+-----------------------------+--------+---------------+---------+----------+--------------+
|part-00001-2088d53e-b24c-43c0-9062-c46c06918ac9-c000.snappy.parquet|554207   |85132C   |CHARLIE AND LOLA FIGURES TINS|1       |5/23/2011 12:15|1.95     |14573     |United Kingdom|
+-------------------------------------------------------------------+---------+---------+-----------------------------+--------+---------------+---------+----------+--------------+



In [39]:
# Let's add some data to our table

spark.sql(f"""
               INSERT INTO SalesDeltaFormat
               VALUES({oneRandomInvoice}, 2291, "WORLD'S BEST JAM MAKING SET", 5, "08/13/2011 07:58", 1.45, 15358, "France");
           """)

DataFrame[]

In [40]:
spark.sql("DESCRIBE HISTORY SalesDeltaFormat").show(truncate = False)

+-------+-------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------+------------+
|version|timestamp          |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                  |userMetadata|
+-------+-------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------+------------+
|3      |2020-11-11 22:40:01|null  |null    |WRITE    |[mode -> Append, partitionBy -> []]   |null|null    |null     |2          |null          |true         |[numFiles -> 1, numOutputBytes -> 2369, numOutputRows -> 1]       |null        |
|2      |2020-11-11 22:36:42|null  |null

In [40]:
files_in_dir(deltaLogPath,"*")

2KB      2020-11-11 16:09:30  00000000000000000000.json
765B     2020-11-11 16:09:59  00000000000000000001.json
410B     2020-11-11 16:10:31  00000000000000000002.json

Number of file/s: 3 | Total size: 16K


In [41]:
# Schema details: https://docs.delta.io/latest/delta-utility.html

logDF = spark.read.format("json").load(deltaLogPath + "/00000000000000000002.json")
logDF.collect()

[Row(add=None, commitInfo=Row(isBlindAppend=True, operation='WRITE', operationMetrics=Row(numFiles='3', numOutputBytes='1082711', numOutputRows='99463'), operationParameters=Row(mode='Append', partitionBy='[]'), readVersion=1, timestamp=1605155802911)),
 Row(add=Row(dataChange=True, modificationTime=1605155802000, path='part-00000-8bad6924-194f-4f7c-be73-1a5d00ff94bc-c000.snappy.parquet', size=388155), commitInfo=None),
 Row(add=Row(dataChange=True, modificationTime=1605155802000, path='part-00001-2576e8c3-d158-4d3f-a1f5-3ba1debe407e-c000.snappy.parquet', size=407069), commitInfo=None),
 Row(add=Row(dataChange=True, modificationTime=1605155802000, path='part-00002-172a1fcd-26b8-4489-94fd-e243d09d4d6f-c000.snappy.parquet', size=287487), commitInfo=None)]

In [42]:
# After DML (insert)


spark.sql(f"""
               SELECT SUBSTRING(input_file_name(), -67, 67) AS FileName, *
               FROM SalesDeltaFormat
               WHERE InvoiceNo = {oneRandomInvoice}
           """).show(truncate = False)

+-------------------------------------------------------------------+---------+---------+----------------------------------+--------+----------------+---------+----------+--------------+
|FileName                                                           |InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+-------------------------------------------------------------------+---------+---------+----------------------------------+--------+----------------+---------+----------+--------------+
|part-00001-28fd7101-7f97-4028-83c5-5dcdd9b20754-c000.snappy.parquet|554485   |23152    |IVORY SWEETHEART WIRE LETTER RACK |2       |5/24/2011 13:22 |3.75     |18018     |United Kingdom|
|part-00000-b783e06e-a2cc-4d2d-ab2f-ba4bbe61552a-c000.snappy.parquet|554485   |2291     |WORLD'S BEST JAM MAKING SET       |5       |08/13/2011 07:58|1.45     |15358     |France        |
+----------------------------------------------------------------

In [42]:
# Update one invoice


#deltaTable.update(
#    condition=("InvoiceNo = oneRandomInvoice"),
#    set={"Quantity": expr("Quantity + 1000")}
#)

spark.sql(f"""
               UPDATE SalesDeltaFormat
               SET Quantity = Quantity + 1000
               WHERE InvoiceNo = {oneRandomInvoice}
           """)

DataFrame[]

In [43]:
spark.sql("DESCRIBE HISTORY SalesDeltaFormat").show(truncate = False)

+-------+-------------------+------+--------+---------+----------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------+------------+
|version|timestamp          |userId|userName|operation|operationParameters                     |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                       |userMetadata|
+-------+-------------------+------+--------+---------+----------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------+------------+
|4      |2020-11-11 22:42:23|null  |null    |UPDATE   |[predicate -> (InvoiceNo#3224 = 554207)]|null|null    |null     |3          |null          |false        |[numRemovedFiles -> 2, numAddedFiles -> 2, numUpda

In [44]:
# After Update

spark.sql(f"""
               SELECT
               SUBSTRING(input_file_name(), -67, 67) AS FileName, *
               FROM SalesDeltaFormat
               WHERE InvoiceNo = {oneRandomInvoice}
           """).show(truncate = False)

+-------------------------------------------------------------------+---------+---------+----------------------------------+--------+----------------+---------+----------+--------------+
|FileName                                                           |InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+-------------------------------------------------------------------+---------+---------+----------------------------------+--------+----------------+---------+----------+--------------+
|part-00000-439e822e-6e49-42c2-9c9d-5d35162ba87f-c000.snappy.parquet|554485   |23152    |IVORY SWEETHEART WIRE LETTER RACK |1002    |5/24/2011 13:22 |3.75     |18018     |United Kingdom|
|part-00001-6f9a3c57-30d1-43e3-b580-0c1e65b85524-c000.snappy.parquet|554485   |2291     |WORLD'S BEST JAM MAKING SET       |1005    |08/13/2011 07:58|1.45     |15358     |France        |
+----------------------------------------------------------------

In [44]:
print("####### HISTORY ########")

# Show which datafile was removed.

# Observe history of actions taken on a Delta table
# spark.sql not supported in 0.7.0 OSS Delta

history = deltaTable.history().select('version', 'timestamp', 'operation', 'operationParameters', 'operationMetrics')\
                              .withColumnRenamed('version', 'ver')
history.show(truncate = False)

####### HISTORY ########
+---+-------------------+---------+----------------------------------------+---------------------------------------------------------------------------------------+
|ver|timestamp          |operation|operationParameters                     |operationMetrics                                                                       |
+---+-------------------+---------+----------------------------------------+---------------------------------------------------------------------------------------+
|4  |2020-11-11 22:42:23|UPDATE   |[predicate -> (InvoiceNo#3224 = 554207)]|[numRemovedFiles -> 2, numAddedFiles -> 2, numUpdatedRows -> 2, numCopiedRows -> 37483]|
|3  |2020-11-11 22:40:01|WRITE    |[mode -> Append, partitionBy -> []]     |[numFiles -> 1, numOutputBytes -> 2369, numOutputRows -> 1]                            |
|2  |2020-11-11 22:36:42|WRITE    |[mode -> Append, partitionBy -> []]     |[numFiles -> 3, numOutputBytes -> 1082711, numOutputRows -> 99463]        

In [46]:
files_in_dir(deltaPath, "parquet")

279KB    2020-11-11 16:09:29  part-00002-4f72cf15-cb0b-414d-921d-9aa9abb594fb-c000.snappy.parquet
399KB    2020-11-11 16:09:29  part-00001-28fd7101-7f97-4028-83c5-5dcdd9b20754-c000.snappy.parquet
379KB    2020-11-11 16:09:29  part-00000-e09d4d65-b48f-48ff-bb11-32ec31c62321-c000.snappy.parquet
281KB    2020-11-11 16:09:59  part-00002-e86f84af-b8c7-46c1-9339-e4e13871b47f-c000.snappy.parquet
379KB    2020-11-11 16:09:59  part-00000-6862b1f2-21bf-426b-926c-e8c0d3859485-c000.snappy.parquet
398KB    2020-11-11 16:09:59  part-00001-baff5b51-e2f0-4120-b7b6-56d621641096-c000.snappy.parquet
2KB      2020-11-11 16:10:31  part-00000-b783e06e-a2cc-4d2d-ab2f-ba4bbe61552a-c000.snappy.parquet
2KB      2020-11-11 16:10:53  part-00001-6f9a3c57-30d1-43e3-b580-0c1e65b85524-c000.snappy.parquet
398KB    2020-11-11 16:10:53  part-00000-439e822e-6e49-42c2-9c9d-5d35162ba87f-c000.snappy.parquet

Number of file/s: 9 | Total size: 2.6M


In [47]:
files_in_dir(deltaLogPath,"*")

2KB      2020-11-11 16:09:30  00000000000000000000.json
765B     2020-11-11 16:09:59  00000000000000000001.json
410B     2020-11-11 16:10:31  00000000000000000002.json
902B     2020-11-11 16:10:53  00000000000000000003.json

Number of file/s: 4 | Total size: 20K


In [45]:
# Schema details: https://docs.delta.io/latest/delta-utility.html

logDF = spark.read.format("json").load(deltaLogPath + "/00000000000000000003.json")
logDF.collect()

[Row(add=None, commitInfo=Row(isBlindAppend=True, operation='WRITE', operationMetrics=Row(numFiles='1', numOutputBytes='2369', numOutputRows='1'), operationParameters=Row(mode='Append', partitionBy='[]'), readVersion=2, timestamp=1605156001162)),
 Row(add=Row(dataChange=True, modificationTime=1605156001000, path='part-00000-30dbc080-1ec2-4d27-81e4-05e205c846ae-c000.snappy.parquet', size=2369), commitInfo=None)]

In [49]:
# Before DML (delete)

spark.sql(f""" SELECT 
               substring(input_file_name(), -67, 67) as FileName,
               * from SalesDeltaFormat 
               WHERE InvoiceNo = {oneRandomInvoice}
           """).show(truncate = False)

+-------------------------------------------------------------------+---------+---------+----------------------------------+--------+----------------+---------+----------+--------------+
|FileName                                                           |InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+-------------------------------------------------------------------+---------+---------+----------------------------------+--------+----------------+---------+----------+--------------+
|part-00000-439e822e-6e49-42c2-9c9d-5d35162ba87f-c000.snappy.parquet|554485   |23152    |IVORY SWEETHEART WIRE LETTER RACK |1002    |5/24/2011 13:22 |3.75     |18018     |United Kingdom|
|part-00001-6f9a3c57-30d1-43e3-b580-0c1e65b85524-c000.snappy.parquet|554485   |2291     |WORLD'S BEST JAM MAKING SET       |1005    |08/13/2011 07:58|1.45     |15358     |France        |
+----------------------------------------------------------------

In [46]:
# https://github.com/delta-io/delta/blob/master/examples/python/quickstart.py
# Delete and invoice (two records)

# This results in one new file being created.  One file had just the one record so it does not have to be re-created
# Each of the two records were in two different files. One of those files had only one record so it did not have to be re-created.

spark.sql(f"DELETE FROM SalesDeltaFormat WHERE InvoiceNo = {oneRandomInvoice}")
# deltaTable.delete(
#    condition=("InvoiceNo = {537617}")
# )

DataFrame[]

In [47]:
spark.sql("DESCRIBE HISTORY SalesDeltaFormat").show(truncate = False)
spark.sql(f"SELECT * FROM SalesDeltaFormat WHERE InvoiceNo = {oneRandomInvoice}").show(truncate = False)

+-------+-------------------+------+--------+---------+----------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------+------------+
|version|timestamp          |userId|userName|operation|operationParameters                                                               |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                       |userMetadata|
+-------+-------------------+------+--------+---------+----------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------+------------+
|5      |2020-11-11 22:44:48|null  |null    |DELETE   |[predicate -> ["(spark_catalog

In [51]:
# After DML (delete)

spark.sql(f"""
              SELECT 
              SUBSTRING(input_file_name(), -67, 67) as FileName, *
              FROM SalesDeltaFormat 
              WHERE InvoiceNo = {oneRandomInvoice}
          """).show(truncate = False)

+--------+---------+---------+-----------+--------+-----------+---------+----------+-------+
|FileName|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+--------+---------+---------+-----------+--------+-----------+---------+----------+-------+
+--------+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [48]:
print("####### HISTORY ########")

# Observe history of actions taken on a Delta table
history = deltaTable.history().select('version','operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed("version", "ver")

history.show(truncate = False)

####### HISTORY ########
+---+---------+----------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|ver|operation|operationParameters                                                               |operationMetrics                                                                       |
+---+---------+----------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|5  |DELETE   |[predicate -> ["(spark_catalog.deltademo.SalesDeltaFormat.`InvoiceNo` = 554207)"]]|[numRemovedFiles -> 2, numDeletedRows -> 2, numAddedFiles -> 1, numCopiedRows -> 37483]|
|4  |UPDATE   |[predicate -> (InvoiceNo#3224 = 554207)]                                          |[numRemovedFiles -> 2, numAddedFiles -> 2, numUpdatedRows -> 2, numCopiedRows -> 37483]|
|3  |WRITE    |[mode -> Append, partitio

In [53]:
files_in_dir(deltaPath, "parquet")

279KB    2020-11-11 16:09:29  part-00002-4f72cf15-cb0b-414d-921d-9aa9abb594fb-c000.snappy.parquet
399KB    2020-11-11 16:09:29  part-00001-28fd7101-7f97-4028-83c5-5dcdd9b20754-c000.snappy.parquet
379KB    2020-11-11 16:09:29  part-00000-e09d4d65-b48f-48ff-bb11-32ec31c62321-c000.snappy.parquet
281KB    2020-11-11 16:09:59  part-00002-e86f84af-b8c7-46c1-9339-e4e13871b47f-c000.snappy.parquet
379KB    2020-11-11 16:09:59  part-00000-6862b1f2-21bf-426b-926c-e8c0d3859485-c000.snappy.parquet
398KB    2020-11-11 16:09:59  part-00001-baff5b51-e2f0-4120-b7b6-56d621641096-c000.snappy.parquet
2KB      2020-11-11 16:10:31  part-00000-b783e06e-a2cc-4d2d-ab2f-ba4bbe61552a-c000.snappy.parquet
2KB      2020-11-11 16:10:53  part-00001-6f9a3c57-30d1-43e3-b580-0c1e65b85524-c000.snappy.parquet
398KB    2020-11-11 16:10:53  part-00000-439e822e-6e49-42c2-9c9d-5d35162ba87f-c000.snappy.parquet
399KB    2020-11-11 16:11:17  part-00000-7ce26bc6-9b27-4622-a970-21a895241fca-c000.snappy.parquet

Number of file/s: 1

In [54]:
files_in_dir(deltaLogPath,"*")

2KB      2020-11-11 16:09:30  00000000000000000000.json
765B     2020-11-11 16:09:59  00000000000000000001.json
410B     2020-11-11 16:10:31  00000000000000000002.json
902B     2020-11-11 16:10:53  00000000000000000003.json
775B     2020-11-11 16:11:17  00000000000000000004.json

Number of file/s: 5 | Total size: 24K


In [55]:
logDF = spark.read.format("json").load(deltaLogPath + "/00000000000000000004.json")
#dfLog.printSchema()
logDF.collect()

[Row(add=None, commitInfo=Row(isBlindAppend=False, operation='DELETE', operationMetrics=Row(numAddedFiles='1', numCopiedRows='37483', numDeletedRows='2', numRemovedFiles='2'), operationParameters=Row(predicate='["(spark_catalog.deltademo.SalesDeltaFormat.`InvoiceNo` = 554485)"]'), readVersion=3, timestamp=1605132677391), remove=None),
 Row(add=None, commitInfo=None, remove=Row(dataChange=True, deletionTimestamp=1605132677389, path='part-00000-439e822e-6e49-42c2-9c9d-5d35162ba87f-c000.snappy.parquet')),
 Row(add=None, commitInfo=None, remove=Row(dataChange=True, deletionTimestamp=1605132677389, path='part-00001-6f9a3c57-30d1-43e3-b580-0c1e65b85524-c000.snappy.parquet')),
 Row(add=Row(dataChange=True, modificationTime=1605132677000, path='part-00000-7ce26bc6-9b27-4622-a970-21a895241fca-c000.snappy.parquet', size=408279), commitInfo=None, remove=None)]

In [51]:
# Randomy update 5 random invoices to force a checkpoint

count = 0
anInvoice = retailSalesData2.select("InvoiceNo").orderBy(rand()).limit(1).collect()[0][0]

while (count <= 5):
  deltaTable.update(
    condition=(f"InvoiceNo = {anInvoice}"),
    set={"Quantity": expr("Quantity + 100")})

  count = count + 1
  anInvoice = retailSalesData2.select("InvoiceNo").orderBy(rand()).limit(1).collect()[0][0]

In [52]:
files_in_dir(deltaLogPath,"*")

2KB      2020-11-11 22:32:17  00000000000000000000.json
765B     2020-11-11 22:35:49  00000000000000000001.json
765B     2020-11-11 22:36:42  00000000000000000002.json
410B     2020-11-11 22:40:01  00000000000000000003.json
902B     2020-11-11 22:42:23  00000000000000000004.json
775B     2020-11-11 22:44:48  00000000000000000005.json
1KB      2020-11-11 22:49:39  00000000000000000006.json
1KB      2020-11-11 22:49:40  00000000000000000007.json
1KB      2020-11-11 22:49:42  00000000000000000008.json
1KB      2020-11-11 22:49:44  00000000000000000009.json
1KB      2020-11-11 22:49:45  00000000000000000010.json
18KB     2020-11-11 22:49:46  00000000000000000010.checkpoint.parquet
1KB      2020-11-11 22:49:47  00000000000000000011.json

Number of file/s: 13 | Total size: 80K


In [53]:
checkPointDF = spark.read.format("parquet").load(deltaLogPath + "/00000000000000000010.checkpoint.parquet")
checkPointDF.show(100, truncate = False)

b38ca5-7f0a-472e-b176-f72af7be81d8-c000.snappy.parquet, 1605156288183, false]|null                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |null    |null      |
|null|null                                                                                                     |[part-00002-4d0a18a3-0d19-4555-85c9-2cb8e4185175-c000.snappy.parquet, 1605156582345, false]|null                                           

In [54]:
checkPointFile10 = (    
    checkPointDF.select(col("add.path").alias("FileAdded"),
                        col("add.modificationTime").alias("DateAdded"),
                        col("remove.path").alias("FileDeleted"),
                        col("remove.deletionTimestamp").alias("DateDeleted"))
                .orderBy(["DateAdded", "DateDeleted"], ascending = [True, False])
)

spark.sql("DROP TABLE IF EXISTS tbl_checkpointfile")
spark.sql("CREATE TABLE IF NOT EXISTS tbl_checkpointfile (Action string, filename string, ActionDate Long)")# Crea la tabla tbl_checkpointfile
# dentro del folder 'DATABASE', ya que no se especificó 'LOCATION' y en este caso se crea una tabla vacia

checkPointFile10.createOrReplaceTempView("vw_checkpointfile")

In [60]:
spark.sql("SELECT * FROM vw_checkpointfile LIMIT 100").show(100, truncate=False)

+-------------------------------------------------------------------+-------------+-------------------------------------------------------------------+-------------+
|FileAdded                                                          |DateAdded    |FileDeleted                                                        |DateDeleted  |
+-------------------------------------------------------------------+-------------+-------------------------------------------------------------------+-------------+
|null                                                               |null         |part-00001-9daea6bb-7c90-4378-aa1e-4b442b914e27-c000.snappy.parquet|1605132699095|
|null                                                               |null         |part-00000-9b9ec761-6e6b-4aad-a254-3469ccac8e20-c000.snappy.parquet|1605132699095|
|null                                                               |null         |part-00000-4129a15a-895f-4cbf-b83f-d861256264ef-c000.snappy.parquet|1605132697485|
|nul

In [55]:
spark.sql("""
              INSERT INTO tbl_checkpointfile
              SELECT "Add", FileAdded, DateAdded
              FROM vw_checkpointfile
              WHERE FileAdded IS NOT NULL
          """)
# Aquí es donde se añaden los datos a la tabla 'tbl_checkpointfile' 
spark.sql("""
              INSERT INTO tbl_checkpointfile
              SELECT "Delete", FileDeleted, DateDeleted
              FROM vw_checkpointfile
              WHERE FileDeleted IS NOT NULL
          """)

DataFrame[]

In [62]:
spark.sql("""
              SELECT Action, filename AS `File Name`,
              from_unixtime(actiondate/1e3) AS `ActionDate`
              FROM tbl_checkpointfile
              ORDER BY ActionDate ASC
          """).show(200, truncate = False)

+------+-------------------------------------------------------------------+-------------------+
|Action|File Name                                                          |ActionDate         |
+------+-------------------------------------------------------------------+-------------------+
|Add   |part-00002-4f72cf15-cb0b-414d-921d-9aa9abb594fb-c000.snappy.parquet|2020-11-11 16:09:29|
|Add   |part-00002-e86f84af-b8c7-46c1-9339-e4e13871b47f-c000.snappy.parquet|2020-11-11 16:09:59|
|Delete|part-00000-b783e06e-a2cc-4d2d-ab2f-ba4bbe61552a-c000.snappy.parquet|2020-11-11 16:10:53|
|Delete|part-00001-28fd7101-7f97-4028-83c5-5dcdd9b20754-c000.snappy.parquet|2020-11-11 16:10:53|
|Delete|part-00001-6f9a3c57-30d1-43e3-b580-0c1e65b85524-c000.snappy.parquet|2020-11-11 16:11:17|
|Delete|part-00000-439e822e-6e49-42c2-9c9d-5d35162ba87f-c000.snappy.parquet|2020-11-11 16:11:17|
|Delete|part-00001-baff5b51-e2f0-4120-b7b6-56d621641096-c000.snappy.parquet|2020-11-11 16:11:31|
|Delete|part-00000-7ce26bc6-9b

In [56]:
# Let's add some data to our table by doing a merge
# Do this to show how json logs pick up after checkpoint

# Create a tiny dataframe to use with merge
mergeSalesData= cleanSalesDataDF.sample(withReplacement=False, fraction=.0001, seed=13)
mergeSalesData.createOrReplaceTempView("vw_mergeSalesData")

# User-defined commit metadata
spark.sql(f"""
               SET spark.databricks.delta.commitInfo.userMetadata = 11-11-2020 Data Merge Message;
          """)

spark.sql("""
          MERGE INTO SalesDeltaFormat
          USING vw_mergeSalesData
          ON SalesDeltaFormat.StockCode = vw_mergeSalesData.StockCode
           AND SalesDeltaFormat.InvoiceNo = vw_mergeSalesData.InvoiceNo
          WHEN MATCHED THEN
            UPDATE SET *
          WHEN NOT MATCHED
            THEN INSERT *
          """)

DataFrame[]

In [58]:
spark.sql("describe history SalesDeltaFormat").show(truncate = False)# para ver el mensaje de 'userMetadata'

+-------+-------------------+------+--------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------+
|version|timestamp          |userId|userName|operation|operationParameters                                                                                                                                                                               |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                          

In [59]:
print("####### HISTORY ########")

# Observe history of actions taken on a Delta table
history = deltaTable.history().select('version','operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed("version", "ver")

history.show(truncate = False)

####### HISTORY ########
+---+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ver|operation|operationParameters                                                                                                                                                                               |operationMetrics                                                                                                                                                                                                       |
+---+---------+----------------------------------------------------------------------------------------------------------

In [65]:
files_in_dir(deltaLogPath,"*")

2KB      2020-11-11 16:09:30  00000000000000000000.json
765B     2020-11-11 16:09:59  00000000000000000001.json
410B     2020-11-11 16:10:31  00000000000000000002.json
902B     2020-11-11 16:10:53  00000000000000000003.json
775B     2020-11-11 16:11:17  00000000000000000004.json
904B     2020-11-11 16:11:31  00000000000000000005.json
905B     2020-11-11 16:11:33  00000000000000000006.json
905B     2020-11-11 16:11:34  00000000000000000007.json
905B     2020-11-11 16:11:36  00000000000000000008.json
905B     2020-11-11 16:11:37  00000000000000000009.json
905B     2020-11-11 16:11:39  00000000000000000010.json
17KB     2020-11-11 16:11:40  00000000000000000010.checkpoint.parquet
2KB      2020-11-11 16:12:08  00000000000000000011.json

Number of file/s: 13 | Total size: 80K


In [66]:
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")

There are ['26'] files.


In [67]:
spark.sql("""
              SELECT Action, COUNT(Action) AS `CountAction`
              FROM tbl_checkpointfile
              GROUP BY ACTION WITH ROLLUP
          """).show(200, truncate = False)

+------+-----------+
|ACTION|CountAction|
+------+-----------+
|Add   |6          |
|Delete|16         |
|null  |22         |
+------+-----------+



# Delta Lake File Compaction

In [68]:
# Create an artificial "small file" problem

(spark.read.format("delta").load(deltaPath).repartition(1000).write.option("dataChange", True)\
      .format("delta").mode("overwrite").save(deltaPath))

In [69]:
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")

There are ['1026'] files.


In [70]:
%%time

# spark.sql("select * from SalesDeltaFormat limit 2").show()

rowCount = spark.sql(""" SELECT CustomerID, count(Country) AS num_countries
                         FROM SalesDeltaFormat
                         GROUP BY CustomerID 
                     """).count()

print(f"Row Count => {rowCount}\n")

Row Count => 4248

CPU times: user 1.51 ms, sys: 826 µs, total: 2.34 ms
Wall time: 939 ms


In [71]:
# Compact 1000 files to 4

(spark.read
.format("delta")
.load(deltaPath)
.repartition(4)
.write
.option("dataChange", False)
.format("delta")
.mode("overwrite")
.save(deltaPath)
)

In [72]:
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")

There are ['1030'] files.


In [73]:
%%time

# spark.sql("select * from SalesDeltaFormat limit 2").show()
rowCount = spark.sql(""" select CustomerID, count(Country) as num_countries
                         from SalesDeltaFormat
                        group by CustomerID """).count()

print(f"Row Count => {rowCount}")

Row Count => 4248
CPU times: user 1.04 ms, sys: 567 µs, total: 1.61 ms
Wall time: 239 ms



# Delta Time Travel Queries


In [74]:
# Time Travel Queries

#POO currentVersion = deltaTable.history(1).select("version").collect()[0][0]
# Determine latest version of the Delta table
currentVersion = spark.sql("DESCRIBE HISTORY SalesDeltaFormat LIMIT 1").collect()[0][0]

# Query table as of the current version to attain row count
currentRowCount = spark.read.format("delta").option("versionAsOf", currentVersion).load(deltaPath).count()

print(f"Row Count: {currentRowCount} as of table version {currentVersion}")
print("")

Row Count: 198542 as of table version 13



In [75]:
print("####### HISTORY ########")

# Observe history of actions taken on a Delta table
history = deltaTable.history().select('version','operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed("version", "ver")

history.show(100, truncate = False)

####### HISTORY ########
+---+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ver|operation|operationParameters                                                                                                                                                                               |operationMetrics                                                                                                                                                                                                       |
+---+---------+----------------------------------------------------------------------------------------------------------

In [76]:
# Determine difference in record count between the current version and the original version of the table.

origRowCount = spark.read.format("delta").option("versionAsOf", 0).load(deltaPath).count()
print(f"There are {currentRowCount-origRowCount} more rows in version [{currentVersion}] than version [0] of the table.")

There are 99485 more rows in version [13] than version [0] of the table.


In [77]:
# Roll back current table to version 0 (original).
(
    spark
    .read
    .format("delta")
    .option("versionAsOf",0)
    .load(deltaPath)
    .write
    .format("delta")
    .mode("overwrite")
    .save(deltaPath)
)

In [78]:
# Current version should have same record count as version 0.

currentVersion = spark.sql("DESCRIBE HISTORY SalesDeltaFormat LIMIT 1").collect()[0][0]
# If equal it will return "true"
spark.read.format("delta").option("versionAsOf", currentVersion).load(deltaPath).count() == spark.read.format("delta").option("versionAsOf", 0).load(deltaPath).count()


True

In [79]:
print("####### HISTORY ########")

# Observe history of actions taken on a Delta table
history = deltaTable.history().select('version','operation', 'operationParameters', \
                                      'operationMetrics') \
                              .withColumnRenamed("version", "ver")

history.show(100, truncate = False)

####### HISTORY ########
+---+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ver|operation|operationParameters                                                                                                                                                                               |operationMetrics                                                                                                                                                                                                       |
+---+---------+----------------------------------------------------------------------------------------------------------

# Delta Vacuum - Data Retention

delta.logRetentionDuration - default 30 days

delta.deletedFileRetentionDuration - default 30 days

    - Don't need to set them to be the same. You may want to keep the log files around after the tombstoned files are purged.
    - Time travel in order of months/years infeasible
    - Initially desinged to correct mistakes



In [80]:
# files_in_dir(deltaPath,"parquet")
# Count files in deltaPath

numFiles = ! ls $deltaPath/*parquet | wc -l
print(f"There are {numFiles} files.")

There are ['1033'] files.


In [81]:
# Attempt to vacuum table with default settings

#spark.sql("vacuum SalesDeltaFormat retain 0 hours dry run").show(100, truncate = False)
spark.sql("VACUUM SalesDeltaFormat RETAIN 0 HOURS dry run").show(100, truncate = False)

IllegalArgumentException: requirement failed: Are you sure you would like to vacuum files with such a low retention period? If you have
writers that are currently writing to this table, there is a risk that you may corrupt the
state of your Delta table.

If you are certain that there are no operations being performed on this table, such as
insert/upsert/delete/optimize, then you may turn off this check by setting:
spark.databricks.delta.retentionDurationCheck.enabled = false

If you are not sure, please use a value not less than "168 hours".
       

In [82]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

In [83]:
%%time
# Vacuum Delta table to remove all history

spark.sql("VACUUM SalesDeltaFormat RETAIN 0 HOURS").show(truncate = False)
# ! Can use deltaTable.vacuum(0) against directory

+-----------------------------------------------------------------------------------+
|path                                                                               |
+-----------------------------------------------------------------------------------+
|file:/home/oliver/DeltaLake/training/online_retail_dataset/delta/online_retail_data|
+-----------------------------------------------------------------------------------+

CPU times: user 2.75 ms, sys: 1.43 ms, total: 4.18 ms
Wall time: 23.9 s


In [84]:
files_in_dir(deltaPath,"parquet")

279KB    2020-11-11 16:13:39  part-00002-f14f22a5-b50a-41bd-85a5-d66be4f819e4-c000.snappy.parquet
379KB    2020-11-11 16:13:39  part-00001-6cf1a758-7c65-4b52-a884-78f77291b0d8-c000.snappy.parquet
399KB    2020-11-11 16:13:39  part-00000-9d4068ec-b5a3-458d-982a-53bf31f53e15-c000.snappy.parquet

Number of file/s: 3 | Total size: 1.7M


In [85]:
files_in_dir(deltaLogPath,"*")

2KB      2020-11-11 16:09:30  00000000000000000000.json
765B     2020-11-11 16:09:59  00000000000000000001.json
410B     2020-11-11 16:10:31  00000000000000000002.json
902B     2020-11-11 16:10:53  00000000000000000003.json
775B     2020-11-11 16:11:17  00000000000000000004.json
904B     2020-11-11 16:11:31  00000000000000000005.json
905B     2020-11-11 16:11:33  00000000000000000006.json
905B     2020-11-11 16:11:34  00000000000000000007.json
905B     2020-11-11 16:11:36  00000000000000000008.json
905B     2020-11-11 16:11:37  00000000000000000009.json
905B     2020-11-11 16:11:39  00000000000000000010.json
17KB     2020-11-11 16:11:40  00000000000000000010.checkpoint.parquet
2KB      2020-11-11 16:12:08  00000000000000000011.json
169KB    2020-11-11 16:13:02  00000000000000000012.json
141KB    2020-11-11 16:13:13  00000000000000000013.json
1KB      2020-11-11 16:13:40  00000000000000000014.json

Number of file/s: 16 | Total size: 400K


In [86]:
spark.read.format("json").load(deltaLogPath + "/00000000000000000014.json").collect()

[Row(add=None, commitInfo=Row(isBlindAppend=False, operation='WRITE', operationMetrics=Row(numFiles='3', numOutputBytes='1083137', numOutputRows='99057'), operationParameters=Row(mode='Overwrite', partitionBy='[]'), readVersion=13, timestamp=1605132820024, userMetadata='11-11-2020 Data Merge Message;'), remove=None),
 Row(add=Row(dataChange=True, modificationTime=1605132819000, path='part-00000-9d4068ec-b5a3-458d-982a-53bf31f53e15-c000.snappy.parquet', size=408716), commitInfo=None, remove=None),
 Row(add=Row(dataChange=True, modificationTime=1605132819000, path='part-00001-6cf1a758-7c65-4b52-a884-78f77291b0d8-c000.snappy.parquet', size=388268), commitInfo=None, remove=None),
 Row(add=Row(dataChange=True, modificationTime=1605132819000, path='part-00002-f14f22a5-b50a-41bd-85a5-d66be4f819e4-c000.snappy.parquet', size=286153), commitInfo=None, remove=None),
 Row(add=None, commitInfo=None, remove=Row(dataChange=True, deletionTimestamp=1605132820024, path='part-00002-7e9952fb-7b6e-47a3-97e

In [87]:
spark.read.format("delta").option("versionAsOf", currentVersion).load(deltaPath).count()

99057

In [88]:
# Configure Delta table to keep around 7 days of deleted data and 7 days of older log files

spark.sql("ALTER TABLE SalesDeltaFormat SET TBLPROPERTIES ('delta.logRetentionDuration' = 'INTERVAL 7 DAYS', 'delta.deletedFileRetentionDuration' = 'INTERVAL 7 DAYS')")

DataFrame[]

In [89]:
# Verify our changes
spark.sql("describe extended SalesDeltaFormat").show(truncate = False)

+----------------------------+-----------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                      |comment|
+----------------------------+-----------------------------------------------------------------------------------------------+-------+
|InvoiceNo                   |int                                                                                            |       |
|StockCode                   |string                                                                                         |       |
|Description                 |string                                                                                         |       |
|Quantity                    |int                                                                                            |       |
|InvoiceDate                 |string                   

In [10]:
#spark.stop()
spark.sql("SHOW DATABASES").show()

# Current DB should be deltademo
spark.sql("USE deltademo")
spark.sql("SELECT CURRENT_DATABASE()").show()
spark.sql("DESCRIBE DATABASE deltademo").show(truncate = False)
spark.sql("SHOW TABLES").show(truncate= False)
spark.sql("DESCRIBE EXTENDED tbl_checkpointfile").show(truncate = False)

+---------+
|namespace|
+---------+
|  default|
|deltademo|
+---------+

+------------------+
|current_database()|
+------------------+
|         deltademo|
+------------------+

+-------------------------+-------------------------------------------------------------------+
|database_description_item|database_description_value                                         |
+-------------------------+-------------------------------------------------------------------+
|Database Name            |deltademo                                                          |
|Comment                  |Esta es una DB demo para uso del formato Delta                     |
|Location                 |file:/home/oliver/DeltaLake/training/online_retail_dataset/DATABASE|
|Owner                    |oliver                                                             |
+-------------------------+-------------------------------------------------------------------+

+---------+------------------+-----------+
|database

In [11]:
spark.sql("DESCRIBE HISTORY SalesDeltaFormat LIMIT 1").show()

+-------+-------------------+------+--------+-----------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+--------------------+
|version|          timestamp|userId|userName|        operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics|        userMetadata|
+-------+-------------------+------+--------+-----------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+--------------------+
|     15|2020-11-11 16:15:01|  null|    null|SET TBLPROPERTIES|[properties -> {"...|null|    null|     null|         14|          null|         true|              []|11-11-2020 Data M...|
+-------+-------------------+------+--------+-----------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------+--------------------+

