Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REF-723+] Upload with progress and cancellation #1899

Merged
merged 24 commits into from
Nov 16, 2023

Conversation

masenf
Copy link
Collaborator

@masenf masenf commented Oct 1, 2023

Address #1896
REF-723: Progress Display for rx.upload
REF-1037: In prod mode, upload sometimes cannot send delta
REF-611: files: List[rx.UploadFile] does not work with from __future__ import annotations

Improvements

  • Pass on_upload_progress kwarg to rx.upload_files and receive a callback as the file is uploading
  • rx.upload (and related functions) now accept an id or upload_id kwarg, which allows multiple upload forms to co-exist on the same page and be serviced by different handlers
  • rx.cancel_upload(id) allows for in-progress uploads to be cancelled by an event trigger (like on_click or yield from event handler).
  • Upload handler returns deltas and events via REST channel, so it works with prod mode and with multiple worker processes. (Uses chunked HTTP so that long running backend processing can still update the frontend)
  • Use file.path instead of file.name to support directory uploads
  • Support new style list[rx.UploadFile] annotation in upload handler
  • Background tasks cannot be upload handler, so explicitly raise an error instead of silently throwing away the deltas.
  • Upload passes token and handler via headers instead of munging them into the filename.
  • Uploading files never blocks the websocket event channel on the frontend. (Processing the uploaded files on the backed does block the event channel by virtue of locking the state)

Sample Code

from __future__ import annotations
import asyncio
from typing import Dict, List

import reflex as rx
from reflex.components.forms.upload import (
    cancel_upload,
    selected_files,
    clear_selected_files,
)


class UploadState(rx.State):
    _file_data: Dict[str, str] = {}
    upload_progress: Dict[str, int] = {}
    upload_size: Dict[str, int] = {}
    progress_text: Dict[str, str] = {}

    async def handle_upload(self, fx: list[rx.UploadFile]):
        for file in fx:
            upload_data = await file.read()
            self._file_data[file.filename or ""] = upload_data
            yield rx.console_log(f"Uploaded {file.filename}")
            await asyncio.sleep(1)

    def upprog(self, upload_id: str, prog: dict):
        print(upload_id, prog)
        self.upload_progress[upload_id] = prog["loaded"]
        self.upload_size[upload_id] = prog["total"]
        self.progress_text[upload_id] = f"{round(prog.get('progress', 0) * 100)}%"

    def upprog1(self, prog: dict):
        self._upprog("upload1", prog)

    def upprog2(self, prog: dict):
        self._upprog("upload2", prog)

    @rx.var
    def uploaded_files(self) -> List[str]:
        return list(self._file_data)


def upload_form(upload_id):
    return rx.vstack(
        rx.upload(
            rx.vstack(
                rx.button("Select File"),
                rx.text("Drag and drop files here or click to select files"),
            ),
            id=upload_id,
            directory=False,
        ),
        rx.button(
            "Upload",
            on_click=UploadState.handle_upload(  # type: ignore
                rx.upload_files(
                    upload_id=upload_id,
                    on_upload_progress=lambda prog: UploadState.upprog(upload_id, prog),
                )
            ),
        ),
        rx.box(
            rx.foreach(
                selected_files(upload_id),
                rx.text,
            ),
        ),
        rx.text(f"Selected Files: {selected_files(upload_id).length()}"),
        rx.button(
            "Clear",
            on_click=clear_selected_files(upload_id),
        ),
        rx.text(
            "Upload Progress:",
            rx.cond(
                UploadState.progress_text[upload_id],
                UploadState.progress_text[upload_id],
                "0%",
            ),
        ),
        rx.progress(
            value=rx.cond(
                UploadState.upload_progress[upload_id],
                UploadState.upload_progress[upload_id],
                0,
            ),
            min_=0,
            max_=rx.cond(
                UploadState.upload_size[upload_id],
                UploadState.upload_size[upload_id],
                100,
            ),
            width="100%",
        ),
        rx.button("Cancel", on_click=cancel_upload(upload_id)),
        width="50%",
    )


def index():
    return rx.vstack(
        rx.input(value=UploadState.router.session.client_token, is_read_only=True, id="token"),
        rx.hstack(
            upload_form("upload1"),
            upload_form("upload2"),
        ),
        rx.vstack(
            rx.foreach(
                UploadState.uploaded_files,
                lambda f: rx.text(f),
            ),
        ),
    )


app = rx.App(state=UploadState)
app.add_page(index)
app.compile()

☝️ Separate, independently operating upload forms!!

@masenf masenf marked this pull request as draft October 2, 2023 17:20
Process delta and queued events returned in response to an upload REST event.

Allows upload to reliably return deltas in prod mode, with redis.

Fix: REF-1037
The upload handler may yield multiple deltas and events, and these should not
wait until the entire upload handler completes, since it may take some time to
send the uploaded files to disk/s3 and it would be nice to send updates to the
frontend.

Since axios does not support streaming output natively in the browser, we reuse
the onDownloadProgress functionality to get access to the partial response,
split the JSON structures, and hand them off to the websocket.on("event")
handler, to be processed as-if they came from the websocket itself.

applyRestEvent no longer waits for the upload to complete, and all events from
the backend upload handler are marked as final so that upload events and
chained events do not block the websocket event queue.
(now that py37 is dropped, we can use `get_args`)
Instead of including them in the filename, pass these identifiers in the
headers, which simplifies processing.
Add a test for canceling an upload, and ensure that it reported some progress
along the way.
reflex/app.py Outdated Show resolved Hide resolved
If the handler takes the lock outside the processing function, it ends up
dropping the lock while streaming and the updates never make it into redis.
Simplify logic of existing negative test.
If the upload completes and triggers the handler before the progress event is
sent, then progress will be after the completed upload marker.
@masenf masenf marked this pull request as ready for review October 31, 2023 19:21
@masenf masenf changed the title [WiP] Upload with progress and cancellation Upload with progress and cancellation Oct 31, 2023
@masenf masenf changed the title Upload with progress and cancellation [REF-723+] Upload with progress and cancellation Oct 31, 2023
Lendemor
Lendemor previously approved these changes Nov 15, 2023
if isinstance(func, EventHandler):
if func.is_background:
raise TypeError(
f"@rx.background is not supported for upload handler `{handler}`.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for excluding background tasks as upload handler?
Does it cause issues somewhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that a background task relies on websocket to send the deltas to the frontend, but the upload event comes via REST and might hit an app instance that is not managing the websocket associated with the session that made the upload. So if the upload handler is a background task, and the response is handled by a different instance the deltas wont be sent and the user will file a bug or just be confused why things work in dev mode and not work with redis.

Ultimately want to solve this with REF-1073, but it's non-trivial.

let eventSent = false;
if (event.handler == "uploadFiles") {
eventSent = await uploadFiles(event.name, event.payload.files);
// Start upload, but do not wait for it, which would block other events.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

DEFAULT_UPLOAD_ID: str = "default"


def upload_file_for(id_: str = DEFAULT_UPLOAD_ID) -> BaseVar:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my preference - but can we remove the for and have this just be upload_file or get_upload_file. I think it matches the rest of our API better as we don't do this in other places

upload_file: BaseVar = upload_file_for()


def selected_files_for(id_: str = DEFAULT_UPLOAD_ID) -> BaseVar:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly can this become selected_files or get_selected_files

clear_selected_files: EventSpec = clear_selected_files_for()


def cancel_upload(upload_id: str) -> EventSpec:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will match this API also, since this doesn't have the for

These classes are used as decorator over a function returning a BaseVar or
EventSpec, the resulting name can be used as either

a function, in which case it passes through to the decorated function, or

as a value, in which case it will take on the apparent value of calling the
  function with no arguments.
@picklelo picklelo merged commit 7eccc6d into main Nov 16, 2023
45 checks passed
@masenf masenf deleted the masenf/on-upload-progress branch December 6, 2023 22:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants