### Installing Luigi

In [1]:
#pip install luigi

### Importing Libraries

In [2]:
import numpy as np
import sys
import pandas as pd
import matplotlib as mp
import matplotlib.pyplot as plt
import seaborn as sns
import csv
import mysql.connector as mysql
from sqlalchemy import create_engine
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import luigi
import json
import logging

### Define the connection parameters for MySQL and MongoDB

In [3]:
MYSQL_HOST = 'localhost'
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'sana123'
MYSQL_DB = 'montgomery'

In [4]:
mongo_uri = "mongodb+srv://x22237941:sana123@montgomerycluster.tzxvtsd.mongodb.net/?retryWrites=true&w=majority&appName=montgomerycluster"

### Importing API librariers

In [5]:
import os
from sodapy import Socrata

### Define the task to extract data from the Socrata API

In [6]:
class ExtractSocrataDataJSON(luigi.Task):
    def output(self):
        return luigi.LocalTarget("incidents.json")
    
    def run(self):
        socrata_domain = 'data.montgomerycountymd.gov'
        socrata_dataset_identifier_incidents = 'bhju-22kf'
        socrata_token = os.environ.get("SODAPY_APPTOKEN")
        client = Socrata(socrata_domain, socrata_token)
        results = client.get(socrata_dataset_identifier_incidents)
        df = pd.DataFrame.from_dict(results)
        incidents_data = df.to_json(orient='records')
        with self.output().open('w') as f:
            f.write(incidents_data)

In [7]:
class ExtractSocrataDataCSV(luigi.Task):
    def output(self):
        return luigi.LocalTarget("incidents.csv")  # Output CSV file
    
    def run(self):
        print("-----------------------------------------------------------------------------")
        print("Currently ExtractSocrataDataCSV is in progress")
        print("-----------------------------------------------------------------------------")
        socrata_domain = 'data.montgomerycountymd.gov'
        socrata_dataset_identifier_incidents = 'bhju-22kf'
        socrata_token = os.environ.get("SODAPY_APPTOKEN")
        client = Socrata(socrata_domain, socrata_token)
        results = client.get(socrata_dataset_identifier_incidents)
        df = pd.DataFrame.from_dict(results)
        #df.head()
        # Use the filter method to select columns that don't start with ":@"
        filtered_columns = df.filter(regex="^:@", axis=1)
        columns_to_drop = ['latitude', 'longitude','geolocation']
        # Drop the selected columns
        df.drop(filtered_columns, axis=1, inplace=True)
        df.drop(columns_to_drop, axis=1, inplace=True)
        df.to_csv(self.output().path, index=False)  # Save data to CSV file
        print("-----------------------------------------------------------------------------")
        print("ExtractSocrataDataCSV Finished Successfully")
        print("-----------------------------------------------------------------------------")

### Define the task to load data into MySQL

