# Credentials

In [2]:
# City of Chicago Data Portal API Creds
# The secrets are read from environment variables so that they are never saved
# (or committed) inside the notebook. Set SOCRATA_APP_TOKEN, SOCRATA_USERNAME and
# SOCRATA_PASSWORD before starting Jupyter; any variable that is unset comes
# through as None.
import os

socrata_domain = "data.cityofchicago.org"
socrata_dataset_identifier = "wrvz-psew"
app_token = os.environ.get("SOCRATA_APP_TOKEN") # API token obtained from Chicago Data Portal
api_username = os.environ.get("SOCRATA_USERNAME") # API user email ID
api_password = os.environ.get("SOCRATA_PASSWORD") # API password

# Importing required packages

In [4]:
import time

import numpy as np
import pandas as pd
from sodapy import Socrata

# Extracting Chicago Taxi Trips data

In [51]:
# Build the Socrata client for the data portal with the credentials defined
# above and a timeout of 800, then announce it.
client = Socrata(
    socrata_domain,
    app_token,
    username=api_username,
    password=api_password,
    timeout=800,
)
print("Establishing connection with Chicago Data Portal")

Establishing connection with Chicago Data Portal


## Importing sample data

We do this to see how long it takes to extract a given amount of data. The timing determines how to split the extraction from the Chicago Data Portal into chunks and what timeout to set for the Lambda function.

In [42]:
# Date parts for the sample pull
year, week = 2019, 30
month, day = 11, 11

In [91]:
# Time a pull of one (year, week) slice of taxi trips.
start_time = time.time()
print("Pulling Taxi Trips data from Chicago Data portal using Socrata...")

# Columns to return, plus the trip year/month derived from the start timestamp
soql_select = '''trip_id, taxi_id,
                                    trip_start_timestamp, trip_end_timestamp,
                                    date_extract_y(trip_start_timestamp) as trip_year,
                                    date_extract_m(trip_start_timestamp) as trip_month,
                                    trip_seconds, trip_miles,
                                    pickup_community_area, dropoff_community_area,
                                    fare, tips, tolls, extras, trip_total, payment_type, company,
                                    pickup_centroid_latitude, pickup_centroid_longitude,
                                    pickup_centroid_location,
                                    dropoff_centroid_latitude, dropoff_centroid_longitude, 
                                    dropoff_centroid_location'''

# Keep only the requested year/week and rows that have a dropoff community area
soql_where = '''date_extract_y(trip_start_timestamp) = {} AND
                                    date_extract_woy(trip_start_timestamp) = {} AND
                                    dropoff_community_area IS NOT NULL'''.format(year, week)

taxi_sample_dyn = client.get(
    socrata_dataset_identifier,
    select=soql_select,
    where=soql_where,
    limit=10000000,
)

elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)

Pulling Taxi Trips data from Chicago Data portal using Socrata...
--- 249.9893193244934 seconds ---


In [92]:
taxi_sample_dyn[0].keys()

dict_keys(['trip_id', 'taxi_id', 'trip_start_timestamp', 'trip_end_timestamp', 'trip_year', 'trip_month', 'trip_seconds', 'trip_miles', 'dropoff_community_area', 'fare', 'tips', 'tolls', 'extras', 'trip_total', 'payment_type', 'company', 'dropoff_centroid_latitude', 'dropoff_centroid_longitude', 'dropoff_centroid_location'])

## Checking if API data extraction is successful

In [94]:
# Load the extracted records into a pandas DataFrame and show how many rows came back
taxi_sample_dyn_df = pd.DataFrame(data=taxi_sample_dyn)
taxi_sample_dyn_df.shape[0]

275980

