In [116]:
%pip install "sagemaker==2.159.0"

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [117]:
%%writefile recommender.py


from __future__ import absolute_import
import os
import importlib
import logging
import numpy as np
from io import StringIO
# import chardet

import sagemaker_sklearn_container.exceptions as exc
from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker, server)
from sagemaker_sklearn_container.serving_mms import start_model_server

logging.basicConfig(format='%(asctime)s %(levelname)s - %(name)s - %(message)s', level=logging.INFO)

logging.getLogger('boto3').setLevel(logging.INFO)
logging.getLogger('s3transfer').setLevel(logging.INFO)
logging.getLogger('botocore').setLevel(logging.WARN)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import boto3
import json

def recommend_songs(song_ids):
    # Read the CSV file from the S3 bucket
    song_ids = set(song_ids)
    
    s3 = boto3.client('s3')
    # s3_file = s3fs.S3FileSystem(anon=False)
    # df = pd.read_csv(s3_file.open('s3://spotify-genius-featurestore/spotify-reco-db/actual-database.csv'))
    
    existing_csv_obj = s3.get_object(Bucket='spotify-genius-featurestore', Key='spotify-reco-db/actual-database.csv')
    existing_csv_data = existing_csv_obj['Body'].read().decode('utf-8')
    df = pd.read_csv(StringIO(existing_csv_data))

    # Calculate cosine similarity between song features
    similarity_matrix = cosine_similarity(df.drop(['song_id', 'name', 'artist'], axis=1))

    # Get the indices of the songs in the incoming list
    song_indices = df[df['song_id'].isin(song_ids)].index.tolist()

    # Get the most similar songs
    similar_songs = []
    for idx in song_indices:
        # Get the index of the most similar song
        similar_song_idx = similarity_matrix[idx].argsort()[-2]  # -1 would be the song itself
        # Get the song id of the most similar song
        similar_song_id = df.iloc[similar_song_idx]['song_id']
        similar_songs.append(similar_song_id)
        
    recommended_song_ids = [song_id for song_id in similar_songs if song_id not in song_ids]

    return recommended_song_ids



def model_fn(model_dir):
    return None

def transform_fn(model, request_body, content_type, accept_type):
    
    # encoding = chardet.detect(request_body)['encoding']
    # decoded_request_body = request_body.decode(encoding)
    # song_ids = json.loads(decoded_request_body) # Load the list of song IDs from the request body
    song_ids = json.loads(request_body)  # Load the list of song IDs from the request body
    print(song_ids)
    recommended_song_ids = recommend_songs(song_ids)  # Generate the list of recommended song IDs
    print(recommended_song_ids)
    return json.dumps(recommended_song_ids)  # Return the list as a JSON string

# Copyright 2019-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the 'license' file accompanying this file. This file is
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.



def is_multi_model():
    return os.environ.get('SAGEMAKER_MULTI_MODEL')


def default_model_fn(model_dir):
    """Loads a model. For Scikit-learn, a default function to load a model is not provided.
    Users should provide customized model_fn() in script.
    Args:
        model_dir: a directory where model is saved.
    Returns: A Scikit-learn model.
    """
    return transformer.default_model_fn(model_dir)


def default_input_fn(input_data, content_type):
    """Takes request data and de-serializes the data into an object for prediction.
        When an InvokeEndpoint operation is made against an Endpoint running SageMaker model server,
        the model server receives two pieces of information:
            - The request Content-Type, for example "application/json"
            - The request data, which is at most 5 MB (5 * 1024 * 1024 bytes) in size.
        The input_fn is responsible to take the request data and pre-process it before prediction.
    Args:
        input_data (obj): the request data.
        content_type (str): the request Content-Type.
    Returns:
        (obj): data ready for prediction.
    """
    np_array = encoders.decode(input_data, content_type)
    return np_array.astype(np.float32) if content_type in content_types.UTF8_TYPES else np_array


def default_predict_fn(input_data, model):
    """A default predict_fn for Scikit-learn. Calls a model on data deserialized in input_fn.
    Args:
        input_data: input data (Numpy array) for prediction deserialized by input_fn
        model: Scikit-learn model loaded in memory by model_fn
    Returns: a prediction
    """
    output = model.predict(input_data)
    return output


