# [M_05] PROTOCOL: CONDUCTOR (FASTAPI & AGENTIC WORKERS)

**PROJECT:** OMNI-OPERATOR-V1  
**ENGINE:** FASTAPI + ASYNCIO  
**STATUS:** PROCESS_ORCHESTRATION

In this module, we are building the system's central point of contact. We are transforming the logic from previous stages into an asynchronous API. This will allow video materials to be submitted to a "queue" and work progress to be tracked in real-time.

**Why is this crucial for the Hackathon?**
1. **Asynchronicity:** AI processes take a long time. The API cannot "hang." We use the `BackgroundTasks` pattern.
2. **Scalability:** We are preparing the system to handle multiple operators simultaneously.
3. **API Interface:** The foundation for the dashboard and future integrations (e.g., Telegram Bot).

In [1]:
import os
import sys
import uuid
import asyncio
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Dict, List, Optional
from datetime import datetime

# 1. WORKING DIRECTORY CORRECTION
if os.getcwd().endswith("notebooks"):
    os.chdir("..")

sys.path.append(os.path.join(os.getcwd(), "src"))

print(f"LOG: API environment ready. ROOT directory: {os.getcwd()}")

LOG: API environment ready. ROOT directory: c:\Users\takze\OneDrive\Pulpit\project\omni-operator-v1


## 1. Data Models and State Database

We define what a "Job" looks like in our system. We use a simple `jobs_db` dictionary to simulate the state database (in a production system, we will replace this with Postgres from M_00).

In [2]:
class JobStatus(BaseModel):
    """Model representing the status of a production task."""
    job_id: str
    status: str # PENDING, ANALYZING, WRITING, RENDERING, COMPLETED, FAILED
    video_path: str
    created_at: str
    updated_at: str
    result: Optional[dict] = None
    error: Optional[str] = None

# Simulated in-memory database
jobs_db: Dict[str, JobStatus] = {}

app = FastAPI(title="OMNI-OPERATOR-V1 API")

@app.get("/")
def health_check():
    return {
        "status": "online", 
        "engine": "Gemini 3 Flash", 
        "memory": "Qdrant Active"
    }

## 2. Orchestrator (Workflow Worker)

This is the heart of the system. The `run_omni_workflow` function will guide the video file through all stages: from analysis, through strategy, all the way to final editing.

In [3]:
async def run_omni_workflow(job_id: str, video_path: str):
    """
    Main asynchronous workflow.
    Connects modules M_01, M_02, M_03, and M_04.
    """
    try:
        # STEP 1: ANALYSIS
        jobs_db[job_id].status = "ANALYZING"
        jobs_db[job_id].updated_at = datetime.now().isoformat()
        print(f"LOG [{job_id}]: Gemini 3 Flash is analyzing the video...")
        await asyncio.sleep(3) # Simulating AI analysis time
        
        # STEP 2: STRATEGY AND COPYWRITING
        jobs_db[job_id].status = "WRITING_STRATEGY"
        print(f"LOG [{job_id}]: Copywriter Agent is preparing posts...")
        await asyncio.sleep(2) # Simulating content generation
        
        # STEP 3: FFMPEG EDITING
        jobs_db[job_id].status = "RENDERING"
        print(f"LOG [{job_id}]: FFmpeg is cutting Shorts...")
        await asyncio.sleep(3) # Simulating rendering
        
        # STEP 4: FINALIZATION AND SAVING TO MEMORY
        jobs_db[job_id].status = "COMPLETED"
        jobs_db[job_id].updated_at = datetime.now().isoformat()
        jobs_db[job_id].result = {
            "shorts_count": 3,
            "output_dir": "output/",
            "strategy_id": "VEC-123"
        }
        print(f"‚úÖ LOG [{job_id}]: Production completed successfully.")

    except Exception as e:
        jobs_db[job_id].status = "FAILED"
        jobs_db[job_id].error = str(e)
        print(f"‚ùå LOG [{job_id}]: Critical error: {str(e)}")

## 3. API Endpoint Definitions

We are creating entry points that allow for interaction with the system.

In [4]:
@app.post("/process", response_model=JobStatus)
async def start_processing(video_path: str, background_tasks: BackgroundTasks):
    """Submits video for processing."""
    if not os.path.exists(video_path):
        raise HTTPException(status_code=404, detail="Video file not found.")
        
    job_id = str(uuid.uuid4())[:8]
    now = datetime.now().isoformat()
    
    job = JobStatus(
        job_id=job_id,
        status="PENDING",
        video_path=video_path,
        created_at=now,
        updated_at=now
    )
    
    jobs_db[job_id] = job
    
    # Starting the background worker without blocking the API
    background_tasks.add_task(run_omni_workflow, job_id, video_path)
    
    return job

@app.get("/status/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
    """Returns the current status of the task."""
    if job_id not in jobs_db:
        raise HTTPException(status_code=404, detail="No job found with this ID.")
    return jobs_db[job_id]

## 4. Server Test (Live Preview)

We are starting the server in the background of the notebook to test the API "live".

In [5]:
import uvicorn
import threading

def run_server():
    # Running on port 8000
    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="error")

# Preventing the server from starting multiple times in one session
if "server_started" not in globals():
    thread = threading.Thread(target=run_server, daemon=True)
    thread.start()
    server_started = True
    print("üöÄ API SERVER STARTED AT http://127.0.0.1:8000")
else:
    print("‚ÑπÔ∏è Server is already running.")

print("Tip: Open http://127.0.0.1:8000/docs in your browser to test the API.")

üöÄ API SERVER STARTED AT http://127.0.0.1:8000
Tip: Open http://127.0.0.1:8000/docs in your browser to test the API.


## STATUS: MODULE 05 COMPLETED

You now have a professional API that acts as the Conductor of your factory.

**Achievements:**
1. Implementation of asynchronous background tasks (`BackgroundTasks`).
2. Production status tracking mechanism.
3. Preparation for integration with a web panel or bot.