# Transformações

Realizar transformações nos datasets, de modo a converter o dado ingestado previamente no formato mais otimizado para Big Data, o formato parquet, particionando-o fisicamente quando necessário.

## Transformação das tabelas de formato JSON para Parquet

### Definindo um schema para cada tabela

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

In [0]:
# schema da tabela pais

schema_pais = StructType([
    StructField("ID", IntegerType(), True),
    StructField("NOME", StringType(), True),
    StructField("SIGLA", StringType(), True),
    StructField("SLUG", StringType(), True)
])

In [0]:
# schema da tabela dados_paises

schema_dados_paises = StructType([
    StructField("ACTIVE", IntegerType(), True),
    StructField("CONFIRMED", IntegerType(), True),
    StructField("DATE", DateType(), True),
    StructField("DEATHS", IntegerType(), True),
    StructField("ID", IntegerType(), True),
    StructField("ID_PAIS", IntegerType(), True),
    StructField("LAT", IntegerType(), True),
    StructField("LON", IntegerType(), True),
    StructField("RECOVERED", IntegerType(), True),
])

In [0]:
# schema da tabela summary

schema_summary = StructType([
    StructField("ID", IntegerType(), True),
    StructField("ID_PAIS", IntegerType(), True),
    StructField("NEWCONFIRMED", IntegerType(), True),
    StructField("TOTALCONFIRMED", IntegerType(), True),
    StructField("NEWDEATHS", IntegerType(), True),
    StructField("TOTALDEATHS", IntegerType(), True),
    StructField("NEWRECOVERED", IntegerType(), True),
    StructField("TOTALRECOVERED", IntegerType(), True),
    StructField("DATE", DateType(), True),
])

In [0]:
# Leitura dos arquivos que estão no diretório _raw

from datetime import date
today=date.today()

df_summary_json = spark.read.json("dbfs:/FileStore/_covid_data_lake/_raw/_summary/{}.json".format(today), schema=schema_summary)
df_pais_json = spark.read.json("dbfs:/FileStore/_covid_data_lake/_raw/_pais/{}.json".format(today), schema=schema_pais)
df_dados_paises_json = spark.read.json("dbfs:/FileStore/_covid_data_lake/_raw/_dados_paises/{}.json".format(today), schema=schema_dados_paises)

In [0]:
# Salvando arquivos com formato Parquet no diretório _ready

df_summary_json.write.parquet("dbfs:/FileStore/_covid_data_lake/_ready/_summary/summary.parquet", mode='overwrite')
df_pais_json.write.parquet("dbfs:/FileStore/_covid_data_lake/_ready/_pais/pais.parquet", mode='overwrite')
df_dados_paises_json.write.parquet("dbfs:/FileStore/_covid_data_lake/_ready/_dados_paises/dados_paises.parquet", mode='overwrite')

## Carregando as tabelas summary_paises, pais e dados_paises

In [0]:
data_pais = "dbfs:/FileStore/_covid_data_lake/_ready/_pais/pais.parquet"
data_dados_paises = "dbfs:/FileStore/_covid_data_lake/_ready/_dados_paises/dados_paises.parquet"
data_summary_paises = "dbfs:/FileStore/_covid_data_lake/_ready/_summary/summary.parquet"

In [0]:
df_pais = spark.read.load(data_pais, format='parquet', header='true')
display(df_pais)
df_pais.printSchema()

ID,NOME,SIGLA,SLUG
497,Timor-Leste,TL,timor-leste
498,Bouvet Island,BV,bouvet-island
499,Kenya,KE,kenya
500,Lithuania,LT,lithuania
501,Mali,ML,mali
502,Panama,PA,panama
503,Pitcairn,PN,pitcairn
504,Saint Kitts and Nevis,KN,saint-kitts-and-nevis
505,Lesotho,LS,lesotho
506,Nigeria,NG,nigeria


In [0]:
df_dados_paises = spark.read.load(data_dados_paises, format='parquet', header='true')
display(df_dados_paises)
df_dados_paises.printSchema()

