Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom lifetimes for callback request headers and futures #30

Closed
Relrin opened this issue Dec 13, 2017 · 6 comments
Closed

Custom lifetimes for callback request headers and futures #30

Relrin opened this issue Dec 13, 2017 · 6 comments

Comments

@Relrin
Copy link

Relrin commented Dec 13, 2017

Like as mentioned in the issue about headers, I'd decided to move a part of callback code to the future and using it later. But I'm getting errors, when trying to assign a custom lifetime to a code, where client headers is going to outlive futures, so that I could use it inside of futures (like, to check a token inside of the main processing part). After the moment when the client will end sending requests (or the connecting will be closed by the server by some certain reason), header should deleted.

Code:

pub fn run<'a: 'b, 'b>(&self, address: SocketAddr) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = TcpListener::bind(&address, &handle).unwrap();
    println!("Listening on: {}", address);

    let server = socket.incoming().for_each(|(stream, addr)| {
        let engine_inner = self.engine.clone();
        let connections_inner = self.connections.clone();
        let auth_middleware_inner = self.auth_middleware.clone();
        let handle_inner = handle.clone();

        let headers: &'a Headers = &HashMap::new();
        let copy_headers_callback = |request: &'b Request| {
            // Just copy the headers as is
            for &(ref name, ref value) in request.headers.iter() {
                headers[&name.clone()] = value.clone()
            }
            Ok(None)
        };

        accept_hdr_async<'b>(stream, copy_headers_callback)
            // Process the messages
            .and_then(move |ws_stream| {
                // Create a channel for the stream, which other sockets will use to
                // send us messages. It could be used for broadcasting your data to
                // another users in the future.
                let (tx, rx) = mpsc::unbounded();
                connections_inner.borrow_mut().insert(addr, tx);

                // Split the WebSocket stream so that it will be possible to work
                // with the reading and writing halves separately.
                let (sink, stream) = ws_stream.split();

                // Check headers, if we're working in a secured mode
                let auth_future = auth_middleware_inner.borrow().process_request(headers, &handle_inner);
                ....
    });

    // Run the server
    core.run(server).unwrap();
}

Errors:

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/proxy.rs:51:40
   |
