In [1]:
import csv
import requests
import pandas as pd
import s3fs
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

import os

# Credentials come from the process environment (load_dotenv() has already run
# above); a variable that is not set resolves to None.
chicago_app_token = os.environ.get('chicago_app_token')
chicago_app_secret = os.environ.get('chicago_app_secret')
aws_key = os.environ.get('aws_key')
aws_secret = os.environ.get('aws_secret')

# Shared S3 file system handle used by the S3 read/write helpers in this notebook
fs = s3fs.S3FileSystem(
    key=aws_key,
    secret=aws_secret,
)

In [4]:
# BASE FUNCTIONS

def last_df_row_to_tuple(df):
    """Build the (timestamp, trip_id) cutoff tuple from the last row of a batch.

    Args:
        df: DataFrame with string columns 'trip_start_timestamp' and 'trip_id'.
            Only the last row is read.

    Returns:
        tuple: (cutoff_ts, cutoff_id). cutoff_ts is the last row's
        'trip_start_timestamp' with any fractional-seconds suffix removed
        (e.g. '2023-01-06T16:30:00.000' -> '2023-01-06T16:30:00');
        cutoff_id is the last row's 'trip_id'.

    Raises:
        KeyError: if either column is missing.
        IndexError: if df has no rows.
    """
    last_timestamp = df['trip_start_timestamp'].iloc[-1]
    # Cut at the decimal point instead of dropping a fixed four characters, so a
    # timestamp that has no '.000' suffix is returned intact rather than truncated.
    cutoff_ts = last_timestamp.partition('.')[0]
    cutoff_id = df['trip_id'].iloc[-1]
    return (cutoff_ts, cutoff_id)

def make_df_cols_consistent(df, column_order):
    """Return a frame holding exactly the columns in column_order, in that order.

    Columns listed in column_order that df lacks are added and filled with None;
    columns of df that are not listed are dropped. The input frame is not modified.

    Args:
        df: source DataFrame.
        column_order: sequence of column names defining the output layout.

    Returns:
        A new DataFrame with the columns of column_order.
    """
    missing_cols = [col for col in column_order if col not in df.columns]
    # Work on a copy when columns must be added so the caller's frame keeps its
    # original shape (the previous version added the columns to `df` itself).
    result = df.copy() if missing_cols else df
    for col in missing_cols:
        result[col] = None
    return result[list(column_order)]


# DATA HANDLING FUNCTIONS

# Note - the cutoff file location defaults to a static path; pass file_path to override it
def store_cutoff_state(cutoff_tuple, file_path='s3://josh-chicago-data/dev/last_batch_cutoff.csv'):
    """Persist the batch cutoff as a single CSV row, replacing the file's content.

    Args:
        cutoff_tuple: iterable of values written as one CSV row; the connector
            passes (timestamp, trip_id).
        file_path: location of the cutoff file, opened through the notebook-level
            `fs` file system. Defaults to the path used so far.

    Raises:
        TypeError, ValueError: reported with details and then re-raised, so the
            caller does not carry on as if the cutoff had been stored.
    """
    try:
        with fs.open(file_path, 'w', newline='') as file:
            csv.writer(file).writerow(cutoff_tuple)
    except (TypeError, ValueError) as e:
        print(f"Error: could not store cutoff state {cutoff_tuple!r} to {file_path}: {e}")
        raise


# Note - the cutoff file location defaults to a static path; pass file_path to override it
def retrieve_cutoff_state(file_path='s3://josh-chicago-data/dev/last_batch_cutoff.csv'):
    """Read the stored batch cutoff back from its CSV file.

    Args:
        file_path: location of the cutoff file, opened through the notebook-level
            `fs` file system. Defaults to the path used so far.

    Returns:
        tuple: the fields of the file's first CSV row, as strings.

    Raises:
        ValueError: if the file contains no rows (previously this surfaced as an
            UnboundLocalError on the return statement).
    """
    with fs.open(file_path, 'r', newline='') as file:
        # Only the first row is meaningful; any further rows are ignored.
        first_row = next(csv.reader(file), None)
    if first_row is None:
        raise ValueError(f"Cutoff state file {file_path} is empty; store an initial cutoff first.")
    return tuple(first_row)


