In [2]:
import pandas as pd
import psycopg2
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import OffsetPaginator
import requests
from pathlib import Path
import json
import io

In [3]:
mvc_crashes = "https://data.cityofnewyork.us/resource/h9gi-nx95.json"
mvc_vehicles = "https://data.cityofnewyork.us/resource/bm4k-52h4.json"
mvc_persons = "https://data.cityofnewyork.us/resource/f55k-p6yu.json"
src_dict = {
    mvc_crashes:"mvc_crashes", 
    mvc_vehicles:"mvc_vehicles", 
    mvc_persons:"mvc_persons"}

In [4]:
def load_from_source(src, offset, max_offset):
    # Initialize the REST client
    client = RESTClient(
        base_url=src,
        paginator=OffsetPaginator(
            limit=1000,
            offset=offset,
            maximum_offset=max_offset,
            limit_param="$limit", 
            offset_param="$offset",
            total_path=None
        )
        )

    for page in client.paginate():
        yield page

# for page in load_from_source():
#     # Convert the page to a DataFrame
#     print(page)

In [5]:
def add_to_pipeline(data,name):

    # Initialize the pipeline
    pipeline = dlt.pipeline(destination="postgres", dataset_name="motor_vehicle_collisions")

    # Run the pipeline
    load_info = pipeline.run(data,table_name=name, write_disposition="append", loader_file_format="csv")

    # Print the load info
    print(load_info)
    # Print the number of rows loaded


In [6]:
def add_to_pipeline_two(data,name):

    # Initialize the pipeline
    pipeline = dlt.pipeline(destination="postgres", dataset_name="motor_vehicle_collisions")

    # Run the pipeline
    load_info = pipeline.run(data,table_name=name, write_disposition="append")

    # Print the load info
    print(load_info)
    # Print the number of rows loaded

In [7]:
for source in src_dict.keys():
        page_max_offset = 50000
        page_offset = 0
        page = 1
        name = src_dict[source]
        response = requests.get(f"{source}?$limit=1000&$offset={page_max_offset}")
        while len(response.json()) != 0:
            print(f"Loading page {page} from {name} with offset {page_offset}")
            data = load_from_source(source, page_offset, page_max_offset)
            add_to_pipeline_two(data,name)
            page_offset += 50000
            page += 1
            response = requests.get(f"{source}?$limit=1000&$offset={(page_offset+1000)}")
            page_max_offset +=50000

Loading page 1 from mvc_crashes with offset 0
Pipeline dlt_ipykernel_launcher load step completed in 2.07 seconds
1 load package(s) were loaded to destination postgres and into dataset motor_vehicle_collisions
The postgres destination used postgresql://loader:***@localhost:5432/dlt_data location to store data
Load package 1748231800.1802902 is LOADED and contains no failed jobs
Loading page 2 from mvc_crashes with offset 50000
Pipeline dlt_ipykernel_launcher load step completed in 1.70 seconds
1 load package(s) were loaded to destination postgres and into dataset motor_vehicle_collisions
The postgres destination used postgresql://loader:***@localhost:5432/dlt_data location to store data
Load package 1748231835.720766 is LOADED and contains no failed jobs
Loading page 3 from mvc_crashes with offset 100000
Pipeline dlt_ipykernel_launcher load step completed in 2.30 seconds
1 load package(s) were loaded to destination postgres and into dataset motor_vehicle_collisions
The postgres destina

In [None]:
try:    
    for source in src_dict.keys():
        page_max_offset = 50000
        page_offset = 0
        page = 1
        name = src_dict[source]
        response = requests.get(f"{source}?$limit=1000&$offset={page_max_offset}")
        while len(response.json()) != 0:
            data_list = []
            for page_data in load_from_source(source, page_offset, page_max_offset):
                # Convert the page to a DataFrame
                data_list.extend(page_data)
            df = pd.DataFrame(data_list)
            df.to_csv(f"./raw/{name}/{name}_{page}.csv", index=False)
            page_offset += 50000
            page += 1
            response = requests.get(f"{source}?$limit=1000&$offset={(page_offset+1000)}")
            page_max_offset +=50000
except:
    print("Error occurred while fetching data from the API.")
# The data is saved in the raw folder with the following structure:
# raw/
# mvc_crashes/mvc_crashes_1.csv
# mvc_vehicles/mvc_vehicles_1.csv
# mvc_persons/mvc_persons_1.csv
# The csv file saves 50000 rows per page
        
#crashes - 44 pages, vehicles - 87 pages, persons - 113 pages

In [5]:
def rearrange_collision_id(data):
    cols = list(data.columns)
    column_list = ['collision_id']
    column_list.extend(cols[:cols.index('collision_id')])
    column_list.extend(cols[cols.index('collision_id')+1:])
    data = data.reindex(columns=column_list)
    return data

