
## Extração da 'IMP_COMPLETA' para a camada Silver


## Faz um select para verificar a tabela de importação que está na camada Bronze.
Nesse caso será uma tabela de importação para cada ano.

In [0]:
%sql

select * from bronze.imp_2022

CO_ANO,CO_MES,CO_NCM,CO_UNID,CO_PAIS,SG_UF_NCM,CO_VIA,CO_URF,QT_ESTAT,KG_LIQUIDO,VL_FOB,VL_FRETE,VL_SEGURO
2022,12,62014000,11,160,SC,4,927800,81,58,410,289,0
2022,6,85365090,11,160,PR,4,917900,70348,260,22860,5019,120
2022,4,84811000,11,764,PR,4,817700,1,0,106,0,0
2022,6,73261900,10,23,SP,4,817600,428,428,25456,3903,40
2022,3,83024100,10,351,SP,1,817800,3064,3064,27823,1694,61
2022,12,25223000,10,160,SP,1,817800,12000,12000,2952,3623,0
2022,5,83016000,10,245,SP,1,817800,302,302,12465,76,34
2022,3,84841000,10,23,SP,1,817800,17380,17380,1029655,11557,446
2022,7,85443000,10,493,SP,1,817800,10158,10158,448502,4937,1079
2022,10,87087090,11,160,SP,1,817800,507737,779754,3239071,636817,814


### Faz o mapeamento das colunas que terão seus nomes alteradas.

In [0]:
mapping_columns = {
    'CO_ANO':'cod_ano',
    'CO_MES':'cod_mes',
    'CO_NCM':'cod_ncm',
    'CO_UNID':'cod_unid',
    'CO_PAIS':'cod_pais',
    'SG_UF_NCM':'cod_uf',
    'CO_VIA':'cod_via',
    'CO_URF':'cod_urf',
    'QT_ESTAT':'cod_quantidade_estatistica',
    'KG_LIQUIDO':'qt_peso_liquido',
    'VL_FOB':'vl_fob',
    'VL_FRETE':'vl_frete',
    'VL_SEGURO':'vl_seguro'
}

### Cria o dataframe a partir da consulta SQL.
Faz o UNION ALL entre as tabelas dos anos de 2012 a 2022, renomea as colunas de acordo com o mapeamento e cria a coluna etl_date.


In [0]:
from pyspark.sql.functions import col

df = spark.sql("""

SELECT *
FROM bronze.imp_2012

UNION ALL

SELECT *
FROM bronze.imp_2013

UNION ALL

SELECT *
FROM bronze.imp_2014

UNION ALL

SELECT *
FROM bronze.imp_2015

UNION ALL

SELECT *
FROM bronze.imp_2016

UNION ALL

SELECT *
FROM bronze.imp_2017

UNION ALL

SELECT *
FROM bronze.imp_2018

UNION ALL

SELECT *
FROM bronze.imp_2019

UNION ALL

SELECT *
FROM bronze.imp_2020

UNION ALL

SELECT *
FROM bronze.imp_2021

UNION ALL

SELECT *,
cast(current_timestamp() as timestamp) as etl_date
FROM bronze.imp_2022

""")


for old_col, new_col in mapping_columns.items():
    df = df.withColumnRenamed(old_col, new_col)

### Usa a função cast pra converter as colunas 'cod_ano' e 'cod_mes' para int

In [0]:
df = df.withColumn("cod_ano", col("cod_ano").cast("int")) /
       .withColumn("cod_mes", col("cod_mes").cast("int"))


### Usa as funções concat e to_date para criar uma coluna de data

In [0]:
df = df.withColumn("dt_operacao", to_date(concat(lit("01/"), col("cod_mes"), lit("/"), col("cod_ano")), "dd/MM/yyyy"))

### Usa a função cast para converter as colunas de valor em decimal

In [0]:
df = df.withColumn("qt_peso_liquido", col("qt_peso_liquido").cast("decimal(20,2)")) \
       .withColumn("vl_fob", col("vl_fob").cast("decimal(20,2)")) \
       .withColumn("vl_frete", col("vl_frete").cast("decimal(20,2)")) \
       .withColumn("vl_seguro", col("vl_seguro").cast("decimal(20,2)"))

### Usa a função cast para converter as demais colunas em string

In [0]:
df = df.withColumn("cod_ncm", col("cod_ncm").cast("string")) \
       .withColumn("cod_ano", col("cod_ano").cast("string")) \
       .withColumn("cod_mes", col("cod_mes").cast("string")) \
       .withColumn("cod_unid", col("cod_unid").cast("string")) \
       .withColumn("cod_pais", col("cod_pais").cast("string")) \
       .withColumn("cod_uf", col("cod_uf").cast("string")) \
       .withColumn("cod_via", col("cod_via").cast("string")) \
       .withColumn("cod_urf", col("cod_urf").cast("string")) \
       .withColumn("cod_quantidade_estatistica", col("cod_quantidade_estatistica").cast("string")) 

### Cria uma visão temporária

In [0]:
df.createOrReplaceTempView('vw_imp_completa_silver')

### Salva a tabela no formato parquet

In [0]:
permanent_table = "imp_completa_silver"

df.write.format("parquet").saveAsTable(permanent_table)

### Salva a tabela na camada Silver

In [0]:
%sql

CREATE TABLE IF NOT EXISTS silver.importacao_completa AS SELECT * FROM default.imp_completa_silver;

num_affected_rows,num_inserted_rows
