In [2]:
import numpy as np
import pandas as pd
import google.auth
from google.cloud import bigquery


# Resolve Application Default Credentials (and the project they belong to),
# requesting the cloud-platform OAuth scope.
credentials, project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])

# BigQuery client used by the query and load cells further down.
bq_client = bigquery.Client(project=project_id, credentials=credentials)


In [3]:
# Pull a sample of the public flights table into a DataFrame.
# The WHERE clause keeps only rows whose date hashes (FARM_FINGERPRINT) to
# bucket 1 of 500, so the same dates are selected on every run.
query="""
SELECT * 
FROM `bigquery-samples.airline_ontime_data.flights` 
where 
MOD(ABS(FARM_FINGERPRINT(CAST(date AS STRING))), 500)= 1
"""
flights = bq_client.query(query).to_dataframe()

# Report how many rows came back, then preview the first five.
num_of_rows = len(flights)
print('rows:', num_of_rows)
flights.head()

rows: 107450


Unnamed: 0,date,airline,airline_code,departure_airport,departure_state,departure_lat,departure_lon,arrival_airport,arrival_state,arrival_lat,arrival_lon,departure_schedule,departure_actual,departure_delay,arrival_schedule,arrival_actual,arrival_delay
0,2006-12-03,AA,19805,BNA,TN,36.12,-86.67,DFW,TX,32.89,-97.03,1115,1111,-4.0,1320,1352,32.0
1,2006-12-03,AA,19805,BNA,TN,36.12,-86.67,DFW,TX,32.89,-97.03,645,642,-3.0,850,905,15.0
2,2006-12-03,AA,19805,BNA,TN,36.12,-86.67,DFW,TX,32.89,-97.03,820,818,-2.0,1025,1024,-1.0
3,2006-12-03,AA,19805,BNA,TN,36.12,-86.67,DFW,TX,32.89,-97.03,1855,1853,-2.0,2100,2058,-2.0
4,2006-12-03,AA,19805,BNA,TN,36.12,-86.67,DFW,TX,32.89,-97.03,1640,1636,-4.0,1845,1847,2.0


In [4]:
import pandas as pd
def format_time(t):
    """Render an HHMM-style clock value as an "HH:MM" string.

    The value is converted with ``str`` and left-padded with zeros to at
    least four characters (e.g. 645 -> "0645"), then split after the first
    two characters. No validation is performed: values longer than four
    characters are split at the same position and returned as-is.
    """
    padded = str(t).zfill(4)
    hours, minutes = padded[:2], padded[2:]
    return f"{hours}:{minutes}"


# Turn the integer HHMM scheduled departure into "HH:MM" text.
flights['dep_sched'] = flights['departure_schedule'].apply(format_time)

# Combine date and scheduled departure into one timestamp. The text has no
# seconds component ("YYYY-MM-DD HH:MM"), so the format must not contain %S:
# pandas 2.x matches the format strictly and raises ValueError otherwise.
departure_timestamp = pd.to_datetime(
    flights['date'] + " " + flights['dep_sched'],
    format='%Y-%m-%d %H:%M',
)

# Both timestamp columns are derived from the same scheduled departure.
flights['event_timestamp'] = departure_timestamp
flights['created_timestamp'] = departure_timestamp

# Drop the helper column and the raw date now that the timestamps exist.
flights_data = flights.drop(['dep_sched', 'date'], axis=1)

In [8]:
# Build per-route features for flight delay prediction: one row per
# (airline, departure_airport, arrival_airport) holding the mean departure
# and arrival delays plus the latest event timestamp seen for that route.
flight_stats = (
    flights_data
    .groupby(['airline', 'departure_airport', 'arrival_airport'])
    .agg(
        avg_departure_delay=('departure_delay', 'mean'),
        avg_arrival_delay=('arrival_delay', 'mean'),
        event_timestamp=('event_timestamp', 'max'),
        # created_timestamp is likewise taken from the newest event_timestamp
        created_timestamp=('event_timestamp', 'max'),
    )
    .reset_index()
)
flight_stats.head()

