In [24]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json

## Carregar os dados

In [25]:
# Carregar o dataset JSON
with open('../datasets/HPI_master.json', 'r') as f:
    json_data = json.load(f)

# Converter o dataset JSON para um DataFrame do pandas
data_json = pd.DataFrame(json_data)

# Carregar o dataset CSV - Population size
data_csv_ps = pd.read_csv("../datasets/cu.data.19.PopulationSize.csv")
data_csv_ps = pd.DataFrame(data_csv_ps)

# Carregar o dataset CSV - 
data_csv_fb = pd.read_csv("../datasets/cu.data.11.USFoodBeverage.csv")
data_csv_fb = pd.DataFrame(data_csv_fb)


## Visualizar os dados

In [26]:
data_json.head()

Unnamed: 0,hpi_type,hpi_flavor,frequency,level,place_name,place_id,yr,period,index_nsa,index_sa
0,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,1,100.0,100.0
1,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,2,100.91,100.96
2,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,3,101.3,100.91
3,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,4,101.69,100.98
4,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,5,102.32,101.36


In [27]:
data_csv_ps.head()

Unnamed: 0,series_id,year,period,value,footnote_codes
0,CUURA000AA0,1986,M12,100.0,
1,CUURA000AA0,1987,M01,100.6,
2,CUURA000AA0,1987,M02,101.1,
3,CUURA000AA0,1987,M03,101.6,
4,CUURA000AA0,1987,M04,102.2,


In [28]:
data_csv_fb.head()

Unnamed: 0,series_id,year,period,value,footnote_codes
0,CUSR0000SAF,1967,M01,34.8,
1,CUSR0000SAF,1967,M02,34.7,
2,CUSR0000SAF,1967,M03,34.7,
3,CUSR0000SAF,1967,M04,34.6,
4,CUSR0000SAF,1967,M05,34.6,


#### **Tipos de dados do dataset: Food & Beverage**

