In [1]:
import getpass
import json
import os

import boto3

## Create S3 Bucket

In [13]:
# Initialize the boto3 S3 resource handle
s3 = boto3.resource('s3')

# Create (or reuse) the bucket that stores this project's files
BUCKETNAME = 'reibucket'
create_kwargs = {'Bucket': BUCKETNAME}
# Outside us-east-1, CreateBucket requires the target region as a LocationConstraint.
bucket_region = s3.meta.client.meta.region_name
if bucket_region and bucket_region != 'us-east-1':
    create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': bucket_region}
try:
    s3.create_bucket(**create_kwargs)
except s3.meta.client.exceptions.BucketAlreadyOwnedByYou:
    # Already created on an earlier run: keep using it so the cell can be re-run.
    pass

# This is what we will use to interface with the specific bucket
bucket = s3.Bucket(BUCKETNAME)

## Create Lambda Function 

In [3]:
# IAM client plus the existing 'LabRole' role that the Lambda function will run as,
# and the Lambda client used to deploy and invoke the function.
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')
aws_lambda = boto3.client('lambda')

In [18]:
# Load the deployment package as raw bytes. lambda_function.zip must be built beforehand
# with the handler module and any dependencies it needs.
with open('lambda_function.zip', mode='rb') as zip_file:
    lambda_zip = zip_file.read()

In [19]:
# Create the Lambda function on the first run; on later runs push the new zip to the existing one.
# NOTE(review): AWS Lambda has deprecated the python3.8 runtime; move to a supported runtime once
# the dependencies inside lambda_function.zip are rebuilt for it.
try:
    response = aws_lambda.create_function(
        FunctionName='lambda_func',
        Runtime='python3.8',
        Role=role['Role']['Arn'],
        Handler='lambda_deployment.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=300,
    )
except aws_lambda.exceptions.ResourceConflictException:
    # The function already exists. Only its code is replaced: the Runtime, Role, Handler and
    # Timeout given above are NOT re-applied to an existing function.
    response = aws_lambda.update_function_code(
        FunctionName='lambda_func',
        ZipFile=lambda_zip,
    )
    ready_waiter = 'function_updated'
else:
    ready_waiter = 'function_active'

# A new function starts in the Pending state and a code update runs asynchronously; block until
# the function is usable so the concurrency and invoke cells below don't fail on a fast Run All.
aws_lambda.get_waiter(ready_waiter).wait(FunctionName='lambda_func')

In [20]:
# Reserve 10 concurrent executions for lambda_func; this also caps it at 10 parallel runs.
concurrency_settings = {'FunctionName': 'lambda_func', 'ReservedConcurrentExecutions': 10}
response = aws_lambda.put_function_concurrency(**concurrency_settings)

In [21]:
# Look the ARN up directly. list_functions() returns a single page (at most 50 functions), so
# scanning it could miss 'lambda_func' in a busy account and raise IndexError.
lambda_arn = aws_lambda.get_function_configuration(FunctionName='lambda_func')['FunctionArn']
lambda_arn

'arn:aws:lambda:us-east-1:740039711057:function:lambda_func'

In [22]:

# Payload for the Lambda: database connection settings plus the S3 destination for the results.
# The password is taken from the ENCORE_DB_PASSWORD environment variable (or prompted for) so it
# is never stored in the notebook.
# NOTE(review): rotate the password that used to be hardcoded in this cell; consider having the
# Lambda read it from AWS Secrets Manager instead of receiving it in the invocation payload.
db_password = os.environ.get("ENCORE_DB_PASSWORD") or getpass.getpass("Encore DB password: ")

event = {
  "db_host": "encore-db-cluser-qa.cluster-c9vbijptlril.us-east-1.rds.amazonaws.com",
  "db_name": "db_encore_prod",
  "db_user": "un_encore_prod",
  "db_password": db_password,
  "db_port": "5432",
  "s3_bucket": BUCKETNAME,
  "s3_key": "results/query_results.csv"
}

In [23]:
# Trigger the Lambda synchronously and display the decoded JSON it returns.
r = aws_lambda.invoke(FunctionName='lambda_func',
                      InvocationType='RequestResponse',
                      Payload=json.dumps(event))
lambda_result = json.loads(r['Payload'].read())
# An exception inside the handler still comes back as a successful API call; Lambda only flags it
# via the 'FunctionError' field, so stop here instead of treating the error payload as a result.
if 'FunctionError' in r:
    raise RuntimeError(f"lambda_func failed ({r['FunctionError']}): {lambda_result}")
lambda_result

{'statusCode': 200,
 'body': 'uploaded to s3://reibucket/results/query_results.csv'}

## Upload File to S3 (this step can be skipped once real data is available)

In [24]:
# Upload the mock user-interaction CSV; its local relative path is reused as the S3 object key,
# so both are driven by the single FILENAME constant.
FILENAME = 'Mock_data/fake_user_interaction.csv'
with open(FILENAME, 'rb') as myfile:
    bucket.put_object(Key=FILENAME, Body=myfile)

