In [None]:
from datetime import datetime
from pyspark.sql.functions import to_date,date_format
from pyspark.sql import types,Row
import pandas as pd
# Enable Spark's eager evaluation so a DataFrame left as the last expression of a cell
# is rendered as a table of rows rather than only its schema.
EAGER_EVAL_CONF = "spark.sql.repl.eagerEval.enabled"
spark.conf.set(EAGER_EVAL_CONF, True)


## Leitura dos dados brutos em JSON para transformação

In [None]:
# Load every raw JSON file of the countries reference table into a Spark DataFrame.
countries = spark.read.format("json").load("./FileStore/covid_data_lake/raw/countries/*.json")

In [None]:
# Load every raw JSON file of daily COVID case records into a Spark DataFrame
# (the join with `countries` happens in the next section).
covid_cases = spark.read.format("json").load("./FileStore/covid_data_lake/raw/covid_cases/*.json")

## Junção das tabelas carregadas

In [None]:
# Inner-join each case record to its country metadata on the shared "country_name" key.
# Columns are referenced through their source DataFrame to keep the projection explicit.
selected_columns = [
    covid_cases.active,
    covid_cases.cases_covid_id,
    covid_cases.confirmed,
    countries.country_name,
    covid_cases.date_register,
    covid_cases.deaths,
    covid_cases.lat,
    covid_cases.lon,
    covid_cases.recovered,
    countries.country_code,
    countries.slug,
]
df = countries.join(covid_cases, on='country_name', how='inner').select(*selected_columns)

In [None]:
# Cast "date_register" to Spark's DateType and copy "country_name" into a new "country" column.
date_register_as_date = df["date_register"].cast(types.DateType())
df = (
    df
    .withColumn('date_register', date_register_as_date)
    .withColumn('country', df["country_name"])
)

In [None]:
# Verify the cast from the previous cell took effect: fail loudly if "date_register"
# is not a Spark "date" column, then display all column types.
column_types = dict(df.dtypes)
assert column_types['date_register'] == 'date', (
    f"date_register has type {column_types['date_register']!r}, expected 'date'"
)
df.dtypes

## Criação de features que poderão ser utilizadas futuramente

In [None]:
# Derive calendar features from date_register, formatted as strings:
# the year ('yyyy') and the year-month ('yyyy-MM').
register_date = df['date_register']
df = df.withColumn('year_register', date_format(register_date, 'yyyy'))
df = df.withColumn('month_year_register', date_format(register_date, 'yyyy-MM'))

In [None]:
# Remove columns not needed downstream ("country_name" is now duplicated as "country").
df = df.drop("country_name", "slug")

## Corrigindo o outlier encontrado na Índia

In [None]:
# Collect India's rows for the outlier date (2021-03-07) and its neighbouring days
# (2021-03-06 and 2021-03-08). collect() on its own guarantees no row order, so sort
# explicitly to make the contents of `dl` deterministic for the following cells.
dl = (
    df.filter(df["date_register"] >= '2021-03-06')
      .filter(df["date_register"] <= '2021-03-08')
      .filter(df["country"] == "India")
      .orderBy("date_register", "cases_covid_id")
      .collect()
)

In [None]:
# Replacement value for India's 2021-03-07 "active" outlier: the mean of the values on
# the surrounding days (2021-03-06 and 2021-03-08).
# Values are picked by date rather than by position. `dl` is not guaranteed to be
# date-ordered, and India can have several rows per date (two rows for 2021-03-07 appear
# in the data_check output further down), so dl[0]/dl[2] could land on the wrong days.
neighbor_values = []
for day in (datetime(2021, 3, 6).date(), datetime(2021, 3, 8).date()):
    day_values = {row['active'] for row in dl if row['date_register'] == day}
    # Duplicate rows for a day are fine as long as they agree on "active".
    assert len(day_values) == 1, (
        f"expected exactly one distinct 'active' value for India on {day}, found {day_values}"
    )
    neighbor_values.append(day_values.pop())
