## Setup

Configuração do ambiente de desenvolvimento para realização do ETL (Extract Transform Load)

### Instalando bibliotecas

Instalando todas as bibliotecas necessárias e criação do ambiente spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
import numpy as np

# Create a Spark session
spark = SparkSession.builder.master("local").config("spark.executor.memory", "6g") \
    .config("spark.driver.memory", "6g") \
    .config("spark.driver.maxResultSize", "6g") \
    .appName("PySpark Tutorial").getOrCreate()

# Verify Spark version
print("Spark version: ", spark.version)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/07 00:40:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version:  3.5.4


## Pre Processamento

Uma vez que os arquivos bases são grandes, para evitar esforço computacional e consumo de disco desnecessário, Estamos reescrevendo os dados Bronze para parquet, com somente os meses necessários (Necessário rodar somente uma vez, para gerar `bronze.parquet`)

In [7]:
INPUT_PATH = '/home/lucas-nunes/workspace/Postech/challenges/3_covid19/data/bronze/micro_data'
INPUT_PATH_SAMPLE = '/home/lucas-nunes/workspace/Postech/challenges/3_covid19/input/data/micro_data/ano=2020/item=05/PNAD_COVID_052020.csv'

SILVER_PATH = '/home/lucas-nunes/workspace/Postech/challenges/3_covid19/data/silver'
BRONZE_PATH = '/home/lucas-nunes/workspace/Postech/challenges/3_covid19/data/bronze'

df = spark.read.csv(INPUT_PATH, header=True)

df = df.where(col('mes') >= 9)

df.toPandas().to_parquet(f'{BRONZE_PATH}/bronze.parquet')

25/05/07 00:49:05 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 148, schema size: 145
CSV file: file:///home/lucas-nunes/workspace/Postech/challenges/3_covid19/data/bronze/micro_data/ano_part=2020/mes=11/PNAD_COVID_112020.csv
                                                                                

### Criação da estrutura dos valores

Mapeamento dos campos que serão recebidos e atribuição do tipo primitivo de cada um, Normalização de todos as colunas, retirando acentuação e caracteres especiais, e definição dos campos de particionamento.

In [1]:

schema_investing_fields = StructType([
    StructField("Data", DateType(), True),
    StructField("Último", FloatType(), True),
    StructField("Abertura", FloatType(), True),
    StructField("Máxima", FloatType(), True),
    StructField("Mínima", FloatType(), True),
    StructField("Vol.", StringType(), True),
    StructField("Var%", StringType(), True),
])

columns_to_float = ['ultimo', 'abertura', 'maxima', 'minima']


rename_fields = {
    "Data": "data",
    "Último": "ultimo",
    "Abertura": "abertura",
    "Máxima": "maxima",
    "Mínima": "minima",
    "Vol.": "volume",
    "Var%": "variacao"
}

partitions = ['category', 'item']

NameError: name 'StructType' is not defined

## Read

Mapeamento dos paths que irão ser utilizados para o tratamento dos dados e leitura dos arquivos utilizando particionamento spark

In [None]:
INPUT_PATH = '/home/lucas-nunes/workspace/Postech/challenges/3_covid/data/bronze/micro_data/'
INPUT_PATH_SAMPLE = '/home/lucas-nunes/workspace/Postech/challenges/3_covid/input/data/micro_data/ano=2020/item=cobre/Dados Históricos - Cobre Futuros.csv'

SILVER_PATH = '/home/lucas-nunes/workspace/Postech/challenges/3_covid/data/silver'
BRONZE_PATH = '/home/lucas-nunes/workspace/Postech/challenges/3_covid/data/bronze'

# df = spark.read.csv(INPUT_PATH, header=True)
df = pd.read_parquet(f'{BRONZE_PATH}/bronze.parquet')

df = df.withColumnsRenamed(rename_fields)



25/05/07 00:40:18 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Gerando Bronze parquet para processamentos

Uma vez que os arquivos bases são grandes, para evitar esforço computacional e consumo de disco desnecessário, Estamos reescrevendo os dados Bronze para parquet, com somente os meses necessários (Necessário rodar somente uma vez, para gerar `bronze.parquet`)

In [None]:
# df.toPandas().to_csv(f'{SILVER_PATH}/silver.csv')
df.toPandas().to_parquet(f'{BRONZE_PATH}/bronze.parquet')