# Note - this generates a query_string that is specific only to Chicago Taxi dataset
def generate_api_query_string(last_timestamp, last_trip_id, this_batch_size):
    """Build the '?$query=...' suffix that selects the next batch after a cutoff.

    Rows are ordered by (trip_start_timestamp, trip_id). A row qualifies when its
    timestamp is at or after last_timestamp and it is not one of the rows already
    consumed at exactly last_timestamp (those with trip_id <= last_trip_id).

    Args:
        last_timestamp: cutoff timestamp of the previous batch.
        last_trip_id: trip_id of the last row of the previous batch.
        this_batch_size: value placed in the LIMIT clause.

    Returns:
        str: the query string to append to the API base URL.
    """
    where_clause = (
        f"trip_start_timestamp >= '{last_timestamp}' "
        f"AND (trip_id > '{last_trip_id}' OR trip_start_timestamp != '{last_timestamp}')"
    )
    order_clause = "trip_start_timestamp ASC, trip_id ASC"
    return f"?$query=SELECT * WHERE {where_clause} ORDER BY {order_clause} LIMIT {this_batch_size}"


def query_soda_api(base_url, query_string, app_token):
    """Send a GET request to a SODA endpoint and return the rows as a DataFrame.

    Args:
        base_url: endpoint URL of the dataset.
        query_string: text appended verbatim to base_url (e.g. a '?$query=...' suffix).
        app_token: value sent in the 'X-App-Token' request header.

    Returns:
        pandas.DataFrame built from the records of the JSON response.

    Raises:
        Exception: if the request fails or the server answers with an error
            status; the underlying requests exception is chained as the cause.
    """
    url = base_url + query_string

    headers = {
        'Accept': 'application/json',
        # Use the token supplied by the caller; the previous version ignored this
        # parameter and always read the notebook-level chicago_app_token.
        'X-App-Token': app_token
    }

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Chain the original exception so its traceback is not lost.
        raise Exception(f"Error: {str(e)}") from e

    data = response.json()
    return pd.DataFrame.from_records(data)


def save_data_to_S3_csv(df, file_path, output_filename):
    """Write a DataFrame as '<file_path><output_filename>.csv' without the index.

    The file is opened through the notebook-level `fs` file system and rows are
    terminated with '\\n'.

    Args:
        df: DataFrame to write.
        file_path: destination prefix; it is concatenated with the file name as-is.
        output_filename: file name without the '.csv' extension.

    Raises:
        TypeError, ValueError: reported with details and then re-raised, so the
            caller does not advance its cutoff for a batch that was never saved.
    """
    import inspect  # local import: only needed for the pandas keyword check below

    # pandas 1.5 renamed to_csv's `line_terminator` to `lineterminator`, and
    # pandas 2.0 dropped the old spelling (passing it raises TypeError, which the
    # handler below used to swallow). Pick whichever the installed pandas accepts.
    if 'lineterminator' in inspect.signature(df.to_csv).parameters:
        newline_kwarg = {'lineterminator': '\n'}
    else:
        newline_kwarg = {'line_terminator': '\n'}

    try:
        with fs.open(file_path + output_filename + '.csv', 'w') as file:
            df.to_csv(file, index=False, **newline_kwarg)
    except (TypeError, ValueError) as e:
        print(f"Error: could not save '{output_filename}' under '{file_path}': {e}")
        raise
        
        
# MAIN FUNCTION

