In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Configuração do Spark
conf = SparkConf()
conf.setAppName("Write Delta Bronze Employee")
conf.set("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
conf.set("spark.hadoop.fs.s3a.access.key", "chapolin")
conf.set("spark.hadoop.fs.s3a.secret.key", "mudar@123")
conf.set("spark.hadoop.fs.s3a.path.style.access", True)
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
conf.set("hive.metastore.uris", "thrift://metastore:9083")

# Inicialização da sessão do Spark
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


In [2]:

# Dados de exemplo
data2 = [("James", "", "Smith", "36636", "M", 3000),
         ("Michael", "Rose", "", "40288", "M", 4000),
         ("Robert", "", "Williams", "42114", "M", 4000),
         ("Maria", "Anne", "Jones", "39192", "F", 4000),
         ("Jen", "Mary", "Brown", "", "F", -1)]

# Esquema dos dados
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

# Criando DataFrame
df = spark.createDataFrame(data=data2, schema=schema)

df.show()

df.write.format("delta").mode("append").save('s3a://bronze/delta_employee')

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [3]:
spark.read.format("delta").option("versionAsOf", "4").load('s3a://bronze/delta_employee').show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|   Robert|          |Williams|42114|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|  Michael|      Rose|        |40288|     M|  4000|
|  Michael|      Rose|        |40288|     M|  4000|
|  Michael|      Rose|        |40288|     M|  4000|
|  Michael|      Rose|        |40288|     M|  4000|
|  Michael|      Rose|        |40288|     M|  4000|
|    James|          |   Smith|36636|     M|  3000|
|    James|          |   Smith|36636|     M|  3000|
|    James| 

In [2]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Cria a sessão do Spark
spark = SparkSession.builder \
    .appName("DeltaTableHistory") \
    .getOrCreate()

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Obter a tabela Delta
delta_table = DeltaTable.forPath(spark, table_path)

# Obter o histórico da tabela Delta
history_df = delta_table.history()

# Mostrar o histórico da tabela
history_df.show(truncate=False)



+-------+-------------------+------+--------+---------+----------------------------------------+----+--------+---------------+-----------+-----------------+-------------+--------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp          |userId|userName|operation|operationParameters                     |job |notebook|clusterId      |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                              |userMetadata|engineInfo                         |
+-------+-------------------+------+--------+---------+----------------------------------------+----+--------+---------------+-----------+-----------------+-------------+--------------------------------------------------------------+------------+-----------------------------------+
|14     |2024-07-01 09:49:22|null  |null    |WRITE    |{mode -> Append, partitionBy -> []}     |null|null    |null           |13         |Serializable 

# Voltando para a versao 0

In [5]:
from delta.tables import DeltaTable

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Obter a tabela Delta
delta_table = DeltaTable.forPath(spark, table_path)

# Ler a versão 0 da tabela
df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load(table_path)

# Sobrescrever a tabela atual com os dados da versão 0
df_version_0.write.format("delta").mode("overwrite").save(table_path)


In [3]:
# Ler a tabela Delta
df = spark.read.format("delta").load('s3a://bronze/delta_employee')

# Mostrar os dados da tabela
df.show()


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|   Robert|          |Williams|42114|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|  Michael|      Rose|        |40288|     M|  4000|
|  Michael|      Rose|        |40288|     M|  4000|
|    James|          |   Smith|36636|     M|  3000|
|    James|          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
|      Jen|      Mary|   Brown|     |     F|    -1|
|  wallace|   Camargo|   Graca|77777|     M| 50000|
+---------+----------+--------+-----+------+------+



In [4]:

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Cria a sessão do Spark
spark = SparkSession.builder \
    .appName("CheckDeltaOperations") \
    .getOrCreate()

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Obter a tabela Delta
delta_table = DeltaTable.forPath(spark, table_path)

# Obter o histórico da tabela Delta
history_df = delta_table.history()

# Filtrar e mostrar operações de inserts, updates e deletes
operations_df = history_df.filter("operation != 'READ'").select("version", "timestamp", "operation", "operationParameters", "operationMetrics")

# Mostrar o resultado
operations_df.show(truncate=False)


+-------+-------------------+---------+----------------------------------------+--------------------------------------------------------------+
|version|timestamp          |operation|operationParameters                     |operationMetrics                                              |
+-------+-------------------+---------+----------------------------------------+--------------------------------------------------------------+
|14     |2024-07-01 09:49:22|WRITE    |{mode -> Append, partitionBy -> []}     |{numFiles -> 6, numOutputRows -> 5, numOutputBytes -> 9028}   |
|13     |2024-07-01 09:00:11|WRITE    |{queryId -> 20240701_090011_00056_9rh2z}|null                                                          |
|12     |2024-07-01 08:58:22|MERGE    |{queryId -> 20240701_085822_00036_9rh2z}|null                                                          |
|11     |2024-07-01 08:52:28|WRITE    |{queryId -> 20240701_085228_00016_9rh2z}|null                                                    

# Vacuum

In [4]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from datetime import datetime, timedelta

# Cria a sessão do Spark
spark = SparkSession.builder \
    .appName("DeltaTableVacuum") \
    .getOrCreate()

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Obter a tabela Delta
delta_table = DeltaTable.forPath(spark, table_path)

# Calcular a quantidade de horas para retenção (7 dias em horas)
retention_hours = 7 * 24  # 7 dias = 7 * 24 horas

# Executar o vacuum na tabela Delta especificando o tempo de retenção
delta_table.vacuum(retention_hours)

print("Vacuum concluído com sucesso na tabela Delta, removendo dados antigos dos últimos 7 dias.")


Vacuum concluído com sucesso na tabela Delta, removendo dados antigos dos últimos 7 dias.


# Optimize

In [6]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Cria a sessão do Spark
spark = SparkSession.builder \
    .appName("DeltaTableOptimize") \
    .getOrCreate()

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Obter a tabela Delta
delta_table = DeltaTable.forPath(spark, table_path)

# Executar o optimize na tabela Delta
delta_table.optimize()

print("Optimize concluído com sucesso na tabela Delta.")


Optimize concluído com sucesso na tabela Delta.


In [7]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Cria a sessão do Spark
spark = SparkSession.builder \
    .appName("ShowDeltaChanges") \
    .getOrCreate()

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Obter a tabela Delta
delta_table = DeltaTable.forPath(spark, table_path)

# Obter o histórico da tabela Delta
history_df = delta_table.history()

# Filtrar e mostrar operações de inserts, updates e deletes
changes_df = history_df.filter("operation != 'READ'").select("version", "timestamp", "operation", "operationParameters", "operationMetrics")

# Mostrar o resultado
changes_df.show(truncate=False)


+-------+-------------------+---------+----------------------------------------+--------------------------------------------------------------+
|version|timestamp          |operation|operationParameters                     |operationMetrics                                              |
+-------+-------------------+---------+----------------------------------------+--------------------------------------------------------------+
|14     |2024-07-01 09:49:22|WRITE    |{mode -> Append, partitionBy -> []}     |{numFiles -> 6, numOutputRows -> 5, numOutputBytes -> 9028}   |
|13     |2024-07-01 09:00:11|WRITE    |{queryId -> 20240701_090011_00056_9rh2z}|null                                                          |
|12     |2024-07-01 08:58:22|MERGE    |{queryId -> 20240701_085822_00036_9rh2z}|null                                                          |
|11     |2024-07-01 08:52:28|WRITE    |{queryId -> 20240701_085228_00016_9rh2z}|null                                                    

In [3]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Cria a sessão do Spark
spark = SparkSession.builder \
    .appName("ShowDeltaChanges") \
    .getOrCreate()

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Obter a tabela Delta
delta_table = DeltaTable.forPath(spark, table_path)

# Obter o histórico da tabela Delta
history_df = delta_table.history()

# Filtrar e mostrar operações de INSERT
inserts_df = history_df.filter("operation = 'WRITE' and operationParameters.mode = 'Append'")
inserts_df.show(truncate=False)

# Filtrar e mostrar operações de UPDATE (Merge)
updates_df = history_df.filter("operation = 'MERGE'")
updates_df.show(truncate=False)

# Para mostrar os deletes, não há um registro explícito em Delta Lake, pois ele mantém um log de transações.


+-------+-------------------+------+--------+---------+-----------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp          |userId|userName|operation|operationParameters                |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-------------------+------+--------+---------+-----------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|14     |2024-07-01 09:49:22|null  |null    |WRITE    |{mode -> Append, partitionBy -> []}|null|null    |null     |13         |Serializable  |true         |{numFiles -> 6, numOutputRows -> 5, numOutputB

# cdf

In [2]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Cria a sessão do Spark
spark = SparkSession.builder \
    .appName("ShowDeltaChanges") \
    .getOrCreate()

# Caminho para a tabela Delta no S3
table_path = 's3a://bronze/delta_employee'

# Habilitar Change Data Feed na tabela (necessário fazer isso uma vez)
spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Fazer algumas alterações na tabela (inserções, atualizações e exclusões)
# Supondo que você tenha um DataFrame com novos dados para inserir
new_data = [(6, 'John Doe', 'Sales'), (7, 'Jane Smith', 'Marketing')]
columns = ["id", "name", "department"]

new_df = spark.createDataFrame(new_data, columns)
new_df.write.format("delta").mode("append").save(table_path)

# Atualizando alguns dados
delta_table = DeltaTable.forPath(spark, table_path)
delta_table.update(condition=col("id") == 6, set={"department": "'HR'"})

# Deletando alguns dados
delta_table.delete(condition=col("id") == 7)

# Verificar a versão atual
end_version = delta_table.history().select("version").orderBy("version", ascending=False).first()[0]

# Ler as mudanças usando Change Data Feed (CDF)
# Especificando o intervalo de version numbers para as alterações desejadas
# Aqui, vamos começar a partir da versão em que habilitamos o CDF
start_version = end_version - 1  # Se o CDF foi habilitado na última operação

# Ler as mudanças entre as versões especificadas
changes_df = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .option("startingVersion", start_version) \
    .option("endingVersion", end_version) \
    .load(table_path)

# Mostrar as linhas que foram inseridas
inserts_df = changes_df.filter("_change_type = 'insert'")
inserts_df.show(truncate=False)

# Mostrar as linhas que foram atualizadas
updates_df = changes_df.filter("_change_type = 'update_postimage'")
updates_df.show(truncate=False)

# Mostrar as linhas que foram deletadas
deletes_df = changes_df.filter("_change_type = 'delete'")
deletes_df.show(truncate=False)



AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types StringType and LongType