
ClientDisconnected immediately after adding background task in which I access the request body as a stream #5820

@danielsteman

First Check

  • I added a very descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the FastAPI documentation, with the integrated search.
  • I already searched in Google "How to X in FastAPI" and didn't find any information.
  • I already read and followed all the tutorials in the docs and didn't find an answer.
  • I already checked if it is not related to FastAPI but to Pydantic.
  • I already checked if it is not related to FastAPI but to Swagger UI.
  • I already checked if it is not related to FastAPI but to ReDoc.

Commit to Help

  • I commit to help with one of those options 👆

Example Code

import aioredis
from fastapi import FastAPI, HTTPException, Request, status, BackgroundTasks
from pydantic import BaseModel, Field
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import FileTarget, ValueTarget
from streaming_form_data.validators import MaxSizeValidator
from uuid import UUID, uuid4
import os


MAX_FILE_SIZE = 1024 * 1024 * 1024 * 4  # = 4GB
MAX_REQUEST_BODY_SIZE = MAX_FILE_SIZE + 1024

app = FastAPI()
redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)


class UploadResult(BaseModel):
    file_size: int
    result: str


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0


class MaxBodySizeException(Exception):
    def __init__(self, body_len: int):
        self.body_len = body_len


class MaxBodySizeValidator:
    def __init__(self, max_size: int):
        self.body_len = 0
        self.max_size = max_size

    def __call__(self, chunk: bytes):
        self.body_len += len(chunk)
        if self.body_len > self.max_size:
            raise MaxBodySizeException(body_len=self.body_len)


@app.get("/api/v1/jobs")
async def refresh_job_progress(job: Job) -> Job:
    if not job:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Could not find job object to refresh",
        )
    progress = await redis.get(str(job.uid))
    job.progress = progress
    return job


async def process_file(request: Request):
    body_validator = MaxBodySizeValidator(MAX_REQUEST_BODY_SIZE)
    filename = "test.dsv"
    new_job = Job()

    filepath = os.path.join("./", os.path.basename(filename))
    file_ = FileTarget(filepath, validator=MaxSizeValidator(MAX_FILE_SIZE))
    source = ValueTarget()
    parser = StreamingFormDataParser(headers=request.headers)
    parser.register("file", file_)
    parser.register("source", source)

    # Consume the request body in chunks: feed each chunk to the multipart
    # parser and record the running byte count in Redis under the job's uid.
    async for chunk in request.stream():
        body_validator(chunk)
        parser.data_received(chunk)
        await redis.incr(str(new_job.uid), len(chunk))


@app.post("/api/v1/upload")
async def upload(request: Request, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_file, request)
    return {"task": "started"}

Description

I have an endpoint that receives multipart/form-data requests and streams large files into a local file (FileTarget) in chunks, using the Python package streaming_form_data.

This works as expected when the code currently in the process_file function is not run as a background task but lives directly in the upload function. The reason I want to move the streaming part into a background job is that I want to track progress in a Redis database and expose that progress through another endpoint, /api/v1/jobs, served by the refresh_job_progress function.
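
For reference, the working variant, with the streaming loop inlined in the endpoint instead of registered as a background task, looks roughly like this (a sketch only; the route path /api/v1/upload-inline is made up for illustration, and it reuses the models, constants, and Redis client from the example above):

@app.post("/api/v1/upload-inline")
async def upload_inline(request: Request) -> Job:
    body_validator = MaxBodySizeValidator(MAX_REQUEST_BODY_SIZE)
    new_job = Job()

    file_ = FileTarget(
        os.path.join("./", "test.dsv"), validator=MaxSizeValidator(MAX_FILE_SIZE)
    )
    source = ValueTarget()
    parser = StreamingFormDataParser(headers=request.headers)
    parser.register("file", file_)
    parser.register("source", source)

    # While the request is still open, the body can be consumed chunk by chunk
    # and the running byte count stored in Redis under the job's uid.
    async for chunk in request.stream():
        body_validator(chunk)
        parser.data_received(chunk)
        await redis.incr(str(new_job.uid), len(chunk))

    return new_job

Here the stream is fully consumed before the response is returned, and the Job (including its uid) can be handed back to the client so it can poll /api/v1/jobs for progress.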

The problem with this background-task approach is that the client is disconnected immediately upon calling /api/v1/upload. The traceback shows that a ClientDisconnect exception is raised when calling stream() on the fastapi.Request:

async for chunk in request.stream():
  File "venv/lib/python3.10/site-packages/starlette/requests.py", line 228, in stream
    raise ClientDisconnect()
starlette.requests.ClientDisconnect

Is it possible to maintain a connection such that I can stream the request in a background job?

Operating System

Linux

Operating System Details

Distributor ID: Ubuntu
Description: Ubuntu 22.04.1 LTS
Release: 22.04
Codename: jammy

FastAPI Version

0.88.0

Python Version

Python 3.10.6

Additional Context

No response
