## Importing the libraries

In [1]:
import os
import io
import re
import time
import json
import sys
import wave
import pyaudio
import numpy as np
import requests
import queue
import threading
import gradio as gr
from dotenv import load_dotenv
from google import genai
from google.genai import types
from google.cloud import speech
from stream_wave import stream_wave
from play_audio import play_audio


In [2]:
# Pull variables from a local .env file into the process environment, then
# read the Gemini API key from it (None when the variable is not set).
load_dotenv()
api_key = os.environ.get('GOOGLE_API_KEY')

## Initializing the GenAI client

In [3]:
# Gemini client used by the Gradio `on_send` handler for both the chat
# completion and the text-to-speech request.
client = genai.Client(api_key=api_key)

## Google Speech-to-text

In [4]:
# Shared state between the transcription thread and the UI callbacks
global_transcription = ""   # finalized transcript text accumulated so far
stream_active = False       # True while a transcription session is marked as running

# Audio capture settings
STREAMING_LIMIT = 4 * 60 * 1000  # 4 minutes, in milliseconds
SAMPLE_RATE = 16000
CHUNK_SIZE = SAMPLE_RATE // 10  # samples per 100ms chunk

class ResumableMicrophoneStream:
    """Context manager that captures microphone audio into a queue via PyAudio.

    The PyAudio input stream is opened in ``__init__`` with ``_fill_buffer``
    registered as its callback; ``generator()`` hands the queued chunks out
    until the stream is closed.
    """

    def __init__(self, rate, chunk_size):
        """Open a mono, 16-bit PyAudio input stream.

        Args:
            rate: Sample rate passed to PyAudio.
            chunk_size: Frames per buffer delivered to the callback.
        """
        self._rate = rate
        self.chunk_size = chunk_size
        self._num_channels = 1
        # The queue must exist before the stream is opened, because the
        # callback writes into it.
        self._buff = queue.Queue()
        self.closed = True
        self._audio_interface = pyaudio.PyAudio()
        stream_options = dict(
            format=pyaudio.paInt16,
            channels=self._num_channels,
            rate=self._rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            stream_callback=self._fill_buffer,
        )
        self._audio_stream = self._audio_interface.open(**stream_options)

    def __enter__(self):
        """Mark the stream as open and return it for use in a ``with`` block."""
        self.closed = False
        return self

    def __exit__(self, type, value, traceback):
        """Stop and close the audio stream, then release PyAudio."""
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # A ``None`` sentinel wakes up a consumer blocked in ``generator()``.
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, *args, **kwargs):
        """PyAudio callback: queue the captured bytes and keep streaming."""
        self._buff.put(in_data)
        return (None, pyaudio.paContinue)

    def generator(self):
        """Yield queued audio chunks until closed or the ``None`` sentinel arrives."""
        while True:
            if self.closed:
                return
            audio_chunk = self._buff.get()  # blocks until a chunk is available
            if audio_chunk is None:
                return
            yield audio_chunk


def transcribe_audio():
    """Stream microphone audio to Google Speech-to-Text and collect the transcript.

    Runs until the speaker says "exit"/"quit" in a finalized result, or until
    ``stream_active`` is cleared (e.g. by ``stop_stream``). Finalized
    transcripts are appended to the module-level ``global_transcription``.
    ``start_stream`` launches this function on a daemon thread.

    ``stream_active`` is always reset to ``False`` on the way out, including
    when an exception is raised, so a failed session does not prevent the
    next ``start_stream`` call from starting a new one.
    """
    global global_transcription, stream_active
    # Named distinctly so it does not shadow the notebook-level Gemini `client`.
    speech_client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=SAMPLE_RATE,
        language_code="en-US",
    )
    streaming_config = speech.StreamingRecognitionConfig(
        config=config, interim_results=True
    )

    def _audio_requests(audio_chunks):
        # Stop feeding audio as soon as the flag is cleared. Ending the
        # request stream lets the response iterator below finish, which is
        # what makes the stop request actually take effect.
        for content in audio_chunks:
            if not stream_active:
                return
            yield speech.StreamingRecognizeRequest(audio_content=content)

    try:
        with ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE) as stream:
            responses = speech_client.streaming_recognize(
                streaming_config, _audio_requests(stream.generator())
            )

            for response in responses:
                if not response.results:
                    continue

                result = response.results[0]
                if not result.alternatives:
                    continue

                transcript = result.alternatives[0].transcript.strip()

                # Interim results are ignored; only finalized text is kept.
                if result.is_final:
                    global_transcription += transcript + " "
                    if re.search(r"\b(exit|quit)\b", transcript, re.I):
                        break
    finally:
        stream_active = False


