### 1. 安装HuggingFace 并下载模型到本地(local test)

In [None]:
!pip install huggingface-hub -Uqq
!pip install -Uqq sagemaker

In [1]:
import boto3
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers

# Shared handles used by the rest of the notebook.
role = sagemaker.get_execution_role()  # execution role for the endpoint
sess = sagemaker.session.Session()  # sagemaker session for interacting with different AWS APIs
# Read the region through the public attribute instead of the private `_region_name`.
region = sess.boto_region_name  # region name of the current SageMaker Studio environment
account_id = sess.account_id()  # account_id of the current SageMaker Studio environment
bucket = sess.default_bucket()  # default SageMaker bucket, used for code and async I/O
image = "embedding-inference"
s3_client = boto3.client("s3")
sm_client = boto3.client("sagemaker")
smr_client = boto3.client("sagemaker-runtime")



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [None]:
!pip install -U sagemaker

In [None]:
from pyannote.audio import Pipeline


In [None]:
from huggingface_hub import snapshot_download
from pathlib import Path

# Fetch a snapshot of the diarization repo from the Hugging Face Hub, using a
# local folder as the download cache.
model_name = "pyannote/speaker-diarization-3.1"
local_model_path = Path("./whisperX")
local_model_path.mkdir(exist_ok=True)
snapshot_download(
    repo_id=model_name,
    cache_dir=local_model_path,
)

### 2. 模型部署准备（entrypoint 脚本、容器镜像、服务配置）

* 打包镜像

In [2]:
# Log Docker in to the ECR registry (account 763104351884, region us-west-2) so the
# base image can be pulled. The region here should match the registry region used in
# the FROM line of Dockerfile.inference.
!aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 763104351884.dkr.ecr.us-west-2.amazonaws.com

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded


In [3]:
%%writefile Dockerfile.inference
## Base image: Hugging Face PyTorch inference container.
## The registry region below (us-west-2) must be the same region that the
## `docker login` cell authenticated against; if you work in another region,
## change the region code in BOTH places.
FROM 763104351884.dkr.ecr.us-west-2.amazonaws.com/huggingface-pytorch-inference:2.1.0-transformers4.37.0-gpu-py310-cu118-ubuntu20.04

ENV LANG=C.UTF-8
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
## Make all local GPUs visible
ENV NVIDIA_VISIBLE_DEVICES="all"

## Python dependencies of the inference scripts.
## Note: this pins transformers to 4.25.1 although the base image tag says 4.37.0.
RUN pip install transformers==4.25.1
RUN pip install openai-whisper
RUN pip install boto3
RUN pip install PyAnnote.audio
RUN pip install soundfile
RUN pip install librosa
RUN pip install onnxruntime
RUN pip install wget
RUN pip install pandas
RUN pip install sagemaker_ssh_helper
RUN pip install "huggingface_hub<0.26.0"
## Upgrade the torch stack last, from the CUDA 11.8 wheel index.
RUN pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

Overwriting Dockerfile.inference


In [4]:
# ECR repository name; passed to the build script and used to compose the image URI.
repo_name = "sagemaker-whisper-pyannote-inference-demo"

In [None]:
%%script env repo_name=$repo_name bash

#!/usr/bin/env bash

# Build the Docker image described by Dockerfile.inference and push it to ECR so
# that SageMaker can use it.
#
# Input: the `repo_name` environment variable. It is used as the local image name
# and, combined with the account and region, as the ECR repository name.

# Stop at the first failing command so a failed build never tags/pushes a stale image.
set -eo pipefail

# The name of our algorithm
algorithm_name="${repo_name}"

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-west-2 if none
# defined). `aws configure get` exits non-zero when the value is unset, hence `|| true`.
region=$(aws configure get region || true)
region="${region:-us-west-2}"

registry="${account}.dkr.ecr.${region}.amazonaws.com"
fullname="${registry}/${algorithm_name}:latest"

# If the repository doesn't exist in ECR, create it.
if ! aws ecr describe-repositories --region "${region}" --repository-names "${algorithm_name}" > /dev/null 2>&1
then
    aws ecr create-repository --region "${region}" --repository-name "${algorithm_name}" > /dev/null
fi

# Authenticate Docker against the registry host (not the full image reference).
aws ecr get-login-password --region "${region}" | docker login --username AWS --password-stdin "${registry}"