def run_connector(connector_definitions, batch_size):
    """Extract one batch per connector definition and write it to S3 as a CSV file.

    For each definition the stored cutoff is read, the next batch of at most
    batch_size rows is requested from the API and, when rows come back, the batch
    is saved and the cutoff is moved to the batch's last row.

    Args:
        connector_definitions: iterable of dicts with the keys 'soda_base_url',
            'dest_S3_path', 'dest_file_prefix' and 'dest_columns_order'.
        batch_size: row limit used in the API query.

    Returns:
        None. Any exception is caught and printed as a failure message instead
        of being raised.
    """
    try:
        for definition in connector_definitions:
            base_url = definition['soda_base_url']
            s3_path = definition['dest_S3_path']
            file_prefix = definition['dest_file_prefix']
            columns_order = definition['dest_columns_order']

            # Resume from where the previous batch stopped
            cutoff_timestamp, cutoff_id = retrieve_cutoff_state()
            query = generate_api_query_string(cutoff_timestamp, cutoff_id, batch_size)
            batch = query_soda_api(base_url, query, chicago_app_token)

            row_count = len(batch)
            if row_count == 0:
                print("No rows to load.")
                continue

            print(f"{row_count} rows to load.")

            # One timestamped CSV file per batch
            stamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
            output_name = f"{file_prefix}_{stamp}"
            consistent_batch = make_df_cols_consistent(batch, columns_order)
            save_data_to_S3_csv(consistent_batch, s3_path, output_name)

            # Move the cutoff to the last row of the batch just written
            new_cutoff = last_df_row_to_tuple(batch)
            store_cutoff_state(new_cutoff)
            print(new_cutoff)

        print("Connector ran successfully.")
    except Exception as e:
        print(f"Connector failed. Error: {e}")

In [19]:
# Set the connector definitions - maps 1 or more SODA APIs

# Column layout of the extracted CSV files, grouped by theme
column_order = [
    # identifiers and timing
    'trip_id', 'taxi_id', 'trip_start_timestamp', 'trip_end_timestamp',
    'trip_seconds', 'trip_miles',
    # community areas
    'pickup_community_area', 'dropoff_community_area',
    # charges
    'fare', 'tips', 'tolls', 'extras', 'trip_total',
    'payment_type', 'company',
    # pickup location
    'pickup_centroid_latitude', 'pickup_centroid_longitude', 'pickup_centroid_location',
    # dropoff location
    'dropoff_centroid_latitude', 'dropoff_centroid_longitude', 'dropoff_centroid_location',
    # census tracts
    'pickup_census_tract', 'dropoff_census_tract',
]


# One entry per SODA API to extract from
connector_definitions_list = [
    {
        'soda_base_url': 'https://data.cityofchicago.org/resource/wrvz-psew.json',
        'dest_S3_path': 's3://josh-chicago-data/dev/batch_extracts/',
        'dest_file_prefix': 'raw_taxi_trips',
        'dest_columns_order': column_order,
    },
]

In [41]:
# Row limit passed to run_connector and used in the LIMIT clause of each API query.
# Max batch size is 50000; going beyond that would require paginating the API requests.
batch_size = 50000

In [38]:
# RUN THE CONNECTOR ONCE
# Extracts one batch per connector definition, starting from the cutoff stored in S3.
# run_connector prints a failure message instead of raising, so check the output below.

run_connector(connector_definitions_list, batch_size)

1000 rows to load.
('2023-01-06T16:30:00', 'fcf2e6eb625849bcfcf36154d09aa4ab86c66f10')
Connector ran successfully.


In [45]:
# RUN THE CONNECTOR A SPECIFIED NUMBER OF TIMES WITH A DELAY

import time

num_batches = 2
delay_seconds = 20

for c in range(1, num_batches+1):
    run_connector(connector_definitions_list, batch_size)
    # Report progress as soon as the batch is done rather than after the pause
    print(f"Completed batch {c} of {num_batches}")
    # Pause only between batches; sleeping after the last one just delays the cell
    if c < num_batches:
        time.sleep(delay_seconds)

41619 rows to load.
('2023-03-01T00:00:00', 'fecfaeb2101d51d30bce22e30ba9f9cfe44b42c7')
Connector ran successfully.
Completed batch 1 of 2
No rows to load.
Connector ran successfully.
Completed batch 2 of 2


In [21]:
##### VERIFICATION ONLY - Doesn't write or update anything to S3

# Fixed starting point for the dry run; deliberately not read from the cutoff file in S3
initial_cutoff = ('2022-01-01T00:00:00','aaaaaaaaaa')

