
# **Understanding Trends in Company Valuation with NLP**

# Orchestrating company earnings trend analysis using SEC filings and news sentiment with Hugging Face transformers

In this notebook, we demonstrate how to summarize and derive sentiment from Securities and Exchange Commission (SEC) reports filed by a publicly traded organization. We also derive the overall market sentiment about that organization from financial news articles published in the same financial period, so that the organization's own outlook can be compared fairly with the market's view of its valuation and performance. In addition, we identify the most popular keywords and entities in the news articles about the organization.

To achieve this, we use multiple Hugging Face transformer models hosted on SageMaker for the downstream NLP tasks of summarization (of the news and of the SEC MD&A sections) and sentiment analysis (of the resulting summaries).

Amazon SageMaker Pipelines is the first purpose-built, easy-to-use continuous integration and continuous delivery (CI/CD) service for machine learning (ML). With SageMaker Pipelines, you can create, automate, and manage end-to-end ML workflows at scale.

Orchestrating workflows across each step of the machine learning process (e.g. exploring and preparing data, experimenting with different algorithms and parameters, training and tuning models, and deploying models to production) can take months of coding.

Since it is purpose-built for machine learning, SageMaker Pipelines helps you automate different steps of the ML workflow, including data loading, data transformation, training and tuning, and deployment. With SageMaker Pipelines, you can build dozens of ML models a week, manage massive volumes of data, thousands of training experiments, and hundreds of different model versions. You can share and re-use workflows to recreate or optimize models.


# **Install packages**

In [5]:
# Install sagemaker-studio-image-build CLI tool
!pip install sagemaker-studio-image-build


In [6]:
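# Note: smjsindustry (installed below) requires sagemaker>=2.111.0, so pip
# upgrades past this pin; the import cell reports the version actually in use.
!pip install -q sagemaker==2.91.1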

!pip install transformers
!pip install typing
!pip install sentencepiece
!pip install fiscalyear

# Install SageMaker Jumpstart Industry
!pip install smjsindustry

# Import packages

In [7]:
import boto3
import botocore
import pandas as pd
import sagemaker

print(f"SageMaker version: {sagemaker.__version__}")

from sagemaker.huggingface import HuggingFace
from sagemaker.huggingface import HuggingFaceModel

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.steps import TransformStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.processing import ScriptProcessor
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

SageMaker version: 2.152.0


# Create a Custom Container


To run the pipeline's processing steps with all of their dependencies, we use a custom container. You first have to build a Docker image and push it to an Amazon ECR (Elastic Container Registry) repository in your account. Typically, this is done with the Docker CLI and the AWS CLI on your local machine. In SageMaker Studio, the purpose-built sagemaker-studio-image-build tool makes this easier: it builds a custom container image, pushes it to your ECR repository, and lets you use the image in your notebooks for your ML projects.

In [None]:
import boto3

# boto3.session is a module; the callable class is boto3.Session
region = boto3.Session().region_name
my_account = boto3.client("sts").get_caller_identity().get("Account")
nlp_script_processor = "nlp-script-processor:1.0"

CONTAINER_IMAGE_URI = f"{my_account}.dkr.ecr.{region}.amazonaws.com/{nlp_script_processor}"

CONTAINER_IMAGE_URI
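
The URI assembled in the cell above follows the usual naming pattern for images in a private ECR repository. As a minimal sketch, the same construction can be wrapped in a small helper; the function name `ecr_image_uri` and the account ID in the example are hypothetical, and the `amazonaws.com` suffix is assumed (some partitions, such as the China Regions, use a different domain).

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build the URI of an image stored in a private Amazon ECR repository."""
    # Assumes the standard AWS partition domain suffix (amazonaws.com).
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Placeholder account ID, used for illustration only
uri = ecr_image_uri("123456789012", "us-east-1", "nlp-script-processor", "1.0")
print(uri)
```

Keeping the repository name and tag as separate arguments makes it harder to forget the tag, which the notebook folds into the single string "nlp-script-processor:1.0".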


## 1. Grant appropriate permissions to SageMaker

In order to use sagemaker-studio-image-build, we need to first add permissions to SageMaker's IAM role so that it may perform actions on your behalf. Specifically, you would add Amazon ECR and Amazon CodeBuild permissions to it.

In addition to this, you will also have to add the iam:PassRole permission to the SageMaker Studio execution role. Add the following policy as an inline policy to the SageMaker Studio Execution role using the AWS IAM console.

Also, you must add a trust relationship to the SageMaker Studio execution role that allows CodeBuild to assume the role:

"Action": "sts:AssumeRole"

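The trust relationship mentioned above can be sketched as a Python dictionary and printed as JSON for pasting into the IAM console. This is an illustration rather than the exact policy from the original notebook: it assumes the CodeBuild service principal `codebuild.amazonaws.com`, and the statement should be added alongside the existing SageMaker statement in the role's trust policy, not in place of it.

```python
import json

# Statement that lets the CodeBuild service assume the role (assumed principal name)
codebuild_trust_statement = {
    "Effect": "Allow",
    "Principal": {"Service": ["codebuild.amazonaws.com"]},
    "Action": "sts:AssumeRole",
}

# A trust policy document containing only the CodeBuild statement; in practice,
# append the statement to the role's existing "Statement" list instead.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [codebuild_trust_statement],
}

print(json.dumps(trust_policy, indent=2))
```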
## 2. Build a custom Docker image

We now write a custom Dockerfile and use the CLI tool to build the image from it. Our Docker image is simple: it is based on the open source python:3.7-slim-buster image and installs the Boto3 SDK, the SageMaker SDK, Pandas, and NumPy, along with the other packages the pipeline scripts need.

For our NLP pipeline, a number of tasks depend on Boto3 and the SageMaker SDK. We also use the SageMaker JumpStart Industry Python SDK to download 10-K/10-Q reports from the SEC's EDGAR system. We install all of these dependencies in the container and use the custom container in the ScriptProcessor steps of our pipeline.

In [1]:
%%writefile Dockerfile
FROM python:3.7-slim-buster

RUN pip3 install smjsindustry==1.0.0 requests botocore "boto3>=1.15.0" sagemaker pandas numpy transformers typing sentencepiece nltk
RUN python3 -c "import nltk; nltk.download('punkt')"
ENV PYTHONUNBUFFERED=TRUE

ENTRYPOINT ["python3"]

Writing Dockerfile


The code cell above creates a Dockerfile in the project's local directory. We can then run the sm-docker build command to build and publish our image. This single command builds the Docker image and publishes it to a private ECR repository in your current Region (i.e. your SageMaker Studio default Region).

NOTE: You must execute the code cell above before running the following cells, because the sm-docker build command reads the Dockerfile to create the Docker image. To confirm that the cell ran successfully, verify in the left navigation pane of Studio that a file named Dockerfile exists in the same directory as this notebook. This project already includes the Dockerfile; however, if you modify the code cell above, verify that the contents of the Dockerfile were updated correctly.

In [None]:
%%time

!sm-docker build . --repository $nlp_script_processor

# Define parameters that you'll use throughout the notebook


In [None]:
s3 = boto3.resource("s3")
region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
sagemaker_role = role
default_bucket = sagemaker_session.default_bucket()
prefix = "nlp-e2e-mlops"
s3_client = boto3.client("s3", region_name=region)
sagemaker_boto_client = boto3.client("sagemaker", region_name=region)


# deploy_model_instance_type = "ml.m4.8xlarge"
deploy_model_instance_type = "ml.m4.xlarge"
inference_instances = [
    "ml.t2.medium",
    "ml.m5.xlarge",
    "ml.m5.2xlarge",
    "ml.m5.4xlarge",
    "ml.m5.12xlarge",
]
transform_instances = ["ml.m5.xlarge"]
PROCESSING_INSTANCE = "ml.m4.4xlarge"
ticker = "AMZN"

In [None]:
print(f"s3://{default_bucket}/{prefix}/code/model_deploy.py")
print(f"SageMaker Role: {role}")

# Define parameters to parametrize Pipeline Execution


Using SageMaker Pipelines, we define the steps to include in a pipeline and use parameters to change its behavior at execution time, without modifying the pipeline definition. We provide default parameter values that can be overridden when the pipeline is executed.



In [None]:
# Define some default parameters:

# specify default number of instances for processing step
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# specify default instance type for processing step
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value=PROCESSING_INSTANCE
)

# specify location of inference data for data processing step
inference_input_data = f"s3://{default_bucket}/{prefix}/nlp-pipeline/inf-data"

# Specify the Ticker CIK for the pipeline
inference_ticker_cik = ParameterString(
    name="InferenceTickerCik",
    default_value=ticker,
)

# specify default method for model approval
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

# specify if new model needs to be registered and deployed
model_register_deploy = ParameterString(name="ModelRegisterDeploy", default_value="Y")
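
To make the override behavior concrete, here is a plain-Python sketch of how default values and execution-time overrides combine. It does not call the SageMaker SDK; `resolve_parameters` is a hypothetical helper, and the defaults simply mirror the parameter names and values defined above. With the SDK itself, overrides are typically supplied as a dictionary through the `parameters` argument of `Pipeline.start`.

```python
from typing import Dict, Union

ParamValue = Union[str, int]

# Defaults mirroring the pipeline parameters defined in the cell above
DEFAULTS: Dict[str, ParamValue] = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m4.4xlarge",
    "InferenceTickerCik": "AMZN",
    "ModelApprovalStatus": "PendingManualApproval",
    "ModelRegisterDeploy": "Y",
}


def resolve_parameters(overrides: Dict[str, ParamValue]) -> Dict[str, ParamValue]:
    """Return the effective parameter values for one execution."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        # Only parameters declared in the pipeline definition can be overridden
        raise KeyError(f"Unknown pipeline parameter(s): {sorted(unknown)}")
    # Values supplied at execution time win over the defaults
    return {**DEFAULTS, **overrides}


resolved = resolve_parameters({"InferenceTickerCik": "GOOG", "ProcessingInstanceCount": 2})
print(resolved)
```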

# Preparing SEC dataset


Before we set up the pipeline, let's look at how the SageMaker JumpStart Industry SDK for financial language models helps obtain the dataset from SEC forms, and which features are available to us.



## Install the SageMaker JumpStart Industry SDK


The functionality is delivered through a client-side SDK. The first step requires pip installing a Python package that interacts with a SageMaker processing container. The retrieval, parsing, transforming, and scoring of text is a complex process and uses different algorithms and packages. In order to make this seamless and stable for the user, the functionality is packaged into a SageMaker container. This lifts the load of installation and maintenance of the workflow, reducing the user effort down to a pip install followed by a single API call.

In [10]:
!pip install --no-index smjsindustry




As an example, we pull the 10-K/10-Q filings for the AMZN ticker from EDGAR and write the data as CSV to S3. The cells below import the SDK classes and make the API call.



In [11]:
# from smfinance import SECDataSetConfig, DataLoader
from smjsindustry.finance import DataLoader
from smjsindustry.finance.processor_config import EDGARDataSetConfig

The extracted reports will be saved to an S3 bucket for us to review. This code will also be used in the Pipeline to fetch the report for the Ticker or CIK number passed to the SageMaker Pipeline. Executing the following code cell will run a processing job which will fetch the SEC reports from the EDGAR database.



# Obtain SEC data using the SageMaker JumpStart Industry SDK


In [None]:
%%time

dataset_config = EDGARDataSetConfig(
    tickers_or_ciks=["amzn", "goog", "27904", "FB"],  # list of stock tickers or CIKs
    form_types=["10-K", "10-Q"],  # list of SEC form types
    filing_date_start="2019-01-01",  # starting filing date
    filing_date_end="2020-12-31",  # ending filing date
    email_as_user_agent="test-user@test.com",  # user agent email
)

data_loader = DataLoader(
    role=sagemaker.get_execution_role(),  # loading job execution role
    instance_count=1,  # instances number, limit varies with instance type
    instance_type="ml.c5.2xlarge",  # instance type
    volume_size_in_gb=30,  # size in GB of the EBS volume to use
    volume_kms_key=None,  # KMS key for the processing volume
    output_kms_key=None,  # KMS key ID for processing job outputs
    max_runtime_in_seconds=None,  # timeout in seconds. Default is 24 hours.
    sagemaker_session=sagemaker.Session(),  # session object
    tags=None,  # a list of key-value pairs
)

data_loader.load(
    dataset_config,
    "s3://{}/{}".format(
        default_bucket, "sample-sec-data"
    ),  # output s3 prefix (both bucket and folder names are required)
    "dataset_10k_10q.csv",  # output file name
    wait=True,
    logs=True,
)

# Output
The output of the data_loader processing job is a CSV file. We see the filings for different quarters.

The filing date falls within a month of the end date of the reporting period. Both dates are collected and displayed in the dataframe. The column text contains the full text of the report, but the tables are not extracted. The values in the tables in the filings are balance-sheet and income-statement data (numeric/tabular) and are readily available elsewhere, since they are reported in numeric databases. The last column of the dataframe, named mdna, contains the Management's Discussion and Analysis (MD&A) section, which is the primary forward-looking section in the filing. This is the section most widely used in financial text analysis, so in this example we use the mdna text to derive the sentiment of the overall filing.

In [None]:
# -p avoids an error when the data directory already exists
!mkdir -p data
print(f"{default_bucket}/{prefix}/")
s3_client.download_file(
    default_bucket,
    "sample-sec-data/dataset_10k_10q.csv",
    "./data/dataset_10k_10q.csv",
)

In [None]:
data_frame_10k_10q = pd.read_csv("./data/dataset_10k_10q.csv")
data_frame_10k_10q
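
As a rough sketch of how the mdna column might be used downstream, the snippet below picks the most recent non-empty MD&A text for a given form type using only the standard library. The column names ticker, form_type, and filing_date are assumptions about the CSV layout (only text and mdna are named above), the rows are made-up placeholders, and `latest_mdna` is a hypothetical helper.

```python
import csv
import io

# Placeholder rows for illustration only; not real filing content
SAMPLE_CSV = """ticker,form_type,filing_date,mdna
XYZ,10-K,2020-01-31,placeholder annual MD&A text
XYZ,10-Q,2020-05-01,
XYZ,10-Q,2020-07-31,placeholder quarterly MD&A text
"""


def latest_mdna(csv_text: str, form_type: str) -> str:
    """Return the MD&A text of the most recent filing of the given form type."""
    rows = [
        row
        for row in csv.DictReader(io.StringIO(csv_text))
        if row["form_type"] == form_type and row["mdna"].strip()
    ]
    if not rows:
        raise ValueError(f"No {form_type} filing with MD&A text found")
    # ISO-formatted dates (YYYY-MM-DD) sort chronologically as plain strings
    return max(rows, key=lambda row: row["filing_date"])["mdna"]


print(latest_mdna(SAMPLE_CSV, "10-Q"))
```

Real filings contain very long text fields, so reading the actual file with the csv module may require raising the limit with `csv.field_size_limit`.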

# Set Up Your MLOps NLP Pipeline with SageMaker Pipelines



# Step 1: Data pre-processing - extract SEC data and news about the company

## Define a processing step to prepare SEC data for inference

We define a processing step to extract 10-K and 10-Q forms for a specific organization, identified either by the company's stock ticker symbol or by the CIK (Central Index Key) used to look up reports in the SEC's EDGAR system. You can find the company Stock Ticker Symbol to CIK Number mapping here. This step also collects news article snippets related to the company using the NewsCatcher API.

Important:
It is recommended to use CIKs as the input. The tickers will be internally converted to CIKs according to the mapping file.
One ticker may map to multiple CIKs, but we only support the latest ticker to CIK mapping. Please provide the old CIKs in the input when you want historical filings. Also note that even though the Client side SDK allows you to download multiple SEC reports for multiple CIKs at a time, we will set up our data preprocessing step to grab exactly 1 SEC Report for 1 CIK (Company/Organization).
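
Because the pipeline accepts either a ticker or a CIK in the same parameter, it can help to tell the two apart before use. The following is a minimal sketch under the assumption that an all-digit value is a CIK and anything else is a ticker; `normalize_identifier` is a hypothetical helper that is not part of the SDK. The zero-padding to 10 digits reflects the form EDGAR commonly uses for CIKs; the SDK call shown earlier also accepts the unpadded value "27904".

```python
from typing import Tuple


def normalize_identifier(value: str) -> Tuple[str, str]:
    """Classify an input as a ticker or CIK and return it in a canonical form."""
    cleaned = value.strip()
    if not cleaned:
        raise ValueError("Identifier must not be empty")
    if cleaned.isdigit():
        if len(cleaned) > 10:
            raise ValueError("A CIK has at most 10 digits")
        # Zero-pad to the 10-digit form (assumption: useful for lookups and display)
        return ("cik", cleaned.zfill(10))
    # Anything non-numeric is treated as a ticker symbol
    return ("ticker", cleaned.upper())


print(normalize_identifier("27904"))
print(normalize_identifier(" amzn "))
```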

The next cell uses the SageMaker Python SDK to create a ScriptProcessor, which runs a script inside a specified container image as a SageMaker Processing job.

The ScriptProcessor is created with the following parameters:

command: The command used to run the script. Here it is ["python3"], so the script is executed with the Python 3 interpreter.

image_uri: The URI of the container image to run. CONTAINER_IMAGE_URI is the ECR URI of the custom image built earlier in this notebook.

role: The ARN (Amazon Resource Name) of the IAM role that the processing job assumes. Here it is the SageMaker execution role stored in role.

instance_count: The number of instances for the processing job. It is bound to the ProcessingInstanceCount pipeline parameter (default 1).

instance_type: The instance type for the processing job. It is bound to the ProcessingInstanceType pipeline parameter, whose default is PROCESSING_INSTANCE ("ml.m4.4xlarge"). The separate loader_instance_type variable ("ml.c5.2xlarge") is passed to the script as an argument in the following step.

The resulting processor is attached to a pipeline step in the cell after this one.



In [None]:

loader_instance_type = "ml.c5.2xlarge"
create_dataset_processor = ScriptProcessor(
    command=["python3"],
    image_uri=CONTAINER_IMAGE_URI,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

Create a processing step to process the SEC data for inference:



The next cell uploads the data-processing.py script to S3 and uses the ScriptProcessor to define a processing step that runs the script and produces two processing outputs.

Here's a breakdown of the code:

create_dataset_script_uri: The S3 URI where the data-processing.py script is stored, built from default_bucket, prefix, and "code/data-processing.py".

s3_client.upload_file: Uploads the local file "./scripts/data-processing.py" to the default_bucket bucket under the key formed from prefix and "code/data-processing.py", so that the processing step can access the script.

create_dataset_step: A ProcessingStep object representing the processing step in the pipeline. Its parameters are:

name: The name of the processing step, in this case "HFSECFinBertCreateDataset".

processor: The create_dataset_processor object, the ScriptProcessor instance created in the previous cell.

outputs: A list of ProcessingOutput objects that define the outputs generated by the processing step. There are two outputs, "report_data" and "article_data". Each one specifies a source directory inside the processing container ("/opt/ml/processing/output/10k10q" and "/opt/ml/processing/output/articles") and an S3 destination built from the inference_input_data variable and a subdirectory name.

job_arguments: A list of command-line arguments passed to the script: the ticker or CIK, the instance type (loader_instance_type), the Region, the S3 bucket, the prefix, and the role.

code: The S3 URI of the script to execute, set to create_dataset_script_uri.

In [None]:
create_dataset_script_uri = f"s3://{default_bucket}/{prefix}/code/data-processing.py"
s3_client.upload_file(
    Filename="./scripts/data-processing.py",
    Bucket=default_bucket,
    Key=f"{prefix}/code/data-processing.py",
)

create_dataset_step = ProcessingStep(
    name="HFSECFinBertCreateDataset",
    processor=create_dataset_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="report_data",
            source="/opt/ml/processing/output/10k10q",
            destination=f"{inference_input_data}/10k10q",
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="article_data",
            source="/opt/ml/processing/output/articles",
            destination=f"{inference_input_data}/articles",
        ),
    ],
    job_arguments=[
        "--ticker-cik",
        inference_ticker_cik,
        "--instance-type",
        loader_instance_type,
        "--region",
        region,
        "--bucket",
        default_bucket,
        "--prefix",
        prefix,
        "--role",
        role,
    ],
    code=create_dataset_script_uri,
)
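
The job_arguments list above reaches the script as ordinary command-line arguments. The data-processing.py script itself is not shown in this notebook, so the following is only a sketch of how such a script could read those flags with argparse; the function name `parse_args`, the defaults, and the example values (bucket name, role ARN) are all hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags that the processing step passes through job_arguments."""
    parser = argparse.ArgumentParser(description="Fetch SEC filings and news for one company")
    parser.add_argument("--ticker-cik", required=True, help="stock ticker or CIK")
    parser.add_argument("--instance-type", default="ml.c5.2xlarge", help="loader instance type")
    parser.add_argument("--region", required=True)
    parser.add_argument("--bucket", required=True)
    parser.add_argument("--prefix", required=True)
    parser.add_argument("--role", required=True)
    return parser.parse_args(argv)


# Example invocation with placeholder values
args = parse_args(
    [
        "--ticker-cik", "AMZN",
        "--region", "us-east-1",
        "--bucket", "example-bucket",
        "--prefix", "nlp-e2e-mlops",
        "--role", "arn:aws:iam::123456789012:role/ExampleRole",
    ]
)
print(args.ticker_cik, args.instance_type)
```

argparse converts the dashes in flag names to underscores, so `--ticker-cik` is read back as `args.ticker_cik`.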

# Step 2: Create models for summarization and sentiment analysis

In [12]:
sentiment_model_name = "HFSECFinbertModel"
summarization_model_name = "HFSECPegasusModel"

## Create the FinBERT model for sentiment analysis

In [None]:
# Configure the pre-trained model; the container pulls it from the Hugging Face Hub using HF_MODEL_ID
from sagemaker.huggingface import HuggingFaceModel

hub = {"HF_MODEL_ID": "ProsusAI/finbert", "HF_TASK": "text-classification"}

# create Hugging Face Model Class (documentation here: https://sagemaker.readthedocs.io/en/stable/frameworks/huggingface/sagemaker.huggingface.html#hugging-face-model)
sentiment_huggingface_model = HuggingFaceModel(
    name=sentiment_model_name,
    transformers_version="4.6.1",
    pytorch_version="1.7.1",
    py_version="py36",
    env=hub,
    role=role,
    sagemaker_session=sagemaker_session,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type="ml.m4.xlarge")

create_sentiment_model_step = CreateModelStep(
    name="HFSECFinBertCreateModel",
    model=sentiment_huggingface_model,
    inputs=inputs,
    #     depends_on=['HFSECFinBertCreateDataset']
)
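
Once the sentiment model is hosted behind an endpoint, requests and responses are exchanged as JSON. The sketch below assumes the usual Hugging Face inference format, in which the request body carries the text under an "inputs" key and a text-classification response is a list of label/score objects. The helper names and the sample response values are made up for illustration, and no endpoint is called.

```python
import json
from typing import Dict


def build_request(text: str) -> str:
    """Serialize the text to classify into a JSON request body."""
    return json.dumps({"inputs": text})


def top_label(response_body: str) -> Dict:
    """Return the prediction with the highest score from a JSON response body."""
    predictions = json.loads(response_body)
    return max(predictions, key=lambda prediction: prediction["score"])


# Made-up response used only to exercise the parsing logic
SAMPLE_RESPONSE = '[{"label": "positive", "score": 0.91}, {"label": "neutral", "score": 0.09}]'

print(build_request("Revenue grew year over year."))
print(top_label(SAMPLE_RESPONSE))
```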

## Create the Pegasus summarization model

In [None]:
hub = {
    "HF_MODEL_ID": "human-centered-summarization/financial-summarization-pegasus",
    "HF_TASK": "summarization",
}

# create Hugging Face Model Class (documentation here: https://sagemaker.readthedocs.io/en/stable/frameworks/huggingface/sagemaker.huggingface.html#hugging-face-model)
summary_huggingface_model = HuggingFaceModel(
    name=summarization_model_name,
    transformers_version="4.6.1",
    pytorch_version="1.7.1",
    py_version="py36",
    env=hub,
    role=role,
    sagemaker_session=sagemaker_session,
)

create_summary_model_step = CreateModelStep(
    name="HFSECPegasusCreateModel",
    model=summary_huggingface_model,
    inputs=inputs,
    #     depends_on=['HFSECFinBertCreateDataset']
)

# Step 3: Register model

Use the RegisterModel step collection to register each Hugging Face model in the SageMaker Model Registry so that it can be deployed in a later step.



In [None]:
sentiment_model_package_group_name = "HuggingFaceSECSentimentModelPackageGroup"
summary_model_package_group_name = "HuggingFaceSECSummaryModelPackageGroup"
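# NOTE: this plain string replaces the ModelApprovalStatus pipeline parameter
# defined earlier, so the registered models are approved immediately.
model_approval_status = "Approved"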

register_sentiment_model_step = RegisterModel(
    name="HFSECFinBertRegisterModel",
    model=sentiment_huggingface_model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m4.4xlarge"],
    transform_instances=["ml.m4.4xlarge"],
    model_package_group_name=sentiment_model_package_group_name,
    approval_status=model_approval_status,
    depends_on=["HFSECFinBertCreateModel"],
)

register_summary_model_step = RegisterModel(
    name="HFSECPegasusRegisterModel",
    model=summary_huggingface_model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m4.4xlarge"],
    transform_instances=["ml.m4.4xlarge"],
    model_package_group_name=summary_model_package_group_name,
    approval_status=model_approval_status,
    depends_on=["HFSECPegasusCreateModel"],
)

# Step 4: Deploy model

We deploy the FinBert and Pegasus models from the model registry.

NOTE: The models in the model registry are the pre-trained versions from the Hugging Face Model Hub. Each deployment step attempts to deploy a SageMaker endpoint with the model and writes a property file upon successful completion. The pipeline uses these property files to decide whether to execute the subsequent summarization and sentiment analysis inference steps.

In [None]:
deploy_model_instance_type = "ml.m4.4xlarge"
deploy_model_instance_count = "1"  # kept as a string because it is passed as a command-line argument

sentiment_endpoint_name = "HFSECFinBertModel-endpoint"
summarization_endpoint_name = "HFSECPegasusModel-endpoint"

In [None]:
s3_client.upload_file(
    Filename="./scripts/model_deploy_v2.py",
    Bucket=default_bucket,
    Key=f"{prefix}/code/model_deploy_v2.py",
)
deploy_model_script_uri = f"s3://{default_bucket}/{prefix}/code/model_deploy_v2.py"


deploy_model_processor = ScriptProcessor(
    command=["python3"],
    image_uri=CONTAINER_IMAGE_URI,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

sentiment_deploy_response = PropertyFile(
    name="SentimentPropertyFile",
    output_name="sentiment_deploy_response",
    path="success.json",  # the property file generated by the script
)

sentiment_deploy_step = ProcessingStep(
    name="HFSECFinBertDeployModel",
    processor=deploy_model_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="sentiment_deploy_response",
            source="/opt/ml/processing/output",
            destination=f"s3://{default_bucket}/{prefix}/nlp-pipeline/sentimentResponse",
        )
    ],
    job_arguments=[
        "--initial-instance-count",
        deploy_model_instance_count,
        "--endpoint-instance-type",
        deploy_model_instance_type,
        "--endpoint-name",
        sentiment_endpoint_name,
        "--model-package-group-name",
        sentiment_model_package_group_name,
        "--role",
        role,
        "--region",
        region,
    ],
    property_files=[sentiment_deploy_response],
    code=deploy_model_script_uri,
    depends_on=["HFSECFinBertRegisterModel"],
)




In [None]:
summary_deploy_response = PropertyFile(
    name="SummaryPropertyFile",
    output_name="summary_deploy_response",
    path="success.json",  # the property file generated by the script
)

summary_deploy_step = ProcessingStep(
    name="HFSECPegasusDeployModel",
    processor=deploy_model_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="summary_deploy_response",
            source="/opt/ml/processing/output",
            destination=f"s3://{default_bucket}/{prefix}/nlp-pipeline/summaryResponse",
        )
    ],
    job_arguments=[
        "--initial-instance-count",
        deploy_model_instance_count,
        "--endpoint-instance-type",
        deploy_model_instance_type,
        "--endpoint-name",
        summarization_endpoint_name,
        "--model-package-group-name",
        summary_model_package_group_name,
        "--role",
        role,
        "--region",
        region,
    ],
    property_files=[summary_deploy_response],
    code=deploy_model_script_uri,
    depends_on=["HFSECPegasusRegisterModel"],
)
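
The deployment script (model_deploy_v2.py) is not shown in this notebook. As a minimal sketch of the property-file mechanism it relies on, the snippet below writes a success.json file containing a model_created key, which is the key the pipeline conditions read. The helper name `write_property_file` and the "N" value for a failed deployment are assumptions; inside the processing container the output directory would be /opt/ml/processing/output, while the example uses a temporary directory.

```python
import json
import tempfile
from pathlib import Path


def write_property_file(output_dir: str, model_created: bool) -> Path:
    """Write success.json so a later pipeline condition can read model_created."""
    path = Path(output_dir) / "success.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    # "Y" matches the value the conditions compare against; "N" is an assumed failure marker
    path.write_text(json.dumps({"model_created": "Y" if model_created else "N"}))
    return path


with tempfile.TemporaryDirectory() as tmp_dir:
    written = write_property_file(tmp_dir, model_created=True)
    payload = json.loads(written.read_text())

print(payload)
```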

## Create pipeline conditions to check if the endpoint deployments were successful

We define conditions that check whether the model deployments were successful, based on the property files generated by the deployment steps of the FinBert and Pegasus models. If both conditions evaluate to True, we run the subsequent inference steps for summarization and sentiment analysis.



In [None]:
from sagemaker.workflow.conditions import ConditionEquals
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

summarize_script_uri = f"s3://{default_bucket}/{prefix}/code/summarize.py"

sentiment_condition_eq = ConditionEquals(
    left=JsonGet(  # the left value of the evaluation expression
        step_name="HFSECFinBertDeployModel",  # the step from which the property file will be grabbed
        property_file=sentiment_deploy_response,  # the property file instance that was created earlier in Step 4
        json_path="model_created",  # the JSON path of the property within the property file success.json
    ),
    right="Y",  # the right value of the evaluation expression, i.e. the flag written on successful deployment
)

In [None]:
summary_condition_eq = ConditionEquals(
    left=JsonGet(  # the left value of the evaluation expression
        step_name="HFSECPegasusDeployModel",  # the step from which the property file will be grabbed
        property_file=summary_deploy_response,  # the property file instance that was created earlier in Step 4
        json_path="model_created",  # the JSON path of the property within the property file success.json
    ),
    right="Y",  # the right value of the evaluation expression, i.e. the flag written on successful deployment
)

summarize_processor = ScriptProcessor(
    command=["python3"],
    image_uri=CONTAINER_IMAGE_URI,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

summarize_step_2 = ProcessingStep(
    name="HFSECPegasusSummarizer_2",
    processor=summarize_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            input_name="summary_data",
            source=f"{inference_input_data}/10k10q",
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="summarized_data",
            source="/opt/ml/processing/output",
            destination=f"{inference_input_data}/10k10q/summary",
        )
    ],
    job_arguments=[
        "--ticker-cik",
        inference_ticker_cik,
        "--region",
        region,
        "--endpoint-name",
        summarization_endpoint_name,
    ],
    code=summarize_script_uri,
)

The next cell creates a ConditionStep and assigns it to deploy_condition_step. The condition step adds branching logic to the pipeline based on the conditions defined above.

The parameters passed to ConditionStep are:

name="HFSECFinBertDeployConditionCheck" : The name of the condition step.

conditions=[sentiment_condition_eq, summary_condition_eq] : The conditions to evaluate. Both are the ConditionEquals instances defined above, and the if branch runs only when both evaluate to True.

if_steps=[summarize_step_2] : The steps to execute if the conditions evaluate to True. Here it is the summarize_step_2 processing step defined above.

else_steps=[] : The steps to execute if the conditions evaluate to False. The list is empty, so nothing runs in the else branch.

depends_on=["HFSECFinBertDeployModel", "HFSECPegasusDeployModel"] : The steps that must complete before this step runs, namely the deployment steps for the FinBert and Pegasus models.

In [None]:
deploy_condition_step = ConditionStep(
    name="HFSECFinBertDeployConditionCheck",
    conditions=[
        sentiment_condition_eq,
        summary_condition_eq,
    ],  # the equal to conditions defined above
    if_steps=[
        summarize_step_2
    ],  # if the condition evaluates to true then run the summarization step
    else_steps=[],  # there are no else steps so we will keep it empty
    depends_on=[
        "HFSECFinBertDeployModel",
        "HFSECPegasusDeployModel",
    ],  # dependencies on both Finbert and Pegasus Deployment steps
)
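The branching logic above can be sketched in plain Python. This is an illustration only, not how SageMaker evaluates the step: it assumes each deploy step writes a property file shaped like `{"model_created": "Y"}`, and `json_get` and `all_conditions_hold` are hypothetical helpers standing in for `JsonGet` and the condition evaluation. What it shows is that the `if_steps` branch is taken only when every condition in the list holds.

```python
import json


def json_get(property_file_text: str, json_path: str) -> str:
    """Hypothetical stand-in for JsonGet: read one top-level key from a JSON document."""
    return json.loads(property_file_text)[json_path]


def all_conditions_hold(property_files: list[str], expected: str = "Y") -> bool:
    """Return True only if every property file reports model_created == expected."""
    return all(json_get(text, "model_created") == expected for text in property_files)


# Assumed contents of the deploy steps' property files (illustrative only).
sentiment_response = '{"model_created": "Y"}'
summary_response = '{"model_created": "Y"}'
failed_response = '{"model_created": "N"}'

# Both models deployed: the summarization step would run.
print(all_conditions_hold([sentiment_response, summary_response]))
# One deployment did not report success: the (empty) else branch would be taken.
print(all_conditions_hold([sentiment_response, failed_response]))
```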

## Step 5: Summarize SEC report step


This step uses the Pegasus summarizer model endpoint to summarize the MDNA text from the SEC report. Because the MDNA text is usually long, we first derive a short summary of it and then determine the overall sentiment from that summary.



summarize_processor = ScriptProcessor(...) : This line creates an instance of the ScriptProcessor class and assigns it to the summarize_processor variable. The ScriptProcessor class is used to run a script in a specified container environment.

The parameters for the ScriptProcessor class include:

command=["python3"] : Specifies the command to execute the script. In this case, the command is set to ["python3"], indicating that the script should be executed using Python 3.

image_uri=CONTAINER_IMAGE_URI : Specifies the URI of the container image to use for running the script. The CONTAINER_IMAGE_URI variable should hold the URI of the desired container image.

role=role : Specifies the IAM role to use for the script processor. The role variable should hold the ARN of the IAM role with the necessary permissions to run the script.

instance_count=processing_instance_count : Specifies the number of instances to use for running the script. The processing_instance_count variable should hold the desired instance count.

instance_type=processing_instance_type : Specifies the type of instance to use for running the script. The processing_instance_type variable should hold the desired instance type.

In [None]:
summarize_processor = ScriptProcessor(
    command=["python3"],
    image_uri=CONTAINER_IMAGE_URI,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

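s3_client.upload_file(...) : This call uploads the summarization script (summarize.py) to the code prefix in the default S3 bucket. summarize_script_uri holds the S3 URI of the uploaded script, which the summarization steps pass as their code argument.

In [None]:
summarize_script_uri = f"s3://{default_bucket}/{prefix}/code/summarize.py"
s3_client.upload_file(
    Filename="./scripts/summarize.py", Bucket=default_bucket, Key=f"{prefix}/code/summarize.py"
)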

summarize_step_1 = ProcessingStep(
    name="HFSECPegasusSummarizer_1",
    processor=summarize_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            input_name="summary_data",
            source=f"{inference_input_data}/10k10q",
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="summarized_data",
            source="/opt/ml/processing/output",
            destination=f"{inference_input_data}/10k10q/summary",
        )
    ],
    job_arguments=[
        "--ticker-cik",
        inference_ticker_cik,
        "--region",
        region,
        "--endpoint-name",
        summarization_endpoint_name,
    ],
    code=summarize_script_uri,
)
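The `job_arguments` list is handed to the processing script as command-line arguments. The contents of `summarize.py` are not shown in this notebook section, so the following is only a sketch of how such a script could read the three flags with `argparse`; the function name `parse_job_arguments` and the sample values are assumptions made for illustration.

```python
import argparse


def parse_job_arguments(argv: list[str]) -> argparse.Namespace:
    """Parse the three flags that the ProcessingStep passes in job_arguments."""
    parser = argparse.ArgumentParser(
        description="Hypothetical argument parser for a processing script"
    )
    parser.add_argument("--ticker-cik", type=str, required=True)
    parser.add_argument("--region", type=str, required=True)
    parser.add_argument("--endpoint-name", type=str, required=True)
    return parser.parse_args(argv)


# Placeholder values, standing in for what the pipeline resolves at run time.
args = parse_job_arguments(
    ["--ticker-cik", "amzn", "--region", "us-east-1", "--endpoint-name", "example-endpoint"]
)

# argparse converts the dashes in flag names to underscores on the namespace.
print(args.ticker_cik, args.region, args.endpoint_name)
```

Inside a real script the list would come from `sys.argv[1:]`, which is what `parse_args()` uses when called without arguments.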




## Step 6: Sentiment inference step - SEC summary and news articles

This step uses the MDNA summary (produced by the previous step) and the news articles to determine the sentiment around the company's financial position and what market trends are indicating. This helps us understand the company's financial outlook and current position without relying solely on the company's forward-looking statements, and brings objective market opinion into the picture.



In [None]:
sentiment_processor = ScriptProcessor(
    command=["python3"],
    image_uri=CONTAINER_IMAGE_URI,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

In [None]:
sentiment_script_uri = f"s3://{default_bucket}/{prefix}/code/sentiment.py"
s3_client.upload_file(
    Filename="./scripts/sentiment.py", Bucket=default_bucket, Key=f"{prefix}/code/sentiment.py"
)

sentiment_step_1 = ProcessingStep(
    name="HFSECFinBertSentiment_1",
    processor=sentiment_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            input_name="sec_summary",
            source=f"{inference_input_data}/10k10q/summary",
            destination="/opt/ml/processing/input/10k10q",
        ),
        sagemaker.processing.ProcessingInput(
            input_name="articles",
            source=f"{inference_input_data}/articles",
            destination="/opt/ml/processing/input/articles",
        ),
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="sentiment_data",
            source="/opt/ml/processing/output",
            destination=f"{inference_input_data}/sentiment",
        )
    ],
    job_arguments=[
        "--ticker-cik",
        inference_ticker_cik,
        "--region",
        region,
        "--endpoint-name",
        sentiment_endpoint_name,
    ],
    code=sentiment_script_uri,
    depends_on=["HFSECPegasusSummarizer_1"],
)

sentiment_step_2 = ProcessingStep(
    name="HFSECFinBertSentiment_2",
    processor=sentiment_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            input_name="sec_summary",
            source=f"{inference_input_data}/10k10q/summary",
            destination="/opt/ml/processing/input/10k10q",
        ),
        sagemaker.processing.ProcessingInput(
            input_name="articles",
            source=f"{inference_input_data}/articles",
            destination="/opt/ml/processing/input/articles",
        ),
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="sentiment_data",
            source="/opt/ml/processing/output",
            destination=f"{inference_input_data}/sentiment",
        )
    ],
    job_arguments=[
        "--ticker-cik",
        inference_ticker_cik,
        "--region",
        region,
        "--endpoint-name",
        sentiment_endpoint_name,
    ],
    code=sentiment_script_uri,
    depends_on=["HFSECPegasusSummarizer_2"],
)
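Each `ProcessingInput` above is copied into its own directory inside the container, so the sentiment script sees the SEC summaries and the news articles side by side under `/opt/ml/processing/input`. The sketch below imitates that layout in a temporary directory to show how a script could enumerate both channels. The helper `collect_inputs` and the file names are made up for illustration; the real `sentiment.py` may read its inputs differently.

```python
import tempfile
from pathlib import Path


def collect_inputs(input_root: Path) -> dict[str, list[str]]:
    """Map each input channel directory to the sorted file names found inside it."""
    channels = {}
    for channel in ("10k10q", "articles"):
        channel_dir = input_root / channel
        channels[channel] = sorted(p.name for p in channel_dir.glob("*") if p.is_file())
    return channels


# Simulate the container layout in a temporary directory; file names are invented.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "10k10q").mkdir()
    (root / "articles").mkdir()
    (root / "10k10q" / "example_summary.csv").write_text("summary")
    (root / "articles" / "example_articles.csv").write_text("articles")
    found = collect_inputs(root)

print(found)
```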

## Condition Step


As explained earlier, this is a top-level condition step. Based on the value of the pipeline parameter model_register_deploy, it determines whether to register and deploy new versions of the models and then run inference, or to simply run inference using the existing endpoints.



condition_eq = ConditionEquals(left=model_register_deploy, right="Y"): This line defines a condition using the ConditionEquals class. The ConditionEquals class takes two parameters:

left: Represents the left side of the equality condition. In this case, it is the model_register_deploy pipeline parameter, whose value is resolved when the pipeline runs.

right: Represents the right side of the equality condition. In this case, it is the string "Y", indicating the expected value for the condition to evaluate to true.

By using the ConditionEquals class, you can define an equality condition that compares two values and returns True if they are equal. This condition can be used in a ConditionStep to control the flow of execution in an Amazon SageMaker pipeline.

In [None]:

from sagemaker.workflow.conditions import ConditionEquals
from sagemaker.workflow.condition_step import ConditionStep

condition_eq = ConditionEquals(left=model_register_deploy, right="Y")

condition_step = ConditionStep(...) : This line creates a new instance of the ConditionStep class and assigns it to the condition_step variable. The ConditionStep class is used to define a step in the pipeline that includes a condition for branching the execution flow.

The parameters for the ConditionStep class include:

name="HFSECFinBertConditionCheck": Specifies the name of the condition step as "HFSECFinBertConditionCheck". This name is used to identify the step within the pipeline.

conditions=[condition_eq]: Defines the condition for the step. In this example, the condition is specified as a list containing a single condition object represented by the variable condition_eq. The condition_eq object represents an equality condition that evaluates to either true or false.

if_steps=[...]: Specifies the list of steps to be executed if the condition evaluates to true. In this example, the steps are represented by the variables create_sentiment_model_step, register_sentiment_model_step, sentiment_deploy_step, create_summary_model_step, register_summary_model_step, and summary_deploy_step. These variables hold the individual steps of the pipeline that should be executed if the condition is true.

else_steps=[summarize_step_1]: Specifies the list of steps to be executed if the condition evaluates to false. In this example, the summarize_step_1 variable represents a single step that should be executed if the condition is false.

depends_on=["HFSECFinBertCreateDataset"]: Specifies the list of steps that the condition step depends on. In this case, the condition step depends on the step named "HFSECFinBertCreateDataset". This ensures that the condition step is executed after the "HFSECFinBertCreateDataset" step.



In [None]:
# Define the condition step
condition_step = ConditionStep(
    name="HFSECFinBertConditionCheck",
    conditions=[condition_eq],  # true when model_register_deploy equals "Y"
    if_steps=[
        create_sentiment_model_step,
        register_sentiment_model_step,
        sentiment_deploy_step,
        create_summary_model_step,
        register_summary_model_step,
        summary_deploy_step,
    ],  # if the condition evaluates to true then create model, register, and deploy
    else_steps=[summarize_step_1],  # otherwise run inference with the existing endpoints
    depends_on=["HFSECFinBertCreateDataset"],
)

## Combine Pipeline steps and run


pipeline_name = "FinbertSECDeploymentPipeline": This line assigns the name "FinbertSECDeploymentPipeline" to the pipeline_name variable. This name will be used as the identifier for the pipeline.

pipeline = Pipeline(...) : This line creates a new instance of the Pipeline class. 

The parameters for the Pipeline class include:

name=pipeline_name: Specifies the name of the pipeline using the pipeline_name variable.

parameters=[...]: Defines the list of pipeline parameters that can be set when an execution is started. In this case, they are processing_instance_type, processing_instance_count, model_register_deploy, inference_ticker_cik, and inference_input_data. Each is a pipeline parameter object defined earlier in the notebook, and its value is resolved when the pipeline runs.

steps=[...]: Specifies the list of steps that make up the pipeline. In this example, the steps are represented by the variables create_dataset_step, condition_step, deploy_condition_step, sentiment_step_1, and sentiment_step_2. These variables hold the individual steps of the pipeline, which can include data processing, conditions, model deployment, or other tasks.

In [None]:
pipeline_name = "FinbertSECDeploymentPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        model_register_deploy,
        inference_ticker_cik,
        inference_input_data,
    ],
    steps=[
        create_dataset_step,
        condition_step,
        deploy_condition_step,
        sentiment_step_1,
        sentiment_step_2,
    ],
)
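To see how the `depends_on` settings and the two condition steps order the work, the step names used in this section can be arranged into a dependency graph. This is a simplified sketch using the standard-library `graphlib` module (Python 3.9+), not something SageMaker requires. The model create and register steps are left out because their names are not shown in this section, so the deploy steps are drawn as if they followed the top-level condition directly.

```python
from graphlib import TopologicalSorter

# Each key maps a step name to the steps that must finish before it.
dependencies = {
    "HFSECFinBertConditionCheck": {"HFSECFinBertCreateDataset"},
    # Branch taken when model_register_deploy == "Y" (create/register steps omitted)
    "HFSECFinBertDeployModel": {"HFSECFinBertConditionCheck"},
    "HFSECPegasusDeployModel": {"HFSECFinBertConditionCheck"},
    "HFSECFinBertDeployConditionCheck": {
        "HFSECFinBertDeployModel",
        "HFSECPegasusDeployModel",
    },
    "HFSECPegasusSummarizer_2": {"HFSECFinBertDeployConditionCheck"},
    "HFSECFinBertSentiment_2": {"HFSECPegasusSummarizer_2"},
    # Branch taken otherwise
    "HFSECPegasusSummarizer_1": {"HFSECFinBertConditionCheck"},
    "HFSECFinBertSentiment_1": {"HFSECPegasusSummarizer_1"},
}

# static_order() yields one valid ordering in which every step follows its prerequisites.
order = list(TopologicalSorter(dependencies).static_order())
for position, step_name in enumerate(order, start=1):
    print(position, step_name)
```

The graph lists both branches for completeness; in a single execution only one of them is expected to run.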

pipeline.upsert(role_arn=role): This line calls the upsert() method on the pipeline object to create the pipeline, or update it if it already exists. The role_arn parameter specifies the Amazon Resource Name (ARN) of the IAM role that grants the permissions the pipeline needs. The role variable holds that ARN.

By calling upsert() with the appropriate parameters, you define or modify the pipeline in Amazon SageMaker, allowing you to manage and orchestrate your machine learning workflows.

In [None]:
pipeline.upsert(role_arn=role)


The %%time cell magic reports how long the whole cell takes to run.

start_response = pipeline.start(): This line initiates the execution of the pipeline. The start() method is called on the pipeline object, and the response from starting the pipeline is assigned to the start_response variable. This response contains information about the pipeline execution, such as the execution ID.

start_response.wait(delay=60, max_attempts=200): This line waits for the pipeline execution to complete. It uses the wait() method of the start_response object to wait for the execution status to reach a terminal state. The delay parameter specifies the number of seconds to wait between polling for the status, and the max_attempts parameter sets the maximum number of polling attempts.

start_response.describe(): This line retrieves the description of the pipeline execution. The describe() method is called on the start_response object, and it returns information about the execution, including its status, steps, and metadata.

In [None]:
%%time
start_response = pipeline.start()
start_response.wait(delay=60, max_attempts=200)
start_response.describe()
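The `delay` and `max_attempts` arguments describe a polling loop: check the status, stop if it is terminal, otherwise sleep and try again. The sketch below imitates that behaviour with a fake status source. It is not the SDK's implementation, and `wait_for_terminal_state` and `TERMINAL_STATES` are names invented for this example. With the values used in this notebook, the wait gives up after roughly 200 minutes.

```python
import time
from typing import Callable

# Assumed set of terminal pipeline execution states, for illustration.
TERMINAL_STATES = {"Succeeded", "Failed", "Stopped"}


def wait_for_terminal_state(
    get_status: Callable[[], str],
    delay: float = 60,
    max_attempts: int = 200,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status until it returns a terminal state or the attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        sleep(delay)
    raise TimeoutError(f"No terminal state after {max_attempts} attempts")


# Fake status source: two in-progress polls, then success.
statuses = iter(["Executing", "Executing", "Succeeded"])
final_status = wait_for_terminal_state(lambda: next(statuses), delay=0, max_attempts=5)
print(final_status)

# Approximate upper bound on waiting with delay=60 and max_attempts=200, in minutes.
max_wait_minutes = 60 * 200 / 60
print(max_wait_minutes)
```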

## View Evaluation Results


Once the pipeline execution completes, we can download the sentiment results from S3 and view them.



s3_client.download_file: This line downloads a file from Amazon S3 to the local file system. It uses the download_file method of the s3_client object to perform the download.

default_bucket: The name of the S3 bucket where the file is located.

f"{prefix}/nlp-pipeline/inf-data/sentiment/{ticker}_sentiment_result.csv": The key (path) of the file within the S3 bucket. It is constructed using the prefix variable, along with additional directory structure and the name of the CSV file.

f"./data/{ticker}_sentiment_result.csv": The local file path where the downloaded file will be saved. It is constructed using the ticker variable (presumably representing a specific stock ticker) to generate the name of the CSV file.

sentiment_df = pd.read_csv(f"./data/{ticker}_sentiment_result.csv"): This line reads the downloaded CSV file into a pandas DataFrame. It uses the read_csv function from the pandas library, passing the local file path as the argument.

sentiment_df: This line outputs the sentiment_df DataFrame, displaying its contents in the output. This allows you to see the data stored in the DataFrame.

In [None]:
s3_client.download_file(
    default_bucket,
    f"{prefix}/nlp-pipeline/inf-data/sentiment/{ticker}_sentiment_result.csv",
    f"./data/{ticker}_sentiment_result.csv",
)
sentiment_df = pd.read_csv(f"./data/{ticker}_sentiment_result.csv")
sentiment_df
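The S3 key and the local file name in the cell above follow the same pattern, so they can be derived together. The helper below is a hypothetical convenience, not part of the notebook. It also creates the local directory, since a download to a directory that does not exist would be expected to fail. The prefix and ticker values are placeholders.

```python
import tempfile
from pathlib import Path


def sentiment_result_paths(prefix: str, ticker: str, local_dir: str = "./data") -> tuple[str, Path]:
    """Build the S3 key and local path for one ticker's sentiment result file."""
    file_name = f"{ticker}_sentiment_result.csv"
    s3_key = f"{prefix}/nlp-pipeline/inf-data/sentiment/{file_name}"
    local_path = Path(local_dir) / file_name
    # Make sure the target directory exists before anything is written to it.
    local_path.parent.mkdir(parents=True, exist_ok=True)
    return s3_key, local_path


# Use a temporary directory here so the example leaves no files behind.
with tempfile.TemporaryDirectory() as tmp:
    key, path = sentiment_result_paths("example-prefix", "amzn", local_dir=tmp)
    print(key)
    print(path.name)
```

The returned pair could then be passed to `s3_client.download_file(default_bucket, key, str(path))`.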

## Clean up


Delete the SageMaker Pipeline and the SageMaker Endpoints created by the pipeline.



pipeline.delete(): This line deletes the pipeline that was created above. It is called on the same pipeline object that was used to upsert and start the pipeline.

sagemaker_boto_client.delete_endpoint(EndpointName=sentiment_endpoint_name): This line deletes the sentiment analysis endpoint. The sagemaker_boto_client is an instance of the SageMaker Boto3 client that is used to interact with the SageMaker service. sentiment_endpoint_name is a variable that holds the name of the sentiment analysis endpoint to be deleted.

sagemaker_boto_client.delete_endpoint(EndpointName=summarization_endpoint_name): This line deletes the summarization endpoint. Similar to the previous line, it uses the sagemaker_boto_client to delete the endpoint with the name specified by the summarization_endpoint_name variable.

The cell below only defines the clean_up_resources() function; nothing is deleted until it is called. Call clean_up_resources() once you no longer need the pipeline and the endpoints.

In [None]:
def clean_up_resources():
    # Delete the pipeline
    pipeline.delete()
    # Delete the sentiment analysis endpoint
    sagemaker_boto_client.delete_endpoint(EndpointName=sentiment_endpoint_name)
    # Delete the summarization endpoint
    sagemaker_boto_client.delete_endpoint(EndpointName=summarization_endpoint_name)
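If one deletion fails, for example because an endpoint was never created, the remaining calls in `clean_up_resources()` would not run. One possible variation, sketched here with stand-in callables instead of real AWS calls, attempts every deletion and reports the outcome of each. The `run_cleanup` helper and the simulated failure are assumptions for illustration only.

```python
from typing import Callable


def run_cleanup(actions: dict[str, Callable[[], object]]) -> dict[str, str]:
    """Run every cleanup action, recording failures instead of stopping at the first one."""
    results = {}
    for label, action in actions.items():
        try:
            action()
            results[label] = "deleted"
        except Exception as error:  # broad on purpose: keep going so nothing is left running
            results[label] = f"failed: {error}"
    return results


def failing_delete():
    """Simulate a deletion that fails."""
    raise RuntimeError("endpoint not found")


# Stand-ins; in the notebook these would wrap pipeline.delete() and delete_endpoint(...).
report = run_cleanup(
    {
        "pipeline": lambda: None,
        "sentiment endpoint": failing_delete,
        "summarization endpoint": lambda: None,
    }
)
print(report)
```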