From 9cc3e65a1bba0a1c1a7c84c57f2b28a0407d4030 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Wed, 17 Apr 2024 09:54:53 +0200 Subject: [PATCH 01/15] feat: separate loader file --- jobs/ml-ops/inference/loader.py | 36 ++++++++++++++++++++++++++++ jobs/ml-ops/inference/main.py | 42 +++------------------------------ 2 files changed, 39 insertions(+), 39 deletions(-) create mode 100644 jobs/ml-ops/inference/loader.py diff --git a/jobs/ml-ops/inference/loader.py b/jobs/ml-ops/inference/loader.py new file mode 100644 index 0000000..4577a34 --- /dev/null +++ b/jobs/ml-ops/inference/loader.py @@ -0,0 +1,36 @@ +import pickle +import boto3 +import os + +class ClassifierLoader: + _classifier = None + + @classmethod + def load(cls, force=False): + if force or cls._classifier is None: + access_key = os.environ["ACCESS_KEY"] + secret_key = os.environ["SECRET_KEY"] + region_name = os.environ["REGION"] + + bucket_name = os.environ["S3_BUCKET_NAME"] + s3_url = os.environ["S3_URL"] + + s3 = boto3.client( + "s3", + region_name=region_name, + endpoint_url=s3_url, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + # get file with the latest model version + bucket_objects = s3.list_objects(Bucket=bucket_name) + get_last_modified = lambda obj: int(obj['LastModified'].strftime('%s')) + latest_model_file = [obj['Key'] for obj in sorted(bucket_objects, key=get_last_modified)][0] + + s3.download_file(bucket_name, latest_model_file, latest_model_file) + + with open(latest_model_file, "rb") as fh: + cls._classifier = pickle.load(fh) + + return cls._classifier diff --git a/jobs/ml-ops/inference/main.py b/jobs/ml-ops/inference/main.py index 579d340..900802c 100644 --- a/jobs/ml-ops/inference/main.py +++ b/jobs/ml-ops/inference/main.py @@ -1,50 +1,14 @@ from fastapi import FastAPI from sklearn.ensemble import RandomForestClassifier -from sklearn.metrics import RocCurveDisplay -import pickle -import boto3 -import pandas -import os - +from loader import ClassifierLoader import data classifier = RandomForestClassifier() app = FastAPI() -MODEL_FILE = "classifier.pkl" - - -class ClassifierLoader: - _classifier = None - - @classmethod - def load(cls, force=False): - if force or cls._classifier is None: - access_key = os.environ["ACCESS_KEY"] - secret_key = os.environ["SECRET_KEY"] - region_name = os.environ["REGION"] - - bucket_name = os.environ["S3_BUCKET_NAME"] - s3_url = os.environ["S3_URL"] - - s3 = boto3.client( - "s3", - region_name=region_name, - endpoint_url=s3_url, - aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - ) - - s3.download_file(bucket_name, MODEL_FILE, MODEL_FILE) - - with open(MODEL_FILE, "rb") as fh: - cls._classifier = pickle.load(fh) - - return cls._classifier - - -@app.post("/load") +# this endpoint is used by cron trigger to load model from S3 +@app.get("/") def load(): """Reloads classifier from model registry bucket""" From c5dfce0ceff0c6ae686a16dc2611f0b1b9c0e04d Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Wed, 17 Apr 2024 09:56:00 +0200 Subject: [PATCH 02/15] feat: version model and other training artifacts --- jobs/ml-ops/training/main.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/jobs/ml-ops/training/main.py b/jobs/ml-ops/training/main.py index 7b2e87c..257e315 100644 --- a/jobs/ml-ops/training/main.py +++ b/jobs/ml-ops/training/main.py @@ -5,14 +5,15 @@ import training as ml from sklearn.metrics import RocCurveDisplay from sklearn.metrics import ConfusionMatrixDisplay +from datetime import datetime -DATA_FILE_NAME = "bank-additional-full.csv" - -MODEL_FILE = "classifier.pkl" -PERF_FILE = "performance.pkl" -ROC_AUC_FILE = "roc_auc.png" -CONFUSION_MATRIX_FILE = "confusion_matrix.png" +VERSION = datetime.now().strftime("%Y%m%d%H%M") +DATA_FILE_NAME = "bank-additional-full.csv" +MODEL_FILE = "classifier_"+VERSION+".pkl" +PERF_FILE = "performance_"+VERSION+".pkl" +ROC_AUC_FILE = "roc_auc_"+VERSION+".png" +CONFUSION_MATRIX_FILE = "confusion_matrix_"+VERSION+".png" def main() -> int: """ From 77c370331ff21d24f1f6fb0d635e53151d412a0e Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Wed, 17 Apr 2024 09:56:29 +0200 Subject: [PATCH 03/15] feat: use cron for jobs and container --- jobs/ml-ops/terraform/container.tf | 6 ++++++ jobs/ml-ops/terraform/jobs.tf | 10 ++++++++-- jobs/ml-ops/terraform/variables.tf | 17 +++++++++-------- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/jobs/ml-ops/terraform/container.tf b/jobs/ml-ops/terraform/container.tf index 5329163..d891e9c 100644 --- a/jobs/ml-ops/terraform/container.tf +++ b/jobs/ml-ops/terraform/container.tf @@ -24,3 +24,9 @@ resource "scaleway_container" "inference" { } deploy = true } + +resource scaleway_container_cron "inference_cron" { + container_id = scaleway_container.inference.id + schedule = var.inference_cron_schedule + args = jsonencode({}) +} \ No newline at end of file diff --git a/jobs/ml-ops/terraform/jobs.tf b/jobs/ml-ops/terraform/jobs.tf index d52b558..876f8c8 100644 --- a/jobs/ml-ops/terraform/jobs.tf +++ b/jobs/ml-ops/terraform/jobs.tf @@ -4,7 +4,10 @@ resource "scaleway_job_definition" "fetch_data" { memory_limit = 1024 image_uri = docker_image.data.name timeout = "10m" - + cron { + schedule = var.data_fetch_cron_schedule + timezone = "Europe/Paris" + } env = { "S3_BUCKET_NAME" : scaleway_object_bucket.main.name, "S3_URL" : var.s3_url, @@ -20,7 +23,10 @@ resource "scaleway_job_definition" "training" { memory_limit = 4096 image_uri = docker_image.training.name timeout = "10m" - + cron { + schedule = var.training_cron_schedule + timezone = "Europe/Paris" + } env = { "S3_BUCKET_NAME" : scaleway_object_bucket.main.name, "S3_URL" : var.s3_url, diff --git a/jobs/ml-ops/terraform/variables.tf b/jobs/ml-ops/terraform/variables.tf index 84355ba..afb8c2e 100644 --- a/jobs/ml-ops/terraform/variables.tf +++ b/jobs/ml-ops/terraform/variables.tf @@ -25,15 +25,16 @@ variable "s3_url" { default = "https://s3.fr-par.scw.cloud" } -variable "data_file" { - type = string - description = "name data file in data store" - default = "bank_telemarketing.csv" +variable "data_fetch_cron_schedule" { + type = string } -variable "model_object" { - type = string - description = "name of model object stored in model registry" - default = "classifier.pkl" +variable "training_cron_schedule" { + type = string + default = "*/15 * * * *" } +variable "inference_cron_schedule" { + type = string + default = "*/25 * * * *" +} From e2a9ba5e8bb0506fabdf78d4d3d2754a8ac8da63 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Wed, 17 Apr 2024 09:56:57 +0200 Subject: [PATCH 04/15] chore: scaleway provider version for terraform --- jobs/ml-ops/terraform/versions.tf | 1 + 1 file changed, 1 insertion(+) diff --git a/jobs/ml-ops/terraform/versions.tf b/jobs/ml-ops/terraform/versions.tf index b4ddc4b..b186193 100644 --- a/jobs/ml-ops/terraform/versions.tf +++ b/jobs/ml-ops/terraform/versions.tf @@ -2,6 +2,7 @@ terraform { required_providers { scaleway = { source = "scaleway/scaleway" + version = ">= 2.39" } docker = { source = "kreuzwerker/docker" From 8e956b6b150926685ebc1ab6962b3c34e913d161 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Wed, 17 Apr 2024 16:37:29 +0200 Subject: [PATCH 05/15] feat: model versioning --- jobs/ml-ops/inference/loader.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/jobs/ml-ops/inference/loader.py b/jobs/ml-ops/inference/loader.py index 4577a34..e3b9483 100644 --- a/jobs/ml-ops/inference/loader.py +++ b/jobs/ml-ops/inference/loader.py @@ -4,6 +4,7 @@ class ClassifierLoader: _classifier = None + _classifier_version = "" @classmethod def load(cls, force=False): @@ -23,14 +24,22 @@ def load(cls, force=False): aws_secret_access_key=secret_key, ) - # get file with the latest model version + # get model file with the latest version bucket_objects = s3.list_objects(Bucket=bucket_name) - get_last_modified = lambda obj: int(obj['LastModified'].strftime('%s')) - latest_model_file = [obj['Key'] for obj in sorted(bucket_objects, key=get_last_modified)][0] + get_last_modified = lambda object: int(object['LastModified'].strftime('%s')) + model_objects = [model_object for model_object in bucket_objects['Contents'] if "classifier" in model_object['Key']] + latest_model_file = [object['Key'] for object in sorted(model_objects, key=get_last_modified)][0] s3.download_file(bucket_name, latest_model_file, latest_model_file) with open(latest_model_file, "rb") as fh: cls._classifier = pickle.load(fh) + cls._classifier_version = latest_model_file[11:-4] + + print('Successfully loaded model file: {latest_model_file}'.format(latest_model_file=latest_model_file), flush=True) return cls._classifier + + @classmethod + def model_version(cls): + return cls._classifier_version \ No newline at end of file From 4a17d10c92fb45dc62f728498f223f2548283119 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Wed, 17 Apr 2024 16:38:35 +0200 Subject: [PATCH 06/15] feat: info get endpoint and trigger post endpoint --- jobs/ml-ops/inference/main.py | 21 +++++++++++++++++++-- jobs/ml-ops/terraform/container.tf | 2 +- jobs/ml-ops/terraform/images.tf | 3 +++ jobs/ml-ops/terraform/variables.tf | 4 ++-- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/jobs/ml-ops/inference/main.py b/jobs/ml-ops/inference/main.py index 900802c..e99c70e 100644 --- a/jobs/ml-ops/inference/main.py +++ b/jobs/ml-ops/inference/main.py @@ -7,12 +7,29 @@ app = FastAPI() -# this endpoint is used by cron trigger to load model from S3 @app.get("/") +def hello(): + """Get Model Version""" + + model_version = ClassifierLoader.model_version() + + if model_version == "": + return { + "message": "Hello, this is the inference server! No classifier loaded in memory." + } + + return { + "message": "Hello, this is the inference server! Serving classifier with version {model_version}" + .format(model_version=model_version) + } + +# this endpoint is used by cron trigger to load model from S3 +@app.post("/") def load(): """Reloads classifier from model registry bucket""" ClassifierLoader.load(force=True) + return {"message": "model loaded successfully"} @@ -23,7 +40,7 @@ def classify(profile: data.ClientProfile): cleaned_data = data.clean_profile(profile) data_point_processed = data.transform_data(cleaned_data) - # Lazy-loads classifer from S3 + # Lazy-loads classifier from S3 classifier = ClassifierLoader.load() prediction = classifier.predict(data_point_processed) diff --git a/jobs/ml-ops/terraform/container.tf b/jobs/ml-ops/terraform/container.tf index d891e9c..6e23151 100644 --- a/jobs/ml-ops/terraform/container.tf +++ b/jobs/ml-ops/terraform/container.tf @@ -12,7 +12,7 @@ resource "scaleway_container" "inference" { cpu_limit = 2000 memory_limit = 2048 min_scale = 1 - max_scale = 5 + max_scale = 1 environment_variables = { "S3_BUCKET_NAME" = scaleway_object_bucket.main.name "S3_URL" = var.s3_url diff --git a/jobs/ml-ops/terraform/images.tf b/jobs/ml-ops/terraform/images.tf index 479a9b3..97cd3bc 100644 --- a/jobs/ml-ops/terraform/images.tf +++ b/jobs/ml-ops/terraform/images.tf @@ -8,6 +8,7 @@ resource "docker_image" "inference" { name = "${scaleway_registry_namespace.main.endpoint}/inference:${var.image_version}" build { context = "${path.cwd}/../inference" + no_cache = true } provisioner "local-exec" { @@ -19,6 +20,7 @@ resource "docker_image" "data" { name = "${scaleway_registry_namespace.main.endpoint}/data:${var.image_version}" build { context = "${path.cwd}/../data" + no_cache = true } provisioner "local-exec" { @@ -30,6 +32,7 @@ resource "docker_image" "training" { name = "${scaleway_registry_namespace.main.endpoint}/training:${var.image_version}" build { context = "${path.cwd}/../training" + no_cache = true } provisioner "local-exec" { diff --git a/jobs/ml-ops/terraform/variables.tf b/jobs/ml-ops/terraform/variables.tf index afb8c2e..eec90fc 100644 --- a/jobs/ml-ops/terraform/variables.tf +++ b/jobs/ml-ops/terraform/variables.tf @@ -31,10 +31,10 @@ variable "data_fetch_cron_schedule" { variable "training_cron_schedule" { type = string - default = "*/15 * * * *" + default = "*/10 * * * *" } variable "inference_cron_schedule" { type = string - default = "*/25 * * * *" + default = "*/20 * * * *" } From e9888ca3a9cf18f1da6215a16d5dc6b243e8b194 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Wed, 17 Apr 2024 16:38:54 +0200 Subject: [PATCH 07/15] docs: add TF_VAR_data_fetch_cron_schedule --- jobs/ml-ops/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/jobs/ml-ops/README.md b/jobs/ml-ops/README.md index 8aca67a..1b81b40 100644 --- a/jobs/ml-ops/README.md +++ b/jobs/ml-ops/README.md @@ -37,6 +37,7 @@ Set your Scaleway access key, secret key and project ID in environment variables export TF_VAR_access_key= export TF_VAR_secret_key= export TF_VAR_project_id= # you can create a separate project for this example +export TF_VAR_data_fetch_cron_schedule="19 14 17 4 *" # should trigger data fetching just after deploying pipeline infrastructure cd terraform terraform init From 2bf3f3f4f1ef5c7feadb6b88a4d462a782c851e8 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Tue, 7 May 2024 13:02:56 +0200 Subject: [PATCH 08/15] refactor: formatting --- jobs/ml-ops/data/main.py | 3 ++- jobs/ml-ops/inference/data.py | 2 +- jobs/ml-ops/inference/loader.py | 27 +++++++++++++++++++++------ jobs/ml-ops/inference/main.py | 17 ++++++++++------- jobs/ml-ops/training/main.py | 17 +++++++++-------- jobs/ml-ops/training/training.py | 7 +++---- 6 files changed, 46 insertions(+), 27 deletions(-) diff --git a/jobs/ml-ops/data/main.py b/jobs/ml-ops/data/main.py index fa7b9e2..00fed0c 100644 --- a/jobs/ml-ops/data/main.py +++ b/jobs/ml-ops/data/main.py @@ -1,8 +1,9 @@ -import boto3 import os import urllib.request import zipfile +import boto3 + DATA_DIR = "dataset" ZIP_URL = "http://archive.ics.uci.edu/static/public/222/bank+marketing.zip" diff --git a/jobs/ml-ops/inference/data.py b/jobs/ml-ops/inference/data.py index d570cb8..8853b88 100644 --- a/jobs/ml-ops/inference/data.py +++ b/jobs/ml-ops/inference/data.py @@ -1,5 +1,5 @@ -import pandas as pd import numpy as np +import pandas as pd from pydantic import BaseModel diff --git a/jobs/ml-ops/inference/loader.py b/jobs/ml-ops/inference/loader.py index e3b9483..856df48 100644 --- a/jobs/ml-ops/inference/loader.py +++ b/jobs/ml-ops/inference/loader.py @@ -1,6 +1,8 @@ +import os import pickle + import boto3 -import os + class ClassifierLoader: _classifier = None @@ -26,9 +28,17 @@ def load(cls, force=False): # get model file with the latest version bucket_objects = s3.list_objects(Bucket=bucket_name) - get_last_modified = lambda object: int(object['LastModified'].strftime('%s')) - model_objects = [model_object for model_object in bucket_objects['Contents'] if "classifier" in model_object['Key']] - latest_model_file = [object['Key'] for object in sorted(model_objects, key=get_last_modified)][0] + get_last_modified = lambda object: int( + object["LastModified"].strftime("%s") + ) + model_objects = [ + model_object + for model_object in bucket_objects["Contents"] + if "classifier" in model_object["Key"] + ] + latest_model_file = [ + object["Key"] for object in sorted(model_objects, key=get_last_modified) + ][0] s3.download_file(bucket_name, latest_model_file, latest_model_file) @@ -36,10 +46,15 @@ def load(cls, force=False): cls._classifier = pickle.load(fh) cls._classifier_version = latest_model_file[11:-4] - print('Successfully loaded model file: {latest_model_file}'.format(latest_model_file=latest_model_file), flush=True) + print( + "Successfully loaded model file: {latest_model_file}".format( + latest_model_file=latest_model_file + ), + flush=True, + ) return cls._classifier @classmethod def model_version(cls): - return cls._classifier_version \ No newline at end of file + return cls._classifier_version diff --git a/jobs/ml-ops/inference/main.py b/jobs/ml-ops/inference/main.py index e99c70e..3f7cfab 100644 --- a/jobs/ml-ops/inference/main.py +++ b/jobs/ml-ops/inference/main.py @@ -1,12 +1,13 @@ +import data from fastapi import FastAPI -from sklearn.ensemble import RandomForestClassifier from loader import ClassifierLoader -import data +from sklearn.ensemble import RandomForestClassifier classifier = RandomForestClassifier() app = FastAPI() + @app.get("/") def hello(): """Get Model Version""" @@ -14,15 +15,17 @@ def hello(): model_version = ClassifierLoader.model_version() if model_version == "": - return { - "message": "Hello, this is the inference server! No classifier loaded in memory." - } + return { + "message": "Hello, this is the inference server! No classifier loaded in memory." + } return { - "message": "Hello, this is the inference server! Serving classifier with version {model_version}" - .format(model_version=model_version) + "message": "Hello, this is the inference server! Serving classifier with version {model_version}".format( + model_version=model_version + ) } + # this endpoint is used by cron trigger to load model from S3 @app.post("/") def load(): diff --git a/jobs/ml-ops/training/main.py b/jobs/ml-ops/training/main.py index 257e315..e41a0c3 100644 --- a/jobs/ml-ops/training/main.py +++ b/jobs/ml-ops/training/main.py @@ -1,19 +1,20 @@ -import pandas as pd import os import pickle +from datetime import datetime + import boto3 +import pandas as pd import training as ml -from sklearn.metrics import RocCurveDisplay -from sklearn.metrics import ConfusionMatrixDisplay -from datetime import datetime +from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay VERSION = datetime.now().strftime("%Y%m%d%H%M") DATA_FILE_NAME = "bank-additional-full.csv" -MODEL_FILE = "classifier_"+VERSION+".pkl" -PERF_FILE = "performance_"+VERSION+".pkl" -ROC_AUC_FILE = "roc_auc_"+VERSION+".png" -CONFUSION_MATRIX_FILE = "confusion_matrix_"+VERSION+".png" +MODEL_FILE = "classifier_" + VERSION + ".pkl" +PERF_FILE = "performance_" + VERSION + ".pkl" +ROC_AUC_FILE = "roc_auc_" + VERSION + ".png" +CONFUSION_MATRIX_FILE = "confusion_matrix_" + VERSION + ".png" + def main() -> int: """ diff --git a/jobs/ml-ops/training/training.py b/jobs/ml-ops/training/training.py index 478a47a..4fc7915 100644 --- a/jobs/ml-ops/training/training.py +++ b/jobs/ml-ops/training/training.py @@ -1,10 +1,9 @@ -import pandas as pd import numpy as np +import pandas as pd from imblearn.over_sampling import SMOTE -from sklearn.model_selection import train_test_split -from sklearn.metrics import accuracy_score, precision_score, recall_score, log_loss -from sklearn.model_selection import RandomizedSearchCV from sklearn.ensemble import RandomForestClassifier +from sklearn.metrics import accuracy_score, log_loss, precision_score, recall_score +from sklearn.model_selection import RandomizedSearchCV, train_test_split def transform_data(data: pd.DataFrame) -> pd.DataFrame: From 08325f23d6204b18d1bb50226833cd5ee146731c Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Tue, 7 May 2024 13:03:20 +0200 Subject: [PATCH 09/15] docs: rewording --- jobs/ml-ops/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/ml-ops/README.md b/jobs/ml-ops/README.md index 1b81b40..8a221ae 100644 --- a/jobs/ml-ops/README.md +++ b/jobs/ml-ops/README.md @@ -37,7 +37,7 @@ Set your Scaleway access key, secret key and project ID in environment variables export TF_VAR_access_key= export TF_VAR_secret_key= export TF_VAR_project_id= # you can create a separate project for this example -export TF_VAR_data_fetch_cron_schedule="19 14 17 4 *" # should trigger data fetching just after deploying pipeline infrastructure +export TF_VAR_data_fetch_cron_schedule="19 14 17 4 *" # should be decided manually, this would triggers data fetching just after deploying pipeline infrastructure cd terraform terraform init From da2f1433a749445029eecb3930201d429a960993 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Tue, 7 May 2024 13:20:14 +0200 Subject: [PATCH 10/15] feat: increase scheduling frequency --- jobs/ml-ops/terraform/variables.tf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jobs/ml-ops/terraform/variables.tf b/jobs/ml-ops/terraform/variables.tf index eec90fc..1737a30 100644 --- a/jobs/ml-ops/terraform/variables.tf +++ b/jobs/ml-ops/terraform/variables.tf @@ -31,10 +31,10 @@ variable "data_fetch_cron_schedule" { variable "training_cron_schedule" { type = string - default = "*/10 * * * *" + default = "0 */12 * * *" } variable "inference_cron_schedule" { type = string - default = "*/20 * * * *" + default = "0 */14 * * *" } From 623431005282f2fe5b7767ea49cc7ad5b2893748 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Tue, 7 May 2024 15:48:30 +0200 Subject: [PATCH 11/15] feat: schedule fetching data from source --- jobs/ml-ops/README.md | 24 +++++++++++++++++++++--- jobs/ml-ops/terraform/variables.tf | 5 +++-- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/jobs/ml-ops/README.md b/jobs/ml-ops/README.md index 8a221ae..0374484 100644 --- a/jobs/ml-ops/README.md +++ b/jobs/ml-ops/README.md @@ -37,15 +37,28 @@ Set your Scaleway access key, secret key and project ID in environment variables export TF_VAR_access_key= export TF_VAR_secret_key= export TF_VAR_project_id= # you can create a separate project for this example -export TF_VAR_data_fetch_cron_schedule="19 14 17 4 *" # should be decided manually, this would triggers data fetching just after deploying pipeline infrastructure +``` + +You can optionally configure non-default CRON schedules to orderly fetch data then train a model, and finally re-load a new model within the inference server. For this, set the following Terraform environment variables: + +```console +export TF_VAR_data_fetch_cron_schedule= +export TF_VAR_training_cron_schedule= +export TF_VAR_inference_cron_schedule= +``` + +Then deploy MLOps infrastructure using the following: +```console cd terraform terraform init terraform plan terraform apply ``` -### Step 2. Run the data and training Jobs +### Step 2. Optional: trigger jobs manually + +The pipeline is automatic, all jobs will be run at their respective scheduled time. This step can be ignored unless for debugging or test purposes. To run the jobs for the data and training, we can use the Scaleway CLI: @@ -61,12 +74,17 @@ You can also trigger the jobs from the [Jobs section](https://console.scaleway.c ### Step 3. Use the inference API +Load model with the latest version using: + ``` cd terraform export INFERENCE_URL=$(terraform output raw endpoint) +curl -X POST ${INFERENCE_URL} +``` -curl -X POST ${INFERENCE_URL}/load +Then post data to infer the class: +``` curl -X POST \ -H "Content-Type: application/json" \ -d @../inference/example.json diff --git a/jobs/ml-ops/terraform/variables.tf b/jobs/ml-ops/terraform/variables.tf index 1737a30..ccc5f7e 100644 --- a/jobs/ml-ops/terraform/variables.tf +++ b/jobs/ml-ops/terraform/variables.tf @@ -27,14 +27,15 @@ variable "s3_url" { variable "data_fetch_cron_schedule" { type = string + default = "* */10 * * *" } variable "training_cron_schedule" { type = string - default = "0 */12 * * *" + default = "* */11 * * *" } variable "inference_cron_schedule" { type = string - default = "0 */14 * * *" + default = "* */12 * * *" } From 530670d9873214533be8c69200470ec87811b30d Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Fri, 10 May 2024 15:13:23 +0200 Subject: [PATCH 12/15] fix: remove no cache directive --- jobs/ml-ops/terraform/images.tf | 3 --- 1 file changed, 3 deletions(-) diff --git a/jobs/ml-ops/terraform/images.tf b/jobs/ml-ops/terraform/images.tf index 97cd3bc..479a9b3 100644 --- a/jobs/ml-ops/terraform/images.tf +++ b/jobs/ml-ops/terraform/images.tf @@ -8,7 +8,6 @@ resource "docker_image" "inference" { name = "${scaleway_registry_namespace.main.endpoint}/inference:${var.image_version}" build { context = "${path.cwd}/../inference" - no_cache = true } provisioner "local-exec" { @@ -20,7 +19,6 @@ resource "docker_image" "data" { name = "${scaleway_registry_namespace.main.endpoint}/data:${var.image_version}" build { context = "${path.cwd}/../data" - no_cache = true } provisioner "local-exec" { @@ -32,7 +30,6 @@ resource "docker_image" "training" { name = "${scaleway_registry_namespace.main.endpoint}/training:${var.image_version}" build { context = "${path.cwd}/../training" - no_cache = true } provisioner "local-exec" { From 5e1cfb0749a87b6139f22ecc3f3fe2c572504eb9 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Fri, 10 May 2024 15:13:35 +0200 Subject: [PATCH 13/15] fix: cron schedules --- jobs/ml-ops/terraform/variables.tf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jobs/ml-ops/terraform/variables.tf b/jobs/ml-ops/terraform/variables.tf index ccc5f7e..bbe8b76 100644 --- a/jobs/ml-ops/terraform/variables.tf +++ b/jobs/ml-ops/terraform/variables.tf @@ -27,15 +27,15 @@ variable "s3_url" { variable "data_fetch_cron_schedule" { type = string - default = "* */10 * * *" + default = "0 */10 * * *" } variable "training_cron_schedule" { type = string - default = "* */11 * * *" + default = "0 */11 * * *" } variable "inference_cron_schedule" { type = string - default = "* */12 * * *" + default = "0 */12 * * *" } From edb4c9c91c1150001f0e3b7bb47ff7fc10ab685f Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Fri, 10 May 2024 15:13:57 +0200 Subject: [PATCH 14/15] docs: optional tf variables rewording --- jobs/ml-ops/README.md | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/jobs/ml-ops/README.md b/jobs/ml-ops/README.md index 0374484..5503dd2 100644 --- a/jobs/ml-ops/README.md +++ b/jobs/ml-ops/README.md @@ -39,13 +39,7 @@ export TF_VAR_secret_key= export TF_VAR_project_id= # you can create a separate project for this example ``` -You can optionally configure non-default CRON schedules to orderly fetch data then train a model, and finally re-load a new model within the inference server. For this, set the following Terraform environment variables: - -```console -export TF_VAR_data_fetch_cron_schedule= -export TF_VAR_training_cron_schedule= -export TF_VAR_inference_cron_schedule= -``` +By default, both jobs and container trigger in the example run regularly on a schedule. The default values for these schedules are configured in `jobs/ml-ops/terraform/variables.tf`, and can be overridden using Terraform variables, e.g. `export TF_VAR_data_fetch_cron_schedule="0 10 * * *"`. Then deploy MLOps infrastructure using the following: From 9aed2fe3732653c59001f775542c71156e9f2ec3 Mon Sep 17 00:00:00 2001 From: Reda Noureddine Date: Fri, 31 May 2024 14:52:02 +0200 Subject: [PATCH 15/15] refactor: inference response message --- jobs/ml-ops/inference/main.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/jobs/ml-ops/inference/main.py b/jobs/ml-ops/inference/main.py index 3f7cfab..32f9fbc 100644 --- a/jobs/ml-ops/inference/main.py +++ b/jobs/ml-ops/inference/main.py @@ -47,4 +47,9 @@ def classify(profile: data.ClientProfile): classifier = ClassifierLoader.load() prediction = classifier.predict(data_point_processed) - return {"predicted_class": int(prediction)} + response = "This client is likely to respond positively to a cold call" + + if int(prediction) == 0: + response = "This client is likely to respond negatively to a cold call" + + return {"prediction": response}