# Loading Data to a Local Location

In [30]:
# Packages needed for data write
import requests
import os

In [31]:
# Define URLs for data pulls (both files live on the same CloudFront host)
tlc_base_url = 'https://d37ci6vzurychx.cloudfront.net'
url_taxi = f'{tlc_base_url}/trip-data/yellow_tripdata_2023-01.parquet'  # Yellow taxi trips, 2023-01
url_location = f'{tlc_base_url}/misc/taxi_zone_lookup.csv'  # Taxi zone lookup table

In [32]:
# Define output files for data (paths are relative to the current working directory)
file_taxi, file_location = (
    './jan_2023_yellow_tripdata.parquet',  # Destination for the trip parquet file
    './nyc_taxi_location_zones.csv',       # Destination for the zone lookup CSV
)

In [33]:
def write_data_local(url, f_name, chunk_size=500_000, timeout=60):
    '''
    Download the data at `url` and write it to the local file `f_name`.

    Skips the download entirely if `f_name` already exists. The response is
    streamed to disk in chunks so the whole payload never sits in memory.
    Data is first written to `<f_name>.part` and only renamed to `f_name`
    once the download has completed. An interrupted download therefore never
    leaves a truncated file that later runs would mistake for a finished one.

    Parameters
        url        : URL where the data is pulled from
        f_name     : local path where the data will be written
        chunk_size : bytes per streamed chunk (default 500_000, i.e. ~0.5 MB)
        timeout    : seconds passed to requests.get as its timeout, so a
                     stalled server cannot hang the cell forever

    Prints a status message for each outcome (written / HTTP error / already
    exists). Any exception raised while downloading or writing propagates to
    the caller after the partial `.part` file has been removed.
    '''
    if os.path.isfile(f_name):  # Nothing to do if the file was already downloaded
        print('File already exists - no need to rewrite')
        return

    tmp_name = f'{f_name}.part'  # Staging file; only renamed into place on success
    # Using the response as a context manager releases the streamed connection
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:  # Only continue if we have a successful request
            print(f'Error Encountered! {response.status_code}')
            return
        try:
            with open(tmp_name, 'wb') as out:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # Skip empty chunks
                        out.write(chunk)
            os.replace(tmp_name, f_name)  # Rename over the target in one step
        except BaseException:
            # Never leave a partial download behind (also covers KeyboardInterrupt)
            if os.path.exists(tmp_name):
                os.remove(tmp_name)
            raise
    print('Data successfully written!')

In [34]:
# Execute for taxi data (skipped if the parquet file is already on disk)
write_data_local(url=url_taxi, f_name=file_taxi)

File already exists - no need to rewrite


In [35]:
# Execute for location data (skipped if the CSV is already on disk)
write_data_local(url=url_location, f_name=file_location)

File already exists - no need to rewrite


# Explore Data

In [36]:
# Import packages for data exploration
import pandas as pd
import pyarrow.parquet as pq

In [37]:
# View number of rows in parquet dataset
# The total row count is recorded in the parquet file's metadata, so it can be
# read directly without loading any row-group data into memory.
taxi_pq = pq.ParquetFile(file_taxi)  # Open the file (also reused for batch reads later)
num_rows = taxi_pq.metadata.num_rows  # Total rows across all row groups
print(num_rows)

3066766


In [38]:
# We can also view the schema information on the parquet file!
# Only the first 100K-row record batch is pulled; its schema is displayed here and
# the batch itself is reused as a small sample for pandas / SQL schema inspection.
batch_iter = taxi_pq.iter_batches(batch_size=100_000)
taxi_data = next(batch_iter)
taxi_data.schema

VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492

In [39]:
# Convert the sampled record batch to a pandas DataFrame and summarize its columns/dtypes
taxi_df = taxi_data.to_pandas()
taxi_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 19 columns):
 #   Column                 Non-Null Count   Dtype         
---  ------                 --------------   -----         
 0   VendorID               100000 non-null  int64         
 1   tpep_pickup_datetime   100000 non-null  datetime64[us]
 2   tpep_dropoff_datetime  100000 non-null  datetime64[us]
 3   passenger_count        100000 non-null  float64       
 4   trip_distance          100000 non-null  float64       
 5   RatecodeID             100000 non-null  float64       
 6   store_and_fwd_flag     100000 non-null  object        
 7   PULocationID           100000 non-null  int64         
 8   DOLocationID           100000 non-null  int64         
 9   payment_type           100000 non-null  int64         
 10  fare_amount            100000 non-null  float64       
 11  extra                  100000 non-null  float64       
 12  mta_tax                100000 non-null  float