Unnamed: 0,airline,departure_airport,arrival_airport,avg_departure_delay,avg_arrival_delay,event_timestamp,created_timestamp
0,9E,ABE,DTW,13.6,8.8,2010-08-04 15:30:00,2010-08-04 15:30:00
1,9E,ALB,ATL,-5.0,-13.0,2010-08-04 12:40:00,2010-08-04 12:40:00
2,9E,ALB,DTW,-6.333333,-8.166667,2010-08-04 19:12:00,2010-08-04 19:12:00
3,9E,ALO,MSP,-14.0,72.0,2009-10-22 06:25:00,2009-10-22 06:25:00
4,9E,AMA,MEM,-0.333333,5.0,2010-08-04 15:58:00,2010-08-04 15:58:00


In [9]:
# Keep two decimal places on the float columns, then preview the result.
flight_stats = flight_stats.round(decimals=2)
flight_stats.head(n=5)

Unnamed: 0,airline,departure_airport,arrival_airport,avg_departure_delay,avg_arrival_delay,event_timestamp,created_timestamp
0,9E,ABE,DTW,13.6,8.8,2010-08-04 15:30:00,2010-08-04 15:30:00
1,9E,ALB,ATL,-5.0,-13.0,2010-08-04 12:40:00,2010-08-04 12:40:00
2,9E,ALB,DTW,-6.33,-8.17,2010-08-04 19:12:00,2010-08-04 19:12:00
3,9E,ALO,MSP,-14.0,72.0,2009-10-22 06:25:00,2009-10-22 06:25:00
4,9E,AMA,MEM,-0.33,5.0,2010-08-04 15:58:00,2010-08-04 15:58:00


In [10]:
#store data in the data warehouse (BigQuery) for historical retrieval
def store_hist_retrieval(dataset, table, data):
    """Load a DataFrame into a BigQuery table, replacing its current contents.

    Uses the notebook-level ``bq_client``.

    Args:
        dataset: ID of the destination dataset (e.g. "redbq"). It is looked
            up before loading, so a dataset the client cannot fetch fails
            here rather than after the upload has started.
        table: Destination table ID handed to the load job,
            e.g. "redbq.flight_records".
        data: pandas DataFrame to upload.

    Returns:
        None. The dataset, a progress line and the loaded row count are printed.
    """
    # get_dataset accepts the dataset ID string directly, so the deprecated
    # Client.dataset() helper is not needed. The ID comes from the `dataset`
    # argument rather than from a notebook global.
    dataset_info = bq_client.get_dataset(dataset)
    print(dataset_info)

    # WRITE_TRUNCATE overwrites the table on every run instead of appending.
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
    )
    job = bq_client.load_table_from_dataframe(data, table, job_config=job_config)

    print("BigQuery job started, please hold for result")
    job.result()  # wait for the load job to finish before reporting

    print('Loaded {} rows into Big Query {}:{}.'.format(
        job.output_rows, dataset, table))


In [11]:
dataset_id = "redbq"

print("BigQuery jobs started, please hold...")
# Flight records retrieved from the sample table.
store_hist_retrieval(dataset=dataset_id, table="redbq.flight_records", data=flights_data)
# Aggregated per-route flight statistics.
store_hist_retrieval(dataset=dataset_id, table="redbq.flight_statistics", data=flight_stats)
print("BigQuery jobs completed.")

BigQuery jobs started, please hold...
Dataset(DatasetReference('red-beryl-labs', 'redbq'))
BigQuery job started, please hold for result
Loaded 107450 rows into Big Queryredbq:redbq.flight_records.
Dataset(DatasetReference('red-beryl-labs', 'redbq'))
BigQuery job started, please hold for result
Loaded 10247 rows into Big Queryredbq:redbq.flight_statistics.
BigQuery jobs completed.