In [95]:
taxi_sample_dyn_df.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_year,trip_month,trip_seconds,trip_miles,dropoff_community_area,fare,...,trip_total,payment_type,company,dropoff_centroid_latitude,dropoff_centroid_longitude,dropoff_centroid_location,pickup_community_area,pickup_centroid_latitude,pickup_centroid_longitude,pickup_centroid_location
0,25fc5d7da0117e6433ce0a5ad2b640d70088a986,89f75adc47573b24826e52e49553465713812aacc95123...,2019-07-28T23:45:00.000,2019-07-29T00:30:00.000,2019,7,2285,1.6,8,17.75,...,17.75,Cash,Chicago Carriage Cab Corp,41.892507781,-87.626214906,"{'type': 'Point', 'coordinates': [-87.62621490...",,,,
1,5cf1b4203283259680b188dfbe546224b5c6e025,491bd6098f4633e73dbf8c5f95b1244623b68996ba8050...,2019-07-28T23:45:00.000,2019-07-28T23:45:00.000,2019,7,145,0.33,8,4.25,...,4.25,Cash,Nova Taxi Affiliation Llc,41.892042136,-87.63186395,"{'type': 'Point', 'coordinates': [-87.63186394...",8.0,41.89321636,-87.63784421,"{'type': 'Point', 'coordinates': [-87.63784420..."
2,2be5fa1413c79359a90bbeca23145820af5dba54,0e0462dc8c7fe5581e3ed2e4bdce670e475dce86c46a01...,2019-07-28T23:45:00.000,2019-07-28T23:45:00.000,2019,7,78,0.28,8,3.75,...,4.75,Cash,Sun Taxi,41.892072635,-87.628874157,"{'type': 'Point', 'coordinates': [-87.62887415...",32.0,41.880994471,-87.632746489,"{'type': 'Point', 'coordinates': [-87.63274648..."
3,dfe74774bdcadd65061531e485d87960939fe98c,9e00db075b98e6b900e33ebc47977f154cd3aaae5385b1...,2019-07-28T23:45:00.000,2019-07-28T23:45:00.000,2019,7,187,0.6,8,4.75,...,5.75,Cash,Medallion Leasin,41.892072635,-87.628874157,"{'type': 'Point', 'coordinates': [-87.62887415...",8.0,41.890922026,-87.618868355,"{'type': 'Point', 'coordinates': [-87.61886835..."
4,7dd314cc8cc50480836589e2f2fb8d93d5854a1d,3fa040c240f0b86be0d9b46ce482b0e761904f5872f616...,2019-07-28T23:45:00.000,2019-07-28T23:45:00.000,2019,7,18,0.01,76,3.25,...,7.75,Cash,Sun Taxi,41.97907082,-87.903039661,"{'type': 'Point', 'coordinates': [-87.90303966...",76.0,41.97907082,-87.903039661,"{'type': 'Point', 'coordinates': [-87.90303966..."


# Creating Lambda function to extract and upload data

In [245]:
import os

# AWS credentials are read from the standard AWS environment variables instead
# of being pasted into the notebook, where they are saved to disk and leak
# through version control or sharing. Any key, secret or session token that was
# previously stored in this cell should be treated as compromised and revoked.
# A variable that is unset comes through as None.
# NOTE(review): boto3 is documented to fall back to its default credential
# chain when these arguments are None — confirm for your environment.
ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
SESSION_TOKEN = os.environ.get('AWS_SESSION_TOKEN')

# To be added to code when accessing AWS services:
# aws_access_key_id=ACCESS_KEY,
# aws_secret_access_key=SECRET_KEY,
# aws_session_token=SESSION_TOKEN

In [246]:
import boto3
from botocore.config import Config
import json

In [247]:
# Credential keyword arguments shared by every AWS client created in this cell
aws_credentials = {
    'aws_access_key_id': ACCESS_KEY,
    'aws_secret_access_key': SECRET_KEY,
    'aws_session_token': SESSION_TOKEN,
}

# Lambda client: 900 s read/connect timeouts and no automatic retries
lambda_client_config = Config(read_timeout=900, connect_timeout=900, retries={"max_attempts": 0})
aws_lambda = boto3.client('lambda', config=lambda_client_config, **aws_credentials)

