In [1]:
################## Install libraries ##################

In [1]:
!pip install transformers safetensors sagemaker boto3  huggingface_hub --upgrade  --quiet

In [None]:
########### Import libraries ###########

In [35]:
import sagemaker
import boto3
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists

role = sagemaker.get_execution_role()
sess = sagemaker.Session()

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

sagemaker role arn: arn:aws:iam::730335220874:role/service-role/AmazonSageMaker-ExecutionRole-20240131T123268
sagemaker bucket: sagemaker-us-east-1-730335220874
sagemaker session region: us-east-1


In [36]:
import os

print("Path at terminal when executing this file")
print(os.getcwd() + "\n")

Path at terminal when executing this file
/home/ec2-user/SageMaker/Trellis



In [37]:
####### Model packaging and config ####### 

In [39]:
!ls

00_data_preprocessing_trellis.ipynb
01_training_model_v1_trellis.ipynb
02_model_deployment_sagemaker_endpoint_Trellis_Doc.ipynb
distilbert-base-uncased-finetuned-Trellis
training-data


In [40]:
!mkdir deployment_package

In [41]:
cd deployment_package

/home/ec2-user/SageMaker/Trellis/deployment_package


In [44]:
!mkdir code

In [42]:
!cp ../distilbert-base-uncased-finetuned-Trellis/{config.json,pytorch_model.bin,special_tokens_map.json,tokenizer_config.json,vocab.txt} .


In [45]:
%%writefile code/inference.py

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import json
import logging

logging.basicConfig(level=logging.INFO)

def model_fn(model_dir):
    """
    Load the model and tokenizer from the specified directory.

    Args:
        model_dir (str): Directory where the model and tokenizer are stored.

    Returns:
        tuple: A tuple containing the model, tokenizer, and device.
    """
    try:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        model = AutoModelForSequenceClassification.from_pretrained(model_dir)
        tokenizer = AutoTokenizer.from_pretrained(model_dir)
        model.to(device)
        return model, tokenizer, device
    except Exception as e:
        logging.error(f"Error loading model or tokenizer: {e}")
        raise

def input_fn(request_body, request_content_type):
    """
    Process the input data.

    Args:
        request_body (str): The request body.
        request_content_type (str): The content type of the request.

    Returns:
        dict: The processed input data.

    Raises:
        ValueError: If the content type is unsupported.
    """
    try:
        if request_content_type == 'application/json':
            data = json.loads(request_body)
            return data
        else:
            raise ValueError(f"Unsupported content type: {request_content_type}")
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON: {e}")
        raise
    except Exception as e:
        logging.error(f"Error processing input data: {e}")
        raise

def predict_fn(input_data, model_tokenizer_device):
    """
    Generate predictions from the input data.

    Args:
        input_data (dict): The input data.
        model_tokenizer_device (tuple): A tuple containing the model, tokenizer, and device.

    Returns:
        str: The predicted class label.
    """
    try:
        model, tokenizer, device = model_tokenizer_device
        text = input_data['text']
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        inputs = {key: value.to(device) for key, value in inputs.items()}
        model.eval()
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
            predicted_class_id = logits.argmax().item()
            predicted_class = model.config.id2label[predicted_class_id]
        return predicted_class
    except KeyError as e:
        logging.error(f"Key error: {e}")
        raise
    except Exception as e:
        logging.error(f"Error generating prediction: {e}")
        raise

def output_fn(prediction, accept):
    """
    Format the prediction output.

    Args:
        prediction (str): The predicted class label.
        accept (str): The content type of the response.

    Returns:
        tuple: A tuple containing the response body and content type.

    Raises:
        ValueError: If the accept type is unsupported.
    """
    try:
        if accept == "application/json":
            return json.dumps({"predicted_class": prediction}), accept
        else:
            raise ValueError(f"Unsupported accept type: {accept}")
    except Exception as e:
        logging.error(f"Error formatting output: {e}")
        raise

