# Virtual Workspace Web App Microserver API
---


##  Setup all lib and env:
**Python Lib:**
>* FastApi - *for client access* 
>* redis - *for base data*
>* websockets - *for desktop control* 
>* aiortc - *for desktop streaming*

```
pip install fastapi
pip install uvicorn
pip install transformers
pip install torch
pip install torchvision
pip install sb3-contrib
pip install redis
pip install aioredis
pip install python-socketio[client]
pip install aiortc
pip install av
pip install aiohttp
pip install transformers torch torchvision clip stable-baselines3 webrtcvad torchaudio

```

In [1]:
import os
import subprocess

# Check if ffmpeg is installed
try:
    if os.name == 'nt':
        subprocess.check_output(['where', 'ffmpeg'])
    else:
        subprocess.check_output(['ffmpeg', '-version'])
    print('ffmpeg already installed')
except subprocess.CalledProcessError:
    # Install ffmpeg
    print('ffmpeg not found. Installing...')
    if os.name == 'nt':
        ffmpeg_bin_dir = r'C:\ProgramData\chocolatey\lib\ffmpeg\tools'
        os.environ['PATH'] += os.pathsep + ffmpeg_bin_dir
        if not os.path.exists(ffmpeg_bin_dir):
            print(f'ffmpeg not found in {ffmpeg_bin_dir}')
        else:
            print('ffmpeg installed')
    else:
        subprocess.check_call(['sudo', 'apt-get', 'update'])
        subprocess.check_call(['sudo', 'apt-get', 'install', '-y', 'ffmpeg'])
        print('ffmpeg installed')


ffmpeg already installed


In [2]:
# Setup all lib and env

# fastapi :
try:
    import fastapi                      
    print('fastapi: already installed')
except ImportError:
  !python -m pip install -q fastapi
  print('Installed fastapi')
    
# uvicorn :
try:
    import uvicorn                      
    print('uvicorn: already installed')
except ImportError:
  !python -m pip install -q uvicorn
  print('Installed uvicorn')
    
# transformers :
try:
    import transformers                      
    print('transformers: already installed')
except ImportError:
  !python -m pip install -q transformers
  print('Installed transformers')
    
# TODO torch, vision, audio and difusion model
# torch :
try:
    import torch                      
    print('torch: already installed')
except ImportError:
  !python -m pip install -q torch
  print('Installed torch')
    
# torchvision :
try:
    import torchvision                      
    print('torchvision: already installed')
except ImportError:
  !python -m pip install -q torchvision
  print('Installed torchvision')
    
# ftfy :
try:
    import ftfy                      
    print('ftfy: already installed')
except ImportError:
  !python -m pip install -q ftfy
  print('Installed ftfy')

# torchaudio :
try:
    import torchaudio                      
    print('torchaudio: already installed')
except ImportError:
  !python -m pip install -q torchaudio
  print('Installed torchaudio')
    
# python-multipart :
try:
    import python_multipart                      
    print('python-multipart: already installed')
except ImportError:
    !python -m pip install -q python-multipart
    print('Installed python-multipart')

    
# pip install diffusers["torch"]

# # sb3_diffusion :
# try:
#     import sb3_diffusion                      
#     print('sb3_diffusion (stabl diffsuin 3): already installed')
# except ImportError:
#   !python -m pip install -q sb3_diffusion
#   print('Installed sb3_diffusion(stabl diffsuin 3')
    
# webrtcvad :
try:
    import webrtcvad                     
    print('webrtcvad: already installed')
except ImportError:
  !python -m pip install -q webrtcvad
  print('Installed webrtcvad')
    
# redis :
try:
    import redis                      
    print('redis: already installed')
except ImportError:
  !python -m pip install -q redis
  print('Installed redis')
    
# aioredis :
try:
    import aioredis                      
    print('aioredis: already installed')
except ImportError:
  !python -m pip install -q aioredis
  print('Installed aioredis')
    
# python-socketio[client] :
try:
    import socketio                      
    print('python-socketio[client]: already installed')
except ImportError:
  !python -m pip install -q python-socketio[client]
  print('Installed socketio')
    
# aiortc :
try:
    import aiortc                      
    print('aiortc: already installed')
except ImportError:
  !python -m pip install -q aiortc
  print('Installed aiortc')
    
# av :
try:
    import av                      
    print('av: already installed')
except ImportError:
  !python -m pip install -q av
  print('Installed av')
    
# aiohttp :
try:
    import aiohttp                      
    print('aiohttp: already installed')
