In [26]:
%help


Available Magic Commands

## Sessions Magics
%help | Return a list of descriptions and input types for all magic commands. 
%profile | String | Specify a profile in your aws configuration to use as the credentials provider.
%region | String | Specify the AWS region in which to initialize a session | Default from ~/.aws/configure
%idle_timeout | Int | The number of minutes of inactivity after which a session will timeout. The default idle timeout value is 2880 minutes (48 hours).
%session_id | Returns the session ID for the running session. 
%session_id_prefix | String | Define a String that will precede all session IDs in the format [session_id_prefix]-[session_id]. If a session ID is not provided, a random UUID will be generated.
%status | Returns the status of the current Glue session including its duration, configuration and executing user / role.
%list_sessions | Lists all currently running sessions by name and ID.
%stop_session | Stops the current session.
%glue_version | String 

# Configure and Initialize Glue Environment

In [32]:
%session_id_prefix test003
%glue_version 3.0
%number_of_workers 10
%idle_timeout 60
%additional_python_modules sagemaker

Setting session ID prefix to test003
You are already connected to session test003-eeb3945a-1d68-4073-bed9-5e781dd541db. Your change will not reflect in the current session, but it will affect future new sessions. 

Setting Glue version to: 3.0
You are already connected to session test003-eeb3945a-1d68-4073-bed9-5e781dd541db. Your change will not reflect in the current session, but it will affect future new sessions. 

Previous number of workers: 10
Setting new number of workers to: 10
You are already connected to session test003-eeb3945a-1d68-4073-bed9-5e781dd541db. Your change will not reflect in the current session, but it will affect future new sessions. 

Current idle_timeout is 60 minutes.
idle_timeout has been set to 60 minutes.
You are already connected to session test003-eeb3945a-1d68-4073-bed9-5e781dd541db. Your change will not reflect in the current session, but it will affect future new sessions. 

Additional python modules to be included:
sagemaker


In [6]:
# first liner to initialize environment
print(spark.version)

3.1.1-amzn-0


# Upload data for preprocessing

In [7]:
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()
session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)

key_prefix = "test_data"

sagemaker-ap-southeast-1-436194663543


In [8]:
s3_raw_data = f"s3://{bucket}/raw_data/iris.data"
schema = "sepal_length FLOAT, sepal_width FLOAT, petal_length FLOAT, petal_width FLOAT, class STRING" # source data schema




In [9]:
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
from pyspark.ml.feature import StringIndexer




In [10]:
df = (spark
       .read
       .schema(schema)
       .options(sep=',', header=True, mode="FAILFAST", timestampFormat="yyyy-MM-dd HH:mm:ss")
       .csv(s3_raw_data)
       )


# cache for faster performance
df.cache()

# print DataFrame
df.show(10)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|      class|
+------------+-----------+------------+-----------+-----------+
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 10 rows


In [11]:
df = df.withColumn("class", F.regexp_replace("class", "Iris-", ""))
df.show(10)

+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width| class|
+------------+-----------+------------+-----------+------+
|         4.9|        3.0|         1.4|        0.2|setosa|
|         4.7|        3.2|         1.3|        0.2|setosa|
|         4.6|        3.1|         1.5|        0.2|setosa|
|         5.0|        3.6|         1.4|        0.2|setosa|
|         5.4|        3.9|         1.7|        0.4|setosa|
|         4.6|        3.4|         1.4|        0.3|setosa|
|         5.0|        3.4|         1.5|        0.2|setosa|
|         4.4|        2.9|         1.4|        0.2|setosa|
|         4.9|        3.1|         1.5|        0.1|setosa|
|         5.4|        3.7|         1.5|        0.2|setosa|
+------------+-----------+------------+-----------+------+
only showing top 10 rows


In [12]:
# Rearrange columns for sagemaker xgboost (first column is label)
df = df.select("class","sepal_length","sepal_width", "petal_length", "petal_width")

df.show(10)

