# Data Engineering

In [1]:
import os
from glob import glob

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_date, regexp_replace, udf, col, length, when, year, lit
)
from pyspark.sql.types import StructType, IntegerType, FloatType

# Build (or reuse) a local Spark session named "master-covid".
spark = (
    SparkSession.builder
    .master("local")
    .appName("master-covid")
    .getOrCreate()
)
# Only FATAL messages are logged from here on.
spark.sparkContext.setLogLevel("FATAL")

22/07/31 17:37:05 WARN Utils: Your hostname, predator resolves to a loopback address: 127.0.1.1; using 192.168.15.200 instead (on interface enp7s0)
22/07/31 17:37:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/31 17:37:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Root folders used by the notebook (relative paths).
PATH_DATA = "datas"      # landing CSVs, raw.parquet and trusted.parquet are read/written under here
PATH_RESULT = "results"  # not referenced in this part of the notebook

## Ingest

dataset files

In [3]:
# Every gzip-compressed CSV in the landing folder whose name starts with "20",
# in lexicographic order (oldest year first for 4-digit year file names).
files = sorted(glob(f'{PATH_DATA}/landing/20*.csv.gz'))
files

['datas/landing/2009.csv.gz',
 'datas/landing/2010.csv.gz',
 'datas/landing/2011.csv.gz',
 'datas/landing/2012.csv.gz',
 'datas/landing/2013.csv.gz',
 'datas/landing/2014.csv.gz',
 'datas/landing/2015.csv.gz',
 'datas/landing/2016.csv.gz',
 'datas/landing/2017.csv.gz',
 'datas/landing/2018.csv.gz',
 'datas/landing/2019.csv.gz',
 'datas/landing/2020.csv.gz',
 'datas/landing/2021.csv.gz']

concat datasets

In [4]:
# Start from an empty, schema-less frame so that every yearly file is merged
# through the same unionByName call.
df = spark.createDataFrame([], StructType())
for file in files:
    print(f'[INFO] Reading file: {file}')
    aux = spark.read \
        .format('csv') \
        .option('header', 'true') \
        .option('multiline', True) \
        .option('escape', '\"') \
        .load(file)
    # File names look like "<year>.csv.gz". basename() is used instead of
    # splitting on '/' so the parse also works when glob returns paths with
    # the platform's native separator.
    _year = int(os.path.basename(file).split('.')[0])
    aux = aux.withColumn('SOURCE_YEAR', lit(_year))
    # Columns missing on either side are filled with nulls.
    df = df.unionByName(aux, allowMissingColumns=True)

out = f'{PATH_DATA}/raw.parquet'
df.write.mode('overwrite').parquet(out)
# Count the rows from the parquet that was just written: calling df.count()
# on the CSV lineage would decompress and parse every landing file a second
# time, because nothing is cached between the two actions.
print(f'[INFO] columns: {len(df.columns)}, count: {spark.read.parquet(out).count()}')
print(f'[INFO] Saved file: {out}')

[INFO] Reading file: datas/landing/2009.csv.gz
[INFO] Reading file: datas/landing/2010.csv.gz
[INFO] Reading file: datas/landing/2011.csv.gz
[INFO] Reading file: datas/landing/2012.csv.gz
[INFO] Reading file: datas/landing/2013.csv.gz
[INFO] Reading file: datas/landing/2014.csv.gz
[INFO] Reading file: datas/landing/2015.csv.gz
[INFO] Reading file: datas/landing/2016.csv.gz
[INFO] Reading file: datas/landing/2017.csv.gz
[INFO] Reading file: datas/landing/2018.csv.gz
[INFO] Reading file: datas/landing/2019.csv.gz
[INFO] Reading file: datas/landing/2020.csv.gz
[INFO] Reading file: datas/landing/2021.csv.gz


                                                                                

[INFO] columns: 241, count: 3223130




[INFO] Saved file: datas/raw.parquet


                                                                                

## Transform

In [60]:
# Reload the merged dataset from parquet so the transform stage does not
# depend on the CSV ingestion lineage.
raw_path = f'{PATH_DATA}/raw.parquet'
df = spark.read.parquet(raw_path)

transform date columns

In [5]:
# Date columns, grouped under the year tag used in the original listing
# (presumably the year each column first shows up in the yearly files -
# TODO confirm against the data dictionary).
_dates_by_year = {
    2009: [
        'DT_NASC', 'DT_INTERNA', 'DT_NOTIFIC', 'DT_SIN_PRI', 'DT_ANTIVIR',
        'DT_COLETA', 'DT_CULTURA', 'DT_ENTUTI', 'DT_HEMAGLU', 'DT_OBITO',
        'DT_PCR', 'DT_SAIDUTI', 'DT_RAIOX', 'DT_ENCERRA', 'DT_DIGITA',
    ],
    2010: ['DT_IFI'],
    2012: ['DT_OUTMET', 'DT_PCR_1'],
    2017: ['DT_UT_DOSE'],
    2019: [
        'DT_1_DOSE', 'DT_2_DOSE', 'DT_DOSEUNI', 'DT_VAC_MAE',
        'DT_IF', 'DT_EVOLUCA', 'DT_RT_VGM',
    ],
    2020: ['DT_CO_SOR', 'DT_RES', 'DT_RES_AN', 'DT_TOMO', 'DT_VGM'],
    2021: ['DOSE_1_COV', 'DOSE_2_COV'],
}

# Flat list, in the same order as the groups above (dicts keep insertion order).
dates = [name for names in _dates_by_year.values() for name in names]

In [62]:
def _parse_ddmmyyyy(name):
    '''Column expression: parse `name` as dd/MM/yyyy, nulling values whose length is not 10.'''
    raw = col(name)
    ten_chars_only = when(length(raw) != 10, None).otherwise(raw)
    return to_date(ten_chars_only, 'dd/MM/yyyy')


def _null_out_of_range_year(name):
    '''Column expression: keep `name` unless its year is after 2021 or before 1800.'''
    value = col(name)
    value_year = year(value)
    return (
        when(value_year > 2021, None)
        .when(value_year < 1800, None)
        .otherwise(value)
    )


# Two passes over the same column list: first turn the text into dates,
# then blank out dates whose year falls outside 1800..2021.
df = (
    df
    .withColumns({name: _parse_ddmmyyyy(name) for name in dates})
    .withColumns({name: _null_out_of_range_year(name) for name in dates})
)


transform numeric columns

In [63]:
# OBES_IMC arrives as text: use '.' as the decimal separator, drop any '>'
# marker, then cast what is left to float.
imc_text = regexp_replace(col('OBES_IMC'), ',', '.')
imc_text = regexp_replace(imc_text, '>', '')
df = df.withColumn('OBES_IMC', imc_text.cast(FloatType()))

transform age code (NU_IDADE_N / TP_IDADE) to age in years

In [64]:
@udf(returnType=IntegerType())
def cod2age(cod: int, tp: int = None) -> int:
    '''
    Convert an age code into an age in whole years.

    Two encodings are handled.

    if tp is None, len(str(cod)) == 4
    first digit in cod is tp, where:
      1 = hour
      2 = days
      3 = months
      4 = years
    and the remaining digits are the value in that unit.

    if tp is not None, cod is the plain value and tp is its unit:
      1 = days
      2 = months
      3 = years

    Parameters:
      cod: age code / value. It is converted with str(), so an int or a
           numeric string both work. May be None (SQL NULL).
      tp:  optional unit of `cod`, int or numeric string.

    Returns:
      The age in years (floor division for days and months), 0 for hours
      or any unit outside 1..3, and None when `cod` is missing or blank.
    '''
    # Spark hands SQL NULL to a Python UDF as None. Without this guard
    # str(None) == 'None' reaches int() below and a single null row aborts
    # the whole job.
    if cod is None:
        return None

    offset = 0
    _aux = str(cod).strip()
    if not _aux:
        return None

    if tp is None:
        # 4-digit encoding: shift the leading unit digit so that it lines
        # up with the explicit-tp scale (2 -> 1 = days, 3 -> 2 = months, ...).
        tp = int(_aux[0]) - 1
        offset = 1

    tp = int(tp)
    if tp == 1: # 2 = days
        return int(_aux[offset:])//365
    elif tp == 2: # 3 = months
        return int(_aux[offset:])//12
    elif tp == 3: # 4 = years
        return int(_aux[offset:])
    else:
        # hours (tp == 0) and unrecognised units are reported as 0 years
        return 0