def start_stream():
    """Clear the transcript and launch a transcription thread if none is active.

    Returns:
        The status message shown to the user.
    """
    global stream_active, global_transcription
    global_transcription = ""
    if stream_active:
        # A session is already marked as running; do not start a second one.
        return "Listening... Say 'Exit' to stop."
    stream_active = True
    worker = threading.Thread(target=transcribe_audio, daemon=True)
    worker.start()
    return "Listening... Say 'Exit' to stop."


def stop_stream():
    """Clear the ``stream_active`` flag and report the text captured so far.

    Returns:
        A status message that includes the current transcript.
    """
    global stream_active
    stream_active = False
    return f"Transcription Stopped.\nFinal text: {global_transcription}"


def reset_transcription():
    """Empty the accumulated transcript and return a status message."""
    global global_transcription
    status = "Transcription reset."
    global_transcription = ""
    return status


def view_transcription():
    """Return the transcript accumulated so far in ``global_transcription``."""
    current_text = global_transcription
    return current_text


## Authorization
### Note: To use the app, send your email address to the author of this repository so that access can be granted and your own `token.json` can be created. Currently, only the author's email is authorized.

In [5]:
import datetime
import os.path

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.generativeai.types import FunctionDeclaration, Tool

In [6]:
# OAuth scope requested for the Google Calendar API. The credentials created
# with it are used below both to list events and to insert new ones.
SCOPES = ["https://www.googleapis.com/auth/calendar"]

In [7]:
# Reuse cached user credentials when token.json exists.
creds = (
    Credentials.from_authorized_user_file("token.json", SCOPES)
    if os.path.exists("token.json")
    else None
)

needs_login = not creds or not creds.valid
if needs_login:
    can_refresh = creds and creds.expired and creds.refresh_token
    if can_refresh:
        # Expired but refreshable: renew the token.
        creds.refresh(Request())
    else:
        # No usable credentials: run the local OAuth flow from credentials.json.
        flow = InstalledAppFlow.from_client_secrets_file(
            "credentials.json", SCOPES
        )
        creds = flow.run_local_server(port=0)
    # Cache the new or refreshed credentials for the next run.
    with open("token.json", "w") as token_file:
        token_file.write(creds.to_json())

# Calendar API client used by get_events() and add_events().
service = build("calendar", "v3", credentials=creds)

## Get Events

In [8]:
def get_events():
    """Fetch upcoming events from the primary calendar and return the next one.

    Up to five events starting from the current UTC time are requested,
    ordered by start time; only the first is reported.

    Returns:
        ``[]`` when there are no upcoming events, otherwise a
        ``(start, summary)`` tuple for the next event. ``start`` is the
        event's ``dateTime`` value, or its ``date`` value when no
        ``dateTime`` is present. ``summary`` falls back to ``"(No title)"``
        when the event has no ``summary`` field.
    """
    now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
    print("Getting the upcoming events")
    events_result = (
        service.events()
        .list(
            calendarId="primary",
            timeMin=now,
            maxResults=5,
            singleEvents=True,
            orderBy="startTime",
        )
        .execute()
    )
    events = events_result.get("items", [])

    if not events:
        print("No upcoming events found.")
        return []

    # The result is ordered by start time, so the first item is the next event.
    next_event = events[0]
    start_info = next_event.get("start", {})
    start = start_info.get("dateTime", start_info.get("date"))
    # Untitled events carry no "summary" key; avoid a KeyError for them.
    summary = next_event.get("summary", "(No title)")
    print(f"Event: {summary} at {start}")
    return start, summary


In [9]:
# Function declaration exposed to Gemini for reading the next calendar event.
# The function takes no arguments, hence the empty parameter schema.
get_events_declarations = dict(
    name="get_upcoming_event",
    description="Get the next upcoming event from Google Calendar",
    parameters=dict(
        type="object",
        properties={},
        required=[],
    ),
)

In [10]:
def get_upcoming_event():
    """Return the next calendar event as a dict suitable for a tool response.

    Returns:
        ``{"summary": ..., "start": ...}`` for the next event, or
        ``{"message": "No upcoming events found"}`` when there is none.
    """
    result = get_events()
    # get_events() returns an empty list when nothing is scheduled, so check
    # before unpacking; unpacking the empty result would raise ValueError.
    if not result:
        return {"message": "No upcoming events found"}

    start, summary = result
    if not summary:
        return {"message": "No upcoming events found"}
    return {
        "summary": summary,
        "start": start
    }