In [8]:
class LoadMySQLData(luigi.Task):
    def requires(self):
        return ExtractSocrataDataCSV()

    def output(self):
        return luigi.LocalTarget("sql_frame.csv")  # Output file
        
    def run(self):
            # Define the MySQL connection parameters
            host = 'localhost'
            user = 'root'
            password = 'sana123'
            database = 'montgomery2'
        
            # Define the SQL queries
            create_database_query = f"CREATE DATABASE IF NOT EXISTS {database}"
            use_database_query = f"USE {database}"
            create_table_query = '''CREATE TABLE IF NOT EXISTS incidents (
                                report_number TEXT,
                                local_case_number TEXT,
                                agency_name TEXT,
                                acrs_report_type TEXT,
                                crash_date_time TEXT,
                                hit_run TEXT,
                                lane_number TEXT,
                                number_of_lanes TEXT,
                                non_traffic TEXT,
                                off_road_description TEXT,
                                at_fault TEXT,
                                collision_type TEXT,
                                weather TEXT,
                                light TEXT,
                                traffic_control TEXT,
                                driver_substance_abuse TEXT,
                                first_harmful_event TEXT,
                                second_harmful_event TEXT,
                                fixed_object_struck TEXT,
                                route_type TEXT,
                                mile_point TEXT,
                                mile_point_direction TEXT,
                                lane_direction TEXT,
                                direction TEXT,
                                distance TEXT,
                                distance_unit TEXT,
                                road_grade TEXT,
                                road_name TEXT,
                                cross_street_type TEXT,
                                cross_street_name TEXT,
                                municipality TEXT,
                                surface_condition TEXT,
                                junction TEXT,
                                intersection_type TEXT,
                                intersection_area TEXT,
                                road_alignment TEXT,
                                road_condition TEXT,
                                road_division TEXT,
                                related_non_motorist TEXT,
                                non_motorist_substance_abuse TEXT,
                                lane_type TEXT
                                )'''
            show_table_query = "SHOW TABLES"
            drop_columns_query = '''ALTER TABLE incidents
                                DROP COLUMN latitude,
                                DROP COLUMN longitude,
                                DROP COLUMN location'''
            insert_data_query = '''INSERT INTO incidents (report_number, local_case_number, agency_name, acrs_report_type, crash_date_time, hit_run, lane_number, number_of_lanes, non_traffic, off_road_description, at_fault,collision_type, weather, light, traffic_control, driver_substance_abuse, first_harmful_event, second_harmful_event, fixed_object_struck, route_type, mile_point, mile_point_direction, lane_direction, direction, distance, distance_unit, road_grade, road_name, cross_street_type, cross_street_name, municipality, surface_condition, junction, intersection_type, intersection_area, road_alignment, road_condition, road_division,related_non_motorist,non_motorist_substance_abuse,lane_type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'''

            print("-----------------------------------------------------------------------------")
            print("Currently LoadMySQLData is in progress")
            print("-----------------------------------------------------------------------------")
        
            # Connect to MySQL database
            conn = mysql.connect(host=host, user=user, password=password)
            cursor = conn.cursor()

            # Create database if it does not exist
            cursor.execute(create_database_query)

            print("-----------------------------------------------------------------------------")
            print("Database created successfully")
            print("-----------------------------------------------------------------------------")
        
            # Use the specified database
            cursor.execute(use_database_query)

            df = pd.read_csv(self.input().path)
            #read csv file and create a table structure
            #df = pd.read_csv('incidents.csv')
            print(df.dtypes)
            print("-----------------------------------------------------------------------------")

            # Set default data type for columns
            default_dtype = 'TEXT'
            # Create column definitions for SQL table
            columns = [f"{col} {default_dtype}" for col in df.columns]
            table_name = 'incidents'
            create_table_sql = f"CREATE TABLE {table_name} ({', '.join(columns)});"

            # Create table if it does not exist
            cursor.execute(create_table_sql)

            # Show tables in the database
            cursor.execute(show_table_query)
            tables = cursor.fetchall()
            for table in tables:
                print(table[0])
            print("-----------------------------------------------------------------------------")

            # Show columns in the table
            cursor.execute("DESCRIBE incidents")
            columns = cursor.fetchall()
            for column in columns:
                print(column[0], "-", column[1])
            print("-----------------------------------------------------------------------------")

            #Construct the INSERT statement
            table_name = 'incidents'
            columns = ', '.join(df.columns)
            placeholders = ', '.join(['%s'] * len(df.columns))
            #insert_statement = '''INSERT INTO incidents ({columns}) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'''
            insert_statement =f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

            # Insert data into the table
            df_filled = df.fillna("Missing")
            df_filled.astype(str)
            inserted_records_count = 0
            for index, row in df_filled.iterrows():
                #cursor.execute(insert_data_query, tuple(row))
                cursor.execute(insert_statement, tuple(row))
                inserted_records_count += 1
            conn.commit()
        
            #print("Number of records inserted into the incidents table:", inserted_records_count)
        
            print("-----------------------------------------------------------------------------")
            print("Records inserted successfully")
            print("-----------------------------------------------------------------------------")

            #Check the count of inserted data
            cursor.execute("SELECT COUNT(*) FROM incidents")
            count = cursor.fetchone()[0]
            print("Number of records in 'incidents' table:", count)

            cursor.close()
            #conn.close()
        
            print("-----------------------------------------------------------------------------")
            print("LoadMySQLData finished successfully")
            print("-----------------------------------------------------------------------------")

            #Define the SQL query to read data
            read_data_query = "SELECT * FROM montgomery2.incidents"

            # Connect to MySQL database
            #conn = mysql.connect(host=host, user=user, password=password, database=database)
        
            #Read data from MySQL into a DataFrame
            sql_frame = pd.read_sql(read_data_query, conn)
            sql_frame.to_csv(self.output().path, index=False)
            conn.close()
            #return sql_frame

### Define the transformation task using Pandas

