In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType, DateType
import sys
import os
from delta import DeltaTable
from delta.tables import *


from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException
import io
import json

In [73]:
def create_spark_session(app_name="File Streaming Demo",
                         master="local[3]",
                         warehouse_location="/mnt/datalake/warehouse"):
    """Build (or reuse) a local SparkSession configured for Delta Lake, Kafka and MongoDB.

    Parameters
    ----------
    app_name : str
        Spark application name.
    master : str
        Spark master URL (default: local master with 3 threads).
    warehouse_location : str
        Directory used as ``spark.sql.warehouse.dir`` for Hive-managed tables.

    Returns
    -------
    SparkSession
        The session from ``getOrCreate()``. If a session already exists it is
        reused and only runtime-modifiable SQL configurations are applied to it.
    """
    # Maven coordinates resolved at startup through spark.jars.packages.
    # NOTE(review): delta-core 2.4.0 targets Spark 3.4 (the engineInfo in this
    # notebook shows Apache-Spark/3.4.x) while spark-sql-kafka 3.2.0 targets
    # Spark 3.2 -- align the Kafka connector with the running Spark version.
    spark_packages = ",".join([
        "io.delta:delta-core_2.12:2.4.0",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0",
        "org.mongodb.spark:mongo-spark-connector:10.0.2",
    ])

    # CLI equivalent of the Delta part:
    #   bin/spark-sql --packages io.delta:delta-core_2.12:2.4.0 \
    #     --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    #     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
    # A dict guarantees each key is configured exactly once.
    spark_configs = {
        "spark.sql.warehouse.dir": warehouse_location,
        "spark.jars.packages": spark_packages,
        "spark.streaming.stopGracefullyOnShutdown": "true",
        # Delta Lake SQL extension and catalog.
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        # Let writes/MERGE evolve the target schema automatically.
        "spark.databricks.delta.schema.autoMerge.enabled": "true",
        # Allow VACUUM with a retention shorter than Delta's safety threshold.
        "spark.databricks.delta.retentionDurationCheck.enabled": "false",
        # Table properties cannot be set as plain session confs (the previous
        # "delta.deletedFileRetentionDuration" key was ignored); Delta reads
        # session defaults for new tables from this prefix instead.
        "spark.databricks.delta.properties.defaults.deletedFileRetentionDuration": "interval 7 days",
    }

    builder = SparkSession.builder.appName(app_name).master(master)
    for key, value in spark_configs.items():
        builder = builder.config(key, value)
    return builder.enableHiveSupport().getOrCreate()

In [74]:
# Build (or reuse) the Delta-enabled session shared by the cells below.
spark: SparkSession = create_spark_session()

23/12/31 17:04:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Databases registered in the session's Hive metastore.
databases = spark.catalog.listDatabases()
databases

23/12/29 21:15:49 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/12/29 21:15:49 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/12/29 21:15:52 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/12/29 21:15:52 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore jovyan@172.31.0.2


[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/mnt/datalake/warehouse')]

In [5]:
# Tables currently registered in the 'default' database.
spark.catalog.listTables(dbName='default')

23/12/29 21:15:55 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


[]

In [70]:
# Schema of the sample product catalogue (every column nullable).
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("product_category_name", StringType(), True),
    StructField("product_price", DoubleType(), True),
    StructField("status", StringType(), True),
])

# Rows in schema order: (product_id, product_name, product_category_name, product_price, status).
# NOTE(review): 'Inative' / 'perfurme' look like typos; kept unchanged because they are data.
data_product = [
    (1, 'perfurme magico', 'perfumaria', 100.5, 'Inative'),
    (2, 'replica vaso ming importado', 'artes', 70.75, 'Active'),
    (3, 'raquete de tenis nacional', 'esporte_lazer', 365.00, 'Active'),
    (4, 'mordedor', 'bebes', 27.25, 'Inative'),
    (5, 'televisor 46 polegadas 4k', 'utilidades_domesticas', 2555.55, 'Active'),
    (6, 'PS', 'utilidades_domesticas', 2555.55, 'Active'),
]

df_products = spark.createDataFrame(data_product, schema)


In [71]:
# Preview the source rows that will be merged into the Delta table.
df_products.show(n=20, truncate=True)

+----------+--------------------+---------------------+-------------+-------+
|product_id|        product_name|product_category_name|product_price| status|
+----------+--------------------+---------------------+-------------+-------+
|         1|     perfurme magico|           perfumaria|        100.5|Inative|
|         2|replica vaso ming...|                artes|        70.75| Active|
|         3|raquete de tenis ...|        esporte_lazer|        365.0| Active|
|         4|            mordedor|                bebes|        27.25|Inative|
|         5|televisor 46 pole...| utilidades_domest...|      2555.55| Active|
|         6|                  PS| utilidades_domest...|      2555.55| Active|
+----------+--------------------+---------------------+-------------+-------+



In [None]:

# Target location of the bronze "products" Delta table and the MERGE key.
location_tb = '/mnt/datalake/bronze/history/products/'
merge_condition = "tgt.product_id = src.product_id"

# Explicit schema of the target table (same fields as df_products).
columns = [
    StructField('product_id', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('product_category_name', StringType(), True),
    StructField('product_price', DoubleType(), True),
    StructField('status', StringType(), True)
]

# First run only: create an empty table at the path, so that version 0 is a
# bare CREATE TABLE and the first MERGE below becomes version 1.
if not DeltaTable.isDeltaTable(spark, location_tb):
    (
        DeltaTable.create(spark)
        .addColumns(columns)
        .location(location_tb)
        .execute()
    )

# Upsert: update rows whose product_id already exists, insert all others.
deltaTable = DeltaTable.forPath(spark, location_tb)
(
    deltaTable.alias('tgt')
    .merge(df_products.alias('src'), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


### History em Delta Tables <br>
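Através do método **DeltaTable.forPath(spark, caminho_Delta_table)** obtemos a Delta Table e, com o método **history()**, consultamos o histórico de operações (uma linha por versão, da mais recente para a mais antiga).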

**Importante:** uma DeltaTable não é um DataFrame Spark. Caso seja necessário utilizar operações e ações de um <br> 
DataFrame Spark, existem duas formas:
* Carregar os dados gravados no formato Delta em um DataFrame PySpark, utilizando `spark.read.format('delta').load(caminho_Delta_table)`.

* Carregar uma DeltaTable com **DeltaTable.forPath(spark, caminho_Delta_table)** e usar o método **toDF()**, que retorna um DataFrame Spark com o conteúdo atual da tabela Delta.

In [75]:
from delta import DeltaTable
from delta.tables import *

In [76]:
# Open the products Delta table by path and list its commit history (newest version first).
path_products = '/mnt/datalake/bronze/history/products/'
deltaTable = DeltaTable.forPath(sparkSession=spark, path=path_products)
fullHistoryDF = deltaTable.history(limit=None)
fullHistoryDF.show(n=20, truncate=True)

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     11|2023-12-31 16:55:...|  null|    null|  VACUUM END|{status -> COMPLE...|null|    null|     null|         10|SnapshotIsolation|         true|{numDeletedFiles ...|        null|Apache-Spark/3.4....|
|     10|2023-12-31 16:55:...|  null|    null|VACUUM START|{retentionCheckEn...|null|    null|     null|          9|SnapshotIsolation|         true|{numFilesToDelete...|        null|Ap

In [28]:
# Compact view of the history: one row per version, with untruncated parameters.
history_columns = ["version", "timestamp", "operation", "operationParameters"]
fullHistoryDF.select(*history_columns).show(n=10, truncate=False)


+-------+-----------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp              |operation   |operationParameters                                                                                                                                |
+-------+-----------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |2023-12-29 14:25:45.397|MERGE       |{predicate -> (tgt.product_id = src.product_id), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}]}|
|0      |2023-12-29 14:25:41.145|CREATE TABLE|{isManaged -> false, description -> null, partitionBy -> [], properties -> {}}                                                                     |
+-------+----------------

### Descrição dos campos retornados pelo history ###

Column|Type|Description
------|----|-----------
version|	long|	Table version generated by the operation.
timestamp|	timestamp|	When this version was committed.
userId|	string|	ID of the user that ran the operation.
userName|	string|	Name of the user that ran the operation.
operation|	string|	Name of the operation.
operationParameters|	map|	Parameters of the operation (for example, predicates.)
job|	struct|	Details of the job that ran the operation.
notebook|	struct|	Details of notebook from which the operation was run.
clusterId|	string|	ID of the cluster on which the operation ran.
readVersion|	long|	Version of the table that was read to perform the write operation.
isolationLevel|	string|	Isolation level used for this operation.
isBlindAppend|	boolean|	Whether this operation appended data.
operationMetrics|	map|	Metrics of the operation (for example, number of rows and files modified.)
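userMetadata|	string|	User-defined commit metadata, if it was specified.
engineInfo|	string|	Name and version of the engine that performed the operation (e.g. `Apache-Spark/3.4...`).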

### Versioning em Delta Tables <br>
É possível carregar um DataFrame Spark com uma versão específica existente, bastando adicionar a opção <br>
**option("versionAsOf", numero_da_versao)**. No exemplo, basta substituir numero_da_versao por qualquer número de versão existente.

In [None]:
# Time travel: read the table as it was at version 0.
path_products = '/mnt/datalake/bronze/history/products/'
df_products_v0 = spark.read.load(path_products, format='delta', versionAsOf=0)

In [18]:
df_products_v0.show(n=20, truncate=True)

                                                                                

+----------+------------+---------------------+-------------+------+
|product_id|product_name|product_category_name|product_price|status|
+----------+------------+---------------------+-------------+------+
+----------+------------+---------------------+-------------+------+



In [41]:
# Time travel: read the table as it was at version 1.
path_products = '/mnt/datalake/bronze/history/products/'
df_products_v1 = spark.read.load(path_products, format='delta', versionAsOf=1)

In [42]:
df_products_v1.show(n=20, truncate=True)

                                                                                

+----------+--------------------+---------------------+-------------+-------+
|product_id|        product_name|product_category_name|product_price| status|
+----------+--------------------+---------------------+-------------+-------+
|         1|     perfurme magico|           perfumaria|        100.5|Inative|
|         2|replica vaso ming...|                artes|        70.75| Active|
|         3|raquete de tenis ...|        esporte_lazer|        365.0| Active|
|         4|            mordedor|                bebes|        27.25|Inative|
|         6|                  PS| utilidades_domest...|      2555.55| Active|
|         5|televisor 46 pole...| utilidades_domest...|      2555.55| Active|
+----------+--------------------+---------------------+-------------+-------+



In [22]:
# Same version-1 snapshot, read and displayed in a single expression
# (the version is passed as a string option here).
(
    spark.read
    .load(path_products, format="delta", versionAsOf="1")
    .show()
)

+----------+--------------------+---------------------+-------------+-------+
|product_id|        product_name|product_category_name|product_price| status|
+----------+--------------------+---------------------+-------------+-------+
|         1|     perfurme magico|           perfumaria|        100.5|Inative|
|         2|replica vaso ming...|                artes|        70.75| Active|
|         3|raquete de tenis ...|        esporte_lazer|        365.0| Active|
|         4|            mordedor|                bebes|        27.25|Inative|
|         6|                  PS| utilidades_domest...|      2555.55| Active|
|         5|televisor 46 pole...| utilidades_domest...|      2555.55| Active|
+----------+--------------------+---------------------+-------------+-------+



### Restore de versões em Delta Tables <br>
É possível restaurar uma Delta Table para uma versão anterior com o método <br>
**deltaTable.restoreToVersion(numero_da_versao)**. O restore não apaga o histórico: ele grava um novo commit (operação RESTORE), gerando uma nova versão cujo conteúdo é igual ao da versão restaurada.

In [33]:
# Handle to the products Delta table, used by the restore examples below.
path_products = '/mnt/datalake/bronze/history/products/'
deltaTable = DeltaTable.forPath(sparkSession=spark, path=path_products)


No exemplo abaixo, restauramos a versão 0, na qual apenas criamos a Delta Table, sem dados; por isso o comando show retorna somente o cabeçalho, como esperado. <br>
**deltaTable.restoreToVersion(0)**

In [34]:
# Roll the table back to version 0; the call returns a DataFrame of restore metrics.
restore_metrics = deltaTable.restoreToVersion(0)
restore_metrics

                                                                                

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

In [36]:
# Convert the DeltaTable (current version) into a Spark DataFrame to display it.
df_from_delta = deltaTable.toDF()
df_from_delta.show(n=20, truncate=True)

                                                                                

+----------+------------+---------------------+-------------+------+
|product_id|product_name|product_category_name|product_price|status|
+----------+------------+---------------------+-------------+------+
+----------+------------+---------------------+-------------+------+



Para demonstrar a mudança dos dados, escolhemos a versão 1, na qual o comando **MERGE** foi executado. Com isso, uma nova <br> versão é criada com os dados da versão 1; a versão criada pelo restore é sempre a mais recente. <br>
 **deltaTable.restoreToVersion(1)**

In [43]:
# Roll the table forward to the contents of version 1 (this is committed as a new version).
restore_metrics = deltaTable.restoreToVersion(1)
restore_metrics

                                                                                

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

In [44]:
# Commit history after the restores (newest version first).
fullHistoryDF = deltaTable.history(limit=None)
fullHistoryDF.show(n=20, truncate=True)

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      5|2023-12-31 16:14:...|  null|    null|     RESTORE|{version -> 1, ti...|null|    null|     null|          4|     Serializable|        false|{numRestoredFiles...|        null|Apache-Spark/3.4....|
|      4|2023-12-31 16:12:...|  null|    null|     RESTORE|{version -> 2, ti...|null|    null|     null|          3|     Serializable|        false|{numRestoredFiles...|        null|Ap

In [45]:
# Current table contents as a Spark DataFrame.
df_from_delta = deltaTable.toDF()
df_from_delta.show(n=20, truncate=True)

+----------+--------------------+---------------------+-------------+-------+
|product_id|        product_name|product_category_name|product_price| status|
+----------+--------------------+---------------------+-------------+-------+
|         1|     perfurme magico|           perfumaria|        100.5|Inative|
|         2|replica vaso ming...|                artes|        70.75| Active|
|         3|raquete de tenis ...|        esporte_lazer|        365.0| Active|
|         4|            mordedor|                bebes|        27.25|Inative|
|         6|                  PS| utilidades_domest...|      2555.55| Active|
|         5|televisor 46 pole...| utilidades_domest...|      2555.55| Active|
+----------+--------------------+---------------------+-------------+-------+



### Vacuum em Delta Tables <br>
É possível remover arquivos de dados antigos de uma Delta Table utilizando o comando **vacuum**. <br>
Essa opção se torna interessante porque uma Delta Table acumula arquivos de diversas versões, aumentando o custo de armazenamento. <br>
Após rodar o comando **vacuum**, os arquivos que não são mais referenciados pela versão atual e que são mais antigos que o período de retenção são removidos permanentemente. A partir daí, não é mais possível fazer time travel (versionAsOf / restore) para as versões que dependiam deles. Por isso, o comando **vacuum** deve ser usado com cuidado <br>
e combinado com uma estratégia de retenção de dados. <br>
No exemplo abaixo, o comando **vacuum** é disparado com o período de retenção padrão (7 dias). <br>
No segundo exemplo, o comando **vacuum** é disparado com retenção de 5 horas. Como esse período é menor que o mínimo padrão, com a verificação de retenção habilitada o Spark não executa o comando,<br>
e informa que é necessário acrescentar a configuração
**spark.databricks.delta.retentionDurationCheck.enabled = false** na Spark Session.

In [48]:
# Commit history before running VACUUM.
fullHistoryDF = deltaTable.history(limit=None)
fullHistoryDF.show(n=20, truncate=True)

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      5|2023-12-31 16:14:...|  null|    null|     RESTORE|{version -> 1, ti...|null|    null|     null|          4|     Serializable|        false|{numRestoredFiles...|        null|Apache-Spark/3.4....|
|      4|2023-12-31 16:12:...|  null|    null|     RESTORE|{version -> 2, ti...|null|    null|     null|          3|     Serializable|        false|{numRestoredFiles...|        null|Ap

In [72]:
# Warehouse directory as recorded in the SparkContext configuration.
spark_context_conf = spark.sparkContext.getConf()
conf = spark_context_conf.get('spark.sql.warehouse.dir')
print(conf)

file:/mnt/datalake/warehouse


In [49]:
# No explicit retention: VACUUM uses the table's default retention period.
deltaTable.vacuum(retentionHours=None)





Deleted 0 files and directories in a total of 1 directories.


                                                                                

DataFrame[]

In [50]:
# VACUUM START / VACUUM END are recorded as commits in the history.
fullHistoryDF = deltaTable.history(limit=None)
fullHistoryDF.show(n=20, truncate=True)

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      7|2023-12-31 16:46:...|  null|    null|  VACUUM END|{status -> COMPLE...|null|    null|     null|          6|SnapshotIsolation|         true|{numDeletedFiles ...|        null|Apache-Spark/3.4....|
|      6|2023-12-31 16:46:...|  null|    null|VACUUM START|{retentionCheckEn...|null|    null|     null|          5|SnapshotIsolation|         true|{numFilesToDelete...|        null|Ap

In [53]:
# The session already exists, so the builder reuses it and only runtime
# SQL configurations are (re)applied.
spark: SparkSession = create_spark_session()

23/12/31 16:51:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [77]:
# Remove unreferenced files older than 5 hours (requires the retention check to be disabled).
deltaTable.vacuum(retentionHours=5)



Deleted 0 files and directories in a total of 1 directories.


                                                                                

DataFrame[]

In [78]:
# History after the short-retention VACUUM.
fullHistoryDF = deltaTable.history(limit=None)
fullHistoryDF.show(n=20, truncate=True)

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     13|2023-12-31 17:05:...|  null|    null|  VACUUM END|{status -> COMPLE...|null|    null|     null|         12|SnapshotIsolation|         true|{numDeletedFiles ...|        null|Apache-Spark/3.4....|
|     12|2023-12-31 17:05:...|  null|    null|VACUUM START|{retentionCheckEn...|null|    null|     null|         11|SnapshotIsolation|         true|{numFilesToDelete...|        null|Ap