Writing code/inference.py


In [46]:
%%writefile code/requirements.txt

transformers
torch
safetensors

Writing code/requirements.txt


In [47]:
cd ..

/home/ec2-user/SageMaker/Trellis


In [48]:
def list_directory_contents(directory: str):
    """
    List the contents of a directory.

    Args:
        directory (str): Path to the directory.

    Returns:
        None
    """
    if not os.path.isdir(directory):
        raise FileNotFoundError(f"Directory '{directory}' not found.")
    
    print(f"Contents of '{directory}':")
    for root, dirs, files in os.walk(directory):
        level = root.replace(directory, '').count(os.sep)
        indent = ' ' * 4 * (level)
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f"{subindent}{f}")

# Define the path to the deployment package directory
deployment_package_dir = 'deployment_package/'

# List the contents of the deployment package directory
try:
    list_directory_contents(deployment_package_dir)
except Exception as e:
    print(f"Error: {e}")

Contents of 'deployment_package/':
/
    tokenizer_config.json
    config.json
    pytorch_model.bin
    special_tokens_map.json
    vocab.txt
code/
    requirements.txt
    inference.py


In [49]:
def remove_ipynb_checkpoints(directory: str):
    """
    Remove .ipynb_checkpoints directory from the specified directory.

    Args:
        directory (str): Path to the directory.

    Returns:
        None
    """
    for root, dirs, files in os.walk(directory):
        for dir_name in dirs:
            if dir_name == '.ipynb_checkpoints':
                dir_path = os.path.join(root, dir_name)
                shutil.rmtree(dir_path)
                print(f"Removed directory: {dir_path}")

# Remove .ipynb_checkpoints from the deployment package directory
try:
    remove_ipynb_checkpoints(deployment_package_dir)
except Exception as e:
    print(f"Error: {e}")

In [50]:
!pwd

/home/ec2-user/SageMaker/Trellis


In [51]:
deployment_package_dir

'deployment_package/'

In [52]:
import os
import tarfile

def create_tarball(source_dir: str, output_filename: str):
    """
    Create a tarball from the source directory.
    
    Args:
        source_dir (str): Path to the source directory.
        output_filename (str): Name of the output tarball file.
    
    Returns:
        None
    """
    if not os.path.isdir(source_dir):
        raise FileNotFoundError(f"Source directory '{source_dir}' not found.")
    
    with tarfile.open(output_filename, "w:gz") as tar:
        for item in os.listdir(source_dir):
            item_path = os.path.join(source_dir, item)
            tar.add(item_path, arcname=item)
    
    print(f"Tarball '{output_filename}' created successfully.")

# Define the deployment package directory
deployment_package_dir = 'deployment_package'

# Define the output tarball filename
output_tarball = 'model.tar.gz'

# Create the tarball
try:
    create_tarball(deployment_package_dir, output_tarball)
    
    # Print the contents of the tarball
    print("Contents of the tarball:")
    with tarfile.open(output_tarball, "r:gz") as tar:
        for member in tar.getmembers():
            print(member.name)
    
except Exception as e:
    print(f"Error: {e}")

Tarball 'model.tar.gz' created successfully.
Contents of the tarball:
code
code/inference.py
code/requirements.txt
tokenizer_config.json
config.json
pytorch_model.bin
special_tokens_map.json
vocab.txt


In [16]:
#### Upload the trellis model.tar.gz file to an S3 bucket for model deployment #### 

In [53]:
def upload_to_s3(file_path: str, bucket_name: str, s3_key: str):
    """
    Upload a file to an S3 bucket.

    Args:
        file_path (str): Path to the file to upload.
        bucket_name (str): Name of the S3 bucket.
        s3_key (str): S3 key for the uploaded file.

    Returns:
        None
    """
    s3 = boto3.client('s3')
    try:
        s3.upload_file(file_path, bucket_name, s3_key)
        print(f"File '{file_path}' uploaded to 's3://{bucket_name}/{s3_key}' successfully.")
    except Exception as e:
        print(f"Error uploading file to S3: {e}")