In [None]:
def reduce_nulls(data):
    threshold = 0.3
    null_percentage = data.isnull().sum() / len(data)
    cols_to_drop = null_percentage[null_percentage > threshold].index
    data = data.drop(columns=cols_to_drop)
    return data

In [21]:
def convert_dataframe(data):  
    for column in data.columns:
        if column == "crash_date":
            data[column] = pd.to_datetime(data[column])
        if column == "crash_time":
            data[column] = pd.to_datetime(data[column], format='%H:%M').dt.time
        if data[column].dtype == "object":
            data[column] = data[column].fillna("N/A")
            data[column] = data[column].astype(str)
            
        if data[column].dtype == "int64" or data[column].dtype == "float64":
            data[column] = data[column].fillna(0)
    return data

In [27]:
def remove_unavailable(data):
    cols_to_drop = []
    for column in data.columns:
        if data[column].dtype == "object":
            if len(data[column]=="N/A")/len(data[column]) > 0.7:
                cols_to_drop.append(column)
        elif data[column].dtype == "int64" or data[column].dtype == "float64":
            if len(data[column]==0)/len(data[column]) > 0.7:
                cols_to_drop.append(column)
    data = data.drop(columns=cols_to_drop)
    return data

In [7]:
def transform_df_crashes(data):
    
    data["crash_date"] = pd.to_datetime(data["crash_date"])
    data["crash_time"] = pd.to_datetime(data["crash_time"], format='%H:%M').dt.time
    
    data['contributing_factor_vehicle_1'] = data['contributing_factor_vehicle_1'].fillna('N/A')
    data['contributing_factor_vehicle_1'] = data['contributing_factor_vehicle_1'].astype(str)
    
    data['vehicle_type_code1'] = data['vehicle_type_code1'].fillna('N/A')
    data['vehicle_type_code1'] = data['vehicle_type_code1'].astype(str)
    
    data['latitude'] = data['latitude'].fillna(0)
    data['longitude'] = data['longitude'].fillna(0)
    
    data.drop('location', axis=1, inplace=True)
    return data

In [67]:
def transform_df_vehicles(data):
    
    data["crash_date"] = pd.to_datetime(data["crash_date"])
    data["crash_time"] = pd.to_datetime(data["crash_time"], format='%H:%M').dt.time
    data["vehicle_id"] = data["vehicle_id"].astype(str)

    data['state_registration'] = data['state_registration'].fillna('N/A')
    data["state_registration"] = data["state_registration"].astype(str)

    data['vehicle_type'] = data['vehicle_type'].fillna('N/A')
    data['vehicle_type'] = data['vehicle_type'].astype(str)

    data['contributing_factor_1'] = data['contributing_factor_1'].fillna('N/A')
    data['contributing_factor_1'] = data['contributing_factor_1'].astype(str)
    

    return data

In [66]:
def transform_df_persons(data):
    
    data["crash_date"] = pd.to_datetime(data["crash_date"])
    data["crash_time"] = pd.to_datetime(data["crash_time"], format='%H:%M').dt.time

    data["person_id"] = data["person_id"].fillna('N/A')
    data["person_id"] = data["person_id"].astype(str)

    data["person_type"] = data["person_type"].astype(str)
    data["person_injury"] = data["person_injury"].astype(str)

    data['ped_role'] = data['ped_role'].fillna('N/A')
    data["ped_role"] = data["ped_role"].astype(str)
    
    
    return data

In [35]:
def transform_and_load(data):
    
    # For example, you can rename columns, change data types, etc.
    data = rearrange_collision_id(data)
    
    # if name == "mvc_crashes":
    #     data = transform_df_crashes(data)
    # elif name == "mvc_vehicles":
    #     data = transform_df_vehicles(data)
    # elif name == "mvc_persons":
    #     data = transform_df_persons(data)
    data = reduce_nulls(data)
    data = convert_dataframe(data)
    
    return data

In [39]:
df = pd.read_csv("./data/raw/mvc_persons/mvc_persons_1.csv")
df = transform_and_load(df)
list(df.columns)

['collision_id',
 'unique_id',
 'crash_date',
 'crash_time',
 'person_id',
 'person_type',
 'person_injury',
 'ped_role']

In [41]:

