In [177]:
!pip install -U sagemaker
%pip install s3fs
!pip install h5py


[0mNote: you may need to restart the kernel to use updated packages.
[0m

In [178]:
import os
import time
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession

In [236]:
## CICD Sagemaker config file

## Sagemaker pipeline config
prefix = "pipeline-model-example"
pipeline_name = "serial-inference-pipeline"  # SageMaker Pipeline name
raw_s3 = "s3://das-samples-uploader/DS_Ramsbrook_DAS_data/2024-04-18/100_lpm_60s_1"

## preprocessing config
pre_preocessing_machine = "ml.m5.12xlarge"
pre_preocessing_no_of_machines = 1

## training config
training_machine = "ml.m5.12xlarge"
training_no_of_machines = 1

## validation config
validation_machine = "ml.m5.12xlarge"
validation_no_of_machines = 1


## hypersapce parameters 
TrainingEpochs = "10"
Accuracy_Threshold = 0.75
tensorflow_version = "2.4.1"
python_version = "py37"
sklearn_framework_version = "1.2-1"

## preprocess config
DAS_data_channels = 376



In [237]:
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name
pipeline_session = PipelineSession()

In [249]:
print(region)
print(bucket)

eu-central-1
sagemaker-eu-central-1-631045770794


In [238]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# status of newly trained model in registry
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")

# processing step parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value=pre_preocessing_machine
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType",default_value=training_machine)
training_epochs = ParameterString(name="TrainingEpochs", default_value=TrainingEpochs)

# model performance step parameters
accuracy_mse_threshold = ParameterFloat(name="AccuracyMseThreshold", default_value=Accuracy_Threshold)

In [174]:
#!mkdir -p code

In [250]:
%%writefile code/preprocess.py
import sys
import subprocess
import os

subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/opt/ml/processing/depend/code/requirements.txt",
])
from my_package.read_DAS_hdf5 import load_single_DAS_file, list_hdf5_files_in_dir, load_multi_DAS_file, generate_training_set, generate_training_set_spectrogram, moving_average, define_butterworth_highpass, filtering, spectrogram

#from my_package.DAS_filtering import moving_average,define_butterworth_highpass,filtering,spectrogram
import s3fs
import h5py
import glob
import numpy as np
import pandas as pd
import os
import json
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tarfile
from sklearn.model_selection import train_test_split

try:
    from sagemaker_containers.beta.framework import (
        content_types,
        encoders,
        env,
        modules,
        transformer,
        worker,
        server,
    )
except ImportError:
    pass


base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

if __name__ == "__main__":
    
    data_location_str = base_dir+"/input/"

    print("data_location_str", data_location_str)
    lst = os.listdir(data_location_str) # your directory path
    number_files = len(lst)
    print(number_files)
    
    channels = 376
    file_names = list_hdf5_files_in_dir(data_location_str)
    
    print(file_names)
    
    DAS_fitlered_data = load_multi_DAS_file(file_names, channels)
    
    print("DAS data shape", DAS_fitlered_data.shape)
    
    
    ## 100 lm/m sample 1 ( 60 2 )
    channel_start_leak = 188
    channel_end_leak = 199
    capture_period_leak = [300000, 2182000]
    fs = 20000
    training_data_leak = generate_training_set_spectrogram(DAS_fitlered_data, channel_start_leak, channel_end_leak,capture_period_leak,fs)
    training_label_leak = np.ones(training_data_leak.shape[0])

    ## no leak
    channel_start_no_leak = 148
    channel_end_no_leak = 270
    capture_period_no_leak = [0, 200000]
    training_data_noleak = generate_training_set_spectrogram(DAS_fitlered_data, channel_start_no_leak, channel_end_no_leak,capture_period_no_leak,fs)
    training_label_noleak = np.zeros(training_data_noleak.shape[0])

    training_data = np.concatenate([training_data_leak,training_data_noleak ])
    training_label = np.concatenate([training_label_leak,training_label_noleak ])

    train = training_data.reshape(training_data.shape[0], training_data.shape[1], training_data.shape[2],training_data.shape[3], 1)
    
    seed = 7
    X_train, X_test, y_train, y_test = train_test_split(train, training_label, test_size=0.33, random_state=seed)

    
    np.save(f"{base_dir}/train/X_train.npy",X_train)
    np.save(f"{base_dir}/train/y_train.npy",y_train)

    np.save(f"{base_dir}/test/X_test.npy",X_test)
    np.save(f"{base_dir}/test/y_test.npy",y_test)
    
    #train_dataset.to_csv(f"{base_dir}/train/train.csv", header=True, index=False)
    #test_dataset.to_csv(f"{base_dir}/test/test.csv", header=True, index=False)
    #joblib.dump(scaler, "model.joblib")
    #with tarfile.open(f"{base_dir}/scaler_model/model.tar.gz", "w:gz") as tar_handle:
        #tar_handle.add(f"model.joblib")


