In [1]:
import os
import sqlite3
from datetime import datetime
from glob import glob
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, select, MetaData, Table
from sqlalchemy import event
from sqlalchemy import text

pd.set_option('display.max_columns', None)

In [None]:
# Load the airport reference CSV, keep only the first row seen for each IATA
# code, and rename the 'airport' column to 'airport_name'.
airport_df = (
    pd.read_csv('Airport_Lat_Long.csv')
    .drop_duplicates(subset=['iata'], keep='first')
    .rename(columns={'airport': 'airport_name'})
)

In [2]:
# SQL Server connection parameters.
# Credentials are read from the environment so they are never saved in the
# notebook; 'Hidden' is the redacted placeholder used when they are unset.
server = 'airborneanalytics.cpdhuvhrm3nv.us-east-1.rds.amazonaws.com'
database = 'airborneanalytics'
username = os.environ.get('AIRBORNE_DB_USER', 'Hidden')
password = os.environ.get('AIRBORNE_DB_PASSWORD', 'Hidden')

# Percent-encode the credentials: characters such as '@', ':' or '/' would
# otherwise be parsed as URL delimiters and corrupt the connection string.
connection_string = (
    f"mssql+pyodbc://{quote_plus(username)}:{quote_plus(password)}@{server}/{database}"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)

# Create a SQLAlchemy engine
engine = create_engine(connection_string)

In [None]:
# Push the de-duplicated airport rows into SQL Server (index not written).
# NOTE(review): if_exists='append' means every re-run inserts the rows again.
airport_df.to_sql('AirportData', engine, index=False, if_exists='append')
print("Data has been appended to the AirportData table.")

In [4]:
# Create a MetaData object to reflect the database schema
metadata = MetaData()

# Reflect the "on_time_flight_data" table from the database.
# NOTE: despite the variable names, this reads flight data, not AirportData.
airport_data_table = Table('on_time_flight_data', metadata, autoload=True, autoload_with=engine)

# SELECT every row and column of the reflected table
query = select([airport_data_table])

# Run the query and build the DataFrame inside the `with` block, so the
# connection is closed even if execute() or fetchall() raises.
with engine.connect() as connection:
    results = connection.execute(query)
    sql_airport_df = pd.DataFrame(results.fetchall(), columns=results.keys())

# Display the DataFrame
sql_airport_df

Unnamed: 0,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,Reporting_Airline,Tail_Number,Flight_Number_Reporting_Airline,Origin,Dest,CRSDepTime,DepTime,DepDelay,DepDelayMinutes,DepDel15,DepartureDelayGroups,DepTimeBlk,TaxiOut,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrTime,ArrDelay,ArrDelayMinutes,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,Cancelled,CancellationCode,Diverted,CRSElapsedTime,ActualElapsedTime,AirTime,Flights,Distance,DistanceGroup,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,DivAirportLandings


In [None]:
usecols = ['Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'FlightDate', 'Reporting_Airline', 'Tail_Number',
'Flight_Number_Reporting_Airline', 'OriginCityMarketID', 'Origin', 'DestCityMarketID', 'Dest', 'CRSDepTime',
'DepTime', 'DepDelay', 'DepDelayMinutes', 'DepDel15', 'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOut', 'WheelsOff', 
'WheelsOn', 'TaxiIn', 'CRSArrTime', 'ArrTime', 'ArrDelay', 'ArrDelayMinutes', 'ArrDel15', 'ArrivalDelayGroups', 'ArrTimeBlk',
'Cancelled', 'CancellationCode', 'Diverted', 'CRSElapsedTime', 'ActualElapsedTime', 'AirTime', 'Flights', 'Distance',
'DistanceGroup', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'DivAirportLandings']

# File name for testing
# fname = 'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2015_1.csv'