except ImportError:
  !python -m pip install -q aiohttp
  print('Installed aiohttp')

fastapi: already installed
uvicorn: already installed
transformers: already installed
torch: already installed
torchvision: already installed
ftfy: already installed
torchaudio: already installed




Installed python-multipart
webrtcvad: already installed
redis: already installed
aioredis: already installed
python-socketio[client]: already installed
aiortc: already installed
av: already installed
aiohttp: already installed


---
## Setup Language Trasnformer Stream API:

## Import libraries:

In [3]:
# Standard library
import base64
import json
from io import BytesIO
from typing import Optional

# Third-party libraries
import numpy as np
from PIL import Image
import uuid
import redis
import requests
import torch
import torchaudio
import torchvision.transforms as T
import torchvision.transforms.functional as TF
import webrtcvad

# FastAPI and related libraries
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from functools import lru_cache
from pydantic import BaseModel

# Transformers, GPT-2, BERT, and DALLE-mini
from transformers import (
    GPT2LMHeadModel, GPT2Tokenizer, 
    BertTokenizer, BertForSequenceClassification,
    AutoTokenizer, AutoModelForSeq2SeqLM
)

# TODO
# # CLIP
# import clip

# # Stable Baselines 3 diffusion
# from sb3_diffusion import get_prompt_finetune_optimizer, create_diffusion_callback

# aiortc and WebRTC
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer, MediaRecorder, MediaStreamTrack
import cv2

#  TODO
# # Load the best large-scale diffusion model and tokenizer
# default_clip_model_name = "lucidrains/big-sleep"
# clip_model = clip.load(default_clip_model_name).eval().cuda()
# clip_processor = clip.tokenize

# # Load the default CLIP model and processor
# clip_model, clip_processor = clip.load("ViT-B/32")

# Load the default GPT-2 model and tokenizer
gpt2_model_name = "gpt2-large"
gpt2_model = GPT2LMHeadModel.from_pretrained(gpt2_model_name)
gpt2_tokenizer = GPT2Tokenizer.from_pretrained(gpt2_model_name)

# TODO
# # Load the default text-to-image model
# diffuse_model_name = "sb3-diffusion/512x512_diffusion_unconditional_imagenet_8s_256it.pt"
# diffuse_model = Diffusion(prompt_size=256, image_size=512, diffusion_steps=1000, denoise_scale=0.1)
# 
# # Load ControlNet model
# controlnet_model = torch.hub.load('facebookresearch/pytorch_GAN_zoo:hub', 'PGAN', model_name='celebAHQ-512',
#                        pretrained=True, useGPU=torch.cuda.is_available())
# 

# Load the default text-to-speech model
def load_tts_model(model_name):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)
    model = transformers.AutoModelWithLMHead.from_pretrained(model_name).to(device)
    return tokenizer, model

default_tts_model_name = "tts_models/tts_model_mellotron_ljspeech.pt"
default_tts_model_name = "ttskit/gpt2-ljspeech-melgan"
tts_tokenizer, tts_model = load_tts_model(default_tts_model_name)
tts_vocoder = torchaudio.models.WaveGlow(n_mel_channels=80, n_flows=12, n_group=8, n_early_every=4, n_early_size=2, WN_config={'n_layers': 8, 'n_channels': 256}).cuda()

# Set device for vocoder
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
tts_vocoder = tts_vocoder.to(device)

# FastAPI app
app = FastAPI()

# Redis configuration
REDIS_URL = "redis://localhost:6379"
redis_client = redis.Redis.from_url(REDIS_URL)
redis_pubsub = redis_client.pubsub()

# Socket.IO client
sio = socketio.AsyncClient()

# # Text-to-image input model
# class TextToImageInput(BaseModel):
#     image: Optional[UploadFile] = None
#     text: Optional[str] = None
#     model_name: str = diffuse_model
#     temperature: float = 0.9
#     top_p: float = 0.99
#     max_length: int = 256
#     seed: Optional[int] = None

# # Image-to-text input model
# class ImageToTextInput(BaseModel):
#     image_url: str
#     text: str
#     model_name: str = clip_model.name_or_path

# Text generation input model
class TextGenerationInput(BaseModel):
    text: str
    model_name: str = gpt2_model_name
    
# Speech-to-text input model
class SpeechToTextInput(BaseModel):
    audio_file: UploadFile
    vad_aggressiveness: int = 3
    sample_rate: int = 16000