path = Path("./data/raw")
for roots,dirs,files in path.walk(on_error=print):
    for dir in dirs:
        path = Path(roots) / dir
        name = dir
        page = 1
        required_columns = []
        if name == "mvc_crashes":
            required_columns = ['collision_id',
                                'crash_date',
                                'crash_time',
                                'on_street_name',
                                'number_of_persons_injured',
                                'number_of_persons_killed',
                                'number_of_pedestrians_injured',
                                'number_of_pedestrians_killed',
                                'number_of_cyclist_injured',
                                'number_of_cyclist_killed',
                                'number_of_motorist_injured',
                                'number_of_motorist_killed',
                                'contributing_factor_vehicle_1',
                                'contributing_factor_vehicle_2',
                                'vehicle_type_code1',
                                'latitude',
                                'longitude',
                                'location']
        elif name == "mvc_vehicles":
            required_columns = ['collision_id',
                                'unique_id',
                                'crash_date',
                                'crash_time',
                                'vehicle_id',
                                'state_registration',
                                'vehicle_type',
                                'contributing_factor_1']
        elif name == "mvc_persons":
            required_columns = ['collision_id',
                                'unique_id',
                                'crash_date',
                                'crash_time',
                                'person_id',
                                'person_type',
                                'person_injury',
                                'ped_role']
        try:
            for file in path.glob("*.csv"):
                print(f"Transforming and loading data for {file.name.split('.')[0]}")
                df = pd.read_csv(file)
                df = transform_and_load(df)
                if list(df.columns) != required_columns:
                    df = df[required_columns]
                df.to_csv(f"./data/transformed/{name}/{file.name}", index=False)
                page += 1
        except Exception as e:
            print(f"Error occurred while transforming and loading data: {e}")
            continue

Transforming and loading data for mvc_crashes_1
Transforming and loading data for mvc_crashes_10
Transforming and loading data for mvc_crashes_11
Transforming and loading data for mvc_crashes_12
Transforming and loading data for mvc_crashes_13
Transforming and loading data for mvc_crashes_14
Transforming and loading data for mvc_crashes_15
Transforming and loading data for mvc_crashes_16
Transforming and loading data for mvc_crashes_17
Transforming and loading data for mvc_crashes_18
Transforming and loading data for mvc_crashes_19
Transforming and loading data for mvc_crashes_2
Transforming and loading data for mvc_crashes_20
Transforming and loading data for mvc_crashes_21
Transforming and loading data for mvc_crashes_22
Transforming and loading data for mvc_crashes_23
Transforming and loading data for mvc_crashes_24
Transforming and loading data for mvc_crashes_25
Transforming and loading data for mvc_crashes_26
Transforming and loading data for mvc_crashes_27
Error occurred while t

  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_41
Transforming and loading data for mvc_vehicles_42
Transforming and loading data for mvc_vehicles_43
Transforming and loading data for mvc_vehicles_44
Transforming and loading data for mvc_vehicles_45
Transforming and loading data for mvc_vehicles_46
Transforming and loading data for mvc_vehicles_47
Transforming and loading data for mvc_vehicles_48
Transforming and loading data for mvc_vehicles_49


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_5
Transforming and loading data for mvc_vehicles_50
Transforming and loading data for mvc_vehicles_51
Transforming and loading data for mvc_vehicles_52
Transforming and loading data for mvc_vehicles_53
Transforming and loading data for mvc_vehicles_54
Transforming and loading data for mvc_vehicles_55
Transforming and loading data for mvc_vehicles_56


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_57
Transforming and loading data for mvc_vehicles_58
Transforming and loading data for mvc_vehicles_59


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_6
Transforming and loading data for mvc_vehicles_60


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_61
Transforming and loading data for mvc_vehicles_62
Transforming and loading data for mvc_vehicles_63
Transforming and loading data for mvc_vehicles_64


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_65
Transforming and loading data for mvc_vehicles_66
Transforming and loading data for mvc_vehicles_67
Transforming and loading data for mvc_vehicles_68
Transforming and loading data for mvc_vehicles_69
Transforming and loading data for mvc_vehicles_7
Transforming and loading data for mvc_vehicles_70


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_71
Transforming and loading data for mvc_vehicles_72


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_73
Transforming and loading data for mvc_vehicles_74
Transforming and loading data for mvc_vehicles_75


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_76
Transforming and loading data for mvc_vehicles_77
Transforming and loading data for mvc_vehicles_78
Transforming and loading data for mvc_vehicles_79
Transforming and loading data for mvc_vehicles_8
Transforming and loading data for mvc_vehicles_80
Transforming and loading data for mvc_vehicles_81
Transforming and loading data for mvc_vehicles_82
Transforming and loading data for mvc_vehicles_83
Transforming and loading data for mvc_vehicles_84


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_85
Transforming and loading data for mvc_vehicles_86
Transforming and loading data for mvc_vehicles_87


  df = pd.read_csv(file)


