### Overview

This notebook does the following:

* Explore and query data from a Hive table 
* Demonstrates how to saved processed data to S3 and utilize Built In SageMaker algorithm (BlazingText) for sentiment analysis

In [2]:
%%local
print("Demo Notebook")

Demo Notebook


Next, we will query the movie_reviews table and get the data into a spark dataframe.

In [4]:
from pyspark.sql.functions import regexp_replace, col, concat, lit
movie_reviews = sqlContext.sql("select * from movie_reviews").cache()
movie_reviews= movie_reviews.where(col('sentiment') != "sentiment")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --verify-certificate False --cluster-id j-3HRO0LPGDWELV --auth-type None   


Boto3 will no longer support Python 3.7 starting December 13, 2023. To continue receiving service updates, bug fixes, and security updates please upgrade to Python 3.8 or later. More information can be found here: https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/



Successfully read emr cluster(j-3HRO0LPGDWELV) details
Initiating EMR connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1700429933532_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.
{"namespace": "sagemaker-analytics", "cluster_id": "j-3HRO0LPGDWELV", "error_message": null, "success": true, "service": "emr", "operation": "connect"}


In this notebook we use SageMaker experiments, trial and estimator to train a model and deploy the model using SageMaker realtime endpoint hosting

In [5]:
%%local
%pip install -q sagemaker-experiments 

[33mDEPRECATION: pyodbc 4.0.0-unsupported has a non-standard version number. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of pyodbc or contact the author to suggest that they release a version with a conforming version number. Discussion can be found at https://github.com/pypa/pip/issues/12063[0m[33m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [6]:
%%local
import sagemaker
import boto3
import botocore
from botocore.exceptions import ClientError
from time import strftime, gmtime
import json
from sagemaker import get_execution_role

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial


4.0.0-unsupported is an invalid version and will not be supported in a future release



In [7]:
%%local 
sess = sagemaker.Session()
bucket = sess.default_bucket()

train_bucket = f"s3://{bucket}/reviews/train"
val_bucket = f"s3://{bucket}/reviews/val"

Send the following variables to spark

In [8]:
%%send_to_spark -i train_bucket -t str -n train_bucket

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'train_bucket' as 'train_bucket' to Spark kernel

In [9]:
%%send_to_spark -i val_bucket -t str -n val_bucket

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'val_bucket' as 'val_bucket' to Spark kernel

In [10]:
val_bucket

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

's3://sagemaker-us-east-1-793217038115/reviews/val'

### Pre-process data and feature engineering

In [11]:
from pyspark.sql.functions import regexp_replace, col, concat, lit

movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'positive', '__label__positive'))
movie_reviews = movie_reviews.withColumn('sentiment', regexp_replace('sentiment', 'negative', '__label__negative'))

# Remove all the special characters
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', '\W', " "))

# Remove all single characters
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"\s+[a-zA-Z]\s+", " "))

# Remove single characters from the start
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"\^[a-zA-Z]\s+", " "))

# Substituting multiple spaces with single space
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"\s+", " "))

# Removing prefixed 'b'
movie_reviews = movie_reviews.withColumn('review', regexp_replace('review', r"^b\s+", " "))

movie_reviews.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------------+
|              review|        sentiment|
+--------------------+-----------------+
|One of the other ...|__label__positive|
|A wonderful littl...|__label__positive|
|I thought this wa...|__label__positive|
|Basically there a...|__label__negative|
|Petter Mattei Lov...|__label__positive|
|Probably my all t...|__label__positive|
|I sure would like...|__label__positive|
|This show was an ...|__label__negative|
|Encouraged by the...|__label__negative|
|If you like origi...|__label__positive|
|Phil the Alien is...|__label__negative|
|I saw this movie ...|__label__negative|
|So im not big fan...|__label__negative|
|The cast played S...|__label__negative|
|This fantastic mo...|__label__positive|
|Kind of drawn in ...|__label__negative|
|Some films just s...|__label__positive|
|This movie made i...|__label__negative|
|I remember this f...|__label__positive|
|An awful film It ...|__label__negative|
+--------------------+-----------------+
only showing top

Merge columns for [BlazingText](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html) input format.

In [12]:

movie_reviews = movie_reviews.select(concat(col("sentiment"), lit(" "), col("review")).alias("record"))
movie_reviews.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|              record|
+--------------------+
|__label__positive...|
|__label__positive...|
|__label__positive...|
|__label__negative...|
|__label__positive...|
|__label__positive...|
|__label__positive...|
|__label__negative...|
|__label__negative...|
|__label__positive...|
|__label__negative...|
|__label__negative...|
|__label__negative...|
|__label__negative...|
|__label__positive...|
|__label__negative...|
|__label__positive...|
|__label__negative...|
|__label__positive...|
|__label__negative...|
+--------------------+
only showing top 20 rows

In [13]:
# Set flag so that _SUCCESS meta files are not written to S3
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
train_df, val_df = movie_reviews.randomSplit([0.8, 0.2], seed=42)
train_df.coalesce(1).write.csv(train_bucket, mode='overwrite') 
val_df.coalesce(1).write.csv(val_bucket, mode='overwrite') 

print(train_bucket)
print(val_bucket)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://sagemaker-us-east-1-793217038115/reviews/train
s3://sagemaker-us-east-1-793217038115/reviews/val

In [15]:
%%local
instance_type_smtraining="ml.m5.xlarge"
instance_type_smendpoint="ml.m5.xlarge"