def getCleanDF(df, usecols=usecols, removeCanDiv=False):
    """Return a cleaned, typed copy of one on-time CSV for database loading.

    Args:
        df: Either a DataFrame holding the raw CSV text, or a path (str or
            os.PathLike) to such a CSV. A path is read here with ``dtype=str``,
            ``keep_default_na=False``, ``low_memory=False`` and ``FlightDate``
            parsed as a date.
        usecols: Columns to read when ``df`` is a path; ignored for DataFrames.
        removeCanDiv: Accepted for compatibility; not currently used.

    Returns:
        A new DataFrame (the caller's frame is not modified) in which:
          * Year/Quarter/Month/DayofMonth/DayOfWeek/flight number/
            OriginCityMarketID are cast with ``astype(int)``;
          * the delay/duration/count columns are int64, with '' and NaN
            mapped to 0 and fractional values truncated toward zero;
          * the HHMM clock columns hold ``datetime.time`` objects ('2400'
            becomes 00:00; blank or missing becomes None).
        Listed columns that are absent from the input are skipped, so frames
        read without OriginCityMarketID still clean successfully.
    """
    if isinstance(df, (str, os.PathLike)):
        df = pd.read_csv(df, usecols=usecols, keep_default_na=False,
                         parse_dates=['FlightDate'], low_memory=False, dtype=str)
    else:
        # Work on a copy so the caller's raw frame stays untouched
        df = df.copy()

    def present(cols):
        # Only the listed columns that actually exist in this frame
        return [c for c in cols if c in df.columns]

    # Coerce certain columns to integers
    for col in present(['Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek',
                        'Flight_Number_Reporting_Airline', 'OriginCityMarketID']):
        df[col] = df[col].astype(int)

    # Fix null values and coerce additional columns to integers.
    # Values such as '-3.00' need the float step before the int cast.
    for col in present(['DepDelay', 'DepDelayMinutes', 'DepDel15', 'TaxiOut', 'TaxiIn', 'ArrDelay',
                        'ArrDelayMinutes', 'ArrDel15', 'Cancelled', 'Diverted', 'CRSElapsedTime',
                        'ActualElapsedTime', 'AirTime', 'Flights', 'Distance', 'CarrierDelay',
                        'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'DivAirportLandings']):
        df[col] = df[col].replace('', 0).fillna(0).astype(float).astype('int64')

    def formatTime(t):
        """Parse an HHMM value into a ``datetime.time``; blank/missing -> None."""
        if pd.isna(t):
            return None
        t = str(t).strip()
        if t == '':
            return None
        # Zero-pad so unpadded values are read correctly: strptime would parse
        # '45' as 04:05 instead of 00:45.
        t = t.zfill(4)
        # Hour 24 is rejected by %H, so '2400' is mapped to midnight
        if t == '2400':
            t = '0000'
        return datetime.strptime(t, '%H%M').time()

    # Coerce certain columns to time
    for col in present(['CRSDepTime', 'DepTime', 'WheelsOff', 'WheelsOn', 'CRSArrTime', 'ArrTime']):
        df[col] = df[col].apply(formatTime)

    return df

In [3]:
# Columns kept from each on-time CSV. FlightDate is handed to read_csv's
# parse_dates, so it gets no entry in data_types below.
usecols = [
    'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'FlightDate',
    'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline',
    'Origin', 'Dest',
    'CRSDepTime', 'DepTime', 'DepDelay', 'DepDelayMinutes', 'DepDel15',
    'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOut', 'WheelsOff',
    'WheelsOn', 'TaxiIn', 'CRSArrTime', 'ArrTime', 'ArrDelay', 'ArrDelayMinutes',
    'ArrDel15', 'ArrivalDelayGroups', 'ArrTimeBlk',
    'Cancelled', 'CancellationCode', 'Diverted', 'CRSElapsedTime', 'ActualElapsedTime',
    'AirTime', 'Flights', 'Distance',
    'DistanceGroup', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay',
    'LateAircraftDelay', 'DivAirportLandings',
]

# read_csv dtypes: the five calendar fields are int, every other kept column
# (except FlightDate) is object; keys follow the order of usecols.
data_types = {
    col: (int if col in ('Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek') else object)
    for col in usecols
    if col != 'FlightDate'
}

In [5]:
%%time
# Specify the base directory
base_dir = r'...\On_Time_Reporting_Carrier\On_Time_Reporting_Carrier\SAMPLE_On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1'

# Define the event listener function
@event.listens_for(engine, "before_cursor_execute")
def receive_before_cursor_execute(
        conn, cursor, statement, params, context, executemany
):
    if executemany:
        cursor.fast_executemany = True
        
        
# Chunk size for reading CSV
chunk_size = 10000

# Loop through folders and subfolders
for root, dirs, files in os.walk(base_dir):
    for file in files:
        if file.endswith(".csv"):
            # Construct the full file path
            file_path = os.path.join(root, file)
            print(f"Processing..... {file}")

            # Read the CSV file into a dataframe in chunks with data types
            na_value_for_int_columns = 0
            chunks = pd.read_csv(
                    file_path,
                    usecols=usecols,
                    dtype=data_types,
                    keep_default_na=False,
                    parse_dates=['FlightDate'],
                    chunksize=chunk_size,
                    low_memory=False,
                    na_values=""
                )

            for chunk_number, chunk in enumerate(chunks, start=1):
                # Print the chunk number and the total number of chunks
                #print(f"Processing chunk {chunk_number} of {num_chunks}")

                # Print the first few rows of the chunk to identify the issue
                chunk = chunk.fillna(0)
                print(f"Processing chunk #{chunk_number}")

                try:
                    # Set the fast_executemany attribute to True for this cursor
                    with engine.begin() as connection:
                        connection.execution_options(
                            stream_results=True
                        ).execute("SET NOCOUNT ON;")
                    chunk.to_sql(name='on_time_test_flight_data', con=engine, index=False, if_exists="append")
                except Exception as e:
                    print(f"Error writing chunk {chunk_number}: {e}")

#11:40AM

Processing..... On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.csv
Processing chunk #1
Processing chunk #2
Processing chunk #3
Processing chunk #4
Processing chunk #5
Processing chunk #6
Processing chunk #7
Processing chunk #8
Processing chunk #9
Processing chunk #10
Processing chunk #11
Processing chunk #12
Processing chunk #13
Processing chunk #14
Processing chunk #15
Processing chunk #16
Processing chunk #17
Processing chunk #18
Processing chunk #19
Processing chunk #20
Processing chunk #21
Processing chunk #22
Processing chunk #23
Processing chunk #24
Processing chunk #25
Processing chunk #26
Processing chunk #27
Processing chunk #28
Processing chunk #29
Processing chunk #30
Processing chunk #31
Processing chunk #32
Processing chunk #33
Processing chunk #34
Processing chunk #35
Processing chunk #36
Processing chunk #37
Processing chunk #38
Processing chunk #39
Processing chunk #40
Processing chunk #41
Processing chunk #42
Processing chunk #43
Processing chunk #

PermissionError: [Errno 13] Permission denied

In [None]:
# Month 8 66 chunks
# Month 9 61 chunks
# NOTE(review): `df` is never assigned in this notebook, so this line raises
# NameError on a fresh kernel; it presumably relied on state from an earlier
# session — bind `df` explicitly before this cell.
df.info()


In [None]:
# %%time
from sqlalchemy import event


# Turn on pyodbc's fast_executemany for every executemany() sent through engine.
@event.listens_for(engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True


def load_data(df, tablename):
    """Append ``df`` to the SQL Server table ``dbo.<tablename>`` through ``engine``.

    Args:
        df: DataFrame to write; its index is not written.
        tablename: Name of the destination table in the ``dbo`` schema.
    """
    # Pass the variable, not the literal string 'tablename'
    df.to_sql(name=tablename, con=engine, index=False, if_exists="append", schema="dbo")
    print(f"Data has been appended to the {tablename} table.")


# NOTE(review): `df` is not defined by any earlier cell; bind it before running.
load_data(df, 'on_time_flights')

In [None]:
# Build a local SQLite copy of the monthly on-time CSVs for 2015-2023.
# Files are visited by year/month (rather than with a glob) so records are
# inserted in date order; months whose file does not exist are skipped, and
# any other error stops the load instead of being silently swallowed.
base = 'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_'

# getCleanDF also coerces OriginCityMarketID, which the later `usecols`
# redefinition dropped, so the market-ID columns are requested explicitly.
sqlite_usecols = usecols + [c for c in ('OriginCityMarketID', 'DestCityMarketID') if c not in usecols]

num_records = 0
num_files = 0
start = datetime.now()

# Connect to sqlite database file
conn = sqlite3.connect('bts_on_time_v3.db')
try:
    for yr in range(2015, 2024):
        for mo in range(1, 13):
            fname = base + str(yr) + '_' + str(mo) + '.csv'
            if not os.path.exists(fname):
                continue
            raw_df = pd.read_csv(fname, usecols=sqlite_usecols, keep_default_na=False,
                                 parse_dates=['FlightDate'], low_memory=False, dtype=str)
            cdf = getCleanDF(raw_df)
            cdf.to_sql('on_time', conn, if_exists='append', index=False)
            print(fname, f"({len(cdf)} x {len(cdf.columns)})")
            num_records += len(cdf)
            num_files += 1
finally:
    # Close the database even when a file fails; the error still propagates
    conn.close()

end = datetime.now()
print(f'Finished processing! Total: {num_records} records from {num_files} files')
print(f'Time elapsed: {end-start}')
            
            
# Use pandas to write the DataFrame to the SQL Server table
# NOTE(review): an earlier cell already appends airport_df to AirportData; with
# if_exists='append', running this line as well inserts every row a second time.
airport_df.to_sql(name='AirportData', con=engine, if_exists='append', index=False)