In [17]:
import psycopg2
from zipfile import ZipFile
import pandas as pd
import os
from io import StringIO
import numpy as np
import shutil

zipped_folder=os.path.join('C:\\Users\\Munira\\Desktop\\demo-pipeline','zipped_raw_data')

unzipped_folder=os.path.join('C:\\Users\\Munira\\Desktop\\demo-pipeline','raw_data')

def connect_to_db(db_name,password):
    conn = psycopg2.connect(f"dbname={db_name} user=dataengineer host='mydb.cb7sygkkbh0u.us-east-2.rds.amazonaws.com' port=5432 password={password}")
    cur = conn.cursor()
    return conn,cur

def extract_and_unzip_files(zipped_folder, unzipped_folder):
    for zip_filename in os.listdir(zipped_folder):
        z=ZipFile(os.path.join(zipped_folder,zip_filename))
        z.extractall(unzipped_folder)



In [25]:
def move_files_to_new_subfolder(new_folder_name,file_identifier,original_file_location):
    try:
        # Create subfolder
        os.mkdir(os.path.join(unzipped_folder,new_folder_name))
        # Get all files with file_identifier as part of their filenames
        files_list=[x  for x in os.listdir(unzipped_folder) if file_identifier in x]
        # Move files to new subfolder
        for file in files_list:
            shutil.move(os.path.join(original_file_location,file), os.path.join(original_file_location,new_folder_name,file))
    except FileExistsError:
        pass
    print('Done')

In [27]:
extract_and_unzip_files(zipped_folder,unzipped_folder)

move_files_to_new_subfolder('ReadMeFiles','Readme',unzipped_folder)

move_files_to_new_subfolder('DataFiles','txt',unzipped_folder)

In [123]:
file_identifier='Readme'

In [124]:
files_list = [
    x
    for x in os.listdir(unzipped_folder)
    if file_identifier.lower() in x.lower()
]

In [125]:
files_list


['ReadMeFiles', 'ReadMe_Census.txt']

In [None]:

# Move files to new subfolder
for file in files_list:
    shutil.move(
        os.path.join(original_file_location, file),
        os.path.join(original_file_location, new_folder_name, file),
    )

In [119]:
move_files_to_new_subfolder('ReadMeFiles','Readme',unzipped_folder)

Done


## Modifying raw data files

In [60]:
os.mkdir(os.path.join(unzipped_folder,'CleanedData'))

In [90]:
os.path.join(unzipped_folder,'CleanedData').split('\\')[-1]

'CleanedData'

In [106]:
def clean_raw_data_files(original_filepath,cleaned_folder_path):

    df=pd.read_csv(original_filepath,encoding='cp1252')

    # Remove rows where all values are missing
    df=df.dropna(how='all')

    # Replacing empty strings for float columns with nans
    num_cols = [col for col in df.columns if df[col].dtype in ['float64','int64']]
    for col in num_cols:
        df[col]=df[col].fillna(np.nan)
        df.loc[df[col]=='',col]=np.nan

    # Replacing nulls with empty strings for text columns
    cols=[col for col in df.columns if df[col].dtype=='O']
    for col in cols:
        df[col]=df[col].fillna('')

    # Write into CSV file
    df.to_csv(os.path.join(cleaned_folder_path,os.path.split(original_filepath)[-1].replace('.txt','.csv')),index=False)


In [116]:
original_filepath=os.path.join(unzipped_folder,'DataFiles','FMCSA_CENSUS1_2022Aug.txt')

In [117]:
cleaned_folder_path=os.path.join(unzipped_folder,'CleanedData')

In [118]:
clean_raw_data_files(original_filepath,cleaned_folder_path)

## Copying into postgres

In [None]:
\copy crash FROM 'C:\\Users\Munira\Desktop\demo-pipeline\raw_data\CleanedData\2022Aug_Crash.csv' WITH (FORMAT CSV, DELIMITER ',',HEADER);

In [None]:
\copy inspection FROM 'C:\\Users\Munira\Desktop\demo-pipeline\raw_data\CleanedData\2022Aug_Inspection.csv' WITH (FORMAT CSV, DELIMITER ',',HEADER);

# Creating tables

In [None]:
### Crash

