In [None]:
import json
import os

import boto3
import mysql.connector

In [None]:
# Lambda client pinned to us-east-1; IAM is a global service, so its client takes no region.
aws_lambda = boto3.client(service_name='lambda', region_name='us-east-1')
iam_client = boto3.client(service_name='iam')

# LabRole's ARN is used as the execution role for the Lambda function and the state machine.
role = iam_client.get_role(RoleName='LabRole')

In [None]:
rds = boto3.client('rds')

# Connect to the RDS MySQL `books` database with mysql.connector.
# Settings are read from environment variables so credentials are not hardcoded in
# the notebook; the fallbacks are the original placeholder values.
# TODO: export RDS_ENDPOINT / RDS_PORT with the values from the RDS setup code.
ENDPOINT = os.environ.get('RDS_ENDPOINT', '')
# mysql.connector requires an integer port (an empty string cannot work); 3306 is MySQL's default.
PORT = int(os.environ.get('RDS_PORT', '3306'))
conn = mysql.connector.connect(host=ENDPOINT,
                               user=os.environ.get('RDS_USER', 'username'),
                               passwd=os.environ.get('RDS_PASSWORD', 'password'),
                               port=PORT,
                               database='books')
cur = conn.cursor()

In [None]:
# Create the `scrape_books` Lambda function, or update its code if it already exists.
LAMBDA_NAME = 'scrape_books'

# Read the zipped deployment package.
# NOTE(review): the path is spelled "lamda_function.zip" — confirm it matches the file on disk.
with open('./deployment-packages/lamda_function.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    # First run: create the function (entry point: lambda_handler in lambda_function.py).
    response = aws_lambda.create_function(
        FunctionName=LAMBDA_NAME,
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=300,
    )
except aws_lambda.exceptions.ResourceConflictException:
    # Function already exists: replace its code with the zip contents.
    response = aws_lambda.update_function_code(
        FunctionName=LAMBDA_NAME,
        ZipFile=lambda_zip,
    )
    # Block until the code update has finished so later invocations run the new code.
    aws_lambda.get_waiter('function_updated_v2').wait(FunctionName=LAMBDA_NAME)
else:
    # A newly created function starts out Pending and cannot be invoked until it is
    # Active, so wait before the state machine starts calling it.
    aws_lambda.get_waiter('function_active_v2').wait(FunctionName=LAMBDA_NAME)

lambda_arn = response['FunctionArn']

In [None]:
# Step Functions client, pinned to the same region as the Lambda client (us-east-1)
# so the state machine is created alongside the function it invokes.
sfn = boto3.client('stepfunctions', region_name='us-east-1')

def make_def(lambda_arn):
    """Build the Amazon States Language definition for the scraping state machine.

    The machine has a single Map state. Each element of the execution input array
    is passed as the payload to one invocation of the Lambda function ``lambda_arn``.
    The Lambda's returned ``Payload`` becomes that iteration's output.

    Args:
        lambda_arn: ARN of the Lambda function that every Map iteration invokes.

    Returns:
        dict: the state machine definition, ready to be serialised with ``json.dumps``.
    """
    # Retry Lambda service errors and generic task failures with exponential
    # backoff (2s, 4s, 8s, ...), up to 6 retries.
    retry_policy = {
        "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException",
            "States.TaskFailed",
        ],
        "IntervalSeconds": 2,
        "MaxAttempts": 6,
        "BackoffRate": 2,
    }

    invoke_state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        # Keep only the Lambda's return value, dropping the invoke metadata.
        "OutputPath": "$.Payload",
        "Parameters": {
            "Payload.$": "$",
            "FunctionName": lambda_arn,
        },
        "Retry": [retry_policy],
        "End": True,
    }

    # No MaxConcurrency is set, so the service's default Map concurrency applies.
    map_state = {
        "Type": "Map",
        "End": True,
        "Iterator": {
            "StartAt": "Lambda Invoke",
            "States": {"Lambda Invoke": invoke_state},
        },
    }

    return {
        "Comment": "My State Machine",
        "StartAt": "Map",
        "States": {"Map": map_state},
    }
    
