# Loading Data to a Local Location

In [6]:
# Packages needed for data write
import requests
import os

In [7]:
# Define URLs for data pulls: the January 2023 yellow-cab trip records (parquet)
# and the taxi zone lookup table (CSV), both served from the same CloudFront host
url_taxi, url_location = (
    'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet',
    'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv',
)

In [8]:
# Define local output paths (relative to the working directory) for the two downloads
file_taxi, file_location = (
    './jan_2023_yellow_tripdata.parquet',
    './nyc_taxi_location_zones.csv',
)

In [9]:
def write_data_local(url, f_name, chunk_size=500000, timeout=60):
    """Download ``url`` to the local file ``f_name`` unless it already exists.

    The response body is streamed to disk in ``chunk_size``-byte pieces, so a
    large file never has to fit in memory. Data is first written to
    ``<f_name>.part`` and renamed to ``f_name`` only after the whole body has
    been written. An interrupted download therefore never leaves a truncated
    file that a later call would mistake for a complete one.

    Args:
        url: URL the data is pulled from.
        f_name: Local path the data is written to.
        chunk_size: Bytes per streamed chunk (default 500000, roughly 0.5 MB).
        timeout: Seconds ``requests`` waits for the connection or for more
            data before raising; ``None`` waits forever.

    Returns:
        None. Progress and errors are reported with ``print``; network errors
        raised by ``requests`` propagate to the caller.
    """
    if os.path.isfile(f_name):  # Check if file already exists!
        print('File already exists - no need to rewrite')
        return

    part_name = f_name + '.part'
    # stream=True defers downloading the body until iter_content is consumed;
    # the with-block closes the response even if writing fails part-way.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:  # Only continue if we have a successful request
            print(f'Error Encountered! {response.status_code}')
            return
        try:
            with open(part_name, 'wb') as out:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # Skip empty chunks
                        out.write(chunk)
        except BaseException:
            # Remove the partial download so a retry starts from scratch, then re-raise
            if os.path.exists(part_name):
                os.remove(part_name)
            raise
    # Only a fully written file is moved into place under the final name
    os.replace(part_name, f_name)
    print('Data successfully written!')

In [None]:
# Download the January 2023 yellow-cab trip records (skipped if already on disk)
write_data_local(url=url_taxi, f_name=file_taxi)

In [None]:
# Download the taxi zone lookup table (skipped if already on disk)
write_data_local(url=url_location, f_name=file_location)

# Explore Data

In [12]:
# Import packages for data exploration
import pandas as pd
import pyarrow.parquet as pq

In [None]:
# View number of rows in the parquet dataset.
# The row count is recorded in the parquet file's metadata, so it can be
# read directly instead of loading every row group into memory just to count it.
taxi_pq = pq.ParquetFile(file_taxi)  # Open the file (used again below for batched reads)
num_rows = taxi_pq.metadata.num_rows  # Total rows across all row groups
print(num_rows)

In [None]:
# We can also view the schema information on the parquet file!
# iter_batches streams record batches, so only the first 100K-row batch is
# pulled here; it carries the column schema and doubles as a small sample
# for the exploration cells below.
taxi_data = next(
    taxi_pq.iter_batches(batch_size=100_000)
)
taxi_data.schema

In [None]:
# Convert the sampled batch to pandas and check column dtypes / non-null counts
taxi_df = taxi_data.to_pandas() # Converts the pyarrow record batch (first 100K rows) to a pandas DataFrame
taxi_df.info()

# Writing data to Postgres

In [16]:
# Import packages needed
import time # For timing purposes!
from sqlalchemy import URL, create_engine

In [17]:
# Create URL and engine object.
# Each connection setting can be overridden with a PG_* environment variable so
# credentials need not live in the notebook; the defaults are the local dev values.
pg_URL = URL.create(
    drivername='postgresql',  # Driver to use
    username=os.environ.get('PG_USER', 'root'),  # Username of login
    password=os.environ.get('PG_PASSWORD', 'root'),  # Password of login
    host=os.environ.get('PG_HOST', 'localhost'),  # Host for login
    port=int(os.environ.get('PG_PORT', '5432')),  # Connection port (URL.create expects an int)
    database=os.environ.get('PG_DATABASE', 'ny_taxi'),  # Database for connection
)
# Note: if the below fails, you may need to install psycopg2-binary via pip in your virtual env
# create_engine() does not connect yet; the postgres container must be running
# by the time the engine is first used (e.g. engine.connect() / engine.begin()).
pg_engine = create_engine(pg_URL)  # Create engine

In [None]:
# Preview the CREATE TABLE statement pandas will generate for this data
table_name = 'yellow_taxi_data'  # Name to use for the table in the database
create_stmt = pd.io.sql.get_schema(taxi_df, name=table_name, con=pg_engine)
print(create_stmt)

In [1]:
# Iterate through chunks to write data
init_time = time.perf_counter()  # Total start time

# Iterate through in 100K sized batches as the file is too large for memory.
# engine.begin() wraps the whole load in one transaction that is committed when
# the block exits cleanly and rolled back on error, so the commit is explicit.
with pg_engine.begin() as conn:
    for idx, batch in enumerate(taxi_pq.iter_batches(batch_size=100000)):
        # idx is the index of the batch, batch is the data to write
        batch_df = batch.to_pandas()  # Convert the record batch to a pandas df
        if idx == 0:
            # On the first batch, create the table (replacing it if it exists)
            # from the column layout only: write just the header, clear data present
            batch_df.head(0).to_sql(name=table_name, con=conn,
                                    if_exists='replace', index=False)
        print(f'Writing batch {idx + 1}')
        start_time = time.perf_counter()  # Start time of write
        # index=False matches the table created above, which has no index
        # column; pandas' default (index=True) would try to insert one.
        batch_df.to_sql(name=table_name, con=conn, if_exists='append', index=False)
        total_time = time.perf_counter() - start_time  # Time to write batch
        print(f'Batch {idx + 1} completed in {total_time:.3f} seconds')

# Reported after the with-block so the total includes the final commit
print(f'Completed write in {time.perf_counter() - init_time:.3f} seconds')

NameError: name 'time' is not defined

In [96]:
# The other file is much smaller and can be handled with pure pandas
taxi_zones = pd.read_csv(file_location)
# engine.begin() commits when the block exits cleanly (rolls back on error),
# so the write does not rely on an implicit commit
with pg_engine.begin() as conn:
    taxi_zones.to_sql(name='pickup_locations', con=conn, if_exists='replace', index=False)

In [97]:
# Close engine once completed: dispose() discards the engine's connection pool,
# closing its pooled database connections
pg_engine.dispose()