<a href="https://colab.research.google.com/github/rajeshradhakrishnanmvk/ML2025/blob/master/AudioTrans001.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -Uqq --upgrade openai python-dotenv

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m328.6/328.6 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.6/75.6 kB[0m [31m11.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.9/77.9 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.3/58.3 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
from google.colab import files
_ = files.upload()

Saving .env to .env


In [4]:
import os
import openai

from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv()) # read local .env file

In [5]:
from openai import OpenAI

client = OpenAI(
    # defaults to os.environ.get("OPENAI_API_KEY")
    api_key=os.environ['OPENAI_API_KEY'],
)


In [66]:
system_prompt = """
You are an Audio Translator, receive audio as file from a path (for e.g./content/audio.mp3)
and you run in a loop of Thought, Action, PAUSE, Observation.
At the end of the loop, you output an Answer.
Use Thought to describe your thoughts about the question you have been asked.
Use Action to run one of the actions available to you - then return PAUSE.
Observation will be the result of running those actions.

Your available actions are:
    splitaudio:
        Split the audio into 1-minute segments

    speech2text:
        Convert Speech to Text

    translate:
        Translate Text from Source Language to Target Language

    text2speech:
        Convert Text to Speech

Example session:

Question: Translate the audio at /content/audio.mp3 from English to Malayalam.
Thought: From the file path /content/audio.mp3, I need to split the audio into 1-minute segments.
Action: splitaudio
PAUSE

You will be called again with this:

Observation: Audio split. It is a 1-minute audio.

Thought: I need to convert the speech in the audio to text.
Action: speech2text
PAUSE

You will be called again with this:

Observation: Speech converted to text.

Thought: I need to translate the text from English to Malayalam.
Action: translate: English to Malayalam
PAUSE

You will be called again with this:

Observation: Text translated to Malayalam.

Thought: I need to convert the translated text back to speech.
Action: text2speech
PAUSE

You will be called again with this:

Observation: Text converted to speech in Malayalam.

If you have the answer, output it as the Answer.

Answer: The tranlsated audio is at /content/audio-mal.mp3 in Malayalam.

Now it's your turn:
""".strip()

In [7]:
!pip install pydub

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [90]:
from pydub import AudioSegment

def splitaudio(filepath: str) -> str:
  song = AudioSegment.from_mp3(filepath)

  # PyDub handles time in milliseconds
  ten_minutes = 10 * 60 * 1000

  first_10_minutes = song[:ten_minutes]

  first_10_minutes.export(filepath, format="mp3")
  return filepath

def speech2text(filepath: str) -> str:
  audio_file = open(filepath, "rb")

  transcript = client.audio.transcriptions.create(
    file=audio_file,
    model="whisper-1",
    response_format="verbose_json",
    timestamp_granularities=["word"]
  )
  return transcript.text

def translate(text: str, target_language: str) -> str:
  response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
      {"role": "system", "content": "You are a helpful assistant."},
      {"role": "user", "content": f"Translate the following English text to {target_language}: {text}"}
    ]
  )
  return response.choices[0].message.content

def text2speech(filepath:str ) -> str:
  speech_file_path = filepath
  response = client.audio.speech.create(
    model="tts-1",
    voice="alloy",
    input="Today is a wonderful day to build something people love!"
  )

  response.stream_to_file(speech_file_path)
  return speech_file_path

