In [None]:
pip install -r requirements.txt

In [2]:
# Disabled version of the API kept as one triple-quoted string: none of the
# code inside runs; evaluating the cell only echoes the string as its output.
# If this is ever revived, note that `transcription_processor` is never defined
# in it and that `form_data: Form(...)` uses a call as a type annotation.
'''
from fastapi import FastAPI, File, UploadFile, Query, HTTPException, Request, Form
from fastapi.responses import JSONResponse, FileResponse
from fastapi.encoders import jsonable_encoder
import shutil
from typing import List
import subprocess
from osllm import OpenSourceLLM
import nest_asyncio
import uvicorn
from pydantic import BaseModel

app = FastAPI()

llm = OpenSourceLLM()
@app.get("/")
def read_root():
    return {"message": "Estoy vivo!"}

class Modelo(BaseModel):
    nombre: str
    descripcion: str

class OpcionesModelo(BaseModel):
    modelos: List[Modelo]

@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...),speakers: str = Query(..., title="Speaker Name", description="Name of the speaker"),
                             language: str = Query(..., title="Language", description="Language for speech-to-text")):
    try:
        if file.filename.endswith('.mp3'):
            with open(f"/workspace/{file.filename}", "wb") as mp3_file:
                shutil.copyfileobj(file.file, mp3_file)
            #subprocess.run(["python","audio2text.py",f"uploaded_files/{file.filename}",speakers,language])
            return JSONResponse(content=jsonable_encoder({"message": f"File uploaded successfully, speakers= {speakers}, language={language}"}), status_code=200)
        else:
            return JSONResponse(content=jsonable_encoder({"error": "Only MP3 files are allowed"}), status_code=400)
    except Exception as e:
        return JSONResponse(content=jsonable_encoder({"error": str(e)}), status_code=500)

@app.get("/speech-to-text/")
async def speech_to_text(speakers: str = Query(..., title="Speaker Name", description="Name of the speaker"),
                         language: str = Query(..., title="Language", description="Language for speech-to-text")):
    response_content = {
        "message": f"speech-to-text conversion for {speakers} in {language} language."
    }
    return response_content

@app.get("/get-transcript/")
async def get_text_file(name_file: str = Query(..., title="Nombre del archivo", description="Nombre del archivo")):
    file_path = f"/workspace/transcript.txt"
    transcription_processor.run_transcription('/workspace/' + name_file)
    return FileResponse(file_path, filename=f"transcript.txt", media_type="text/plain")

@app.get("/modelos", response_model=OpcionesModelo)
async def obtener_modelos():
    opciones = []
    for nombre, descripcion in OpenSourceLLM.OPCIONES_MODELOS.items():
        opciones.append(Modelo(nombre=nombre, descripcion=descripcion))
    return OpcionesModelo(modelos=opciones)

@app.post("/generar_texto", response_model=str)
async def generar_texto(request: Request, form_data: Form(...)):
    modelo_elegido = form_data.modelo
    texto = form_data.texto

    llm = OpenSourceLLM(modelo_elegido)
    respuesta = llm.text2text(texto)

    return respuesta

nest_asyncio.apply()

config = uvicorn.Config(app=app, host="127.0.0.1", port=8000, loop='asyncio')
server = uvicorn.Server(config)
await server.serve()
'''

'\nfrom fastapi import FastAPI, File, UploadFile, Query, HTTPException, Request, Form\nfrom fastapi.responses import JSONResponse, FileResponse\nfrom fastapi.encoders import jsonable_encoder\nimport shutil\nfrom typing import List\nimport subprocess\nfrom osllm import OpenSourceLLM\nimport nest_asyncio\nimport uvicorn\nfrom pydantic import BaseModel\n\napp = FastAPI()\n\nllm = OpenSourceLLM()\n@app.get("/")\ndef read_root():\n    return {"message": "Estoy vivo!"}\n\nclass Modelo(BaseModel):\n    nombre: str\n    descripcion: str\n\nclass OpcionesModelo(BaseModel):\n    modelos: List[Modelo]\n\n@app.post("/uploadfile/")\nasync def create_upload_file(file: UploadFile = File(...),speakers: str = Query(..., title="Speaker Name", description="Name of the speaker"),\n                             language: str = Query(..., title="Language", description="Language for speech-to-text")):\n    try:\n        if file.filename.endswith(\'.mp3\'):\n            with open(f"/workspace/{file.filename}",

