# Read in the parquet data tables via dask.DataFrames and create MySQL tables

In [3]:
from urllib.parse import quote_plus

import dask.dataframe as dd
import pandas as pd
import pymysql
import sqlalchemy as sa
from dask.diagnostics import ProgressBar
# Register a global progress bar so dask computations in this notebook report progress.
pbar = ProgressBar()
pbar.register()

# Show every column when a DataFrame is displayed.
pd.set_option('display.max_columns', None)

# The MySQL password is the first line of a local text file.
# Drop the line terminator so it does not end up inside the connection URI.
with open('../src/metadata/MySQLpwd.txt') as f:
    pw = f.readline().rstrip('\r\n')

# define where the data is
popparquet = 'northwest_2021_Q4_population.parquet'
tripsatparquet = 'northwest_2021_Q4_saturday_trip.parquet'
tripthuparquet = 'northwest_2021_Q4_thursday_trip.parquet'
datapath = '../../data/'

# read in the data
pop_df = dd.read_parquet(datapath + popparquet)
trip_sat_df = dd.read_parquet(datapath + tripsatparquet)
trip_thu_df = dd.read_parquet(datapath + tripthuparquet)

# add in a weekday column to the trips data
trip_sat_df['weekday'] = 'saturday'
trip_thu_df['weekday'] = 'thursday'

# sort the pop data. ~ 5 minutes
sorted_pop_df = pop_df.sort_values(by='person_id', ascending=True)

# the two trip tables get appended into one SQL table, so they are not sorted here

# these are the general parameters for the connection
# make sure you defined your password in the metadata folder
user = 'root'
server = 'localhost'
database = 'replica'
# URL-encode the password: characters such as '@', '/' or ':' would otherwise
# be parsed as part of the URI structure instead of the password.
to_sql_uri = f'mysql+pymysql://{user}:{quote_plus(pw)}@{server}/{database}'




[########################################] | 100% Completed | 34.05 s
[########################################] | 100% Completed | 34.15 s
[########################################] | 100% Completed | 123.45 s
[########################################] | 100% Completed | 121.82 s
[########################################] | 100% Completed | 262.00 s
[########################################] | 100% Completed | 255.31 s


## Here is the step where we add the data to the MySQL server


In [None]:
# Write the sorted population data to the `population` table.
# if_exists='replace' replaces an existing table of that name;
# change it to 'append' to add the rows to an existing table instead.
sorted_pop_df.to_sql(
    'population',
    uri=to_sql_uri,
    if_exists='replace',
    index=False,
)

SQL didn't like the timedelta dtype, so we convert the time columns to `datetime64[ns]` before writing them.

In [22]:
# Inspect the dtype of the trip start times before writing them to SQL.
trip_sat_df['start_time'].dtype

dtype('<m8[ns]')

In [None]:
# Re-type the trip time columns of both weekday tables as datetime64[ns]
# so they can be written with to_sql.
for trips in (trip_sat_df, trip_thu_df):
    for time_col in ('start_time', 'end_time'):
        trips[time_col] = trips[time_col].values.astype('datetime64[ns]')

In [24]:
# List the dtype of every column to check start_time and end_time after the conversion.
trip_sat_df.dtypes

activity_id                            object
person_id                              object
mode                                   object
travel_purpose                         object
previous_activity_type                 object
start_time                     datetime64[ns]
end_time                       datetime64[ns]
distance_miles                        float64
vehicle_type                           object
origin_bgrp                            object
origin_bgrp_lat                       float64
origin_bgrp_lng                       float64
destination_bgrp                       object
destination_bgrp_lat                  float64
destination_bgrp_lng                  float64
origin_land_use_l1                     object
origin_land_use_l2                     object
origin_building_use_l1                 object
origin_building_use_l2                 object
destination_land_use_l1                object
destination_land_use_l2                object
destination_building_use_l1       

In [25]:
# Write the Saturday trips to the `trips` table, replacing any existing table of that name.
trip_sat_df.to_sql(
    'trips',
    uri=to_sql_uri,
    if_exists='replace',
    index=False,
)

[###########################             ] | 68% Completed | 15m 3sss

In [None]:
# Make sure the Thursday time columns are datetime64[ns], then append the
# Thursday trips to the existing `trips` table.
for time_col in ('start_time', 'end_time'):
    trip_thu_df[time_col] = trip_thu_df[time_col].values.astype('datetime64[ns]')

trip_thu_df.to_sql(
    'trips',
    uri=to_sql_uri,
    if_exists='append',
    index=False,
)