# Define the S3 bucket name and key
bucket_name = 'sagemaker-us-east-1-730335220874'
s3_key = 'distilbert-base-uncased-finetuned-trellis/model/model.tar.gz'

# Upload the tarball to S3
try:
    upload_to_s3(output_tarball, bucket_name, s3_key)
except Exception as e:
    print(f"Error: {e}")

File 'model.tar.gz' uploaded to 's3://sagemaker-us-east-1-730335220874/distilbert-base-uncased-finetuned-trellis/model/model.tar.gz' successfully.


In [None]:
##### Deploy the Model

In [54]:
from sagemaker.huggingface import HuggingFaceModel

def deploy_huggingface_model(s3_model_path: str, role: str, instance_type: str = 'ml.g4dn.2xlarge', 
                             instance_count: int = 1) -> sagemaker.predictor.Predictor:
    """
    Deploy the Hugging Face model to SageMaker.

    Args:
        s3_model_path (str): S3 URI to the model tar.gz file.
        role (str): Execution role for SageMaker.
        instance_type (str, optional): Type of instance to deploy the model. Defaults to 'ml.g4dn.2xlarge'.
        instance_count (int, optional): Number of instances for the deployment. Defaults to 1.

    Returns:
        sagemaker.predictor.Predictor: The deployed model predictor.

    Raises:
        ValueError: If the specified instance type is not supported or if the instance count is less than 1.
    """
    # Validate instance type and count
    if instance_type not in ['ml.g4dn.2xlarge', 'ml.g4dn.4xlarge', 'ml.g4dn.8xlarge']:
        raise ValueError(f"Unsupported instance type: {instance_type}. Supported types: 'ml.g4dn.2xlarge','ml.g4dn.4xlarge', 'ml.g4dn.8xlarge'")
    if instance_count < 1:
        raise ValueError(f"Invalid instance count: {instance_count}. Instance count must be at least 1.")

    # Create a SageMaker Hugging Face Model
    huggingface_model = HuggingFaceModel(
        model_data=s3_model_path,
        name='distilbert-hf-emails-trellis-document-class',
        role=role,
        transformers_version="4.26",
        pytorch_version="1.13",
        py_version='py39'
    )

    # Deploy the model
    predictor = huggingface_model.deploy(
        endpoint_name='distilbert-hf-emails-trellis-document-class',
        endpoint_config_name='distilbert-hf-emails-trellis-document-class',
        initial_instance_count=instance_count,
        instance_type=instance_type,
    )

    return predictor


# Define the S3 path to our Trellis Doc Class model
s3_model_path = 's3://sagemaker-us-east-1-730335220874/distilbert-base-uncased-finetuned-trellis/model/model.tar.gz'

# Get the SageMaker execution role
role = sagemaker.get_execution_role()

# Deploy the model
predictor = deploy_huggingface_model(s3_model_path, role, instance_type='ml.g4dn.2xlarge', instance_count=1)

INFO:sagemaker:Creating model with name: distilbert-hf-emails-trellis-document-class
INFO:sagemaker:Creating endpoint-config with name distilbert-hf-emails-trellis-document-class
INFO:sagemaker:Creating endpoint with name distilbert-hf-emails-trellis-document-class


---------!

In [None]:
##### Test inference  'distilbert-hf-emails-trellis-document-class' endpoint ##### 