In [3]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Ano: string, UF: string, CAPITAL: string, RM_RIDE: string, V1008: string, V1012: string, V1013: string, V1016: string, Estrato: string, UPA: string, V1022: string, V1023: string, V1030: string, V1031: string, V1032: string, posest: string, A001: string, A001A: string, A001B1: string, A001B2: string, A001B3: string, A002: string, A003: string, A004: string, A005: string, A006: string, A007: string, A008: string, A009: string, B0011: string, B0012: string, B0013: string, B0014: string, B0015: string, B0016: string, B0017: string, B0018: string, B0019: string, B00110: string, B00111: string, B00112: string, B00113: string, B002: string, B0031: string, B0032: string, B0033: string, B0034: string, B0035: string, B0036: string, B0037: string, B0041: string, B0042: string, B0043: string, B0044: string, B0045: string, B0046: string, B005: string, B006: string, B007: string, B008: string, B009A: string, B009B: string, B009C: string, B009D: string

In [None]:
df.show()

In [9]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Ano: string, UF: string, CAPITAL: string, RM_RIDE: string, V1008: string, V1012: string, V1013: string, V1016: string, Estrato: string, UPA: string, V1022: string, V1023: string, V1030: string, V1031: string, V1032: string, posest: string, A001: string, A001A: string, A001B1: string, A001B2: string, A001B3: string, A002: string, A003: string, A004: string, A005: string, A006: string, A007: string, A008: string, A009: string, B0011: string, B0012: string, B0013: string, B0014: string, B0015: string, B0016: string, B0017: string, B0018: string, B0019: string, B00110: string, B00111: string, B00112: string, B00113: string, B002: string, B0031: string, B0032: string, B0033: string, B0034: string, B0035: string, B0036: string, B0037: string, B0041: string, B0042: string, B0043: string, B0044: string, B0045: string, B0046: string, B005: string, B006: string, B007: string, B008: string, B009A: string, B009B: string, B009C: string, B009D: string

## Process

Tratamento dos valores e colunas, remoção de caracteres especiais e abreviações de milhar "K" ou milhão "M"

In [4]:
for column in columns_to_float:

    df = df.withColumn(column, regexp_replace(regexp_replace(column, r'\.', ''), ',', r'\.').astype('float'))


df = df.withColumn('variacao', regexp_replace(regexp_replace('variacao', r'%', ''), ',', r'\.').astype('float'))
df = df.withColumn('volume', regexp_replace(regexp_replace('volume', r'K', ''), ',', r'\.').astype('float'))
df = df.withColumn('data', to_date(col('data'), 'dd.MM.yyyy'))
df = df.drop_duplicates(subset=['data', 'item'])


## Write

### Escrevendo arquivo tratado pelo tier bronze, com todos os dados concatenados e tratados

In [4]:
# df.toPandas().to_csv(f'{SILVER_PATH}/silver.csv')
df.toPandas().to_parquet(f'{SILVER_PATH}/silver.parquet')


25/05/07 00:40:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/05/07 00:40:23 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 148, schema size: 145
CSV file: file:///home/lucas-nunes/workspace/Postech/challenges/3_covid19/data/bronze/micro_data/ano_part=2020/mes=11/PNAD_COVID_112020.csv
                                                                                

In [5]:
df = pd.read_parquet(f'{SILVER_PATH}/silver.parquet')

In [6]:
df

Unnamed: 0,Ano,UF,CAPITAL,RM_RIDE,V1008,V1012,V1013,V1016,Estrato,UPA,...,F0022,F002A1,F002A2,F002A3,F002A4,F002A5,F0061,F006,ano_part,mes
0,2020,11,11,,01,4,09,5,1110011,110015970,...,,1,1,1,2,1,1,01,2020,9
1,2020,11,11,,01,4,09,5,1110011,110015970,...,,1,1,1,2,1,1,01,2020,9
2,2020,11,11,,01,4,09,5,1110011,110015970,...,,1,1,1,2,1,1,01,2020,9
3,2020,11,11,,01,4,09,5,1110011,110015970,...,,1,1,1,2,1,1,01,2020,9
4,2020,11,11,,02,1,09,5,1110011,110015970,...,,1,1,1,2,1,1,01,2020,9
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1149192,2020,53,53,,06,3,10,6,5310220,530009738,...,,1,1,1,2,1,1,03,2020,10
1149193,2020,53,53,,06,3,10,6,5310220,530009738,...,,1,1,1,2,1,1,03,2020,10
1149194,2020,53,53,,06,3,10,6,5310220,530009738,...,,1,1,1,2,1,1,03,2020,10
1149195,2020,53,53,,10,2,10,6,5310220,530009738,...,,1,1,1,1,1,1,02,2020,10