# Speech-to-speech input model
class SpeechToSpeechInput(BaseModel):
    audio_file: UploadFile
    source_language: str
    target_language: str
    
# Text-to-speech input model
class TextToSpeechInput(BaseModel):
    text: str
    voice_name: str = "en-US-Wavenet-D"

# Initialize VAD (voice activity detection)
vad = webrtcvad.Vad()

# Constants for audio processing
SILENCE_CHUNK_DURATION_MS = 500
MAX_AUDIO_DURATION_S = 30


OSError: ttskit/gpt2-ljspeech-melgan is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo with `use_auth_token` or log in with `huggingface-cli login` and pass `use_auth_token=True`.

## 1 . Language Processing

## FastAPI endpoint for text-to-text generation

In [4]:
@app.post("/text-to-text")
async def text_to_text(text_generation_input: TextGenerationInput):
    # Load the specified model or use the default one
    if text_generation_input.model_name != gpt2_model_name:
        model = GPT2LMHeadModel.from_pretrained(text_generation_input.model_name)
        tokenizer = GPT2Tokenizer.from_pretrained(text_generation_input.model_name)
    else:
        model = gpt2_model
        tokenizer = gpt2_tokenizer

    # Generate text using the specified model and settings
    prompt = text_generation_input.text
    input_ids = tokenizer.encode(prompt, return_tensors='pt')
    output = model.generate(input_ids=input_ids.cuda(),
                            max_length=256,
                            do_sample=True,
                            temperature=0.7)
    generated_text = tokenizer.decode(output[0], skip_special_tokens=True)

    # Generate unique ID for the event and save the results to Redis
    event_id = str(uuid.uuid4())
    redis_client.set(event_id, json.dumps({"text": prompt, "generated_text": generated_text}))

    # Publish the event using Redis Pub/Sub
    redis_client.publish("text-to-text_events", event_id)

    return {"event_id": event_id}


NameError: name 'app' is not defined

## FastAPI endpoint for text-to-image generation

In [None]:
# FastAPI endpoint for text-to-image generation
@app.post("/text-to-image")
async def text_to_image(text_to_image_input: TextToImageInput):
    # Generate a unique ID for the event
    event_id = str(uuid.uuid4())

    # If an image was uploaded, use it, otherwise use the provided text
    if text_to_image_input.image is not None:
        # Load the image into memory
        image_bytes = await text_to_image_input.image.read()
        image = Image.open(BytesIO(image_bytes)).convert("RGB")
        image_tensor = T.ToTensor()(image).unsqueeze(0).cuda()

        # Use the text-to-image model to generate an image from the provided text
        if text_to_image_input.text is not None:
            prompt = f"{text_to_image_input.text} | image prompt"
        else:
            prompt = "image prompt"
        
        # Generate the image using the specified model and settings
        output = model.generate(
            **tokenizer(prompt, return_tensors="pt").to(image_tensor.device),
            do_sample=True,
            temperature=text_to_image_input.temperature,
            top_p=text_to_image_input.top_p,
            max_length=text_to_image_input.max_length,
            seed=text_to_image_input.seed,
        )
        generated_image = T.ToPILImage()(output[0].cpu().clamp(0, 1))
    else:
        # Generate an image from the provided text
        prompt = text_to_image_input.text
        
        # Generate the image using the specified model and settings
        output = model.generate(
            **tokenizer(prompt, return_tensors="pt").to(model.device),
            do_sample=True,
            temperature=text_to_image_input.temperature,
            top_p=text_to_image_input.top_p,
            max_length=text_to_image_input.max_length,
            seed=text_to_image_input.seed,
        )
        generated_image = T.ToPILImage()(output[0].cpu().clamp(0, 1))

    # Save the generated image and text to Redis
    if text_to_image_input.text is not None:
        redis_client.set(event_id, json.dumps({"image": generated_image, "text": text_to_image_input.text}))
    else:
        redis_client.set(event_id, json.dumps({"image": generated_image}))

    # Publish the event using Redis Pub/Sub
    redis_client.publish("text-to-image_events", event_id)

    return {"event_id": event_id}


## 2. Image Processing


## FastAPI endpoint for image-to-text processing using CLIP


