# Delta Lake - Brazilian Cities

This notebook demonstrates how to create, read, and modify a Delta table using the `cidades_brasileiras.csv` dataset.

## Initial Delta Configuration

Imports `SparkSession` from PySpark to start the Spark session, along with the data types used to define the schema (`StructType`, `StructField`, `StringType`, `DoubleType`, `IntegerType`).

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

from delta import *

import logging

logging.getLogger("py4j").setLevel(logging.DEBUG)

Next, the Spark session is built with the package and settings that Delta Lake requires.

In [2]:
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
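
The three settings passed to the builder can also be kept in one place and applied in a loop. The following is a minimal sketch, not part of the original notebook: `delta_spark_settings` is a hypothetical helper name, and its default versions simply mirror the ones used in the builder (Delta 3.2.0 built for Scala 2.12).

```python
def delta_spark_settings(delta_version: str = "3.2.0", scala_version: str = "2.12") -> dict[str, str]:
    """Return the Spark settings used in this notebook to enable Delta Lake.

    Hypothetical helper: the keys and values are the ones from the builder cell,
    with the package coordinate assembled from the two version arguments.
    """
    return {
        "spark.jars.packages": f"io.delta:delta-spark_{scala_version}:{delta_version}",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    }


settings = delta_spark_settings()
for key, value in settings.items():
    print(f"{key}={value}")
```

Each pair could then be passed to the builder with `builder.config(key, value)` before calling `getOrCreate()`. The Scala suffix in the package name has to match the Scala build of the installed PySpark.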


## Schema creation and dataset import

In [13]:
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("cidade", StringType(), True),
    StructField("estado", StringType(), True),
    StructField("sigla", StringType(), True),
    StructField("ibge", IntegerType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True)
])

In [4]:
df = spark.read.csv("../data/cidades_brasileiras.csv", header=True, schema=schema)
df.show(5)

                                                                                

+---+-------------------+------------+-----+-------+----------+----------+
| id|             cidade|      estado|sigla|   ibge|  latitude| longitude|
+---+-------------------+------------+-----+-------+----------+----------+
|  1|    ABADIA DE GOIÁS|       GOIÁS|   GO|5200050|-16.757264| -49.44122|
|  2|ABADIA DOS DOURADOS|MINAS GERAIS|   MG|3100104|-18.491063|-47.406365|
|  3|          ABADIÂNIA|       GOIÁS|   GO|5200100|-16.194723|-48.706812|
|  4|             ABAETÉ|MINAS GERAIS|   MG|3100203|-19.156683|-45.448121|
|  5|         ABAETETUBA|        PARÁ|   PA|1500107| -1.721828|-48.878843|
+---+-------------------+------------+-----+-------+----------+----------+
only showing top 5 rows
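
When an explicit schema is passed together with `header=True`, Spark's CSV reader by default applies the schema by position and ignores the names in the header row (the `enforceSchema` option defaults to `true`). A file whose columns arrive in a different order would therefore load without complaint. The sketch below is one way to check the header with the standard library before reading; `header_matches` is a hypothetical helper, and the sample header line is an assumption about what the file contains (the data row is taken from the output above).

```python
import csv
import io

# Column names in the same order as the StructType defined in this notebook.
EXPECTED_COLUMNS = ["id", "cidade", "estado", "sigla", "ibge", "latitude", "longitude"]


def header_matches(csv_text: str, expected: list[str]) -> bool:
    """Return True when the first CSV row lists exactly the expected column names, in order."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader, [])
    return [name.strip() for name in header] == expected


# Assumed header line, followed by the first row shown by df.show(5).
sample = (
    "id,cidade,estado,sigla,ibge,latitude,longitude\n"
    "1,ABADIA DE GOIÁS,GOIÁS,GO,5200050,-16.757264,-49.44122\n"
)
print(header_matches(sample, EXPECTED_COLUMNS))
```

For the real file, the text of the first line could be read with `open(path, encoding="utf-8")` and passed to the same function.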



## Table creation
The data is read from a CSV file into a DataFrame with an explicit schema and saved in Delta Lake format. The `cidades_delta` table is then created pointing at the directory that holds the Delta files.

In [5]:
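import os

# Write the Delta files to an explicit directory, then register a table over that same directory.
delta_path = os.path.abspath("../output/delta/cidades")
df.write.format("delta").mode("overwrite").save(delta_path)
spark.sql("DROP TABLE IF EXISTS cidades_delta")
spark.sql(f"CREATE TABLE cidades_delta USING DELTA LOCATION '{delta_path}'")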

                                                                                

DataFrame[]
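
A directory written in Delta format contains a `_delta_log` subdirectory that holds the transaction log next to the Parquet data files. A quick way to confirm that the path given to `LOCATION` really holds a Delta table is to look for that subdirectory. The sketch below uses only the standard library, with a temporary directory standing in for `../output/delta/cidades`; `looks_like_delta_table` is a hypothetical helper name.

```python
import tempfile
from pathlib import Path


def looks_like_delta_table(path) -> bool:
    """Return True when the directory contains a `_delta_log` subdirectory."""
    return (Path(path) / "_delta_log").is_dir()


with tempfile.TemporaryDirectory() as tmp:
    table_dir = Path(tmp) / "cidades"
    table_dir.mkdir()
    # A plain directory is not recognised as a Delta table.
    before = looks_like_delta_table(table_dir)
    # Simulate what a Delta write leaves behind: the transaction log directory.
    (table_dir / "_delta_log").mkdir()
    after = looks_like_delta_table(table_dir)

print(before, after)
```

This only checks for the log directory; it does not validate the log contents.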

In [6]:
spark.sql("SHOW TABLES").show()



+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|  default|cidades_delta|      false|
+---------+-------------+-----------+



In [7]:
spark.sql("SELECT * FROM cidades_delta").show(5)



+---+-------------------+------------+-----+-------+----------+----------+
| id|             cidade|      estado|sigla|   ibge|  latitude| longitude|
+---+-------------------+------------+-----+-------+----------+----------+
|  1|    ABADIA DE GOIÁS|       GOIÁS|   GO|5200050|-16.757264| -49.44122|
|  2|ABADIA DOS DOURADOS|MINAS GERAIS|   MG|3100104|-18.491063|-47.406365|
|  3|          ABADIÂNIA|       GOIÁS|   GO|5200100|-16.194723|-48.706812|
|  4|             ABAETÉ|MINAS GERAIS|   MG|3100203|-19.156683|-45.448121|
|  5|         ABAETETUBA|        PARÁ|   PA|1500107| -1.721828|-48.878843|
+---+-------------------+------------+-----+-------+----------+----------+
only showing top 5 rows



## INSERT
This command adds a new row to the Delta Lake table, inserting a fictitious city into the existing dataset.

In [8]:
spark.sql("""
    INSERT INTO cidades_delta VALUES
    (9999, 'Cidade Exemplo', 'Estado Exemplo', 'EX', 9999999, -10.1234, -50.5678)
""")

spark.sql("SELECT * FROM cidades_delta WHERE id = 9999").show()


                                                                                

+----+--------------+--------------+-----+-------+--------+---------+
|  id|        cidade|        estado|sigla|   ibge|latitude|longitude|
+----+--------------+--------------+-----+-------+--------+---------+
|9999|Cidade Exemplo|Estado Exemplo|   EX|9999999|-10.1234| -50.5678|
+----+--------------+--------------+-----+-------+--------+---------+
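
The same row could also be appended through the DataFrame writer instead of SQL. This is a sketch under assumptions, not the notebook's method: `append_city` is a hypothetical helper, and it expects the `spark` session and `schema` objects defined earlier to be passed in.

```python
def append_city(spark, schema, row, table_name="cidades_delta"):
    """Append one row to an existing Delta table using the DataFrame writer.

    Hypothetical helper: `spark` is an active SparkSession and `schema` is the
    StructType that describes the table's columns.
    """
    # Build a one-row DataFrame with the same schema as the table.
    new_row_df = spark.createDataFrame([row], schema=schema)
    # mode("append") adds the row and leaves the existing data in place.
    new_row_df.write.format("delta").mode("append").saveAsTable(table_name)
```

With the objects from this notebook, a call would look like `append_city(spark, schema, (9999, "Cidade Exemplo", "Estado Exemplo", "EX", 9999999, -10.1234, -50.5678))`.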



## UPDATE
Updates the `latitude` and `longitude` fields of the row whose `id` is 9999, showing how to update a Delta table with SQL commands.

In [14]:
spark.sql("""
    UPDATE cidades_delta
    SET latitude = -11.0000, longitude = -51.0000
    WHERE id = 9999
""")

spark.sql("SELECT * FROM cidades_delta WHERE id = 9999").show()
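
Delta Lake also exposes updates through its Python API: `DeltaTable.update` takes a SQL condition and a dictionary that maps column names to SQL expressions. The sketch below wraps that call; `move_city` is a hypothetical helper, and `delta_table` is assumed to be an object obtained with `DeltaTable.forName(spark, "cidades_delta")` from `delta.tables`.

```python
def move_city(delta_table, city_id: int, latitude: float, longitude: float) -> None:
    """Set new coordinates on the row whose id equals `city_id`.

    Hypothetical helper: `delta_table` is expected to be a delta.tables.DeltaTable.
    """
    delta_table.update(
        # Casting to int keeps the condition a plain numeric comparison.
        condition=f"id = {int(city_id)}",
        # Values in `set` are SQL expressions, so the numbers are passed as strings.
        set={
            "latitude": repr(float(latitude)),
            "longitude": repr(float(longitude)),
        },
    )
```

For the row used in this notebook, the call would be `move_city(DeltaTable.forName(spark, "cidades_delta"), 9999, -11.0, -51.0)`.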

The recorded run of the SQL `UPDATE` cell did not complete, so no UPDATE output was captured. The Python process lost its connection to the Spark JVM (`Py4JNetworkError: Answer from Java side is empty`, then `ConnectionRefusedError: [Errno 111] Connection refused`), and the call failed with `Py4JError: PythonUtils does not exist in the JVM`. The rest of the captured output consisted of truncated py4j DEBUG tracebacks raised while the connection was being closed.

## DELETE
Removes the row whose `id` equals 9999 from the Delta table, demonstrating how to delete data from a Delta table.

In [10]:
spark.sql("""
    DELETE FROM cidades_delta
    WHERE id = 9999
""")

spark.sql("SELECT * FROM cidades_delta WHERE id = 9999").show()




+---+------+------+-----+----+--------+---------+
| id|cidade|estado|sigla|ibge|latitude|longitude|
+---+------+------+-----+----+--------+---------+
+---+------+------+-----+----+--------+---------+
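
The delete and the follow-up check can be combined into one step with the Delta Lake Python API. This is a sketch under assumptions: `delete_city` is a hypothetical helper, and `delta_table` is assumed to come from `DeltaTable.forName(spark, "cidades_delta")`. It relies on `DeltaTable.delete`, which accepts a SQL condition, and `DeltaTable.toDF`, which returns the table as a DataFrame.

```python
def delete_city(delta_table, city_id: int) -> int:
    """Delete rows whose id equals `city_id` and return how many rows still match.

    Hypothetical helper: `delta_table` is expected to be a delta.tables.DeltaTable.
    """
    predicate = f"id = {int(city_id)}"
    delta_table.delete(predicate)
    # Re-read the table and count rows that still satisfy the predicate.
    return delta_table.toDF().filter(predicate).count()
```

A return value of 0 corresponds to the empty result shown above.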



                                                                                

## Final result
Reloads the Delta table data into a final DataFrame after the operations and displays the first rows to check the table's final state.

In [None]:
df_final = spark.read.format("delta").load("../output/delta/cidades")
df_final.show()



+---+-------------------+--------------+-----+-------+----------+----------+
| id|             cidade|        estado|sigla|   ibge|  latitude| longitude|
+---+-------------------+--------------+-----+-------+----------+----------+
|  1|    ABADIA DE GOIÁS|         GOIÁS|   GO|5200050|-16.757264| -49.44122|
|  2|ABADIA DOS DOURADOS|  MINAS GERAIS|   MG|3100104|-18.491063|-47.406365|
|  3|          ABADIÂNIA|         GOIÁS|   GO|5200100|-16.194723|-48.706812|
|  4|             ABAETÉ|  MINAS GERAIS|   MG|3100203|-19.156683|-45.448121|
|  5|         ABAETETUBA|          PARÁ|   PA|1500107| -1.721828|-48.878843|
|  6|            ABAIARA|         CEARÁ|   CE|2300101| -7.360781|-39.048788|
|  7|             ABAÍRA|         BAHIA|   BA|2900108| -13.25033|-41.664034|
|  8|              ABARÉ|         BAHIA|   BA|2900207| -8.723991|-39.113969|
|  9|             ABATIÁ|        PARANÁ|   PR|4100103|-23.305297|-50.310253|
| 10|      ABDON BATISTA|SANTA CATARINA|   SC|4200051|-27.612419|-51.022992|

                                                                                
