In [None]:
import pyodbc
from sqlalchemy import create_engine
import pandas as pd
import psycopg2
import os
import numpy as np

# SQL Server connection details, expressed as a single ODBC connection string
connection_string = (
    r"DRIVER={ODBC Driver 18 for SQL Server};"
    r"SERVER=DESKTOP-MS4DILS\THUS;"
    r"DATABASE=US_Traffic_Accidents_ETL;"
    r"Trusted_Connection=yes;"
    r"Encrypt=yes;"
    r"TrustServerCertificate=yes;"
)

# autocommit=True: statements are committed as they run, without an explicit commit
sql_conn = pyodbc.connect(connection_string, autocommit=True)

# Cursor for running statements against the database
cursor = sql_conn.cursor()

# Connection check: show the connection object that was created
print(sql_conn)

# Remove pandas' display limits: rows shown, columns shown, and column width
for display_option in ('display.max_rows', 'display.max_columns', 'display.max_colwidth'):
    pd.set_option(display_option, None)

Functions

In [None]:
# Map pandas dtypes to SQL Server types
def map_dtype(dtype, max_val=None):
    """Return the SQL Server column type name to use for a pandas dtype.

    Parameters
    ----------
    dtype
        A pandas/numpy dtype, as accepted by the ``pd.api.types.is_*_dtype``
        checks.
    max_val : optional
        Largest value expected in the column. Only consulted for integer
        dtypes; when omitted, integer columns get the wider type.

    Returns
    -------
    str
        'INT' or 'BIGINT' for integers, "FLOAT" for floats, "BIT" for
        booleans, "DATETIME" for datetimes, and "NVARCHAR(MAX)" for anything
        else.
    """
    dtype_checks = pd.api.types

    if dtype_checks.is_integer_dtype(dtype):
        # No known maximum: pick the wider integer type.
        if max_val is None:
            return 'BIGINT'
        # 2_147_483_647 is the largest value a 32-bit signed integer can hold.
        exceeds_int = max_val > 2_147_483_647
        return 'BIGINT' if exceeds_int else 'INT'

    if dtype_checks.is_float_dtype(dtype):
        return "FLOAT"

    if dtype_checks.is_bool_dtype(dtype):
        return "BIT"

    if dtype_checks.is_datetime64_any_dtype(dtype):
        return "DATETIME"

    # Everything else (text, mixed objects, ...) is stored as text.
    return "NVARCHAR(MAX)"
    
#removes white space function
def remove_whitespace(df):
    """Strip leading/trailing whitespace from every string cell of ``df``.

    The frame is modified in place and the same object is returned.

    Only cells that are actual ``str`` instances are touched. Non-string
    cells inside an object column (numbers, ``None``/NaN, other objects) are
    left exactly as they are; ``Series.str.strip()`` would have turned them
    into NaN, and would raise on an object column holding no strings at all.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose text columns should be trimmed.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object that was passed in.
    """
    for column in df.columns:
        series = df[column]
        if pd.api.types.is_object_dtype(series.dtype):
            # Rebuild the column cell by cell so mixed content survives;
            # dtype=object keeps the column's dtype from being re-inferred.
            df[column] = pd.Series(
                [cell.strip() if isinstance(cell, str) else cell for cell in series],
                index=series.index,
                dtype=object,
            )
        elif pd.api.types.is_string_dtype(series.dtype):
            # Dedicated string dtypes hold only strings or missing values,
            # so the vectorised accessor is safe here.
            df[column] = series.str.strip()
    return df

def _quote_identifier(name):
    """Bracket-quote a column name for T-SQL, doubling any closing bracket."""
    return "[" + str(name).replace("]", "]]") + "]"