In [4]:
@app.post("/image-to-text")
    async def image_to_text(image_to_text_input: ImageToTextInput):
    # Load the specified model or use the default one
    if image_to_text_input.model_name != default_clip_model_name:
        clip_model = clip.load(image_to_text_input.model_name).eval().cuda()
    else:
        clip_model = clip.load(default_clip_model_name).eval().cuda()

    # Download the image
    response = requests.get(image_to_text_input.image_url)
    image = Image.open(BytesIO(response.content)).convert("RGB")

    # Process the image using the CLIP model
    input_image = T.ToTensor()(image).unsqueeze(0).cuda()
    input_text = clip_processor(image_to_text_input.text).unsqueeze(0).cuda()
    with torch.no_grad():
        logits_per_image, logits_per_text = clip_model(input_image, input_text)
        probs = logits_per_image.softmax(dim=-1).cpu().numpy()

    # Generate unique ID for the event and save the results to Redis
    event_id = str(uuid.uuid4())
    redis_client.set(event_id, json.dumps({"image_url": image_to_text_input.image_url, "text": str(probs[0])}))

    # Publish the event using Redis Pub/Sub
    redis_client.publish("image-to-text_events", event_id)

    return {"event_id": event_id}


IndentationError: unexpected indent (4262625757.py, line 2)

## 3. Stable Diffusion

---
## FastAPI endpoint for text-to-image generation using diffusion models

In [5]:
# FastAPI endpoint for text-to-image generation using diffusion models
@app.post("/text-to-image-diffuse")
async def text_to_image_diffuse(text_to_image_input: TextToImageInput):
    # Load the specified model or use the default one
    if text_to_image_input.model_name != diffuse_model_name:
        diffuse_model = Diffusion(prompt_size=256, image_size=512, diffusion_steps=text_to_image_input.diffusion_steps, denoise_scale=text_to_image_input.denoise_scale)
    else:
        diffuse_model = Diffusion(prompt_size=256, image_size=512, diffusion_steps=1000, denoise_scale=0.1)

    # Generate unique ID for the event
    event_id = str(uuid.uuid4())

    if text_to_image_input.text is not None:
        # Generate an image from the provided text
        prompt = text_to_image_input.text

        # Generate the image using the specified model and settings
        output = diffuse_model.sample(
            text=prompt,
            clip_model=clip_model,
            diffusion_steps=text_to_image_input.diffusion_steps,
            temperature=text_to_image_input.temperature,
            clip_guidance_scale=text_to_image_input.clip_guidance_scale,
            tv_scale=text_to_image_input.tv_scale,
            range_scale=text_to_image_input.range_scale,
        )
        generated_image = Image.fromarray((255 * output.permute(0, 2, 3, 1).cpu().numpy()).astype(np.uint8)[0])

        # Save the results to Redis
        image_bytes = BytesIO()
        generated_image.save(image_bytes, format="PNG")
        redis_client.set(event_id, json.dumps({"text": prompt, "image": base64.b64encode(image_bytes.getvalue()).decode("utf-8")}))
    else:
        # Generate text using the specified model and settings
        prompt = text_to_image_input.image_caption
        if prompt is None:
            prompt = "An image generated using text-to-image processing."

        # Generate the image using the specified model and settings
        output = diffuse_model.sample_text(prompt, text_len=text_to_image_input.text_length)
        generated_text = output[0]["text"]

        # Save the results to Redis
        redis_client.set(event_id, json.dumps({"image_caption": prompt, "generated_text": generated_text}))

    # Publish the event using Redis Pub/Sub
    redis_client.publish("text-to-image-diffuse_events", event_id)

    return {"event_id": event_id}


NameError: name 'app' is not defined

---
## FastAPI endpoint for image-to-image generation using controlnet models

