In [2]:
import uuid
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from enum import Enum
import asyncio
from datetime import datetime
import time

In [3]:
# Application instance; the /ingest and /status routes and the startup hook
# defined in later cells are all registered on this object.
app = FastAPI()


Enum for priority

In [4]:
class Priority(str, Enum):
    # Priority levels a client may attach to an ingestion request.
    # Mixing in `str` makes each member compare equal to its string value,
    # which the queue-sorting code relies on when it looks members up in a
    # dict keyed by plain strings.
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"

Pydantic model for ingestion request

In [5]:
class IngestionRequest(BaseModel):
    # Request body accepted by POST /ingest.
    # ids: the ids to ingest; the handler range-checks them and splits them
    #      into batches of three.
    ids: list[int]
    # priority: decides where this request's batches land in the shared queue.
    priority: Priority

In-memory store for ingestion and batch status

In [6]:
# ingestion_id -> {"status", "priority", "created_time", "batches"}, where
# "batches" maps batch_id -> {"ids", "status"}. Populated by the /ingest handler.
ingestion_store = {}
# Pending batches as (ingestion_id, batch_id, priority, enqueue_time) tuples;
# the /ingest handler re-sorts it by priority, then time, after every enqueue.
batch_queue = []
# time.time() stamp of the most recent batch dispatch. Starting at 0 means the
# very first batch is never held back by the 5-second rate limit.
last_batch_time = 0


Mock external API response

In [7]:
async def fetch_data(id: int) -> dict:
    """Stand in for the external API: wait one second, then echo the id.

    Args:
        id: Identifier to "fetch".

    Returns:
        A dict with the given ``id`` and a fixed ``"data"`` marker.
    """
    simulated_latency_seconds = 1
    await asyncio.sleep(simulated_latency_seconds)

    response = dict(id=id, data="processed")
    return response

Process a batch asynchronously and update the overall ingestion status based on the batch statuses




In [15]:
async def process_batch(batch_id: str, ids: list[int], ingestion_id: str):
    """Fetch every id of one batch and keep the stored statuses in step.

    The batch is marked ``"triggered"`` before the first fetch and
    ``"completed"`` after the last one. The ingestion-level status is
    refreshed on both transitions, so a status query never shows an
    ingestion as ``"yet_to_start"`` while one of its batches is running.

    Args:
        batch_id: Key of the batch inside the ingestion's ``"batches"`` mapping.
        ids: Ids to fetch; ``fetch_data`` is awaited for each one in order.
        ingestion_id: Key of the parent record in ``ingestion_store``.

    Raises:
        KeyError: If the ingestion or the batch is missing from ``ingestion_store``.
    """
    ingestion = ingestion_store[ingestion_id]
    batches = ingestion["batches"]

    def refresh_overall_status() -> None:
        # "completed" only when every batch is done; otherwise work has begun
        # (this batch is at least "triggered"), so report "triggered".
        everything_done = all(
            batch["status"] == "completed" for batch in batches.values()
        )
        ingestion["status"] = "completed" if everything_done else "triggered"

    batches[batch_id]["status"] = "triggered"
    # Publish the transition immediately instead of waiting for the fetches.
    refresh_overall_status()

    for item_id in ids:
        await fetch_data(item_id)

    batches[batch_id]["status"] = "completed"
    refresh_overall_status()




Ingestion API

