### Nota importante!

Este notebook funcionou com uma versão 12.2 LTS Runtime!

Não há preocupações quanto ao tamanho do cluster (memória e núcleos).

O objetivo deste notebook é apenas mostrar a versão dos comandos SQL para Python, **sempre use a versão SQL como referência**, como foi o usado durante o curso Databricks SQL.

## Comandos mágicos do Databricks Notebook

| Comando | Resultado |
|--------|--------|
| `%lsmagic` | Lista todos os comandos mágicos. |
| `%md` | Célula como um Markdown. |
| `%sql` | UsarSQL. |
| `%python` | Utilize Python. |
| `%scala` | Usar Scala. |
| `%r` | Usar R. |
| `%run` | Executa um arquivo Python ou outro notebook. |
| `%who` | Mostra todas as variáveis. |
| `%env` | Permite inserir variáveis ​​de ambiente. |
| `%fs` | Permite usar comandos do sistema de arquivos dos utilitários DBFD. |
| `%sh` | Executa comandos Shell no cluster. |
| `%matplotlib` | Recursos de back-end do Matplotlib. |
| `%config` | Pode definir configurações para notebook. |
| `%pip` | Instale pacotes Python. |
| `%load` | Carrega o conteúdo de um arquivo em uma célula. |
| `%reload` | Recarrega o conteúdo. |
| `%jobs` | Lista trabalhos em execução. |

## O que é Apache Spark?

Apache Spark é um mecanismo de processamento capaz de analisar dados usando SQL, Python, Scala, R e Java. Também possui estruturas para permitir aprendizado de máquina, processamento de gráficos ou streaming:

![Spark Engines](https://github.com/tiluan/trn-databricks-sql-zero-to-hero/blob/main/images/spark-engine.png?raw=true)
<br/>
<br/>

* A API DataFrames funciona devido a uma abstração acima dos RDDs, porém ganhando algo em torno de 5 a 20x em relação aos RDDs tradicionais com seu Catalyst Optimizer.

![Spark Unified Engine](https://github.com/tiluan/trn-databricks-sql-zero-to-hero/blob/main/images/unified-engine.png?raw=true)


## Arquitetura Spark Cluster: Drivers, Executores, Slots e Tarefas
![Spark Physical Cluster, slots](https://github.com/tiluan/trn-databricks-sql-zero-to-hero/blob/main/images/spark-driver.png?raw=true)

Ao criar um cluster, você pode escolher entre **nó único** ou **vários nós**, onde deve incluir o tipo de máquina do driver e também o tipo de máquina de trabalho


## Spark Jobs, avaliação preguiçosa, transformações e ações
![Spark Jobs, Lazy Evaluation, Transformations & Actions](https://github.com/tiluan/trn-databricks-sql-zero-to-hero/blob/main/images/spark_bookclub.png?raw=true)


* Toda execução do Apache Spark é classificada como Spark Job, que posteriormente é dividida em etapas que irão agrupar tarefas (menor unidade de execução de trabalho).
* Como o Apache Spark é anexado ao modelo Lazy Evaluation, cada processo pode ser classificado em **ação** OU **transformação**.
* Cada Spark Job é gerado por um comando de ação, enquanto os estágios serão organizados devido a funções de consulta embaralhadas e de otimização.
* Embaralhar? Sim, as transformações Spark também podem ser divididas em: **transformações estreitas** e **transformações amplas (shuffle)**.


## Alguns exemplos de Spark Action

| Comando | Resultado |
|--------|--------|
| `collect()` | Retorna uma matriz que contém todas as linhas deste conjunto de dados. |
| `count()` | Retorna o número de linhas no conjunto de dados. |
| `first()` | Retorna a primeira linha. |
| `foreach(f)` | Aplica uma função f a todas as linhas. |
| `head()` | Retorna a primeira linha. |
| `show(..)` | Exibe as 20 principais linhas do conjunto de dados em formato tabular. |
| `take(n)` | Retorna as primeiras n linhas do conjunto de dados. |



## Alguns exemplos de transformação Spark

| Comando | Tipo | Resultado |
|--------|--------|-------------|
| `filter()` | Estreito | Retorna um novo DataFrame após aplicar a função de filtro no conjunto de dados de origem. |
| `distinct()` | Largo | Retorna um novo DataFrame contendo as linhas distintas neste DataFrame. |
| `join()` | Largo | Junta-se a outro DataFrame, usando a expressão de junção fornecida. |
| `union()` | Estreito | Combina dois DataFrames e retorna o novo DataFrame. |
| `repartition()` | Largo | Retorna um novo DataFrame (particionado por hash) particionado pelas expressões de particionamento fornecidas. |
| `groupBy()` | Largo | Agrupa o DataFrame usando as colunas especificadas, para que possamos executar a agregação nelas. |

## O que é Delta Lake?

![Delta Lake](https://github.com/tiluan/trn-databricks-sql-zero-to-hero/blob/main/images/delta-lake.png?raw=true)

* Delta Lake é uma estrutura de armazenamento de código aberto que permite construir uma arquitetura Lakehouse com mecanismos de computação, atuando como um **formato de tabela nativo em Databricks**, usando Parquet como formato físico para anexar todos os dados.
* As tabelas Delta são criadas com base nas **garantias ACID** fornecidas pelo protocolo Delta Lake de código aberto. ACID significa atomicidade, consistência, isolamento e durabilidade.
* O Delta Lake pode implementar as propriedades ACID devido aos **controles de log de transações** salvos durante **cada confirmação** nos dados. Durante uma transação, os arquivos de dados são gravados no diretório de arquivos que suporta a tabela. Quando a transação for concluída, uma nova entrada será confirmada no log de transações que inclui os caminhos para todos os arquivos gravados durante a transação. Cada commit incrementa a versão da tabela e torna novos arquivos de dados visíveis para operações de leitura.

### Criando uma tabela Delta

In [0]:
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
table_name = "people_10millions"
df.write.mode("overwrite").saveAsTable(table_name)

In [0]:
dbutils.fs.ls('dbfs:/databricks-datasets/learning-spark-v2/people/')

### Lendo uma tabela Delta

In [0]:
df_people = spark.read.table("default.people_10millions")
display(df_people.take(5))

In [0]:
%fs
ls 'dbfs:/databricks-datasets/learning-spark-v2/people/'

In [0]:
display(df_people.count())

### Obtendo detalhes sobre a tabela Delta (propriedades, tamanho, localização, etc.)

In [0]:
display(df_people.describe())

In [0]:
df_people = spark.read.format("delta").table("people_10millions")
df_people.createOrReplaceTempView("people_10millions")
display(spark.sql("DESCRIBE TABLE EXTENDED people_10millions"))

### Inspecionando o histórico da tabela Delta (criação, upserts, exclusões, etc)

In [0]:
%sql
DESCRIBE DETAIL people_10millions;

In [0]:
%sql
DESCRIBE HISTORY hive_metastore.default.people_10millions;

## Alterando Tabela Delta

#### Inserindo novos dados

In [0]:
from pyspark.sql.functions import when

df_people = df_people.withColumn("gender", when(df_people.gender == "M", "H")
                                        .when(df_people.gender == "F", "M")
                                        .otherwise(df_people.gender))

#### Removendo linhas/valores

In [0]:
# df_people.delete("birthDate >= '2000-01-01'")
# DELETE FROM hive_metastore.default.people_10millions WHERE birthDate >= '2000-01-01'

df_people = df_people.filter("birthDate < '2000-01-01'")

In [0]:
display(df_people.count())

### Inspecionando novamente o histórico da tabela Delta (criação, upserts, exclusões, etc)

In [0]:
%sql
DESCRIBE HISTORY hive_metastore.default.people_10millions;

-- display(df_people.history())

### Resgatando através da Viagem no Tempo para recuperar a versão antiga da Tabela Delta

In [0]:
%sql
--SELECT * FROM hive_metastore.default.people_10millions VERSION AS OF 0
CREATE OR REPLACE TABLE hive_metastore.default.people_10millions AS SELECT * FROM hive_metastore.default.people_10millions VERSION AS OF 2

-- spark.sql("CREATE OR REPLACE TABLE hive_metastore.default.people_10millions AS SELECT * FROM hive_metastore.default.people_10millions VERSION AS OF 0")

In [0]:
df_people.describe()

In [0]:
display(df_people.describe())

In [0]:
dbutils.fs.ls('dbfs:/user/hive/warehouse/people_10millions/_delta_log')

In [0]:
spark.read.json('/user/hive/warehouse/people_10millions/_delta_log/00000000000000000003.json')

### Limpando e se livrando da história

In [0]:
%sql
VACUUM hive_metastore.default.people_10millions
-- df_people.vacuum()

In [0]:
%sql
SELECT COUNT(1) FROM hive_metastore.default.people_10millions

--display(df_people.count())