### Status:
1. State machine keeps running till files for all states are fetched - AOK! 
2. Lambda is invoked in parallel by the state machine - AOK! 

### Todos:
1. Consolidate the notebook cells into a single script
2. Lambda deployment files: the zip archive and the Python script need to be updated

In [1]:
# Importing libraries

import json
import os
import time
from concurrent.futures import ThreadPoolExecutor

import boto3
import pandas as pd
import pyarrow.parquet as pq
import requests
from botocore.client import Config
from botocore.exceptions import ClientError

In [2]:
# Initialize the AWS clients used throughout the notebook.
# (`Config` is imported in the notebook's import cell.)
s3 = boto3.client('s3', region_name="us-east-1", config=Config(signature_version='s3v4'))
s3_resource = boto3.resource('s3')
iam_client = boto3.client('iam')

# The Step Functions state machine is created with this role.
role_arn = iam_client.get_role(RoleName='LabRole')['Role']['Arn']
print(f"Lab role arn is {role_arn}")

# Initialize the client for Step Functions
sfn = boto3.client('stepfunctions')

# Define S3 bucket name for project
bucket_name = 'final-project-nrel-stations'

# Define folder name (key prefix) inside the S3 bucket
folder_name = 'station-parquet-files'

# Check if the S3 bucket exists; if not, create it.
# list_buckets() is called once and compared against `bucket_name`, so that
# changing the name above is enough to retarget the notebook.
existing_bucket_names = {
    bucket['Name'] for bucket in s3.list_buckets().get('Buckets', [])
}
if bucket_name in existing_bucket_names:
    print('S3 bucket exists')
else:
    s3.create_bucket(Bucket=bucket_name)

Lab role arn is arn:aws:iam::040284194960:role/LabRole
S3 bucket exists


In [11]:
# Define lambda function
def lambda_handler(event, context):
    """Fetch the alternative-fuel stations of one state and store them in S3.

    Parameters
    ----------
    event : dict
        Must contain ``'api_key'`` (NREL developer API key) and ``'state'``
        (the state code sent to the API and used in the file name).
    context : object
        Lambda context object; not used.

    Returns
    -------
    dict
        * ``{"status": "FAILED", "state": ..., "error": ...}`` when the API
          request fails or its payload lacks the expected fields.
        * ``{"statusCode": 200, "body": <json str>}`` after a successful upload.
        * ``{"statusCode": 500, "body": <json str>, "status": "FAILED",
          "state": ...}`` when writing or uploading the parquet file fails.

    Raises
    ------
    KeyError
        If ``event`` lacks ``'api_key'`` or ``'state'``.
    """
    # Define start time
    start_time = time.time()

    # Extract relevant attributes from the event
    api_key = event['api_key']
    state = event['state']

    url = "https://developer.nrel.gov/api/alt-fuel-stations/v1.json"
    try:
        # `params` lets requests URL-encode the values; the timeout keeps a
        # stalled connection from hanging until the Lambda itself times out.
        response = requests.get(
            url,
            params={"api_key": api_key, "state": state},
            timeout=30,
        )
        # Turn HTTP error statuses into a FAILED result instead of parsing
        # an error payload as if it were station data.
        response.raise_for_status()
        data = response.json()
        # Read both fields inside the try so a payload without them is
        # reported as FAILED rather than raising an unhandled KeyError.
        total_results = data['total_results']
        stations = data['fuel_stations']
    except Exception as e:
        return {
            "status": "FAILED",
            "state": state,
            "error": str(e)
        }

    # Count the number of fuel stations returned
    print(f"Number of fuel stations in {state}: {total_results}")

    stations_df = pd.DataFrame(stations)

    # Upload stations dataframe to S3 bucket as parquet with state initials
    local_path = f"/tmp/{state}_stations.parquet"
    try:
        stations_df.to_parquet(local_path)
        s3.upload_file(local_path, bucket_name, f"{folder_name}/{state}_stations.parquet")
        print(f'parquet file uploaded for state {state} to S3')

        # Define end time
        end_time = time.time()

        # Construct a success response
        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": f"Success! Parquet file uploaded for state {state} to S3",
                "processingTime": f"{end_time - start_time:.2f} seconds"
            })
        }
    except Exception as e:
        print(e)
        print('Error uploading file to S3')
        return {
            "statusCode": 500,
            "body": json.dumps({
                "message": "Failed to upload parquet file to S3",
                "error": str(e)
            }),
            # Same markers as the request-failure response, so a caller that
            # checks `status == "FAILED"` also sees upload failures.
            "status": "FAILED",
            "state": state
        }

In [12]:
# Testing lambda locally
# The API key is read from the environment so the secret is not stored in
# the notebook (set NREL_API_KEY before starting the kernel).
api_key = os.environ.get('NREL_API_KEY')
if not api_key:
    raise RuntimeError("Set the NREL_API_KEY environment variable before running this cell")
state = 'CA'
event = {'api_key': api_key, 'state': state}
lambda_handler(event, None)

Number of fuel stations in CA: 20132
parquet file uploaded for state CA to S3


