# Chatbot without Tools
## Introduction
* In this notebook, I will show you how to use the OpenAI API to create a chatbot that accepts text input.

## Technologies
* OpenAI: calls the OpenAI API to generate responses
* Gradio: builds the user interface for interacting with AI models

In [1]:
from openai import OpenAI
import gradio as gr
import os
import copy
import json

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from dotenv import load_dotenv
# Load variables from a local .env file (if present) so the os.getenv lookups
# below can find the API keys.
load_dotenv()

class EnvService:
    """Reads the API keys this notebook needs from environment variables."""

    def _read_key(self, variable, missing_notice):
        """Return the value of `variable`; if it is unset or empty, print
        `missing_notice` and return None."""
        value = os.getenv(variable)
        if value:
            return value
        print(missing_notice)
        return None

    def get_open_ai_key(self):
        """Return the OPEN_AI_KEY environment variable, or None if it is missing."""
        return self._read_key("OPEN_AI_KEY", "OPEN AI KEY IS NOT SET!!!")

    def get_weather_api_key(self):
        """Return the WEATHER_API_KEY environment variable, or None if it is missing."""
        return self._read_key("WEATHER_API_KEY", "WEATHER_API_KEY IS NOT SET!!!")


# Shared instance that the service classes in later cells read their keys from.
env_service = EnvService()

In [3]:
class AIService:
    """Thin wrapper around the OpenAI chat-completions client."""

    # Chat model sent with every completion request.
    model = "gpt-4.1"

    def __init__(self):
        self.init_client()

    def init_client(self):
        """Create the OpenAI client with the key supplied by env_service."""
        api_key = env_service.get_open_ai_key()
        self.client = OpenAI(api_key=api_key)

    def chat(self, messages):
        """Start a streaming chat completion for `messages` and return the
        object produced by the client (iterated chunk by chunk by callers)."""
        request = {
            "model": self.model,
            "messages": messages,
            "stream": True,
        }
        return self.client.chat.completions.create(**request)

In [4]:
class ChatBot:
    """Text-only chatbot that streams OpenAI replies into a Gradio chat UI."""

    def __init__(self):
        self.ai_service = AIService()

    def chat(self, messages, history):
        """Stream the assistant's reply to a new user message.

        Args:
            messages: Text of the new user message.
            history: Earlier conversation as a list of role/content dicts
                (may be None or empty). It is copied, never mutated.

        Yields:
            A one-element list holding the assistant message accumulated so
            far; each yield supersedes the previous one.
        """
        new_messages = copy.deepcopy(history) if history else []
        new_messages.append({"role": "user", "content": messages})

        responses = self.ai_service.chat(new_messages)

        partial = ""
        for chunk in responses:
            # A streamed chunk may carry an empty `choices` list; indexing it
            # unconditionally would raise IndexError and kill the stream.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta.content is not None:
                partial += delta.content
                yield [
                    {"role": "assistant", "content": partial}
                ]

    def render_ui(self):
        """Build the Gradio chat interface around `chat` and launch it."""
        chat_interface = gr.ChatInterface(fn=self.chat, type="messages")
        chat_interface.launch()

    def run(self):
        """Entry point: render and launch the UI."""
        self.render_ui()

In [112]:
# Build the text-only chatbot and launch its Gradio UI.
chat_bot = ChatBot()
chat_bot.run()

* Running on local URL:  http://127.0.0.1:7897
* To create a public link, set `share=True` in `launch()`.


# Chatbot with Tools

## Introduction
* Use tools to extend the chatbot so it can create images and audio
* Use tools to extend the chatbot so it can report the current weather at a location

In [7]:
def _function_tool(name, description, string_params):
    """Build one function-tool entry.

    `string_params` maps each argument name to its description; every
    argument is declared as a required string.
    """
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    arg: {"type": "string", "description": text}
                    for arg, text in string_params.items()
                },
                "required": list(string_params),
            },
        },
    }


# Tool definitions passed to the chat-completions request via `tools=`.
tools = [
    _function_tool(
        "get_weather",
        "Get current weather in a given location",
        {"location": "Location name, e.g. London"},
    ),
    _function_tool(
        "generate_image",
        "Generate an image based on a text prompt",
        {"prompt": "What to generate"},
    ),
]


In [8]:
import requests

