# Settings

In [2]:
from delta.tables import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

StatementMeta(, 68a092a1-70ca-4f31-90fe-2eb556562b9a, 3, Finished, Available, Finished)

In [1]:
%%configure -f
{
    'conf': {
        'spark.native.enabled': 'true',
        'spark.shuffle.manager': 'org.apache.spark.shuffle.sort.ColumnarShuffleManager'
    }
}

StatementMeta(, 68a092a1-70ca-4f31-90fe-2eb556562b9a, -1, Finished, Available, Finished)

# Criando e inserindo valores na tabela

**Em SQL**

In [6]:
%%sql
CREATE TABLE people
(
    ID INT NOT NULL,
    nome CHAR(50),
    idade INT NOT NULL,
    cidade VARCHAR(70)
)
USING DELTA
LOCATION 'Files/Data/people'

StatementMeta(, f4fab8a8-c0db-49d0-bd93-eea7117913af, 8, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [7]:
%%sql

INSERT INTO people VALUES
(1, 'Alice', 25, 'São Paulo'),
(2, 'Bob', 40, 'Rio de Janeiro'),
(3, 'Carlos', 30, 'Curitiba'),
(4, 'Diana', 27, 'Recife'),
(5, 'Eduardo', 35, 'Salvador'),
(6, 'Fernanda', 50, 'Brasília'),
(7, 'Gabriel', 22, 'Fortaleza'),
(8, 'Helena', 43, 'Porto Alegre'),
(9, 'Igor', 37, 'Manaus'),
(10, 'Julia', 28, 'Belo Horizonte')

StatementMeta(, f4fab8a8-c0db-49d0-bd93-eea7117913af, 9, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

**Em Pyspark**

In [9]:
DeltaTable.create(spark) \
.tableName('people_pyspark') \
.addColumn('ID', 'INT') \
.addColumn('nome', 'STRING') \
.addColumn('idade', 'INT') \
.addColumn('cidade', 'STRING') \
.execute()

StatementMeta(, f4fab8a8-c0db-49d0-bd93-eea7117913af, 11, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x794febd66bd0>

In [80]:
dados = [
    (1, 'Alice', 25, 'São Paulo'),
    (2, 'Bob', 40, 'Rio de Janeiro'),
    (3, 'Carlos', 30, 'Curitiba'),
    (4, 'Diana', 27, 'Recife'),
    (5, 'Eduardo', 35, 'Salvador'),
    (6, 'Fernanda', 50, 'Brasília'),
    (7, 'Gabriel', 22, 'Fortaleza'),
    (8, 'Helena', 43, 'Porto Alegre'),
    (9, 'Igor', 37, 'Manaus'),
    (10, 'Julia', 28, 'Belo Horizonte')
]

schema = StructType([
    StructField('ID', IntegerType()),
    StructField('nome', StringType()),
    StructField('idade', IntegerType()),
    StructField('cidade', StringType())
])

df = spark.createDataFrame(data=dados, schema=schema)
df.write.mode('append').saveAsTable('people_pyspark')

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 81, Finished, Available, Finished)

## Em caso de não precisar criar uma nova tabela

In [57]:
df = spark.read.format('delta').table('people')

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 58, Finished, Available, Finished)

# Analisando Tabela

In [4]:
# Visualizando a tabela
display(df)

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 63b86a65-a6aa-47d2-8eef-8ba2febb5342)

In [5]:
# selecionando apenas algumas colunas
display(df.select(['nome', 'idade']))

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 49d8a4c7-b289-4431-840e-06448a8c24a7)

In [7]:
# Selecionando as colunas onde atendem uma certa condição (filtro)
display(
    df.select(['nome', 'idade']) \
    .where(df['idade'] > 30)
)

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8b0a6ae3-51d5-4a07-be60-c9381ac737b4)

In [87]:
# Criando nova coluna de faixa de dados usando lógica condicional, com .withColumn para criar a coluna nova
# when para encapsular nossas condições e os valores de saída e .otherwise que funciona como um else

df = df.withColumn('categoria', when(col('idade') <= 25, 'Jovem') \
    .when((col('idade') > 25) & (col('idade') < 50), 'adulto') \
    .otherwise('Senior')
)

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 88, Finished, Available, Finished)

In [29]:
# Agora com a coluna de categorias exibo o DataFrame Inteiro ordenado pela idade de forma decrescente
display(df.orderBy('idade', ascending=False))

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b803d1bb-c4d4-4c07-b521-78ceb8712013)

In [59]:
# Agora, salvamos a tabela no metastore do spark e no lakehouse, na pasta de 'Tabelas'
df.write.format('delta').saveAsTable('Clientes')

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 60, Finished, Available, Finished)

In [60]:
# Testanto alteração de valores juntamente com lógica condicional