# Writing data to Postgres

In [40]:
# Import packages needed
import time # For timing purposes!
from sqlalchemy import URL, create_engine

In [41]:
# Create URL and engine object
# Connection settings can be overridden through environment variables so that
# credentials don't have to be hardcoded in the notebook. The defaults below are
# the values this notebook was originally written against.
pg_URL = URL.create(
    drivername='postgresql',                              # Driver to use
    username=os.environ.get('PG_USER', 'root'),           # Username of login
    password=os.environ.get('PG_PASSWORD', 'root'),       # Password of login
    host=os.environ.get('PG_HOST', 'localhost'),          # Host for login
    port=int(os.environ.get('PG_PORT', '5432')),          # Connection port
    database=os.environ.get('PG_DATABASE', 'ny_taxi'),    # Database for connection
)
# Note: if the below fails, you may need to install psycopg2-binary via pip in your virtual env
# Create the engine (NOTE: the postgres container must be running before any connection is used)
pg_engine = create_engine(pg_URL)

In [42]:
# View the CREATE TABLE statement that will be run
table_name = 'yellow_taxi_data'  # Name to use for the table in the database
# DDL pandas would emit for this sample frame, rendered for pg_engine's dialect
create_table_ddl = pd.io.sql.get_schema(taxi_df, name=table_name, con=pg_engine)
print(create_table_ddl)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [43]:
# Iterate through chunks to write data
# Stream the parquet file in 100K-row record batches (the full file is too large to
# hold in memory as one DataFrame) and append each batch to the Postgres table.
# time.perf_counter() is a monotonic clock, so durations are unaffected by system clock changes.
init_time = time.perf_counter()  # Total start time

with pg_engine.connect() as conn:
    for idx, batch in enumerate(taxi_pq.iter_batches(batch_size=100000)):
        # idx is the 0-based index of the batch, batch is the record batch to write
        batch_df = batch.to_pandas()  # Convert to pandas df
        if idx == 0:
            # On the first batch, (re)create the table from the column layout only:
            # head(0) writes the schema with no rows and replaces any existing table
            batch_df.head(0).to_sql(name=table_name, con=conn,
                                    if_exists='replace', index=False)
        batch_num = idx + 1
        print(f'Writing batch {batch_num}')
        start_time = time.perf_counter()  # Start time of write
        batch_df.to_sql(name=table_name, con=conn, if_exists='append', index=False)  # Append batch rows
        total_time = time.perf_counter() - start_time  # Time to write batch
        print(f'Batch {batch_num} completed in {round(total_time, 3)} seconds')

    # Write out total completion time
    print(f'Completed write in {round(time.perf_counter() - init_time, 3)} seconds')

Writing batch 1
Batch 1 completed in 9.108 seconds
Writing batch 2
Batch 2 completed in 8.484 seconds
Writing batch 3
Batch 3 completed in 7.641 seconds
Writing batch 4
Batch 4 completed in 7.399 seconds
Writing batch 5
Batch 5 completed in 7.924 seconds
Writing batch 6
Batch 6 completed in 8.401 seconds
Writing batch 7
Batch 7 completed in 8.247 seconds
Writing batch 8
Batch 8 completed in 8.219 seconds
Writing batch 9
Batch 9 completed in 7.576 seconds
Writing batch 10
Batch 10 completed in 7.265 seconds
Writing batch 11
Batch 11 completed in 8.383 seconds
Writing batch 12
Batch 12 completed in 7.606 seconds
Writing batch 13
Batch 13 completed in 8.268 seconds
Writing batch 14
Batch 14 completed in 7.806 seconds
Writing batch 15
Batch 15 completed in 7.647 seconds
Writing batch 16
Batch 16 completed in 7.869 seconds
Writing batch 17
Batch 17 completed in 7.682 seconds
Writing batch 18
Batch 18 completed in 6.842 seconds
Writing batch 19
Batch 19 completed in 7.444 seconds
Writing bat

In [44]:
# The zone lookup CSV is small, so it is read and written in one shot with plain pandas
# NOTE(review): the table is named 'pickup_locations' but holds the whole zone lookup —
# presumably it also serves DOLocationID joins; confirm the intended name.
taxi_zones = pd.read_csv(file_location)
with pg_engine.connect() as conn:
    taxi_zones.to_sql(
        name='pickup_locations',
        con=conn,
        if_exists='replace',  # Recreate the table on every run
        index=False,          # Don't write the DataFrame index as a column
    )

In [45]:
# Close engine once completed
# All loads above are finished, so release the database connections pooled by pg_engine.
pg_engine.dispose()