class WeatherService:
    """Looks up current weather through the OpenWeatherMap endpoint in `api_url`."""

    # Seconds to wait for the HTTP request. Without a timeout a stalled
    # connection would block the chat response indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.api_key = env_service.get_weather_api_key()
        self.api_url = "https://api.openweathermap.org/data/2.5/weather"

    def get_weather(self, location: str) -> str:
        """Return a human-readable weather summary for `location`.

        Never raises: any request, decoding or parsing failure is reported
        as an "Error ..." string so the chat stream is not interrupted.

        Args:
            location: Place name sent as the `q` query parameter.

        Returns:
            A multi-line summary on success, otherwise an error message.
        """
        params = {
            "q": location,
            "appid": self.api_key,
            "units": "metric"  # return °C instead of Kelvin
        }
        try:
            response = requests.get(
                self.api_url, params=params, timeout=self.REQUEST_TIMEOUT
            )
            data = response.json()

            if response.status_code != 200:
                return f"Error: {data.get('message', 'Unable to fetch weather')}"

            # Parse relevant info
            city = data.get("name", location)
            country = data.get("sys", {}).get("country", "")
            # Avoid a dangling "City, :" when no country code is returned.
            place = f"{city}, {country}" if country else city
            weather_main = data["weather"][0]["main"]
            weather_desc = data["weather"][0]["description"]
            temp = data["main"]["temp"]
            feels_like = data["main"]["feels_like"]
            humidity = data["main"]["humidity"]
            wind_speed = data["wind"]["speed"]

            return (
                f"Weather in {place}:\n"
                f"- Condition: {weather_main} ({weather_desc})\n"
                f"- Temperature: {temp:.1f}°C (feels like {feels_like:.1f}°C)\n"
                f"- Humidity: {humidity}%\n"
                f"- Wind speed: {wind_speed} m/s"
            )
        except Exception as e:
            # Deliberately broad: the caller shows this text to the user.
            return f"Error fetching weather: {e}"


In [None]:
import srt
from pydub import AudioSegment
import librosa

class AIServiceWithTools(AIService):
    """AIService extended with tool-enabled chat and SRT-to-speech generation."""

    # Text-to-speech model used by generate_audio_from_srt.
    model_tts = 'gpt-4o-mini-tts'
    # Assumed speaking rate for the duration estimate: 150 words/min = 2.5 words/sec.
    _WORDS_PER_SECOND = 2.5
    # Highest value ever sent as the `speed` argument (treated as the API limit).
    _MAX_SPEED = 4.0

    def __init__(self):
        super().__init__()
        # Module-level tool definitions sent with every tool-enabled request.
        self.tools = tools

    def chat_with_tools(self, messages):
        """Start a streaming chat completion for `messages` with `self.tools`
        attached, and return the object produced by the client."""
        responses = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self.tools,
            stream=True
        )
        return responses

    def generate_image(self):
        """Placeholder for image generation; not implemented yet, returns None."""
        pass

    def _estimate_speed(self, text, target_duration):
        """Return `(speed, estimated_duration_ms)` for speaking `text` within
        `target_duration` milliseconds, using the word-rate heuristic."""
        words = len(text.split())
        estimated_duration = (words / self._WORDS_PER_SECOND) * 1000  # ms

        speed = 1.0
        # A zero-length (or inverted) cue has no meaningful ratio; keep the
        # default speed instead of dividing by zero.
        if target_duration > 0 and estimated_duration > target_duration:
            speed = min(estimated_duration / target_duration, self._MAX_SPEED)
        return speed, estimated_duration

    def generate_audio_from_srt(self, srt_file, output_file="final_output.mp3", voice_gender="male"):
        """Synthesize speech for every cue of an SRT file into one MP3.

        Each non-empty subtitle is sent to the text-to-speech endpoint and
        overlaid on a silent track at the cue's start time. When the estimated
        speaking time exceeds the cue length, playback speed is raised (capped
        at `_MAX_SPEED`). One progress line is printed per cue.

        Args:
            srt_file: Path of the UTF-8 encoded .srt file to read.
            output_file: Path of the MP3 to write; missing parent directories
                are created.
            voice_gender: 'male' or 'female'; see `_voice_mapper`.
        """
        import tempfile  # only this method needs it

        # Parse SRT
        with open(srt_file, "r", encoding="utf-8") as f:
            subtitles = list(srt.parse(f.read()))

        final_audio = AudioSegment.silent(duration=0)
        voice = self._voice_mapper(voice_gender)

        # Per-cue clips go into a temporary directory that is removed when the
        # loop finishes, so no temp_segment_*.mp3 files are left behind.
        with tempfile.TemporaryDirectory() as tmp_dir:
            for i, sub in enumerate(subtitles):
                start_time = int(sub.start.total_seconds() * 1000)  # ms
                end_time = int(sub.end.total_seconds() * 1000)      # ms
                text = sub.content.strip()

                if not text:
                    continue  # skip empty subtitles

                target_duration = end_time - start_time
                speed, estimated_duration = self._estimate_speed(text, target_duration)

                temp_file = os.path.join(tmp_dir, f"segment_{i}.mp3")

                # Generate speech with speed adjustment
                with self.client.audio.speech.with_streaming_response.create(
                    model=self.model_tts,
                    voice=voice,
                    input=text,
                    response_format="mp3",
                    speed=speed
                ) as response:
                    response.stream_to_file(temp_file)

                # Load generated speech
                speech = AudioSegment.from_mp3(temp_file)
                actual_duration = len(speech)

                # overlay() never lengthens the base track, so extend it to
                # cover both the cue window and the full clip; otherwise
                # speech longer than its cue would be cut off.
                segment_end = max(end_time, start_time + actual_duration)
                if len(final_audio) < segment_end:
                    final_audio += AudioSegment.silent(duration=segment_end - len(final_audio))

                # Overlay into final track
                final_audio = final_audio.overlay(speech, position=start_time)

                print(f"[{sub.start} --> {sub.end}] '{text}' "
                    f"(est={estimated_duration/1000:.2f}s, "
                    f"actual={actual_duration/1000:.2f}s, "
                    f"target={target_duration/1000:.2f}s, speed={speed:.2f})")

        # Export combined audio, creating the destination directory if needed.
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        final_audio.export(output_file, format="mp3")
        print(f"✅ Final audio saved to {output_file}")

    def _voice_mapper(self, voice_gender = 'male'):
        """Map 'female'/'male' to a voice name; any other value falls back to 'coral'."""
        voice_map = {
            'female': 'coral',
            'male': 'onyx'
        }
        return voice_map.get(voice_gender, 'coral')