# Access our class IAM role, which allows Lambda
# to interact with other AWS resources
iam_client = boto3.client('iam', **aws_credentials)
role = iam_client.get_role(RoleName='LabRole')

# Step Functions client
sfn = boto3.client('stepfunctions', **aws_credentials)

To create the Lambda function, the code is packaged in `my-deployment-package.zip`, which also contains the binaries for the sodapy library that the function needs to connect to the Chicago Data Portal. For quick perusal, a copy of `lambda_function.py` is also stored outside of `my-deployment-package.zip`.

NOTE: `lambda_function.py` currently has dummy values for the API token, username and password. Replace them with your own credentials before creating the deployment package and/or the Lambda function.

In [None]:
# Read the zipped deployment package into memory as raw bytes
with open('my-deployment-package.zip', 'rb') as package_file:
    lambda_zip = package_file.read()

function_name = 'chi_taxi_upload'

try:
    # First run: create the function from the zip contents
    response = aws_lambda.create_function(
        FunctionName=function_name,
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': lambda_zip},
        Timeout=900,
    )
except aws_lambda.exceptions.ResourceConflictException:
    # The function already exists: replace its code with the zip contents
    response = aws_lambda.update_function_code(
        FunctionName=function_name,
        ZipFile=lambda_zip,
    )

# Both responses are read for the function's ARN
lambda_arn = response['FunctionArn']

## Testing if Lambda function works as expected

In [112]:
test_data = {'year': 2019, 'week': 30}

In [None]:
lambda_arn = 'arn:aws:lambda:us-east-1:719506583895:function:chi_taxi_upload'

In [111]:
lambda_arn

'arn:aws:lambda:us-east-1:719506583895:function:chi_taxi_upload'

In [113]:
# run synchronously:
r = aws_lambda.invoke(FunctionName='chi_taxi_upload',
                      InvocationType='RequestResponse',
                      Payload=json.dumps(test_data))

# The invoke call can return normally even when the handler itself failed; the
# response then carries a 'FunctionError' entry and the error details are in the
# payload. Surface that here so the test cannot pass silently.
# Note: reading the payload stream consumes it, so it is kept in a variable.
invoke_result = r['Payload'].read().decode('utf-8')
if r.get('FunctionError'):
    raise RuntimeError(
        "chi_taxi_upload failed ({}): {}".format(r['FunctionError'], invoke_result)
    )
print(r['StatusCode'], invoke_result)

`.csv` file created as required in S3 bucket. Downloading the file for further analysis

In [114]:
temp_csv_check = pd.read_csv("chi_taxi_data_2019_30.csv")

In [115]:
temp_csv_check.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_year,trip_month,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,...,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,pickup_centroid_location,dropoff_centroid_latitude,dropoff_centroid_longitude,dropoff_centroid_location
0,7a21e28ce94ed9aed58559cf586363393c83031f,6fb5a8ee938d1b5889226c5c63543241224edf0ff04b42...,2019-07-28T23:45:00.000,2019-07-29T00:15:00.000,2019,7,1829.0,18.16,76,32,...,4.0,56.92,Credit Card,Flash Cab,41.980264,-87.913625,"{'type': 'Point', 'coordinates': [-87.91362459...",41.878866,-87.625192,"{'type': 'Point', 'coordinates': [-87.62519214..."
1,3bd419eaaa746ebda44d3c47bedc46c86450d17d,918e4b0b5ee31425b7c511d49dd73b371abdd9159c793d...,2019-07-28T23:45:00.000,2019-07-29T00:15:00.000,2019,7,1320.0,2.6,21,19,...,0.0,13.5,Cash,Taxi Affiliation Services,41.938666,-87.711211,"{'type': 'Point', 'coordinates': [-87.71121059...",41.927261,-87.765502,"{'type': 'Point', 'coordinates': [-87.76550160..."
2,67db20f5d5100771d8035bf483a68590cefb7e69,44367ae8c260038452e9d4e625548eb42e57456bb77ffd...,2019-07-28T23:45:00.000,2019-07-29T00:15:00.000,2019,7,1680.0,17.6,76,32,...,4.0,56.5,Credit Card,Star North Management LLC,41.979071,-87.90304,"{'type': 'Point', 'coordinates': [-87.90303966...",41.880994,-87.632746,"{'type': 'Point', 'coordinates': [-87.63274648..."
3,8743e94931d656be910ae0bc36644c9f906476bb,354fafd255c37a15b0c8f6f67d810b3e5374f54ebbadb9...,2019-07-28T23:45:00.000,2019-07-29T00:00:00.000,2019,7,1069.0,9.09,32,4,...,2.5,33.3,Credit Card,City Service,41.878866,-87.625192,"{'type': 'Point', 'coordinates': [-87.62519214...",41.975171,-87.687516,"{'type': 'Point', 'coordinates': [-87.68751551..."
4,9e820c3e904964ea5780d89c29c85b458f552990,343d099d6cca99a245757b878f39ef1777121a56028dea...,2019-07-28T23:45:00.000,2019-07-28T23:45:00.000,2019,7,239.0,0.57,8,8,...,1.0,8.5,Credit Card,City Service,41.893216,-87.637844,"{'type': 'Point', 'coordinates': [-87.63784420...",41.893216,-87.637844,"{'type': 'Point', 'coordinates': [-87.63784420..."


