# Whisper + Transcribe Demo

In [5]:
import functools
import time

def timer(func):
    """
    A decorator to measure the execution time of a function.

    The elapsed time is printed after every call (also when the call
    raises), and the wrapped function's return value is passed through
    unchanged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that accepts the same arguments as ``func`` and keeps its
        ``__name__`` and ``__doc__``.
    """
    # functools.wraps copies the metadata of func onto the wrapper so that
    # decorated functions still report their own name and docstring.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic clock intended for measuring intervals;
        # time.time() can jump if the system clock is adjusted.
        start_time = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Report the duration even if func raised, then let the
            # exception propagate unchanged.
            execution_time = time.perf_counter() - start_time
            print(f"Function {func.__name__} took {execution_time:.6f} seconds to execute.")
    return wrapper

## Whisper

In [2]:
!pip install -U openai-whisper



In [13]:

! pip install --upgrade pip
! pip install --upgrade git+https://github.com/huggingface/transformers.git accelerate datasets[audio]


Collecting git+https://github.com/huggingface/transformers.git
  Cloning https://github.com/huggingface/transformers.git to /tmp/pip-req-build-dmtn41de
  Running command git clone --filter=blob:none --quiet https://github.com/huggingface/transformers.git /tmp/pip-req-build-dmtn41de
  Resolved https://github.com/huggingface/transformers.git to commit b382a09e28c7e59129246ccdf4b00f2cac4547a4
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Installing backend dependencies ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Collecting datasets[audio]
  Downloading datasets-2.18.0-py3-none-any.whl.metadata (20 kB)
