In [7]:
import pymongo
import pandas as pd

In [15]:
# Load the raw climate CSV and switch its headers to snake_case in one chain,
# so later cells can refer to the columns with consistent lowercase names.
climate_data_csv = (
    pd.read_csv(r"climate_change_data.csv")
    .rename(columns={
        'Date': 'date',
        'Location': 'location',
        'Country': 'country',
        'Temperature': 'temperature',
        'CO2 Emissions': 'co2_emissions',
        'Sea Level Rise': 'sea_level_rise',
        'Precipitation': 'precipitation',
        'Humidity': 'humidity',
        'Wind Speed': 'wind_speed',
    })
)

In [16]:
# One plain dict per CSV row (column name -> value): the shape pymongo's
# insert_many expects. to_dict(orient="records") does this in one call instead
# of materialising a pandas Series for every row as iterrows() would.
data = climate_data_csv.to_dict(orient="records")

In [17]:
# `data` is already built from `climate_data_csv` in the previous cell, so it
# is not rebuilt here. Instead, sanity-check that every CSV row produced
# exactly one document before anything is written to MongoDB.
assert len(data) == len(climate_data_csv), "record count does not match CSV row count"

In [18]:
# MongoDB connection URI for the local server (MongoDB URIs use the mongodb:// scheme, not http://).
uri = "mongodb://localhost:27017/"

In [19]:
from pymongo import MongoClient

client = MongoClient(host='localhost', port=27017)
try:
    # Connect to the database and the collection that ExtractTask later reads.
    db = client['DAP_Project']
    collection = db['climate_analysis']

    # pymongo's insert_many raises on an empty list, so only call it when
    # there is something to write.
    # NOTE(review): insert_many adds an `_id` to each dict in `data` in place,
    # so re-running this cell without rebuilding `data` presumably fails with
    # duplicate-key errors.
    if data:
        collection.insert_many(data)
        print("Completed Insertion of data to MongoDB")
    else:
        print("No records to insert into MongoDB")
finally:
    # Release the client even if the insert raised.
    client.close()

Completed Insertion of data to MongoDB


In [8]:
pip install psycopg2

Note: you may need to restart the kernel to use updated packages.


In [9]:
pip install luigi

Note: you may need to restart the kernel to use updated packages.


ETL pipeline: extract from MongoDB, remove outliers, load into PostgreSQL

In [37]:
import csv
import os

import luigi
import pandas as pd
import psycopg2
import pymongo
class ExtractTask(luigi.Task):
    
    def output(self):
        return luigi.LocalTarget('data_extracted.csv')
    
    def run(self):
        # Connect to MongoDB
        client = pymongo.MongoClient("mongodb://localhost:27017/")
        db = client["DAP_Project"]
        collection = db["climate_analysis"]
        
        cursor = collection.find({})
        data = list(cursor)
    
        df = pd.DataFrame(data)
        df.to_csv(self.output().path, index=False)

class TransformTask(luigi.Task):
    
    def requires(self):
        return ExtractTask() # Assuming ExtractTask is defined to extract data from MongoDB
    
    def input(self):
        return luigi.LocalTarget("data_extracted.csv")
    
    def output(self):
        return luigi.LocalTarget('data_transformed.csv')
    
    def remove_outliers(self, df):
        cols = ['temperature', 'co2_emissions']

        for col in cols:
            quartiles = df[col].quantile([0.20, 0.80])
            q1 = quartiles.loc[0.20]
            q3 = quartiles.loc[0.80]

            low_bound = q1 - 1.5 * (q3 - q1)
            upp_bound = q3 + 1.5 * (q3 - q1)

            df = df[(df[col] >= low_bound) & (df[col] <= upp_bound)]

        return df
    
    def run(self):
        # Load the extracted data from the previous task
        df = pd.read_csv(self.input().path)
        
        # Data cleaning steps
        df = self.remove_outliers(df)
        # Save the cleaned data to a new CSV file
        df.to_csv(self.output().path, index=False)

class LoadTask(luigi.Task):

    # Parameters for the task 
    target_connection_string = luigi.Parameter(default="postgresql:http://localhost:5433/")  # PostgreSQL connection string
    target_connection_string = "dbname=postgres user=postgres password=Melin@123 host=localhost port=5433"

    target_table = luigi.Parameter(default="climate_analysis")  # Target table name

    def requires(self):
        return TransformTask()  # This task depends on the TransformTask

    def input(self):
        return luigi.LocalTarget("data_transformed.csv")

    def run(self):
        # Connect to PostgreSQL
        connection = psycopg2.connect(self.target_connection_string)
        cursor = connection.cursor()

        # Read the transformed data from the CSV file
        df = pd.read_csv(self.input().path)

        # Create the target table
        create_table_query = f"""CREATE TABLE IF NOT EXISTS {self.target_table} (
            row_id NUMERIC  , 
            id VARCHAR(255) NOT NULL PRIMARY KEY ,
            date VARCHAR(255) NOT NULL,
            country VARCHAR(255) NOT NULL,
            location VARCHAR(255) NOT NULL,
            temperature NUMERIC NOT NULL,
            co2_emission NUMERIC NOT NULL,
            sea_level_rise NUMERIC NOT NULL,
            precipitation NUMERIC NOT NULL,
            humidity NUMERIC NOT NULL,
            windspeed NUMERIC NOT NULL
        );"""
        cursor.execute(create_table_query)

        # Load data from the DataFrame to the target table
        for index, row in df.iterrows():
            #insert_query
            sql = """
                INSERT INTO climate_analysis ( id,date,location,country,temperature,co2_emission,sea_level_rise,precipitation,humidity,windspeed)
                VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            values = tuple(row) 
            cursor.execute(sql, values)

        connection.commit()
        connection.close()

        print(f"Loaded data from {self.input().path} to PostgreSQL table {self.target_table}")
        
if __name__ == "__main__":
    # luigi.build() logs task exceptions instead of re-raising them, so a failed
    # run would otherwise look like a cell that completed normally.
    if not luigi.build([LoadTask()], local_scheduler=True):
        raise RuntimeError("Luigi ETL pipeline did not complete successfully; see the log output above.")

DEBUG: Checking if LoadTask(target_table=climate_analysis) is complete
  is_complete = task.complete()
DEBUG: Checking if TransformTask() is complete
INFO: Informed scheduler that task   LoadTask_climate_analysis_d0ea38b10e   has status   PENDING
INFO: Informed scheduler that task   TransformTask__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8884] Worker Worker(salt=3507287736, workers=1, host=LAPTOP-CLHSFIUK, username=melin, pid=8884) running   LoadTask(target_table=climate_analysis)
INFO: [pid 8884] Worker Worker(salt=3507287736, workers=1, host=LAPTOP-CLHSFIUK, username=melin, pid=8884) done      LoadTask(target_table=climate_analysis)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   LoadTask_climate_analysis_d0ea38b10e   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks t

Loaded data from data_transformed.csv to PostgreSQL table climate_analysis