# Build the docker image locally with the image name and then push it to ECR
# with the full name.
docker build -t "${algorithm_name}" -f Dockerfile.inference .
docker tag "${algorithm_name}" "${fullname}"

docker push "${fullname}"


In [6]:
# URI of the image in this account's ECR: <account>.dkr.ecr.<region>.amazonaws.com/<repo>:latest
inference_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repo_name}:latest"
inference_image_uri

'687912291502.dkr.ecr.us-west-2.amazonaws.com/sagemaker-whisper-pyannote-inference-demo:latest'

* 打包部署脚本

In [7]:
# Start from an empty model/code directory so stale scripts are not packaged.
!rm -rf model/code
!mkdir -p model/code

In [None]:
%%writefile model/code/inference.py

from pyannote.audio import Pipeline
import subprocess
import boto3
from urllib.parse import urlparse
import pandas as pd
from io import StringIO
import os
import torch
import sys
import time
from whisper.audio import SAMPLE_RATE
import whisper

# Make a `lib` directory located next to this script importable, then import the
# SSH helper and start it at module import time.
sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
import sagemaker_ssh_helper
sagemaker_ssh_helper.setup_and_start_ssh()


# Select the first CUDA GPU when one is available, otherwise the CPU, and log the choice.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'--device={device}')

def model_fn(model_dir):
    """Load the pyannote speaker-diarization pipeline.

    Args:
        model_dir: Model directory handed in by the caller (presumably the
            serving toolkit -- confirm). It is not read here: the pipeline is
            resolved by name through ``Pipeline.from_pretrained``.

    Returns:
        The loaded pipeline, moved to the module-level ``device``.

    Raises:
        RuntimeError: If ``Pipeline.from_pretrained`` returns ``None``.
    """
    # Prefer a token supplied through the environment; fall back to the
    # placeholder that must otherwise be replaced by hand.
    hf_token = os.environ.get("HF_TOKEN", "*******")
    model = Pipeline.from_pretrained(
        "PyAnnote/speaker-diarization-3.1",
        use_auth_token=hf_token)
    if model is None:
        raise RuntimeError(
            "Pipeline.from_pretrained returned None; check the Hugging Face "
            "token and that access to the model has been granted."
        )
    # The pipeline is not placed on the GPU automatically; move it explicitly
    # so the selected device is actually used.
    model.to(device)
    return model

def diarization_from_s3(model, s3_file, language=None):
    """Download an audio object from S3 and run speaker diarization on it.

    Args:
        model: Callable diarization pipeline; invoked with the local file path.
            Its result must provide ``itertracks(yield_label=True)``.
        s3_file: ``s3://bucket/key`` URI of the audio object.
        language: Accepted for interface compatibility; not used here.

    Returns:
        JSON string (``orient="split"``) with the columns
        ``start``, ``end`` and ``speaker``, one row per speaker turn.
    """
    import tempfile  # local import: not part of this file's module-level imports

    s3 = boto3.client("s3")
    o = urlparse(s3_file, allow_fragments=False)
    bucket = o.netloc
    key = o.path.lstrip("/")

    # A timestamp alone collides when two requests arrive within the same
    # second, so let mkstemp add a unique component to the file name.
    timestamp = int(time.time())
    fd, tmp_file = tempfile.mkstemp(prefix=f"{timestamp}_", suffix="_tmp.wav")
    os.close(fd)
    try:
        s3.download_file(bucket, key, tmp_file)
        result = model(tmp_file)
    finally:
        # Always remove the download so repeated requests do not fill the disk.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)

    rows = []
    for turn, _, speaker in result.itertracks(yield_label=True):
        print("turn",turn,"speaker",speaker)
        rows.append((turn.start, turn.end, speaker))
    # Three values per row -> three column names (four names raised ValueError).
    data_df = pd.DataFrame(rows, columns=["start", "end", "speaker"])
    print(data_df.shape)
    return data_df.to_json(orient="split")


def predict_fn(data, model):
    """Handle one request: diarize the audio referenced by the payload.

    Args:
        data: Request dict. ``"s3_file"`` is required and ``"language"`` is
            optional; both keys are popped from the dict.
        model: Object returned by ``model_fn``.

    Returns:
        Dict with the single key ``"diarization_from_s3"`` holding the JSON
        string produced by ``diarization_from_s3``.
    """
    audio_uri = data.pop("s3_file")
    lang_hint = data.pop("language", None)
    return {"diarization_from_s3": diarization_from_s3(model, audio_uri, lang_hint)}