Transforming and loading data for mvc_vehicles_88
Transforming and loading data for mvc_vehicles_9


In [None]:
try:    
    path = Path("./raw")
    for roots,dirs,files in path.walk(on_error=print):
        for dir in dirs:
            path = Path(roots) / dir
            name = dir
            for file in path.glob("*.csv"):
                df = pd.read_csv(file)
                transform_and_load(df,name)
except:
    print("Error occurred while transforming and loading data.")

In [83]:
df_crashes_27 = pd.read_csv("./data/raw/mvc_crashes/mvc_crashes_27.csv")
df_crashes_27 = rearrange_collision_id(df_crashes_27)
df_crashes_27 = reduce_nulls(df_crashes_27)
df_crashes_27.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 18 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   collision_id                   50000 non-null  int64  
 1   crash_date                     50000 non-null  object 
 2   crash_time                     50000 non-null  object 
 3   number_of_persons_injured      49998 non-null  float64
 4   number_of_persons_killed       49998 non-null  float64
 5   number_of_pedestrians_injured  50000 non-null  int64  
 6   number_of_pedestrians_killed   50000 non-null  int64  
 7   number_of_cyclist_injured      50000 non-null  int64  
 8   number_of_cyclist_killed       50000 non-null  int64  
 9   number_of_motorist_injured     50000 non-null  int64  
 10  number_of_motorist_killed      50000 non-null  int64  
 11  contributing_factor_vehicle_1  49778 non-null  object 
 12  contributing_factor_vehicle_2  42775 non-null 

In [31]:
def df_info_to_json(df):
    buffer = io.StringIO()
    df.info(buf=buffer)
    info_str = buffer.getvalue()

    lines = info_str.strip().split('\n')
    data = {}

    data["columns"] = []
    for line in lines[5:-2]:
        parts = line.split()
        data["columns"].append({
            "index": parts[0],
            "column": parts[1],
            "non-null count": parts[2],
            "dtype": parts[4]
        })
    
    return json.dumps(data)

In [73]:
metadata_df = pd.DataFrame(columns=["filename", "number_of_columns", "file_info"])
insert_df = pd.DataFrame([['mvc_vehicles_1',9,df_info_to_json(df_transformed_vehicles)]], columns=["filename", "number_of_columns", "file_info"])
metadata_df = pd.concat([metadata_df, insert_df], ignore_index=True)
len(metadata_df.columns)

3

In [42]:
metadata_df = pd.DataFrame(columns=["filename", "number_of_columns", "file_info"])
path = Path("./data/transformed")
for roots,dirs,files in path.walk(on_error=print):
    for dir in dirs:
        path = Path(roots) / dir
        name = dir
        page = 1
        for file in path.glob("*.csv"):
            print(f"Checking data for {name}, page {page}")
            df = pd.read_csv(file)
            file_info = df_info_to_json(df)
            insert_df = pd.DataFrame([[file.name,len(df.columns),file_info]], columns=["filename", "number_of_columns", "file_info"])
            metadata_df = pd.concat([metadata_df, insert_df], ignore_index=True)
            page += 1
metadata_df.to_csv("./data/transformed_metadata.csv", index=False)

Checking data for mvc_crashes, page 1
Checking data for mvc_crashes, page 2
Checking data for mvc_crashes, page 3
Checking data for mvc_crashes, page 4
Checking data for mvc_crashes, page 5
Checking data for mvc_crashes, page 6
Checking data for mvc_crashes, page 7
Checking data for mvc_crashes, page 8
Checking data for mvc_crashes, page 9
Checking data for mvc_crashes, page 10
Checking data for mvc_crashes, page 11
Checking data for mvc_crashes, page 12
Checking data for mvc_crashes, page 13
Checking data for mvc_crashes, page 14
Checking data for mvc_crashes, page 15
Checking data for mvc_crashes, page 16
Checking data for mvc_crashes, page 17
Checking data for mvc_crashes, page 18
Checking data for mvc_crashes, page 19
Checking data for mvc_persons, page 1
Checking data for mvc_persons, page 2
Checking data for mvc_persons, page 3
Checking data for mvc_persons, page 4
Checking data for mvc_persons, page 5
Checking data for mvc_persons, page 6
Checking data for mvc_persons, page 7
Ch

In [None]:
path = Path("./data/transformed")
for roots,dirs,files in path.walk(on_error=print):
    for dir in dirs:
        path = Path(roots) / dir
        name = dir
        page = 1
        for file in path.glob("*.csv"):
            print(file.name)
            break

In [77]:
df = pd.read_csv("./data/transfomed_metadata.csv")
df

FileNotFoundError: [Errno 2] No such file or directory: './data/transfomed_metadata.csv'