Downloading datasets-2.18.0-py3-none-any.whl (510 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m510.5/510.5 kB[0m [31m37.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: transformers
  Building wheel for transformers (pyproject.toml) ... [?25ld

In [None]:
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from datasets import load_dataset


# Use the first CUDA device with half precision when CUDA is available,
# otherwise fall back to the CPU with single precision.
if torch.cuda.is_available():
    device, torch_dtype = "cuda:0", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32

model_id = "openai/whisper-base"

# Load the speech-to-text model and move it to the selected device.
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
)
model.to(device)

# The processor supplies both the tokenizer and the feature extractor
# that the pipeline below is built from.
processor = AutoProcessor.from_pretrained(model_id)

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

# Take the audio of the first validation example as the test input.
dataset = load_dataset(
    "distil-whisper/librispeech_long",
    "clean",
    split="validation",
)
sample = dataset[0]["audio"]

# Run the pipeline on the sample and show the transcribed text.
result = pipe(sample)
print(result["text"])

In [None]:
import whisper

# Candidate model names (presumably the available Whisper sizes); the list is
# not read below, where "base" is loaded directly.
model_size = ["tiny", "base", "small", "medium", "large"]

model = whisper.load_model("base")

# Path of the recording to transcribe.
sound_file = "test1"
soudd_path = f"./records/{sound_file}.m4a"

# Transcribe the file and print the recognized text.
result = model.transcribe(soudd_path)
print(result["text"])

In [6]:
import whisper

@timer
def whisperPipeline(model, soudd_path):
    """
    Detect the spoken language of an audio file and decode it with Whisper.

    The audio is padded or trimmed to fit 30 seconds before decoding. The
    model's device and the detected language are printed along the way.

    Args:
        model: A loaded Whisper model (as returned by ``whisper.load_model``).
        soudd_path: Path of the audio file to process.

    Returns:
        The value returned by ``whisper.decode`` for the audio.
    """
    print(model.device)

    # Load the audio, fit it to 30 seconds, then build the log-Mel
    # spectrogram on the same device as the model.
    waveform = whisper.pad_or_trim(whisper.load_audio(soudd_path))
    mel = whisper.log_mel_spectrogram(waveform).to(model.device)

    # Report the language with the highest detection probability.
    _, language_probs = model.detect_language(mel)
    detected = max(language_probs, key=language_probs.get)
    print(f"Detected language: {detected}")

    # Decode with default options.
    return whisper.decode(model, mel, whisper.DecodingOptions())

# Pick the recording, load the "base" model and run the timed pipeline on it.
sound_file = "test2"
soudd_path = f"./records/{sound_file}.m4a"
model = whisper.load_model("base")
result = whisperPipeline(model, soudd_path)

# Show the recognized text.
print(result.text)

cuda:0
Detected language: zh
Function whisperPipeline took 0.174484 seconds to execute.
我要找一部电影叫做 Brave Heart


## Transcribe test

In [7]:
import ipywidgets as widgets

# Language code -> display name shown in the dropdown.
languages = {
    "en-US": "美式英语",
    "en-GB": "英式英语",
    "en-AU": "澳大利亚英语",
    "zh-CN": "普通话",
    "ja-JP": "日语",
    "ko-KR": "韩语",
}

# Two placeholder rows (value None) come first, followed by one
# "<name> (<code>)" entry per language, sorted by label.
selection = widgets.Dropdown(
    options=[
        ("Select language", None),
        ("----------", None),
        *sorted((f"{name} ({code})", code) for code, name in languages.items()),
    ],
    value="en-US",
    description='Language:',
    disabled=False,
)

# Display the widget as the cell output.
selection

Dropdown(description='Language:', index=5, options=(('Select language', None), ('----------', None), ('日语 (ja-…

In [8]:
# Read the language code chosen in the dropdown; the placeholder rows carry
# the value None.
lang = selection.value

# Validate before the lookup: languages[None] would raise a bare KeyError and
# the message below would never be shown.
assert lang is not None, "Please select a language"

language = languages[lang]
print(f"Selected language: {language} ({lang})")

Selected language: 美式英语 (en-US)


In [None]:
import boto3
import time
import uuid
import urllib.request

@timer
def transcribePipeline(job_name, job_uri, language_code=None, media_format='m4a',
                       poll_interval=1, timeout=None):
    """
    Start a transcription job through the boto3 'transcribe' client and wait for it.

    The job status is polled until it is COMPLETED or FAILED; every
    intermediate status is printed.

    Args:
        job_name: Name to give the transcription job.
        job_uri: URI of the media file to transcribe.
        language_code: Language code passed to the service. When None, the
            notebook-level ``lang`` selection is used (the original behavior).
        media_format: Media format passed to the service.
        poll_interval: Seconds to sleep between two status checks.
        timeout: Optional maximum number of seconds to wait; None waits
            indefinitely (the original behavior).

    Returns:
        The transcript file URI when the job completes, or None when it fails.

    Raises:
        TimeoutError: If ``timeout`` is set and the job has not finished in time.
    """
    if language_code is None:
        # Backward-compatible default: fall back to the notebook-level selection.
        language_code = lang

    transcribe = boto3.client('transcribe')
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': job_uri},
        MediaFormat=media_format,
        LanguageCode=language_code,
    )

    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        job = transcribe.get_transcription_job(TranscriptionJobName=job_name)['TranscriptionJob']
        job_status = job['TranscriptionJobStatus']
        if job_status in ('COMPLETED', 'FAILED'):
            break
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Transcription job {job_name} did not finish within {timeout} seconds"
            )
        print(f"Current status: {job_status}")
        time.sleep(poll_interval)

    # Fetch the transcription result. The last polled response is reused, so
    # no second get_transcription_job call is made.
    if job_status == 'COMPLETED':
        return job['Transcript']['TranscriptFileUri']

    # .get avoids a KeyError if the response carries no failure reason.
    print(f"Transcription job failed: {job.get('FailureReason', 'unknown reason')}")
    return None


sound_file = "test2"
# A fresh UUID suffix gives every run a different job name.
job_name = "test_job_" + str(uuid.uuid4())
job_uri = "s3://benxiwan-1212-s3/sounds/{}.m4a".format(sound_file)

transcript_uri = transcribePipeline(job_name, job_uri)
if transcript_uri is not None:
    # Download the transcript file; the context manager closes the HTTP
    # response once it has been read.
    with urllib.request.urlopen(transcript_uri) as transcript_file:
        transcript_text = transcript_file.read().decode('utf-8')

    # Save the transcription result to a local text file.
    output_file = "./outputs/{}-{}.json".format(sound_file, job_name)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(transcript_text)
    print(f"Transcription saved to {output_file}")



Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
