# Bloco 1

In [27]:
# Bibliotecas necessárias

from pyspark.sql import SparkSession, Row
from delta import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
from pyspark.sql.functions import sum

In [1]:
# Configurar a sessão Spark com suporte ao Delta Lake
builder = SparkSession.builder \
    .appName("Delta Lake Test") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Verificar a configuração da sessão Spark
print("Spark Session Configurations:")
print(spark.sparkContext.getConf().getAll())

Spark Session Configurations:
[('spark.app.initial.jar.urls', 'spark://28ff82eaa3cc:38477/jars/io.delta_delta-spark_2.12-3.2.0.jar,spark://28ff82eaa3cc:38477/jars/org.antlr_antlr4-runtime-4.9.3.jar,spark://28ff82eaa3cc:38477/jars/io.delta_delta-storage-3.2.0.jar'), ('spark.driver.port', '38477'), ('spark.jars', 'file:///home/jovyan/.ivy2/jars/io.delta_delta-spark_2.12-3.2.0.jar,file:///home/jovyan/.ivy2/jars/io.delta_delta-storage-3.2.0.jar,file:///home/jovyan/.ivy2/jars/org.antlr_antlr4-runtime-4.9.3.jar'), ('spark.executor.id', 'driver'), ('spark.app.startTime', '1722609809443'), ('spark.app.initial.file.urls', 'file:///home/jovyan/.ivy2/jars/io.delta_delta-storage-3.2.0.jar,file:///home/jovyan/.ivy2/jars/org.antlr_antlr4-runtime-4.9.3.jar,file:///home/jovyan/.ivy2/jars/io.delta_delta-spark_2.12-3.2.0.jar'), ('spark.repl.local.jars', 'file:///home/jovyan/.ivy2/jars/io.delta_delta-spark_2.12-3.2.0.jar,file:///home/jovyan/.ivy2/jars/io.delta_delta-storage-3.2.0.jar,file:///home/jovyan/

# Bloco 2

In [2]:
# Ler o arquivo CSV
csv_file_path = "/home/jovyan/work/financial_data.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Mostrar os dados carregados
print("DataFrame Carregado do CSV:")
df.show()

DataFrame Carregado do CSV:
+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|           2|    2024-07-02|-200.32|         SAQUE|        Saque em ATM|
|           3|    2024-07-03| -50.67|     PAGAMENTO|           Cafeteria|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|           5|    2024-07-04|-100.25|     PAGAMENTO|        Supermercado|
|           6|    2024-07-04| -30.12|     PAGAMENTO|  Transporte Público|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|           8|    2024-07-06|-300.55|         SAQUE|        Saque em ATM|
|           9|    2024-07-07|  -20.3|     PAGAMENTO|   Assinatura Online|
|          10|    2024-07-07|  500.4|      DEPOSITO|Retorno de Invest...|
|         

# Bloco 3

In [3]:
# Verificar os tipos de cada coluna
print("Schema do DataFrame Carregado do CSV:")
df.printSchema()

Schema do DataFrame Carregado do CSV:
root
 |-- id_transacao: integer (nullable = true)
 |-- data_transacao: date (nullable = true)
 |-- valor: double (nullable = true)
 |-- tipo_transacao: string (nullable = true)
 |-- descricao: string (nullable = true)



# Bloco 4

In [4]:
# Filtrar as transações com valor acima de 1000
df_filtrado = df.filter(df['valor'] > 1000)

# Mostrar os dados filtrados
print("DataFrame com Transações com Valor Acima de 1000:")
df_filtrado.show()

DataFrame com Transações com Valor Acima de 1000:
+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          11|    2024-07-08|3200.99|      DEPOSITO|             Salário|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          20|    2024-07-17|1000.65|      DEPOSITO|               Bônus|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          30|    2024-07-27|2750.75|      DEPOSITO|             Salário|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito 

# Bloco 5 e 6

In [6]:
# Salvar o DataFrame filtrado como uma tabela Delta particionada pela data da transação
delta_caminho = "/home/jovyan/work/delta/financial_data"
df_filtrado.write.format("delta").partitionBy("data_transacao").mode("overwrite").save(delta_caminho)


In [9]:
# Ler a tabela Delta
df_delta = spark.read.format("delta").load(delta_caminho)

# Mostrar os dados carregados da tabela Delta
print("DataFrame Carregado da Tabela Delta (Transações com Valor Acima de 1000):")
df_delta.show()

DataFrame Carregado da Tabela Delta (Transações com Valor Acima de 1000):
+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          48|    2024-08-10|1350.45| 

In [10]:
# Verificar os tipos de cada coluna na tabela Delta
print("Schema do DataFrame Carregado da Tabela Delta:")
df_delta.printSchema()

Schema do DataFrame Carregado da Tabela Delta:
root
 |-- id_transacao: integer (nullable = true)
 |-- data_transacao: date (nullable = true)
 |-- valor: double (nullable = true)
 |-- tipo_transacao: string (nullable = true)
 |-- descricao: string (nullable = true)



In [9]:
# Ler a tabela Delta
df_delta = spark.read.format("delta").load(delta_caminho)

# Mostrar os dados carregados da tabela Delta
print("DataFrame Carregado da Tabela Delta (Transações com Valor Acima de 1000):")
df_delta.show()

DataFrame Carregado da Tabela Delta (Transações com Valor Acima de 1000):
+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          48|    2024-08-10|1350.45| 

# Bloco 7

In [11]:
# Consultar transações com valor acima de 2000
df_acima_2000 = df_delta.filter(df_delta['valor'] > 2000)

# Mostrar os dados filtrados
print("Transações com Valor Acima de 2000:")
df_acima_2000.show()

Transações com Valor Acima de 2000:
+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          44|    2024-08-08|3300.75|      DEPOSITO|         Bônus Anual|
|          11|    2024-07-08|3200.99|      DEPOSITO|             Salário|
|          30|    2024-07-27|2750.75|      DEPOSITO|             Salário|
+------------+--------------+-------+--------------+--------------------+



# Bloco 8

In [13]:
# Exibir o histórico de versões da tabela Delta
history_df = spark.sql(f"DESCRIBE HISTORY delta.`{delta_caminho}`")
history_df.select("version", "timestamp", "operation", "operationParameters", "operationMetrics", "engineInfo").show(truncate=False)

+-------+-----------------------+---------+------------------------------------------------------+--------------------------------------------------------------+-----------------------------------+
|version|timestamp              |operation|operationParameters                                   |operationMetrics                                              |engineInfo                         |
+-------+-----------------------+---------+------------------------------------------------------+--------------------------------------------------------------+-----------------------------------+
|0      |2024-08-02 14:51:24.522|WRITE    |{mode -> Overwrite, partitionBy -> ["data_transacao"]}|{numFiles -> 14, numOutputRows -> 14, numOutputBytes -> 19597}|Apache-Spark/3.5.0 Delta-Lake/3.2.0|
+-------+-----------------------+---------+------------------------------------------------------+--------------------------------------------------------------+-----------------------------------+



# Bloco 9

In [22]:
# Dados novos a serem adicionados
novos_dados = [
    (11, '2024-07-08', 3000.95, 'DEPOSITO', 'Bônus Anual'),
    (12, '2024-07-08', -150.40, 'PAGAMENTO', 'Restaurante'),
    (13, '2024-07-09', 800.75, 'TRANSFERENCIA', 'Transferência da Conta Corrente'),
    (14, '2024-07-09', -200.22, 'PAGAMENTO', 'Cinema')
]

# Esquema dos novos dados
novos_dados_schema = StructType([
    StructField("id_transacao", IntegerType(), True),
    StructField("data_transacao", StringType(), True),
    StructField("valor", DoubleType(), True),
    StructField("tipo_transacao", StringType(), True),
    StructField("descricao", StringType(), True)
])

# Criar DataFrame com os novos dados usando o mesmo esquema
novos_dados_df = spark.createDataFrame(novos_dados, novos_dados_schema)

In [23]:
# Criar DataFrame com os novos dados usando o mesmo esquema
novos_dados_df = spark.createDataFrame(novos_dados, novos_dados_schema)

# Converter a coluna 'data_transacao' para o tipo DateType
novos_dados_df = novos_dados_df.withColumn("data_transacao", novos_dados_df["data_transacao"].cast(DateType()))

# Adicionar os novos dados à tabela Delta
novos_dados_df.write.format("delta").mode("append").option("mergeSchema", "true").save(delta_caminho)

# Ler a tabela Delta atualizada
df_delta_atualizado = spark.read.format("delta").load(delta_caminho)

# Mostrar os dados carregados da tabela Delta atualizada
print("DataFrame Carregado da Tabela Delta Atualizada:")
df_delta_atualizado.show()

DataFrame Carregado da Tabela Delta Atualizada:
+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          13|    2024-07-09| 800.75| TRANSFERENCIA|Transferência da ...|
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento F

# Bloco 10

In [25]:
# Realizar consulta para selecionar todas as transações com valor acima de 1000
transacoes_acima_1000 = df_delta_atualizado.filter(df_delta_atualizado["valor"] > 1000)

# Mostrar os dados filtrados
print("Transações com Valor Acima de 1000:")
transacoes_acima_1000.show()

Transações com Valor Acima de 1000:
+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          48|    2024-08-10|1350.45|      DEPOSITO|    Reembolso Médico|
| 

# Bloco 11

In [29]:
# Agrupar transações por tipo e somar os valores
agrupado_por_tipo = df_delta_atualizado.groupBy("tipo_transacao").agg(sum("valor").alias("total_valor"))

# Mostrar os dados agrupados
print("Transações Agrupadas por Tipo e Soma dos Valores:")
agrupado_por_tipo.show()

Transações Agrupadas por Tipo e Soma dos Valores:
+--------------+------------------+
|tipo_transacao|       total_valor|
+--------------+------------------+
| TRANSFERENCIA|10953.779999999999|
|      DEPOSITO|22857.230000000003|
|     PAGAMENTO|           -350.62|
+--------------+------------------+