Writing model/code/inference.py


In [None]:
%%writefile model/code/inference_v2.py
import sys
import os
# Make a `lib` directory located next to this script importable, then import the
# SSH helper and start it before the heavier imports below run.
sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
import sagemaker_ssh_helper
sagemaker_ssh_helper.setup_and_start_ssh()

from pyannote.audio import Pipeline
import subprocess
import boto3
from urllib.parse import urlparse
import pandas as pd
from io import StringIO
import os
import torch
import sys
import time
from whisper.audio import SAMPLE_RATE
import whisper




# Select the first CUDA GPU when one is available, otherwise the CPU, and log the choice.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'--device={device}')
# Process-wide cache for the Whisper model; filled lazily by load_whisper_model().
_whisper_model = None


def load_whisper_model(model_name="large"):
    """Return the process-wide Whisper model, loading it on first use.

    Args:
        model_name: Whisper model to load. Only honoured by the call that
            performs the load; later calls return the cached model whatever
            name they pass.

    Returns:
        The cached Whisper model instance.
    """
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model
    _whisper_model = whisper.load_model(model_name)
    return _whisper_model

def model_fn(model_dir):
    """Load the pyannote speaker-diarization pipeline.

    Args:
        model_dir: Model directory handed in by the caller (presumably the
            serving toolkit -- confirm). It is not read here: the pipeline is
            resolved by name through ``Pipeline.from_pretrained``.

    Returns:
        The loaded pipeline, moved to the module-level ``device``.

    Raises:
        RuntimeError: If ``Pipeline.from_pretrained`` returns ``None``.
    """
    # Prefer a token supplied through the environment; fall back to the
    # placeholder that must otherwise be replaced by hand.
    hf_token = os.environ.get("HF_TOKEN", "*******")
    model = Pipeline.from_pretrained(
        "PyAnnote/speaker-diarization-3.1",
        use_auth_token=hf_token)
    if model is None:
        raise RuntimeError(
            "Pipeline.from_pretrained returned None; check the Hugging Face "
            "token and that access to the model has been granted."
        )
    # The pipeline is not placed on the GPU automatically; move it explicitly
    # so the selected device is actually used.
    model.to(device)

    return model

def diarization_from_s3(model, s3_file, language=None):
    """Transcribe an S3 audio object with Whisper and attribute text to speakers.

    Args:
        model: Callable diarization pipeline; invoked with the local file path.
            Its result must provide ``itertracks(yield_label=True)``.
        s3_file: ``s3://bucket/key`` URI of the audio object.
        language: Optional language passed through to Whisper's ``transcribe``.

    Returns:
        JSON string (``orient="records"``, non-ASCII kept as-is) with one
        record per speaker turn: ``start``, ``end``, ``text``, ``speaker``.
    """
    import tempfile  # local import: not part of this file's module-level imports

    s3 = boto3.client("s3")
    o = urlparse(s3_file, allow_fragments=False)
    bucket = o.netloc
    key = o.path.lstrip("/")

    # A timestamp alone collides when two requests arrive within the same
    # second, so let mkstemp add a unique component to the file name.
    timestamp = int(time.time())
    fd, tmp_file = tempfile.mkstemp(prefix=f"{timestamp}_", suffix="_tmp.wav")
    os.close(fd)
    try:
        s3.download_file(bucket, key, tmp_file)

        # Step 1: transcription (the Whisper model is cached across requests).
        whisper_model = load_whisper_model("large")
        result = whisper_model.transcribe(tmp_file, language=language, fp16=torch.cuda.is_available())
        # Timestamped transcript segments.
        segments = result["segments"]

        # Step 2: speaker diarization on the same file.
        diarization_result = model(tmp_file)
    finally:
        # Always remove the download so repeated requests do not fill the disk.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)

    # A list (not a dict keyed by turn) so two tracks sharing the same time
    # span cannot overwrite each other.
    rows = []
    for turn, _, speaker in diarization_result.itertracks(yield_label=True):
        # Collect every transcript segment that overlaps this turn. The
        # interval test also matches a segment that fully contains the turn,
        # which a check on the segment's endpoints alone would miss.
        overlapping = [
            segment["text"]
            for segment in segments
            if segment["start"] < turn.end and segment["end"] > turn.start
        ]
        transcript = " ".join(overlapping)
        rows.append((turn.start, turn.end, transcript.strip(), speaker))
    data_df = pd.DataFrame(rows, columns=["start", "end", "text","speaker"])
    print(data_df)
    return data_df.to_json(orient="records",force_ascii=False)


