<a target="_blank" href="https://colab.research.google.com/github/vijay-ravi/data-engineering/blob/main/netflix/01_AWS_data_project_netflix.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# AWS: Building a basic ETL pipeline with AWS Glue and S3

1. **Import Libraries**
2. **Enter AWS Credentials**
   - Here we initialize the credentials that we pass to boto3.
3. **Assign bucket name**
   - S3 bucket names must be globally unique, not just unique within your account.
4. **Initialize boto3**
   - Boto3 is the AWS SDK for Python.
5. **Create IAM role for Glue**
   - IAM stands for Identity and Access Management.
   - Glue needs an IAM role that grants it permission to read from and write to S3.
6. **Create S3 bucket and upload files to S3**
   - S3 stands for Simple Storage Service.
   - Here we upload the Netflix dataset.
7. **Create Glue PySpark script and upload to S3**
   - Here we create a PySpark script that sums the number of episodes per release year.
8. **Create AWS Glue Job**
   - AWS Glue is an ETL (Extract, Transform, Load) service.
   - It can run both Python shell and PySpark scripts.
   - It is serverless, which means AWS provisions and manages the underlying servers.
9. **Run AWS Glue Job**
10. **Clean up S3 and Glue**


Tutorial created by @[vijayxtech](https://beacons.ai/vijayxtech). Follow for more projects and guidance. **🤖**

![picture](https://img1.wsimg.com/isteam/ip/9ef33804-39fa-4619-913a-87a9f3820fde/alchemyrefiner_alchemymagic_0_36045e9a-de11-4.jpeg/:/cr=t:0%25,l:0%25,w:100%25,h:100%25/rs=w:1280)

## 1. Import Libraries

In [None]:
!pip3 install -q boto3

In [None]:
import os
import boto3
import random
from botocore.exceptions import NoCredentialsError, ClientError

## 2. Enter AWS Credentials

> Create an access key and secret access key in the IAM console:<br>
> https://us-east-1.console.aws.amazon.com/iam/home?region=us-east-1#/security_credentials

In [None]:
# Placeholders: replace with your own access key ID and secret access key
aws_access_key_id='123'
aws_secret_access_key='123'
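As an alternative to typing keys into a notebook cell (where they are easy to share by accident), the keys can be read from environment variables. A minimal sketch, assuming you have exported the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` variables; `load_aws_credentials` is a hypothetical helper, not part of boto3.

```python
import os


def load_aws_credentials(environ=os.environ):
    """Hypothetical helper: read AWS keys from environment variables.

    Returns (None, None) if either variable is missing. Passing None to
    boto3.Session lets boto3 fall back to its default credential chain
    (shared credentials file, instance role, and so on).
    """
    key_id = environ.get("AWS_ACCESS_KEY_ID")
    secret = environ.get("AWS_SECRET_ACCESS_KEY")
    if not key_id or not secret:
        return None, None
    return key_id, secret


aws_access_key_id, aws_secret_access_key = load_aws_credentials()
```

The returned values can be passed to `boto3.Session` exactly as the hardcoded ones are in step 4.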

## 3. Initialize bucket name


In [None]:
random_number = random.randint(10000, 99999)
bucket_name = f'aws-data-projects-{random_number}'
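A five-digit random suffix makes name collisions unlikely but not impossible. A sketch of an alternative, assuming a UUID-based suffix and a simplified check of the core S3 naming rules (3 to 63 characters; lowercase letters, digits, hyphens, and periods; must start and end with a letter or digit; no two adjacent periods). The full rules have further exceptions (for example, names formatted like IP addresses are rejected), so treat the check as partial. Both helper names are hypothetical.

```python
import re
import uuid

# Core S3 bucket-naming rules only; the full rule set has extra exceptions.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")


def is_plausible_bucket_name(name):
    """Hypothetical helper: partial check against the main S3 naming rules."""
    return bool(_BUCKET_RE.match(name)) and ".." not in name


def make_bucket_name(prefix="aws-data-projects"):
    """Hypothetical helper: append a random 12-hex-digit suffix to a prefix."""
    name = f"{prefix}-{uuid.uuid4().hex[:12]}"
    if not is_plausible_bucket_name(name):
        raise ValueError(f"Invalid bucket name: {name!r}")
    return name


bucket_name = make_bucket_name()
```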

## 4. Initialize boto3 and other variables

In [None]:
# Region and IAM settings
region = 'us-east-1'
role_name = 'glue-service-role'
glue_policy_arn = 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
s3_policy_arn = 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
policy_arns = [glue_policy_arn, s3_policy_arn]

# Local files to upload and their destination keys in S3
file_paths = [
    'netflix_india_shows_seasons.csv'
]
file_keys = [
    'netflix/raw_data/netflix_india_shows_seasons.csv'
]

# One session carries the region and credentials for every client
session = boto3.Session(
    region_name=region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Initialize the boto3 clients
s3_client = session.client('s3')
glue_client = session.client('glue')
iam_client = session.client('iam')
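Before creating any resources, it can help to confirm which AWS account and identity the session is actually using. A small sketch using the STS `get_caller_identity` call, whose response includes `Account` and `Arn`; `describe_caller` is a hypothetical helper name.

```python
def describe_caller(sts_client):
    """Hypothetical helper: return (account_id, arn) for the active credentials.

    sts_client is expected to be a boto3 STS client, e.g. session.client('sts').
    get_caller_identity does not require any IAM permissions.
    """
    identity = sts_client.get_caller_identity()
    return identity["Account"], identity["Arn"]
```

In the notebook you would call it as `account_id, caller_arn = describe_caller(session.client('sts'))`; a wrong account ID here is much easier to spot than a failed Glue run later.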

## 5. Create IAM role for Glue

In [None]:
### CREATE GLUE IAM ROLE ####

trust_policy = '''{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'''

# Step 1: Try to create the IAM role
try:
    iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=trust_policy,
        Description='Role for AWS Glue Service with S3 full-access policy.',
    )
    print(f"Role '{role_name}' successfully created.")
except iam_client.exceptions.EntityAlreadyExistsException:
    print(f"Role '{role_name}' already exists. Moving on to attaching policies.")
except Exception as error:
    print(f"Error creating role '{role_name}': {error}")


# Step 2: Attaching IAM managed policies to the IAM role
for policy_arn in policy_arns:
    try:
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn=policy_arn
        )
        print(f"Policy {policy_arn} successfully attached to role {role_name}.")
    except Exception as error:
        print(f"Error attaching policy {policy_arn} to role '{role_name}': {error}")


Role 'glue-service-role' already exists. Moving on to attaching policies.
Policy arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole successfully attached to role glue-service-role.
Policy arn:aws:iam::aws:policy/AmazonS3FullAccess successfully attached to role glue-service-role.
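Two optional refinements, sketched under the assumption that `iam_client` is the client from step 4: building the trust policy with `json.dumps` avoids hand-editing a JSON string, and `list_attached_role_policies` lets you confirm that both managed policies really are attached. The helper names are hypothetical.

```python
import json


def glue_trust_policy():
    """Hypothetical helper: trust policy that lets AWS Glue assume the role."""
    return json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    })


def missing_policy_arns(iam_client, role_name, expected_arns):
    """Hypothetical helper: return expected policy ARNs NOT attached to the role.

    A single list_attached_role_policies call returns up to 100 policies by
    default, which is plenty for this role.
    """
    response = iam_client.list_attached_role_policies(RoleName=role_name)
    attached = {p["PolicyArn"] for p in response["AttachedPolicies"]}
    return [arn for arn in expected_arns if arn not in attached]
```

With the notebook's variables, `missing_policy_arns(iam_client, role_name, policy_arns)` should return an empty list once the cell above has run.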


## 6. Create S3 bucket and upload files to S3



In [None]:
## Create S3 bucket (if it does not already exist)

bucket_exists = True

# Reuse the session so the resource uses the same region and credentials
s3 = session.resource('s3')

try:
    s3.meta.client.head_bucket(Bucket=bucket_name)
    print(f"Bucket '{bucket_name}' already exists.")
except ClientError as e:
    # head_bucket reports the HTTP status as the error code, e.g. '404' or '403'
    error_code = e.response['Error']['Code']
    if error_code in ('404', 'NoSuchBucket'):
        bucket_exists = False
    else:
        print(f"Cannot access bucket '{bucket_name}': {e}")

if not bucket_exists:
    try:
        if region is None or region == 'us-east-1':
            # us-east-1 is the default and must not be passed as a LocationConstraint
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
        print(f"Bucket '{bucket_name}' created successfully.")
    except ClientError as e:
        print(f"Failed to create bucket: {e}")


# Upload files to S3

try:
    for file_path, file_key in zip(file_paths, file_keys):
        s3_client.upload_file(file_path, bucket_name, file_key)
        print(f"Uploaded {file_key} to S3 bucket {bucket_name}")
except NoCredentialsError:
    print("Credentials not available")
except Exception as e:
    print(f"An error occurred: {e}")


Bucket 'aws-data-projects-23721' created successfully.
Uploaded netflix/raw_data/netflix_india_shows_seasons.csv to S3 bucket aws-data-projects-23721
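To confirm that the upload landed where the Glue script expects it, you can list the keys under the `netflix/raw_data/` prefix. A minimal sketch, assuming `s3_client` from step 4; `list_keys` is a hypothetical helper, and it makes a single `list_objects_v2` call, which returns at most 1,000 keys (enough for this project).

```python
def list_keys(s3_client, bucket, prefix):
    """Hypothetical helper: return object keys under `prefix` (first 1,000 only)."""
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    # 'Contents' is absent from the response when no objects match the prefix
    return [obj["Key"] for obj in response.get("Contents", [])]
```

For example, `list_keys(s3_client, bucket_name, 'netflix/raw_data/')` should include `netflix/raw_data/netflix_india_shows_seasons.csv`.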


## 7. Create Glue PySpark script and upload to S3

In [None]:
###### GLUE SCRIPT AND JOB ############

# S3 bucket and script file details
script_file_name = 'netflix_data_transformation.py'
s3_script_location = f's3://{bucket_name}/netflix/scripts/{script_file_name}'

# AWS Glue job details
job_name = 'NetflixIndiaDataTransformation'

# PySpark script as a string
pyspark_script = f"""
import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

output_dir = "s3://{bucket_name}/netflix/transformed-data/"

# Define the S3 paths for your datasets
seasons_table_s3_path = "s3://{bucket_name}/netflix/raw_data/netflix_india_shows_seasons.csv"

# Practice datasets
# movies_table_s3_path = "s3://{bucket_name}/netflix/raw_data/netflix_india_shows_and_movies.csv"
# episodes_table_s3_path = "s3://{bucket_name}/netflix/raw_data/netflix_india_shows_episodes.csv"

# Read the CSV directly from S3; without a schema, every column is loaded as a string
seasons_df = spark.read.option("header", "true").option("sep", ",").csv(seasons_table_s3_path)

# Cast episode_count to an integer so the sum is computed on numbers, not strings
episodes_per_year = (
    seasons_df
    .groupBy("release_year")
    .agg(_sum(col("episode_count").cast("int")).alias("total_episodes"))
)

episodes_per_year.coalesce(1).write.mode("overwrite") \
    .option("header", "true") \
    .option("encoding", "UTF-8") \
    .option("escape", "'") \
    .option("quoteAll", "true") \
    .csv(output_dir)



print("Script execution completed.")

"""


# Function to save and upload the PySpark script to S3
def upload_script_to_s3(script_content, bucket, file_name):
    # Save the script to a file
    with open(file_name, 'w') as file:
        file.write(script_content)

    # Upload the file to S3
    try:
        s3_client.upload_file(file_name, bucket, f'netflix/scripts/{file_name}')
        print(f"Uploaded {file_name} to S3 bucket {bucket}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Clean up the local file
        os.remove(file_name)


upload_script_to_s3(pyspark_script, bucket_name, script_file_name)



Uploaded netflix_data_transformation.py to S3 bucket aws-data-projects-23721
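Starting a Glue job just to find a bug in the transformation is slow. The aggregation the script performs (summing `episode_count` per `release_year`) can be checked locally first. A pure-Python sketch using the standard `csv` module, assuming the CSV has `release_year` and `episode_count` header columns as the script does; `episodes_per_year` here is a hypothetical helper, not part of Glue or PySpark, and the sample rows are made up for illustration.

```python
import csv
import io
from collections import defaultdict


def episodes_per_year(csv_text):
    """Hypothetical helper: sum episode_count per release_year, like the groupBy/agg."""
    totals = defaultdict(int)
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Skip missing or non-numeric counts; Spark likewise turns unparseable
        # values into nulls, and sum() ignores nulls.
        try:
            count = int(row["episode_count"])
        except (TypeError, ValueError):
            continue
        totals[row["release_year"]] += count
    return dict(sorted(totals.items()))


# Made-up sample rows, for illustration only
sample = """title,release_year,episode_count
Show A,2019,8
Show A,2020,10
Show B,2020,6
"""
print(episodes_per_year(sample))  # {'2019': 8, '2020': 16}
```

Running the same helper on a few lines of the real CSV gives you expected totals to compare against the Glue output.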


## 8. Create AWS Glue Job


In [None]:
# Function to create an AWS Glue job
def create_glue_job(job_name, script_location, role):
    try:
        glue_client.create_job(
            Name=job_name,
            Role=role,  # IAM role name (or ARN) that Glue assumes when running the job
            ExecutionProperty={'MaxConcurrentRuns': 1},
            Command={
                'Name': 'glueetl',  # 'glueetl' runs the script as a Spark job
                'ScriptLocation': script_location,
                'PythonVersion': '3'
            },
            DefaultArguments={
                '--TempDir': f's3://{bucket_name}/temp/',
                '--job-bookmark-option': 'job-bookmark-enable',
            },
            MaxRetries=0,
            Timeout=60,  # minutes
            GlueVersion='3.0',
            WorkerType='G.1X',
            NumberOfWorkers=2
        )
        print(f"Glue job '{job_name}' created successfully.")
    except glue_client.exceptions.AlreadyExistsException:
        print(f"Glue job '{job_name}' already exists.")
    except Exception as e:
        print(f"An error occurred: {e}")



create_glue_job(job_name, s3_script_location, role_name)


Glue job 'NetflixIndiaDataTransformation' created successfully.
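`create_job` fails if a job with the same name already exists, so re-running the notebook after changing the job settings will not update the job. One way to handle this, sketched with hypothetical helpers `glue_job_definition` and `create_or_update_glue_job`, is to build the settings once and pass them either to `create_job` (together with `Name=`) or to `update_job` (as `JobUpdate=`), which accepts the same fields. The values mirror the cell above.

```python
def glue_job_definition(script_location, role, temp_dir):
    """Hypothetical helper: Glue job settings shared by create_job and update_job."""
    return {
        "Role": role,
        "ExecutionProperty": {"MaxConcurrentRuns": 1},
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        "DefaultArguments": {
            "--TempDir": temp_dir,
            "--job-bookmark-option": "job-bookmark-enable",
        },
        "MaxRetries": 0,
        "Timeout": 60,
        "GlueVersion": "3.0",
        "WorkerType": "G.1X",
        "NumberOfWorkers": 2,
    }


def create_or_update_glue_job(glue_client, job_name, definition):
    """Hypothetical helper: create the job, or update it if the name is taken."""
    try:
        glue_client.create_job(Name=job_name, **definition)
        return "created"
    except glue_client.exceptions.AlreadyExistsException:
        glue_client.update_job(JobName=job_name, JobUpdate=definition)
        return "updated"
```

`update_job` overwrites the previous job definition completely, which is why the helper always sends the full set of settings rather than only the changed ones.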


## 9. Run the Glue job

In [None]:
# Function to start an AWS Glue job by name
def start_glue_job(job_name):
    try:
        # Start the job
        response = glue_client.start_job_run(JobName=job_name)
        job_run_id = response['JobRunId']
        print(f"Glue job '{job_name}' started. Job run ID: {job_run_id}")
        return job_run_id
    except Exception as e:
        print(f"An error occurred starting the Glue job '{job_name}': {e}")
        return None


start_glue_job(job_name)

Glue job 'NetflixIndiaDataTransformation' started. Job run ID: jr_cff19611725a262767c270b22c30c05a56a62982ae6566ba18e311e1400e50a9


'jr_cff19611725a262767c270b22c30c05a56a62982ae6566ba18e311e1400e50a9'
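`start_job_run` returns immediately, while the job itself keeps running in the background for a few minutes. Before inspecting the output or running the cleanup in step 10, you may want to wait for the run to finish. A polling sketch using `get_job_run`, assuming `glue_client` from step 4 and the run ID returned by `start_glue_job`; `wait_for_glue_job` and its `poll_seconds`/`max_polls` parameters are hypothetical.

```python
import time

# Job run states after which the run will not change any more
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def wait_for_glue_job(glue_client, job_name, run_id, poll_seconds=30, max_polls=120):
    """Hypothetical helper: poll a Glue job run until it reaches a terminal state.

    Returns the final JobRunState string, or raises TimeoutError if the run
    is still active after max_polls checks.
    """
    for _ in range(max_polls):
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        if state in TERMINAL_STATES:
            if state != "SUCCEEDED":
                # ErrorMessage is typically present when a run does not succeed
                print(f"Run {run_id} ended in {state}: {run.get('ErrorMessage', '')}")
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"Glue job run {run_id} did not finish in time")
```

Usage: `wait_for_glue_job(glue_client, job_name, run_id)`, where `run_id` is the value returned by `start_glue_job(job_name)`.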

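## 10. Clean up S3 and Glue

Run this step only after the Glue job has finished: deleting the bucket while the job is still running can make the job fail.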

In [None]:
def empty_s3_bucket(bucket_name):
    """
    Delete all objects in an S3 bucket, then delete the bucket itself.
    :param bucket_name: str. Name of the S3 bucket to empty and delete.
    """
    bucket = session.resource('s3').Bucket(bucket_name)
    try:
        # A bucket must be empty before it can be deleted
        bucket.objects.all().delete()
        print(f"All objects in bucket '{bucket_name}' have been deleted.")
        s3_client.delete_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' deleted successfully.")
    except ClientError as e:
        print(f"An error occurred: {e}")

def delete_glue_job(job_name):
    """
    Delete an AWS Glue job.
    :param job_name: str. Name of the Glue job to be deleted.
    """
    try:
        # Reuse the Glue client created in step 4
        glue_client.delete_job(JobName=job_name)
        print(f"Glue job '{job_name}' has been deleted.")
    except ClientError as e:
        print(f"An error occurred: {e}")


# Call functions to empty bucket and delete Glue job
empty_s3_bucket(bucket_name)
delete_glue_job(job_name)
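The cleanup above removes the bucket and the Glue job but leaves the `glue-service-role` IAM role in place. If you also want to remove it, IAM requires detaching managed policies before `delete_role` succeeds. A sketch, assuming `iam_client` from step 4 and a role with only managed policies attached (as created in step 5); `delete_iam_role` is a hypothetical helper.

```python
def delete_iam_role(iam_client, role_name):
    """Hypothetical helper: detach all managed policies, then delete the role.

    Assumes the role has no inline policies or instance profiles (true for
    the role created in step 5); otherwise delete_role will fail.
    """
    response = iam_client.list_attached_role_policies(RoleName=role_name)
    for policy in response["AttachedPolicies"]:
        iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy["PolicyArn"])
        print(f"Detached {policy['PolicyArn']} from {role_name}.")
    iam_client.delete_role(RoleName=role_name)
    print(f"Role '{role_name}' deleted.")
```

Usage: `delete_iam_role(iam_client, role_name)`.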