def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == "text/csv":
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None)

        if len(df.columns) == len(feature_columns) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns + [label_column]
        elif len(df.columns) == len(feature_columns):
            # This is an unlabelled example.
            df.columns = feature_columns

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append(row)
        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))


def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:

        rest of features either one hot encoded or standardized
    """
    features = model.transform(input_data)

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features


def model_fn(model_dir):
    """Deserialize fitted model"""
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor

Overwriting code/preprocess.py


In [240]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type=pre_preocessing_machine,
    instance_count=processing_instance_count,
    base_job_name="hdf5-data-process",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
        ProcessingInput(source="code/", destination="/opt/ml/processing/depend/code"),
        ProcessingInput(source="code/my_package/", destination="/opt/ml/processing/input/code/my_package/"),

    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
    #source_dir="code",
    #dependencies=['code/requirements.txt'],
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [241]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
)

In [188]:
print(bucket)
print(prefix)

sagemaker-eu-central-1-631045770794
pipeline-model-example


In [242]:
%%writefile code/train.py

import argparse
import numpy as np
import os
import tensorflow as tf
import pandas as pd
from tensorflow.keras import layers, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.initializers import Constant
from sklearn.preprocessing import LabelEncoder
from keras.utils import np_utils

def parse_args():
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)

    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))

    # model directory
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    return parser.parse_known_args()


def get_train_data(train_dir):
    #train_data = pd.read_csv(os.path.join(train_dir, "train.csv"))
    
    
    print("train_dir",train_dir)
    #x_train = train_data[feature_columns].to_numpy()
    x_train = np.load(train_dir+"/X_train.npy")
    y_train = np.load(train_dir+"/y_train.npy")
    #y_train = train_data[label_column].to_numpy()
    print("x train", x_train.shape, "y train", y_train.shape)

    return x_train, y_train


def get_test_data(test_dir):
    
    print("train_dir",test_dir)
    x_test = np.load(test_dir+"/X_test.npy")
    y_test = np.load(test_dir+"/y_test.npy")
    print("x test", x_test.shape, "y test", y_test.shape)
    return x_test, y_test


def get_model(num_classes):
    
    
    input_shape = (5,29,89,1)
    
    model = Sequential(
        [
            Input(shape=input_shape),
            layers.Conv3D(128,(3,3,3),activation='relu',input_shape=(5,29,89,1),bias_initializer=Constant(0.01)),
            layers.Conv3D(128,(3,3,3),activation='relu',bias_initializer=Constant(0.01)),
            layers.MaxPooling3D((2,2,2), padding='same'),
            layers.Conv3D(64,(3,3,3),activation='relu', padding='same'),
            layers.Conv3D(64,(3,3,3),activation='relu', padding='same'),
            layers.MaxPooling3D((2,2,2), padding='same'),
            layers.Dropout(0),
            layers.Flatten(),
            layers.Dropout(0),
            layers.Dense(64, activation="softmax"),
            layers.Dropout(0),
            layers.Dense(32, activation="softmax"),
            layers.Dropout(0),
            layers.Dense(num_classes, activation="softmax"),
        ]
    )

    model.summary()
    #inputs = tf.keras.Input(shape=(8,))
    #hidden_1 = tf.keras.layers.Dense(8, activation="tanh")(inputs)
   # hidden_2 = tf.keras.layers.Dense(4, activation="sigmoid")(hidden_1)
   # outputs = tf.keras.layers.Dense(1)(hidden_2)
   # return tf.keras.Model(inputs=inputs, outputs=outputs)
    return model

if __name__ == "__main__":
    args, _ = parse_args()

    print("Training data location: {}".format(args.train))
    print("Test data location: {}".format(args.test))
    x_train, y_train = get_train_data(args.train)
    x_test, y_test = get_test_data(args.test)

    batch_size = args.batch_size
    epochs = args.epochs
    learning_rate = args.learning_rate
    print(
        "batch_size = {}, epochs = {}, learning rate = {}".format(batch_size, epochs, learning_rate)
    )
    num_classes = 2

    model = get_model(num_classes)
    
    batch_size = args.batch_size
    epochs = args.epochs
    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    
    
    #optimizer = tf.keras.optimizers.SGD(learning_rate)
    #model.compile(optimizer=optimizer, loss="mse")
    
    region = "eu-central-1"
    encoder = LabelEncoder()
    encoder.fit(y_train)
    encoded_Y = encoder.transform(y_train)
    hot_y_train = np_utils.to_categorical(encoded_Y)
    
    encoder = LabelEncoder()
    encoder.fit(y_test)
    encoded_Y = encoder.transform(y_test)
    hot_y_test = np_utils.to_categorical(encoded_Y)
    
    
    model.fit(
        x_train, hot_y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, hot_y_test)
    )

    # evaluate on test set
    scores = model.evaluate(x_test, hot_y_test, batch_size, verbose=2)
    print("Loss, Accuracy :", scores)

    # save model
    model.save(args.sm_model_dir + "/1.keras")

Overwriting code/train.py


In [243]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.model_step import ModelStep
import time

hyperparameters = {"epochs": training_epochs,
                  "learning_rate": 0.1,
                  "batch_size": 64}

# Where to store the trained model
model_path = f"s3://{bucket}/{prefix}/model/"

training_metric_definitions = [
    {"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
    {"Name": "val_loss", "Regex": "val_loss: ([0-9\\.]+)"},
    {"Name": "train_accuracy", "Regex": "- accuracy: ([0-9\\.]+)"},
    {"Name": "train_loss", "Regex": "- loss: ([0-9\\.]+)"},
]

tf2_estimator = TensorFlow(
    source_dir="code",
    entry_point="train.py",
    instance_type=training_instance_type,
    instance_count=training_no_of_machines,
    framework_version=tensorflow_version,
    role=role,
    base_job_name="tensorflow-train-model",
    output_path=model_path,
    hyperparameters=hyperparameters,
    py_version=python_version,
    sagemaker_session=pipeline_session,
    metric_definitions=training_metric_definitions,
)

# NOTE how the input to the training job directly references the output of the previous step.
train_args = tf2_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }, logs=True
)

step_train_model = TrainingStep(name="TrainTensorflowModel", step_args=train_args)

In [244]:
%%writefile code/evaluate.py
import subprocess
import sys
import os
import json
import sys
import numpy as np
import pandas as pd
import pathlib
import tarfile
from sklearn.preprocessing import LabelEncoder

subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/opt/ml/processing/depend/code/requirements.txt",
])

from keras.utils import np_utils


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, "r:gz") as tar:
        tar.extractall("./model")
    import tensorflow as tf

    model = tf.keras.models.load_model("./model/1.keras")
    
    test_path = "/opt/ml/processing/test/"
    
    x_test = np.load(test_path+"/X_test.npy")
    y_test = np.load(test_path+"/y_test.npy")
    
    region = "eu-central-1"
    encoder = LabelEncoder()
    encoder.fit(y_test)
    encoded_Y = encoder.transform(y_test)
    hot_y_test = np_utils.to_categorical(encoded_Y)
    
    scores = model.evaluate(x_test, hot_y_test, verbose=2)
    print("Loss, Accuracy :", scores)

    # Available metrics to add to model: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "classification_metrics": {
            "Accuracy": {"value": scores, "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluate.py


In [245]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput


tf_eval_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    image_scope="training",
    py_version=python_version,
    instance_type=validation_machine,
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=tf_eval_image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

eval_args = evaluate_model_processor.run(
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
        ProcessingInput(source="code/", destination="/opt/ml/processing/depend/code"),
        ProcessingInput(source="code/my_package/", destination="/opt/ml/processing/input/code/my_package/"),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluate.py",
)

step_evaluate_model = ProcessingStep(
    name="EvaluateModelPerformance",
    step_args=eval_args,
    property_files=[evaluation_report],
)

        


In [246]:
from sagemaker.workflow.pipeline import Pipeline

# Create a Sagemaker Pipeline.
# Each parameter for the pipeline must be set as a parameter explicitly when the pipeline is created.
# Also pass in each of the steps created above.
# Note that the order of execution is determined from each step's dependencies on other steps,
# not on the order they are passed in below.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        training_instance_type,
        processing_instance_type,
        processing_instance_count,
        input_data,
        model_approval_status,
        training_epochs,
        accuracy_mse_threshold,
    ],
    steps=[step_process, step_train_model, step_evaluate_model],
)

In [251]:
import json

definition = json.loads(pipeline.definition())
definition

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.12xlarge'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.12xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://das-samples-uploader/DS_Ramsbrook_DAS_data/2024-04-18/100_lpm_60s_1'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'TrainingEpochs', 'Type': 'String', 'DefaultValue': '10'},
  {'Name': 'AccuracyMseThreshold', 'Type': 'Float', 'DefaultValue': 0.75}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.12xlarge',
     

In [None]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


In [216]:

plt = metrics_dataframe.plot(
    kind="line", figsize=(12, 5), x="timestamp", y="value", style="b.", legend=False
)
plt.set_ylabel(metric_name);

ClientError: An error occurred (ValidationException) when calling the DescribeTrainingJob operation: Requested resource not found.

In [226]:
%matplotlib inline
from sagemaker.analytics import TrainingJobAnalytics

latest_job_name = tf2_estimator.latest_training_job.job_name

In [227]:
print(latest_job_name)

tensorflow-train-model-2024-07-05-14-16-47-719


In [228]:

metric_name = "loss"

metrics_dataframe = TrainingJobAnalytics(
    training_job_name=latest_job_name, metric_names=[metric_name]
).dataframe()


ClientError: An error occurred (ValidationException) when calling the DescribeTrainingJob operation: Requested resource not found.