def predict_fn(data, model):
    """Handle one request: transcribe and diarize the referenced audio.

    Args:
        data: Request dict. ``"s3_file"`` is required and ``"language"`` is
            optional; both keys are popped from the dict.
        model: Object returned by ``model_fn``.

    Returns:
        Dict with the single key ``"diarization_from_s3"`` holding the JSON
        string produced by ``diarization_from_s3``.
    """
    audio_uri = data.pop("s3_file")
    lang_hint = data.pop("language", None)
    return {"diarization_from_s3": diarization_from_s3(model, audio_uri, lang_hint)}


Writing model/code/inference_v2.py


In [10]:
%%writefile model/code/requirements.txt
# Python dependencies of the inference scripts.
# Same package list as the `RUN pip install` lines in Dockerfile.inference
# (the torch/torchvision/torchaudio upgrade is only done in the Dockerfile).
transformers==4.25.1
openai-whisper
boto3
PyAnnote.audio
soundfile
librosa
onnxruntime
wget
pandas
sagemaker_ssh_helper
huggingface_hub<0.26.0

Writing model/code/requirements.txt


In [11]:
s3_code_prefix = "whisper/whisper_deploy_code"
!rm whisper_model.tar.gz
!cd model && rm -rf ".ipynb_checkpoints"
!cd model && rm -rf "code/requirements.txt"
!tar czvf whisper_model.tar.gz model

model/
model/code/
model/code/inference_v2.py
model/code/inference.py


In [12]:
# Upload the archive to the default bucket and keep the resulting S3 URI.
s3_code_artifact = sess.upload_data(
    path="whisper_model.tar.gz",
    bucket=bucket,
    key_prefix=s3_code_prefix,
)
print(f"S3 Code or Model tar ball uploaded to --- > {s3_code_artifact}")

S3 Code or Model tar ball uploaded to --- > s3://sagemaker-us-west-2-687912291502/whisper/whisper_deploy_code/whisper_model.tar.gz


### 3. 创建模型 & 创建endpoint

In [13]:
from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig
from sagemaker.s3 import s3_path_join
from sagemaker.utils import name_from_base
from sagemaker_ssh_helper.wrapper import SSHModelWrapper

# Unique endpoint name derived from a fixed base string.
async_endpoint_name = name_from_base("whisper-asyc")
# Stock base image, kept for reference only:
#full_image_uri = "763104351884.dkr.ecr.us-west-2.amazonaws.com/huggingface-pytorch-inference:2.1.0-transformers4.37.0-gpu-py310-cu118-ubuntu20.04"

# Create the Hugging Face model object from the custom image and inference_v2.py.
huggingface_model = HuggingFaceModel(
    image_uri=inference_image_uri,
    entry_point="inference_v2.py",
    source_dir="./model/code/",
    model_data=s3_code_artifact,  # path to your model and script
    role=role,  # IAM role with permissions to create an endpoint
    # Version arguments are left out because an explicit image_uri is given:
    #transformers_version="4.17",  # transformers version used
    #pytorch_version="1.10",  # pytorch version used
    #py_version="py38",  # python version used
)

# Wrap the model for SSH access. This runs before deploy() below.
# NOTE(review): connection_wait_time_seconds=0 presumably means "do not wait for
# an SSH connection at startup" -- confirm against sagemaker_ssh_helper docs.
ssh_wrapper = SSHModelWrapper.create(huggingface_model, connection_wait_time_seconds=0)

# Create the async endpoint configuration.
async_config = AsyncInferenceConfig(
    output_path=s3_path_join(
        "s3://", bucket, "whisper/async_inference/output"
    ),  # Where our results will be stored
    # Add SNS notification topics here if needed
    notification_config={
        # "SuccessTopic": "PUT YOUR SUCCESS SNS TOPIC ARN",
        # "ErrorTopic": "PUT YOUR ERROR SNS TOPIC ARN",
    },  # Notification configuration (empty: no notifications)
)

