# Telemetry data using Custom TensorFlow Model

In [None]:

import boto3
import pandas as pd
import sagemaker
import time
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

# # Initialize the S3 client
# s3_client = boto3.resource('s3')

# # Define the pipeline name
# # pipeline_name = f"telemetry-sagemaker-mlops-train-pipeline"

# # Initialize the SageMaker session
# sagemaker_session = sagemaker.Session()

# # Get the AWS region
# region = sagemaker_session.boto_region_name

# # Get the SageMaker execution role
# role = sagemaker.get_execution_role()

# # Initialize the Pipeline session
pipeline_session = PipelineSession()

# # Get the default S3 bucket
# default_bucket = sagemaker_session.default_bucket()

sess = boto3.Session()
sm = sess.client("sagemaker")
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name
# Define the model package group name

model_package_group_name = "TF2-telemetry-engine"  #Model name in model registry
prefix = "tf2-telemetry-engine-pipelines"
pipeline_name = "TF2telemetryEnginePipeline"  #SageMaker Pipeline name
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

In [27]:
#pip  install s3fs

In [28]:
import os
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

tel_raw_dir = os.path.join(os.getcwd(), "data/tel_raw")
os.makedirs(tel_raw_dir, exist_ok=True)

fs_data = os.path.join(os.getcwd(), "data/fs_data")
os.makedirs(fs_data, exist_ok=True)

# Define the S3 URI for the input data
# input_data_uri ="s3://msil-ds/raw-data/cal_housing.tgz"
# s3 = boto3.client("s3")
# s3.download_file(
#     "msil-ds", #bucket Name
#     "raw-data/cal_housing.tgz",#prefix
#     "data/raw/cal_housing.tgz",#local path
# )


In [29]:
#!tar -zxf data/raw/cal_housing.tgz #extraction

In [30]:
input_data_uri = "s3://msil-ds/raw-data/new_telemetry_data.csv"
# Define the S3 URI for the requirements file
req_url = "s3://msil-ds/raw-data/requirements.txt"

# Import necessary modules from SageMaker Workflow
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Define the processing instance count parameter
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

# Define the processing instance type parameter
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.large"
)

# Define the training instance type parameter
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.large"
)

# Define the model approval status parameter
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

In [31]:

telemetry_df = pd.read_csv(input_data_uri)
telemetry_df["engine_status"] = telemetry_df['engine_status'].map({'ON': 1, 'OFF': 0})
telemetry_df["current_gear"] = telemetry_df['current_gear'].map({'N': 0, 'R': 1,'P':2,'D':3})

telemetry_df.head()

Unnamed: 0,identifyrecord,Eventime,speed,engine_status,fuel_level,battery_voltage,tire_pressure,current_gear,odometer_reading,engine_temperature,coolant_level
0,100,10-04-2024 00:00,108.53,0,53.03,11.95,33.97,0,43323.02,110.87,7.89
1,101,11-04-2024 00:00,68.59,1,12.21,12.39,29.86,0,45975.75,118.53,97.97
2,102,12-04-2024 00:00,118.31,1,87.17,12.81,35.19,3,29510.29,113.63,49.5
3,103,13-04-2024 00:00,110.32,1,92.09,13.43,30.54,0,10258.51,101.41,16.79
4,104,14-04-2024 00:00,110.94,1,13.08,12.34,35.64,3,77187.78,86.43,21.78


In [32]:
telemetry_df.columns

Index(['identifyrecord', 'Eventime', 'speed', 'engine_status', 'fuel_level',
       'battery_voltage', 'tire_pressure', 'current_gear', 'odometer_reading',
       'engine_temperature', 'coolant_level'],
      dtype='object')

In [33]:
from sklearn.model_selection import train_test_split
import numpy as np
import os
X = telemetry_df[
    ['speed', 'engine_status', 'fuel_level',
       'battery_voltage', 'tire_pressure', 'current_gear', 'odometer_reading', 'coolant_level']
]
Y = telemetry_df[["engine_temperature"]]

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.33)

np.save(os.path.join(tel_raw_dir, "x_train.npy"), x_train.to_numpy())
np.save(os.path.join(tel_raw_dir, "x_test.npy"), x_test.to_numpy())
np.save(os.path.join(tel_raw_dir, "y_train.npy"), y_train.to_numpy())
np.save(os.path.join(tel_raw_dir, "y_test.npy"), y_test.to_numpy())
rawdata_s3_prefix = "{}/data/tel_raw".format(prefix)
raw_s3 = sagemaker_session.upload_data(path="./data/tel_raw/", key_prefix=rawdata_s3_prefix)
print(raw_s3)
print(x_train.columns,y_train.columns)

s3://sagemaker-ap-south-1-955658629586/tf2-telemetry-engine-pipelines/data/tel_raw
Index(['speed', 'engine_status', 'fuel_level', 'battery_voltage',
       'tire_pressure', 'current_gear', 'odometer_reading', 'coolant_level'],
      dtype='object') Index(['engine_temperature'], dtype='object')


In [34]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# training step parameters
training_epochs = ParameterString(name="TrainingEpochs", default_value="100")