In [55]:
ent = """Sundance to honour foreign films

International films will be given the same prominence as US films at next year's Sundance Film Festival, with movies dominated by the theme of war.

The independent film festival will feature two new international cinema competitions, during its 20-30 January season in Utah. Forty-two films will debut at Sundance, including The Liberace of Baghdad by British director Sean McAllister. The prestigious festival was founded by actor Robert Redford in 1981.

"We have always had an international component, but from next year they will enter a jury competition," festival director Geoffrey Gilmore said. "We wanted to give world cinema more emphasis and have now put it on par with the American dramatic and documentary competitions." Twelve films competing in the new world cinema documentary category focus on countries and people under siege.

The Liberace of Baghdad features an Iraqi pianist hiding in a hotel as he waits for a visa, while Finnish film The Three Rooms of Melancholia looks at the war in Chechnya. Shake Hands With The Devil: The Journey of Romeo Dallaire tells of a UN mission to Rwanda during the 1994 genocide, while French-Israeli production Wall looks at Israel's controversial security wall separating it from the Palestinian territories. The 16 films competing in the new world cinema dramatic category include works from Germany, South Korea, Angola, China, Denmark and Australia.

Several Hollywood stars feature in the festival's American independent drama category, including Keanu Reeves and Benjamin Bratt. Vince Vaughn stars in quirky movie Thumbsucker while 21 Grams actress Naomi Watts plays a budding Hollywood actress in Ellie Parker. The top Grand Jury prize at this year's festival went to low budget sci-fi thriller Primer, written and directed by Shane Carruth. Morgan Spurlock earned the directing award for Super Size Me, which became an international box office hit.""".strip()

In [56]:
import re
from html import unescape
import logging
import json
import boto3

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def clean_text_content_trellis(text_content: str) -> str:
    """
    Cleans text content of document text by performing several operations:
    - Normalizes line breaks to Unix-style.
    - Removes excessive whitespace within lines.
    - Strips HTML tags and decodes HTML entities.
    - Normalizes paragraph breaks to ensure readability.

    Args:
        text_content (str): The raw text content to be cleaned.

    Returns:
        str: The cleaned text content.
    """
    try:
        # Normalize line breaks to Unix-style
        text_content = re.sub(r'\r\n|\r', '\n', text_content)
        # Remove excessive whitespace within lines
        text_content = re.sub(r'\s+', ' ', text_content).strip()
        # Strip HTML tags and decode HTML entities
        text_content = re.sub(r'<[^>]+>', '', text_content, flags=re.DOTALL)
        text_content = unescape(text_content)
        # Normalize paragraph breaks to ensure readability
        text_content = re.sub(r'\n{3,}', '\n\n', text_content)
        return text_content
    except Exception as e:
        logger.error(f"An error occurred while cleaning the text content: {e}")
        return ""

def invoke_sagemaker_endpoint(endpoint_name: str, text: str) -> dict:
    """
    Preprocess the input text and invoke the SageMaker endpoint.

    Args:
        endpoint_name (str): The name of the SageMaker endpoint.
        text (str): The input text to process and send to the model.

    Returns:
        dict: The response from the SageMaker endpoint.

    Raises:
        Exception: Any exception caught during processing or invoking the endpoint.
    """
    try:
        # Preprocess the text using clean_text_content_trellis
        logger.info("Preprocessing the input text.")
        processed_text = clean_text_content_trellis(text)

        # Prepare the payload for SageMaker endpoint
        payload = json.dumps({"text": processed_text})
        logger.info(f"Payload prepared: {payload}")

        # Create a runtime client
        runtime = boto3.client('runtime.sagemaker')

        # Invoke the endpoint
        logger.info(f"Invoking the SageMaker endpoint: {endpoint_name}")
        response = runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType='application/json',
            Body=payload
        )

        # Parse and return the response
        response_body = response['Body'].read().decode()
        logger.info(f"Raw response from SageMaker endpoint: {response_body}")

        # Handle the case where the response is a list
        if isinstance(response_body, str):
            response_body = json.loads(response_body)
        elif isinstance(response_body, list):
            response_body = json.loads(response_body[0])

        logger.info(f"Parsed response from SageMaker endpoint: {response_body}")
        return response_body

    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise

#  call invoke endpoint
if __name__ == "__main__":
    endpoint_name = 'distilbert-hf-emails-trellis-document-class'  # The name of the trellis deployed endpoint

    # Invoke the endpoint
    try:
        response = invoke_sagemaker_endpoint(endpoint_name, ent)
        print("Response from SageMaker endpoint:", response)
        # Access the actual JSON
        if isinstance(response, list):
            response = json.loads(response[0])
        predicted_class = response.get("predicted_class")
        print("Predicted class:", predicted_class)
    except Exception as e:
        print("Error invoking the endpoint:", str(e))

INFO:__main__:Preprocessing the input text.
INFO:__main__:Payload prepared: {"text": "Sundance to honour foreign films International films will be given the same prominence as US films at next year's Sundance Film Festival, with movies dominated by the theme of war. The independent film festival will feature two new international cinema competitions, during its 20-30 January season in Utah. Forty-two films will debut at Sundance, including The Liberace of Baghdad by British director Sean McAllister. The prestigious festival was founded by actor Robert Redford in 1981. \"We have always had an international component, but from next year they will enter a jury competition,\" festival director Geoffrey Gilmore said. \"We wanted to give world cinema more emphasis and have now put it on par with the American dramatic and documentary competitions.\" Twelve films competing in the new world cinema documentary category focus on countries and people under siege. The Liberace of Baghdad features a

Response from SageMaker endpoint: ['{"predicted_class": "entertainment"}', 'application/json']
Predicted class: entertainment


In [57]:
print("Predicted class:", predicted_class)

Predicted class: entertainment


In [58]:
##### Test latency #####

In [64]:
import time 

def test_inference_time(endpoint_name, input_text):
    """
    Test the inference time of the SageMaker endpoint.

    Args:
        endpoint_name (str): The name of the SageMaker endpoint.
        input_text (str): The input text to be classified.

    Returns:
        tuple: A tuple containing the response dictionary and the inference time in seconds of our Trellis Endpoint.
    """
    try:
        start_time = time.time()
        response = invoke_sagemaker_endpoint(endpoint_name, input_text)
        end_time = time.time()
        inference_time = end_time - start_time
        
        # Parse the JSON from the first element of the response list
        if isinstance(response, list):
            response = json.loads(response[0])

        return response, inference_time
    except Exception as e:
        logging.error(f"Error testing inference time: {e}")
        raise

In [65]:
input_news = """
Grilled Shrimp Tacos with Avocado-Corn Salsa
instruction 
Remove the corn kernels from the cobs: Place a large container on a damp towel. Fold a paper towel into fourths and place it inside the container. Stand 1 ear of corn on the paper towel, using the stem as a handle. Using a paring knife, slice downward, letting the kernels fall into the container. Rotate the cob and continue until all the kernels have been removed; discard the cob. Repeat with the remaining corn. Discard the paper towel.
Add the scallions, tomatoes, measured lime juice, cilantro, serrano, and measured salt and stir to combine.
Halve and pit the avocados. Using a paring knife, score the flesh of the avocado halves in a 1/4-inch-wide crosshatch pattern (be careful not to cut through the skin). Using a spoon, scoop the avocado pieces into the corn mixture and gently fold to combine.
Taste and add more lime juice or salt as needed; set aside.
For the tacos:
Heat an outdoor grill to high (about 450°F to 550°F). Meanwhile, assemble the shrimp.
Whisk the lime juice, oil, chipotle powder, salt, and cumin together in a large bowl. Add the shrimp and toss to combine.
Skewer each shrimp through the tail and head ends, leaving about 1/4 inch of space between each shrimp. Transfer the skewers to a baking sheet.
Place the skewers in a single layer on the grill without touching. Close the grill and cook until grill marks appear on the bottom, about 4 minutes. Flip the skewers, close the grill, and cook until the shrimp are just firm, about 1 minute more. Transfer the skewers to a clean baking sheet.
Remove and discard the skewers, transfer the shrimp to a cutting board, and coarsely chop. Place in a serving bowl.
Serve the shrimp with the tortillas and salsa.
""".strip()

# Invoke the endpoint and test 2 inference time
try:
    response, inference_time = test_inference_time(endpoint_name, input_news)
    print("Response from SageMaker endpoint:", response)
    predicted_class = response.get("predicted_class")
    print("Predicted class:", predicted_class)
    print(f"Inference time: {inference_time:.4f} seconds")
except Exception as e:
    print("Error invoking the endpoint:", str(e))



INFO:__main__:Preprocessing the input text.
INFO:__main__:Payload prepared: {"text": "Grilled Shrimp Tacos with Avocado-Corn Salsa instruction Remove the corn kernels from the cobs: Place a large container on a damp towel. Fold a paper towel into fourths and place it inside the container. Stand 1 ear of corn on the paper towel, using the stem as a handle. Using a paring knife, slice downward, letting the kernels fall into the container. Rotate the cob and continue until all the kernels have been removed; discard the cob. Repeat with the remaining corn. Discard the paper towel. Add the scallions, tomatoes, measured lime juice, cilantro, serrano, and measured salt and stir to combine. Halve and pit the avocados. Using a paring knife, score the flesh of the avocado halves in a 1/4-inch-wide crosshatch pattern (be careful not to cut through the skin). Using a spoon, scoop the avocado pieces into the corn mixture and gently fold to combine. Taste and add more lime juice or salt as needed; s

Response from SageMaker endpoint: {'predicted_class': 'food'}
Predicted class: food
Inference time: 0.0808 seconds


In [74]:
input_news = """
Talks held on Gibraltar's future

Two days of talks on the future of Gibraltar begin at Jack Straw's country residence later on Wednesday.

Officials at the two-day summit at the foreign secretary's official Kent house, Chevening, will plan a new forum on the Rock's future. In October, Mr Straw and his Spanish counterpart Miguel Moratinos agreed to establish a body that would give Gibraltarians a voice in their future. Most Gibraltarians said in a referendum they wanted to remain British.

Gibraltar's Chief Minister Peter Caruana will represent the British citizens living on the Rock, while Britain's Europe Director Dominick Chilcott will represent the UK. Madrid is being represented by Spain's director general for Europe, Jose Maria Pons. The initiative follows Spain's socialist government's decision to put its long-standing sovereignty ambitions on hold. Gibraltarians rejected plans for the Rock's sovereignty to be shared between Britain and Spain in a referendum organised by Gibraltar government.

""".strip()

# Invoke the endpoint and test 2 inference time
try:
    response, inference_time = test_inference_time(endpoint_name, input_news)
    print("Response from SageMaker endpoint:", response)
    predicted_class = response.get("predicted_class")
    print("Predicted class:", predicted_class)
    print(f"Inference time: {inference_time:.4f} seconds")
except Exception as e:
    print("Error invoking the endpoint:", str(e))

INFO:__main__:Preprocessing the input text.
INFO:__main__:Payload prepared: {"text": "Talks held on Gibraltar's future Two days of talks on the future of Gibraltar begin at Jack Straw's country residence later on Wednesday. Officials at the two-day summit at the foreign secretary's official Kent house, Chevening, will plan a new forum on the Rock's future. In October, Mr Straw and his Spanish counterpart Miguel Moratinos agreed to establish a body that would give Gibraltarians a voice in their future. Most Gibraltarians said in a referendum they wanted to remain British. Gibraltar's Chief Minister Peter Caruana will represent the British citizens living on the Rock, while Britain's Europe Director Dominick Chilcott will represent the UK. Madrid is being represented by Spain's director general for Europe, Jose Maria Pons. The initiative follows Spain's socialist government's decision to put its long-standing sovereignty ambitions on hold. Gibraltarians rejected plans for the Rock's sove

Response from SageMaker endpoint: {'predicted_class': 'politics'}
Predicted class: politics
Inference time: 0.0736 seconds
