In [1]:
pip install --upgrade pip

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Collecting pip
  Downloading pip-22.3.1-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 22.0.4
    Uninstalling pip-22.0.4:
      Successfully uninstalled pip-22.0.4
Successfully installed pip-22.3.1
Note: you may need to restart the kernel to use updated packages.


In [4]:
! pip install -qU sagemaker

In [5]:
!pip install -U "sagemaker>2.0"

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com


In [6]:
import logging
import sagemaker
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

Created S3 bucket: sagemaker-us-east-1-432547830124


In [7]:
bucket_name = sagemaker.Session().default_bucket()

In [8]:
bucket_name

'sagemaker-us-east-1-432547830124'

In [9]:
# Fetch the dataset from the SageMaker bucket
import boto3

s3 = boto3.client("s3")
s3.download_file(
    "sagemaker-sample-files", "datasets/tabular/uci_abalone/abalone.csv", "abalone.csv")

In [10]:
%%writefile ./preprocess.py
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import csv
import os
import shutil
import sys
import time

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)


def csv_line(data):
    r = ",".join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )

    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(
        ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
        header=False,
        schema=schema,
    )

    # StringIndexer on the sex column which has categorical value
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

    # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(
        inputCols=[
            "sex_vec",
            "length",
            "diameter",
            "height",
            "whole_weight",
            "shucked_weight",
            "viscera_weight",
            "shell_weight",
        ],
        outputCol="features",
    )

    # The pipeline is comprised of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # This step trains the feature transformers
    model = pipeline.fit(total_df)

    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)

    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
    )

    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
    )


if __name__ == "__main__":
    main()


Writing ./preprocess.py


In [11]:
bucket

'sagemaker-us-east-1-432547830124'

In [12]:
from sagemaker.spark.processing import PySparkProcessor

# Upload the raw input dataset to a unique S3 location
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix)

sagemaker_session.upload_data(
    path='abalone.csv', bucket=sagemaker_session.default_bucket(), key_prefix=input_prefix_abalone
)

# Run the processing job
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)


In [13]:
bucket = sagemaker_session.default_bucket()

In [14]:
spark_processor.run(
    submit_app="./preprocess.py",
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)

Creating processing-job with name sm-spark-2023-01-04-11-00-58-459
INFO:sagemaker:Creating processing-job with name sm-spark-2023-01-04-11-00-58-459



Job Name:  sm-spark-2023-01-04-11-00-58-459
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-432547830124/sm-spark-2023-01-04-11-00-58-459/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-432547830124/sagemaker/spark-preprocess-demo/2023-01-04-11-00-52/spark_event_logs', 'LocalPath': '/opt/ml/processing/spark-events/', 'S3UploadMode': 'Continuous'}}]
............................................................................!

In [16]:
print("Top 5 rows from s3://{}/{}/train/".format(bucket, input_preprocessed_prefix_abalone))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5

Top 5 rows from s3://sagemaker-us-east-1-432547830124/sagemaker/spark-preprocess-demo/2023-01-04-11-00-52/input/preprocessed/abalone/train/
5.0,0.0,0.0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025
5.0,0.0,0.0,0.29,0.225,0.075,0.14,0.0515,0.0235,0.04
7.0,0.0,0.0,0.305,0.225,0.07,0.1485,0.0585,0.0335,0.045
7.0,0.0,0.0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048
9.0,0.0,0.0,0.33,0.26,0.08,0.2,0.0625,0.05,0.07


In [17]:
spark_processor.start_history_server()

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Pulling spark history server image...
INFO:sagemaker.spark.processing:Pulling spark history server image...
docker command: docker pull 173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.1-cpu
INFO:sagemaker.local.image:docker command: docker pull 173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.1-cpu


Login Succeeded


image pulled: 173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.1-cpu
INFO:sagemaker.local.image:image pulled: 173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.1-cpu
Error response from daemon: No such container: history_server
Error: No such container: history_server
History server terminated
INFO:sagemaker.spark.processing:History server terminated
Starting history server...
INFO:sagemaker.spark.processing:Starting history server...
History server is up on https://sagemakerpyspark.notebook.us-east-1.sagemaker.aws/proxy/15050
INFO:sagemaker.spark.processing:History server is up on https://sagemakerpyspark.notebook.us-east-1.sagemaker.aws/proxy/15050


In [18]:
from sagemaker.image_uris import retrieve

training_image = retrieve("xgboost", boto3.Session().region_name, "0.90-1")
print(training_image)

Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
Defaulting to only supported image scope: cpu.
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3


In [20]:
from sagemaker.inputs import TrainingInput

s3_train_data = "s3://{}/{}/{}".format(bucket, input_preprocessed_prefix_abalone, "train/part")
s3_validation_data = "s3://{}/{}/{}".format(bucket, input_preprocessed_prefix_abalone, "validation/part")
s3_output_location = "s3://{}/{}/{}".format(bucket, prefix, "xgboost_model")

xgb_model = sagemaker.estimator.Estimator(
    training_image,
    role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    volume_size=20,
    max_run=3600,
    input_mode="File",
    output_path=s3_output_location,
    sagemaker_session=sagemaker_session,
)

xgb_model.set_hyperparameters(
    objective="reg:linear",
    eta=0.2,
    gamma=4,
    max_depth=5,
    num_round=10,
    subsample=0.7,
    silent=0,
    min_child_weight=6,
)

train_data = TrainingInput(
    s3_train_data, distribution="FullyReplicated", content_type="text/csv", s3_data_type="S3Prefix"
)
validation_data = TrainingInput(
    s3_validation_data,
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)

data_channels = {"train": train_data, "validation": validation_data}

In [21]:
xgb_model.fit(inputs=data_channels, logs=True)

Creating training-job with name: sagemaker-xgboost-2023-01-04-11-17-42-842
INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2023-01-04-11-17-42-842


2023-01-04 11:17:43 Starting - Starting the training job...
2023-01-04 11:18:09 Starting - Preparing the instances for training...............
2023-01-04 11:20:19 Downloading - Downloading input data...
2023-01-04 11:21:14 Training - Training image download completed. Training in progress.
2023-01-04 11:21:14 Uploading - Uploading generated training model.[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter objective value reg:linear to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34m[11:21:09] 3362x9 matrix with 30258 entries loaded 