# Container environment variables.
# NOTE(review): `env` is handed to deploy() rather than to the HuggingFaceModel
# constructor -- confirm that the SDK version in use applies it to the container.
env = {"MODEL_SERVER_WORKERS": "2",
       "HF_TASK":"automatic-speech-recognition"}

# Deploy the async endpoint on a single ml.g5.xlarge instance.
async_predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.g5.xlarge",
    async_inference_config=async_config,
    endpoint_name=async_endpoint_name,
    env=env,
)

---------------!

In [14]:
# Ask the SSH wrapper for the instance ids behind the endpoint (timeout: 100 seconds).
instance_ids = ssh_wrapper.get_instance_ids(timeout_in_sec=100)

### 4. 自动扩缩容（Auto Scaling）

In [22]:
# Register the endpoint variant with Application Auto Scaling and attach a
# target-tracking policy driven by the per-instance async backlog.
endpointName = async_endpoint_name
asg_client = boto3.client("application-autoscaling")

# Application Auto Scaling addresses an endpoint variant through this resource id.
resource_id = f"endpoint/{endpointName}/variant/AllTraffic"

# Arguments shared by both API calls below.
scaling_target = {
    "ServiceNamespace": "sagemaker",
    "ResourceId": resource_id,
    "ScalableDimension": "sagemaker:variant:DesiredInstanceCount",
}

# Allow the variant to run between 1 and 5 instances (it never scales to zero).
response = asg_client.register_scalable_target(
    MinCapacity=1,
    MaxCapacity=5,
    **scaling_target,
)

# Metric tracked by the policy ("GPUUtilization" was the alternative considered).
backlog_metric = {
    "MetricName": "ApproximateBacklogSizePerInstance",
    "Namespace": "AWS/SageMaker",
    "Dimensions": [{"Name": "EndpointName", "Value": endpointName}],
    "Statistic": "Average",
}

response = asg_client.put_scaling_policy(
    PolicyName=f'Request-ScalingPolicy-{endpointName}',
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 1,
        "CustomizedMetricSpecification": backlog_metric,
        "ScaleInCooldown": 600,  # seconds to wait before scaling in
        "ScaleOutCooldown": 300,  # seconds between scale-out attempts
    },
    **scaling_target,
)

### 5. 模型测试

In [15]:
# Endpoint name used by the test cells below.
endpointName=async_endpoint_name


* predict api

In [None]:
# Send one request through the SDK predictor and poll for its result.
from sagemaker.async_inference import WaiterConfig

# Replace with the S3 path of your own audio object.
s3_file="s3://sagemaker-us-west-2-687912291502/gpt-sovits/wav/speech_20240425104005663.mp3"
# Optional local test without an endpoint (kept disabled):
#model = Pipeline.from_pretrained(
#        "PyAnnote/speaker-diarization-3.1",
#        use_auth_token="*******")
#result = diarization_from_s3(model, s3_file)

data={"s3_file":s3_file}
res = async_predictor.predict_async(data=data)
print(f"Response output path: {res.output_path}")
print("Start Polling to get response:")

config = WaiterConfig(
  max_attempts=10, # number of polling attempts
  delay=10# seconds to wait between attempts
  )
res.get_result(config)


Response output path: s3://sagemaker-us-west-2-687912291502/whisper/async_inference/output/ca2c0bad-2bb4-45d9-9a93-6ebd5870de70.out
Start Polling to get response:


{'diarization_from_s3': '[{"start":0.03096875,"end":9.88596875,"text":"私はスポーツが好きな女の子で、私は中華料理が大好きで、私は中国へ旅行するのが好きで、特に甲州、生徒が好きです。","speaker":"SPEAKER_00"}]'}

* sagemaker boto3 api

In [8]:
import uuid
import boto3
import os
import json
import time

# Request payload: the S3 URI of the audio object (replace with your own object).
s3_file="s3://sagemaker-us-west-2-687912291502/gpt-sovits/wav/speech_20240425104005663.mp3"
payload={"s3_file":s3_file}