51 |         let server = socket.incoming().for_each(|(stream, addr)| {
   |                                        ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0277]: the trait bound `(): futures::Future` is not satisfied
   --> src/proxy.rs:119:14
    |
119 |         core.run(server).unwrap();
    |              ^^^ the trait `futures::Future` is not implemented for `()`
    |
    = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
    = note: required because of the requirements on the impl of `futures::Future` for `futures::stream::ForEach<tokio_core::net::Incoming, [closure@src/proxy.rs:51:49: 116:10 self:_, handle:_], ()>`

Any ideas how it could be fixed?

@agalakhov
Copy link
Member

agalakhov commented Dec 13, 2017

You're doing the same mistake again and again. This is not related to Tungstenite. Did you read the tutorial? https://tokio.rs/docs/getting-started/futures/

Also, your reference to the HashMap is not mutable, so you can't add to it at all. Please consider reading this: https://doc.rust-lang.org/book/first-edition/ownership.html

@daniel-abramov
Copy link
Member

@Relrin why do you suspect lifetimes to be a problem in this place. According to the errors you posted in your question, rust compiler does not like the type which you passed in for_each, namely it tells you that it expects a function which in turn returns something that can be turned into a future IntoFuture.

This is the for_each function which you called: https://docs.rs/futures/0.1.17/futures/stream/trait.Stream.html#method.for_each

As you can see from the documentation for this function, it tells exactly the same the Rust compiler wrote in the error.

@Relrin
Copy link
Author

Relrin commented Dec 14, 2017

@agalakhov @application-developer-DA
I'm presuming that is the issue with lifetimes and borrowing first of all because when trying just to get a copy of WebSocket headers and using later somewhere in future, like this (at actual moment it only copying, without using somewhere later):

pub fn run(&self, address: SocketAddr) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = TcpListener::bind(&address, &handle).unwrap();
    println!("Listening on: {}", address);

    let server = socket.incoming().for_each(|(stream, addr)| {
        let engine_inner = self.engine.clone();
        let connections_inner = self.connections.clone();
        let auth_middleware_inner = self.auth_middleware.clone();
        let handle_inner = handle.clone();

        let mut headers: Headers = HashMap::new();
        let copy_headers_callback = |request: &Request| {
            for &(ref name, ref value) in request.headers.iter() {
                headers.insert(name.to_string(), value.clone());
            }
            Ok(None)
        };

        accept_hdr_async(stream, copy_headers_callback)
            // Process the messages
            .and_then(move |ws_stream| {

You will receive an error that was generated by the compiler:

error[E0597]: `headers` does not live long enough
   --> src/proxy.rs:60:21
    |
58  |             let copy_headers_callback = |request: &Request| {
    |                                         ------------------- capture occurs here
59  |                 for &(ref name, ref value) in request.headers.iter() {
60  |                     headers.insert(name.to_string(), value.clone());
    |                     ^^^^^^^ does not live long enough
...
111 |         });
    |         - borrowed value only lives until here
...
115 |     }
    |     - borrowed value needs to live until here

@agalakhov
Copy link
Member

Because it does not live long enough. Your function is asynchronous, that is, you can't tell WHEN the callback will be called. The headers variable does not live so long. It only lives until the end of the closure, but the closure returns a future, not a value! Use Rc.

@Relrin
Copy link
Author

Relrin commented Dec 15, 2017

I'd tried to prolong the life an object via using Rc<RefCell> wrapped for a HashMap, but still getting the same errors. It looks weird, but, I'm interested in allocating a memory for a headers in run-time only in the certain moment, when he's connecting and copy the headers into this "structure".

So, the compiler said "your headers should somewhere outside" of the scope of let server = ... expression. But in this case then I should make the Rc<RefCell<Headers>> as a part of a proxy, which is can be used for storing a data per each user. For an instance it could be a field of Proxy struct with Rc<RefCell<HashMap<Uuid, Headers>>>, however from the some point of view it looks like a dirty hack, isn't it? And, yeah, we still got here an issue with life of an object, but in this case it could be Uuid typo.

The current implementation looks like this:

pub type Headers = HashMap<String, Box<[u8]>>;

pub struct Proxy {
    engine: Rc<RefCell<Engine>>,
    connections: Rc<RefCell<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>,
    auth_middleware: Rc<RefCell<Box<Middleware>>>,
}


impl Proxy {
    pub fn new(router: Box<Router>, cli: &CliOptions) -> Proxy {
        let auth_middleware: Box<Middleware> = Box::new(EmptyMiddleware::new(cli));

        Proxy {
            engine: Rc::new(RefCell::new(Engine::new(router))),
            connections: Rc::new(RefCell::new(HashMap::new())),
            auth_middleware: Rc::new(RefCell::new(auth_middleware)),
        }
    }

    pub fn run(&self, address: SocketAddr) {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let socket = TcpListener::bind(&address, &handle).unwrap();
        println!("Listening on: {}", address);

        let server = socket.incoming().for_each(|(stream, addr)| {
            let engine_inner = self.engine.clone();
            let connections_inner = self.connections.clone();
            let auth_middleware_inner = self.auth_middleware.clone();
            let handle_inner = handle.clone();

            let mut headers: Rc<RefCell<Headers>> = Rc::new(RefCell::new(HashMap::new()));
            let copy_headers_callback = |request: &Request| {
                let headers_inner = headers.clone();
                for &(ref name, ref value) in request.headers.iter() {
                    headers_inner.borrow_mut().insert(name.to_string(), value.clone());
                }
                Ok(None)
            };

            accept_hdr_async(stream, copy_headers_callback)
                // Process the messages
                .and_then(move |ws_stream| {
                    // Create a channel for the stream, which other sockets will use to
                    // send us messages. It could be used for broadcasting your data to
                    // another users in the future.
                    let (tx, rx) = mpsc::unbounded();
                    connections_inner.borrow_mut().insert(addr, tx);

                    // Split the WebSocket stream so that it will be possible to work
                    // with the reading and writing halves separately.
                    let (sink, stream) = ws_stream.split();

                    // TODO: fire it before starting the processing messages
                    let _auth_future = auth_middleware_inner.borrow().process_request(&headers.borrow(), &handle_inner);

                    // Read and process each message
                    let connections = connections_inner.clone();
                    let ws_reader = stream.for_each(move |message: Message| {
                        engine_inner.borrow().handle(&message, &addr, &connections);
                        Ok(())
                    });

                    // Write back prepared responses
                    let ws_writer = rx.fold(sink, |mut sink, msg| {
                        sink.start_send(msg).unwrap();
                        Ok(sink)
                    });

                    // Wait for either half to be done to tear down the other
                    let connection = ws_reader.map(|_| ()).map_err(|_| ())
                                              .select(ws_writer.map(|_| ()).map_err(|_| ()));

                    // Close the connection after using
                    handle_inner.spawn(connection.then(move |_| {
                        connections_inner.borrow_mut().remove(&addr);
                        println!("Connection {} closed.", addr);
                        Ok(())
                    }));

                    Ok(())
                })
                // An error occurred during the WebSocket handshake
                .or_else(|err| {
                    println!("{}", err.description());
                    Ok(())
                })
        });

        // Run the server
        core.run(server).unwrap();
    }
}

A generated error is still the same:

error[E0597]: `headers` does not live long enough
   --> src/proxy.rs:59:37
    |
58  |             let copy_headers_callback = |request: &Request| {
    |                                         ------------------- capture occurs here
59  |                 let headers_inner = headers.clone();
    |                                     ^^^^^^^ does not live long enough
...
112 |         });
    |         - borrowed value only lives until here
...
116 |     }
    |     - borrowed value needs to live until here

@agalakhov
Copy link
Member

agalakhov commented Dec 15, 2017

Because you did not prolong anything. This time you have a short-living reference to Rc. You forgot to make a copy of Rc.

This is usually done so:

let obj = Rc::new(...);
let my_function = {
   let obj_copy = obj.clone();
   move |foo| { /* do something with  obj_copy */ }
};

Please read thr Rust lifetimes tutorial and understand the difference between cloning a Rc<T> and copying a &Rc<T>.

@Relrin Relrin closed this as completed Dec 16, 2017
gregates pushed a commit to gregates/tokio-tungstenite that referenced this issue Feb 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants