### Installing requirements

In [1]:
# Install the Python packages listed in requirements.txt.
# NOTE(review): `!pip3` may resolve to a different interpreter than the one the
# kernel runs on; consider `%pip install -r requirements.txt` instead — confirm.
!pip3 install -r requirements.txt

Looking in links: https://download.pytorch.org/whl/torch_stable.html


### Inference Pipeline with Grafana

In [12]:
%%writefile ./single_layer_ann_inference_pipeline.py
# The above line just writes this cell into a Python file; this is used in the KFP DSL command.

# Global KFP imports.
import kfp
import kfp.components as comp


def unzip_data(
    bucket_zipfile_path: str, bucket_name: str, sep: str,
    decimal: str, encoding: str, output_csv: comp.OutputPath('CSV')
):
    """Download a ZIP archive from S3, extract it, and publish one CSV as an artifact.

    Args:
        bucket_zipfile_path: Object key of the ZIP archive inside the S3 bucket.
        bucket_name: Currently unused; the bucket is hard-coded in the body.
        sep: Currently unused; the CSV is read with the pandas defaults.
        decimal: Currently unused; the CSV is read with the pandas defaults.
        encoding: Currently unused; the CSV is read with the pandas defaults.
        output_csv: Path of the output artifact the CSV is written to.

    Raises:
        FileNotFoundError: If the extracted archive contains no ``*.csv`` file.
    """
    # Imports required for the Pipeline Component.
    import os
    import pathlib
    import zipfile

    import boto3
    import pandas as pd

    # NOTE(review): the bucket is hard-coded, so `bucket_name` is ignored — confirm intent.
    path_bucket = 'datakflow'
    download_dir = './data'
    extract_dir = './unzipped_data'

    os.makedirs(download_dir, exist_ok=True)
    os.makedirs(extract_dir, exist_ok=True)

    # Download the ZIP file from S3.
    boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).download_file(Filename='./data/zipfile.zip')

    # Extract every archive found in the download directory to local disk.
    for archive_name in os.listdir(download_dir):
        with zipfile.ZipFile(os.path.join(download_dir, archive_name), 'r') as archive:
            archive.extractall(extract_dir)

    # Sort so the choice is reproducible when the archive holds several CSVs;
    # as before, only a single CSV (the last one) is forwarded.
    csv_files = sorted(pathlib.Path(extract_dir).glob('**/*.csv'))
    if not csv_files:
        # Previously this case surfaced as a NameError on an unbound variable.
        raise FileNotFoundError(
            f"No CSV file found after extracting '{bucket_zipfile_path}' into '{extract_dir}'"
        )
    output_path = csv_files[-1]

    df = pd.read_csv(output_path)

    # Write the CSV into an artifact.
    df.to_csv(output_csv, index=True, header=True)

def preprocess_data(csv_path: comp.InputPath('CSV'), sequence_json: comp.OutputPath()):
    """Extract the non-empty sequences from the CSV artifact and store them as JSON.

    Rows whose ``sequence`` cell is missing or is the literal string ``'[]'``
    are dropped; every remaining cell is parsed from its textual form into a
    Python object.

    Args:
        csv_path: Path of the input CSV artifact; must contain a ``sequence`` column.
        sequence_json: Path of the output artifact the parsed sequences are written to.

    Raises:
        ValueError, SyntaxError: If a ``sequence`` cell is not a valid Python literal.
    """
    # Imports required for the Pipeline Component.
    import ast
    import json

    import numpy as np
    import pandas as pd

    # Read from the artifact CSV.
    df = pd.read_csv(csv_path)

    # Treat the textual empty list as missing, then keep only the populated cells.
    raw_sequences = df['sequence'].replace('[]', np.nan).dropna()
    del df  # The full frame is no longer needed.

    # The cells come from an external file, so parse them as literals only;
    # `eval` would execute arbitrary code embedded in the CSV.
    sequences = [ast.literal_eval(raw_sequence) for raw_sequence in raw_sequences]

    # Write the preprocessed data into an artifact.
    with open(sequence_json, 'w') as f:
        json.dump(sequences, f)

def model_inferencing(sequence_json: comp.InputPath(), preds_json: comp.OutputPath()):
    """Run the single-layer network over the preprocessed sequences and save the predictions.

    The checkpoint is downloaded from S3, loaded onto the CPU, and applied in
    batches. One prediction is emitted per input sequence; if a batch cannot
    be processed, its slots are filled with ``None`` so the output keeps the
    same length and order as the input.

    Args:
        sequence_json: Path of the JSON artifact holding the list of sequences.
        preds_json: Path of the JSON artifact the predictions are written to.
    """
    # Imports required for the Pipeline Component.
    import json

    import boto3
    import torch

    # Read the preprocessed data from the artifact.
    with open(sequence_json, 'r') as f:
        sequences = json.load(f)

    # Setting up Dataset and DataLoader for torch model.
    X_valid = torch.Tensor(sequences)

    # Dataset class.
    class LogDataset(torch.utils.data.Dataset):
        def __init__(self, X, y):
            self.sequences = X
            self.labels = y

        def __len__(self):
            return len(self.sequences)

        def __getitem__(self, idx):
            return self.sequences[idx], self.labels[idx]

    # Labels are placeholders; only the sequences are used during inference.
    valid_dataset = LogDataset(X_valid, [-1] * len(X_valid))

    # Hyperparameters.
    batch_size = 64

    valid_dataloader = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size)

    # Inference loop over batches of dataset.
    def test_loop(dataloader, model):
        preds = []
        with torch.no_grad():
            for X, _ in dataloader:
                try:
                    pred = model(X)
                    preds += pred.argmax(1).type(torch.float).tolist()
                except RuntimeError as e:
                    print(f'Could not run inference on a batch of {len(X)} sequences: {e}')
                    # Pad with None so predictions stay aligned with the input rows.
                    preds += [None] * len(X)
        return preds

    # Model class: single-layer A.N.N.
    class SimpleNN(torch.nn.Module):
        def __init__(self, input_size, num_keys):
            super().__init__()
            self.fc = torch.nn.Linear(input_size, num_keys)

        def forward(self, x):
            return self.fc(x)

    # Downloading the model from S3.
    path_bucket = 'datakflow'
    boto3.client('s3').download_file(path_bucket, 'checkpoint.pth', 'checkpoint.pth')

    # Loading the model from local. `map_location` lets a checkpoint saved on a
    # GPU be restored on a CPU-only pod.
    checkpoint = torch.load('checkpoint.pth', map_location='cpu')['state_dict']
    model = SimpleNN(10, 11)
    model.load_state_dict(checkpoint)
    model.eval()

    # The inference loop.
    preds = test_loop(valid_dataloader, model)
    with open(preds_json, 'w') as f:
        json.dump(preds, f)