{'statusCode': 200,
 'body': '{"message": "Success! Parquet file uploaded for state CA to S3", "processingTime": "8.12 seconds"}'}

In [25]:
# State machine definition to invoke the lambda in parallel for multiple states.
#
# Retry logic lives INSIDE the Map iterator, one loop per state:
#   * the Task keeps its input and stores the Lambda output under $.result,
#     so `api_key` and `state` are still available for a retry;
#   * a Choice retries only when the Lambda reported {"status": "FAILED"}
#     and the attempt budget is not used up.
# The earlier version tested `IsPresent` on $.results / $.retryResults, which
# the Map states always write, so the retry branch was always taken, it fed
# Lambda outputs (without `api_key`) back into the Lambda, and the
# Choice -> Map cycle had no exit.

LAMBDA_ARN = "arn:aws:lambda:us-east-1:040284194960:function:nrel-ev-py3-9"
MAX_ATTEMPTS_PER_STATE = 3  # total Lambda invocations allowed per state

state_machine_definition = {
  "Comment": "Invoke NREL EV station function in parallel for different states",
  "StartAt": "ParallelLambdaInvocations",
  "States": {
    "ParallelLambdaInvocations": {
      "Type": "Map",
      "InputPath": "$",
      "ItemsPath": "$.states",
      "MaxConcurrency": 8,
      "Iterator": {
        "StartAt": "InitAttempts",
        "States": {
          # Attach an attempt counter to the item ({api_key, state}).
          "InitAttempts": {
            "Type": "Pass",
            "Parameters": {
              "api_key.$": "$.api_key",
              "state.$": "$.state",
              "attempts": 1
            },
            "Next": "InvokeLambda"
          },
          "InvokeLambda": {
            "Type": "Task",
            "Resource": LAMBDA_ARN,
            # Keep the input and add the Lambda output next to it.
            "ResultPath": "$.result",
            "Next": "CheckResult"
          },
          "CheckResult": {
            "Type": "Choice",
            "Choices": [
              {
                "And": [
                  # Success responses carry no `status` key; guard first.
                  {"Variable": "$.result.status", "IsPresent": True},
                  {"Variable": "$.result.status", "StringEquals": "FAILED"},
                  {"Variable": "$.attempts", "NumericLessThan": MAX_ATTEMPTS_PER_STATE}
                ],
                "Next": "CountAttempt"
              }
            ],
            "Default": "ReturnResult"
          },
          # Rebuild the Lambda input and bump the counter before retrying.
          "CountAttempt": {
            "Type": "Pass",
            "Parameters": {
              "api_key.$": "$.api_key",
              "state.$": "$.state",
              "attempts.$": "States.MathAdd($.attempts, 1)"
            },
            "Next": "WaitBeforeRetry"
          },
          "WaitBeforeRetry": {
            "Type": "Wait",
            "Seconds": 5,
            "Next": "InvokeLambda"
          },
          # Emit only the Lambda output as this item's result.
          "ReturnResult": {
            "Type": "Pass",
            "OutputPath": "$.result",
            "End": True
          }
        }
      },
      "ResultPath": "$.results",
      "Next": "Success"
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}


In [None]:
# Defining step function (state machine) to invoke lambda concurrently for all states
#
# Fixes over the previous draft of this cell:
#   * "Default": "Succeess" pointed at a state that does not exist;
#   * the Choice variables / ItemsPath used wildcard and filter JSONPath
#     ("$.results[*].status", "$.results[?(...)]") where simple reference
#     paths are expected, and the retry Map would have re-invoked the Lambda
#     with its own output (no `api_key`);
#   * create_state_machine was called unconditionally, which raises when a
#     machine with this name and a different definition already exists.
# Retrying is now done per state inside the Map iterator.

LAMBDA_ARN = "arn:aws:lambda:us-east-1:040284194960:function:nrel-ev-py3-9"
MAX_ATTEMPTS_PER_STATE = 3  # total Lambda invocations allowed per state
STATE_MACHINE_NAME = 'NREL_EV_Stations_State_Machine'

state_machine_definition = {
    "Comment": "Invoke NREL EV station function in parallel for different states",
    "StartAt": "ParallelLambdaInvocations",
    "States": {
        "ParallelLambdaInvocations": {
            "Type": "Map",  # Allows parallel processing for states
            "InputPath": "$",
            "ItemsPath": "$.states",
            "MaxConcurrency": 8,
            "Iterator": {
                "StartAt": "InitAttempts",
                "States": {
                    # Attach an attempt counter to the item ({api_key, state}).
                    "InitAttempts": {
                        "Type": "Pass",
                        "Parameters": {
                            "api_key.$": "$.api_key",
                            "state.$": "$.state",
                            "attempts": 1
                        },
                        "Next": "InvokeLambda"
                    },
                    "InvokeLambda": {
                        "Type": "Task",
                        "Resource": LAMBDA_ARN,
                        # Keep the input and add the Lambda output next to it.
                        "ResultPath": "$.result",
                        "Next": "CheckResult"
                    },
                    "CheckResult": {
                        "Type": "Choice",
                        "Choices": [
                            {
                                "And": [
                                    # Success responses carry no `status` key; guard first.
                                    {"Variable": "$.result.status", "IsPresent": True},
                                    {"Variable": "$.result.status", "StringEquals": "FAILED"},
                                    {"Variable": "$.attempts", "NumericLessThan": MAX_ATTEMPTS_PER_STATE}
                                ],
                                "Next": "CountAttempt"
                            }
                        ],
                        "Default": "ReturnResult"
                    },
                    # Rebuild the Lambda input and bump the counter before retrying.
                    "CountAttempt": {
                        "Type": "Pass",
                        "Parameters": {
                            "api_key.$": "$.api_key",
                            "state.$": "$.state",
                            "attempts.$": "States.MathAdd($.attempts, 1)"
                        },
                        "Next": "WaitBeforeRetry"
                    },
                    "WaitBeforeRetry": {
                        "Type": "Wait",
                        "Seconds": 5,
                        "Next": "InvokeLambda"
                    },
                    # Emit only the Lambda output as this item's result.
                    "ReturnResult": {
                        "Type": "Pass",
                        "OutputPath": "$.result",
                        "End": True
                    }
                }
            },
            "ResultPath": "$.results",  # New key in the output
            "Next": "Success"
        },
        "Success": {
            "Type": "Succeed"
        }
    }
}

try:
    response = sfn.create_state_machine(
        name=STATE_MACHINE_NAME,
        definition=json.dumps(state_machine_definition),
        roleArn=role_arn
    )
except sfn.exceptions.StateMachineAlreadyExists:
    # Re-running the notebook: the machine exists with another definition.
    # Look up its ARN and push the current definition instead of failing.
    state_machine_arn = next(
        machine['stateMachineArn']
        for page in sfn.get_paginator('list_state_machines').paginate()
        for machine in page['stateMachines']
        if machine['name'] == STATE_MACHINE_NAME
    )
    sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=json.dumps(state_machine_definition)
    )
else:
    # Capture the state machine ARN for later use
    state_machine_arn = response['stateMachineArn']

In [29]:
# Check if state machine exists; if not, create. If it does, update the definition
state_machine_name = 'NREL_EV_Stations_State_Machine'
definition_json = json.dumps(state_machine_definition)

# list_state_machines is paginated; walk every page so an existing machine is
# found even when the account has more machines than fit in one response
# (otherwise the create branch below would fail on the duplicate name).
state_machine_arn = next(
    (
        state_machine['stateMachineArn']
        for page in sfn.get_paginator('list_state_machines').paginate()
        for state_machine in page['stateMachines']
        if state_machine['name'] == state_machine_name
    ),
    None,
)

if state_machine_arn:
    response = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=definition_json
    )
    print(f"State machine updated: {state_machine_arn}")