def default_output_fn(prediction, accept):
    """Function responsible to serialize the prediction for the response.
    Args:
        prediction (obj): prediction returned by predict_fn .
        accept (str): accept content-type expected by the client.
    Returns:
        (worker.Response): a Flask response object with the following args:
            * Args:
                response: the serialized data to return
                accept: the content-type that the data was transformed to.
    """
    return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)


def _user_module_transformer(user_module):
    model_fn = getattr(user_module, "model_fn", default_model_fn)
    input_fn = getattr(user_module, "input_fn", None)
    predict_fn = getattr(user_module, "predict_fn", None)
    output_fn = getattr(user_module, "output_fn", None)
    transform_fn = getattr(user_module, "transform_fn", None)

    if transform_fn and (input_fn or predict_fn or output_fn):
        raise exc.UserError("Cannot use transform_fn implementation with input_fn, predict_fn, and/or output_fn")

    if transform_fn is not None:
        return transformer.Transformer(model_fn=model_fn, transform_fn=transform_fn)
    else:
        return transformer.Transformer(
            model_fn=model_fn,
            input_fn=input_fn or default_input_fn,
            predict_fn=predict_fn or default_predict_fn,
            output_fn=output_fn or default_output_fn,
        )


def _user_module_execution_parameters_fn(user_module):
    return getattr(user_module, 'execution_parameters_fn', None)


def import_module(module_name, module_dir):

    try:  # if module_name already exists, use the existing one
        user_module = importlib.import_module(module_name)
    except ImportError:  # if the module has not been loaded, 'modules' downloads and installs it.
        user_module = modules.import_module(module_dir, module_name)
    except Exception:  # this shouldn't happen
        logger.info("Encountered an unexpected error.")
        raise

    user_module_transformer = _user_module_transformer(user_module)
    user_module_transformer.initialize()

    return user_module_transformer, _user_module_execution_parameters_fn(user_module)


app = None


def main(environ, start_response):
    global app

    if app is None:
        serving_env = env.ServingEnv()

        user_module_transformer, execution_parameters_fn = import_module(serving_env.module_name,
                                                                         serving_env.module_dir)

        app = worker.Worker(transform_fn=user_module_transformer.transform,
                            module_name=serving_env.module_name,
                            execution_parameters_fn=execution_parameters_fn)

    return app(environ, start_response)


def serving_entrypoint():
    """Start Inference Server.

    NOTE: If the inference server is multi-model, MxNet Model Server will be used as the base server. Otherwise,
        GUnicorn is used as the base server.
    """
    if is_multi_model():
        start_model_server()
    else:
        server.start(env.ServingEnv().framework_module)


Overwriting recommender.py


In [118]:
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import get_execution_role

model = SKLearnModel(
    model_data=None,  # No actual model to load
    role=get_execution_role(),
    entry_point='recommender.py',
    framework_version='1.2-1'
)


In [119]:
import time

# endpoint_name = "cosine-endpoint-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_name = 'cosine-endpoint-2023-06-21-15-23-4'
print("EndpointName= {}".format(endpoint_name))

model.deploy(
    instance_type='ml.m5.large',
    initial_instance_count=1,
    endpoint_name=endpoint_name,
)


EndpointName= cosine-endpoint-2023-06-21-15-23-4
----!

<sagemaker.sklearn.model.SKLearnPredictor at 0x7f588d8d9330>

In [120]:
from sagemaker.predictor import Predictor

predictor = Predictor(endpoint_name=endpoint_name)

In [121]:
# song_ids = ['0l6zBPLCxPmV4M4Wg6Ospl', '1mrgfC8PEW7N4KY7hbN0Tb', '26Vcf2qFrnaa0pFJkGFh61']
# recommended_song_ids = predictor.predict(song_ids)

In [122]:
import requests
import json

song_ids = ['0NZjt6ZlCVIZarakXF2WkQ', '0dHM31MLd2F5jzNPDMHOpN', '0fNY6p1S1PCwpnK9hlP4qz']
response = predictor.predict(json.dumps(song_ids), initial_args={"ContentType": "text/csv"}).decode("utf-8")
# response = requests.post(predictor.endpoint, data=json.dumps(song_ids))
recommended_song_ids = json.loads(response)

In [123]:
print(recommended_song_ids)
print(type(recommended_song_ids))

['0a7BloCiNzLDD9qSQHh5m7', '3zwMVvkBe2qIKDObWgXw4N', '0qfZ778fbXXCtyEzyIET5K']
<class 'list'>