CREATE TABLE crash (
  Report_number VARCHAR(50) NOT NULL,
  Report_seq_no REAL NOT NULL,
  DOT_Number VARCHAR(50),
  Report_Date DATE,
  Report_State CHAR(2),
  Fatalities REAL,
  Injuries REAL,
  Tow_Away CHAR(1),
  Hazmat_released CHAR(1),
  Trafficway_Desc VARCHAR(255),
  Access_Control_Desc VARCHAR(255),
  Road_surface_Condition_Desc VARCHAR(255),
  Weather_Condition_Desc VARCHAR(255),
  Light_Condition_Desc VARCHAR(255),
  Vehicle_ID_Number VARCHAR(50),
  Vehicle_License_number VARCHAR(50),
  Vehicle_license_state CHAR(2),
  Severity_Weight REAL,
  Time_weight REAL,
  citation_issued_desc VARCHAR(255),
  seq_num REAL,
  Not_Preventable CHAR(1)
);


### Inspection

CREATE TABLE inspection (
  Unique_ID FLOAT PRIMARY KEY,
  Report_Number VARCHAR(255)  NOT NULL,
  Report_State VARCHAR(2) ,
  DOT_Number VARCHAR(255) NOT NULL,
  Insp_Date DATE NOT NULL,
  Insp_level_ID FLOAT NOT NULL,
  County_code_State VARCHAR(2) NOT NULL,
  Time_Weight FLOAT NOT NULL,
  Driver_OOS_Total FLOAT ,
  Vehicle_OOS_Total FLOAT,
  Total_Hazmat_Sent FLOAT ,
  OOS_Total FLOAT,
  Hazmat_OOS_Total FLOAT,
  Hazmat_Placard_req CHAR(1) ,
  Unit_Type_Desc VARCHAR(255),
  Unit_Make VARCHAR(255) ,
  Unit_License VARCHAR(255) ,
  Unit_License_State VARCHAR(2),
  VIN VARCHAR(255) ,
  Unit_Decal_Number VARCHAR(255) ,
  Unit_Type_Desc2 VARCHAR(255),
  Unit_Make2 VARCHAR(255),
  Unit_License2 VARCHAR(255),
  Unit_License_State2 VARCHAR(2),
  VIN2 VARCHAR(255),
  Unit_Decal_Number2 VARCHAR(255),
  Unsafe_Insp CHAR(1) ,
  Fatigued_Insp CHAR(1) ,
  Dr_Fitness_Insp CHAR(1) ,
  Subt_Alcohol_Insp CHAR(1),
  Vh_Maint_Insp CHAR(1),
  HM_Insp CHAR(1) ,
  BASIC_Viol FLOAT ,
  Unsafe_Viol FLOAT,
  Fatigued_Viol FLOAT,
  Dr_Fitness_Viol FLOAT,
  Subt_Alcohol_Viol FLOAT,
  Vh_Maint_Viol FLOAT ,
  HM_Viol FLOAT 
);


### Violation

CREATE TABLE violation (
    Unique_ID FLOAT PRIMARY KEY,
    Insp_Date DATE NOT NULL,
    DOT_Number VARCHAR(50) NOT NULL,
    Viol_Code VARCHAR(10) NOT NULL,
    BASIC_Desc VARCHAR(50) ,
    OOS_Indicator CHAR(1),
    OOS_Weight REAL,
    Severity_Weight FLOAT,
    Time_Weight REAL,
    Total_Severity_Wght FLOAT,
    Section_Desc VARCHAR(200) ,
    Group_Desc VARCHAR(200),
    Viol_Unit CHAR(1)
);


### Census

CREATE TABLE census (
    DOT_NUMBER TEXT PRIMARY KEY,
    LEGAL_NAME TEXT,
    DBA_NAME TEXT,
    CARRIER_OPERATION TEXT,
    HM_FLAG TEXT,
    PC_FLAG TEXT,
    PHY_STREET TEXT,
    PHY_CITY TEXT,
    PHY_STATE TEXT,
    PHY_ZIP TEXT,
    PHY_COUNTRY TEXT,
    MAILING_STREET TEXT,
    MAILING_CITY TEXT,
    MAILING_STATE TEXT,
    MAILING_ZIP TEXT,
    MAILING_COUNTRY TEXT,
    TELEPHONE TEXT,
    FAX TEXT,
    EMAIL_ADDRESS TEXT,
    MCS150_DATE DATE,
    MCS150_MILEAGE NUMERIC,
    MCS150_MILEAGE_YEAR FLOAT,
    ADD_DATE DATE,
    OIC_STATE TEXT,
    NBR_POWER_UNIT FLOAT,
    DRIVER_TOTAL FLOAT,
    RECENT_MILEAGE NUMERIC,
    RECENT_MILEAGE_YEAR FLOAT,
    VMT_SOURCE_ID FLOAT
);