+------+------------+-----------+------------+-----------+
| class|sepal_length|sepal_width|petal_length|petal_width|
+------+------------+-----------+------------+-----------+
|setosa|         4.9|        3.0|         1.4|        0.2|
|setosa|         4.7|        3.2|         1.3|        0.2|
|setosa|         4.6|        3.1|         1.5|        0.2|
|setosa|         5.0|        3.6|         1.4|        0.2|
|setosa|         5.4|        3.9|         1.7|        0.4|
|setosa|         4.6|        3.4|         1.4|        0.3|
|setosa|         5.0|        3.4|         1.5|        0.2|
|setosa|         4.4|        2.9|         1.4|        0.2|
|setosa|         4.9|        3.1|         1.5|        0.1|
|setosa|         5.4|        3.7|         1.5|        0.2|
+------+------------+-----------+------------+-----------+
only showing top 10 rows


In [13]:
# UDF for converting labels to indexes
def cnvt_species(s):
    species = ['setosa', 'versicolor', 'virginica']
    return species.index(s)
cnvt_species_udf = F.udf(cnvt_species, LongType())

df = df.withColumn("class", cnvt_species_udf(F.col("class")))
df.show(10)

+-----+------------+-----------+------------+-----------+
|class|sepal_length|sepal_width|petal_length|petal_width|
+-----+------------+-----------+------------+-----------+
|    0|         4.9|        3.0|         1.4|        0.2|
|    0|         4.7|        3.2|         1.3|        0.2|
|    0|         4.6|        3.1|         1.5|        0.2|
|    0|         5.0|        3.6|         1.4|        0.2|
|    0|         5.4|        3.9|         1.7|        0.4|
|    0|         4.6|        3.4|         1.4|        0.3|
|    0|         5.0|        3.4|         1.5|        0.2|
|    0|         4.4|        2.9|         1.4|        0.2|
|    0|         4.9|        3.1|         1.5|        0.1|
|    0|         5.4|        3.7|         1.5|        0.2|
+-----+------------+-----------+------------+-----------+
only showing top 10 rows


In [14]:
# Split to train/test dataset
# train_data, test_data, validation_data = df_indexed.randomSplit([0.6, 0.3, 0.1])
train_data, test_data = df.randomSplit([0.8, 0.2])
print(train_data.count())
print(test_data.count())

121
28


In [15]:
train_data.write.csv(f"s3://{bucket}/output_data/train/", mode="overwrite")
test_data.write.csv(f"s3://{bucket}/output_data/test/", mode="overwrite")




# Model training

In [16]:
from sagemaker import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import StringDeserializer, JSONDeserializer

image_uri = image_uris.retrieve("xgboost", session.boto_region_name, "latest")
s3_output_path = f"s3://{bucket}/{key_prefix}/output"




In [17]:
# initialize hyperparameters
hyperparameters = {
    "max_depth":"5",
    "eta":"0.2",
    "gamma":"4",
    "min_child_weight":"6",
    "subsample":"0.7",
    "objective":"reg:linear",
    "num_round":"50"
}




In [18]:
# construct a SageMaker estimator that calls the xgboost-container
estimator = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    hyperparameters=hyperparameters,
    role=role,
    instance_count=1, 
    instance_type='ml.m5.2xlarge', 
    volume_size=5, # 5 GB 
    output_path=s3_output_path
)




In [19]:
# model training
content_type = "text/csv"

train_input = TrainingInput("s3://{}/output_data/train/".format(bucket), content_type=content_type)

estimator.fit({"train": train_input})