In [6]:
# FastAPI endpoint for image manipulation using ControlNet
@app.post("/image-manipulation")
async def image_manipulation(image_manipulation_input: ImageManipulationInput):
    # Load the specified model or use the default one
    if image_manipulation_input.model_name != default_controlnet_model_name:
        controlnet_model, controlnet_tokenizer = load_controlnet_model(image_manipulation_input.model_name)
    else:
        controlnet_model, controlnet_tokenizer = load_controlnet_model(default_controlnet_model_name)

    # Download the image
    response = requests.get(image_manipulation_input.image_url)
    image = Image.open(BytesIO(response.content)).convert("RGB")

    # Load the image into memory and encode it as base64
    image_bytes = BytesIO()
    image.save(image_bytes, format="JPEG")
    image_data = base64.b64encode(image_bytes.getvalue()).decode("utf-8")

    # Generate a list of attribute-value pairs from the provided text
    attribute_values = []
    for attribute_value in image_manipulation_input.attribute_values:
        attribute, value = attribute_value.split(":")
        attribute_values.append((attribute.strip(), value.strip()))

    # Generate a new image using ControlNet
    output_image = generate_image_from_attributes(
        controlnet_model,
        controlnet_tokenizer,
        attribute_values,
        source_image=image,
        max_num_iterations=image_manipulation_input.max_num_iterations,
        save_interval=image_manipulation_input.save_interval,
        save_dir="./generated_images",
    )

    # Load the generated image into memory and encode it as base64
    output_bytes = BytesIO()
    output_image.save(output_bytes, format="JPEG")
    output_data = base64.b64encode(output_bytes.getvalue()).decode("utf-8")

    # Generate unique ID for the event and save the results to Redis
    event_id = str(uuid.uuid4())
    redis_client.set(
        event_id,
        json.dumps(
            {
                "image_url": image_manipulation_input.image_url,
                "attribute_values": image_manipulation_input.attribute_values,
                "output_image": output_data,
                "input_image": image_data,
            }
        ),
    )

    # Publish the event using Redis Pub/Sub
    redis_client.publish("image-manipulation_events", event_id)

    return {"event_id": event_id}


NameError: name 'app' is not defined

## 4. Audio Synthesis

## FastAPI endpoint for text-to-speech conversion

In [7]:
# FastAPI endpoint for text-to-speech conversion
@app.post("/text-to-speech")
async def text_to_speech(text_to_speech_input: TextToSpeechInput):
    # Load the specified model or use the default one
    if text_to_speech_input.model_name != default_tts_model_name:
        tts_model, tts_parser = load_tts_model(text_to_speech_input.model_name)
    else:
        tts_model, tts_parser = load_tts_model(default_tts_model_name)

    # Synthesize speech from the provided text
    with torch.no_grad():
        # Get the phonemes from the text using the text-to-phoneme model
        phonemes = g2p(text_to_speech_input.text)

        # Convert the phonemes to a tensor
        input_ids = torch.LongTensor(tts_parser.text_to_sequence(phonemes)).unsqueeze(0).cuda()

        # Synthesize speech using the text-to-speech model
        audio = tts_model.inference(input_ids)

    # Generate unique ID for the event and save the results to Redis
    event_id = str(uuid.uuid4())
    audio_bytes = audio.cpu().numpy().tobytes()
    redis_client.set(event_id, json.dumps({"text": text_to_speech_input.text, "audio": base64.b64encode(audio_bytes).decode("utf-8")}))
    
    # Publish the event using Redis Pub/Sub
    redis_client.publish("text-to-speech_events", event_id)

    return {"event_id": event_id}


NameError: name 'app' is not defined

## FastAPI endpoint for speech-to-text conversion