# Replace the two raw age columns with a single IDADE column in years.
df = df.withColumn('IDADE', cod2age(col('NU_IDADE_N'), col('TP_IDADE'))) \
    .drop('TP_IDADE', 'NU_IDADE_N')

transform sem_not

In [65]:
@udf(returnType=IntegerType())
def modsn(sem_not: str) -> int:
    '''
    Keep only the last two decimal digits of SEM_NOT.

    Presumably SEM_NOT is sometimes stored with a year prefix (e.g. YYYYWW)
    and this reduces it to the week number - TODO confirm against the data.

    Returns None when `sem_not` is None (SQL NULL) instead of raising, so a
    single missing value does not abort the Spark job.
    '''
    if sem_not is None:
        return None
    return int(sem_not)%100

df = df.withColumn('SEM_NOT', modsn(col('SEM_NOT')))

transform gender column

In [66]:
# Recode the letter values of CS_SEXO (M -> 1, F -> 2, I -> 9); any other
# value is passed through unchanged to the integer cast.
sexo = col('CS_SEXO')
sexo_code = (
    when(sexo == 'M', 1)
    .when(sexo == 'F', 2)
    .when(sexo == 'I', 9)
    .otherwise(sexo)
)
df = df.withColumn('CS_SEXO', sexo_code.cast(IntegerType()))

change schema of class columns (cast to integer)

In [67]:
# Columns that are cast to integer further down. The three groups follow the
# blank-line separation of the original listing; judging by the column names
# they look like symptoms, comorbidities / risk factors and case outcome -
# TODO confirm against the data dictionary.
_class_groups = (
    (
        'FEBRE', 'TOSSE', 'CALAFRIO', 'DISPNEIA', 'GARGANTA', 'ARTRALGIA',
        'MIALGIA', 'CONJUNTIV', 'CORIZA', 'DIARREIA', 'DESC_RESP',
        'SATURACAO', 'VOMITO', 'DOR_ABD', 'FADIGA', 'PERD_OLFT', 'PERD_PALA',
    ),
    (
        'CARDIOPATI', 'PNEUMOPATI', 'RENAL', 'HEMOGLOBI', 'IMUNODEPRE',
        'TABAGISMO', 'METABOLICA', 'HEPATICA', 'NEUROLOGIC', 'OBESIDADE',
        'PUERPERA', 'SIND_DOWN', 'HEMATOLOGI', 'ASMA', 'DIABETES',
    ),
    (
        'CLASSI_FIN', 'EVOLUCAO',
    ),
)

# Flat list in the same order as the groups above.
classes = [name for group in _class_groups for name in group]

# Cast every column listed in `classes` to integer in a single projection.
as_int = IntegerType()
int_casts = {name: col(name).cast(as_int) for name in classes}
df = df.withColumns(int_casts)

store trusted files

In [68]:
# Persist the transformed frame, replacing any previous trusted output.
trusted_path = f'{PATH_DATA}/trusted.parquet'
df.write.parquet(trusted_path, mode='overwrite')

                                                                                

developed by [patricksferraz](https://github.com/patricksferraz)