<a href="https://colab.research.google.com/github/victorywwong/aws-big-data-machine-learning/blob/master/AWS_Datalake_for_ML_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

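AWS Data Lake for ML Applications

This notebook provisions a SQL Server RDS database for labelled training data, catalogs it with an AWS Glue crawler, and runs an AWS Glue ETL job that balances the labels and writes train/validation/test splits to S3.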

In [0]:
import boto3
import botocore
import json
import time
import os
import getpass

import project_path # path to helper methods
from lib import workshop
from pandas import read_sql

# --- AWS service clients used throughout the notebook ---
glue = boto3.client('glue')
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
cfn = boto3.client('cloudformation')
redshift_client = boto3.client('redshift')
ec2_client = boto3.client('ec2')
rds = boto3.client('rds')

# --- Region and account of the active credentials ---
session = boto3.session.Session()
region = session.region_name
sts = boto3.client('sts')
account_id = sts.get_caller_identity().get('Account')

# --- Workshop configuration ---
workshop_user = 'db'
database_name = 'training'  # AWS Glue Data Catalog database name (also used as the JDBC databaseName)
environment_name = 'workshop' + workshop_user  # RDS instance name / identifier

use_existing = True  # True: reuse the default VPC; False: create a VPC via the workshop helper

Create S3 Bucket

In [0]:
# Create the workshop S3 bucket through the project helper and show its name.
bucket = workshop.create_bucket(
    region,
    session,
    'db-',  # presumably the bucket-name prefix — TODO confirm in lib.workshop
)
print(bucket)

Create VPC

In [0]:
if not use_existing:
    # Build a fresh VPC with two subnets using the workshop helper.
    vpc, subnet1, subnet2 = workshop.create_and_configure_vpc()
    vpc_id = vpc.id
    subnet1_id, subnet2_id = subnet1.id, subnet2.id
else:
    # Reuse the account's default VPC and its first two subnets.
    default_vpcs = ec2_client.describe_vpcs(
        Filters=[{'Name': 'isDefault', 'Values': ['true']}]
    )['Vpcs']
    vpc_id = default_vpcs[0]['VpcId']

    vpc_subnets = ec2_client.describe_subnets(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )['Subnets']
    subnet1_id = vpc_subnets[0]['SubnetId']
    subnet2_id = vpc_subnets[1]['SubnetId']

for resource_id in (vpc_id, subnet1_id, subnet2_id):
    print(resource_id)

Upload CloudFormation Template to S3

In [0]:
# Upload the local RDS CloudFormation template to the workshop bucket under the
# same relative key (cfn/<rds_file> on POSIX), so CloudFormation can fetch it by URL.
rds_file = 'sqlserver-rds.yaml'
file_path = os.path.join('cfn', rds_file)
template_object = session.resource('s3').Object(bucket, file_path)
template_object.upload_file(file_path)

Create the SQL Server RDS Stack with CloudFormation

In [0]:
import re

# Prompt for the RDS master credentials without echoing them. The explicit prompts
# matter: getpass() with no argument prints "Password:" even for the username.
admin_user = getpass.getpass('RDS admin username: ')
admin_password = getpass.getpass('RDS admin password: ')

# Fail fast on a weak password instead of creating the stack anyway and only
# learning about it from a CloudFormation failure later.
# Rule: at least 8 alphanumeric characters, with at least one lowercase letter,
# one uppercase letter and one digit.
# NOTE(review): presumably mirrors the template's DatabasePassword constraint — TODO confirm.
pattern = re.compile(r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[a-zA-Z\d]{8,}$")
if not pattern.match(admin_password):
    raise ValueError('Invalid password: it must be at least 8 alphanumeric characters '
                     'with at least 1 uppercase letter, 1 lowercase letter and 1 digit')
print('Valid')

# The template was uploaded to s3://<bucket>/cfn/<rds_file> in the previous step.
cfn_template = 'https://s3.amazonaws.com/{0}/cfn/{1}'.format(bucket, rds_file)
print(cfn_template)

rds_stack_name = 'SQLServerRDSStack'
response = cfn.create_stack(
    StackName=rds_stack_name,
    TemplateURL=cfn_template,
    Capabilities=["CAPABILITY_NAMED_IAM"],
    Parameters=[
        {
            'ParameterKey': 'SqlServerInstanceName',
            'ParameterValue': environment_name
        },
        {
            'ParameterKey': 'DatabaseUsername',
            'ParameterValue': admin_user
        },
        {
            'ParameterKey': 'DatabasePassword',
            'ParameterValue': admin_password
        }
    ]
)

print(response)

RDS Status Check

In [0]:
# Poll until CloudFormation leaves every *_IN_PROGRESS state (including a rollback),
# then fail loudly unless the stack reached CREATE_COMPLETE. Without this check,
# a CREATE_FAILED / ROLLBACK_COMPLETE stack would make the loop spin forever.
response = cfn.describe_stacks(StackName=rds_stack_name)
stack = response['Stacks'][0]
while stack['StackStatus'].endswith('_IN_PROGRESS'):
    print('Not yet complete.')
    time.sleep(30)
    response = cfn.describe_stacks(StackName=rds_stack_name)
    stack = response['Stacks'][0]

if stack['StackStatus'] != 'CREATE_COMPLETE':
    raise RuntimeError('Stack {0} ended in {1}: {2}'.format(
        rds_stack_name, stack['StackStatus'], stack.get('StackStatusReason', '')))

# Pick the endpoint and security group out of the stack outputs.
for output in stack['Outputs']:
    if output['OutputKey'] == 'SQLDatabaseEndpoint':
        rds_endpoint = output['OutputValue']
        print('RDS Endpoint: {0}'.format(rds_endpoint))
    if output['OutputKey'] == 'SQLServerSecurityGroup':
        rds_sg_id = output['OutputValue']
        print('RDS Security Group: {0}'.format(rds_sg_id))

Create the Database for Labelled-Data Collection

In [0]:
!pip install pymssql

In [0]:
import pymssql

# Connect to the SQL Server instance; no database is selected yet.
conn = pymssql.connect(rds_endpoint, admin_user, admin_password)
# SQL Server rejects CREATE DATABASE inside a transaction, so run in autocommit mode
# (which also makes an explicit commit() unnecessary here).
conn.autocommit(True)
cr = conn.cursor()
# Reuse database_name so the SQL database, the Glue JDBC URL and the crawler
# path cannot drift apart.
cr.execute('create database {0};'.format(database_name))
cr.close()

def run_sql_file(filename, connection):
    '''
    Execute the contents of a SQL script file on a DB-API connection and commit.

    The whole file is sent verbatim as a single batch through ``cursor.execute``.

    :param filename: path of the SQL script to read.
    :param connection: open DB-API connection (e.g. a pymssql connection).
    '''
    # NOTE(review): the script is sent as one batch; client-side batch separators
    # such as ``GO`` are not understood by the driver — confirm the script avoids them.
    # Read the file verbatim (and close it): joining readlines() with " " would
    # inject a space after every newline, altering multi-line string literals.
    with open(filename, 'r') as sql_file:
        sql = sql_file.read()
    cursor = connection.cursor()
    try:
        cursor.execute(sql)
        connection.commit()
    finally:
        cursor.close()
  
# Apply the data-model DDL script over the server connection.
# NOTE(review): `conn` was opened without a database, so the script presumably
# selects the target database itself (e.g. USE training) — TODO confirm.
run_sql_file(filename='db-scripts/data-model.sql', connection=conn)

Collect Data

Get execution role

In [0]:
# SageMaker Python SDK: a session plus the IAM role this notebook runs as.
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()
role = get_execution_role()
# The role is an ARN; its last '/'-separated segment is the bare role name.
role_name = role.rsplit('/', 1)[-1]
print(role_name)

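Set Up the IAM Role

The SageMaker execution role is also passed as the AWS Glue role for the crawler and the ETL job. Its trust policy must therefore allow both services to assume it: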


```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "sagemaker.amazonaws.com",
          "glue.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```



Get RDS Instance

In [0]:
response = rds.describe_db_instances(DBInstanceIdentifier=environment_name)
db_instance = response['DBInstances'][0]

# Security group and availability zone of the instance, needed by the Glue connection.
rds_sg_id = db_instance['VpcSecurityGroups'][0]['VpcSecurityGroupId']
print(rds_sg_id)
rds_az = db_instance['AvailabilityZone']
print(rds_az)

# Pick the DB subnet that lives in the instance's AZ (if several match, the last wins).
for subnet in db_instance['DBSubnetGroup']['Subnets']:
    if subnet['SubnetAvailabilityZone']['Name'] != rds_az:
        continue
    rds_subnet = subnet['SubnetIdentifier']
    print(rds_subnet)

# Register the Glue Data Catalog database that the crawler writes its tables into.
workshop.create_db(glue, account_id, database_name, 'Training data in MS SQL Server database')

Create Glue SQL Connection

In [0]:
db_connection_name = 'MSSQLTrainingConnection{0}'.format(workshop_user)

# JDBC connection from AWS Glue to the SQL Server training database. Glue runs
# inside the RDS instance's subnet/AZ with its security group so it can reach the DB.
# The optional MatchCriteria field is omitted: the previous value ['string'] was
# the API-reference placeholder, not a real criterion.
response = glue.create_connection(
    CatalogId=account_id,
    ConnectionInput={
        'Name': db_connection_name,
        'Description': 'MS SQL Server Training Connection',
        'ConnectionType': 'JDBC',
        'ConnectionProperties': {
            'JDBC_CONNECTION_URL': 'jdbc:sqlserver://{0};databaseName={1}'.format(rds_endpoint, database_name),
            'JDBC_ENFORCE_SSL': 'false',
            'PASSWORD': admin_password,
            'USERNAME': admin_user
        },
        'PhysicalConnectionRequirements': {
            'SubnetId': rds_subnet,
            'SecurityGroupIdList': [
                rds_sg_id,
            ],
            'AvailabilityZone': rds_az
        }
    }
)

print(response)

Use Glue Crawler to discover data

In [0]:
crawler_name = 'MSSQL-Training-Crawler'

# JDBC include paths are database/schema/table. '%' matches every table in the
# dbo schema, which yields catalog tables named r_<db>_dbo_<table> (with the R_
# prefix) — the naming the ETL job expects (r_training_dbo_labelled).
# A bare database name carries no schema/table segment to match.
jdbc_include_path = '{0}/dbo/%'.format(database_name)

response = glue.create_crawler(
    Name=crawler_name,
    Role=role,
    DatabaseName=database_name,
    Description='Crawler for SQL Server Training Database',
    Targets={
        'JdbcTargets': [
            {
                'ConnectionName': db_connection_name,
                'Path': jdbc_include_path
            },
        ]
    },
    TablePrefix='R_',
    SchemaChangePolicy={
        'UpdateBehavior': 'UPDATE_IN_DATABASE',
        'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
    }
)

Start Glue Crawler

In [0]:
# Kick off the crawl; it runs asynchronously (polled in the next cell).
response = glue.start_crawler(Name=crawler_name)

crawler_console_url = (
    'https://{0}.console.aws.amazon.com/glue/home'
    '?region={0}#crawler:name={1}'
).format(region, crawler_name)
print('Crawler: ' + crawler_console_url)

Get Glue Crawler Status

In [0]:
# Poll until the crawler is idle again. Use an exact comparison: `in ('READY')`
# is a substring test on a plain string, not tuple membership.
crawler = glue.get_crawler(Name=crawler_name)['Crawler']
crawler_status = crawler['State']
while crawler_status != 'READY':
    print(crawler_status)
    time.sleep(30)
    crawler = glue.get_crawler(Name=crawler_name)['Crawler']
    crawler_status = crawler['State']

# READY only means "idle"; also check that the crawl that just ran succeeded.
last_crawl = crawler.get('LastCrawl', {})
if last_crawl.get('Status') not in (None, 'SUCCEEDED'):
    raise RuntimeError('Crawler {0} finished with status {1}: {2}'.format(
        crawler_name, last_crawl.get('Status'), last_crawl.get('ErrorMessage', '')))

print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))

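AWS Glue ETL & Balance Dataset

The next cell is the AWS Glue job script, not notebook code. It reads its arguments with `getResolvedOptions(sys.argv, ...)` and needs the `awsglue` libraries. Save it as `training_etl.py`; the following step uploads that file to S3 for the Glue job.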

In [0]:
# AWS Glue job script (PySpark): balances the labelled data by class, backs it up as
# parquet, and writes train/validation/test CSV splits to S3. It reads its arguments
# from sys.argv via getResolvedOptions and needs the awsglue libraries, so it runs
# inside an AWS Glue job (uploaded as training_etl.py), not in the notebook kernel.
import sys
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_OUTPUT_BUCKET', 'S3_OUTPUT_KEY_PREFIX', 'DATABASE_NAME', 'REGION'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

SOURCE_TABLE = 'r_training_dbo_labelled'
# Column holding the class label: drives balancing and parquet partitioning.
# NOTE(review): confirm this column name against db-scripts/data-model.sql;
# 'label_category' was also used for the same purpose in this script.
LABEL_COLUMN = 'label'
SEED = 42

datasource0 = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name=SOURCE_TABLE, transformation_ctx="datasource0")

# datasource1 = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name='r_training_dbo_label', transformation_ctx = "datasource1")
# remap data here as needed to generate output
# datasource2 = RenameField.apply(datasource1, "id", "xyzid")
# datasource3 = datasource2.join( ["abc_id"],["id"], datasource0, transformation_ctx = "join")
# applymapping1 = ApplyMapping.apply(frame = datasource3, mappings = [("context", "string", "context", "string"), ("label", "string", "label", "string")], transformation_ctx = "applymapping1")

# Rows without a label can be neither balanced nor used for training.
raw_df = datasource0.toDF()
labelled_df = raw_df.filter(raw_df[LABEL_COLUMN].isNotNull())

# Down-sample every class to the size of the smallest one: each class keeps the
# fraction min_count / class_count of its rows (per-row sampling without replacement,
# so sizes are approximate). sampleBy works on the already-loaded frame; a re-read
# with push_down_predicate prunes catalog partitions and is not expected to filter
# rows of this JDBC-crawled table.
per_category_count = labelled_df.groupBy(LABEL_COLUMN).count().collect()
if not per_category_count:
    raise ValueError('No labelled rows found in {0}'.format(SOURCE_TABLE))
min_count = float(min(row['count'] for row in per_category_count))
fractions = {row[LABEL_COLUMN]: min_count / row['count'] for row in per_category_count}
balanced_df = labelled_df.sampleBy(LABEL_COLUMN, fractions=fractions, seed=SEED)

# Back up the balanced dataset as parquet, partitioned by label.
balanced = DynamicFrame.fromDF(balanced_df, glueContext, "balanced")

output_path = 's3://' + os.path.join(args['S3_OUTPUT_BUCKET'], args['S3_OUTPUT_KEY_PREFIX'])

sampled_data_sink = glueContext.write_dynamic_frame.from_options(
    frame=balanced,
    connection_type='s3',
    connection_options={"path": output_path, "partitionKeys": [LABEL_COLUMN]},
    format="parquet"
)

# Split the balanced data 60/20/20 into train, validation and test sets.
train, validation, test = balanced_df.randomSplit(weights=[.6, .2, .2], seed=SEED)

# Space-separated, headerless, unquoted CSV ("quoteChar": "-1" disables quoting).
csv_format_options = {"separator": " ", "writeHeader": False, "quoteChar": "-1"}

for split_name, split_df in (('train', train), ('validation', validation), ('test', test)):
    # repartition(1) so each split is written as a single file.
    split_frame = DynamicFrame.fromDF(split_df.repartition(1), glueContext, split_name)
    glueContext.write_dynamic_frame.from_options(
        frame=split_frame,
        connection_type='s3',
        connection_options={"path": "{}/{}".format(output_path, split_name)},
        format="csv",
        format_options=csv_format_options
    )

job.commit()

Upload the ETL Script to S3

In [0]:
# Upload the Glue job script into s3://<bucket>/codes/; the returned S3 URI is
# used as the job's ScriptLocation.
script_location = sagemaker_session.upload_data(
    path='training_etl.py',
    bucket=bucket,
    key_prefix='codes',
)

In [0]:
# Output location of the data: key prefix inside `bucket` under which the Glue job
# writes its outputs (passed to the job as --S3_OUTPUT_KEY_PREFIX).
s3_output_key_prefix = 'datalake/training'

Creating jobs with AWS Glue

In [0]:
from time import gmtime, strftime
import time

# A UTC timestamp suffix keeps job names unique across notebook runs.
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
job_name = 'training-etl-' + timestamp_prefix

job_definition = dict(
    Name=job_name,
    Description='PySpark job to convert training SQL Server tables data to parquet and training, validation and test sets',
    # The SageMaker execution role doubles as the Glue role; an existing AWS Glue role works too.
    Role=role,
    ExecutionProperty={'MaxConcurrentRuns': 1},
    Command={'Name': 'glueetl', 'ScriptLocation': script_location},
    DefaultArguments={
        '--job-language': 'python',
        '--job-bookmark-option': 'job-bookmark-disable',
    },
    AllocatedCapacity=5,
    Timeout=60,  # minutes
)
response = glue.create_job(**job_definition)
glue_job_name = response['Name']
print(glue_job_name)

Run jobs with AWS Glue

In [0]:
# Job arguments read by the ETL script via getResolvedOptions.
job_arguments = {
    '--S3_OUTPUT_BUCKET': bucket,
    '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,
    '--DATABASE_NAME': database_name,
    '--REGION': region,
}
job_run = glue.start_job_run(JobName=job_name, Arguments=job_arguments)
job_run_id = job_run['JobRunId']
print(job_run_id)

In [0]:
# Every terminal JobRunState. TIMEOUT must be included because the job was created
# with Timeout=60; ERROR and EXPIRED are terminal too. A missing state makes this
# loop poll forever.
TERMINAL_JOB_RUN_STATES = ('FAILED', 'SUCCEEDED', 'STOPPED', 'TIMEOUT', 'ERROR', 'EXPIRED')

job_run = glue.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']
job_run_status = job_run['JobRunState']
while job_run_status not in TERMINAL_JOB_RUN_STATES:
    print(job_run_status)
    time.sleep(60)
    job_run = glue.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']
    job_run_status = job_run['JobRunState']
print(job_run_status)

# Surface failures instead of letting later cells run on missing outputs.
if job_run_status != 'SUCCEEDED':
    raise RuntimeError('Glue job run {0} ended in {1}: {2}'.format(
        job_run_id, job_run_status, job_run.get('ErrorMessage', '')))

Model Training with Amazon SageMaker

**TODO:** this section is not implemented yet.

In [0]:
import boto3
import pandas
import sagemaker



Clean Up

In [0]:
# Start deleting the RDS stack; deletion is asynchronous (waited on further below).
response = cfn.delete_stack(
    StackName=rds_stack_name,
)

In [0]:
# Remove the Glue crawler created for the SQL Server database.
response = glue.delete_crawler(
    Name=crawler_name,
)

In [0]:
# Remove the Glue ETL job definition.
response = glue.delete_job(
    JobName=glue_job_name,
)

In [0]:
# Remove the Glue Data Catalog database populated by the crawler.
response = glue.delete_database(Name=database_name, CatalogId=account_id)

In [0]:
# Remove the Glue JDBC connection (it stores the database credentials).
response = glue.delete_connection(ConnectionName=db_connection_name, CatalogId=account_id)

In [0]:
# Block until CloudFormation reports the RDS stack as fully deleted.
waiter = cfn.get_waiter('stack_delete_complete')
waiter.wait(StackName=rds_stack_name)

print('The wait is over for ' + rds_stack_name)

In [0]:
# Empty and then delete the workshop bucket (same effect as `aws s3 rb --force`).
# boto3 reuses the notebook's own credentials and needs neither the AWS CLI nor
# shell interpolation of the bucket name.
workshop_bucket = s3.Bucket(bucket)
workshop_bucket.objects.all().delete()
workshop_bucket.delete()

In [0]:
# Tear down the VPC only when this notebook created it; a reused default VPC is left alone.
created_own_vpc = not use_existing
if created_own_vpc:
    workshop.vpc_cleanup(vpc_id)