# Data Lakehouse Medallion - LiveLabs
## SUMÁRIO:
---

- <a href='#prerequisites'>1. Pré-requisitos</a>
    - <a href='#prerequisites_policies'>1.1 Políticas</a>
    - <a href='#prerequisites_helpers'>1.2 Helpers</a>
    - <a href='#prerequisites_authentication'>1.3 Autenticação</a>
    - <a href='#prerequisites_variables'>1.4 Variáveis</a>   
- <a href='#dataflow_spark_magic'>2. Data Flow Spark Magic</a>
    - <a href='#dataflow_spark_magic_comands'>2.1 Carregando Comandos do Data Flow Spark Magic</a>
- <a href='#create_session'>3. Criando uma sessão</a>
    - <a href='#use_session'>3.1 Utilizando sessão existente</a>
- <a href='#basic_spark_comands'>4. Comandos Spark Básicos</a>
- <a href='#dataset_information'>5.Visão Geral do Conjunto de Dados</a>
    - <a href='#FDA_information'>5.1 Informações Gerais [FDA - Food & Drug Administration]</a>
    - <a href='#dataset_dictionary'>5.2 Dicionário de Dados</a>
- <a href='#import_json'>6. Importando arquivos JSON - <span style="color:#CD7F32">**Camada Bronze**</span></a>
    - <a href='#read_json'>6.1 Configuração de Leitura de Arquivos JSON no Spark</a>
    - <a href='#verification_null'>5.2 Verificação de Valores Nulos</a>
- <a href='#sql_spark'>7. Exploração e Tratamento - SQL Spark</a>
   
    

<a id='prerequisites'></a>
## 1.Pré-Requisitos