In [3]:
from fastapi import FastAPI, File, UploadFile, Query
from fastapi.responses import JSONResponse, FileResponse
from fastapi.encoders import jsonable_encoder
import shutil
from typing import List
import subprocess
from osllm import OpenSourceLLM
import nest_asyncio
import asyncio
import uvicorn

# FastAPI application object; the route decorators in this cell register on it.
app = FastAPI()

# Module-level LLM wrapper built with OpenSourceLLM's default arguments.
llm = OpenSourceLLM()
@app.get("/")
def read_root():
    """Root endpoint: return a hard-coded message listing model names.

    Returns:
        dict: A single ``"message"`` key holding the fixed text.
    """
    models_banner = 'MODELOS_CARGADOS = ["llama2": "Llama2 (70B)","mixtral": "Mixtral (35B)","bakllava": "Bakllava (15B)"]'
    return dict(message=models_banner)

@app.post("/download_model/")
async def download_model(model: str = Query(..., title="Elección de modelo", description="Elección de modelo a descargar desde el repositorio de Ollama")):
    """Pull a model with the ``ollama`` CLI and report the outcome as JSON.

    Args:
        model: Name of the model to pull, taken from the query string.

    Returns:
        JSONResponse: 200 with a ``"message"`` when ``ollama pull`` exits with
        status 0; 400 when the name is blank or starts with ``-``; 500 with an
        ``"error"`` holding the CLI output (or the exception text) otherwise.
    """
    # The value is placed directly in the argv of an external command, so
    # refuse anything the CLI could read as an option rather than a model name.
    if not model.strip() or model.startswith("-"):
        return JSONResponse(content=jsonable_encoder({"error": f"Invalid model name: {model!r}"}), status_code=400)
    try:
        # Run the (potentially long) pull in a worker thread so the event loop
        # keeps serving other requests while the download is in progress.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            # check=True raises on a non-zero exit status; without it a failed
            # pull was still answered with the success message below.
            lambda: subprocess.run(['ollama', 'pull', model], check=True, capture_output=True),
        )
        return JSONResponse(content=jsonable_encoder({"message": f" Descargado el modelo {model} exitosamente"}), status_code=200)
    except subprocess.CalledProcessError as e:
        # Decode leniently: the captured output is only used for the error report.
        raw_output = e.stderr or e.stdout or b""
        detail = raw_output.decode("utf-8", errors="replace").strip() or str(e)
        return JSONResponse(content=jsonable_encoder({"error": detail}), status_code=500)
    except Exception as e:
        # E.g. the `ollama` executable is not installed or not on PATH.
        return JSONResponse(content=jsonable_encoder({"error": str(e)}), status_code=500)

@app.get("/txt2txt/")
async def txt2txt(model: str = Query(..., title="Elección de modelo", description="Elección de modelo con opciones en: llama2, mixtral"),
                  prompt: str = Query(..., title="Prompt", description="Instrucción en la consulta al LLM")):
    """Send a text prompt to the requested model and return its answer as JSON.

    Args:
        model: Name of the model that should answer the prompt.
        prompt: Instruction text forwarded to the model.

    Returns:
        JSONResponse: 200 with ``{"message": <answer formatted as text>}``, or
        500 with ``{"error": <exception text>}`` if building the wrapper or
        running the prompt raises.
    """
    try:
        # Previously the module-level default `llm` always answered, so the
        # required `model` parameter had no effect on the result.
        # NOTE(review): assumes OpenSourceLLM accepts the model name as its
        # first positional argument — confirm against the osllm module.
        selected_llm = OpenSourceLLM(model)
        res = selected_llm.text2text(prompt)
        return JSONResponse(content=jsonable_encoder({"message": f"{res}"}), status_code=200)
    except Exception as e:
        return JSONResponse(content=jsonable_encoder({"error": str(e)}), status_code=500)

@app.get("/speech-to-text/")
async def speech_to_text(speakers: str = Query(..., title="Speaker Name", description="Name of the speaker"),
                         language: str = Query(..., title="Language", description="Language for speech-to-text")):
    """Return a message that echoes the requested speaker and language.

    No audio is processed in this function; it only formats the two query
    values into the response text.

    Args:
        speakers: Speaker name from the query string.
        language: Language from the query string.

    Returns:
        dict: A single ``"message"`` key with the formatted text.
    """
    summary = "speech-to-text conversion for {} in {} language.".format(speakers, language)
    return {"message": summary}