def get_bucket_and_key(s3uri):
    """Split an ``s3://bucket/key`` URI into its bucket and key.

    Args:
        s3uri: URI whose first five characters are the ``s3://`` prefix.

    Returns:
        ``(bucket, key)``. ``key`` is the empty string when the URI names only
        a bucket, with or without a trailing slash.
    """
    # partition() copes with a missing "/" after the bucket; str.find() would
    # return -1 there and the slices would cut off the bucket's last character.
    bucket, _, key = s3uri[5:].partition('/')
    return bucket, key


def s3_object_exists(s3_path):
    """Report whether the object at ``s3_path`` can be found with ``head_object``.

    Args:
        s3_path: ``s3://bucket/key`` URI of the object to probe.

    Returns:
        ``True`` when ``head_object`` succeeds, ``False`` when S3 answers with
        an error response (typically "not found" while the job is running).

    Raises:
        Anything that is not an S3 error response (for example invalid
        arguments) propagates, so it is not mistaken for "not ready yet".
    """
    s3 = boto3.client('s3')
    bucket,key=get_bucket_and_key(s3_path)
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except s3.exceptions.ClientError as ex:
        error_code = ex.response.get("Error", {}).get("Code", "")
        if error_code in ("404", "NoSuchKey", "NotFound"):
            print("job is not completed, waiting...")
        else:
            # Other codes (e.g. 403) may also mean "not there yet", so keep
            # polling, but show the code to make permission problems visible.
            print(f"job is not completed, waiting... (head_object error code: {error_code})")
        return False

def wait_async_result(output_location,timeout=60):
    """Poll S3 until the async inference output exists or ``timeout`` expires.

    Args:
        output_location: ``s3://`` URI where the endpoint writes its result.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        ``output_location`` once the object exists, or ``None`` when the
        timeout elapsed first.
    """
    poll_interval = 5  # seconds between two existence checks
    # Track elapsed time so the timeout actually ends the loop.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if s3_object_exists(output_location):
            print("have async result",output_location)
            return output_location
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, max(0, deadline - time.monotonic())))
    print("timed out waiting for async result",output_location)
    return None



# Upload the JSON payload to S3, invoke the async endpoint with its location and
# wait for the output object to appear.
runtime_client = boto3.client('runtime.sagemaker')
input_file=str(uuid.uuid4())+".json"
input_key = f'whisper/async_inference/input/{input_file}'
# Use a dedicated name for the S3 *resource* so the `s3_client` boto3 client
# created in the setup cell is not replaced by an object of a different kind.
s3_resource = boto3.resource('s3')
s3_object = s3_resource.Object(bucket, input_key)
payload_data = json.dumps(payload).encode('utf-8')  # already bytes
s3_object.put(Body=payload_data)
input_location=f's3://{bucket}/{input_key}'
print(f'input_location: {input_location}')
response = runtime_client.invoke_endpoint_async(
    EndpointName=endpointName,
    InputLocation=input_location,
    ContentType = "application/json"
)
# S3 URI where the endpoint will write the result.
result =response.get("OutputLocation",'')
print(result)
wait_async_result(result)

input_location: s3://sagemaker-us-west-2-687912291502/whisper/async_inference/input/208f7d90-b04f-4808-8f3d-d3e827e79b4a.json
s3://sagemaker-us-west-2-687912291502/whisper/async_inference/output/5df5a83d-7ad9-45c8-a3ba-ab4400898e35.out
job is not completed, waiting...
have async result s3://sagemaker-us-west-2-687912291502/whisper/async_inference/output/5df5a83d-7ad9-45c8-a3ba-ab4400898e35.out


#### 清除模型Endpoint和config （如无需要，不要执行）

In [6]:
# Copy one async inference output object from S3 to the working directory and print it.
!aws s3 cp s3://sagemaker-us-west-2-687912291502/whisper/async_inference/output/b97dcfde-a466-4784-831f-fcdedac46677.out ./
!cat b97dcfde-a466-4784-831f-fcdedac46677.out

download: s3://sagemaker-us-west-2-687912291502/whisper/async_inference/output/b97dcfde-a466-4784-831f-fcdedac46677.out to ./b97dcfde-a466-4784-831f-fcdedac46677.out
{"diarization_from_s3":"[{\"start\":0.03096875,\"end\":9.88596875,\"text\":\"私はスポーツが好きな女の子で、私は中華料理が大好きで、私は中国へ旅行するのが好きで、特に甲州、生徒が好きです。\",\"speaker\":\"SPEAKER_00\"}]"}

