
How to stream a large JSON response? #1424

Closed
fabienbellanger opened this issue Sep 4, 2020 · 9 comments
Labels
question A question (converts to discussion)

Comments

@fabienbellanger

Hello,

Rocket version: 0.4.5

I'm new to Rust and Rocket, so sorry if my question sounds stupid.

I'm used to streaming large JSON responses with Go and Echo like this:

package main

import (
	"encoding/json"
	"io"
	"net/http"
	"strconv"

	"github.com/labstack/echo/v4"
)

// User is the record type streamed as one element of the JSON array.
type User struct {
	Username  string
	Password  string
	Lastname  string
	Firstname string
}

func main() {
	e := echo.New()
	e.GET("/", func(c echo.Context) error {
		resp := c.Response()
		resp.Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
		resp.WriteHeader(http.StatusOK)

		// Write the array by hand so only one user is encoded at a time.
		if _, err := io.WriteString(resp, "["); err != nil {
			return err
		}
		enc := json.NewEncoder(resp)
		for i := 0; i < 100000; i++ {
			if i > 0 {
				if _, err := io.WriteString(resp, ","); err != nil {
					return err
				}
			}
			user := User{
				Username:  "My username_" + strconv.Itoa(i),
				Password:  "gjgjghjggjhg7884erhkhjb;lljjkbhjvftxe!hjjèkhkljkbhft",
				Lastname:  "A looooong lastname",
				Firstname: "A looooong firstname",
			}
			if err := enc.Encode(user); err != nil {
				return err
			}
		}
		if _, err := io.WriteString(resp, "]"); err != nil {
			return err
		}

		return nil
	})
	e.Logger.Fatal(e.Start(":1323"))
}

I read the streaming section in the Rocket documentation, but I don't see how to apply it to JSON.

How can I do this with Rocket?

Thanks

@jebrosen
Collaborator

jebrosen commented Sep 6, 2020

Unfortunately, there isn't an easy one-to-one translation of this code, because at the end of the day Rocket wants the response body to implement Read (see e.g. streamed_body). You'll have to implement Responder to set one of these bodies, and there are a few different approaches:

  • Implement Read as a state machine manually. Self-contained, but usually more difficult to implement and maintain.
  • Create an in-memory pipe or channel, move the write end to a thread (or thread pool) and write to it in that new thread, then respond with the read end of the channel. One downside here is needing another thread, but the code should be easier to understand than the state machine (and more closely resembles the code you posted).
  • Use a crate that implements Read, possibly based on one of these approaches. I haven't used such a crate before, so I won't make any specific recommendations. If a good one is out there, this would be my preferred choice for an application.
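Because Rocket 0.4's streamed_body accepts any Read, the channel option can be sketched with only the standard library. In this sketch (not from the thread), a writer thread pushes byte chunks into a bounded channel and a hypothetical ChannelReader type serves them through Read. The users_json_reader helper and the hand-written JSON are illustrative assumptions; real code would serialize with serde_json.

```rust
use std::io::{self, Cursor, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical adapter: exposes a channel of byte chunks as a `Read`.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    pending: Cursor<Vec<u8>>, // current chunk, possibly partially consumed
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            // Hand out what is left of the current chunk first.
            let n = self.pending.read(buf)?;
            if n > 0 || buf.is_empty() {
                return Ok(n);
            }
            // Block until the writer sends another chunk. A disconnected
            // channel means the writer thread finished: end of stream.
            match self.rx.recv() {
                Ok(chunk) => self.pending = Cursor::new(chunk),
                Err(_) => return Ok(0),
            }
        }
    }
}

/// Hypothetical helper: spawns a writer thread that emits `count` users
/// as one JSON array, mirroring the loop in the Go handler.
fn users_json_reader(count: u32) -> ChannelReader {
    // A small bound gives back-pressure: the writer blocks while the
    // reader (the HTTP connection) lags behind.
    let (tx, rx) = sync_channel::<Vec<u8>>(16);
    thread::spawn(move || {
        // `send` fails once the reader has been dropped (client went away).
        let send = |bytes: Vec<u8>| tx.send(bytes).is_ok();
        if !send(b"[".to_vec()) {
            return;
        }
        for i in 0..count {
            let sep = if i > 0 { "," } else { "" };
            let user = format!(r#"{}{{"id":{},"lastname":"My lastname"}}"#, sep, i);
            if !send(user.into_bytes()) {
                return;
            }
        }
        let _ = send(b"]".to_vec());
    });
    ChannelReader { rx, pending: Cursor::new(Vec::new()) }
}

fn main() {
    let mut body = String::new();
    users_json_reader(2).read_to_string(&mut body).unwrap();
    println!("{}", body);
}
```

In a Rocket 0.4 responder the reader would presumably be passed as Response::build().streamed_body(users_json_reader(100_000)). Dropping the reader disconnects the channel, so the writer thread stops at its next send instead of encoding the remaining users.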

Here is some (untested!) partial code as an example of the state machine approach.

use std::io::{self, Cursor, Read};

// `User` is assumed to be defined elsewhere and to implement `serde::Serialize`.
struct UsersStream {
    state: State,
    users: Vec<User>,
    pos: usize,               // index of the next user to encode
    pending: Cursor<Vec<u8>>, // encoded bytes not yet handed to the caller
}

enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // a zero-length read must not advance the state machine
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user, with a comma only *between* elements
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            serde_json::to_writer(&mut bytes, user)?;
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}
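The same header / users / trailer sequence can also be written as a lazy iterator of byte chunks, leaving a single generic adapter to do the pending-buffer bookkeeping. This std-only sketch is not from the thread: ChunkReader and users_json are hypothetical names, and users are encoded by hand where real code would call serde_json.

```rust
use std::io::{self, Cursor, Read};
use std::iter;

/// Hypothetical adapter: turns any iterator of byte chunks into a `Read`.
struct ChunkReader<I> {
    chunks: I,
    pending: Cursor<Vec<u8>>,
}

impl<I: Iterator<Item = Vec<u8>>> ChunkReader<I> {
    fn new(chunks: I) -> Self {
        ChunkReader { chunks, pending: Cursor::new(Vec::new()) }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Read for ChunkReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            // Serve leftover bytes from the current chunk first.
            let n = self.pending.read(buf)?;
            if n > 0 || buf.is_empty() {
                return Ok(n);
            }
            // Pull the next chunk; an exhausted iterator means end of body.
            match self.chunks.next() {
                Some(chunk) => self.pending = Cursor::new(chunk),
                None => return Ok(0),
            }
        }
    }
}

/// Header, then one chunk per user (comma only between elements), then trailer.
fn users_json(ids: Vec<u32>) -> ChunkReader<impl Iterator<Item = Vec<u8>>> {
    let users = ids.into_iter().enumerate().map(|(i, id)| {
        let sep = if i > 0 { "," } else { "" };
        format!(r#"{}{{"id":{}}}"#, sep, id).into_bytes()
    });
    let chunks = iter::once(b"[".to_vec())
        .chain(users)
        .chain(iter::once(b"]".to_vec()));
    ChunkReader::new(chunks)
}

fn main() {
    let mut body = String::new();
    users_json(vec![7, 8, 9]).read_to_string(&mut body).unwrap();
    println!("{}", body);
}
```

Because the chain is lazy, each user is encoded only when the reader asks for more bytes. The states of the hand-written machine become positions in the iterator chain, which also makes the comma placement hard to get wrong.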

@jebrosen jebrosen added the question A question (converts to discussion) label Sep 6, 2020
@fabienbellanger
Author

Hi @jebrosen,

Thanks for your response.
I tried your solution, but I don't know how to implement the Responder trait for UsersStream.

use rocket::http::ContentType;
use rocket::request::Request;
use rocket::response::{self, Responder, Response};
use serde::Serialize;
use std::io::{Cursor, Read};

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State {
    Header,
    Users,
    Trailer,
    Done,
}

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = vec![b','];
                            bytes.append(&mut serde_json::to_vec(user).unwrap());
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        Response::build()
            .sized_body(self.pending)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream {
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}

Sorry, I'm still a Rust newbie 😞

@jebrosen
Collaborator

jebrosen commented Sep 9, 2020

This looks almost right, except that you want .sized_body(self): self is a UsersStream, which is the type that implements Read, whereas self.pending is only its internal (initially empty) buffer.

@fabienbellanger
Author

Hi @jebrosen,

I tried this:

impl<S: Seek + Write> Seek for BufStream<S> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.inner.seek(pos)
    }
}

But I got this error:

.sized_body(self)
            ^^^^ the trait `std::io::Seek` is not implemented for `handlers::UsersStream`

So I implemented the std::io::Seek trait like this:

impl Seek for UsersStream {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.pending.seek(pos)
    }
}

It compiles, but curl http://localhost:8000/big-json-stream returns nothing 😞

Here is all the code:

use serde::Serialize;
use std::io::{Cursor, Read, Seek, SeekFrom};
use serde_json;
use rocket::request::Request;
use rocket::response::{self, Response, Responder};
use rocket::http::ContentType;

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = vec![b','];
                            bytes.append(&mut serde_json::to_vec(user).unwrap());
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        Response::build()
            .sized_body(self)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

impl Seek for UsersStream {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.pending.seek(pos)
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream{
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}

@jebrosen
Collaborator

jebrosen commented Sep 9, 2020

Whoops. You can't use sized_body for this, since you don't know the size of the body up front. I should have suggested .streamed_body(self), and you also don't need that Seek impl.
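A std-only sketch of why the sized attempt produced an empty response, assuming (as Rocket 0.4's Read + Seek bound and its implementation appear to do) that a sized body is measured by seeking to the end, rewound, and then read for at most that many bytes. FakeStream, read_like_sized_body and read_like_streamed_body are hypothetical stand-ins, not Rocket code; FakeStream's Seek forwards to the still-empty pending buffer exactly like the Seek impl in the previous comment.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom};

/// Stripped-down stand-in for `UsersStream`: `Read` yields real data,
/// but `Seek` only sees the (initially empty) `pending` buffer.
struct FakeStream {
    pending: Cursor<Vec<u8>>,
    produced: bool,
}

impl Read for FakeStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.produced {
            // Produce the body lazily on the first read, like the state machine.
            self.pending = Cursor::new(b"[{\"id\":0}]".to_vec());
            self.produced = true;
        }
        self.pending.read(buf)
    }
}

impl Seek for FakeStream {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.pending.seek(pos) // same forwarding as the Seek impl in the thread
    }
}

/// Hypothetical mimic of a sized body: measure by seeking, rewind,
/// then read at most `size` bytes.
fn read_like_sized_body<B: Read + Seek>(mut body: B) -> io::Result<Vec<u8>> {
    let size = body.seek(SeekFrom::End(0))?; // 0: pending is still empty
    body.seek(SeekFrom::Start(0))?;
    let mut out = Vec::new();
    body.take(size).read_to_end(&mut out)?;
    Ok(out)
}

/// Hypothetical mimic of a streamed body: read until `read` returns 0.
fn read_like_streamed_body<B: Read>(mut body: B) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    body.read_to_end(&mut out)?;
    Ok(out)
}

fn new_stream() -> FakeStream {
    FakeStream { pending: Cursor::new(Vec::new()), produced: false }
}

fn main() {
    let sized = read_like_sized_body(new_stream()).unwrap();
    let streamed = read_like_streamed_body(new_stream()).unwrap();
    println!("sized: {} bytes, streamed: {} bytes", sized.len(), streamed.len());
}
```

The measured length is 0 before any data has been generated, so nothing is ever read; a streamed body has no length to measure and simply reads until the stream reports its end.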

@fabienbellanger
Author

@jebrosen, thank you so much for your help! It works perfectly 🎉

Here is the final code:

use serde::Serialize;
use std::io::{Cursor, Read};
use rocket::request::Request;
use rocket::response::{self, Response, Responder};
use rocket::http::ContentType;

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // a zero-length read must not advance the state machine
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user
                    match self.users.get(self.pos) {
                        Some(user) => {
                            // separate elements with commas, but not before the first one
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            bytes.append(&mut serde_json::to_vec(user)?);
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        Response::build()
            .streamed_body(self)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream{
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}

Can I create a PR to add this case to the examples folder?

@jebrosen
Collaborator

The code would need some changes to work with async, in particular implementing AsyncRead instead of Read, which might make it a bit trickier because of pinning and Poll. I have a long backlog, so it might be a while before I get back to this issue, but I did write something similar for Server-Sent Events that wraps futures::stream::Stream (https://git.jebrosen.com/jeb/rocket-rooms/src/commit/731bca81912176e96b6976eb7a8de579c7b99c51/src/sse.rs) and might be helpful as a reference.

@fabienbellanger
Author

OK :)
Will AsyncRead come with Rocket 0.5?
I will try to adapt this code to work with AsyncRead.
May I close the issue?

@SergioBenitez
Member

@fabienbellanger It's already in master. Let's keep this use-case in mind when resolving #33.


3 participants