## Add Events

In [11]:
def add_events(summary, description, start_datetime, end_datetime,
               location='At the Office', timezone='Asia/Kolkata'):
    """Insert an event into the primary Google Calendar.

    Args:
        summary: Title of the event.
        description: Description of the event.
        start_datetime: Start time string placed in the event's
            ``start.dateTime`` field.
        end_datetime: End time string placed in the event's
            ``end.dateTime`` field.
        location: Event location. Defaults to the previously hard-coded
            ``'At the Office'``.
        timezone: Time zone name set on both start and end. Defaults to the
            previously hard-coded ``'Asia/Kolkata'``.

    Returns:
        The created event as returned by the Calendar API ``insert`` call.
    """
    event = {
        'summary': summary,
        'location': location,
        'description': description,
        'start': {
            'dateTime': start_datetime,
            'timeZone': timezone,
        },
        'end': {
            'dateTime': end_datetime,
            'timeZone': timezone,
        },
        # Explicit reminders instead of the calendar defaults:
        # an email one day before and a popup ten minutes before.
        'reminders': {
            'useDefault': False,
            'overrides': [
                {'method': 'email', 'minutes': 24 * 60},
                {'method': 'popup', 'minutes': 10},
            ],
        },
    }

    event = service.events().insert(calendarId='primary', body=event).execute()
    print('Event created: %s' % (event.get('htmlLink')))
    return event


In [12]:
# Function declaration exposed to Gemini for creating a calendar event.
# The example timestamp uses the +05:30 offset so that it agrees with the
# 'Asia/Kolkata' default stated in the description; an example with a
# different offset can lead the model to emit times in the wrong zone.
add_events_declarations = {
    "name": "add_calendar_event",
    "description": "Add a new event to Google Calendar with title, description, start_datetime, end_datetime. By default take time zone as 'Asia/Kolkata' ",
    "parameters": {
        "type": "object",
        "properties": {
            "summary": {
                "type": "string",
                "description": "Title of the event"
            },
            "description": {
                "type": "string",
                "description": "Description of the event"
            },
            "start_datetime": {
                "type": "string",
                "description": "Start time in ISO 8601 format (e.g. '2025-06-28T09:00:00+05:30')"
            },
            "end_datetime": {
                "type": "string",
                "description": "End time in ISO 8601 format"
            },
        },
        "required": ["summary", "description", "start_datetime", "end_datetime"]
    }
}


In [13]:
def add_calendar_event(summary, description, start_datetime, end_datetime):
    """Create a calendar event via ``add_events`` and report its link.

    Returns:
        A dict with ``"status": "success"`` and the created event's
        ``htmlLink`` under ``"event_link"``.
    """
    created = add_events(summary, description, start_datetime, end_datetime)
    event_link = created.get("htmlLink")
    return {"status": "success", "event_link": event_link}

## Function Declaration

In [14]:
# Bundle both calendar function declarations into one Tool; it is passed to
# Gemini in the `on_send` handler so the model can request either function.
tools = types.Tool(function_declarations=[get_events_declarations, add_events_declarations])

## Gradio Framework