# Dedicated name so this cell never overwrites the `batch_size` used by the connector runs
verification_batch_size = 1000

conn = connector_definitions_list[0]
this_base_url = conn['soda_base_url']
this_columns_order = conn['dest_columns_order']

cutoff_timestamp, cutoff_id = initial_cutoff

# generate the query string for extracting the batch that follows the fixed cutoff
api_query_string = generate_api_query_string(cutoff_timestamp, cutoff_id, verification_batch_size)

# get the data from the API
df = query_soda_api(this_base_url, api_query_string, chicago_app_token)

# Apply the destination column layout unconditionally so the preview below also
# works when the API returns no rows (previously df_consistent stayed None then)
df_consistent = make_df_cols_consistent(df, this_columns_order)

df_consistent.head(3)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tips,...,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,pickup_centroid_location,dropoff_centroid_latitude,dropoff_centroid_longitude,dropoff_centroid_location,pickup_census_tract,dropoff_census_tract
0,ab9a34002ce91e74f9139f4a9275fef9c4196106,1d68f4a1c3b620f920838643bf3801e757b6791f86700a...,2022-01-01T00:00:00.000,2022-01-01T00:15:00.000,601,1.89,8,7,9.36,2.14,...,Mobile,Sun Taxi,41.899602111,-87.633308037,"{'type': 'Point', 'coordinates': [-87.63330803...",41.922686284,-87.649488729,"{'type': 'Point', 'coordinates': [-87.64948872...",,
1,b165b7888e10b51316a05fd141fbad07ec027135,b5c02d201b243945eb850f84478b473694a0b017009daa...,2022-01-01T00:00:00.000,2022-01-01T00:15:00.000,1080,7.9,56,72,21.5,6.0,...,Credit Card,"Taxicab Insurance Agency, LLC",41.79259236,-87.769615453,"{'type': 'Point', 'coordinates': [-87.76961545...",41.713148612,-87.675075312,"{'type': 'Point', 'coordinates': [-87.67507531...",,
2,b3314d4758e4cb751f4e2c5f42286b64d46d5e89,57fa54b44d1e25d82c1de3200158c2475a894490b1c32c...,2022-01-01T00:00:00.000,2022-01-01T00:00:00.000,104,0.2,28,24,9.36,0.0,...,Mobile,Sun Taxi,41.874005383,-87.66351755,"{'type': 'Point', 'coordinates': [-87.66351754...",41.901206994,-87.676355989,"{'type': 'Point', 'coordinates': [-87.67635598...",,
3,ba36a5b8ac76e4280ea07dc07f9a9cc0ddd6cee3,e8b60c43eaaae4c1bad2c23e45e93ad77c1e85d3a2f13d...,2022-01-01T00:00:00.000,2022-01-01T00:00:00.000,4,0.0,8,8,23.0,5.88,...,Credit Card,Sun Taxi,41.899602111,-87.633308037,"{'type': 'Point', 'coordinates': [-87.63330803...",41.899602111,-87.633308037,"{'type': 'Point', 'coordinates': [-87.63330803...",,
4,bbc82a3f22c1f921ac9d165b177b234d83a1de29,b16774f4cc26db3c0bb6beb35343dd596516566be79316...,2022-01-01T00:00:00.000,2022-01-01T00:30:00.000,1573,8.56,2,2,25.0,0.0,...,Cash,Flash Cab,42.001571027,-87.695012589,"{'type': 'Point', 'coordinates': [-87.69501258...",42.001571027,-87.695012589,"{'type': 'Point', 'coordinates': [-87.69501258...",,


In [46]:
##### Check the current cutoff state
# Reads the first row of the cutoff file in S3 and displays it as a tuple;
# the connector stores it as (trip_start_timestamp, trip_id).
retrieve_cutoff_state()

('2023-03-01T00:00:00', 'fecfaeb2101d51d30bce22e30ba9f9cfe44b42c7')

In [26]:
##### Manually set and store the cutoff state for next batch starting point

#initial_cutoff = ('2022-01-01T00:00:00','aaaaaaaaaa')
#store_cutoff_state(initial_cutoff)