# Camada Silver

## 1.0 Ingestão

### 1.1 Visualizando arquivo na Camada Bronze

In [0]:
%fs
ls dbfs:/mnt/Cotacao_Dolar/Bronze/

1.2 Carregando os arquivos parquet em DataFrames

### 1.2 Carregando o arquivo parquet em DataFrames

In [0]:
cotacao_dolar_df = (
    spark.read
         .parquet("dbfs:/mnt/Cotacao_Dolar/Bronze/Cotacao_Dolar")
)

##2.0 Transformação

### 2.1 Verificando se há dados nulos em todas as colunas

In [0]:
listagem_contagem_nulos = [
    (coluna, cotacao_dolar_df.where(cotacao_dolar_df[coluna].isNull()).count())
    for coluna in cotacao_dolar_df.columns
]

display(listagem_contagem_nulos)

### 2.2 Renomeando colunas

In [0]:
cotacao_dolar_df = (
    cotacao_dolar_df
    .withColumnRenamed("cotacaoCompra", "Cotacao_Compra")
    .withColumnRenamed("cotacaoVenda", "Cotacao_Venda")
    .withColumnRenamed("dataHoraCotacao", "Data_Cotacao")
)

### 2.3 Substituindo separador decimal para manipulação dos números

In [0]:
from pyspark.sql.functions import col, regexp_replace

cotacao_dolar_df = (
    cotacao_dolar_df
    .withColumn("Cotacao_Compra", regexp_replace(col("Cotacao_Compra"), ",", "."))
    .withColumn("Cotacao_Venda", regexp_replace(col("Cotacao_Venda"), ",", "."))
    )

### 2.4 Convertendo campos para o tipo correto

In [0]:
from pyspark.sql.functions import col, round, to_date, to_timestamp

cotacao_dolar_df = (
    cotacao_dolar_df
    .withColumn("Cotacao_Compra", round(col("Cotacao_Compra").cast("double"), 2))
    .withColumn("Cotacao_Venda", round(col("Cotacao_Venda").cast("double"), 2))
    .withColumn("Data_Cotacao", to_date(to_timestamp(col("Data_Cotacao"), "yyyy-MM-dd HH:mm:ss.SSS")))
)
display(cotacao_dolar_df)

### 2.5 Coluna indicando quando os dados foram atualizados

In [0]:
from pyspark.sql.functions import current_timestamp, date_format, from_utc_timestamp

cotacao_dolar_df = cotacao_dolar_df.withColumn(
    "Atualizacao_Base",
    date_format(
        from_utc_timestamp(current_timestamp(), "America/Sao_Paulo"),
        "yyyy-MM-dd HH:mm:ss"
    )
)

display(cotacao_dolar_df)

##3.0 Carga

### 3.1 Salvando o DataFrame em arquivo parquet particionado por ano

In [0]:
from pyspark.sql.functions import year

cotacao_dolar_df = cotacao_dolar_df.withColumn("Ano", year("Data_Cotacao"))

In [0]:
cotacao_dolar_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("Ano") \
    .save("dbfs:/mnt/Cotacao_Dolar/Silver/Ft_Cotacao_Dolar")

### 3.2 Visualizando arquivo na Camada Silver

In [0]:
%fs
ls dbfs:/mnt/Cotacao_Dolar/Silver/Ft_Cotacao_Dolar/