# Create the EXPRESS state machine, or update it in place if it already exists.
SM_NAME = 'scrape_books_sm'
sf_def = make_def(lambda_arn)

try:
    response = sfn.create_state_machine(
        name=SM_NAME,
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn'],
        type='EXPRESS'
    )
    state_machine_arn = response['stateMachineArn']
except sfn.exceptions.StateMachineAlreadyExists:
    # list_state_machines is paginated: scan every page so the lookup cannot miss it.
    paginator = sfn.get_paginator('list_state_machines')
    state_machine_arn = next(
        (sm['stateMachineArn']
         for page in paginator.paginate()
         for sm in page['stateMachines']
         if sm['name'] == SM_NAME),
        None,
    )
    if state_machine_arn is None:
        raise RuntimeError(f"State machine {SM_NAME!r} reported as existing but not found in listing")
    # NOTE: update_state_machine takes no `type`, so an existing machine keeps its original type.
    response = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn']
    )

In [None]:
# List every state machine (all pages, not just the first) so the ARN lookup
# that reads response['stateMachines'] can always find ours.
response = {
    'stateMachines': [
        sm
        for page in sfn.get_paginator('list_state_machines').paginate()
        for sm in page['stateMachines']
    ]
}
print(response)

In [None]:
# Read the scraped rows back from the `books` table (also checks that ingestion worked).
# TODO(review): SELECT * ships entire rows to the Lambda; select only the URL column
# once the schema is known. Rows containing dates/Decimals will not survive json.dumps.
cur.execute("""SELECT * from books""")
rows = cur.fetchall()  # a cursor has no len() and cannot be sliced; materialise the rows

# Split the rows into at most `batch_size` roughly equal batches to pass to the Lambda.
batch_size = 50  # target NUMBER of batches (not rows per batch)
# Rows per batch: ceil division so there are never more than `batch_size` batches,
# clamped to 1 so fewer than 50 rows doesn't produce a zero range() step.
n = max(1, -(-len(rows) // batch_size))
urls_to_scrape_batches = [{'urls': rows[i:i + n]}
                          for i in range(0, len(rows), n)]

In [None]:
# Look up our state machine's ARN in the listing fetched earlier (`response`).
state_machine_arn = next(
    (sm['stateMachineArn']
     for sm in response['stateMachines']
     if sm['name'] == 'scrape_books_sm'),
    None,
)
if state_machine_arn is None:
    raise RuntimeError("State machine 'scrape_books_sm' not found; run the state-machine cells first")

# Small test input: the first two batches. The "Map" state invokes the Lambda once per
# dictionary in the list. The Map state sets no MaxConcurrency, so the service's default
# concurrency limit decides how many run in parallel (not a fixed 50).
data = urls_to_scrape_batches[:2]

In [None]:
# Start the state machine asynchronously with start_execution: the call returns as soon
# as the execution starts, so no Lambda results come back here (they could be written
# to the database instead).
# NOTE(review): the previous cell prepares `data` (first two batches) as test input,
# but the full `urls_to_scrape_batches` list is what gets sent — confirm which is intended.
execution_request = dict(
    stateMachineArn=state_machine_arn,
    name='async_test',
    input=json.dumps(urls_to_scrape_batches),
)
response = sfn.start_execution(**execution_request)

# Only execution metadata is printed — no results for the async option.
print(response)
# Express results are only visible in CloudWatch Logs if logging is enabled on the state
# machine (no loggingConfiguration is passed when it is created in this notebook).
# A Standard workflow, by contrast, lets results be audited through boto3.

In [None]:
# Reconnect to the RDS `books` database with mysql.connector.
# Close any connection opened by an earlier cell first so it isn't leaked.
_previous_conn = globals().get('conn')
if _previous_conn is not None and _previous_conn.is_connected():
    _previous_conn.close()

# Credentials come from the environment (fallbacks are the original placeholders).
conn = mysql.connector.connect(host=ENDPOINT,
                               user=os.environ.get('RDS_USER', 'username'),
                               passwd=os.environ.get('RDS_PASSWORD', 'password'),
                               port=PORT,
                               database='books')
cur = conn.cursor()