## **Desnormalização - Transformação /raw (JSON) para /ready (Parquet)**

#### 1. Definindo função para carregamento de arquivos JSON nos dataframes de manipulação

In [0]:
def read_from_raw(file_name):
  """Load one raw JSON dataset from the data lake into a Spark DataFrame.

  Args:
    file_name: Dataset name; it is interpolated into the directory
      ``dbfs:/FileStore/covid_data_lake/raw/db_raw_{file_name}_json``.

  Returns:
    The DataFrame produced by Spark's JSON reader for that directory.
  """
  data_dir = f'dbfs:/FileStore/covid_data_lake/raw/db_raw_{file_name}_json'
  # The CSV-only reader options `inferSchema` and `header` were dropped here:
  # they are not options of the JSON data source.
  return spark.read.json(data_dir)
  

In [0]:
# Load every raw dataset into its own working DataFrame; the targets on the
# left line up one-to-one, in order, with the dataset names on the right.
(
  df_country,
  df_daily,
  df_summary,
  df_continent,
  df_country_continent,
  df_country_iso,
) = [
  read_from_raw(raw_name)
  for raw_name in (
    'country',
    'daily',
    'summary',
    'continent',
    'country_continent',
    'country_iso',
  )
]

#### 2. Desnormalizando os dataframes de manipulação

###### 1. Desnormalizando dataframes

In [0]:
# Denormalize df_country_continent: bring in the df_continent columns for each
# row, matched on continentcode. The left join keeps rows that have no match.
df_country_continent = df_country_continent.join(
  df_continent,
  on="continentcode",
  how="left",
)
display(df_country_continent)



continentcode,countrycode,continentname
AS,AF,Asia
EU,AL,Europe
AN,AQ,Antarctica
AF,DZ,Africa
OC,AS,Oceania
EU,AD,Europe
AF,AO,Africa
,AG,North America
EU,AZ,Europe
AS,AZ,Asia


In [0]:
# Add the df_country_iso columns to each country, matched on countryname.
# The left join keeps countries with no match; their df_country_iso columns
# come back null.
# NOTE(review): matching on the display name looks fragile — the null-isocode
# listing further down shows several countries that did not match; confirm
# whether a code-based key is available.
df_country = df_country.join(
  df_country_iso,
  on="countryname",
  how="left",
)


In [0]:
# Disabled step: if re-enabled, this would fill null values in df_country
# with the placeholder 'N/A'.
#df_country=df_country.fillna('N/A')

In [0]:
# Show the df_country rows whose isocode is null.
display(df_country.where(df_country["isocode"].isNull()))

countryname,countrycode,countryslug,isocode
Palestinian Territory,PS,palestine,
"Macao, SAR China",MO,macao-sar-china,
"Macedonia, Republic of",MK,macedonia,
Congo (Brazzaville),CG,congo-brazzaville,
Faroe Islands,FO,faroe-islands,
French Southern Territories,TF,french-southern-territories,
South Sudan,SS,south-sudan,
ALA Aland Islands,AX,ala-aland-islands,
Republic of Kosovo,XK,kosovo,
Congo (Kinshasa),CD,congo-kinshasa,


In [0]:
# Denormalize df_daily: add the country/continent columns first, then the
# country columns, both matched on countrycode with left joins.
# NOTE(review): the df_country_continent output shown earlier lists AZ under
# both EU and AS, so rows for such a country would come out once per continent
# here — confirm that this duplication is intended.
df_daily = (
  df_daily
  .join(df_country_continent, on="countrycode", how="left")
  .join(df_country, on="countrycode", how="left")
)
display(df_daily)


countrycode,datereg,newconfirmed,newdeaths,totalconfirmed,totaldeaths,continentcode,continentname,countryname,countryslug,isocode
AR,2020-01-22,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-23,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-24,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-25,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-26,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-27,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-28,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-29,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-30,0,0,0,0,SA,South America,Argentina,argentina,ARG
AR,2020-01-31,0,0,0,0,SA,South America,Argentina,argentina,ARG


In [0]:
# Denormalize df_summary: add the country/continent columns first, then the
# country columns, both matched on countrycode with left joins.
# NOTE(review): the df_country_continent output shown earlier lists AZ under
# both EU and AS, so rows for such a country would come out once per continent
# here — confirm that this duplication is intended.
df_summary = (
  df_summary
  .join(df_country_continent, on="countrycode", how="left")
  .join(df_country, on="countrycode", how="left")
)
display(df_summary)

countrycode,lastupdated,newconfirmed,newdeaths,totalconfirmed,totaldeaths,continentcode,continentname,countryname,countryslug,isocode
AR,2021-03-09,5058,241,2154694,53121,SA,South America,Argentina,argentina,ARG
BE,2021-03-07,2082,21,787891,22261,EU,Europe,Belgium,belgium,BEL
BR,2021-03-09,70764,1972,11122429,268370,SA,South America,Brazil,brazil,BRA
CA,2021-03-07,2348,22,756336,20319,,North America,Canada,canada,CAN
CL,2021-03-07,5302,69,855785,21077,SA,South America,Chile,chile,CHL
CO,2021-03-09,3511,78,2282372,60676,SA,South America,Colombia,colombia,COL
CZ,2021-03-09,10524,265,1335815,22147,EU,Europe,Czech Republic,czech-republic,CZE
DE,2021-03-09,6834,298,2520618,72534,EU,Europe,Germany,germany,DEU
EC,2021-03-09,294999,26,589617,16069,SA,South America,Ecuador,ecuador,ECU
ES,2021-03-07,0,0,3149012,71138,EU,Europe,Spain,spain,ESP


#### 3. Definindo função para salvar arquivos Parquet no diretório [dbfs:/FileStore/covid_data_lake/ready](/dbfs/FileStore/covid_data_lake/ready)

In [0]:
def save_ready_data(df, file_name):
  """Write a DataFrame to the ready zone as Parquet, partitioned by continent.

  Args:
    df: DataFrame to persist; it must contain a ``continentname`` column,
      which is used as the partition column.
    file_name: Dataset name; it is interpolated into the target directory
      ``dbfs:/FileStore/covid_data_lake/ready/raw_ready_{file_name}_parquet``.

  Returns:
    None. The write uses mode ``overwrite``.
  """
  target_dir = f'dbfs:/FileStore/covid_data_lake/ready/raw_ready_{file_name}_parquet'
  # The former `.option('header', 'true')` was removed: `header` is a CSV
  # option and has no meaning for the Parquet writer.
  (
    df.write
    .partitionBy('continentname')
    .mode('overwrite')
    .parquet(target_dir)
  )

In [0]:
# Persist both denormalized DataFrames through save_ready_data.
save_ready_data(df=df_summary, file_name='summary')
save_ready_data(df=df_daily, file_name='daily')