In [15]:
with gr.Blocks(title="Speech Chatbot") as demo:
    chatbot = gr.Chatbot(label="Voice Assistant")
    audio_waveform = gr.Audio(label="Assistant Waveform", interactive=False, type='numpy')

    with gr.Row():
        start_btn = gr.Button("🎤 Start Listening")
        stop_btn = gr.Button("🛑 Stop Listening")
        send_btn = gr.Button("📨 Send to Gemini")
        view_btn = gr.Button("👁️ View Transcription")
        reset_btn = gr.Button("🔄 Reset")

    # chat_state holds (role, message) tuples; the handlers mutate it in place.
    chat_state = gr.State([])
    transcription_state = gr.State("")

    # Button callbacks
    def on_start(chat_history, transcription):
        """Start microphone transcription and log it in the chat."""
        start_stream()
        chat_history.append(("system", "🎙️ Listening started... Say 'Exit' to stop."))
        return chat_history, transcription

    def on_stop(chat_history, transcription):
        """Request the transcription to stop and log it in the chat."""
        stop_stream()
        chat_history.append(("system", "🛑 Transcription stopped."))
        return chat_history, transcription

    def on_view(chat_history, transcription):
        """Show the transcript captured so far as an assistant message."""
        view = view_transcription()
        chat_history.append(("user", "View Transcription"))
        chat_history.append(("assistant", view))
        return chat_history, view

    def on_reset(chat_history, transcription):
        """Clear both the transcript and the chat history."""
        reset_transcription()
        chat_history.clear()
        return [], ""

    def _run_function_call(function_call):
        """Execute the calendar function Gemini asked for and describe the result."""
        if function_call.name == 'add_calendar_event':
            summary = function_call.args['summary']
            description = function_call.args['description']
            start_datetime = function_call.args['start_datetime']
            end_datetime = function_call.args['end_datetime']
            link = add_calendar_event(summary, description, start_datetime, end_datetime)
            return f"Event added to your Calender successfully {link.get('event_link')}"
        if function_call.name == 'get_upcoming_event':
            return f"Here are your events {get_upcoming_event()}"
        # Without this fallback an unexpected function name would leave the
        # reply undefined and crash the handler.
        return f"Sorry, I cannot run the requested function '{function_call.name}'."

    def on_send(chat_history, transcription):
        """Send the transcript to Gemini, run any tool call, and speak the reply.

        Returns:
            The updated chat history, an empty transcription state, and the
            reply audio as ``(sample_rate, waveform)``. When there is nothing
            to send, the transcription state is returned unchanged and the
            audio output is ``None``.
        """
        user_input = view_transcription().strip()
        if not user_input:
            # Nothing was transcribed; do not send an empty prompt to the model.
            chat_history.append(("system", "⚠️ Nothing to send yet. Start listening and speak first."))
            return chat_history, transcription, None

        chat_history.append(("user", user_input))

        # Convert the chat log into Gemini contents; UI-only "system" notes are skipped.
        history = []
        for role, msg in chat_history:
            if role == "system":
                continue
            role = "user" if role == "user" else "model"
            history.append(
                types.Content(role=role, parts=[types.Part(text=msg)])
            )

        # Gemini response (Text)
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=history,
            config=types.GenerateContentConfig(
                temperature=0.1,
                tools=[tools]
            )
        )
        reset_transcription()

        # `function_calls` gathers function-call parts from the response, so
        # a call is found even when it is not the first part. Only the first
        # requested call is executed.
        function_calls = response.function_calls
        if function_calls:
            bot_response = _run_function_call(function_calls[0])
        else:
            # `response.text` can be None when the model returns no text part.
            bot_response = response.text or "Sorry, I could not generate a response."

        chat_history.append(("assistant", bot_response))

        # Gemini response (TTS Audio)
        response_audio = client.models.generate_content(
            model="gemini-2.5-flash-preview-tts",
            contents=bot_response,
            config=types.GenerateContentConfig(
                response_modalities=["AUDIO"],
                speech_config=types.SpeechConfig(
                    voice_config=types.VoiceConfig(
                        prebuilt_voice_config=types.PrebuiltVoiceConfig(
                            voice_name='Kore',
                        )
                    )
                )
            )
        )

        # Wrap the inline audio bytes into a WAV stream (mono, 24 kHz, 16-bit)
        data = response_audio.candidates[0].content.parts[0].inline_data.data
        wav_stream = stream_wave(data, channels=1, rate=24000, sample_width=2)

        # Play audio synchronously
        play_audio(wav_stream)

        # Reset pointer so Gradio can read it too
        wav_stream.seek(0)
        with wave.open(wav_stream, 'rb') as wf:
            waveform = np.frombuffer(wf.readframes(wf.getnframes()), dtype=np.int16)
            sample_rate = wf.getframerate()

        return chat_history, "", (sample_rate, waveform)

    # Bind events
    start_btn.click(on_start, [chat_state, transcription_state], [chatbot, transcription_state])
    stop_btn.click(on_stop, [chat_state, transcription_state], [chatbot, transcription_state])
    view_btn.click(on_view, [chat_state, transcription_state], [chatbot, transcription_state])
    reset_btn.click(on_reset, [chat_state, transcription_state], [chatbot, transcription_state])
    send_btn.click(
        on_send,
        [chat_state, transcription_state],
        [chatbot, transcription_state, audio_waveform]
    )

# NOTE(review): share=True publishes a public URL for an app that can create
# calendar events with the locally stored credentials — confirm this is intended.
demo.launch(share=True)




* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://814dc3df6efd5dc0a2.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