In [9]:
class TransformData(luigi.Task):
    def requires(self):
        return LoadMySQLData()
    
    def output(self):
        return luigi.LocalTarget("transformed_data.json")  # Output file
    
    def run(self):
        print("-----------------------------------------------------------------------------")
        print("TransformData is in progress and reading data from MySQL")
        print("-----------------------------------------------------------------------------")

        # #Define the MySQL connection parameters
        # host = 'localhost'
        # user = 'root'
        # password = 'sana123'
        # database = 'montgomery2'

        # # Define the SQL query to read data
        # read_data_query = "SELECT * FROM montgomery2.incidents"

        # # Connect to MySQL database
        # conn = mysql.connect(host=host, user=user, password=password, database=database)
        
        # # Read data from MySQL into a DataFrame
        # sql_frame = pd.read_sql(read_data_query, conn)
        # #sql_frame=self.input().run()
        #df = pd.read_csv(self.input().path)
        sql_frame=pd.read_csv(self.input().path)

        print("-----------------------------------------------------------------------------")
        print("Get the total number of records")
        print("-----------------------------------------------------------------------------")
        
        num_rows, num_columns = sql_frame.shape
        print("Number of rows:", num_rows)
        print("Number of columns:", num_columns)

        print("-----------------------------------------------------------------------------")
        print("Checking for Duplicate records")
        print("-----------------------------------------------------------------------------")
        
        # Checking for duplicate records
        duplicate_rows = sql_frame.duplicated()
        print("Number of duplicate rows:", duplicate_rows.sum())

        print("-----------------------------------------------------------------------------")
        print("Checking for 'Missing' Values")
        print("-----------------------------------------------------------------------------")
        
        # Check where "Missing" values occur in each column
        missing_mask = sql_frame.eq('Missing')
        # Check which columns have at least one "Missing" value
        columns_with_missing = missing_mask.any()
        # Extract the column names where "Missing" values occur
        columns_with_missing_values = columns_with_missing[columns_with_missing].index.tolist()
        # Print the columns with "Missing" values
        print("Columns with 'Missing' values:", columns_with_missing_values)

        # Iterate through columns with missing values
        for column in columns_with_missing_values:
            # Get unique values and their counts
            unique_values_counts = sql_frame[column].value_counts() 
            # Print column name
            print("Column:", column)
            # Print unique values and their counts
            print(unique_values_counts)
            print("-----------------------------------------------------------------------------")
        

        # Drop specified columns
        columns_to_drop = ['off_road_description', 'first_harmful_event', 'related_non_motorist','second_harmful_event', 'fixed_oject_struck', 'cross_street_name','non_motorist_substance_abuse','lane_type','mile_point_direction', 'intersection_area', 'road_division']
        sql_frame.drop(columns=columns_to_drop, inplace=True)

        # Print the remaining columns
        print("Remaining columns after dropping:")
        print(sql_frame.columns.tolist())
        print("--------------------------------------------------")

        #Transformations
        
        # Save the transformed DataFrame to JSON
        #filename = "transformed_data.json"
        transformed_frame=sql_frame
        transformed_frame.to_json(self.output().path, orient='records')

        # Close MySQL connection
        #conn.close()

### Define the MongoDB task to load data

In [10]:
class LoadMongoDBData(luigi.Task):
    def requires(self):
        return TransformData()
    
    def run(self):
        # MongoDB connection URI
        uri = "mongodb+srv://x22237941:sana123@montgomerycluster.tzxvtsd.mongodb.net/?retryWrites=true&w=majority&appName=montgomerycluster"

        # Create a new client and connect to the server
        client = MongoClient(uri, server_api=ServerApi('1'))

        try:
            # Ping the MongoDB deployment
            client.montgomery.command('ping')
            print("Pinged the MongoDB deployment. Successfully connected to MongoDB!")
        except Exception as e:
            print("Error:", e)

        try:
            # Check server status
            server_status = client.montgomery.command('serverStatus')
            print("Server is up and running.")
        except Exception as e:
            print("Error:", e)

        # List databases
        databases = client.list_database_names()
        print("Databases:")
        for db_obj in databases:
            print(db_obj)

        # Select database and collection
        database_name = "montgomery"
        db = client[database_name]

        # List collections in the selected database
        collections = db.list_collection_names()
        print("\nCollections in", database_name, ":")
        for col in collections:
            print(col)

        # Load JSON file
        #filename = "transformed_data.json"
        with open(self.input().path, 'r') as file:
            data = json.load(file)

        # Insert documents into collection
        collection_name = 'incidents'  # assuming collection name is 'incidents'
        collection = db[collection_name]
        collection.insert_many(data)
        print("JSON data successfully loaded into MongoDB collection 'incidents' in database 'montgomery'.")

        # Get the total number of documents in the collection
        total_records = collection.count_documents({})
        print("Total number of records in the collection:", total_records)


### Define the task to fetch data from MongoDB into a DataFrame