# model performance step parameters
accuracy_mse_threshold = ParameterFloat(name="AccuracyMseThreshold", default_value=0.75)

# Inference step parameters
endpoint_instance_type = ParameterString(name="EndpointInstanceType", default_value="ml.m5.large")

# <span style="color:green">Processing</span> 

In [35]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor

# Define the SKLearn framework version
framework_version = "1.2-1"

# Initialize the SKLearn processor
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="tf2-telemetry-engine-processing-job",
    sagemaker_session=pipeline_session,
    role=role,
)

# Define the processor arguments for running the SKLearn processor
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(
            source=input_data, 
            destination="/opt/ml/processing/input"
        ),
        ProcessingInput(source=req_url, destination="/opt/ml/processing/requirement/"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train", 
            source="/opt/ml/processing/train"
        ),
        ProcessingOutput(
            output_name="test", 
            source="/opt/ml/processing/test"
        )
    ],
    code="telemetry_tensorflow/preprocess.py",
)

# Define the processing step for the pipeline
step_process = ProcessingStep(
    name="TF-Preprocess-telemetry-Data",
    step_args=processor_args,
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


# <span style="color:green">feature store</span> 

In [36]:
from time import gmtime, strftime
import time
import uuid
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
import pandas as pd

# Constants and configuration
record_identifier_feature_name = "identifyrecord"
event_time_feature_name = "Eventime"
input_data_uri = "s3://msil-ds/raw-data/new_telemetry_data.csv"  # Make sure input_data_uri is defined earlier in your script

# Read the data and infer schema
col_def_after_processing = [{'name': 'speed', 'type':  'float64'},
 {'name': 'fuel_level', 'type':  'float64'},
 {'name': 'battery_voltage', 'type':  'float64'},
 {'name': 'tire_pressure', 'type':  'float64'},
 {'name': 'odometer_reading', 'type':  'float64'},
 {'name': 'coolant_level', 'type':  'float64'},
                            
 {'name': 'engine_status', 'type':  'float64'},
 {'name': 'current_gear', 'type':  'float64'},
 {'name': 'identifyrecord', 'type': 'int64'},
 {'name': 'Eventime', 'type':  'object'},
{'name': 'engine_temperature', 'type':  'float64'}]
column_names = [col['name'] for col in col_def_after_processing]
column_types = {col['name']: col['type'] for col in col_def_after_processing}

# Create an empty DataFrame with the specified column names
df = pd.DataFrame(columns=column_names)

# Convert data types of DataFrame columns
for col, dtype in column_types.items():
    df[col] = df[col].astype(dtype)


# Define feature group name
feature_group_name = "tensorflow-telemetry-featuregroup-TTF2"
# feature_group_name = f"telemetry-feature-group-{str(uuid.uuid4())[:8]}"  # Alternatively generate a unique name

# Initialize the SageMaker FeatureGroup
feature_group = FeatureGroup(
    name=feature_group_name, 
    sagemaker_session=sagemaker.Session()  # Ensure sagemaker_session is defined earlier
)

try:
    # Check if the feature group already exists
    status = feature_group.describe().get("FeatureGroupStatus")
    if status == "Created":
        print(f"Feature Store {feature_group_name} already present, skipping creation part")
except Exception as e:
    print(f"Feature group not found(Hence Creating Another one) : {e}")
    current_time_sec = int(round(time.time()))
    
    # Load feature definitions and create feature group
    feature_group.load_feature_definitions(data_frame=df)
    feature_store_offline_s3_uri = 's3://' + bucket  # Ensure default_bucket is defined earlier
    feature_group.create(
        s3_uri=feature_store_offline_s3_uri,
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=sagemaker.get_execution_role(),  # Ensure role is defined earlier
        enable_online_store=True
    )
    
    # Wait for the feature group to be created
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group to be created...")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    print(f"FeatureGroup {feature_group.name} successfully created.")


Feature Store tensorflow-telemetry-featuregroup-TTF2 already present, skipping creation part


In [37]:
# x_train = pd.DataFrame(np.load(os.path.join("x_train.npy")),  columns = ['longitude', 'latitude', 'housingMedianAge', 'totalRooms',
#        'totalBedrooms', 'population', 'households', 'medianIncome'])
# x_train.columns.values

# Feature store data INGEST, FETCH using Athena Query

In [38]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor 

# Define the SKLearn framework version

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="tf2-telemetry-engine-processing-job",
    sagemaker_session=pipeline_session,
    role=role,
)


# Run the SKLearn processor with the specified inputs
fs_arg = sklearn_processor.run(
    inputs=[
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            destination="/opt/ml/processing/train/",
        ),
        ProcessingInput(
            source=req_url, 
            destination="/opt/ml/processing/input/"
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_fs", 
            source="/opt/ml/processing/train_fs"
        )],
    code="telemetry_tensorflow/testfeaturestore.py",
) 