In [23]:
@app.post("/ingest")
async def ingest_data(request: IngestionRequest):
    """Register an ingestion request, queue its batches, and return its id.

    The ids are split into batches of three, each batch is queued with the
    request's priority, and the queue is re-sorted so HIGH precedes MEDIUM
    precedes LOW, with older requests first inside a priority. If no batch
    has been dispatched in the last 5 seconds, the batch at the head of the
    queue is started right away; it may belong to a different ingestion.

    Args:
        request: Validated request body carrying ``ids`` and ``priority``.

    Returns:
        ``{"ingestion_id": <id>}`` for the ingestion created by this call.

    Raises:
        HTTPException: 400 if ``ids`` is empty or any id lies outside
            ``1 .. 10**9 + 7``.
    """
    # Only last_batch_time is rebound here; batch_queue is mutated in place.
    global last_batch_time

    # An empty request would create an ingestion with no batches that could
    # never leave "yet_to_start", so reject it up front.
    if not request.ids:
        raise HTTPException(status_code=400, detail="ids must not be empty")

    # Validate IDs range
    if not all(1 <= id <= 10**9 + 7 for id in request.ids):
        raise HTTPException(status_code=400, detail="IDs must be between 1 and 10^9+7")

    # Generate unique ingestion_id
    ingestion_id = str(uuid.uuid4())
    # One timestamp for the record and all of its queue entries.
    created_time = datetime.now()

    # Split into batches of 3
    batches = [request.ids[i:i + 3] for i in range(0, len(request.ids), 3)]

    # Initialize ingestion status
    ingestion_store[ingestion_id] = {
        "status": "yet_to_start",
        "priority": request.priority,
        "created_time": created_time,
        "batches": {str(uuid.uuid4()): {"ids": batch, "status": "yet_to_start"} for batch in batches}
    }

    # Enqueue batches with priority and timestamp
    for batch_id in ingestion_store[ingestion_id]["batches"]:
        batch_queue.append((ingestion_id, batch_id, request.priority, created_time))

    # Sort queue by (priority, created_time) - HIGH > MEDIUM > LOW.
    # The sort is stable, so batches of one request keep their original order.
    priority_rank = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
    batch_queue.sort(key=lambda entry: (priority_rank[entry[2]], entry[3]))

    # Rate limit: at most one dispatch per 5 seconds. A single check is enough
    # because dispatching resets last_batch_time.
    if batch_queue and time.time() - last_batch_time >= 5:
        # Use separate names: the head of the queue may belong to another
        # ingestion, and this request's id must still be the one returned.
        next_ingestion_id, next_batch_id, _, _ = batch_queue.pop(0)
        next_ids = ingestion_store[next_ingestion_id]["batches"][next_batch_id]["ids"]
        # NOTE(review): the task handle is not retained; asyncio recommends
        # keeping a strong reference to fire-and-forget tasks.
        asyncio.create_task(process_batch(next_batch_id, next_ids, next_ingestion_id))
        last_batch_time = time.time()

    return {"ingestion_id": ingestion_id}



Status API

In [24]:
@app.get("/status/{ingestion_id}")
async def get_status(ingestion_id: str):
    """Report the stored status of an ingestion and of each of its batches.

    Args:
        ingestion_id: Id previously returned by the ingest endpoint.

    Returns:
        A dict with the ingestion id, its overall status, and one entry per
        batch holding the batch id, its ids, and its status.

    Raises:
        HTTPException: 404 if the id is not present in ``ingestion_store``.
    """
    known = ingestion_id in ingestion_store
    if not known:
        raise HTTPException(status_code=404, detail="Ingestion ID not found")

    record = ingestion_store[ingestion_id]
    overall_status = record["status"]

    batch_reports = []
    for batch_key, batch_info in record["batches"].items():
        batch_reports.append(
            {"batch_id": batch_key, "ids": batch_info["ids"], "status": batch_info["status"]}
        )

    return {
        "ingestion_id": ingestion_id,
        "status": overall_status,
        "batches": batch_reports,
    }


Background task to process remaining batches

In [25]:
async def process_queue():
    """Dispatch queued batches forever, at most one every 5 seconds.

    Once per second the loop checks whether the queue is non-empty and the
    rate-limit interval has elapsed since the last dispatch. If so, it pops
    the batch at the head of ``batch_queue`` and starts ``process_batch``
    for it as a separate task. The coroutine never returns.
    """
    # Only last_batch_time is rebound here; batch_queue is mutated in place.
    global last_batch_time

    min_dispatch_interval = 5  # seconds between two batch dispatches
    poll_interval = 1          # seconds between two queue checks

    # The event loop keeps only weak references to tasks, so hold the running
    # batch tasks here until they finish.
    in_flight = set()

    while True:
        now = time.time()
        if batch_queue and now - last_batch_time >= min_dispatch_interval:
            ingestion_id, batch_id, _, _ = batch_queue.pop(0)
            batch_ids = ingestion_store[ingestion_id]["batches"][batch_id]["ids"]
            task = asyncio.create_task(process_batch(batch_id, batch_ids, ingestion_id))
            in_flight.add(task)
            # Drop the reference as soon as the batch is done.
            task.add_done_callback(in_flight.discard)
            last_batch_time = now
        await asyncio.sleep(poll_interval)



Start background task

In [26]:
@app.on_event("startup")
async def startup_event():
    """Start the background queue worker when the application starts.

    The task handle is stored on ``app.state`` because the event loop keeps
    only weak references to tasks; the worker needs a strong one for as long
    as the app lives.
    """
    # NOTE(review): FastAPI warns that on_event is deprecated in favour of
    # lifespan handlers. Migrating means passing a lifespan function where the
    # app is constructed, so it is left for a separate change.
    app.state.queue_worker = asyncio.create_task(process_queue())


        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("startup")