df = df.withColumn('cidade', when(col('cidade') == 'São Paulo', 'SP').otherwise(col('cidade')))

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 61, Finished, Available, Finished)

In [62]:
# utilizando .mode('overwrite') para sobreescrever os dados na tabela salva 'Clientes'
df.write.format('delta').mode('overwrite').saveAsTable('Clientes')

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 63, Finished, Available, Finished)

In [88]:
# Construindo um novo esquema de estruturas de colunas para criamos um dataframe com dados novos e conseguirmos
# adicioná-los a tabela salva 'Clientes'.
schema = StructType([
    StructField('ID', IntegerType()),
    StructField('nome', StringType()),
    StructField('idade', IntegerType()),
    StructField('cidade', StringType()),
    StructField('categoria', StringType())
])

# Criamos novas linhas inserindo os dados numa estrutura de 'lista de tuplas', inserimo-os em spark.createDataFrame com
# o esquema especificado e então criamos outro novo DataFrame, que será a união dos dois até o momento.dados

novas_linhas = [(11, 'juan', 19, 'SP', 'jovem'), (12, 'vitoria', 24, 'SP', 'jovem')]
df_novas_linhas = spark.createDataFrame(novas_linhas, schema=schema)
df_completo = df.union(df_novas_linhas)

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 89, Finished, Available, Finished)

In [70]:
# Confirmando a existencia das novas linhas e treinando agrupamento.
display(df_completo.groupBy('cidade').count())

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 71, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e8aa42e7-52c9-4a5b-812f-cd35d3214676)

In [89]:
# Utilizando .mode('append') para adicionar linhas a tabela salva 'Clientes'
df_novas_linhas.write.format('delta').mode('append').saveAsTable('Clientes')

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 90, Finished, Available, Finished)

# Histórico e *'Viagem no tempo'*

In [71]:
%%sql


--- visualizando o histórico de mudanças ocorridas na tabela 'Clientes'.

DESCRIBE HISTORY Clientes

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 72, Finished, Available, Finished)

<Spark SQL result set with 4 rows and 15 fields>

**o histórico de mudanças de uma tabela delta fica armazenado numa pasta interna de sua estrutura chamada _delta_log, O que além de nos possibilitar visualizar as versões de uma tabela, também nos permite acessar versões específicas dela, usando a versão ou a data da mudança.**

In [74]:
# Acessando a versão da mudança com .option('versionAsOf', versão), acessando a tabela existente com .table

df_v0 = spark.read.format('csv').option('versionAsOf', 0).table('Clientes')
df_v0.show()

StatementMeta(, 8df28bb2-f8c3-41cc-b432-24682f10503c, 75, Finished, Available, Finished)

+---+--------------------+-----+--------------+---------+
| ID|                nome|idade|        cidade|categoria|
+---+--------------------+-----+--------------+---------+
|  3|Carlos           ...|   30|      Curitiba|   adulto|
|  7|Gabriel          ...|   22|     Fortaleza|    Jovem|
|  1|Alice            ...|   25|     São Paulo|    Jovem|
|  2|Bob              ...|   40|Rio de Janeiro|   adulto|
|  6|Fernanda         ...|   50|      Brasília|   Senior|
|  8|Helena           ...|   43|  Porto Alegre|   adulto|
|  4|Diana            ...|   27|        Recife|   adulto|
|  5|Eduardo          ...|   35|      Salvador|   adulto|
|  9|Igor             ...|   37|        Manaus|   adulto|
| 10|Julia            ...|   28|Belo Horizonte|   adulto|
+---+--------------------+-----+--------------+---------+



In [4]:
# Acessando a versão específica da mudança com .option('timeStampAsOf', timestamp)

df_16_47 = spark.read.format('delta').option('timeStampAsOf', '2024-11-23T16:47:38.627Z').table('Clientes')
df_16_47.show()

StatementMeta(, 68a092a1-70ca-4f31-90fe-2eb556562b9a, 5, Finished, Available, Finished)

+---+--------------------+-----+--------------+---------+
| ID|                nome|idade|        cidade|categoria|
+---+--------------------+-----+--------------+---------+
|  3|Carlos           ...|   30|      Curitiba|   adulto|
|  7|Gabriel          ...|   22|     Fortaleza|    Jovem|
|  1|Alice            ...|   25|     São Paulo|    Jovem|
|  2|Bob              ...|   40|Rio de Janeiro|   adulto|
|  6|Fernanda         ...|   50|      Brasília|   Senior|
|  8|Helena           ...|   43|  Porto Alegre|   adulto|
|  4|Diana            ...|   27|        Recife|   adulto|
|  5|Eduardo          ...|   35|      Salvador|   adulto|
|  9|Igor             ...|   37|        Manaus|   adulto|
| 10|Julia            ...|   28|Belo Horizonte|   adulto|
+---+--------------------+-----+--------------+---------+

