KNN With SageMaker Training API - Quick Approach

In [None]:
import boto3

# Set variables
region = boto3.Session().region_name

# Bucket and key prefix the covtype archive is downloaded from.
# NOTE: the variable name `downladed_data_prefix` (sic) is kept unchanged so
# that any code referring to it keeps working.
downloaded_data_bucket = f"sagemaker-example-files-prod-{region}"
downladed_data_prefix = "datasets/tabular/uci_covtype"

# download_file() returns None, so bind the client itself to `s3` rather than
# the (useless) return value of the download call.
s3 = boto3.client("s3")
s3.download_file(
    downloaded_data_bucket,
    f"{downladed_data_prefix}/covtype.data.gz",
    "covtype.data.gz",
)

In [None]:
# Create the raw-data directory (no error if it already exists) and move the
# downloaded archive from the working directory into it.
!mkdir -p /tmp/covtype/raw
!mv covtype.data.gz /tmp/covtype/raw/covtype.data.gz

In [None]:
# Preprocessing Data
import numpy as np
import os

# Location of the raw archive, plus paths for processed CSV outputs
# (the CSV paths are only defined here; this cell does not write to them).
data_dir = "/tmp/covtype/"
processed_subdir = "standardized"
raw_data_file = os.path.join(data_dir, "raw", "covtype.data.gz")
processed_dir = os.path.join(data_dir, processed_subdir)
train_features_file = os.path.join(processed_dir, "train/csv/features.csv")
train_labels_file = os.path.join(processed_dir, "train/csv/labels.csv")
test_features_file = os.path.join(processed_dir, "test/csv/features.csv")
test_labels_file = os.path.join(processed_dir, "test/csv/labels.csv")

# Load the comma-separated raw data into a single array
print(f"Read raw data from:{raw_data_file}")
raw = np.loadtxt(raw_data_file, delimiter=",")

# Shuffle the rows in place with a fixed seed, then cut 90% train / 10% test
np.random.seed(0)
np.random.shuffle(raw)
train_size = int(0.9 * raw.shape[0])
train_rows, test_rows = raw[:train_size], raw[train_size:]

# The last column is the label; every column before it is a feature
train_features, train_labels = train_rows[:, :-1], train_rows[:, -1]
test_features, test_labels = test_rows[:, :-1], test_rows[:, -1]

In [None]:
# Display the shapes of the four arrays produced by the train/test split
train_features.shape, train_labels.shape, test_features.shape, test_labels.shape

In [None]:
# Display the first row of the training feature matrix
train_features[0]

In [None]:
# Serialize the training set into an in-memory buffer.
# (This cell only fills the buffer; the upload to S3 happens in the following cell.)

import io
import sagemaker.amazon.common as smac

train_set_buf = io.BytesIO()
# Write features and labels into the buffer
# (presumably RecordIO-protobuf, given the "recordio-pb-data" key used for the upload - TODO confirm)
smac.write_numpy_to_dense_tensor(train_set_buf, train_features, train_labels)
# Rewind so that the next reader starts at the beginning of the buffer
train_set_buf.seek(0)

In [None]:
import sagemaker

# SageMaker session, its default bucket, and the key prefix used for this demo
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "sagemaker/DEMO-knn-covtype"

# S3 object keys always use "/" as the separator, so build the key explicitly
# instead of with os.path.join (which follows the local OS path separator).
# Building it once also keeps the uploaded key and the printed URI in sync.
key = "recordio-pb-data"
train_object_key = f"{prefix}/train/{key}"
boto3.resource("s3").Bucket(bucket).Object(train_object_key).upload_fileobj(train_set_buf)
s3_train_data = f"s3://{bucket}/{train_object_key}"

print(f"Uploaded training data location: {s3_train_data}")

In [None]:
# Upload test data to S3 to help with evaluation later on

# Serialize the test features and labels into an in-memory buffer and rewind it
test_set_buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(test_set_buf, test_features, test_labels)
test_set_buf.seek(0)

# S3 object keys always use "/" as the separator, so build the key explicitly
# instead of with os.path.join (which follows the local OS path separator).
test_object_key = f"{prefix}/test/{key}"
boto3.resource("s3").Bucket(bucket).Object(test_object_key).upload_fileobj(test_set_buf)
s3_test_data = f"s3://{bucket}/{test_object_key}"
print(f"Uploaded test data to location: {s3_test_data}")

In [None]:
# Train the model
from sagemaker import get_execution_role
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

from sagemaker import image_uris

In [None]:
# Execution role; passed to the Estimator as `role` when the training job is set up
role = get_execution_role()

In [None]:
# Image URI for the "knn" framework in the current region; used as the Estimator's image_uri
container = image_uris.retrieve(framework="knn", region=region)

