Refactor ML-Ops and Lambda (#149)
* refactor ML Ops to multiple workflows
* rewrite ML Ops in native HCL, wrapped in `jsonencode()` (see the sketch after this list)
* terraform fmt compliance for 0.14.0
* glue version fix
* rename 'batch_transform'->'batch_scoring'
* skip ecr push if using builtin sagemaker model
* pip and python fixes
* improved lambda and step function execution
* bump to terraform 0.14.2
* revert to upstream 'terraform-aws-key-pair' repo
* fix interpolation warnings
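
The `jsonencode()` rewrite referenced above replaces raw JSON templates with state-machine definitions authored as native HCL objects, serialized once at the boundary. That is the same role `json.dumps` plays in Python, and the sketch below illustrates the pattern against the real Step Functions API; the state names, role ARN, and state machine name are hypothetical, not taken from this repo.

```python
import json

import boto3

# A minimal Amazon States Language definition authored as a native data
# structure, then serialized once at the boundary (the same pattern the
# HCL rewrite applies with jsonencode()). All names are illustrative.
definition = {
    "Comment": "Hypothetical ML training flow",
    "StartAt": "TrainModel",
    "States": {
        "TrainModel": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
            "Parameters": {"TrainingJobName.$": "$.JobName"},
            "End": True,
        }
    },
}

sfn = boto3.client("stepfunctions")
sfn.create_state_machine(
    name="example-ml-ops",                             # hypothetical
    roleArn="arn:aws:iam::123456789012:role/example",  # hypothetical
    definition=json.dumps(definition),                 # serialize at the boundary
)
```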

Co-authored-by: Mark Stein <mark.stein@slalom.com>
Co-authored-by: mystycs <mystycs@gmail.com>
Co-authored-by: jacksandom <jack.sandom@slalom.com>
4 people committed Dec 27, 2020
1 parent ab8e462 commit 94289b8
Showing 61 changed files with 957 additions and 706 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/slash-commands.yml
@@ -36,7 +36,7 @@ jobs:
           git config --global user.name "AJ Steers (CI bot)"
       - uses: hashicorp/setup-terraform@v1
         with:
-          terraform_version: 0.13.0
+          terraform_version: 0.14.2
       - name: Run Terraform auto-format
         run: terraform fmt -recursive
       - name: Commit and Push
@@ -90,7 +90,7 @@ jobs:
           git config --global user.name "AJ Steers (CI bot)"
       - uses: hashicorp/setup-terraform@v1
         with:
-          terraform_version: 0.13.0
+          terraform_version: 0.14.2
       - name: Install Go
         uses: actions/setup-go@v2
       - name: Install terraform-docs
4 changes: 2 additions & 2 deletions .github/workflows/terraform-ci-cd.yml
@@ -17,7 +17,7 @@ jobs:
       - name: Setup Terraform
         uses: hashicorp/setup-terraform@v1
         with:
-          terraform_version: 0.13.0
+          terraform_version: 0.14.2
       - name: Terraform Linter Check (Formatting)
         run: |
           terraform fmt -recursive -check
@@ -75,7 +75,7 @@ jobs:
       - name: Setup Terraform
         uses: hashicorp/setup-terraform@v1
         with:
-          terraform_version: 0.13.0
+          terraform_version: 0.14.2
           terraform_wrapper: false
       - name: "Terraform Init (${{ matrix.sample-id }})"
         run: |
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.9.6
+0.9.7
2 changes: 1 addition & 1 deletion catalog/aws/data-lake-users/iam.tf
@@ -26,7 +26,7 @@ resource "aws_iam_group_membership" "group_membership" {
 }
 resource "local_file" "encrypted_secret_key_files" {
   for_each = var.users
-  filename = "${path.root}/.terraform/tmp/${each.value}-encrypted-secret.txt"
+  filename = "${local.temp_artifacts_root}/${each.value}-encrypted-secret.txt"
   content  = <<EOF
 -----BEGIN PGP MESSAGE-----
 Version: Keybase OpenPGP v2.1.13
1 change: 1 addition & 0 deletions catalog/aws/data-lake-users/main.tf
@@ -14,6 +14,7 @@ locals {
       flatten(values(var.user_groups))
     ])
   )
+  temp_artifacts_root = "${path.root}/.terraform/tmp"
 }
 
 resource "aws_kms_key" "group_kms_keys" {
7 changes: 2 additions & 5 deletions catalog/aws/data-lake/main.tf
@@ -17,9 +17,7 @@ data "aws_s3_bucket" "data_bucket_override" {
 locals {
   s3_path_to_lambda_zip = "s3://${aws_s3_bucket.s3_metadata_bucket.id}/code/lambda/${var.name_prefix}lambda.zip"
   random_bucket_suffix  = lower(random_id.suffix.dec)
-  data_bucket_name = (
-    var.data_bucket_override != null ? data.aws_s3_bucket.data_bucket_override[0].id : aws_s3_bucket.s3_data_bucket[0].id
-  )
+  data_bucket_name      = var.data_bucket_override != null ? data.aws_s3_bucket.data_bucket_override[0].id : aws_s3_bucket.s3_data_bucket[0].id
 }
 
 resource "aws_s3_bucket" "s3_data_bucket" {
@@ -74,8 +72,7 @@ module "triggered_lambda" {
 
   runtime              = "python3.8"
   lambda_source_folder = var.lambda_python_source
-  upload_to_s3         = true
-  upload_to_s3_path    = local.s3_path_to_lambda_zip
+  s3_upload_path       = local.s3_path_to_lambda_zip
   pip_path             = var.pip_path
 
   functions = {
7 changes: 5 additions & 2 deletions catalog/aws/data-lake/variables.tf
@@ -65,6 +65,9 @@ EOF
   default = {}
 }
 variable "pip_path" {
-  description = "The path to a local pip executable, used to package python dependencies for any lambda triggers."
-  default     = "pip3"
+  description = <<EOF
+The path to a local pip executable, used to package python dependencies.
+If omitted, will use 'pip' on Windows-based systems and 'pip3' on Linux/Mac.
+EOF
+  default     = null
 }
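
The OS-dependent fallback described in the new default can be expressed directly in Python. A minimal sketch of that selection logic, followed by a hypothetical packaging call (the `requirements.txt` and `./build` paths are illustrative, not from this repo):

```python
import platform
import subprocess


def resolve_pip_path(pip_path=None):
    """Mirror the documented default: 'pip' on Windows, 'pip3' on Linux/Mac."""
    if pip_path is not None:
        return pip_path
    return "pip" if platform.system() == "Windows" else "pip3"


# Hypothetical packaging step: vendor lambda dependencies into a build folder.
subprocess.run(
    [resolve_pip_path(), "install", "-r", "requirements.txt", "--target", "./build"],
    check=True,
)
```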
3 changes: 2 additions & 1 deletion catalog/aws/dbt/main.tf
@@ -15,7 +15,8 @@ locals {
     contains(["CST"], var.scheduled_timezone) ? -6 :
     contains(["EST"], var.scheduled_timezone) ? -5 :
     contains(["UTC", "GMT"], var.scheduled_timezone) ? 0 :
-    1 / 0 # ERROR: currently supported timezone code are: "UTC", "GMT", "EST", "PST" and "PDT"
+    1 / 0
+    # ERROR: currently supported timezone codes are: "UTC", "GMT", "EST", "PST" and "PDT"
   )
 }
 
1 change: 1 addition & 0 deletions catalog/aws/ml-ops/ecr-image.tf
@@ -1,6 +1,7 @@
 # OPTIONAL: Only if using 'bring your own model'
 
 module "ecr_image_byo_model" {
+  count       = var.built_in_model_image == null ? 1 : 0
   source      = "../../../components/aws/ecr-image"
   name_prefix = var.name_prefix
   environment = var.environment
@@ -1,8 +1,10 @@
+"""Return the name and path of the best model from the hyperparameter tuning job."""
+
+
 def lambda_handler(event, context):
     print(event)
     return {
-        "HyperParameterTuningJobName" : event["HyperParameterTuningJobName"],
+        "HyperParameterTuningJobName": event["HyperParameterTuningJobName"],
         "bestTrainingJobName": event["BestTrainingJob"]["TrainingJobName"],
         "modelDataUrl": event["TrainingJobDefinition"]["OutputDataConfig"][
             "S3OutputPath"
@@ -9,13 +9,13 @@
 sm_client = boto3.client("sagemaker")
 s3_client = boto3.client("s3")
 
-metadata_store_name = os.environ['metadata_store_name']
+metadata_store_name = os.environ["metadata_store_name"]
 
 
 def lambda_handler(event, context):
     """Retrieve transform job name from event and return transform job status."""
     model_name = event["BestModelResult"]["bestTrainingJobName"]
     model_data_url = event["BestModelResult"]["modelDataUrl"]
-
 
     try:
         # Query boto3 API to check training status.
@@ -36,19 +36,18 @@ def lambda_handler(event, context):
         # all datetime objects returned to unix time.
         for index, metric in enumerate(response["FinalMetricDataList"]):
             metric["Timestamp"] = metric["Timestamp"].timestamp()
-
         training_metrics = response["FinalMetricDataList"]
 
-        # log model metadata
+        # log model metadata
         bucket = metadata_store_name
-        file_name = 'log/' + event['HyperParameterTuningJobName'] + '.json'
-        uploadByteStream = bytes(json.dumps(event).encode('UTF-8'))
+        file_name = "log/" + event["HyperParameterTuningJobName"] + ".json"
+        uploadByteStream = bytes(json.dumps(event).encode("UTF-8"))
         s3_client.put_object(Bucket=bucket, Key=file_name, Body=uploadByteStream)
-
-
 
     return {
         "statusCode": 200,
         "trainingMetrics": training_metrics,
         "modelName": model_name,
-        "modelDataUrl": model_data_url
+        "modelDataUrl": model_data_url,
     }
@@ -2,15 +2,12 @@
 import json
 import boto3
 
-client = boto3.client('glue')
+client = boto3.client("glue")
 
 
 def lambda_handler(event, context):
-
     event = event["Payload"]
-
-    client.start_crawler(Name=event['CrawlerName'])
-
+    client.start_crawler(Name=event["CrawlerName"])
     return {
-        'statusCode': 200,
+        "statusCode": 200,
     }
@@ -3,7 +3,8 @@
 
 
 def lambda_handler(event, context):
-    return {
-        "statusCode": 200,
-        "JobName": event["JobName"] + "-" + datetime.today().strftime("%y%m%d%H%M%S"),
-    }
+    event["statusCode"] = 200
+    event["JobName"] = (
+        event["JobName"] + "-" + datetime.today().strftime("%y%m%d%H%M%S")
+    )
+    return event
@@ -1,5 +1,7 @@
""" This class implements a function to send evaluation metrics to CloudWatch using Boto3 """

# TODO: This appears unused. Consider deleting.

import boto3

modelname = "${var.job_name}"
@@ -1,11 +1,10 @@
"""
This is a simply function to return the machine learning problem type
This is a simply function to return the machine learning problem type
"""

problem = "${var.data_drift_ml_problem_type}"


def lambda_hanlder(event, context):
def lambda_handler(event, context):
response = problem
return response

@@ -0,0 +1,52 @@
"""
This function will load model prediction outputs from S3 bucket to a selected database platform such as PostgreSQL.
"""

# import boto3
import psycopg2


def handler(event, context):
# Load csv to Postgres

# first need to enable the prediction outputs loading process
is_predictive_db_enabled = event.get("is_predictive_db_enabled", False)
if not is_predictive_db_enabled:
return

# user defined database related variables
host = event.get("db_host", None)
db_name = event.get("db_name", None)
db_user = event.get("db_user", None)
db_password = event.get("db_password", None)
s3_csv = event.get("s3_csv", None)

connection_string = (
f"dbname='{db_name}' user='{db_user}' host='{host}' password='{db_password}'"
)

pg_load(connection_string, s3_csv)


def pg_load(connection, file_path, table_name="model_stats"):
try:
conn = psycopg2.connect(connection)
print("Connecting to Database")
cur = conn.cursor()
# Open the input file for copy
# s3_client = boto3.client("s3")
f = open(file_path, "r")
# Load csv file into the table
cur.copy_expert(
"COPY {} FROM STDIN WITH CSV QUOTE e'\x01' DELIMITER e'\x02'".format(
table_name
),
f,
)
cur.execute("commit;")
print("Loaded data into {}".format(table_name))
conn.close()
print("DB connection closed.")

except Exception as e:
print("Error {}".format(str(e)))
@@ -0,0 +1,5 @@
+# Used in drift detection
+psycopg2-binary
+
+# Used for S3 and Sagemaker
+boto3
@@ -12,5 +12,10 @@ def lambda_handle(event, context):
     sns.publish(
         TopicArn="arn:aws:sns:${var.environment.aws_region}:Data drift detected",
         Subject="Data Drift Detected",
-        Message=f"The latest data drift monitor status is {event['latest_result_status']}, model training is stopped. Please provide update datasets and restart the process. For more details, please follow this link to the latest report {event['report_uri']}",
+        Message=(
+            f"The latest data drift monitor status is {event['latest_result_status']}, "
+            "model training is stopped. Please provide update datasets and restart the "
+            "process. For more details, please follow this link to the latest "
+            f"report {event['report_uri']}"
+        ),
     )
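
A subtlety in the multi-line `Message` above: implicitly concatenated string literals do not inherit an `f` prefix from their neighbors, so every fragment containing a placeholder needs its own `f` (the final fragment carries one for that reason). A quick illustration:

```python
uri = "s3://reports/latest"
# The second fragment is a plain literal, so its placeholder is not filled in.
broken = f"see " "report {uri}"
fixed = f"see " f"report {uri}"
assert broken == "see report {uri}"
assert fixed == "see report s3://reports/latest"
```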
54 changes: 0 additions & 54 deletions catalog/aws/ml-ops/lambda-python/load_predoutput_db.py

This file was deleted.

@@ -2,14 +2,13 @@
 import boto3
 import os
 
-state_machine_arn = os.environ['state_machine_arn']
-client = boto3.client('stepfunctions')
+state_machine_arn = os.environ["state_machine_arn"]
+client = boto3.client("stepfunctions")
 
 
 def lambda_handler(event, context):
     """Execute ML State Machine when new training data is uploaded to S3."""
-
-    client.start_execution(stateMachineArn=state_machine_arn)
-
-    return {
-        "statusCode": 200
-    }
-
+    client.start_execution(stateMachineArn=state_machine_arn, input=str(event))
+
+    return {"statusCode": 200}