## Glue Job Preprocessing with Python

With a Python shell AWS Glue job, you can run scripts that are compatible with Python 2.7 or Python 3.6. This is a good fit for processing small or medium datasets that do not need a Spark cluster. In this notebook we run a Python script in a Glue job using the AWS Data Wrangler (`awswrangler`) library.

<a id='contents' />

## Table of contents

1. [Loading libraries](#loading)
2. [Creating S3 bucket](#s3bucket)
3. [Packing needed libraries](#libraries)
4. [Creating our AWS Glue ETL Python script](#etl)
5. [Create an IAM role for executing the AWS Glue job](#iam)
6. [Creating our Glue job](#job)
7. [Running our Glue job](#run)
8. [Review the transformed data](#review)

<a id='loading' />

## 1. Loading libraries:
[(back to top)](#contents)

Install the packages listed in the `requirements.txt` file:

In [3]:
!pip install -r ../requirements.txt

Import needed libraries:

In [1]:
import numpy as np
import pandas as pd
import boto3
from datetime import datetime, timedelta
import gc
import time
import json

glue = boto3.client('glue')
s3 = boto3.resource('s3')
lakeformation = boto3.client('lakeformation')
iam = boto3.client('iam')
pd.set_option('display.max_columns', 500)

In [9]:
df=pd.read_parquet('../data/type2.parquet')

<a id='s3bucket' />

## 2.  Creating an S3 bucket for storing data and AWS Glue required artifacts
[(back to top)](#contents)

First, we create a bucket in S3 to store our scripts, libraries, data, and the results of our job.

In [2]:
data_bucket_name='preprocessing-example' #Fill your bucket name

In [3]:
!aws s3 mb s3://{data_bucket_name}

make_bucket: preprocessing-example


Save the data files in the S3 bucket you just created:

- Events file: This file should be saved in the following path, for the Glue jobs to run correctly.

In [10]:
!aws s3 cp ../data/events.parquet s3://{data_bucket_name}/data/raw/events/dt=2020-06-30/events.parquet

upload: ./events.parquet to s3://preprocessing-example/data/raw/events/dt=2020-06-30/events.parquet


- Users file: This file should be saved in the following path, for the Glue jobs to run correctly.

In [11]:
!aws s3 cp ../data/users.parquet s3://{data_bucket_name}/data/raw/users/dt=2020-06-30/users.parquet

upload: ./users.parquet to s3://preprocessing-example/data/raw/users/dt=2020-06-30/users.parquet


- Transactions files: Transactions files should be saved in the following paths, for the Glue jobs to run correctly.

In [12]:
#Type 1 transactions
!aws s3 cp ../data/type1.parquet s3://{data_bucket_name}/data/raw/transactions/type_1/dt=2020-06-30/type1.parquet
#Type 2 transactions
!aws s3 cp ../data/type2.parquet s3://{data_bucket_name}/data/raw/transactions/type_2/dt=2020-06-30/type2.parquet

upload: ./type1.parquet to s3://preprocessing-example/data/raw/transactions/type_1/dt=2020-06-30/type1.parquet
upload: ./type2.parquet to s3://preprocessing-example/data/raw/transactions/type_2/dt=2020-06-30/type2.parquet


- Labels file: This file should be saved in the following path, for the Glue jobs to run correctly.

In [13]:
!aws s3 cp ../data/labels.csv s3://{data_bucket_name}/labels/dt=2020-06-30/labels.csv

upload: ./labels.csv to s3://preprocessing-example/labels/dt=2020-06-30/labels.csv
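
The raw data uploads above all follow the same Hive-style layout, `data/raw/<dataset>/dt=<date>/<file>`, and the Glue script later reads each dataset by its `dt=` prefix. As a minimal sketch, the helper below builds those URIs so the layout is written down in one place. The function name `raw_data_uri` is hypothetical and not part of the original notebook; the labels file lives under a different prefix (`labels/`) and is deliberately not covered.

```python
def raw_data_uri(bucket: str, dataset: str, dt: str, filename: str) -> str:
    """Build the S3 URI used for raw data in this notebook.

    Hypothetical helper: it only mirrors the path convention of the
    `aws s3 cp` commands above and does not touch S3.
    """
    return f"s3://{bucket}/data/raw/{dataset}/dt={dt}/{filename}"


bucket = "preprocessing-example"  # same example bucket name as above
dt = "2020-06-30"                 # partition date used for every upload

for dataset, filename in [
    ("events", "events.parquet"),
    ("users", "users.parquet"),
    ("transactions/type_1", "type1.parquet"),
    ("transactions/type_2", "type2.parquet"),
]:
    print(raw_data_uri(bucket, dataset, dt, filename))
```

Keeping the partition date in the path, rather than in the file name, is what lets the job select one day of data with a single prefix.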


<a id='libraries' />

## 3.  Packing needed libraries
[(back to top)](#contents)

The environment for running a Python shell job supports the following libraries:

```
Boto3
collections
CSV
gzip
multiprocessing
NumPy
pandas
pickle
PyGreSQL
re
SciPy
sklearn
sklearn.feature_extraction
sklearn.preprocessing
xml.etree.ElementTree
zipfile
```

In addition to these libraries, this job uses the `awswrangler` library. To make it available to the job, we package it as a dependency of a Python wheel, starting from the following `setup.py`:

In [14]:
%%writefile setup.py
from setuptools import setup

setup(
    name="glue_monthly_stage_dependencies", #Name for your Python Wheels package
    version="0.1",
    install_requires=[
        "awswrangler==2.4.0" #Required library and version
    ]
) 

Writing setup.py


Now run the following command, which creates a `dist` folder containing the file `glue_monthly_stage_dependencies-0.1-py3-none-any.whl`. This is the `whl` file that makes the `awswrangler` library available in our Glue job.

In [15]:
!python3 setup.py bdist_wheel

running bdist_wheel
running build
installing to build/bdist.linux-x86_64/wheel
running install
running install_egg_info
running egg_info
creating glue_monthly_stage_dependencies.egg-info
writing glue_monthly_stage_dependencies.egg-info/PKG-INFO
writing dependency_links to glue_monthly_stage_dependencies.egg-info/dependency_links.txt
writing requirements to glue_monthly_stage_dependencies.egg-info/requires.txt
writing top-level names to glue_monthly_stage_dependencies.egg-info/top_level.txt
writing manifest file 'glue_monthly_stage_dependencies.egg-info/SOURCES.txt'
reading manifest file 'glue_monthly_stage_dependencies.egg-info/SOURCES.txt'
writing manifest file 'glue_monthly_stage_dependencies.egg-info/SOURCES.txt'
Copying glue_monthly_stage_dependencies.egg-info to build/bdist.linux-x86_64/wheel/glue_monthly_stage_dependencies-0.1-py3.6.egg-info
running install_scripts
creating build/bdist.linux-x86_64/wheel/glue_monthly_stage_dependencies-0.1.dist-info/WHEEL
creating 'dist/glue_mont
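
The wheel file name encodes what the package is compatible with. As a rough sketch, the snippet below splits the name used in this notebook into its parts, following the standard wheel naming scheme (`name-version-python_tag-abi_tag-platform_tag.whl`). The function `parse_wheel_filename` is a hypothetical helper for illustration; it ignores the optional build tag that some wheels carry.

```python
def parse_wheel_filename(filename: str) -> dict:
    """Split a wheel file name into its components.

    Hypothetical helper; assumes a name without the optional build tag.
    """
    if not filename.endswith(".whl"):
        raise ValueError(f"not a wheel file: {filename}")
    parts = filename[: -len(".whl")].split("-")
    if len(parts) != 5:
        raise ValueError(f"unexpected wheel name layout: {filename}")
    name, version, python_tag, abi_tag, platform_tag = parts
    return {
        "name": name,
        "version": version,
        "python_tag": python_tag,      # 'py3': any Python 3 interpreter
        "abi_tag": abi_tag,            # 'none': no compiled extension ABI
        "platform_tag": platform_tag,  # 'any': not tied to an OS or CPU
    }


info = parse_wheel_filename("glue_monthly_stage_dependencies-0.1-py3-none-any.whl")
print(info)
```

The `py3-none-any` suffix means the wheel itself is pure Python; `awswrangler` is only declared as a requirement in it and is resolved when the wheel is installed.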

<a id='etl' />

## 4.  Creating our AWS Glue ETL Python script
[(back to top)](#contents)

We now create the ETL script for the data preprocessing. We will use the following parameters:

- `today`: Date for executing the job in format: `yyyy-mm-dd`. The value used for this example is `2020-07-01`
- `data_bucket_name`: Name of the bucket you created for storing data and scripts.
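
The script turns `today` into the partition it reads: the last day of the previous month. The sketch below isolates that date arithmetic so it can be checked on its own. The function name `partition_date_for` is hypothetical; the logic mirrors the `first_day_current_month` and `last_day_previous_month` variables in the script that follows.

```python
from datetime import date, datetime, timedelta


def partition_date_for(today_str: str) -> date:
    """Return the last day of the month before `today_str` (yyyy-mm-dd)."""
    today = datetime.strptime(today_str, "%Y-%m-%d").date()
    first_day_current_month = today.replace(day=1)
    # Stepping back one day from the 1st always lands on the previous month's last day
    return first_day_current_month - timedelta(days=1)


print(partition_date_for("2020-07-01"))  # 2020-06-30
```

This is why a run with `today = 2020-07-01` reads the `dt=2020-06-30` folders, and any other day in July 2020 would read the same partition.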

In [8]:
%%writefile ../src/glue_pyhton_script.py

print('Loading the required Python libraries')
import pandas as pd
import awswrangler as wr
import ast
import numpy as np
from datetime import datetime, timedelta
from awsglue.utils import getResolvedOptions
import gc
import boto3
import sys

print('Reading the AWS Glue job parameters')
args = getResolvedOptions(sys.argv,['today', 'data_bucket_name'])
today = datetime.strptime(args['today'], '%Y-%m-%d').date()
data_bucket_name = args['data_bucket_name']
first_day_current_month=today.replace(day=1)
last_day_previous_month=(first_day_current_month - timedelta(days=1))

print('Reading the two transactions types data')
transactions_type1=wr.s3.read_parquet(path=f's3://{data_bucket_name}/data/raw/transactions/type_1/dt={last_day_previous_month}')
transactions_type2=wr.s3.read_parquet(path=f's3://{data_bucket_name}/data/raw/transactions/type_2/dt={last_day_previous_month}')

print('Merging the two transactions types data')
accounts_=pd.Series(list(transactions_type1.id.unique())+list(transactions_type2.id.unique())).unique().astype('Int64')
df_transactions=( 
    pd.DataFrame({'id':list(accounts_)})
    .merge(transactions_type1,how='left',on='id')
    .merge(transactions_type2,how='left',on='id')
)

print('Delete temporary datasets no longer required')
del transactions_type1, transactions_type2, accounts_
gc.collect()
df_transactions.dropna(subset=['id'], inplace=True)
df_transactions['id']=pd.to_numeric(df_transactions['id'], errors='coerce').astype(np.int64)
df_transactions.fillna(0, inplace=True)

print('Read events data')
df_events=wr.s3.read_parquet(path=f's3://{data_bucket_name}/data/raw/events/dt={last_day_previous_month}')

print('Merging transactions and events data')
df_final=pd.merge(df_transactions, df_events, how='left', left_on='id', right_on='id')
del df_events
gc.collect()
#Fill missing values
df_final['fl_os_android']=df_final['fl_os_android'].fillna(1)
df_final['fl_os_ios']=df_final['fl_os_ios'].fillna(0)
df_final.fillna(0, inplace=True)
df_final['dt']=str(last_day_previous_month)

print('Save the final dataset in S3')
wr.s3.to_parquet(df_final,
                 path=f's3://{data_bucket_name}/data/monthly_stage/',
                 dataset=True,
                 partition_cols=['dt'],
                 mode="overwrite_partitions",
                 concurrent_partitioning=True,
                 index=False
                )  

Writing ../src/glue_pyhton_script.py
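
The transactions step in the script builds the union of all ids and then left-joins each transaction table onto it, so ids that appear in only one table still get a row. Below is a toy sketch of that pattern with made-up data. The column names `amount_type1` and `amount_type2` are assumptions for illustration only; the real files may have different columns.

```python
import pandas as pd

# Toy stand-ins for the two transaction tables (hypothetical columns)
type1 = pd.DataFrame({"id": [1, 2], "amount_type1": [10.0, 20.0]})
type2 = pd.DataFrame({"id": [2, 3], "amount_type2": [5.0, 7.0]})

# Union of ids seen in either table, in order of first appearance
ids = pd.Series(list(type1["id"].unique()) + list(type2["id"].unique())).unique()

merged = (
    pd.DataFrame({"id": ids})
    .merge(type1, how="left", on="id")  # ids missing from type1 get NaN
    .merge(type2, how="left", on="id")  # ids missing from type2 get NaN
    .fillna(0)                          # treat "no transactions" as 0
)
print(merged)
```

Here id 1 has no type 2 transactions and id 3 has no type 1 transactions, so both end up with a 0 in the corresponding column instead of being dropped.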


#### Save the AWS Glue script and libraries file on S3: 

Now we will upload the Python script `glue_pyhton_script.py` and the Python dependencies `whl` file `glue_monthly_stage_dependencies-0.1-py3-none-any.whl` to the S3 bucket we created before.

In [18]:
# Saving the glue_pyhton_script.py file
s3.meta.client.upload_file('../src/glue_pyhton_script.py', #Name of the Python script
                            data_bucket_name, #Bucket name for saving artifacts
                           'artifacts/code/monthly_stage/glue_pyhton_script.py' #Include the key and filename
                          )
# Saving the whl dependencies file
s3.meta.client.upload_file('dist/glue_monthly_stage_dependencies-0.1-py3-none-any.whl', #Whl dependencies file
                           data_bucket_name, #Bucket name for saving artifacts
                           'artifacts/code/monthly_stage/glue_monthly_stage_dependencies-0.1-py3-none-any.whl' #Include the key and filename
                          ) 

<a id='iam' />

## 5.  Create an IAM role for executing the AWS Glue job
[(back to top)](#contents)

Now we need to create an IAM role with the `AWSGlueServiceRole` and `AmazonS3FullAccess` managed policies for the data preprocessing Glue job. To do this, run the following cells with an IAM identity that has permissions to create roles and attach policies, as in the policy shown below:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:CreateRole",
                "iam:AttachRolePolicy"
            ],
            "Resource": "*"
        }
    ]
}
```

In [1]:
role_name = 'GluePreprocessingRole' #Specify the role name to create

In [27]:
#Function for creating an AWS Glue role with AWSGlueServiceRole and AmazonS3FullAccess policies
def create_glue_role(role_name):
    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "glue.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
            }
        ]
    }

    try:
        print('Creating role: {}...'.format(role_name))
        create_role_response = iam.create_role(
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
        )
    except iam.exceptions.EntityAlreadyExistsException:
        # Only swallow the "already exists" case; other errors should surface
        print('Role already exists. Reusing it.')

    print('Attaching Glue Service Role policy')
    glue_policy_arn = 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
    iam.attach_role_policy(
        RoleName  = role_name,
        PolicyArn = glue_policy_arn
    )
    print('Attaching S3 full access policy...')
    s3_policy_arn = 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
    iam.attach_role_policy(
        RoleName  = role_name,
        PolicyArn = s3_policy_arn
    )

    print('Waiting for policy attachment to propagate...')
    time.sleep(60) # Wait for a minute to allow IAM role policy attachment to propagate
    return True

In [28]:
create_glue_role(role_name)

Creating role: GluePreprocessingRole...
Attaching Glue Service Role policy
Attaching S3 full access policy...
Waiting for policy attachment to propagate...


True
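
Because the function above prints progress but does not confirm the end state, it can help to check which managed policies are actually attached. The sketch below wraps the IAM `list_attached_role_policies` call. The helper name `missing_policies` is hypothetical, and it reads only the first page of results, which is an assumption that holds for a role with a handful of policies. The stub client exists only so the example runs without AWS credentials.

```python
REQUIRED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
]


def missing_policies(iam_client, role_name, required_arns):
    """Return the required policy ARNs that are not attached to the role.

    Hypothetical helper; reads a single page of results (no pagination).
    """
    response = iam_client.list_attached_role_policies(RoleName=role_name)
    attached = {p["PolicyArn"] for p in response["AttachedPolicies"]}
    return sorted(set(required_arns) - attached)


class StubIamClient:
    """Stand-in for boto3.client('iam'), used only for this illustration."""

    def list_attached_role_policies(self, RoleName):
        return {
            "AttachedPolicies": [
                {
                    "PolicyName": "AWSGlueServiceRole",
                    "PolicyArn": "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
                }
            ]
        }


print(missing_policies(StubIamClient(), "GluePreprocessingRole", REQUIRED_POLICY_ARNS))
```

In the notebook you would pass the real `iam` client created earlier instead of the stub; an empty list means both policies are attached.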

<a id='job' />

## 6.  Creating our Glue Job
[(back to top)](#contents)

We are all set to create our Glue job and start a run. For this, we must provide the following properties:

- **IAM role:** Specify the IAM role created before.

- **Type:** Choose Python shell to run a Python script with the job command named pythonshell.

- **Python version:** Choose the Python version. The default is Python 3.

- **Custom script:** You must provide the script location in Amazon S3. 

- **Maximum capacity:** The maximum number of AWS Glue DPUs that can be allocated when the job runs. A DPU is a relative measure of processing power that consists of 4 vCPUs of compute capacity and 16 GB of memory. For Python Shell jobs, you can set the value to 0.0625 or 1. The default is 0.0625.

- **Extra-py-files (Python library path):** The Amazon S3 location of one or more Python libraries packaged as an `.egg` or a `.whl` file. In this example we created a `.whl` package.
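
To make the capacity setting concrete, the sketch below converts the two values allowed for Python shell jobs into vCPUs and memory, taking the DPU definition above (4 vCPUs and 16 GB per DPU) at face value. The function name `python_shell_resources` is hypothetical, and the figures are simple proportional arithmetic rather than numbers queried from AWS.

```python
VCPUS_PER_DPU = 4        # from the DPU definition above
MEMORY_GB_PER_DPU = 16   # from the DPU definition above
ALLOWED_PYTHON_SHELL_CAPACITY = (0.0625, 1)


def python_shell_resources(max_capacity):
    """Translate a Python shell MaxCapacity value into vCPUs and memory (GB)."""
    if max_capacity not in ALLOWED_PYTHON_SHELL_CAPACITY:
        raise ValueError(
            f"MaxCapacity must be one of {ALLOWED_PYTHON_SHELL_CAPACITY}, got {max_capacity}"
        )
    return {
        "vcpus": max_capacity * VCPUS_PER_DPU,
        "memory_gb": max_capacity * MEMORY_GB_PER_DPU,
    }


for capacity in ALLOWED_PYTHON_SHELL_CAPACITY:
    print(capacity, python_shell_resources(capacity))
```

Under this reading, the default of 0.0625 DPU corresponds to about 1 GB of memory, so a pandas job that loads whole files into memory will often need `MaxCapacity=1`.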


In [29]:
job_name='job_aiml_monthly_stage_preprocess' #Replace with the name for your Glue job

Create the Glue job:

In [30]:
job = glue.create_job(Name=job_name, 
                      Role=role_name,
                      Command={'Name': 'pythonshell', #We specify that we will create a Python Shell job
                               "PythonVersion" : "3",
                               'ScriptLocation': f's3://{data_bucket_name}/artifacts/code/monthly_stage/glue_pyhton_script.py'}, #S3 location of the python script
                      DefaultArguments={
                        '--extra-py-files': f's3://{data_bucket_name}/artifacts/code/monthly_stage/glue_monthly_stage_dependencies-0.1-py3-none-any.whl'}, #S3 location for dependencies file
                      MaxCapacity=1 #This is the maximum capacity for Python Shell jobs
                      )

<a id='run' />

## 7.  Running our Glue Job
[(back to top)](#contents)

In [31]:
#Arguments needed for Glue job run:
today = '2020-07-01'

In [32]:
job_run = glue.start_job_run(
    JobName = job_name,
    Arguments = {
        '--today':today,
        '--data_bucket_name': data_bucket_name,
    } 
)

Wait for the job to run:

In [33]:
MAX_WAIT_TIME = 60*10 # Maximum wait time in seconds (10 minutes); the deadline is computed in the next cell

In [34]:
max_time = time.time() + MAX_WAIT_TIME
while time.time() < max_time:
    response=glue.get_job_run(JobName=job_name, RunId=job_run['JobRunId'])
    status = response['JobRun']['JobRunState']
    print('Job run: {}'.format(status))
    
    if status in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR'): # Stop polling on any terminal state
        break
        
    time.sleep(60)

Job run: RUNNING
Job run: RUNNING
Job run: SUCCEEDED
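
The polling loop can also be written as a small reusable function with an explicit timeout. The sketch below is one way to do it, not the notebook's original code: `wait_for_job_run` and its parameters are hypothetical names, and the set of terminal states is an assumption based on the Glue `JobRunState` values, so check it against the Glue API reference. The state lookup is passed in as a callable so the function can be exercised without AWS.

```python
import time

# Assumed terminal values of Glue's JobRunState
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def wait_for_job_run(get_state, timeout_seconds=600, poll_seconds=60,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll `get_state()` until it returns a terminal state or the timeout passes."""
    deadline = clock() + timeout_seconds
    while True:
        state = get_state()
        print(f"Job run: {state}")
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"job still {state} after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Demo with canned states instead of a real Glue call
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
final_state = wait_for_job_run(lambda: next(states), sleep=lambda seconds: None)

# With the real client from this notebook, the callable would be:
# lambda: glue.get_job_run(JobName=job_name, RunId=job_run['JobRunId'])['JobRun']['JobRunState']
```

Raising on timeout, instead of silently leaving the loop, makes it clear to the caller that the job may still be running.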


<a id='review' />

## 8.  Review the transformed data
[(back to top)](#contents)

Load the partition written by the job and inspect its first rows and schema:

In [35]:
import awswrangler as wr # Not imported earlier in this notebook; assumes it is installed locally

today = datetime.strptime(today, '%Y-%m-%d').date()
first_day_current_month=today.replace(day=1)
last_day_previous_month=(first_day_current_month - timedelta(days=1))
df=wr.s3.read_parquet(path=f's3://{data_bucket_name}/data/monthly_stage/dt={str(last_day_previous_month)}')

In [11]:
df.head()

In [12]:
df.info()