## Create and connect to an EMR cluster to run PySpark

In [33]:
%%bash 

# Launch an EMR 6.2.0 cluster of 8 m5.xlarge instances with Spark, JupyterHub, Livy and the other
# listed applications, using the default EMR roles and the "vockey" EC2 key pair for SSH access.
# The jupyter-s3-conf classification persists JupyterHub notebooks to the "reibucket" bucket;
# keep that name in sync with BUCKETNAME.
# No --auto-terminate flag is passed, so the cluster keeps running (and billing) until it is
# terminated explicitly, e.g. `aws emr terminate-clusters --cluster-ids <ClusterId>`.
# NOTE(review): every run of this cell creates a NEW cluster; check for a running one first.
aws emr create-cluster \
    --name "Spark Cluster" \
    --release-label "emr-6.2.0" \
    --applications Name=Hadoop Name=Hive Name=JupyterEnterpriseGateway Name=JupyterHub Name=Livy Name=Pig Name=Spark Name=Tez \
    --instance-type m5.xlarge \
    --instance-count 8 \
    --use-default-roles \
    --region us-east-1 \
    --ec2-attributes '{"KeyName": "vockey"}' \
    --configurations '[{"Classification": "jupyter-s3-conf", "Properties": {"s3.persistence.enabled": "true", "s3.persistence.bucket": "reibucket"}}]'

{
    "ClusterId": "j-HV9VF3VDI17H",
    "ClusterArn": "arn:aws:elasticmapreduce:us-east-1:740039711057:cluster/j-HV9VF3VDI17H"
}


You need to re-download the `vockey.pem` key file every time. Then run `ssh -i "vockey.pem" -NL 9443:localhost:9443 hadoop@EMR-PUBLIC-ADDRESS`, which forwards the cluster's JupyterHub (port 9443) to https://localhost:9443. If you can't log in, reset the site's restrictions/cookies in your browser, click **Advanced** on the certificate warning to proceed, and log in with username `jovyan` and password `jupyter`.

## Access the recommendation lists stored in S3 through pre-signed URLs

In [3]:
# Low-level S3 client used by the listing and pre-signing helpers below.
s3_client = boto3.client(service_name='s3')

In [14]:
def list_files(bucket, prefix, suffix='.json', client=None):
    """Return the keys under ``prefix`` in ``bucket`` that end with ``suffix``.

    Follows ``list_objects_v2`` pagination, so prefixes holding more than 1,000
    objects are listed completely (a single call stops at 1,000 keys).

    Args:
        bucket: Name of the S3 bucket.
        prefix: Key prefix to list, e.g. ``'recommendations/'``.
        suffix: Only keys ending with this string are returned. The default
            ``'.json'`` drops non-JSON objects; pass ``''`` to keep every key.
        client: boto3 S3 client to use; defaults to the notebook-level ``s3_client``.

    Returns:
        List of matching object keys, in listing order.
    """
    client = s3_client if client is None else client
    paginator = client.get_paginator('list_objects_v2')
    return [
        obj['Key']
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        # A page for a prefix with no objects carries no 'Contents' entry at all.
        for obj in page.get('Contents', [])
        if obj['Key'].endswith(suffix)
    ]

def generate_presigned_url(bucket_name, object_key, expiration=3600, client=None):
    """Return a pre-signed GET URL for ``s3://bucket_name/object_key``, or None on failure.

    Anyone holding the URL can download the object until it expires, and URLs
    signed with temporary credentials embed the session token, so don't paste
    or commit them.

    Args:
        bucket_name: Name of the S3 bucket.
        object_key: Key of the object to share.
        expiration: Lifetime of the URL in seconds (default 3600 = one hour).
        client: boto3 S3 client to use; defaults to the notebook-level ``s3_client``
            (the same client ``list_files`` uses).
    """
    client = s3_client if client is None else client
    try:
        return client.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': object_key},
            ExpiresIn=expiration,
        )
    except Exception as e:
        # Best effort: report the problem and let the caller continue with the remaining keys.
        print(f"Error generating presigned URL: {e}")
        return None

In [17]:

# Usage: print a pre-signed URL for every JSON recommendation file.
# bucket_name must match BUCKETNAME from the bucket-creation cell.
bucket_name = 'reibucket'
files = list_files(bucket_name, 'recommendations/')
for i, object_key in enumerate(files):
    url = generate_presigned_url(bucket_name, object_key)
    # NOTE: the printed URLs embed temporary credentials; clear this output before sharing.
    print(f"Pre-Signed URL{i}: {url}")


Pre-Signed URL0: https://reibucket.s3.amazonaws.com/recommendations/part-00000-f50433a4-652e-40d9-8878-c6317443e011-c000.json?AWSAccessKeyId=<REDACTED>&Signature=<REDACTED>&x-amz-security-token=<REDACTED>