def writing_monitoring_info(csv_path: comp.InputPath('CSV'), preds_json: comp.InputPath()):
    """Join the predictions onto the source rows and POST them to the metrics endpoint.

    The same row filter as in preprocessing is applied (rows with a missing or
    ``'[]'`` sequence are dropped) so the predictions line up with the
    remaining rows. Delivery is best-effort: HTTP errors, connection failures
    and timeouts are printed, not raised.

    Args:
        csv_path: Path of the input CSV artifact; must contain the columns
            ``timestamp``, ``danger``, ``variables``, ``sequence`` and ``target``.
        preds_json: Path of the JSON artifact holding one prediction per kept row.
    """
    # Imports required for the Pipeline Component.
    import json

    import numpy as np
    import pandas as pd
    import requests

    # (connect, read) timeouts in seconds, so an unresponsive endpoint cannot
    # hang the pod indefinitely.
    request_timeout = (10, 300)

    # Read from the artifact CSV.
    df = pd.read_csv(csv_path)

    with open(preds_json) as f:
        preds = json.load(f)

    # Preprocess the dataset.
    df['sequence'] = df['sequence'].replace('[]', np.nan).copy()
    mask = ~(df['sequence'].isna())
    reqd_columns = ['timestamp', 'danger', 'variables', 'sequence', 'target']
    df = df.loc[mask, reqd_columns].reset_index(drop=True)
    df['prediction'] = preds

    class NumpyEncoder(json.JSONEncoder):
        """JSON encoder that converts NumPy scalars and arrays to plain Python values."""

        def default(self, obj):
            if isinstance(obj, np.void):
                return None

            if isinstance(obj, (np.generic, np.bool_)):
                return obj.item()

            if isinstance(obj, np.ndarray):
                return obj.tolist()

            return json.JSONEncoder.default(self, obj)

    def send_data(data) -> None:
        try:
            response = requests.post(
                "http://a055ffc26004049f1ac02141e4c98fb4-409861333.us-east-1.elb.amazonaws.com:8085/iterate/log_1_layer_ann",
                data=json.dumps(data, cls=NumpyEncoder),
                headers={"content-type": "application/json"},
                timeout=request_timeout,
            )

            if response.status_code == 200:
                print("Success.")

            else:
                print(
                    f"Got an error code {response.status_code} for the data chunk. "
                    f"Reason: {response.reason}, error text: {response.text}"
                )

        except requests.exceptions.ConnectionError as error:
            print(f"Cannot reach a metrics application, error: {error}, data: {data}")

        except requests.exceptions.Timeout as error:
            # Read timeouts are not ConnectionErrors, so they need their own handler.
            print(f"Timed out waiting for a metrics application, error: {error}")

    data = df.to_dict(orient='records')
    send_data(data)

# Container image used by the pods that run each component.
base_img = "sent2020/kflow1:latest"

# Turn each of the functions defined above into a pipeline component.
unzip_data_op = comp.create_component_from_func(unzip_data, base_image=base_img)
preprocess_data_op = comp.create_component_from_func(preprocess_data, base_image=base_img)
model_inferencing_op = comp.create_component_from_func(model_inferencing, base_image=base_img)
# The monitoring component additionally asks for the `requests` package.
writing_monitoring_info_op = comp.create_component_from_func(
    writing_monitoring_info,
    base_image=base_img,
    packages_to_install=['requests'],
)

# Wire the components into one pipeline: unzip -> preprocess -> infer -> report.
@kfp.dsl.pipeline(
    name='single-layer-ann-inference-pipeline',
    description='Performs inference using a single-layer A.N.N. to find anomalies in string sequences'
)
def unzip_and_read_pipeline(
    bucket_zipfile_path, bucket_name,
    sep, decimal, encoding
):
    # Step 1: fetch the archive and expose its CSV as an artifact.
    unzip_task = unzip_data_op(bucket_zipfile_path, bucket_name, sep, decimal, encoding)
    raw_csv = unzip_task.outputs['output_csv']

    # Step 2: turn the CSV into the list of sequences.
    preprocess_task = preprocess_data_op(raw_csv)
    sequences = preprocess_task.outputs['sequence_json']

    # Step 3: run the model over the sequences.
    inference_task = model_inferencing_op(sequences)
    predictions = inference_task.outputs['preds_json']

    # Step 4: send the original rows together with the predictions to monitoring.
    monitoring_task = writing_monitoring_info_op(raw_csv, predictions)

Overwriting ./single_layer_ann_inference_pipeline.py


In [13]:
%%sh
# Compile the pipeline Python file written by the cell above into a YAML file.
dsl-compile --py single_layer_ann_inference_pipeline.py --output single_layer_ann_inference_pipeline.yaml