In [None]:
# Deletes an endpoint. The endpoint name is left empty on purpose: fill it in
# only when you really want to remove the endpoint.
!aws sagemaker delete-endpoint --endpoint-name ""

In [2]:
!pip install requests-aws4auth


Collecting requests-aws4auth
  Downloading requests_aws4auth-1.3.1-py3-none-any.whl.metadata (18 kB)
Downloading requests_aws4auth-1.3.1-py3-none-any.whl (24 kB)
Installing collected packages: requests-aws4auth
Successfully installed requests-aws4auth-1.3.1


In [None]:
import requests
import json
import time
from requests_aws4auth import AWS4Auth
import boto3

def get_aws_auth(region='us-west-2'):
    """Build a SigV4 signer for SageMaker from the default boto3 credential chain.

    Credentials are resolved at call time (environment, shared config, instance
    or execution role) instead of being written into the notebook.

    SECURITY: an access key that was ever committed in source must be treated
    as compromised and rotated.

    Args:
        region: AWS region to sign requests for.

    Returns:
        An ``AWS4Auth`` object for the ``sagemaker`` service.

    Raises:
        RuntimeError: If boto3 cannot find any credentials.
    """
    session = boto3.Session()
    credentials = session.get_credentials()
    if credentials is None:
        raise RuntimeError("No AWS credentials found in the default boto3 credential chain")
    # Take a consistent snapshot of key, secret and token in one step.
    frozen = credentials.get_frozen_credentials()
    aws_auth = AWS4Auth(
        frozen.access_key,
        frozen.secret_key,
        region,
        'sagemaker',
        # Needed for temporary (role) credentials; None for long-term keys.
        session_token=frozen.token,
    )

    return aws_auth

def invoke_endpoint(payload, endpoint_name="whisper-asyc-2025-01-20-13-49-47-909"):
    """POST a JSON payload to a SageMaker endpoint with SigV4-signed ``requests``.

    NOTE(review): the default endpoint name looks like the async endpoint
    created earlier, while this URL is the synchronous ``/invocations`` route
    -- confirm that the target endpoint accepts synchronous calls.

    Args:
        payload: JSON-serialisable request body.
        endpoint_name: Name of the endpoint to call.

    Returns:
        The decoded JSON response, or ``None`` when the request timed out or
        failed (the error is printed).
    """
    # Must be the region that get_aws_auth() signs requests for.
    region = "us-west-2"

    # Build the URL from the values above so name and region live in one place.
    url = f"https://runtime.sagemaker.{region}.amazonaws.com/endpoints/{endpoint_name}/invocations"

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    # Obtain the SigV4 signer.
    aws_auth = get_aws_auth()

    try:
        response = requests.post(
            url,
            auth=aws_auth,
            json=payload,
            headers=headers,
            timeout=90
        )

        response.raise_for_status()
        return response.json()

    except requests.exceptions.Timeout:
        print("请求超时")
        return None
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {e}")
        return None

# Variant with retries
def invoke_endpoint_with_retry(payload, max_retries=3, delay=5):
    """Call ``invoke_endpoint`` until it returns a result or retries run out.

    ``invoke_endpoint`` reports request failures by returning ``None``, so a
    ``None`` result is handled like a raised exception: wait, then try again.

    Args:
        payload: JSON-serialisable request body.
        max_retries: Maximum number of attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        The first non-``None`` result, or ``None`` when every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            result = invoke_endpoint(payload)
        except Exception as e:
            print(f"第 {attempt + 1} 次尝试失败: {e}")
            result = None

        if result is not None:
            return result

        # No sleep after the final attempt.
        if attempt < max_retries - 1:
            print(f"等待 {delay} 秒后重试...")
            time.sleep(delay)

    print("已达到最大重试次数")
    return None

# Usage example
if __name__ == "__main__":
    test_payload = {"s3_file": "s3://sagemaker-us-west-2-687912291502/gpt-sovits/wav/speech_20240425104005663.mp3"}

    result = invoke_endpoint_with_retry(test_payload)

    # A falsy result (None or an empty response) is reported as a failure.
    if not result:
        print("Failed to get response")
    else:
        print("Response:", result)