ACTIVE,CONFIRMED,DATE,DEATHS,ID,ID_PAIS,LAT,LON,RECOVERED
3224,45152,2020-09-09,905,109528,736,0,0,41023
3187,45326,2020-09-10,906,109529,736,0,0,41233
3160,45503,2020-09-11,909,109530,736,0,0,41434
3159,45675,2020-09-12,911,109531,736,0,0,41605
3287,45862,2020-09-13,916,109532,736,0,0,41659
3357,45969,2020-09-14,919,109533,736,0,0,41693
3258,46119,2020-09-15,920,109534,736,0,0,41941
3384,46376,2020-09-16,923,109535,736,0,0,42069
3515,46671,2020-09-17,925,109536,736,0,0,42231
3615,46910,2020-09-18,926,109537,736,0,0,42369


In [0]:
df_summary_paises = spark.read.load(data_summary_paises, format='parquet', header='true')
display(df_summary_paises)
df_summary_paises.printSchema()

ID,ID_PAIS,NEWCONFIRMED,TOTALCONFIRMED,NEWDEATHS,TOTALDEATHS,NEWRECOVERED,TOTALRECOVERED,DATE
381,738,18,55894,0,2451,90,49499,2021-03-12
382,671,631,114840,17,1986,714,77498,2021-03-12
383,672,138,114681,4,3026,109,79428,2021-03-12
384,721,41,11130,0,112,23,10708,2021-03-12
385,557,47,21161,0,516,84,19761,2021-03-12
386,618,20,882,1,23,34,489,2021-03-12
387,636,7693,2169694,107,53359,5049,1961640,2021-03-12
388,736,340,175538,5,3237,163,165259,2021-03-12
389,605,16,29090,0,909,9,22945,2021-03-12
390,655,2528,481919,19,8776,2012,449053,2021-03-12


## Transformação das tabelas para análise dos dados

### Tabela summary_country

In [0]:
# Realizado a junção das tabelas paises e resumo

df_summary_dados_pais = df_pais.join(df_summary_paises,on=df_pais.ID == df_summary_paises.ID_PAIS)
display(df_summary_dados_pais)

ID,NOME,SIGLA,SLUG,ID.1,ID_PAIS,NEWCONFIRMED,TOTALCONFIRMED,NEWDEATHS,TOTALDEATHS,NEWRECOVERED,TOTALRECOVERED,DATE
497,Timor-Leste,TL,timor-leste,553,497,3,145,0,0,0,94,2021-03-12
499,Kenya,KE,kenya,468,499,713,110356,12,1898,167,87903,2021-03-12
500,Lithuania,LT,lithuania,479,500,486,203386,10,3351,409,189479,2021-03-12
501,Mali,ML,mali,486,501,97,8710,0,358,2,6452,2021-03-12
502,Panama,PA,panama,511,502,542,346301,13,5957,670,333675,2021-03-12
504,Saint Kitts and Nevis,KN,saint-kitts-and-nevis,523,504,0,41,0,0,0,41,2021-03-12
505,Lesotho,LS,lesotho,475,505,0,10525,0,309,0,3922,2021-03-12
506,Nigeria,NG,nigeria,506,506,394,159646,5,1993,927,139983,2021-03-12
507,Congo (Kinshasa),CD,congo-kinshasa,420,507,54,26738,0,712,0,22432,2021-03-12
508,Jordan,JO,jordan,466,508,6649,448851,60,5106,3827,379305,2021-03-12


In [0]:
# Exclusão das tabelas sem dados relavantes ID, SLUG, ID, ID_PAIS

df_summary_country = df_summary_dados_pais.drop('ID', 'SLUG', 'ID', 'ID_PAIS')

In [0]:
display(df_summary_country)

NOME,SIGLA,NEWCONFIRMED,TOTALCONFIRMED,NEWDEATHS,TOTALDEATHS,NEWRECOVERED,TOTALRECOVERED,DATE
Afghanistan,AF,18,55894,0,2451,90,49499,2021-03-12
Albania,AL,631,114840,17,1986,714,77498,2021-03-12
Algeria,DZ,138,114681,4,3026,109,79428,2021-03-12
Andorra,AD,41,11130,0,112,23,10708,2021-03-12
Angola,AO,47,21161,0,516,84,19761,2021-03-12
Antigua and Barbuda,AG,20,882,1,23,34,489,2021-03-12
Argentina,AR,7693,2169694,107,53359,5049,1961640,2021-03-12
Armenia,AM,340,175538,5,3237,163,165259,2021-03-12
Australia,AU,16,29090,0,909,9,22945,2021-03-12
Austria,AT,2528,481919,19,8776,2012,449053,2021-03-12


In [0]:
# Salva a tabela no formato parquet no diretório ready