In [68]:
assistant = client.beta.assistants.create(
    name="Audio Translator",
    instructions=system_prompt,
    model="gpt-4o",
  tools=[
      {
        "type": "function",
        "function":{
          "name": "splitaudio",
          "description": "Split an audio file to keep only the first 10 minutes",
          "parameters": {
            "type": "object",
            "properties": {
              "filepath": {
                "type": "string",
                "description": "The path to the audio file to be split"
              }
            },
            "required": [
              "filepath"
            ]
          }
        }
      }
      ,
      {
        "type": "function",
        "function":{
          "name": "speech2text",
          "description": "Convert speech from an audio file to text",
          "parameters": {
            "type": "object",
            "properties": {
              "filepath": {
                "type": "string",
                "description": "The path to the audio file to be transcribed"
              }
            },
            "required": [
              "filepath"
            ]
          }
        }
        }
      ,
        {
        "type": "function",
        "function":{
          "name": "translate",
          "description": "Translate text from English to a specified target language",
          "parameters": {
            "type": "object",
            "properties": {
              "text": {
                "type": "string",
                "description": "The English text to be translated"
              },
              "target_language": {
                "type": "string",
                "description": "The target language for translation"
              }
            },
            "required": [
              "text",
              "target_language"
            ]
          }
        }
        }
      ,
        {
        "type": "function",
        "function":{
          "name": "text2speech",
          "description": "Convert text to speech and save it to an audio file",
          "parameters": {
            "type": "object",
            "properties": {
              "filepath": {
                "type": "string",
                "description": "The path where the generated speech audio file will be saved"
              }
            },
            "required": [
              "filepath"
            ]
          }
        }
    }
  ]
)
assistant.id #asst_PEIreFhTL9rmJGDndLYqOg9u

'asst_65RV82N9yhODlCFHM3RKp4Nb'

In [69]:
thread = client.beta.threads.create()
thread.id

'thread_qBe0q58dQkDEieCbXnbB7Apz'

In [70]:
message = client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="Translate the audio at /content/audio-roberfrost-woodsarelovely.mp3 from English to Malayalam"
)
message.id

'msg_opGzv1aGqpQDWDqePiswqK8F'

In [105]:
# run = client.beta.threads.runs.create_and_poll(
#   thread_id=thread.id,
#   assistant_id= 'asst_65RV82N9yhODlCFHM3RKp4Nb' #assistant.id,
# )
run = client.beta.threads.runs.retrieve(
  thread_id=thread.id,
  run_id=run_id
)

if run.status == 'completed':
  messages = client.beta.threads.messages.list(
    thread_id=thread.id
  )
  print(messages)
else:
  print(run.status)

# Define the list to store tool outputs
tool_outputs = []

# Loop through each tool in the required action section
for tool in run.required_action.submit_tool_outputs.tool_calls:
  function_args=json.loads(tool.function.arguments)
  print(function_args)
  if tool.function.name == "splitaudio":
    tool_outputs.append({
      "tool_call_id": tool.id,
      "output": splitaudio(function_args.get("filepathath"))
    })
  elif tool.function.name == "speech2text":
    tool_outputs.append({
      "tool_call_id": tool.id,
      "output": speech2text(function_args.get("filepath")) #function_args["filepathjson"].get("filepath")
    })
  elif tool.function.name == "translate":
    tool_outputs.append({
      "tool_call_id": tool.id,
      "output": translate(function_args.get("text"), function_args.get("target_language"))
    })
  elif tool.function.name == "text2speech":
    tool_outputs.append({
      "tool_call_id": tool.id,
      "output": text2speech(function_args.get("filepathath"))
    })

# Submit all tool outputs at once after collecting them in a list
if tool_outputs:
  try:
    run = client.beta.threads.runs.submit_tool_outputs_and_poll(
      thread_id=thread.id,
      run_id=run.id,
      tool_outputs=tool_outputs
    )
    print("Tool outputs submitted successfully.")
  except Exception as e:
    print("Failed to submit tool outputs:", e)
else:
  print("No tool outputs to submit.")

if run.status == 'completed':
  messages = client.beta.threads.messages.list(
    thread_id=thread.id
  )
  print(messages)
else:
  print(run.status)

requires_action
{'filepathath': '/content/audio-roberfrost-woodsarelovely-mal.mp3'}


  response.stream_to_file(speech_file_path)