2023-05-18 18:02:43 Starting - Starting the training job...
2023-05-18 18:02:58 Starting - Preparing the instances for training......
2023-05-18 18:04:11 Downloading - Downloading input data
2023-05-18 18:04:11 Training - Downloading the training image...
2023-05-18 18:04:21 Training - Training image download completed. Training in progress..Arguments: train
[2023-05-18:18:04:41:INFO] Running standalone xgboost training.
[2023-05-18:18:04:41:INFO] Path /opt/ml/input/data/validation does not exist!
[2023-05-18:18:04:41:INFO] File size need to be processed in the node: 0.0mb. Available memory size in the node: 23997.21mb
[2023-05-18:18:04:41:INFO] Determined delimiter of CSV input is ','
[18:04:41] S3DistributionType set as FullyReplicated
[18:04:41] 121x4 matrix with 484 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,
[18:04:41] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 4 extra nodes, 0 pruned nodes, max_depth=2
[0]#011train-rmse:0.80130

In [20]:
# model predictor
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    serializer=CSVSerializer()
)

---!


In [21]:
print("Setosa")
print(predictor.predict([4.9,3.0,1.4,0.2]))
print("Versicolor")
print(predictor.predict([5.7,2.8,4.1,1.3]))
print("Virginica")
print(predictor.predict([5.9,3.0,5.1,1.8]))

Setosa
b'0.24620097875595093'
Versicolor
b'1.00162935256958'
Virginica
b'1.7745649814605713'


## Validate the model for use

In [22]:
import numpy as np

# retrieve and convert test data
# can import from S3 as DF and convert to array but using test_data directly for now
def predict(data):
    rows=len(data)
    split_array = np.array_split(data, int(data.shape[0] / float(rows)+1))
    
    predictions = ''

    for array in split_array:
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])
        
    return np.fromstring(predictions[1:], sep=',')

predictions = predict(test_data.toPandas().to_numpy()[:, 1:])

print(predictions)

[0.24620098 0.24620098 0.24620098 0.24620098 0.24620098 0.24620098
 0.24620098 0.24620098 0.24620098 0.24620098 1.00162935 1.00162935
 1.00162935 1.00162935 1.12922454 1.00162935 1.00162935 1.00162935
 1.00162935 1.00162935 1.00162935 1.77456498 1.77456498 1.77456498
 1.77456498 1.77456498 1.77456498 1.77456498]


In [49]:
from pyspark.sql.window import Window

predictions_df = spark.sparkContext.parallelize(predictions).map(lambda x: (float(x), )).toDF(["prediction"])

# since there is no common column between these two dataframes add row_index so that it can be joined
test_data = test_data.withColumn('row_index', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
predictions_df = predictions_df.withColumn('row_index', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))

predicted_data = test_data.join(predictions_df, test_data.row_index == predictions_df.row_index, 'inner').drop(predictions_df.row_index)
predicted_data = predicted_data.select("row_index","sepal_length","sepal_width", "petal_length", "petal_width", "class", "prediction")

predicted_data.show()


# print("Virginica")
# print(predictor.predict([5.9,3.0,5.1,1.8]))

+---------+------------+-----------+------------+-----------+-----+-------------------+
|row_index|sepal_length|sepal_width|petal_length|petal_width|class|         prediction|
+---------+------------+-----------+------------+-----------+-----+-------------------+
|        1|         4.5|        2.3|         1.3|        0.3|    0|0.24620097875595093|
|        2|         4.6|        3.4|         1.4|        0.3|    0|0.24620097875595093|
|        3|         4.7|        3.2|         1.6|        0.2|    0|0.24620097875595093|
|        4|         4.9|        3.0|         1.4|        0.2|    0|0.24620097875595093|
|        5|         4.9|        3.1|         1.5|        0.1|    0|0.24620097875595093|
|        6|         5.0|        3.5|         1.3|        0.3|    0|0.24620097875595093|
|        7|         5.0|        3.5|         1.6|        0.6|    0|0.24620097875595093|
|        8|         5.1|        3.5|         1.4|        0.3|    0|0.24620097875595093|
|        9|         5.2|        

In [50]:
predictor.delete_endpoint()




In [37]:
%stop_session

Stopping session: test003-eeb3945a-1d68-4073-bed9-5e781dd541db
Stopped session.