df_summary_country.write.parquet("dbfs:/FileStore/_covid_data_lake/_ready/_summary/summary_country.parquet", mode='overwrite')

### Tabela data_by_country

In [0]:
# Realizado a junção das tabelas paises e resumo

df_country_data = df_pais.join(df_dados_paises,on=df_pais.ID == df_dados_paises.ID_PAIS)
display(df_country_data)

ID,NOME,SIGLA,SLUG,ACTIVE,CONFIRMED,DATE,DEATHS,ID.1,ID_PAIS,LAT,LON,RECOVERED
736,Armenia,AM,armenia,3224,45152,2020-09-09,905,109528,736,0,0,41023
736,Armenia,AM,armenia,3187,45326,2020-09-10,906,109529,736,0,0,41233
736,Armenia,AM,armenia,3160,45503,2020-09-11,909,109530,736,0,0,41434
736,Armenia,AM,armenia,3159,45675,2020-09-12,911,109531,736,0,0,41605
736,Armenia,AM,armenia,3287,45862,2020-09-13,916,109532,736,0,0,41659
736,Armenia,AM,armenia,3357,45969,2020-09-14,919,109533,736,0,0,41693
736,Armenia,AM,armenia,3258,46119,2020-09-15,920,109534,736,0,0,41941
736,Armenia,AM,armenia,3384,46376,2020-09-16,923,109535,736,0,0,42069
736,Armenia,AM,armenia,3515,46671,2020-09-17,925,109536,736,0,0,42231
736,Armenia,AM,armenia,3615,46910,2020-09-18,926,109537,736,0,0,42369


In [0]:
# Exclusão das tabelas sem dados relavantes ID, SLUG, ID, ID_PAIS, LAT, LON

df_data_by_country = df_country_data.drop('ID', 'SLUG', 'ID', 'ID_PAIS', 'LAT', 'LON')
display(df_data_by_country)

NOME,SIGLA,ACTIVE,CONFIRMED,DATE,DEATHS,RECOVERED
Armenia,AM,3224,45152,2020-09-09,905,41023
Armenia,AM,3187,45326,2020-09-10,906,41233
Armenia,AM,3160,45503,2020-09-11,909,41434
Armenia,AM,3159,45675,2020-09-12,911,41605
Armenia,AM,3287,45862,2020-09-13,916,41659
Armenia,AM,3357,45969,2020-09-14,919,41693
Armenia,AM,3258,46119,2020-09-15,920,41941
Armenia,AM,3384,46376,2020-09-16,923,42069
Armenia,AM,3515,46671,2020-09-17,925,42231
Armenia,AM,3615,46910,2020-09-18,926,42369


In [0]:
# Lógica para inclusão das colunas mês e ano na tabela

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from datetime import datetime, date

def month_collected(data):
  return data.strftime("%B")

def year_collected(data):
  return data.strftime("%Y")

month_collected_udf = udf(month_collected, StringType())
year_collected_udf = udf(year_collected, StringType())

In [0]:
# Acrescentando a coluna mês na tabela

df_mes = df_data_by_country.select("*", month_collected_udf("DATE").alias("MONTH"))

In [0]:
# Acrescentando a coluna ano na tabela

df_ano = df_mes.select("*", year_collected_udf("DATE").alias("YEAR"))

In [0]:
df_data_by_country = df_ano
display(df_data_by_country)

NOME,SIGLA,ACTIVE,CONFIRMED,DATE,DEATHS,RECOVERED,MONTH,YEAR
Armenia,AM,3224,45152,2020-09-09,905,41023,September,2020
Armenia,AM,3187,45326,2020-09-10,906,41233,September,2020
Armenia,AM,3160,45503,2020-09-11,909,41434,September,2020
Armenia,AM,3159,45675,2020-09-12,911,41605,September,2020
Armenia,AM,3287,45862,2020-09-13,916,41659,September,2020
Armenia,AM,3357,45969,2020-09-14,919,41693,September,2020
Armenia,AM,3258,46119,2020-09-15,920,41941,September,2020
Armenia,AM,3384,46376,2020-09-16,923,42069,September,2020
Armenia,AM,3515,46671,2020-09-17,925,42231,September,2020
Armenia,AM,3615,46910,2020-09-18,926,42369,September,2020


In [0]:
# Salva a tabela no formato parquet no diretório ready

df_data_by_country.write.parquet("dbfs:/FileStore/_covid_data_lake/_ready/_data_by_country/data_by_country.parquet", mode='overwrite')