# PL04 - Data Integration and Processing I

In [None]:
import pandas as pd
import time

In [None]:
energy_consumption = pd.read_csv('DataSets/change-energy-consumption.csv')
renewable_energy = pd.read_csv('DataSets/modern-renewable-prod.csv')
per_capita_energy_use = pd.read_csv('DataSets/per-capita-energy-use.csv')
primary_energy = pd.read_csv('DataSets/primary-energy-cons.csv')

In [None]:
energy_consumption.head()

### **Energy Data Analysis**

This analysis merges multiple datasets related to energy consumption, primary energy, renewable energy, and per capita energy use.


In [None]:
energy_analysis1 = pd.merge(energy_consumption,primary_energy, on=['Entity','Code','Year'], how='left')
energy_analysis2 = pd.merge(energy_analysis1,renewable_energy, on=['Entity','Code','Year'], how='left')
energy_analysis = pd.merge(energy_analysis2, per_capita_energy_use, on=['Entity','Code','Year'], how='left')

In [None]:
energy_analysis

### Dropping rows with missing Code values

Ensures all entries have a valid country or region code.

Rows without a Code correspond to continents rather than individual countries.

Removing these rows prevents aggregation errors and ensures consistency in country-level analysis.

The code 'OWID_WRL' represents a global entity that aggregates data from all countries. Since the analysis focuses on individual countries, excluding this row ensures that the dataset only includes country-specific data, preventing distortions in statistical calculations.

In [None]:
energy_analysis = energy_analysis.dropna(subset=['Code'])

In [None]:
energy_analysis = energy_analysis[energy_analysis['Code'] != 'OWID_WRL']

### Common years for all the countries

In [None]:
energy_analysis.groupby("Entity")["Year"].min()

In [None]:
energy_analysis.groupby("Entity")["Year"].max()

### Data Filtering for Consistency Across Countries

Yugoslavia no longer exists since 1991, so it only has data up to that year. We’ll remove this country from the dataset.

Most countries have data available up to 2021, so we will discard those that have data extending into 2022 and 2023.

Additionally, most countries have data starting from 1981, so we will eliminate any records prior to this year.

In [None]:
energy_analysis = energy_analysis[energy_analysis["Entity"] != "Yugoslavia"]
energy_analysis = energy_analysis[energy_analysis["Year"] <= 2021]
energy_analysis = energy_analysis[energy_analysis["Year"] >= 1981]

In [None]:
# Checking for missing values to identify columns with incomplete data

energy_analysis.isna().sum()

### Handling Missing Values

The missing values correspond to renewable energy data for the years 1981 to 1999. This likely indicates that renewable energy sources were not widely used or reported during this period for some countries.
To ensure consistency in the dataset and avoid misinterpretation of missing values, we will replace these NA values with 0, assuming that renewable energy usage was negligible or nonexistent at that time.

In [None]:
energy_analysis.fillna(0, inplace=True)

In [None]:
# Exporting our new DataSet into a csv type file

energy_analysis.to_csv('Energy_Analysis.csv', index=False)

Deteção de outliers

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
sns.boxplot(data=energy_analysis.select_dtypes(include=['float64', 'int64']))
plt.xticks(rotation=90)
plt.show()

**Medição de Tempo**

In [None]:
!pip install seaborn

In [None]:
import time
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

def algoritmo_python():
    
    energy_consumption = pd.read_csv('DataSets/change-energy-consumption.csv')
    renewable_energy = pd.read_csv('DataSets/modern-renewable-prod.csv')
    per_capita_energy_use = pd.read_csv('DataSets/per-capita-energy-use.csv')
    primary_energy = pd.read_csv('DataSets/primary-energy-cons.csv')
    energy_consumption.head()
    energy_analysis1 = pd.merge(energy_consumption,primary_energy, on=['Entity','Code','Year'], how='left')
    energy_analysis2 = pd.merge(energy_analysis1,renewable_energy, on=['Entity','Code','Year'], how='left')
    energy_analysis = pd.merge(energy_analysis2, per_capita_energy_use, on=['Entity','Code','Year'], how='left')
    energy_analysis = energy_analysis.dropna(subset=['Code'])
    energy_analysis = energy_analysis[energy_analysis['Code'] != 'OWID_WRL']
    energy_analysis.groupby("Entity")["Year"].min()
    energy_analysis.groupby("Entity")["Year"].max()
    energy_analysis = energy_analysis[energy_analysis["Entity"] != "Yugoslavia"]
    energy_analysis = energy_analysis[energy_analysis["Year"] <= 2021]
    energy_analysis = energy_analysis[energy_analysis["Year"] >= 1981]
    energy_analysis.isna().sum()
    energy_analysis.fillna(0, inplace=True)
    energy_analysis.to_csv('Energy_Analysis.csv', index=False)
    plt.figure(figsize=(10, 5))
    sns.boxplot(data=energy_analysis.select_dtypes(include=['float64', 'int64']))
    plt.xticks(rotation=90)
    plt.show()
  
    soma = sum(range(10**6))  # Exemplo de operação
    return soma

inicio = time.time()
resultado_python = algoritmo_python()
fim = time.time()

tempo_python = fim - inicio
print(f"Tempo de execução em Python: {tempo_python:.4f} segundos")

In [None]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName("ComparacaoPythonPySpark").getOrCreate()

def algoritmo_pyspark():
    rdd = spark.sparkContext.parallelize(range(10**6))
    return rdd.sum()

inicio = time.time()
resultado_pyspark = algoritmo_pyspark()
fim = time.time()

tempo_pyspark = fim - inicio
print(f"Tempo de execução em PySpark: {tempo_pyspark:.4f} segundos")

spark.stop()


Comparação de Resultados

In [None]:
print(f"Python foi {tempo_python / tempo_pyspark:.2f} vezes mais rápido que PySpark" if tempo_python < tempo_pyspark else 
      f"PySpark foi {tempo_pyspark / tempo_python:.2f} vezes mais rápido que Python")


Load and insert the data from a CSV file into a PostgreSQL database, we use **Pandas** for data handling and **SQLAlchemy** to manage the connection to the database.


In [None]:
from sqlalchemy import create_engine
import pandas as pd

df = pd.read_csv("Energy_Analysis.csv")

# Create engine
engine = create_engine("postgresql+psycopg2://postgres:1234@localhost:5432/postgres")

# Use connection from engine instead of passing engine directly
with engine.begin() as conn:
    df.to_sql("energy_analysis", con=conn, if_exists="replace", index=False)


Verify that the energy_analysis table was created successfully and that it contains the expected data in the PostgreSQL database.

In [None]:
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://postgres:1234@localhost:5432/postgres")

df_check = pd.read_sql("SELECT * FROM energy_analysis LIMIT 5", con=engine)
print(df_check)


        Entity Code  Year  Annual change in primary energy consumption (%)  \
0  Afghanistan  AFG  1981                                        12.663031   
1  Afghanistan  AFG  1982                                         6.505477   
2  Afghanistan  AFG  1983                                        22.333790   
3  Afghanistan  AFG  1984                                         0.462401   
4  Afghanistan  AFG  1985                                        -2.365375   

   Primary energy consumption (TWh)  Electricity from wind - TWh  \
0                          8.777320                          0.0   
1                          9.348327                          0.0   
2                         11.436162                          0.0   
3                         11.489043                          0.0   
4                         11.217284                          0.0   

   Electricity from hydro - TWh  Electricity from solar - TWh  \
0                           0.0                          