As sessões do Data Flow são acessíveis por meio do seguinte [Conda Environment](https://docs.oracle.com/en-us/iaas/data-science/using/conda-pyspark-fam.htm).: **PySpark 3.2 e Data Flow CPU no Python 3.8 (versão 3.0)**


<a id='prerequisites_policies'></a>
### 1.1 Políticas

Esta seção aborda a criação de grupos dinâmicos e políticas necessárias para usar o serviço.

* [Data Flow Policies](https://docs.oracle.com/iaas/data-flow/using/policies.htm/)
* [Getting Started with Data Flow](https://docs.oracle.com/iaas/data-flow/using/dfs_getting_started.htm)
* [About Data Science Policies](https://docs.oracle.com/iaas/data-science/using/policies.htm)
* [Data Catalog Metastore](https://docs.oracle.com/en-us/iaas/data-catalog/using/metastore.htm)

<a id='prerequisites_helpers'></a>
### 1.2 Helpers

Nesta seção, estamos criando uma função simples para facilitar a manipulação dos argumentos e parâmetros ao configurar a sessão no Data Flow Studio. Essa função é usada em todo o notebook para preparar os argumentos necessários para os comandos mágicos. É especialmente útil quando você deseja passar variáveis Python como argumentos para os comandos mágicos do Spark.

In [1]:
import json
def prepare_command(command: dict) -> str:
    """Converts dictionary command to the string formatted commands."""
    return f"'{json.dumps(command)}'"

<a id='prerequisites_authentication'></a>
### 1.3 Autenticação

O [Oracle Accelerated Data Science SDK (ADS)](https://docs.oracle.com/iaas/tools/ads-sdk/latest/index.html) controla o mecanismo de autenticação com o cluster Spark da Sessão de Fluxo de Dados. Para configurar a autenticação, use ```ads.set_auth("resource_principal")``` ou ```ads.set_auth("api_key")```


In [2]:
#Importando a biblioteca ADS e realizando a autenticação.
import ads
ads.set_auth("resource_principal")

<a id='prerequisites_variables'></a>
### 1.4 Variáveis

Importamos variáveis de ambiente da OCI Data Science Notebook através da biblioteca **os**. Para criar e executar uma sessão do Data Flow, você deve especificar um ```<compartment_id>``` e um bucket ```<logs_bucket_uri>``` para armazenar os logs. Esses recursos devem estar no mesmo compartimento.

In [3]:
import os
compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID") #identificando o compartimento da OCI em utilização
logs_bucket_uri = "oci://bucket_livelabs_logs@id3kyspkytmr" #definindo o bucket para armazenamento de logs

<a id='dataflow_spark_magic'></a>
## 2. Data Flow Spark Magic


Por meio da API REST do Livy, ele fornece um conjunto de comandos mágicos de células do Jupyter Notebook para transformar o Jupyter em um ambiente de desenvolvimento Spark integrado para clusters remotos.

Data Flow Magic permite que você:

* Execute código Spark em um cluster Spark remoto do Data Flow.
* Crie uma sessão Spark do Data Flow com SparkContext e HiveContext em um cluster Spark remoto do Data Flow.
* Capture a saída de consultas Spark como um DataFrame Pandas local para interagir facilmente com outras bibliotecas Python (por exemplo, matplotlib).

<a id='dataflow_spark_magic_comands'></a>
### 2.1. Carregando Comandos do Data Flow Spark Magic


O Data Flow Spark Magic é uma extensão do JupyterLab que você precisa ativar em seu notebook usando o comando mágico ```%load_ext dataflow.magics.```
Após a ativação da extensão, o comando ```%help``` pode ser usado para obter a lista de comandos suportados.

In [4]:
%load_ext dataflow.magics

O comando ```%help``` fornece uma lista de todos os comandos disponíveis, juntamente com uma lista de seus argumentos e exemplos de chamadas.

In [5]:
%help

Magic,Example,Explanation
create_session,"%create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard2.1"",""executorShape"":""VM.Standard2.1"",""numExecutors"":1,""archiveUri"":""Object Storage URL for Data Flow zip archive."",""metastoreId"":""optional metastore OCID"",""configuration"":{ ""spark.archives"":""oci://bucket@namespace/path/to/conda/pack"", #optional property to use Dataflow 'Run' resource to access OCI resources.  ""dataflow.auth"":""resource_principal"" }}'","Creates new session by providing session details. Example command for Flex shapes :  %create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard.E4.Flex"",""executorShape"":""VM.Standard.E4.Flex"",""numExecutors"":1,""driverShapeConfig"":{""ocpus"":1,""memoryInGBs"":16},""executorShapeConfig"":{""ocpus"":1,""memoryInGBs"":16}}'  Example command for Spark dynamic allocation :  %create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard2.1"",""executorShape"":""VM.Standard2.1"",""numExecutors"":1,""configuration"":{ ""spark.dynamicAllocation.enabled"":""true"", ""spark.dynamicAllocation.shuffleTracking.enabled"":""true"", ""spark.dynamicAllocation.minExecutors"":""1"", ""spark.dynamicAllocation.maxExecutors"":""4"", ""spark.dynamicAllocation.executorIdleTimeout"":""60"", ""spark.dynamicAllocation.schedulerBacklogTimeout"":""60"", ""spark.dataflow.dynamicAllocation.quotaPolicy"":""min"" }}'"
use_session,%use_session -s {sessionId},To use already existing active session.The force flag -f is mandatory to create a new session if no active session found.
status,%status,Outputs current session status.
update_session,"%update_session -i '{""maxDurationInMinutes"": 4896,""idleTimeoutInMinutes"": 4888}'",Updates current active session[not session config] for max duration or idle time out.
stop_session,%stop_session,Stops current active session. One active session should be associated with current notebook to stop.
config,%config,Outputs current session configuration.
configure_session,"%configure_session -i '{""driverShape"": ""VM.Standard2.1"", ""executorShape"": ""VM.Standard2.1"", ""numExecutors"": 1}'","Configures the session creation parameters. The force flag -f is mandatory for immediate effect of the config change, in that case session will be dropped and recreated."
spark,%%spark -o df df = spark.read.parquet('...,"Executes spark commands.  Parameters:  -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a  Pandas dataframe with the same name.  -m METHOD: Sample method, either take or sample.  -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Dataflow to Jupyter.  If this number is negative, then the number of rows will be unlimited.  -r FRACTION: Fraction used for sampling."


Para acessar as docstrings de qualquer comando mágico e descobrir quais argumentos fornecer, basta adicionar ```?``` no final do comando.

In [6]:
?%create_session

[0;31mDocstring:[0m
::

  %create_session [-l LANGUAGE] [-c CONFIG] [-e ENDPOINT]

optional arguments:
  -l LANGUAGE, --language LANGUAGE
                        Language to use.
  -c CONFIG, --config CONFIG
                        Region to use.
  -e ENDPOINT, --endpoint ENDPOINT
                        Endpoint to use.
[0;31mFile:[0m      ~/conda/pyspark32_p38_cpu_v3/lib/python3.8/site-packages/dataflow/dataflowclientlib/exceptions.py


<a id='create_session'></a>
## 3. Criando uma Sessão

Para criar uma nova sessão de cluster do Data Flow, utilize o comando mágico ```%create_session```.


No exemplo abaixo, a nova sessão de cluster do Data Flow é iniciada com a configuração básica nas formas flexíveis para o driver e os executores.
A [VM.Standard.E4.Flex](https://docs.oracle.com/en-us/iaas/Content/Compute/References/computeshapes.htm) é usada tanto para as formas do driver quanto dos executores. Para cada um, definimos independentemente o número de OCPUs e a quantidade de memória primária.

In [8]:
%create_session -l python -c '{\
        "compartmentId": "ocid1.compartment.oc1..aaaaaaaaoyy7kev7c7dzab3uacklnqjdjfookaqsqq4cekercgy5zifo3wja",\
        "displayName": "DataFlow Studio - Data Lakehouse Livelabs",\
        "language": "PYTHON",\
        "sparkVersion": "3.2.1",\
        "numExecutors": 1,\
        "driverShape": "VM.Standard.E4.Flex",\
        "executorShape": "VM.Standard.E4.Flex",\
        "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},\
        "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},\
        "logsBucketUri": "oci://bkt_demo_project_logs@id3kyspkytmr",\
        "configuration":{\
          "spark.archives": "oci://bkt_demo_project_logs@id3kyspkytmr/conda_environments/cpu/PySpark 3.2 and Data Flow/3.0/pyspark32_p38_cpu_v3#conda"}}'

Setting up the Cluster..


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cluster is ready..
Starting Spark application..


Session ID,Kind,State,Current session
ocid1.dataflowapplication.oc1.sa-saopaulo-1.antxeljrtsbrckqa37rysx65mmd3xdp4dstbxl6eckx5p2mgsg3lpbz3rncq,pyspark,IN_PROGRESS,Dataflow Run


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.
SparkContext available as 'sc'.


%configure_session -f -i
'{"configuration": {
"spark.archives": "oci://bkt_demo_project_logs@id3kyspkytmr/conda_environments/cpu/PySpark 3.2 and Data Flow/3.0/pyspark32_p38_cpu_v3#conda",
"spark.oracle.datasource.enabled":"true"}}'

<a id='use_session'></a>
### 3.1 Utilizando sessão existente

Para se conectar à sessão existente, utilize o comando mágico `%use_session`. Esse comando permite que você reutilize uma sessão já criada e ativa para continuar trabalhando nela sem a necessidade de criar uma nova sessão.

In [None]:
%use_session -s 'ocid1.datasciencenotebooksession.oc1.sa-saopaulo-1.amaaaaaatsbrckqa6sd5ao4olxfzddu2yuf4j3njroxkbneizp3omywlnklq' -f

Utilize o comando mágico `%status` para verificar o status da sessão atual.

In [None]:
%status

Utilize o comando mágico `%config`  para visualizar a configuração da sessão atual. Este comando pode fornecer informações detalhadas sobre a configuração da sessão em uso.

In [None]:
%config

Para encerrar a sessão atual, utilize o comando mágico `%stop_session.`

In [None]:
%stop_session

<a id='basic_spark_comands'></a>
## 4. Comandos Spark Básicos

A variável sc representa o contexto Spark e está disponível quando o comando mágico %%spark é usado no ambiente do Data Flow Spark Magic. A célula a seguir é um exemplo simples de como usar sc em uma célula do Data Flow Spark Magic. A célula chama o método parallelize, que cria um RDD (Resilient Distributed Dataset), chamado numbers, a partir de uma lista de números. Em seguida, informações sobre o RDD são impressas. O método toDebugString retorna uma descrição do RDD.

In [9]:
%%spark
print(f'A versão do Spark em execução no cluster do Data Flow Studio é: {sc.version}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

A versão do Spark em execução no cluster do Data Flow Studio é: 3.2.1

<a id='dataset_information'></a>

## 5.Visão Geral do Conjunto de Dados

---
<span style="color:blue">**Importação/exportação do Brasil desde 1996 a 2023**</span>

---
O conjunto de dados a seguir é gerado pelo Governo Federal Brasileiro em língua portuguesa e apresenta detalhadamente todos os dados utilizados para construir a balança comercial brasileira, bem como a fonte de dados do sistema Comex Stat, detalhada por NCM ou por Municípios do exportador/importador (e SH4). Os dados estão disponíveis em CSV com os nomes das colunas na primeira linha.

Esta base de dados é detalhada pela NCM e é um arquivo CSV com separador ponto e vírgula (;) detalhado por ano, mês, código NCM, código da unidade estatística, código do país de destino/origem do produto, código de UF do produto origem/destino, código da rota de transporte, código URF de carga/descarga, quantidade estatística, quilograma líquido, valor em dólar FOB (US$).

---

<a id='dataset_dictionary'></a>

### 5.2 Dicionário de Dados

| <span style="color:red">**Campo**</span>| <span style="color:red">**Descrição**</span>|
|-------------------------------------------|-----------|
| **CO_ANO**               | Ano (1997 a 2021) |
| **CO_MES**               | códigos de mês (1:janeiro 12:dezembro) |
| **CO_NCM**               | Código da Nomenclatura Comum do Mercosul - Utilizado para controlar e identificar mercadorias comercializadas no Brasil e demais países do Mercosul (cada NCM representa um produto diferente) |
| **Países do Mercosul**   | O Mercado Comum do Sul, comumente conhecido pela abreviatura espanhola Mercosul e portuguesa Mercosul, é um bloco comercial sul-americano estabelecido pelo Tratado de Assunção em 1991 e pelo Protocolo de Ouro Preto em 1994. Seus membros plenos são Argentina, Brasil, Paraguai e Uruguai. Fonte: Wikipédia |
| **CO_UNID**              | Código da Unidade de Medida Estatística que é uma unidade de medida padrão para cada NCM, podendo ter valores como quilograma, metro, litro, pares, tonelada, entre outros. |
| **CO_PAIS**              | Código do nome do país com o qual foi realizada a operação comercial (importação ou exportação) |
| **SG_UF _NCM**           | Código da Unidade Federativa (estado) de origem (exportação) ou destino (importação) da mercadoria. |
| **CO_VIA**               | Código para identificar o meio de transporte utilizado (aéreo, marítimo, rodoviário, ferroviário e outros). Na exportação, é o meio utilizado para transportar a mercadoria entre o último local de embarque para o exterior. Na importação, configura-se através do meio de acesso da mercadoria no primeiro ponto de entrada no território nacional. |
| **CO_URF**               | Código do órgão responsável pela execução dos trâmites necessários ao desembaraço aduaneiro de mercadorias importadas/exportadas |
| **QT_ESTAT**             | Na discriminação por NCM, cada produto possui sua unidade estatística. A maioria dos produtos tem o peso em quilogramas como unidade estatística, mas existem outras: quilograma líquido, número (unidades), pares, dezenas, mil e tonelada. A tabela completa que relaciona cada NCM à sua unidade estatística pode ser encontrada na tabela “NCM_UNIDADE”. É importante ressaltar que não devem ser somadas quantidades estatísticas de NCM que contenham unidades estatísticas diferentes. [dois] |
| **KG_NET**               | Medida que expressa o peso líquido da mercadoria. Mesmo produtos com quantidades estatísticas diferentes de quilogramas também possuem disponível a medida em quilogramas, referente ao peso líquido da mercadoria, ou seja, mercadoria desconsiderando embalagens, caixas ou qualquer outro transporte adicional. Vale lembrar que essas informações, assim como outras informações prestadas nas operações de comércio exterior, são de livre preenchimento e de responsabilidade exclusiva dos operadores de comércio exterior. [dois] |
| **VL_FOB**               | O valor FOB indica o preço da mercadoria em dólares americanos (USD $) no Incoterm FOB (Free on Board), modalidade em que o vendedor é responsável pelo envio da mercadoria enquanto o comprador paga frete, seguro e outros custos. custos pós-embarque. [dois] |


### Análises sobre as relações comerciais entre Brasil e Mercosul:

1. **Principais produtos (NCM) exportados pelo Brasil para os países do Mercosul**:
    - **Passos**:
        - Agrupar os dados por `CO_NCM`, filtrando pelas operações de exportação e pelos países do Mercosul (CO_PAIS correspondente).
        - Somar o `VL_FOB` para cada NCM.
        - Ordenar em ordem decrescente.

2. **Estado brasileiro que mais importou mercadorias do Mercosul no último ano**:
    - **Passos**:
        - Filtrar os dados pelo ano mais recente e pelas operações de importação dos países do Mercosul.
        - Agrupar por `SG_UF_NCM` e somar o `VL_FOB`.
        - Ordenar os estados em ordem decrescente pelo valor total de importação.

3. **Principal via de transporte utilizada para as importações brasileiras do Mercosul no ano de 2023**:
    - **Passos**:
        - Filtrar os dados pelo ano 2020 e pelas operações de importação dos países do Mercosul.
        - Agrupar por `CO_VIA` e contar o número de operações ou somar o `VL_FOB` para cada meio de transporte.
        - Ordenar em ordem decrescente.

4. **Principais produtos importados pelo Brasil do Mercosul que são medidos em toneladas**:
    - **Passos**:
        - Filtrar os dados pelas operações de importação e pelos produtos com unidade estatística de tonelada (`QT_ESTAT`).
        - Agrupar por `CO_NCM` e somar o `VL_FOB`.
        - Ordenar os produtos em ordem decrescente pelo valor total de importação.

5. **Total pago pelo Brasil em importações do Mercosul no último trimestre de 2021, excluindo os custos pós-embarque**:
    - **Passos**:
        - Filtrar os dados pelo ano 2021 e pelos meses de outubro a dezembro.
        - Somar o `VL_FOB` para todas as operações de importação dos países do Mercosul.

6. **Correlação entre o peso líquido (KG_NET) e o valor FOB (VL_FOB) das importações do Mercosul**:
    - **Passos**:
        - Usar uma análise de correlação estatística entre as variáveis `KG_NET` e `VL_FOB` para todas as importações dos países do Mercosul.

7. **Órgão (CO_URF) com maior atividade de desembaraço aduaneiro em 2023**:
    - **Passos**:
        - Filtrar os dados pelo ano 2023.
        - Agrupar por `CO_URF` e contar o número de operações de desembaraço ou somar o `VL_FOB` para cada órgão.
        - Ordenar em ordem decrescente.

8. **Variação do peso líquido médio (KG_NET) das exportações para os países do Mercosul ao longo dos anos**:
    - **Passos**:
        - Agrupar os dados por ano (`CO_ANO`) e calcular a média de `KG_NET` para cada ano.
        - Plotar um gráfico de linhas para visualizar a tendência ao longo dos anos.


<a id='import_json'></a>
## 6. Importando arquivos JSON - <span style="color:#CD7F32">**Camada Bronze**</span>

---
<span style="color:blue">**Buscando arquivos em Bucket OCI**</span>

---

Na estrutura **oci://bucket_livelabs_bronze@id3kyspkytmr/FDA_enforcement.json**, podemos identificar os seguintes componentes:

**Nome do Bucket: bucket_livelabs_bronze**

Isso representa o nome do bucket de armazenamento. Em sistemas de armazenamento em nuvem, um bucket é um contêiner de nível superior usado para organizar e armazenar dados, como arquivos.

**Namespace: id3kyspkytmr**

O namespace é uma unidade de organização dentro de um sistema de armazenamento em nuvem. Ele atua como um espaço de nomenclatura único para evitar conflitos de nomes de objetos e fornecer uma maneira de acessar recursos dentro do ambiente de armazenamento.

**Nome do Arquivo: BRASIL_EXPORTACAO_LIVELABS.csv**

Este é o nome do arquivo específico localizado dentro do bucket bkt_demo_project_bronze. No contexto desse código, é o arquivo JSON que está sendo lido e processado pelo Spark.

---


<a id='read_json'></a>
### 6.2 Configuração de Leitura de Arquivos CSV no Spark - Bucket OCI

Neste passo, estamos definindo a variável `json_file_path` para armazenar o caminho do arquivo JSON que queremos importar. Estamos buscando esse arquivo de um bucket chamado `oci://bkt_demo_project_bronze@id3kyspkytmr`.
Em seguida, estamos usando o Spark para ler o arquivo JSON e CSV. O Spark é um framework de processamento de big data. Estamos configurando algumas opções para o processo de leitura:

- `multiline` está definido como **true**, o que significa que estamos permitindo que o JSON tenha várias linhas para representar um objeto. Isso é útil quando os objetos JSON estão formatados em várias linhas.

In [10]:
%%spark
csv_file_path = "oci://bucket_livelabs_bronze@id3kyspkytmr/EXPORTACAO_BRASIL_LIVELABS.csv"
df_csv = spark.read.csv(csv_file_path, header=True, inferSchema=True)
df_csv.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+------+--------+-------+-------+---------+------+-------+--------+----------+------+
|CO_ANO|CO_MES|  CO_NCM|CO_UNID|CO_PAIS|SG_UF_NCM|CO_VIA| CO_URF|QT_ESTAT|KG_LIQUIDO|VL_FOB|
+------+------+--------+-------+-------+---------+------+-------+--------+----------+------+
|  2023|     3| 4029900|     10|     63|       RJ|     1| 717600|      11|        11|    59|
|  2023|     8|39199020|     10|    160|       SP|     4| 817600|       1|         1|    51|
|  2023|     4|90183929|     11|     40|       PR|     4| 817600|      25|         4|   578|
|  2023|     7|87089100|     11|    239|       PR|     4| 817600|       1|       114|   800|
|  2023|     3|84671900|     11|     87|       SP|     4| 817600|       4|         4|   593|
|  2023|     1| 2064900|     10|    351|       SC|     1| 927700|   26802|     26802| 39755|
|  2023|     6|84796000|     11|    845|       RS|     7|1017701|      56|       971|  8016|
|  2023|     5|84799090|     10|    589|       RS|     4| 817700|     

In [11]:
%%spark
df_csv.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- CO_ANO: integer (nullable = true)
 |-- CO_MES: integer (nullable = true)
 |-- CO_NCM: integer (nullable = true)
 |-- CO_UNID: integer (nullable = true)
 |-- CO_PAIS: integer (nullable = true)
 |-- SG_UF_NCM: string (nullable = true)
 |-- CO_VIA: integer (nullable = true)
 |-- CO_URF: integer (nullable = true)
 |-- QT_ESTAT: long (nullable = true)
 |-- KG_LIQUIDO: long (nullable = true)
 |-- VL_FOB: integer (nullable = true)

In [19]:
%%spark
json_file_path = "oci://bucket_livelabs_bronze@id3kyspkytmr/CODIGO_PAISES_LIVELABS.json"
df_json = spark.read.option("multiline","true").json(json_file_path)
df_json.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|                data|
+--------------------+
|[{0, ZZZ, 898, Nã...|
+--------------------+

In [20]:
%%spark
df_json.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- CO_PAIS: string (nullable = true)
 |    |    |-- CO_PAIS_ISOA3: string (nullable = true)
 |    |    |-- CO_PAIS_ISON3: string (nullable = true)
 |    |    |-- NO_PAIS: string (nullable = true)
 |    |    |-- NO_PAIS_ESP: string (nullable = true)
 |    |    |-- NO_PAIS_ING: string (nullable = true)

### 6.3 Salvando um DataFrame como um Arquivo Delta na Oracle Cloud Infrastructure (OCI) - Bronze

Este código está salvando o DataFrame `df_json` como um arquivo Delta na Oracle Cloud Infrastructure (OCI) no local especificado. Isso permite que você mantenha seus dados de forma confiável e transac### 5.2 Salvando um DataFrame como um Arquivo Delta na Oracle Cloud Infrastructure (OCI)### 5.2 Salvando um DataFrame como um Arquivo Delta na Oracle Cloud Infrastructure (OCI)ional na OCI para análise e processamento posterior.

In [None]:
%%spark
from delta import *

df_csv.write.format("delta").save("oci://bucket_livelabs_bronze@id3kyspkytmr/delta/csv_data")
df_json.write.format("delta").save("oci://bucket_livelabs_bronze@id3kyspkytmr/delta/json_data")

## 7. Exploração e Tratamento - Silver

In [17]:
%%spark
df_csv_parquet_bronze = spark.read.parquet("oci://bucket_livelabs_bronze@id3kyspkytmr/delta/csv_data/*.parquet", header=True, inferSchema=True)
df_json_parquet_bronze = spark.read.parquet("oci://bucket_livelabs_bronze@id3kyspkytmr/delta/json_data/*.parquet", header=True, inferSchema=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
%%spark
df_json_flatten = flatten_array_struct_df(df_json_parquet_bronze)
df_json_flatten.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data_CO_PAIS: string (nullable = true)
 |-- data_CO_PAIS_ISOA3: string (nullable = true)
 |-- data_CO_PAIS_ISON3: string (nullable = true)
 |-- data_NO_PAIS: string (nullable = true)
 |-- data_NO_PAIS_ESP: string (nullable = true)
 |-- data_NO_PAIS_ING: string (nullable = true)

### 7.1 Função flatten para dataset JSON

Abaixo temos uma função Python que tem o objetivo de "achatar" (flatten) o DataFrame chamado `df_selected` que contém dados aninhados em uma coluna chamada `results` e `meta`, especificamente colunas de tipo "array" e "struct". A operação de achatar expande os elementos dessa coluna aninhada em linhas independentes, mantendo a coluna `meta` original, e o resultado é armazenado no DataFrame `df_flatten`.

In [22]:
%%spark
import pyspark.sql.functions as F

def flatten_structs(nested_df):
    stack = [((), nested_df)]
    columns = []
    
    while len(stack) > 0:
        parents, df = stack.pop()
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
        flat_cols = [
            F.col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        columns.extend(flat_cols)
        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
    return nested_df.select(columns)

def flatten_array_struct_df(df):
    array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    while len(array_cols) > 0:
        for array_col in array_cols:
            cols_to_select = [x for x in df.columns if x != array_col ]
            df = df.withColumn(array_col, F.explode_outer(F.col(array_col)))
        df = flatten_structs(df)
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

O código está realizando uma operação de transformação de dados no DataFrame chamado `df_json`, achatando sua estrutura e, em seguida, imprimindo o esquema do novo DataFrame resultante `df_flatten`. O detalhe específico da transformação depende da implementação da função `flatten_array_struct_df()`

### 7.2 Join JSON e CSV

In [25]:
%%spark

join_condition = df_json_flatten["data_CO_PAIS"] == df_csv["CO_PAIS"]

df = df_json_flatten.join(df_csv, join_condition, 'inner').select(df_json_flatten["data_NO_PAIS"], *df_csv.columns)

df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+------+--------+-------+-------+---------+------+-------+--------+----------+------+
|        data_NO_PAIS|CO_ANO|CO_MES|  CO_NCM|CO_UNID|CO_PAIS|SG_UF_NCM|CO_VIA| CO_URF|QT_ESTAT|KG_LIQUIDO|VL_FOB|
+--------------------+------+------+--------+-------+-------+---------+------+-------+--------+----------+------+
|           Argentina|  2023|     3| 4029900|     10|     63|       RJ|     1| 717600|      11|        11|    59|
|               China|  2023|     8|39199020|     10|    160|       SP|     4| 817600|       1|         1|    51|
|              Angola|  2023|     4|90183929|     11|     40|       PR|     4| 817600|      25|         4|   578|
|             Equador|  2023|     7|87089100|     11|    239|       PR|     4| 817600|       1|       114|   800|
|             Bélgica|  2023|     3|84671900|     11|     87|       SP|     4| 817600|       4|         4|   593|
|           Hong Kong|  2023|     1| 2064900|     10|    351|       SC|     1| 927700|  

In [26]:
%%spark
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data_NO_PAIS: string (nullable = true)
 |-- CO_ANO: integer (nullable = true)
 |-- CO_MES: integer (nullable = true)
 |-- CO_NCM: integer (nullable = true)
 |-- CO_UNID: integer (nullable = true)
 |-- CO_PAIS: integer (nullable = true)
 |-- SG_UF_NCM: string (nullable = true)
 |-- CO_VIA: integer (nullable = true)
 |-- CO_URF: integer (nullable = true)
 |-- QT_ESTAT: long (nullable = true)
 |-- KG_LIQUIDO: long (nullable = true)
 |-- VL_FOB: integer (nullable = true)

### 7.3 Eliminando duplicatas

Neste código, estamos realizando uma contagem do número de linhas e colunas no DataFrame `df_selected`, em seguida, removendo as linhas duplicadas do DataFrame `df_selected` e, em seguida, contando o número de linhas no novo DataFrame `df` após a remoção das duplicatas. 

In [27]:
%%spark
# Contagem de colunas
numero_de_linhas = df.count()

# Para contar o número de colunas em 'df'
numero_de_colunas = len(df.columns)

# Exibir o número de linhas e colunas
print(f"Número de linhas: {numero_de_linhas}")
print(f"Número de colunas: {numero_de_colunas}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Número de linhas: 1042360
Número de colunas: 12

In [None]:
%%spark
df = df.dropDuplicates()
numero_de_linhas = df.count()
print(f"Número de linhas: {numero_de_linhas}")

### 7.4 Padronização de colunas do DataFrame

Neste código em PySpark, estamos renomeando as colunas do DataFrame `df`, removendo o prefixo `results_`

In [28]:
%%spark

colunas = df.columns

for coluna in colunas:
    novo_nome = coluna.replace("data_", "")
    df = df.withColumnRenamed(coluna, novo_nome)
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- NO_PAIS: string (nullable = true)
 |-- CO_ANO: integer (nullable = true)
 |-- CO_MES: integer (nullable = true)
 |-- CO_NCM: integer (nullable = true)
 |-- CO_UNID: integer (nullable = true)
 |-- CO_PAIS: integer (nullable = true)
 |-- SG_UF_NCM: string (nullable = true)
 |-- CO_VIA: integer (nullable = true)
 |-- CO_URF: integer (nullable = true)
 |-- QT_ESTAT: long (nullable = true)
 |-- KG_LIQUIDO: long (nullable = true)
 |-- VL_FOB: integer (nullable = true)

<a id='verification_null'></a>

### 7.5 Identificação e Preenchimento de Valores Nulos em Colunas

Nesta abordagem, estamos verificando individualmente cada coluna para ver se há valores nulos usando `df.filter(col(col_name).isNull()).count()`. Se o resultado for maior que zero, isso significa que há valores nulos na coluna e a coluna é adicionada à lista `null_columns`, caso existam, imprimimos o número de valores nulos em cada coluna. 

Para colunas do tipo booleano, os valores nulos são substituídos por False, enquanto para todas as outras colunas, os valores nulos são substituídos por 0. Esse tratamento é útil quando desejamos garantir que não haja valores nulos em colunas específicas ou que eles sejam substituídos por valores padrão.

In [None]:
%%spark
from pyspark.sql.functions import isnan, when, col, expr

null_columns = [
    col_name
    for col_name in df.columns
    if df.filter(col(col_name).isNull()).count() > 0
]

if null_columns:
    print("Colunas com valores nulos:")
    for null_col in null_columns:
        null_count = df.filter(col(null_col).isNull()).count()
        print(f"{null_col}: {null_count} valores nulos")
else:
    print("Não há valores nulos em nenhuma coluna do DataFrame.")

In [None]:
%%spark
from pyspark.sql.functions import when, col
from pyspark.sql.types import BooleanType 

null_columns = df.columns  

for null_col in null_columns:
    data_type = df.schema[null_col].dataType
    if isinstance(data_type, BooleanType):
        df = df.withColumn(null_col, when(col(null_col).isNull(), False).otherwise(col(null_col)))
    else:
        df = df.withColumn(null_col, when(col(null_col).isNull(), 0).otherwise(col(null_col)))

### 7.6 Conversão de colunas 

Todas as colunas de data especificadas na lista `colunas_de_data` serão convertidas do formato string para o formato DateType com o formato de data `yyyyMMdd`. 

In [29]:
%%spark
from pyspark.sql.functions import col, expr

# Listando todas as colunas
all_columns = [
    "NO_PAIS", "CO_ANO", "CO_MES", "CO_NCM", "CO_UNID", 
    "CO_PAIS", "SG_UF_NCM", "CO_VIA", "CO_URF", "QT_ESTAT", 
    "KG_LIQUIDO", "VL_FOB"
]

# Colunas que não devem ser convertidas para string
exclude_columns = ["KG_LIQUIDO", "VL_FOB"]

# Loop através de todas as colunas e convertendo para string, exceto as excluídas
for column_name in all_columns:
    if column_name not in exclude_columns:
        df = df.withColumn(column_name, col(column_name).cast("string"))

# Mostrar o esquema atualizado
df.printSchema()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- NO_PAIS: string (nullable = true)
 |-- CO_ANO: string (nullable = true)
 |-- CO_MES: string (nullable = true)
 |-- CO_NCM: string (nullable = true)
 |-- CO_UNID: string (nullable = true)
 |-- CO_PAIS: string (nullable = true)
 |-- SG_UF_NCM: string (nullable = true)
 |-- CO_VIA: string (nullable = true)
 |-- CO_URF: string (nullable = true)
 |-- QT_ESTAT: string (nullable = true)
 |-- KG_LIQUIDO: long (nullable = true)
 |-- VL_FOB: integer (nullable = true)

<a id='sql_spark'></a>

## 8. Exploração dos dados - SQL Spark

Neste passo, estamos usando o método `createOrReplaceTempView` para criar ou substituir uma visualização temporária do DataFrame df. Em muitos sistemas de processamento de big data, como o Apache Spark, você pode criar visualizações temporárias de DataFrames. Essas visualizações temporárias permitem que você execute consultas SQL no DataFrame como se ele fosse uma tabela em um banco de dados.

Neste código, estamos executando operações de consulta SQL no DataFrame restaurant_data usando o ambiente Spark:
- **-c sql:** Essa opção indica que o código na célula será interpretado como SQL. Ou seja, você está prestes a executar uma consulta SQL no DataFrame.

- **-o df:** Essa opção significa que desejamos que o resultado da consulta SQL seja armazenado na variável df. O resultado da consulta será exibido na célula seguinte.

- **DESCRIBE df_FDA;:** Esta é a consulta SQL em si. Estamos executando um comando DESCRIBE na tabela temporária restaurant_data, que é usado para exibir informações sobre a estrutura da tabela, como os nomes das colunas, tipos de dados e outras informações relevantes.

In [30]:
%%spark
df.createOrReplaceTempView("df")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
%%spark -c sql
DESCRIBE df;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Unnamed: 0,col_name,data_type
0,NO_PAIS,string
1,CO_ANO,string
2,CO_MES,string
3,CO_NCM,string
4,CO_UNID,string
5,CO_PAIS,string
6,SG_UF_NCM,string
7,CO_VIA,string
8,CO_URF,string
9,QT_ESTAT,string


In [32]:
%%spark -c sql
SELECT * FROM df LIMIT 3;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Unnamed: 0,NO_PAIS,CO_ANO,CO_MES,CO_NCM,CO_UNID,CO_PAIS,SG_UF_NCM,CO_VIA,CO_URF,QT_ESTAT,KG_LIQUIDO,VL_FOB
0,Argentina,2023-01-01,3,4029900,10,63,RJ,1,717600,11,11,59
1,China,2023-01-01,8,39199020,10,160,SP,4,817600,1,1,51
2,Angola,2023-01-01,4,90183929,11,40,PR,4,817600,25,4,578


### 8.1 Salvando um DataFrame como um Arquivo Delta na Oracle Cloud Infrastructure (OCI) - Silver

Este código está salvando o DataFrame `df_json` como um arquivo Delta na Oracle Cloud Infrastructure (OCI) no local especificado. Isso permite que você mantenha seus dados de forma confiável e transacional na OCI para análise e processamento posterior.

In [33]:
%%spark
from delta import *
df.write.format("delta").save("oci://bucket_livelabs_silver@id3kyspkytmr/delta")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 9. Refinamento - Gold

In [12]:
%%spark
json_NCM_file_path = "oci://bucket_livelabs_bronze@id3kyspkytmr/CODIGO_NCM.json"
df_json_NCM = spark.read.option("multiline","true").json(json_NCM_file_path)
df_json_NCM.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------------------------+--------------------+
|                 Ato|Data_Ultima_Atualizacao_NCM|       Nomenclaturas|
+--------------------+---------------------------+--------------------+
|Resolução Camex n...|       Vigente em 22/10/...|[{2021, 01, 31/12...|
+--------------------+---------------------------+--------------------+

In [15]:
%%spark
df_json_flatten_NCM = flatten_array_struct_df(df_json_NCM)
df_json_flatten_NCM.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Ato: string (nullable = true)
 |-- Data_Ultima_Atualizacao_NCM: string (nullable = true)
 |-- Nomenclaturas_Ano_Ato: string (nullable = true)
 |-- Nomenclaturas_Codigo: string (nullable = true)
 |-- Nomenclaturas_Data_Fim: string (nullable = true)
 |-- Nomenclaturas_Data_Inicio: string (nullable = true)
 |-- Nomenclaturas_Descricao: string (nullable = true)
 |-- Nomenclaturas_Numero_Ato: string (nullable = true)
 |-- Nomenclaturas_Tipo_Ato: string (nullable = true)

A implementação de um modelo de dados Star Schema é motivada pela necessidade de melhorar o desempenho das consultas em um ambiente de business intelligence (BI) ou análise de dados. Esse modelo é especialmente útil quando se lida com grandes volumes de dados e consultas complexas. Ele oferece uma estrutura otimizada para consultas analíticas, simplificando a navegação e agregação de dados, o que resulta em consultas mais rápidas e eficientes.

%%spark
import uuid
from pyspark.sql.functions import lit

# Adicionar coluna de ID único para a tabela Fato_Evento
df = df.withColumn("fato_evento_id", lit(str(uuid.uuid4())))

# Adicionar coluna de ID único para a tabela Dim_Fabricante
df = df.withColumn("dim_fabricante_id", lit(str(uuid.uuid4())))

# Adicionar coluna de ID único para a tabela Dim_Status
df = df.withColumn("dim_status_id", lit(str(uuid.uuid4())))

# Adicionar coluna de ID único para a tabela Dim_Produto
df = df.withColumn("dim_produto_id", lit(str(uuid.uuid4())))

# Adicionar coluna de ID único para a tabela Dim_Empresa_Recall
df = df.withColumn("dim_empresa_recall_id", lit(str(uuid.uuid4())))

# Adicionar coluna de ID único para a tabela Dim_Classificacao
df = df.withColumn("dim_classificacao_id", lit(str(uuid.uuid4())))

%%spark
df.createOrReplaceTempView("df_gold")

%%spark -c sql -o df_gold

CREATE TABLE Fato_Evento AS
SELECT
    fato_evento_id,
    event_id,
    product_type,
    distribution_pattern,
    initial_firm_notification,
    voluntary_mandated,
    report_date,
    recall_initiation_date,
    center_classification_date,
    termination_date,
    recall_number,
    dim_fabricante_id,
    dim_status_id,
    dim_produto_id,
    dim_empresa_recall_id,
    dim_classificacao_id
FROM df_gold

%%spark -c sql -o df

CREATE TABLE Dim_Fabricante 
SELECT
    dim_fabricante_id,
    openfda_manufacturer_name,
    openfda_is_original_packager
FROM df_gold

%%spark -c sql -o df

CREATE TABLE Dim_Status
SELECT
    dim_status_id,
    status
FROM df_gold

%%spark -c sql -o df

CREATE TABLE Dim_Produto
SELECT
    dim_produto_id,
    openfda_application_number,
    openfda_brand_name,
    openfda_generic_name,
    openfda_product_type,
    product_quantity,
    openfda_substance_name,
    openfda_route,
    openfda_manufacturer_name,
    openfda_is_original_packager
FROM df_gold

%%spark -c sql -o df

CREATE TABLE Dim_Empresa_Recall
SELECT
    dim_empresa_recall_id,
    recalling_firm,
    city,
    state,
    country,
    address_1,
    address_2,
    postal_code
FROM df_gold


%%spark -c sql -o df

CREATE TABLE Dim_Recall
SELECT
    recall_number,
    reason_for_recall,
    code_info,
    product_description,
    product_quantity
FROM df_gold


%%spark -c sql -o df

CREATE TABLE Dim_Classificacao
SELECT
    dim_classificacao_id,
    classification
FROM df_gold

%%spark -c sql -o df
SELECT * FROM Fato_Recalls

## 10. Visualização Matplotlib

**Pergunta de Negócio:** Como a diferença média de dias entre o início do recall e a classificação do centro varia ao longo dos meses para diferentes categorias/classificações de produtos?

O gráfico mostra a diferença nas médias de dias entre o início do recall e a classificação do centro para cada mês, desagregado por categoria/classificação de produtos. Isso pode ajudar a identificar se existem padrões sazonais ou tendências específicas relacionadas ao tempo de resposta para diferentes categorias de produtos.

In [None]:
%%spark
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, datediff
import seaborn as sns  
import pandas as pd  

#df_mat = df.groupby("status").count().toPandas().plot.bar(x="status", y="count")


df_diff = df.withColumn("date_diff", datediff(col("center_classification_date"), col("recall_initiation_date")))
result_df = df_diff.groupBy("classification").agg({"date_diff": "mean"}).withColumnRenamed("avg(date_diff)", "avg_date_diff")
result_pd = result_df.toPandas()

plt.figure(figsize=(10, 6))  
plt.xticks(rotation=0, ha='right')
plt.bar(result_pd["classification"], result_pd["avg_date_diff"], color='skyblue')
plt.ylabel("Average Date Difference")
plt.title("Diferença média de dias - Início do Recall e Atendimento por Risco")
plt.grid(axis='y') 
plt.tight_layout()

%matplot plt

In [None]:
%%spark

import matplotlib.pyplot as plt
from pyspark.sql.functions import col, datediff, month
import seaborn as sns
import pandas as pd

df_diff = df_diff.withColumn("recall_initiation_month", month(col("recall_initiation_date")))
df_diff = df_diff.withColumn("center_classification_month", month(col("center_classification_date")))

result_recall_initiation = df_diff.groupBy("recall_initiation_month").agg({"date_diff": "mean"}).withColumnRenamed("avg(date_diff)", "avg_date_diff")
result_center_classification = df_diff.groupBy("center_classification_month").agg({"date_diff": "mean"}).withColumnRenamed("avg(date_diff)", "avg_date_diff")

result_recall_initiation_pd = result_recall_initiation.toPandas()
result_center_classification_pd = result_center_classification.toPandas()

plt.figure(figsize=(12, 6))

plt.subplot(1, 2, 1)
plt.bar(result_recall_initiation_pd["recall_initiation_month"], result_recall_initiation_pd["avg_date_diff"], color='skyblue')
plt.xlabel("Month")
plt.ylabel("Average Date Difference")
plt.title("Diferença Média de Dias - Início do Recall por Mês")
plt.grid(axis='y')
plt.tight_layout()

plt.subplot(1, 2, 2)
plt.bar(result_center_classification_pd["center_classification_month"], result_center_classification_pd["avg_date_diff"], color='salmon')
plt.xlabel("Month")
plt.ylabel("Average Date Difference")
plt.title("Diferença Média de Dias - Center Classification por Mês")
plt.xticks(range(1, 13))
plt.grid(axis='y')
plt.tight_layout()

%matplot plt

In [None]:
%%spark
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, datediff, month
import pandas as pd

df_diff = df_diff.withColumn("recall_initiation_month", month(col("recall_initiation_date")))
df_diff = df_diff.withColumn("center_classification_month", month(col("center_classification_date")))

result_recall_initiation = df_diff.groupBy("recall_initiation_month", "classification").agg({"date_diff": "mean"}).withColumnRenamed("avg(date_diff)", "avg_date_diff_recall")
result_center_classification = df_diff.groupBy("center_classification_month", "classification").agg({"date_diff": "mean"}).withColumnRenamed("avg(date_diff)", "avg_date_diff_center")

result_recall_initiation_pd = result_recall_initiation.toPandas()
result_center_classification_pd = result_center_classification.toPandas()

# Calcular a diferença entre as médias para cada mês e classification
result_diff = pd.merge(result_recall_initiation_pd, result_center_classification_pd, left_on=["recall_initiation_month", "classification"], right_on=["center_classification_month", "classification"], suffixes=("_recall", "_center"))
result_diff["avg_date_diff_difference"] = result_diff["avg_date_diff_recall"] - result_diff["avg_date_diff_center"]

plt.figure(figsize=(12, 6))
plt.bar(result_diff["recall_initiation_month"], result_diff["avg_date_diff_difference"], color='purple')
plt.xlabel("Month")
plt.ylabel("Average Date Difference Difference")
plt.title("Diferença nas Médias de Dias entre Recall Initiation e Center Classification por Mês e Classificação")
plt.grid(axis='y')
plt.tight_layout()
plt.show()
%matplot plt

In [None]:
%%spark

import matplotlib.pyplot as plt
from pyspark.sql.functions import col, datediff, month
import seaborn as sns
import pandas as pd

df_diff = df.withColumn("date_diff", datediff(col("center_classification_date"), col("recall_initiation_date")))

df_diff = df_diff.withColumn("month", month(col("recall_initiation_date")))

result_df = df_diff.groupBy("classification", "month").agg({"date_diff": "mean"}).withColumnRenamed("avg(date_diff)", "avg_date_diff")

result_pd = result_df.toPandas()

plt.figure(figsize=(12, 8))
sns.lineplot(data=result_pd, x="month", y="avg_date_diff", hue="classification", palette="Set1")
plt.xlabel("Mês")
plt.ylabel("Diferença Média de Dias")
plt.title("Diferença Média de Dias - Início do Recall e Atendimento por Mês e Classificação")
plt.legend(title="Classificação")
plt.tight_layout()
plt.show()
%matplot plt