def trained_estimator_from_hyperparameters(
    s3_train_data,
    hyperparams,
    output_path,
    s3_test_data=None,
    instance_type="ml.m5.xlarge",
    instance_count=1,
):
    """Create an Estimator, apply the hyperparameters, fit it, and return it.

    The estimator is built from the notebook-level globals ``container``
    (image URI), ``role`` and ``sess``, which must be defined before calling.

    Args:
        s3_train_data: Value passed to ``fit`` as the ``"train"`` input.
        hyperparams: Mapping forwarded to ``set_hyperparameters`` as keyword
            arguments.
        output_path: Passed to the Estimator as ``output_path``.
        s3_test_data: Optional value passed to ``fit`` as the ``"test"`` input;
            the channel is omitted when this is ``None``.
        instance_type: Training instance type (defaults to the previously
            hard-coded ``"ml.m5.xlarge"``).
        instance_count: Number of training instances (defaults to the
            previously hard-coded ``1``).

    Returns:
        The estimator object after ``fit`` has been called on it.
    """
    knn = sagemaker.estimator.Estimator(
        image_uri=container,
        role=role,
        instance_count=instance_count,
        instance_type=instance_type,
        output_path=output_path,
        sagemaker_session=sess,
    )
    knn.set_hyperparameters(**hyperparams)

    # Only add the "test" channel when test data was actually supplied
    inputs = {"train": s3_train_data}
    if s3_test_data is not None:
        inputs["test"] = s3_test_data
    knn.fit(inputs=inputs)
    return knn

In [None]:
# Run the training job

output_path = f"s3://{bucket}/{prefix}/output"
hyperparams = {
    # Taken from the training matrix instead of a hard-coded 54, so it cannot
    # drift from the data that was actually uploaded.
    "feature_dim": train_features.shape[1],
    "k": 10,
    "sample_size": 200000,
    "predictor_type": "classifier",
}
knn_estimator = trained_estimator_from_hyperparameters(
    s3_train_data, hyperparams, output_path, s3_test_data=s3_test_data
)

In [None]:
# Setup endpoint

def predictor_from_estimator(estimator, estimator_name, instance_type, endpoint_name):
    """Deploy ``estimator`` on one instance and return the resulting predictor.

    The returned predictor has a ``CSVSerializer`` set as its serializer and a
    ``JSONDeserializer`` set as its deserializer.

    Args:
        estimator: Object whose ``deploy`` method is called.
        estimator_name: Accepted for the caller's convenience; the body does
            not reference it.
        instance_type: Instance type passed to ``deploy``.
        endpoint_name: Endpoint name passed to ``deploy``.

    Returns:
        The object returned by ``estimator.deploy`` after its serializer and
        deserializer have been assigned.
    """
    deploy_kwargs = {
        "initial_instance_count": 1,
        "instance_type": instance_type,
        "endpoint_name": endpoint_name,
    }
    endpoint_predictor = estimator.deploy(**deploy_kwargs)

    # Requests are serialized as CSV; responses are deserialized from JSON
    endpoint_predictor.serializer = CSVSerializer()
    endpoint_predictor.deserializer = JSONDeserializer()
    return endpoint_predictor

In [None]:
import time
# Endpoint name is the fixed prefix immediately followed by a UTC timestamp
instance_type = "ml.m4.xlarge"
launch_stamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_name = f"knn-demo-covtype{launch_stamp}"

# Deploy the trained estimator and keep the predictor for the cells below
knn_predictor = predictor_from_estimator(
    knn_estimator,
    estimator_name=f"knn-{instance_type}",
    instance_type=instance_type,
    endpoint_name=endpoint_name,
)

In [None]:
# Display the shape of the held-out test feature matrix
test_features.shape

In [None]:
# Send five test rows (indices 30-34) to the endpoint as a smoke test
sample_rows = test_features[30:35]
sample_prediction = knn_predictor.predict(
    sample_rows,
    initial_args={"ContentType": "text/csv"},
)

# Show the response and its Python type
print(sample_prediction)
print(type(sample_prediction))

In [None]:
# Batch Predictions

# np.array_split allows uneven splits; the size printed below is that of the
# first batch, which is never smaller than the others.
num_batches = 100
batches = np.array_split(test_features, num_batches)
print(f"data split into {num_batches} batches of size {batches[0].shape[0]}")

start_time = time.time()
batch_predictions = []
for batch in batches:
    result = knn_predictor.predict(batch, initial_args={"ContentType": "text/csv"})
    # Pull the predicted label out of each prediction entry in the response
    cur_predictions = np.array([item["predicted_label"] for item in result["predictions"]])
    batch_predictions.append(cur_predictions)

predictions = np.concatenate(batch_predictions)
run_time = time.time() - start_time

test_size = test_labels.shape[0]
# Vectorized count of matches (the builtin sum() iterates element by element)
num_correct = np.sum(predictions == test_labels)
accuracy = num_correct / float(test_size)
print(f"Time required for prediction {test_size} data points:{run_time} seconds")
# `accuracy` is a fraction in [0, 1]; the "%" format type scales it by 100 so
# the number shown really is a percentage.
print(f"Accuracy of the model: {accuracy:.2%}")

In [None]:
# Clean up: delete the model first, then the endpoint served by knn_predictor
knn_predictor.delete_model()
knn_predictor.delete_endpoint()