In [40]:
# Demo: synthesize speech for every cue in the sample SRT file and write one combined MP3.
ai_with_tools = AIServiceWithTools()
ai_with_tools.generate_audio_from_srt("../sample1.srt", output_file="output_audio/final_output.mp3", voice_gender="male")

[0:00:01 --> 0:00:04] 'Hello, and welcome to our sample movie.' (est=2.80s, actual=2.81s, target=3.00s, speed=1.00)
[0:00:05 --> 0:00:10] 'This is a demonstration of the SRT subtitle format,
which is widely used for video subtitling.' (est=6.40s, actual=4.94s, target=5.00s, speed=1.28)
[0:00:13 --> 0:00:18] 'SRT files contain numbered subtitle sequences
with start and end timestamps.' (est=4.40s, actual=5.30s, target=5.00s, speed=1.00)
[0:00:19 --> 0:00:26] 'Each subtitle can span multiple lines
and includes timing information.' (est=4.00s, actual=3.96s, target=7.00s, speed=1.00)
[0:00:27 --> 0:00:33] 'The format is: sequence number,
followed by the time range,
followed by the subtitle text.' (est=6.00s, actual=5.66s, target=6.00s, speed=1.00)
✅ Final audio saved to output_audio/final_output.mp3


In [120]:
class ChatBotWithTools(ChatBot):
    """ChatBot variant that lets the model call tools (currently only weather)."""

    def __init__(self):
        super().__init__()
        # Replace the plain service created by the base class with the tool-aware one.
        self.ai_service = AIServiceWithTools()
        self.weather_service = WeatherService()

    def chat_with_tools(self, messages, history):
        """Stream the assistant's reply, then run any tool calls it requested.

        Args:
            messages: Text of the new user message.
            history: Earlier conversation as a list of role/content dicts
                (may be None or empty). It is copied, never mutated.

        Yields:
            A list of assistant messages. While text streams in, it holds the
            text accumulated so far. After the stream ends, one more list is
            yielded per tool call, containing the streamed text (if any)
            followed by every tool result produced so far.
        """
        new_messages = copy.deepcopy(history) if history else []
        new_messages.append({"role": "user", "content": messages})

        responses = self.ai_service.chat_with_tools(new_messages)

        partial = ""
        tool_call_data = {}

        for chunk in responses:
            # A streamed chunk may carry an empty `choices` list.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta

            # Normal text
            if delta.content is not None:
                partial += delta.content
                yield [{"role": "assistant", "content": partial}]

            # Tool calls arrive in fragments keyed by index: the name comes
            # once, the JSON arguments are split across chunks.
            for tool_call in delta.tool_calls or []:
                function = tool_call.function
                if function is None:
                    continue
                entry = tool_call_data.setdefault(
                    tool_call.index, {"name": None, "args": ""}
                )
                if function.name:
                    entry["name"] = function.name
                if function.arguments:
                    entry["args"] += function.arguments

        # After streaming is complete → execute tools. Every yield replaces
        # the previous one, so keep the streamed text and earlier results in
        # the list instead of overwriting them.
        outputs = [{"role": "assistant", "content": partial}] if partial else []
        for tool in tool_call_data.values():
            result = self._run_tool(tool["name"], tool["args"])
            outputs.append({"role": "assistant", "content": result})
            yield list(outputs)

    def _run_tool(self, fn_name, args_str):
        """Execute one requested tool and return its result as display text."""
        try:
            args = json.loads(args_str)
        except ValueError as e:
            print("Failed to parse tool args:", args_str, e)
            args = {}
        if not isinstance(args, dict):
            args = {}

        if fn_name == "get_weather":
            # Pass only the expected argument so missing or unexpected keys
            # cannot raise TypeError in the middle of the response.
            location = args.get("location")
            if not location:
                return "Error: get_weather was called without a location"
            return self.weather_service.get_weather(location)
        return f"Unknown function call: {fn_name}"

    def render_ui(self):
        """Build the Gradio chat interface around `chat_with_tools` and launch it."""
        chat_interface = gr.ChatInterface(fn=self.chat_with_tools, type="messages")
        chat_interface.launch()

    def run(self):
        """Entry point: render and launch the UI."""
        self.render_ui()

            


In [121]:
# Build the tool-enabled chatbot and launch its Gradio UI.
chat_bot_with_tools = ChatBotWithTools()
chat_bot_with_tools.run()

* Running on local URL:  http://127.0.0.1:7898
* To create a public link, set `share=True` in `launch()`.