Tool outputs submitted successfully.
SyncCursorPage[Message](data=[Message(id='msg_Q3Ko1ss8sskCulZzULDqsUvB', assistant_id='asst_65RV82N9yhODlCFHM3RKp4Nb', attachments=[], completed_at=None, content=[TextContentBlock(text=Text(annotations=[], value='Answer: The translated audio is at /content/audio-roberfrost-woodsarelovely-mal.mp3 in Malayalam.'), type='text')], created_at=1721382341, incomplete_at=None, incomplete_details=None, metadata={}, object='thread.message', role='assistant', run_id='run_NHuWmzVV6M3ZLsRaQucqBzm3', status=None, thread_id='thread_qBe0q58dQkDEieCbXnbB7Apz'), Message(id='msg_uYT6O6jvflZ2Qtt8jiw0Sttn', assistant_id='asst_65RV82N9yhODlCFHM3RKp4Nb', attachments=[], completed_at=None, content=[TextContentBlock(text=Text(annotations=[], value='Thought: The text has been successfully translated to Malayalam. Now, I need to convert the translated text back to speech.\nAction: text2speech\nPAUSE'), type='text')], created_at=1721382194, incomplete_at=None, incomplete_detai

In [106]:
if run.status == 'completed':
  messages = client.beta.threads.messages.list(
    thread_id=thread.id
  )
  print(messages)
else:
  print(run.status)

SyncCursorPage[Message](data=[Message(id='msg_Q3Ko1ss8sskCulZzULDqsUvB', assistant_id='asst_65RV82N9yhODlCFHM3RKp4Nb', attachments=[], completed_at=None, content=[TextContentBlock(text=Text(annotations=[], value='Answer: The translated audio is at /content/audio-roberfrost-woodsarelovely-mal.mp3 in Malayalam.'), type='text')], created_at=1721382341, incomplete_at=None, incomplete_details=None, metadata={}, object='thread.message', role='assistant', run_id='run_NHuWmzVV6M3ZLsRaQucqBzm3', status=None, thread_id='thread_qBe0q58dQkDEieCbXnbB7Apz'), Message(id='msg_uYT6O6jvflZ2Qtt8jiw0Sttn', assistant_id='asst_65RV82N9yhODlCFHM3RKp4Nb', attachments=[], completed_at=None, content=[TextContentBlock(text=Text(annotations=[], value='Thought: The text has been successfully translated to Malayalam. Now, I need to convert the translated text back to speech.\nAction: text2speech\nPAUSE'), type='text')], created_at=1721382194, incomplete_at=None, incomplete_details=None, metadata={}, object='thread.

In [61]:
import json
import time
from typing_extensions import override
from openai import AssistantEventHandler, OpenAI
from openai.types.beta.threads import Text, TextDelta
from openai.types.beta.threads.runs import ToolCall, ToolCallDelta
from openai.types.beta.threads import Message, MessageDelta
from openai.types.beta.threads.runs import ToolCall, RunStep
from openai.types.beta import AssistantStreamEvent

# First, we create a EventHandler class to define
# how we want to handle the events in the response stream.

class EventHandler(AssistantEventHandler):
   def __init__(self, thread_id, assistant_id):
       super().__init__()
       self.output = None
       self.tool_id = None
       self.thread_id = thread_id
       self.assistant_id = assistant_id
       self.run_id = None
       self.run_step = None
       self.function_name = ""
       self.arguments = ""

   @override
   def on_text_created(self, text) -> None:
       print(f"\nassistant on_text_created > ", end="", flush=True)

   @override
   def on_text_delta(self, delta, snapshot):
       # print(f"\nassistant on_text_delta > {delta.value}", end="", flush=True)
       print(f"{delta.value}")

   @override
   def on_end(self, ):
       print(f"\n end assistant > ",self.current_run_step_snapshot, end="", flush=True)

   @override
   def on_exception(self, exception: Exception) -> None:
       """Fired whenever an exception happens during streaming"""
       print(f"\nassistant > {exception}\n", end="", flush=True)

   @override
   def on_message_created(self, message: Message) -> None:
       print(f"\nassistant on_message_created > {message}\n", end="", flush=True)

   @override
   def on_message_done(self, message: Message) -> None:
       print(f"\nassistant on_message_done > {message}\n", end="", flush=True)

   @override
   def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:
       # print(f"\nassistant on_message_delta > {delta}\n", end="", flush=True)
       pass

   def on_tool_call_created(self, tool_call):
       # 4
       print(f"\nassistant on_tool_call_created > {tool_call}")
       self.function_name = tool_call.function.name
       self.tool_id = tool_call.id
       print(f"\on_tool_call_created > run_step.status > {self.run_step.status}")

       print(f"\nassistant > {tool_call.type} {self.function_name}\n", flush=True)

       keep_retrieving_run = client.beta.threads.runs.retrieve(
           thread_id=self.thread_id,
           run_id=self.run_id
       )

       while keep_retrieving_run.status in ["queued", "in_progress"]:
           keep_retrieving_run = client.beta.threads.runs.retrieve(
               thread_id=self.thread_id,
               run_id=self.run_id
           )

           print(f"\nSTATUS: {keep_retrieving_run.status}")

   @override
   def on_tool_call_done(self, tool_call: ToolCall) -> None:
       keep_retrieving_run = client.beta.threads.runs.retrieve(
           thread_id=self.thread_id,
           run_id=self.run_id
       )

       print(f"\nDONE STATUS: {keep_retrieving_run.status}")

       if keep_retrieving_run.status == "completed":
           all_messages = client.beta.threads.messages.list(
               thread_id=thread.id
           )

           print(all_messages.data[0].content[0].text.value, "", "")
           return

       elif keep_retrieving_run.status == "requires_action":
           print("here you would call your function")
           if isinstance(arguments, str):
            arguments = json.loads(arguments)
           if self.function_name == "splitaudio":
               function_data = splitaudio(self.arguments[0])
           if self.function_name == "speech2text":
               function_data = speech2text(self.arguments[0])
           elif self.function_name == "translate":
               function_data = translate(self.arguments[0], self.arguments[1])
           elif self.function_name == "text2speech":
               function_data = text2speech(self.arguments[0])

               self.output=function_data

               with client.beta.threads.runs.submit_tool_outputs_stream(
                   thread_id=self.thread_id,
                   run_id=self.run_id,
                   tool_outputs=[{
                       "tool_call_id": self.tool_id,
                       "output": self.output,
                   }],
                   event_handler=EventHandler(self.thread_id, self.assistant_id)
               ) as stream:
                 stream.until_done()
           else:
               print("unknown function")
               return

   @override
   def on_run_step_created(self, run_step: RunStep) -> None:
       # 2
       print(f"on_run_step_created")
       self.run_id = run_step.run_id
       self.run_step = run_step
       print("The type ofrun_step run step is ", type(run_step), flush=True)
       print(f"\n run step created assistant > {run_step}\n", flush=True)

   @override
   def on_run_step_done(self, run_step: RunStep) -> None:
       print(f"\n run step done assistant > {run_step}\n", flush=True)

   def on_tool_call_delta(self, delta, snapshot):
       if delta.type == 'function':
           # the arguments stream thorugh here and then you get the requires action event
           print(delta.function.arguments, end="", flush=True)
           self.arguments += delta.function.arguments
       elif delta.type == 'code_interpreter':
           print(f"on_tool_call_delta > code_interpreter")
           if delta.code_interpreter.input:
               print(delta.code_interpreter.input, end="", flush=True)
           if delta.code_interpreter.outputs:
               print(f"\n\noutput >", flush=True)
               for output in delta.code_interpreter.outputs:
                   if output.type == "logs":
                       print(f"\n{output.logs}", flush=True)
       else:
           print("ELSE")
           print(delta, end="", flush=True)

   @override
   def on_event(self, event: AssistantStreamEvent) -> None:
       # print("In on_event of event is ", event.event, flush=True)

       if event.event == "thread.run.requires_action":
           print("\nthread.run.requires_action > submit tool call")
           print(f"ARGS: {self.arguments}")

# Then, we use the `stream` SDK helper
# with the `EventHandler` class to create the Run
# and stream the response.

with client.beta.threads.runs.stream(
  thread_id=thread.id,
  assistant_id=assistant.id,
  event_handler=EventHandler(thread.id,assistant.id),
) as stream:
  stream.until_done()

on_run_step_created
The type ofrun_step run step is  <class 'openai.types.beta.threads.runs.run_step.RunStep'>

 run step created assistant > RunStep(id='step_8kEmIQHNhQ3oxwiZaeTfGKte', assistant_id='asst_eqtkeL09djLLXtxiHvOSSglU', cancelled_at=None, completed_at=None, created_at=1721379309, expired_at=None, failed_at=None, last_error=None, metadata=None, object='thread.run.step', run_id='run_5zkgrXN5atak5LSSlrcWEo5G', status='in_progress', step_details=MessageCreationStepDetails(message_creation=MessageCreation(message_id='msg_BwFCs2SQRU1sA5wT4LaENsAx'), type='message_creation'), thread_id='thread_o5v83TxuS39cYkyuY1BC0L0z', type='message_creation', usage=None, expires_at=1721379908)


assistant on_message_created > Message(id='msg_BwFCs2SQRU1sA5wT4LaENsAx', assistant_id='asst_eqtkeL09djLLXtxiHvOSSglU', attachments=[], completed_at=None, content=[], created_at=1721379309, incomplete_at=None, incomplete_details=None, metadata={}, object='thread.message', role='assistant', run_id='run_5z

UnboundLocalError: local variable 'arguments' referenced before assignment

In [97]:
runs = client.beta.threads.runs.list(
  thread.id
)
#loop through runs orderby created
runs.data.sort(key=lambda x: x.created_at)
for run in runs.data:
  print(run.id)
  run_id=run.id
  print(run.status)
  print(run.created_at)
  print(run.assistant_id)

run_LUaEqF7qIsvLBILHoDYGsw58
expired
1721381039
asst_65RV82N9yhODlCFHM3RKp4Nb
run_NHuWmzVV6M3ZLsRaQucqBzm3
requires_action
1721381976
asst_65RV82N9yhODlCFHM3RKp4Nb


In [98]:
run = client.beta.threads.runs.retrieve(
  thread_id=thread.id,
  run_id=run_id
)

print(run.id)
print(run.status)

run_NHuWmzVV6M3ZLsRaQucqBzm3
requires_action


In [87]:
run = client.beta.threads.runs.cancel(
  thread_id=thread.id,
  run_id=run.id
)

print(run)

BadRequestError: Error code: 400 - {'error': {'message': "Cannot cancel run with status 'expired'.", 'type': 'invalid_request_error', 'param': None, 'code': None}}

In [53]:
run_steps = client.beta.threads.runs.steps.list(
    thread_id=thread.id,
    run_id=run.id
)
for run in run_steps.data:
  print(run.id)
  print(run.status)
  print(run.step_details)


step_CWHIx1Pop0cMOJOCVM3SvkBk
expired
ToolCallsStepDetails(tool_calls=[FunctionToolCall(id='call_UXNGtijIwMBSYIaudui4NivY', function=Function(arguments='{\n  "filepath": "/content/audio-roberfrost-woodsarelovely.mp3"\n}', name='speech2text', output=None), type='function')], type='tool_calls')


In [107]:
thread_messages = client.beta.threads.messages.list(thread.id)
# sort thread_messages based on created_at=1721313442
thread_messages.data.sort(key=lambda x: x.created_at)
# loop through thread_messages
for message in thread_messages.data:
    print(message.content[0].text.value)

Translate the audio at /content/audio-roberfrost-woodsarelovely.mp3 from English to Malayalam
Thought: From the file path /content/audio-roberfrost-woodsarelovely.mp3, I need to split the audio into 1-minute segments.
Action: functions.splitaudio
PAUSE
Observation: Audio split. It is a 1-minute audio.

Thought: I need to convert the speech in the audio to text.
Action: speech2text
PAUSE
Thought: The text from the audio has been successfully extracted. Now, I need to translate the text from English to Malayalam.
Action: translate: English to Malayalam
PAUSE
Thought: The text has been successfully translated to Malayalam. Now, I need to convert the translated text back to speech.
Action: text2speech
PAUSE
Answer: The translated audio is at /content/audio-roberfrost-woodsarelovely-mal.mp3 in Malayalam.
