# **Big Data Analysis**

In [51]:
# Install the required libraries (uncomment the next line to run it):
#!pip install pandas pyspark pymongo

### Data Preprocessing

In this section, we preprocess the data. Specifically, we **merge the data** from the different datasets, *aligning them* on the columns **"Entity"** (the country name), **"Code"** (the country's ISO 3166-1 alpha-3 code), and **"Year"** (the year the data was collected).

We will also *trim* the data to the years available in every dataset. Year coverage is the limiting factor, and the **period from 1990 to 2016** is common to all of them.

For comparison purposes, we will merge the datasets using the Pandas and PySpark packages to compare the efficiency of both in different contexts.
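
As a minimal sketch of the merge-and-trim idea described above, the example below joins two toy tables on the three key columns and keeps only the years 1990 to 2016. The data values, the `trim_years` helper, and the `validate="one_to_one"` check are illustrative assumptions, not part of the original pipeline.

```python
import pandas as pd

# Toy stand-ins for two raw datasets (made-up values, for illustration only)
pop = pd.DataFrame({
    "Entity": ["Aland", "Aland", "Aland"],
    "Code": ["ALD", "ALD", "ALD"],
    "Year": [1989, 1990, 2016],
    "Population": [100, 110, 150],
})
obes = pd.DataFrame({
    "Entity": ["Aland", "Aland", "Aland"],
    "Code": ["ALD", "ALD", "ALD"],
    "Year": [1990, 2016, 2017],
    "Obesity (%)": [1.0, 2.5, 2.6],
})

KEYS = ["Entity", "Code", "Year"]


def trim_years(df, first=1990, last=2016):
    """Keep only rows whose Year lies in the inclusive range [first, last]."""
    return df[df["Year"].between(first, last)]


# Inner join on the key columns; validate="one_to_one" raises a MergeError
# if a key combination appears more than once on either side.
merged = pd.merge(
    trim_years(pop),
    trim_years(obes),
    on=KEYS,
    how="inner",
    validate="one_to_one",
)
print(merged)
```

Only the rows for 1990 and 2016 survive, because those are the only years inside the range that both tables share.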


### Using Pandas

In [52]:
# Import the time module to compare execution times
import time

# Load the datasets
import pandas as pd

pd_start = time.time() # start of the Pandas timer

In [53]:
# Dataset on world population
pop = pd.read_csv('../datasets/raw/population.csv')
pop.rename(columns={'Entity': 'Country'}, inplace = True)
pop = pop[pop['Year'] >= 1990]
pop = pop[pop['Year'] <= 2016]
pop.head()

Unnamed: 0,Country,Code,Year,Population (historical estimates)
227,Afghanistan,AFG,1990,10694804
228,Afghanistan,AFG,1991,10745168
229,Afghanistan,AFG,1992,12057436
230,Afghanistan,AFG,1993,14003764
231,Afghanistan,AFG,1994,15455560


In [54]:
# Dataset on obesity
obes = pd.read_csv('../datasets/raw/share-of-adults-defined-as-obese.csv')
obes.rename(columns={'Entity': 'Country'}, inplace = True)
obes = obes[obes['Year'] >= 1990]
obes = obes[obes['Year'] <= 2016] 
obes.head()

Unnamed: 0,Country,Code,Year,"Indicator:Prevalence of obesity among adults, BMI ≥ 30 (crude estimate) (%) - Sex:Both sexes"
15,Afghanistan,AFG,1990,1.0
16,Afghanistan,AFG,1991,1.1
17,Afghanistan,AFG,1992,1.2
18,Afghanistan,AFG,1993,1.2
19,Afghanistan,AFG,1994,1.3


In [55]:
# Dataset on mental disorders prevalence
mental = pd.read_csv('../datasets/raw/mental-illnesses-prevalence.csv')
mental.rename(columns={'Entity': 'Country'}, inplace = True)
mental = mental[mental['Year'] >= 1990]
mental = mental[mental['Year'] <= 2016] 
mental.head()

Unnamed: 0,Country,Code,Year,Schizophrenia disorders (share of population) - Sex: Both - Age: Age-standardized,Depressive disorders (share of population) - Sex: Both - Age: Age-standardized,Anxiety disorders (share of population) - Sex: Both - Age: Age-standardized,Bipolar disorders (share of population) - Sex: Both - Age: Age-standardized,Eating disorders (share of population) - Sex: Both - Age: Age-standardized
0,Afghanistan,AFG,1990,0.223206,4.996118,4.713314,0.703023,0.1277
1,Afghanistan,AFG,1991,0.222454,4.98929,4.7021,0.702069,0.123256
2,Afghanistan,AFG,1992,0.221751,4.981346,4.683743,0.700792,0.118844
3,Afghanistan,AFG,1993,0.220987,4.976958,4.673549,0.700087,0.115089
4,Afghanistan,AFG,1994,0.220183,4.977782,4.67081,0.699898,0.111815


In [56]:
# Merging the dataframes
dataframes = [pop, obes, mental]

fused = dataframes[0]

for dataframe in dataframes[1:]:
    try:
        fused = pd.merge(
            fused,
            dataframe,
            on = ['Country', 'Year', 'Code'],
            how = 'inner'
        )
    except KeyError:  # a key column (e.g. 'Code') is missing, so merge on Country and Year only
        fused = pd.merge(
            fused,
            dataframe,
            on = ['Country', 'Year'],
            how = 'outer'
        )

# Exporting to CSV
fused.to_csv('../datasets/processed/pd_processed_data.csv')

In [57]:
pd_end = time.time()
pd_elapsed = pd_end - pd_start # elapsed time of the Pandas timer
print('Pandas took',pd_elapsed,'seconds to process data.')

Pandas took 0.10265493392944336 seconds to process data.


### Using Spark

In [58]:
spark_start = time.time()

In [59]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Pandas to PySpark") \
    .getOrCreate()

# Load datasets
pop = spark.read.csv('../datasets/raw/population.csv', header=True)
obes = spark.read.csv('../datasets/raw/share-of-adults-defined-as-obese.csv', header=True)
mental = spark.read.csv('../datasets/raw/mental-illnesses-prevalence.csv', header=True)

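# Rename columns
pop = pop.withColumnRenamed('Entity', 'Country')
obes = obes.withColumnRenamed('Entity', 'Country')
mental = mental.withColumnRenamed('Entity', 'Country')

# Keep only the common period (1990 to 2016), as in the Pandas version;
# Year is read as a string when no schema is given, so cast it before comparing
pop = pop.filter(col('Year').cast('int').between(1990, 2016))
obes = obes.filter(col('Year').cast('int').between(1990, 2016))
mental = mental.filter(col('Year').cast('int').between(1990, 2016))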

# Merge datasets
fused = pop.join(obes, ['Country', 'Year', 'Code'], 'inner') \
           .join(mental, ['Country', 'Year', 'Code'], 'inner')

# Drop rows with null values
fused = fused.dropna()

# Export to CSV
fused.coalesce(1).write.mode("overwrite").option("header", "true").csv('../datasets/processed/spark_processed_data')

# Stop SparkSession
spark.stop()


In [60]:
# Compare the computation time of the two approaches
spark_end = time.time()
spark_elapsed = spark_end - spark_start
print(f'Pandas took {round(pd_elapsed, 3)} seconds.',
      f'Spark took {round(spark_elapsed, 3)} seconds.',
      sep = '\n')

Pandas took 0.103 seconds.
Spark took 0.542 seconds.


### Observations
Pandas completed the preprocessing task in about 0.10 seconds, whereas Spark took about 0.54 seconds, roughly five times longer. The Spark measurement also covers creating and stopping the `SparkSession`, a fixed cost that weighs heavily on a job this small. The difference shows how well Pandas suits small datasets that fit in memory on a single machine, where its low overhead results in faster execution.

Spark's strength, however, lies in handling large-scale datasets. Although it was slower in our experiment, our colleagues report that it outperforms Pandas on datasets with millions of rows.

Therefore, Spark is not the best choice for every preprocessing task, particularly for small datasets, but it pays off in large-scale data operations. The preprocessing tool should be chosen according to the size and characteristics of the dataset.
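
A single wall-clock measurement is noisy and mixes one-off start-up costs with the actual work. As a hedged sketch of a more robust comparison, the helper below repeats a task and keeps the fastest run, using `time.perf_counter` from the standard library. The `best_of` helper and the `toy_workload` function are hypothetical stand-ins; in the notebook, the Pandas or Spark pipeline wrapped in a function would take their place.

```python
import time


def best_of(func, repeats=5):
    """Run func several times and return the fastest wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        timings.append(time.perf_counter() - start)
    return min(timings)


def toy_workload():
    # Hypothetical stand-in for the real preprocessing step
    return sum(i * i for i in range(10_000))


elapsed = best_of(toy_workload, repeats=3)
print(f"Best of 3 runs: {elapsed:.6f} seconds")
```

Taking the minimum rather than the mean reduces the influence of unrelated background activity on the machine.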

## Loading data to database (MongoDB)

In [None]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

uri = 'YOUR URL' # replace YOUR URL with your MongoDB connection string

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('YOUR API')) # replace YOUR API with the Stable API version of your deployment (e.g. '1')

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

# Access database
db = client.get_database("BigData")

# Access/create collection
collection = db.get_collection("ObesPovMen")
collection

# Read CSV file using pandas
csv_file = "../datasets/processed/pd_processed_data.csv"
data = pd.read_csv(csv_file)
data.head()

In [None]:
# Convert DataFrame to dictionary
data_dict = data.to_dict(orient='records')
print(data_dict)

In [None]:
# Insert data into MongoDB collection
collection.insert_many(data_dict)

# Close connection
client.close()
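
Pandas represents missing values as NaN, and `to_dict(orient='records')` carries them over as float NaN. If missing values should be stored as null instead, the records can be cleaned before insertion. The sketch below is one possible way to do that with the standard library. The `clean_record` helper and the sample records are hypothetical and not part of the original notebook.

```python
import math


def clean_record(record):
    """Return a copy of record with float NaN values replaced by None."""
    return {
        key: (None if isinstance(value, float) and math.isnan(value) else value)
        for key, value in record.items()
    }


# Made-up records in the shape produced by data.to_dict(orient='records')
records = [
    {"Country": "Aland", "Code": "ALD", "Year": 1990, "Population": 110},
    {"Country": "Region X", "Code": float("nan"), "Year": 1990, "Population": 500},
]

cleaned = [clean_record(r) for r in records]
print(cleaned)

# With a live connection, the cleaned list could then be passed to
# collection.insert_many(cleaned) in place of data_dict.
```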