In [29]:
data_csv_fb.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 119198 entries, 0 to 119197
Data columns (total 5 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   series_id       119198 non-null  object 
 1   year            119198 non-null  int64  
 2   period          119198 non-null  object 
 3   value           119198 non-null  float64
 4   footnote_codes  0 non-null       float64
dtypes: float64(2), int64(1), object(2)
memory usage: 4.5+ MB


#### **Tipos de dados do dataset: Population Size**

In [30]:
data_csv_ps.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 84760 entries, 0 to 84759
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   series_id       84760 non-null  object 
 1   year            84760 non-null  int64  
 2   period          84760 non-null  object 
 3   value           84760 non-null  float64
 4   footnote_codes  0 non-null      float64
dtypes: float64(2), int64(1), object(2)
memory usage: 3.2+ MB


Conseguimos observar a existência de dados do tipo `object`. Poderá ser necessário tratar este tipo de dados.

#### **Tipos de dados do dataset Json**

In [31]:
data_json.dtypes

hpi_type       object
hpi_flavor     object
frequency      object
level          object
place_name     object
place_id       object
yr              int64
period          int64
index_nsa     float64
index_sa       object
dtype: object

#### **Verificar a existencia de Missing Values em cada um dos Dataframes**

In [32]:
print("Number of missing values in House Price dataset:")
json_MV = data_json.isnull().sum()
print(json_MV)

Number of missing values in House Price dataset:
hpi_type      0
hpi_flavor    0
frequency     0
level         0
place_name    0
place_id      0
yr            0
period        0
index_nsa     0
index_sa      0
dtype: int64


In [33]:
print("Number of missing values in FoodBeverage dataset:")
foodBeverage_MV = data_csv_fb.isnull().sum()
print(foodBeverage_MV)

Number of missing values in FoodBeverage dataset:
series_id              0
year                   0
period                 0
value                  0
footnote_codes    119198
dtype: int64


In [34]:
print("Number of missing values in Population Size dataset:")
populationSize_MV = data_csv_ps.isnull().sum()
print(populationSize_MV)

Number of missing values in Population Size dataset:
series_id             0
year                  0
period                0
value                 0
footnote_codes    84760
dtype: int64


#### **Verificar os valores da coluna `period`**

In [35]:
period_values = data_csv_ps['period'].unique()
print("The 'period' column contains the following values:")
print(period_values)

The 'period' column contains the following values:
['M12' 'M01' 'M02' 'M03' 'M04' 'M05' 'M06' 'M07' 'M08' 'M09' 'M10' 'M11'
 'M13' 'S01' 'S02' 'S03']


## Tratamento dos Dados

#### **Transformar `index_sa` em dados numéricos**

In [36]:
data_json['index_sa'] = data_json['index_sa'].astype(float)

#### **Transformar `period` em dados numéricos**

In [37]:
# Dicionário para mapear o período número ao valor da string
period_map = {
    'M01': 1,
    'M02': 2,
    'M03': 3,
    'M04': 4,
    'M05': 5,
    'M06': 6,
    'M07': 7,
    'M08': 8,
    'M09': 9,
    'M10': 10,
    'M11': 11,
    'M12': 12,
    'M13': 13,
    'S01': 14,
    'S02': 15,
    'S03': 16
}

data_csv_ps['period'] = data_csv_ps["period"].replace(period_map)
data_csv_fb['period'] = data_csv_fb["period"].replace(period_map)
data_json['period'].astype(str).astype(int)

0         1
1         2
2         3
3         4
4         5
         ..
121457    4
121458    1
121459    2
121460    3
121461    4
Name: period, Length: 121462, dtype: int64

#### **Remover a coluna `footnote_codes`**

In [38]:
data_csv_ps.drop(columns=["footnote_codes"], inplace=True)
data_csv_fb.drop(columns=["footnote_codes"], inplace=True)

In [39]:
data_csv_ps.head()

Unnamed: 0,series_id,year,period,value
0,CUURA000AA0,1986,12,100.0
1,CUURA000AA0,1987,1,100.6
2,CUURA000AA0,1987,2,101.1
3,CUURA000AA0,1987,3,101.6
4,CUURA000AA0,1987,4,102.2


In [40]:
data_csv_fb.head()

Unnamed: 0,series_id,year,period,value
0,CUSR0000SAF,1967,1,34.8
1,CUSR0000SAF,1967,2,34.7
2,CUSR0000SAF,1967,3,34.7
3,CUSR0000SAF,1967,4,34.6
4,CUSR0000SAF,1967,5,34.6


#### **Tratar Nan Values**

In [41]:
median_value = data_json['index_sa'].median()
data_json['index_sa'] = data_json['index_sa'].fillna(median_value)


#### **Renomear colunas**

Estas colunas são necessárias renomear porque irão dar erro ao efetuar o merge

In [42]:
data_json.rename(columns = {'yr':'year'}, inplace = True)
data_csv_fb.rename(columns = {'value':'valueFoodBeverage'}, inplace = True)
data_csv_ps.rename(columns = {'value':'valuePopSize'}, inplace = True)
data_csv_fb.rename(columns = {'series_id':'idFoodBeverage'}, inplace = True)
data_csv_fb.rename(columns = {'series_id':'idPopSize'}, inplace = True)

## Converter para Parquet

In [43]:
pq1 = data_csv_fb.to_parquet('../parquetFiles/data_fb.parquet')
pq2 = data_csv_ps.to_parquet('../parquetFiles/data_ps.parquet')
pq3 = data_json.to_parquet('../parquetFiles/data_json.parquet')

pd.read_parquet('../parquetFiles/data_fb.parquet')
pd.read_parquet('../parquetFiles/data_ps.parquet')
pd.read_parquet('../parquetFiles/data_json.parquet')

Unnamed: 0,hpi_type,hpi_flavor,frequency,level,place_name,place_id,year,period,index_nsa,index_sa
0,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,1,100.00,100.00
1,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,2,100.91,100.96
2,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,3,101.30,100.91
3,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,4,101.69,100.98
4,traditional,purchase-only,monthly,USA or Census Division,East North Central Division,DV_ENC,1991,5,102.32,101.36
...,...,...,...,...,...,...,...,...,...,...
121457,developmental,purchase-only,quarterly,Puerto Rico,Puerto Rico,PR,2021,4,185.03,183.17
121458,developmental,purchase-only,quarterly,Puerto Rico,Puerto Rico,PR,2022,1,185.82,190.35
121459,developmental,purchase-only,quarterly,Puerto Rico,Puerto Rico,PR,2022,2,179.30,179.96
121460,developmental,purchase-only,quarterly,Puerto Rico,Puerto Rico,PR,2022,3,190.09,187.85


## Realizar o Merge dos datasets

In [44]:
# from pyspark.sql.functions import *
# from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql import functions as Func
from pyspark.sql.functions import *
from pyspark.sql.functions import expr
from pyspark.sql.types import *
# spark = SparkSession.builder.getOrCreate()

spark = SparkSession.builder \
    .appName("Conexao ao MongoDB Atlas") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.0") \
    .getOrCreate()

pq1 = spark.read.format("parquet").load('../parquetFiles/data_fb.parquet')
pq2 = spark.read.format("parquet").load('../parquetFiles/data_ps.parquet')
pq3 = spark.read.format("parquet").load('../parquetFiles/data_json.parquet')

df4 = pq1.join(pq2, on=['period', 'year'], how='inner').join(pq3, on=['period', 'year'], how='inner')

df4.head()



Row(period=1, year=1978, idFoodBeverage='CUSR0000SAF', valueFoodBeverage=68.1, series_id='CUURD000SETB01', valuePopSize=51.2, hpi_type='traditional', hpi_flavor='all-transactions', frequency='quarterly', level='USA or Census Division', place_name='United States', place_id='USA', index_nsa=79.58, index_sa=180.51)

#### **Escrever o dataset com o merge efetuado para o formato Parquet**

In [45]:
# pq_merged = df4.write.parquet('../parquetFiles/merged.parquet')

----
## **Conversão para formato Documental**

In [None]:
from pyspark.sql.functions import *
import json

# dataframe é o seu dataframe com as colunas mencionadas
json_data = df4\
    .groupBy('year')\
    .agg(collect_list(struct(*df4.columns)).alias('data')) \
    .select(to_json(struct(col("year"), col("data"))).alias("json_data")).collect()

# O resultado é uma lista de objetos JSON
json_list = [json.loads(row.json_data) for row in json_data]

# Salva o resultado em um arquivo JSON
with open('../parquetFiles2/output.json', 'w') as f:
    json.dump(json_list, f)

# Converte o resultado da consulta para um dicionário Python
# result = spark\
#     .sql("SELECT year, collect_list(struct(quarter, group_name, group_value)) AS data FROM dados GROUP BY year")\
#         .toJSON().map(json.loads).collect()

----
## Armazenar os dados no MongoDB

#### **Teste para enviar dados para o MongoDB (na Organization que o Rodrigo Criou)**

In [None]:


import pandas as pd

data = {'Nome': ['Maria', 'João', 'Pedro', 'Ana'],
        'Idade': [25, 30, 20, 27],
        'Cidade': ['São Paulo', 'Rio de Janeiro', 'Belo Horizonte', 'Curitiba']}

dfTeste = pd.DataFrame(data)

pqFile = dfTeste.to_parquet('../parquetFiles2/teste.parquet')
pqFile = spark.read.format("parquet").load('../parquetFiles2/teste.parquet')
# pqFile = pqFile.write.parquet('../parquetFiles2/teste2.parquet')
pqFile = spark.read.format("parquet").load('../parquetFiles2/teste2.parquet')

pqFile.show()

pqFile.printSchema()

pqFile.write.format("com.mongodb.spark.sql.DefaultSource").option("database", "test").option("collection", "inflation").option("uri", "mongodb+srv://bigDataAdmin:admin@cluster0.of2q4ow.mongodb.net/test").mode("overwrite").save()

In [None]:
from pyspark.sql import SparkSession
import pandas as pd

df4.write.format("com.mongodb.spark.sql.DefaultSource").option("database", "test").option("collection", "inflation").option("uri", "mongodb+srv://bigDataAdmin:admin@bigdatacluster.l1dei5j.mongodb.net/test").mode("overwrite").save()