# Define the processing step for the pipeline
step_feature = ProcessingStep(
    name="TF-Featurestore-telemetry-ingestion-fetch",
    step_args=fs_arg,
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


# <span style="color:green">Training</span>
### Trainining input from Feature Store

In [39]:
# identity_query = feature_group.athena_query()
# identity_table = identity_query.table_name

# query_string = 'SELECT longitude,	latitude,	housingMedianAge,	totalRooms,	totalBedrooms,	population,	households,	medianIncome  FROM "'+identity_table+'" '

# # run Athena query. The output is loaded to a Pandas dataframe.
# dataset = pd.DataFrame()
# identity_query.run(query_string=query_string, output_location='s3://'+bucket+'/query_results/')
# identity_query.wait()
# x_train = identity_query.as_dataframe()
# x_train.head()

In [40]:
# query_string = f'SELECT target  FROM "{identity_table}"'
# identity_query.run(query_string=query_string, output_location = f's3://{bucket}/query_results/')
# identity_query.wait()
# y_train = identity_query.as_dataframe()
# y_train.head()


In [41]:
# np.save(os.path.join(fs_data, "x_train.npy"), x_train)
# np.save(os.path.join(fs_data, "y_train.npy"), y_train)
# fsdata_s3_prefix = "{}/data/fs_data".format(prefix)
# fs_raw_s3 = sagemaker_session.upload_data(path="./data/fs_data/", key_prefix=fsdata_s3_prefix)
# print(fs_raw_s3)
# feature_store_data = ParameterString(name="InputData", default_value=fs_raw_s3)

In [42]:
# For training with columnar input, the algorithm assumes that the target variable (label) is the first column.
# For inference, the algorithm assumes that the input has no label column.
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
import time

# Where to store the trained model
model_path = f"s3://{bucket}/{prefix}/model/"

hyperparameters = {"epochs": 100}
tensorflow_version = "2.11.0"
python_version = "py39"

tf2_estimator = TensorFlow(
    source_dir="telemetry_tensorflow",
    entry_point="train.py",
    instance_type="ml.m5.large",
    instance_count=1,
    framework_version=tensorflow_version,
    role=role,
    base_job_name="tf2-telemetry-engine-train",
    output_path=model_path,
    hyperparameters=hyperparameters,
    py_version=python_version,
)

# Use the tf2_estimator in a Sagemaker pipelines ProcessingStep.
# NOTE how the input to the training job directly references the output of the previous step.
step_train = TrainingStep(
    name="TF-Training-telemetry-Model",
    estimator=tf2_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_feature.properties.ProcessingOutputConfig.Outputs[
                "train_fs"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

# <span style="color:green">Evaluation</span> 

In [43]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Create SKLearnProcessor object.
# The object contains information about what container to use, what instance type etc.
evaluate_model_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="tf2-telemetry-engine-evaluate",
    role=role,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Use the evaluate_model_processor in a Sagemaker pipelines ProcessingStep.
step_eval = ProcessingStep(
    name="TF-Evaluate-telemetry-Model",
    processor=evaluate_model_processor,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="telemetry_tensorflow/evaluate.py",
    property_files=[evaluation_report],
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


# <span style="color:green">Model Registry</span> 

In [44]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

# Initialize the Model with the top model S3 URI from the tuning step
# model = Model(
#     model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
#     sagemaker_session=pipeline_session,
#     role=role,
# )

# Define model metrics using the evaluation results from the evaluation step
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

# Register the model with specified parameters
step_register = RegisterModel(
    name="TF-Register-telemetry-Model",
    estimator=tf2_estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
)



# <span style="color:green">Register model on condition</span> 

In [45]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Define the RMSE score threshold
mse_score_threshold = 140

# Define the condition for checking if RMSE score is less than or equal to the threshold
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=mse_score_threshold
)

# Define the condition step
step_cond = ConditionStep(
    name="TF-MSE-telemetry-check",
    conditions=[cond_lte],
    if_steps=[step_register],  # Register the model if condition is true
    else_steps=[],  # No else steps defined
)


In [46]:
from sagemaker.workflow.pipeline import Pipeline

# Creating pipeline flow
pipeline_name = "NEW-Tensorflow-telemetry-sagemaker-pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        mse_score_threshold,
    ],
    steps=[step_process,step_feature, step_train, step_eval, step_cond],
)

In [47]:
import json

json.loads(pipeline.definition())

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-south-1-955658629586/tf2-telemetry-engine-pipelines/data/tel_raw'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'TF-Preprocess-telemetry-Data',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSi

In [48]:
pipeline.upsert(role_arn=role)
pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


_PipelineExecution(arn='arn:aws:sagemaker:ap-south-1:955658629586:pipeline/NEW-Tensorflow-telemetry-sagemaker-pipeline/execution/vf1yz6cknv0m', sagemaker_session=<sagemaker.session.Session object at 0x7f6fab568100>)

In [49]:
# Sample to fetch Data from feature store.

In [50]:
# Fetch the record from the Feature Store
#Online Store: When using the get_record method or querying via the FeatureGroup's online methods, the data is being accessed from the online store.
#Offline Store: When querying data through Athena or reading from the S3 bucket where the offline store data is stored, the data is coming from the offline store.

# response = feature_group.get_record(
#     record_identifier_value_as_string="2"
# )

# # Print the fetched record
# pd.DataFrame(response)