#turns csv into sql server extract table
def csv_to_extract(df, table_name, batch_size=1_000, fast_executemany=True):
    """Drop and recreate ``table_name`` on ``sql_conn``, then load ``df`` into it.

    Steps:
      1. Blank or whitespace-only strings are treated as missing, and every
         missing value is then replaced with 0 (so no NULLs are inserted).
      2. The table is dropped if it exists and recreated with one column per
         DataFrame column, typed via ``map_dtype``.
      3. Rows are inserted in batches with ``cursor.executemany``.

    The caller's DataFrame is not modified.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to load. Must have at least one column.
    table_name : str
        Target table, interpolated verbatim into the SQL text (for example
        "[extract].[accidents_2016]"). It must come from trusted code, never
        from user input.
    batch_size : int, default 1_000
        Number of rows sent per ``executemany`` call.
    fast_executemany : bool, default True
        Value assigned to pyodbc's ``cursor.fast_executemany``. Pass False to
        fall back to pyodbc's standard ``executemany`` behaviour if the
        driver in use has trouble with the fast path.

    Raises
    ------
    ValueError
        If ``df`` has no columns or ``batch_size`` is less than 1.
    """
    # Local import: only this function needs it.
    from itertools import islice

    if len(df.columns) == 0:
        raise ValueError("csv_to_extract requires a DataFrame with at least one column")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # Replace "" and whitespace-only strings with NaN, then replace every
    # missing value with 0. Both calls return new frames, so the caller's
    # DataFrame is left untouched.
    cleaned = df.replace(r'^\s*$', np.nan, regex=True).fillna(0)

    # Column names are bracket-quoted so names containing spaces, punctuation
    # or reserved words still produce valid DDL.
    columns_with_types = ', '.join(
        f"{_quote_identifier(col)} {map_dtype(dtype)}"
        for col, dtype in cleaned.dtypes.items()
    )

    cursor = sql_conn.cursor()
    try:
        drop_stmt = f"DROP TABLE IF EXISTS {table_name}"
        cursor.execute(drop_stmt)
        create_stmt = f"IF OBJECT_ID('{table_name}', 'U') IS NULL CREATE TABLE {table_name} ({columns_with_types})"
        cursor.execute(create_stmt)
        sql_conn.commit()

        # Insert data into table. The statement is identical for every row,
        # so it is built once.
        placeholders = ', '.join('?' for _ in cleaned.columns)
        insert_stmt = f"INSERT INTO {table_name} VALUES ({placeholders})"
        cursor.fast_executemany = fast_executemany

        # itertuples walks the frame column-wise, so each value keeps its own
        # column's type; iterrows would coerce a whole row to one dtype and
        # turn integers into floats in all-numeric frames.
        row_iter = cleaned.itertuples(index=False, name=None)
        while True:
            batch = list(islice(row_iter, batch_size))
            if not batch:
                # Also covers an empty frame: executemany is never called
                # with an empty parameter list.
                break
            cursor.executemany(insert_stmt, batch)
        sql_conn.commit()
    finally:
        # Release the cursor even when a statement fails.
        cursor.close()

Extract all data

In [None]:
#Currently takes 1 hour. Need to bulk insert
# Each source file is named <file prefix>_<two-digit year>.csv and is loaded
# into [extract].[<table stem>_<four-digit year>]. Files are processed in the
# order listed here: all accident years, then person, then vehicles.
EXTRACT_DATASETS = {
    'acc': 'accidents',
    'pers': 'person',
    'veh': 'vehicles',
}
EXTRACT_YEARS = range(2016, 2021)  # 2016 through 2020 inclusive

for file_prefix, table_stem in EXTRACT_DATASETS.items():
    for year in EXTRACT_YEARS:
        filepath = f'{file_prefix}_{str(year)[-2:]}.csv'
        new_table_name = f"[extract].[{table_stem}_{year}]"

        data = pd.read_csv(filepath, encoding='cp1252')
        # csv_to_extract is handed a copy so `data` keeps the raw file contents.
        df = data.copy()
        # No NaN -> None pass is done here: csv_to_extract performs its own
        # missing-value handling before inserting.
        csv_to_extract(df, new_table_name)