In [11]:
class FetchMongoDBData(luigi.Task):
    # def requires(self):
    #     return LoadMongoDBData()
        
    def output(self):
        return luigi.LocalTarget("fetched_data.csv")  # Output file
    
    def run(self):
        # MongoDB connection URI
        uri = "mongodb+srv://x22237941:sana123@montgomerycluster.tzxvtsd.mongodb.net/?retryWrites=true&w=majority&appName=montgomerycluster"
        # Create a new client and connect to the server
        client = MongoClient(uri, server_api=ServerApi('1'))
        db = client['montgomery']
        collection = db['incidents']
        
        # Query all documents from the collection
        data = list(collection.find())
        
        # Convert data to DataFrame
        df = pd.DataFrame(data)
        df.to_csv(self.output().path, index=False)

### Define the main task to run the ETL pipeline

In [12]:
# Define the main task to run the ETL pipeline
class ETLPipeline(luigi.Task):
    def requires(self):
        return FetchMongoDBData()

In [15]:
if __name__ == '__main__':
    # Configure logging
    logging.basicConfig(filename='etl_pipeline.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Redirect stdout and stderr to the log file
    sys.stdout = open('etl_pipeline.log', 'a')
    sys.stderr = open('etl_pipeline.log', 'a')

    # Run the Luigi build process
    try:
        luigi.build([ETLPipeline()], local_scheduler=True)
        logging.info("ETL pipeline executed successfully.")
    except Exception as e:
        logging.error(f"Error in ETL pipeline execution: {e}")

DEBUG: Checking if ETLPipeline() is complete
DEBUG: Checking if FetchMongoDBData() is complete
INFO: Informed scheduler that task   ETLPipeline__99914b932b   has status   PENDING
DEBUG: Checking if LoadMongoDBData() is complete
INFO: Informed scheduler that task   FetchMongoDBData__99914b932b   has status   PENDING
DEBUG: Checking if TransformData() is complete
INFO: Informed scheduler that task   LoadMongoDBData__99914b932b   has status   PENDING
INFO: Informed scheduler that task   TransformData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 34396] Worker Worker(salt=7650770777, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) running   LoadMongoDBData()
INFO: [pid 34396] Worker Worker(salt=7650770777, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) done      LoadMongoDBData()
DEBUG: 1 running tasks, waiting for next task

In [13]:
# if __name__ == '__main__':
#     luigi.build([ETLPipeline()], local_scheduler=True)

DEBUG: Checking if ETLPipeline() is complete
  is_complete = task.complete()
DEBUG: Checking if FetchMongoDBData() is complete
INFO: Informed scheduler that task   ETLPipeline__99914b932b   has status   PENDING
DEBUG: Checking if LoadMongoDBData() is complete
  is_complete = task.complete()
INFO: Informed scheduler that task   FetchMongoDBData__99914b932b   has status   PENDING
DEBUG: Checking if TransformData() is complete
INFO: Informed scheduler that task   LoadMongoDBData__99914b932b   has status   PENDING
INFO: Informed scheduler that task   TransformData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 34396] Worker Worker(salt=715081680, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) running   LoadMongoDBData()


Pinged the MongoDB deployment. Successfully connected to MongoDB!
Server is up and running.
Databases:
montgomery
montgomery_reverse
admin
local

Collections in montgomery :
drivers
non_motorists
non_motorists_dataset
incidents


INFO: [pid 34396] Worker Worker(salt=715081680, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) done      LoadMongoDBData()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   LoadMongoDBData__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 34396] Worker Worker(salt=715081680, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) running   FetchMongoDBData()
ERROR: [pid 34396] Worker Worker(salt=715081680, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) failed    FetchMongoDBData()
Traceback (most recent call last):
  File "C:\Users\SANA JALGAONKAR\AppData\Local\Programs\Python\Python312\Lib\site-packages\luigi\worker.py", line 195, in run
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: LoadMongoDBData__99914b932b


JSON data successfully loaded into MongoDB collection 'incidents' in database 'montgomery'.
Total number of records in the collection: 3000


DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   FetchMongoDBData__99914b932b   has status   FAILED
DEBUG: Checking if FetchMongoDBData() is complete
DEBUG: Checking if LoadMongoDBData() is complete
INFO: Informed scheduler that task   FetchMongoDBData__99914b932b   has status   PENDING
DEBUG: Checking if TransformData() is complete
INFO: Informed scheduler that task   LoadMongoDBData__99914b932b   has status   PENDING
INFO: Informed scheduler that task   TransformData__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 34396] Worker Worker(salt=715081680, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) running   FetchMongoDBData()
ERROR: [pid 34396] Worker Worker(salt=715081680, workers=1, host=LAPTOP-JCBDQEO9, username=SANA JALGAONKAR, pid=34396) failed    FetchMongoDBData()
Traceback (most recent call last):
  File "C:\Users\SANA JALGAONKAR\AppData\Local\Programs\Pyth

In [14]:
# analyis_frame=pd.read_csv("fetched_data.csv")