In [16]:
%%local
prefix = 'blazingtext/supervised' 
output_location = 's3://{}/{}/output'.format(bucket, prefix)

print(train_bucket)
print(val_bucket)
print(output_location)

s3://sagemaker-us-east-1-793217038115/reviews/train
s3://sagemaker-us-east-1-793217038115/reviews/val
s3://sagemaker-us-east-1-793217038115/blazingtext/supervised/output


### Train a SageMaker model
#### SageMaker Experiments

SageMaker Experiments allows us to keep track of model training. To reduce cost, the training code below has a variable to utilize spot instances.

In [17]:
%%local
import boto3
region_name = boto3.Session().region_name

sm_session = sagemaker.session.Session()

create_date = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
sentiment_experiment = Experiment.create(experiment_name="sentimentdetection-{}".format(create_date), 
                                              description="Detect sentiment in text", 
                                              sagemaker_boto_client=boto3.client('sagemaker'))

trial = Trial.create(trial_name="sentiment-trial-blazingtext-{}".format(strftime("%Y-%m-%d-%H-%M-%S", gmtime())), 
                     experiment_name=sentiment_experiment.experiment_name,
                     sagemaker_boto_client=boto3.client('sagemaker'))

container = sagemaker.amazon.amazon_estimator.get_image_uri(region_name, "blazingtext", "latest")
print('Using SageMaker BlazingText container: {} ({})'.format(container, region_name))


Boto3 will no longer support Python 3.7 starting December 13, 2023. To continue receiving service updates, bug fixes, and security updates please upgrade to Python 3.8 or later. More information can be found here: https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


Using SageMaker BlazingText container: 811284229777.dkr.ecr.us-east-1.amazonaws.com/blazingtext:1 (us-east-1)


In [18]:
%%local 
train_use_spot_instances = False
train_max_run=3600
train_max_wait = 3600 if train_use_spot_instances else None

bt_model = sagemaker.estimator.Estimator(container,
                                         role=sagemaker.get_execution_role(), 
                                         instance_count=1, 
                                         instance_type=instance_type_smtraining,
                                         volume_size = 30,
                                         input_mode= 'File',
                                         output_path=output_location,
                                         sagemaker_session=sm_session,
                                         use_spot_instances=train_use_spot_instances,
                                         max_run=train_max_run,
                                         max_wait=train_max_wait)

In [19]:
%%local 
bt_model.set_hyperparameters(mode="supervised",
                            epochs=10,
                            min_count=2,
                            learning_rate=0.005328,
                            vector_dim=286,
                            early_stopping=True,
                            patience=4,
                            min_epochs=5,
                            word_ngrams=2)

In [20]:
%%local
train_data = sagemaker.inputs.TrainingInput(train_bucket, distribution='FullyReplicated', 
                        content_type='text/plain', s3_data_type='S3Prefix')

validation_data = sagemaker.inputs.TrainingInput(val_bucket, distribution='FullyReplicated', 
                             content_type='text/plain', s3_data_type='S3Prefix')


data_channels = {'train': train_data, 'validation': validation_data}

In [21]:
%%local
%%time

bt_model.fit(data_channels, 
             experiment_config={
                      "ExperimentName": sentiment_experiment.experiment_name, 
                      "TrialName": trial.trial_name,
                      "TrialComponentDisplayName": "BlazingText-Training",
                  },
             logs=False)

INFO:sagemaker:Creating training-job with name: blazingtext-2023-11-19-22-47-31-341



2023-11-19 22:47:31 Starting - Starting the training job..
2023-11-19 22:47:46 Starting - Preparing the instances for training..........
2023-11-19 22:48:44 Downloading - Downloading input data....
2023-11-19 22:49:09 Training - Downloading the training image.
2023-11-19 22:49:20 Training - Training image download completed. Training in progress..............
2023-11-19 22:50:30 Uploading - Uploading generated training model..........
2023-11-19 22:51:26 Completed - Training job completed
CPU times: user 289 ms, sys: 13 ms, total: 302 ms
Wall time: 3min 58s


### Deploy the model and get predictions

In [22]:
%%local 
from sagemaker.serializers import JSONSerializer

text_classifier = bt_model.deploy(initial_instance_count = 1, instance_type = instance_type_smendpoint, serializer=JSONSerializer())

INFO:sagemaker:Creating model with name: blazingtext-2023-11-19-22-51-37-385
INFO:sagemaker:Creating endpoint-config with name blazingtext-2023-11-19-22-51-37-385
INFO:sagemaker:Creating endpoint with name blazingtext-2023-11-19-22-51-37-385


----!

In [23]:
%%local 
import json

review = ["please give this one a miss br br kristy swanson and the rest of the cast"
          "rendered terrible performances the show is flat flat flat br br"
          "i don't know how michael madison could have allowed this one on his plate"
          "he almost seemed to know this wasn't going to work out"
          "and his performance was perhaps his greatest one yet dont miss"]

payload = {"instances" : review}
output = json.loads(text_classifier.predict(payload).decode('utf-8'))
classification = output[0]['label'][0].split('__')[-1]

print("Sentiment:", classification.upper())

Sentiment: NEGATIVE


### Clean up  

In [24]:
%%local
# Delete endpoint
text_classifier.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: blazingtext-2023-11-19-22-51-37-385
INFO:sagemaker:Deleting endpoint with name: blazingtext-2023-11-19-22-51-37-385


In [25]:
%%cleanup -f