# Using Step Functions to run embarrassingly parallel data extraction

In [117]:
def make_def(lambda_arn):
    """Build the state machine definition as a plain dict.

    The definition has a single "Map" state whose iterator contains one
    "Lambda Invoke" task. The task passes its whole input as the payload to
    the function identified by ``lambda_arn`` and outputs ``$.Payload``.

    Parameters
    ----------
    lambda_arn : str
        Value placed in the task's ``FunctionName`` parameter.

    Returns
    -------
    dict
        The definition, ready to be serialised with ``json.dumps``.
    """
    # Retry settings applied to the listed error names
    retry_policy = {
        "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException",
            "States.TaskFailed"
        ],
        "IntervalSeconds": 2,
        "MaxAttempts": 6,
        "BackoffRate": 2
    }

    # Task that is run for each element handled by the Map state
    invoke_task = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "OutputPath": "$.Payload",
        "Parameters": {
            "Payload.$": "$",
            "FunctionName": lambda_arn
        },
        "Retry": [retry_policy],
        "End": True
    }

    map_state = {
        "Type": "Map",
        "End": True,
        "Iterator": {
            "StartAt": "Lambda Invoke",
            "States": {"Lambda Invoke": invoke_task}
        }
    }

    return {
        "Comment": "My State Machine",
        "StartAt": "Map",
        "States": {"Map": map_state}
    }
    
# Build the definition once; the serialised form and the role ARN are reused
# by both the create and the update call.
sf_def = make_def(lambda_arn)
definition_json = json.dumps(sf_def)
role_arn = role['Role']['Arn']

try:
    # Create the EXPRESS state machine
    response = sfn.create_state_machine(
        name='chi_taxi_sm',
        definition=definition_json,
        roleArn=role_arn,
        type='EXPRESS',
    )
except sfn.exceptions.StateMachineAlreadyExists:
    # Already exists: look up its ARN by name, then update it in place
    response = sfn.list_state_machines()
    matching_arns = [
        machine['stateMachineArn']
        for machine in response['stateMachines']
        if machine['name'] == 'chi_taxi_sm'
    ]
    state_machine_arn = matching_arns[0]
    response = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=definition_json,
        roleArn=role_arn,
    )

In [118]:
response = sfn.list_state_machines()
print(response)