@app.get("/get-transcript/")
async def get_text_file(name_file: str = Query(..., title="Nombre del archivo", description="Nombre del archivo")):
    """Run the transcription for a file in ``/workspace`` and return the transcript.

    Args:
        name_file: Plain file name (no directory components) of a file that
            already exists inside ``/workspace``.

    Returns:
        FileResponse: ``/workspace/transcript.txt`` served as ``text/plain``
        on success.
        JSONResponse: 400 when ``name_file`` is not a plain file name, 404 when
        the file does not exist, 500 when the transcription raises or leaves
        no transcript file behind.
    """
    import os  # local import: the notebook's import cell does not provide it

    # name_file comes from the query string; accept only a bare file name so a
    # request cannot point outside /workspace (e.g. "../../etc/passwd").
    if name_file in ("", ".", "..") or os.path.basename(name_file) != name_file:
        return JSONResponse(content=jsonable_encoder({"error": "name_file must be a plain file name"}), status_code=400)

    audio_path = '/workspace/' + name_file
    if not os.path.isfile(audio_path):
        return JSONResponse(content=jsonable_encoder({"error": f"File not found: {name_file}"}), status_code=404)

    file_path = "/workspace/transcript.txt"
    try:
        # NOTE(review): `transcription_processor` is not defined or imported in
        # the visible cells; it must be created before this endpoint is called,
        # otherwise the NameError is reported through the 500 branch below.
        # Presumably it writes its result to `file_path` — confirm.
        transcription_processor.run_transcription(audio_path)
    except Exception as e:
        return JSONResponse(content=jsonable_encoder({"error": str(e)}), status_code=500)

    # FileResponse only fails once the body is being sent, so check up front
    # and answer with a JSON error like the other endpoints.
    if not os.path.isfile(file_path):
        return JSONResponse(content=jsonable_encoder({"error": "Transcript file was not generated"}), status_code=500)
    return FileResponse(file_path, filename="transcript.txt", media_type="text/plain")

@app.get("/image-to-text/")
async def image_to_text(filename: str = Query(..., title="Nombre del archivo de imagen", description="Nombre del archivo de imagen"),
                          prompt: str = Query(..., title="Image Prompt", description="Description or prompt for the image")):
    """Process an image and generate text from it using an LLM.

    Args:
        filename: Name of the image file, passed through to ``img2text``.
        prompt: Description or prompt to accompany the image.

    Returns:
        dict: ``"message"`` and ``"generated_text"`` on success.
        JSONResponse: 500 with an ``"error"`` text if any step raises.
    """
    try:
        # The original body called these helpers through `self`, which does not
        # exist in a module-level function, so every request failed with a
        # NameError before reaching the error handler.
        # NOTE(review): assumes the shared `llm` (OpenSourceLLM) is the object
        # that provides rm_old_files() and img2text() — confirm against osllm.
        llm.rm_old_files()  # Clean up old files before processing

        # The former Image.open / convert_to_base64 steps are dropped: `Image`
        # was never imported and their results were unused. img2text receives
        # the file name, so it presumably loads the image itself — confirm.
        generated_text = llm.img2text(filename, prompt)
    except Exception as e:
        # Report failures with an error status, like the other endpoints,
        # instead of a 200 response that only looks like an error in its text.
        return JSONResponse(content=jsonable_encoder({"error": f"Error al procesar la imagen: {e}"}), status_code=500)

    return {
        "message": "Image-to-text generation successful",
        "generated_text": generated_text
    }

# Apply nest_asyncio's patch before starting the server; it is meant to let an
# event loop be re-entered, which the notebook's already-running loop requires.
nest_asyncio.apply()

# Serve `app` on localhost port 8000 using the asyncio loop implementation.
config = uvicorn.Config(app=app, host="127.0.0.1", port=8000, loop='asyncio')
server = uvicorn.Server(config)
# Top-level await (IPython feature): the cell keeps running until the server stops.
await server.serve()

INFO:     Started server process [21076]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:62398 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:62403 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:62403 - "GET /openapi.json HTTP/1.1" 200 OK