In [9]:
# FastAPI endpoint for speech-to-text conversion
@app.post("/speech-to-text")
async def speech_to_text(audio_file: UploadFile, vad_aggressiveness: int = 3, sample_rate: int = 16000):
    # Read the uploaded audio file into memory
    audio_bytes = await audio_file.read()
    audio_io = BytesIO(audio_bytes)

    # Open the audio stream using PyTorch
    with torch.no_grad():
        # Load the audio waveform from the stream
        waveform, sr = torchaudio.load(audio_io)

        # Resample the waveform if necessary
        if sr != sample_rate:
            resampler = torchaudio.transforms.Resample(sr, sample_rate)
            waveform = resampler(waveform)
        
        # Convert the waveform to mono
        waveform = torch.mean(waveform, dim=0, keepdim=True)

        # Convert the waveform to the desired format for VAD
        vad_input = (waveform * 32768).squeeze().numpy().astype(np.int16)

        # Run VAD on the audio to detect speech segments
        vad.set_mode(vad_aggressiveness)
        frames = np.array_split(vad_input, len(vad_input) // (SILENCE_CHUNK_DURATION_MS * sample_rate // 1000))
        speech_segments = []
        for i, frame in enumerate(frames):
            is_speech = vad.is_speech(frame.tobytes(), sample_rate)
            if is_speech:
                speech_segments.append(i)

        # If no speech was detected, return an empty string
        if not speech_segments:
            return {"text": ""}

        # Extract the speech segments from the audio
        speech_audio = torch.cat([waveform[:, segment * (SILENCE_CHUNK_DURATION_MS * sample_rate // 1000):(segment + 1) * (SILENCE_CHUNK_DURATION_MS * sample_rate // 1000)] for segment in speech_segments], dim=-1)

        # Transcribe the speech to text using the specified model
        transcribed_text = model(speech_audio, sample_rate=sample_rate)

    # Generate a unique ID for the event and save the results to Redis
    event_id = str(uuid.uuid4())
    redis_client.set(event_id, json.dumps({"audio": base64.b64encode(audio_bytes).decode("utf-8"), "text": transcribed_text}))

    # Publish the event using Redis Pub/Sub
    redis_client.publish("speech-to-text_events", event_id)

    return {"event_id": event_id, "text": transcribed_text}


Form data requires "python-multipart" to be installed. 
You can install "python-multipart" with: 

pip install python-multipart



RuntimeError: Form data requires "python-multipart" to be installed. 
You can install "python-multipart" with: 

pip install python-multipart


## FastAPI endpoint for speech-to-speech conversion

In [None]:

# FastAPI endpoint for speech-to-speech conversion
@app.post("/speech-to-speech")
async def speech_to_speech(speech_to_speech_input: SpeechToSpeechInput):
    # Load the specified models or use the default ones
    if speech_to_speech_input.source_language != "en":
        stt_model, stt_parser = load_stt_model(speech_to_speech_input.source_language)
    else:
        stt_model, stt_parser = load_stt_model(default_stt_model_name)

    if speech_to_speech_input.target_language != "en":
        tts_model, tts_parser = load_tts_model(speech_to_speech_input.target_language)
    else:
        tts_model, tts_parser = load_tts_model(default_tts_model_name)

    # Transcribe the audio from the source language
    audio_bytes = await speech_to_speech_input.audio_file.read()
    audio_tensor, _ = torchaudio.load(BytesIO(audio_bytes))
    if audio_tensor.size(0) > 1:
        audio_tensor = audio_tensor.mean(dim=0, keepdim=True)

    # Resample the audio if necessary
    if audio_tensor.shape[1] != stt_parser.sample_rate:
        resampler = torchaudio.transforms.Resample(audio_tensor.shape[1], stt_parser.sample_rate)
        audio_tensor = resampler(audio_tensor)

    # Detect speech segments
    speech_segments = detect_speech(audio_tensor, stt_parser.sample_rate)

    # Transcribe the speech segments
    transcriptions = []
    for segment in speech_segments:
        segment_tensor = audio_tensor[:, segment[0]:segment[1]]
        with torch.no_grad():
            input_signal = stt_parser.preprocess(segment_tensor)
            input_signal = input_signal.cuda()
            _, predicted = stt_model(input_signal, input_signal.new_zeros([input_signal.shape[0], 1], dtype=torch.long))
            transcription = stt_parser.decode(predicted[0])
            transcriptions.append(transcription)

    # Translate the transcriptions to the target language
    translations = []
    for transcription in transcriptions:
        translation = translator.translate(transcription, src=speech_to_speech_input.source_language, dest=speech_to_speech_input.target_language)
        translations.append(translation.text)

    # Synthesize speech in the target language
    with torch.no_grad():
        # Get the phonemes from the translated text using the text-to-phoneme model
        phonemes = g2p(translations)

        # Convert the phonemes to a tensor
        input_ids = torch.LongTensor(tts_parser.text_to_sequence(phonemes)).unsqueeze(0).cuda()

        # Synthesize speech using the text-to-speech model
        audio = tts_model.inference(input_ids)

    # Generate unique ID for the event and save the results to Redis
    event_id = str(uuid.uuid4())
    audio_bytes = audio.cpu().numpy().tobytes()
    redis_client.set(event_id, json.dumps({"audio": base64.b64encode(audio_bytes).decode("utf-8")}))
    redis_client.publish("speech-to-speech_events", event_id)

    return {"event_id": event_id}


## 5. Match Moving

---
## Run the FastAPI application in the Jupyter Notebook:

> Note that the pyngrok package is optional and only needed if you want to expose your API to the internet using ngrok. To install the pyngrok package, run:

In [None]:
#pip install pyngrok
# pyngrok :
try:
    import pyngrok                      
    print('pyngrok: already installed')
except ImportError:
  !python -m pip install -q pyngrok
  print('Installed pyngrok')

In [None]:
import nest_asyncio
from pyngrok import ngrok
import uvicorn

nest_asyncio.apply()

# Set up ngrok for external access (optional)
ngrok_tunnel = ngrok.connect(8000)
print("Public URL:", ngrok_tunnel.public_url)

# Run the FastAPI app
uvicorn.run(app, host="0.0.0.0", port=8000)