{'stateMachines': [{'stateMachineArn': 'arn:aws:states:us-east-1:719506583895:stateMachine:book_desc_sm', 'name': 'book_desc_sm', 'type': 'EXPRESS', 'creationDate': datetime.datetime(2023, 5, 5, 19, 41, 22, 280000, tzinfo=tzlocal())}, {'stateMachineArn': 'arn:aws:states:us-east-1:719506583895:stateMachine:chi_taxi_sm', 'name': 'chi_taxi_sm', 'type': 'EXPRESS', 'creationDate': datetime.datetime(2023, 5, 24, 20, 54, 43, 912000, tzinfo=tzlocal())}, {'stateMachineArn': 'arn:aws:states:us-east-1:719506583895:stateMachine:hello_world_sm', 'name': 'hello_world_sm', 'type': 'EXPRESS', 'creationDate': datetime.datetime(2023, 5, 4, 19, 24, 15, 402000, tzinfo=tzlocal())}], 'ResponseMetadata': {'RequestId': 'dba5fad4-f69e-4c35-8463-5008f052b19f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'dba5fad4-f69e-4c35-8463-5008f052b19f', 'date': 'Thu, 25 May 2023 01:54:49 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '491'}, 'RetryAttempts': 0}}


In [119]:
# Pick the ARN of the state machine named 'chi_taxi_sm' out of the listing
matching_arns = [
    machine['stateMachineArn']
    for machine in response['stateMachines']
    if machine['name'] == 'chi_taxi_sm'
]
state_machine_arn = matching_arns[0]

In [248]:
state_machine_arn

'arn:aws:states:us-east-1:719506583895:stateMachine:chi_taxi_sm'

In [120]:
# generate test data to pass as input
# "Map" will automatically invoke a separate Lambda function
# to process each dictionary in the list (50 concurrently)
test_data = [{"year": 2019, "week": 32}, {"year": 2019, "week": 33}]

In [121]:
# Asynchronous start: useful when results are written elsewhere (e.g. a db) and
# the rest of the code does not need to wait for the execution to finish.
execution_input = json.dumps(test_data)
response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    name='async_test',
    input=execution_input,
)

print(response) # no results returned for async option
# Execution results can be inspected in the CloudWatch logs (Express SF workflow).
# (Note that a Standard Step Function workflow allows us to audit results via Boto3.)

{'executionArn': 'arn:aws:states:us-east-1:719506583895:express:chi_taxi_sm:async_test:205cb079-aaf9-49a8-a9da-a8a0bcab4b01', 'startDate': datetime.datetime(2023, 5, 24, 20, 56, 1, 626000, tzinfo=tzlocal()), 'ResponseMetadata': {'RequestId': '6ada9f8b-0344-4526-83c6-c30dad7606a7', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ada9f8b-0344-4526-83c6-c30dad7606a7', 'date': 'Thu, 25 May 2023 01:56:01 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '153'}, 'RetryAttempts': 0}}


In [232]:
# generate data to pass as input
# "Map" will automatically invoke a separate Lambda function
# to process each dictionary in the list (50 concurrently)
#
# One {"year", "week"} payload is built per week for years 2019-2021 and week
# numbers 0-51. A comprehension is used so that the iteration variables stay
# local and do not overwrite the notebook-level `year` / `week` values that
# the sample-extraction query reads.
data = [
    {"year": trip_year, "week": trip_week}
    for trip_year in range(2019, 2022)
    for trip_week in range(0, 52)
]

In [233]:
data