else:
    response = sfn.create_state_machine(
        name=state_machine_name,
        definition=definition_json,
        roleArn=role_arn
    )
    state_machine_arn = response['stateMachineArn']
    print(f"State machine created: {state_machine_arn}")

print(f"State machine ARN: {state_machine_arn}")

State machine updated: arn:aws:states:us-east-1:040284194960:stateMachine:NREL_EV_Stations_State_Machine
State machine ARN: arn:aws:states:us-east-1:040284194960:stateMachine:NREL_EV_Stations_State_Machine


In [30]:
# Executing the state machine
# Before execution, check if files corresponding to certain states exist in S3 bucket already

# The API key is read from the environment so the secret is not stored in
# the notebook (set NREL_API_KEY before starting the kernel).
api_key = os.environ.get('NREL_API_KEY')
if not api_key:
    raise RuntimeError("Set the NREL_API_KEY environment variable before running this cell")

# Define the list of states for which we want to extract EV stations
states = ['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY']

# Error codes treated as "object is not there yet".
missing_object_codes = {'404', 'NoSuchKey', 'NotFound'}

existing_files = []
for state in states:
    object_key = f"{folder_name}/{state}_stations.parquet"
    try:
        # load() fetches the object's metadata and raises ClientError if that fails.
        s3_resource.Object(bucket_name, object_key).load()
    except ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        if error_code not in missing_object_codes:
            # Still best-effort (the state is simply fetched again), but an
            # unexpected S3 error is no longer swallowed silently.
            print(f"Could not check {object_key} ({error_code}); state {state} will be re-fetched")
        continue
    existing_files.append(state)

# Filter out states with existing files
already_fetched = set(existing_files)
states_to_process = [state for state in states if state not in already_fetched]

# With the remaining states, create the input for the state machine
execution_input = {
    "states": [{
        "api_key": api_key,
        "state": state} for state in states_to_process]
}

# Start the execution
execution_response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps(execution_input)
)

# Optionally, monitor the execution status.
# This is a single snapshot taken right after the start; re-run
# describe_execution later to see the final status.
execution_arn = execution_response['executionArn']
describe_response = sfn.describe_execution(
    executionArn=execution_arn
)
print(describe_response['status'])

RUNNING