result_mean = sum(neighbor_values) / 2
result_mean

In [None]:
# Collect the whole Spark DataFrame onto the driver as a pandas DataFrame so a single
# cell value can be patched with .loc in the next cell.
# NOTE(review): this brings every row into driver memory; for a large dataset consider
# patching the value in Spark (e.g. with pyspark.sql.functions.when) instead.
dp = df.toPandas()

In [None]:
# Overwrite India's 2021-03-07 "active" value (the outlier) with the neighbour-day mean.
# date_register is normalised with pd.to_datetime before comparing. toPandas() typically
# yields datetime.date objects, and comparing those with a pandas Timestamp is deprecated
# in pandas 1.x and evaluates to False in pandas 2.x. Without normalising, the update
# would silently match no rows.
outlier_mask = (
    (dp['country'] == 'India')
    & (pd.to_datetime(dp['date_register']) == pd.Timestamp('2021-03-07'))
)
assert outlier_mask.any(), "no India row found for 2021-03-07; outlier fix was not applied"
dp.loc[outlier_mask, 'active'] = result_mean

In [None]:
# Rebuild the Spark DataFrame from the patched pandas frame. Column types are re-inferred
# from the pandas dtypes, so they may differ from the earlier Spark schema
# (e.g. "active" is displayed as floating point in the outputs below).
df = spark.createDataFrame(data=dp)

In [None]:
# Register the patched DataFrame as the temporary SQL view "data_check" for the query below.
df.createOrReplaceTempView(name='data_check')

In [None]:
# Query India's 2021-03-07 rows through SQL to confirm the patched "active" value.
india_outlier_check = "select * from data_check where data_check.country = 'India' and data_check.date_register = '2021-03-07'"
spark.sql(india_outlier_check)

active,cases_covid_id,confirmed,date_register,deaths,lat,lon,recovered,country_code,country,year_register,month_year_register
185992.5,137969,11229398,2021-03-07,157853,20.59,78.96,10882798,IN,India,2021,2021-03
185992.5,100589,11229398,2021-03-07,157853,20.59,78.96,10882798,IN,India,2021,2021-03


In [None]:
# Preview a bounded sample of the cleaned data rather than relying on the eager-eval
# row limit to keep the displayed output small.
df.limit(20)

active,cases_covid_id,confirmed,date_register,deaths,lat,lon,recovered,country_code,country,year_register,month_year_register
5977.0,259897,10183,2020-10-25,122,55.17,23.88,4633,LT,Lithuania,2020,2020-10
5.0,20802,88,2020-12-14,0,15.41,-61.37,83,DM,Dominica,2020,2020-12
12457.0,212226,475355,2021-02-07,8394,31.79,-7.09,454504,MA,Morocco,2021,2021-02
2.0,20621,18,2020-06-16,0,15.41,-61.37,16,DM,Dominica,2020,2020-06
2061.0,137161,3538,2020-05-12,47,40.07,45.04,1430,AM,Armenia,2020,2020-05
5542.0,67589,113767,2020-08-31,6556,-1.83,-78.18,101669,EC,Ecuador,2020,2020-08
959556.0,95090,973604,2021-04-30,14048,60.13,18.64,0,SE,Sweden,2021,2021-04
82767.0,112601,255970,2020-11-21,3824,39.4,-8.22,169379,PT,Portugal,2020,2020-11
147994.0,138870,438805,2020-11-11,5570,49.82,15.47,285241,CZ,Czech Republic,2020,2020-11
0.0,226547,113,2020-08-29,3,-35.47,149.01,110,AU,Australia,2020,2020-08


In [None]:
# Persist the cleaned dataset as Parquet, one partition directory per year_register value,
# overwriting whatever already exists at the destination.
(
    df.write
      .mode('overwrite')
      .partitionBy('year_register')
      .parquet('./FileStore/covid_data_lake/cleaned')
)