[{'year': 2019, 'week': 0},
 {'year': 2019, 'week': 1},
 {'year': 2019, 'week': 2},
 {'year': 2019, 'week': 3},
 {'year': 2019, 'week': 4},
 {'year': 2019, 'week': 5},
 {'year': 2019, 'week': 6},
 {'year': 2019, 'week': 7},
 {'year': 2019, 'week': 8},
 {'year': 2019, 'week': 9},
 {'year': 2019, 'week': 10},
 {'year': 2019, 'week': 11},
 {'year': 2019, 'week': 12},
 {'year': 2019, 'week': 13},
 {'year': 2019, 'week': 14},
 {'year': 2019, 'week': 15},
 {'year': 2019, 'week': 16},
 {'year': 2019, 'week': 17},
 {'year': 2019, 'week': 18},
 {'year': 2019, 'week': 19},
 {'year': 2019, 'week': 20},
 {'year': 2019, 'week': 21},
 {'year': 2019, 'week': 22},
 {'year': 2019, 'week': 23},
 {'year': 2019, 'week': 24},
 {'year': 2019, 'week': 25},
 {'year': 2019, 'week': 26},
 {'year': 2019, 'week': 27},
 {'year': 2019, 'week': 28},
 {'year': 2019, 'week': 29},
 {'year': 2019, 'week': 30},
 {'year': 2019, 'week': 31},
 {'year': 2019, 'week': 32},
 {'year': 2019, 'week': 33},
 {'year': 2019, 'week': 

There are a total of 156 weeks that we collect data for. We will break the collection process into 6 parts, so that we can concurrently invoke 26 Lambda functions to extract 6 months of data at a time and store them in S3.

We have to break these into parts because our current set-up allows only 50 concurrent Lambda function invocations.

In [237]:
# Split the weekly payloads into N_BATCHES parts, one part per Step Functions
# execution. This needs numpy (`np`), which must be imported in the notebook's
# import cell; without that import the cell fails on a fresh kernel.
N_BATCHES = 6
split_data = np.array_split(data, N_BATCHES)

In [242]:
# Convert each of the 6 parts into a plain Python list
split_data_list = [list(split_data[part_idx]) for part_idx in range(6)]

In [243]:
split_data_list

[[{'year': 2019, 'week': 0},
  {'year': 2019, 'week': 1},
  {'year': 2019, 'week': 2},
  {'year': 2019, 'week': 3},
  {'year': 2019, 'week': 4},
  {'year': 2019, 'week': 5},
  {'year': 2019, 'week': 6},
  {'year': 2019, 'week': 7},
  {'year': 2019, 'week': 8},
  {'year': 2019, 'week': 9},
  {'year': 2019, 'week': 10},
  {'year': 2019, 'week': 11},
  {'year': 2019, 'week': 12},
  {'year': 2019, 'week': 13},
  {'year': 2019, 'week': 14},
  {'year': 2019, 'week': 15},
  {'year': 2019, 'week': 16},
  {'year': 2019, 'week': 17},
  {'year': 2019, 'week': 18},
  {'year': 2019, 'week': 19},
  {'year': 2019, 'week': 20},
  {'year': 2019, 'week': 21},
  {'year': 2019, 'week': 22},
  {'year': 2019, 'week': 23},
  {'year': 2019, 'week': 24},
  {'year': 2019, 'week': 25}],
 [{'year': 2019, 'week': 26},
  {'year': 2019, 'week': 27},
  {'year': 2019, 'week': 28},
  {'year': 2019, 'week': 29},
  {'year': 2019, 'week': 30},
  {'year': 2019, 'week': 31},
  {'year': 2019, 'week': 32},
  {'year': 2019, 'w

We will run the below script for each of the 6 parts, collecting 6 months of data for each part

In [266]:
# Asynchronous start for one part of the split payloads: the call returns
# without waiting for the execution to finish.
part_index = 5
execution_input = json.dumps(split_data_list[part_index])
response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    name='async_test',
    input=execution_input,
)

print(response) # no results returned for async option
# Execution results can be inspected in the CloudWatch logs (Express SF workflow).
# (Note that a Standard Step Function workflow allows us to audit results via Boto3.)

{'executionArn': 'arn:aws:states:us-east-1:719506583895:express:chi_taxi_sm:async_test:115d228d-1c25-41bd-b7ca-5dc3827b42fb', 'startDate': datetime.datetime(2023, 5, 25, 10, 43, 43, 938000, tzinfo=tzlocal()), 'ResponseMetadata': {'RequestId': '292c53d6-ba16-409c-bc0e-a0590c36f15f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '292c53d6-ba16-409c-bc0e-a0590c36f15f', 'date': 'Thu, 25 May 2023 15:43:44 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '153'}, 'RetryAttempts': 0}}
