Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

how to integrate with diesel? example? #42

Closed
adals opened this issue Aug 9, 2018 · 23 comments
Closed

how to integrate with diesel? example? #42

adals opened this issue Aug 9, 2018 · 23 comments

Comments

@adals
Copy link

adals commented Aug 9, 2018

above

thanks

@seanmonstar
Copy link
Owner

I've haven't used diesel yet, so if anyone else wants to help show how to use it here, that'd be awesome!

@dd10-e
Copy link

dd10-e commented Aug 9, 2018

I used Diesel very naively (it doesn't support connection pooling! I need to investigate) but it works.

Just write an etablish_connection function

use diesel::pg::PgConnection;
use diesel::prelude::*;
use dotenv::dotenv;
use std::env;

pub fn establish_connection() -> PgConnection {
    dotenv().ok();

    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    PgConnection::establish(&database_url).expect(&format!("Error connecting to {}", database_url))
}

And used it like this for example:

pub fn retrieve_one_user(id_param: i32) -> Result<impl warp::Reply, warp::Rejection> {
    use schema::users::dsl::*;

    let connection = establish_connection();
    let results = users
        .select((id, email, username, created_at, updated_at))
        .find(id_param)
        .load::<PublicUserData>(&connection)
        .optional()
        .expect("Error loading user");

    match results {
        Some(json) => {
            if json.is_empty() == true {
                return Err(warp::reject::not_found());
            }
            Ok(warp::reply::json(&json))
        }
        None => Err(warp::reject::server_error()),
    }
}

@adals
Copy link
Author

adals commented Aug 9, 2018

thanks
Appreciated

@seanmonstar
Copy link
Owner

I believe in that case that uses a blocking connection to load from the database, so you'd probably want to move that to a separate thread (or consider wrapping it in tokio_threadpool::blocking).

@pepsighan
Copy link

pepsighan commented Aug 11, 2018

You may create a db pool, and get each connection from the pool per request like so:

use warp::{self, reject, Filter};
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};

type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;

fn pg_pool() -> PgPool {
    let manager = ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
    Pool::new(manager).expect("Postgres connection pool could not be created")
}

fn main() {
    let pool = pg_pool();

    // setup the the connection pool to get a connection on each request
    let pg = warp::any()
        .map(move || pool.clone())
        .and_then(|pool: PgPool| match pool.get() {
            Ok(conn) => Ok(conn),
            Err(_) => Err(reject::server_error()),
        });

    let hello_world = warp::get(
        warp::index()
            // use the pg connection on the route
            .and(pg)
            .map(|db: PooledPg| {
              // use the conn

              "Hello World!"
            }),
    );

    warp::serve(hello_world).run(([127, 0, 0, 1], 3001));
}

@adals
Copy link
Author

adals commented Aug 11, 2018

thank you all

I think its good way to put an example for diesel inside example folder

@andywwright
Copy link

unfortunately, the example of @csharad does not work for me :(

warp::serve(hello_world)
^^^^^^^^^^^ the trait `warp::reply::sealed::ReplySealed` is not implemented for `()`

@pepsighan
Copy link

pepsighan commented Aug 20, 2018

@andy-rust You are copying the code as is. Replace // use the conn part with your route logic.

I'll update the example with a dummy response.

@andywwright
Copy link

@csharad - thank you very much for the prompt answer! Now in the retrospective, I have to admit the question was stupid, sorry about that.

The only thing it had been swearing at me was "warning: use of deprecated item 'warp::get': warp::get2() is meant to replace get()"

With your permission I'd put your solution with the quick fix for anyone to just grab and run:

use warp::{Filter, reject};
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};

type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;

fn pg_pool() -> PgPool {
    let manager = ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
    Pool::new(manager).expect("Postgres connection pool could not be created")
}

fn main() {
    let pool = pg_pool();

    let pg = warp::any()
        .map(move || pool.clone())
        .and_then(|pool: PgPool| match pool.get() {
            Ok(conn) => Ok(conn),
            Err(_) => Err(reject::server_error()),
        });

    let index_from_db = warp::path::index()
        .and(pg)
        .map(|db: PooledPg| {
            "Get the data from DB"
        });

    let routes = warp::get2()
    	.and(index_from_db);

    warp::serve(routes)
    	.run(([0, 0, 0, 0], 3030));
}

@stefanoc
Copy link

@andy-rust what if you need to share the pool between multiple routes?

@stefanoc
Copy link

Sorry I meant to reply to @csharad :-)

@pepsighan
Copy link

pepsighan commented Aug 25, 2018

@stefanoc Use the pg filter in other routes like:

let index_route = warp::path::index()
        .and(pg.clone())
        .map(|db: PooledPg| {
            "Get the data from DB"
        });

let login_route = path!("login")
        .and(pg)
        .map(|db: PooledPg| {
            "Get the data from DB"
        });

@algermissen
Copy link
Contributor

From a quick glance on this code, I would argue that it is still running on the thread that warp is running on and since Diesel uses blocking operations, it will block warp's thread.

If I am correctly reading this, then all Diesel operations still must be moved to it's own Tokio thread pool as @seanmonstar mentioned above.

If I am wrong about the Diesel internals, please ignore my comment.

@pepsighan
Copy link

The diesel operations work on its own thread pool, if r2d2 is used as far as I know. But still it is a blocking operation and does not interact with futures.

@stefanoc
Copy link

@csharad you're moving pg in the first route, so you can't use it in the second route.

@pepsighan
Copy link

@stefanoc Ah! yes. It clonable not copyable. Will fix that.

@goriunov
Copy link

goriunov commented Nov 10, 2018

@csharad, @stefanoc, @Elod10, @seanmonstar, @carllerche
I have slightly modified the code of @andy-rust to show how to use it with tokio_threadpool::blocking

extern crate tokio_threadpool;

use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
use warp::{reject, Filter};

type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;

fn pg_pool() -> PgPool {
  let manager =
    ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
  Pool::new(manager).expect("Postgres connection pool could not be created")
}

fn main() {
  let pool = pg_pool();

  let pg = warp::any()
    .map(move || pool.clone())
    .and_then(|pool: PgPool| match pool.get() {
      Ok(conn) => Ok(conn),
      Err(_) => Err(reject::server_error()),
    });

  let index_from_db = warp::path::index().and(pg).map(|db: PooledPg| {
    tokio_threadpool::blocking(|| {
      diesel::insert_into(posts::table)
        .values(&NewPost {
          title: String::from("Hello world"),
          body: String::from("Hope it works"),
        })
        .execute(&db)
        .unwrap();
    })
    .and_then(|_| Ok("Set data in DB"))
    .unwrap()
  });

  let routes = warp::get2().and(index_from_db);

  warp::serve(routes).run(([0, 0, 0, 0], 3030));

But tokio_threadpool::blocking or diesel or tokio has some weird issue (at least on Arch linux) if i benchmark the code above i will get:

wrk -t4 -c1000 -d10s --latency http://127.0.0.1:3000
Running 10s test @ http://127.0.0.1:3000
  4 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   404.88ms  266.96ms   2.00s    70.74%
    Req/Sec   542.41     98.39   828.00     72.25%
  Latency Distribution
     50%  426.95ms
     75%  559.05ms
     90%  722.69ms
     99%    1.21s 
  21618 requests in 10.06s, 1.98MB read
  Socket errors: connect 0, read 0, write 0, timeout 39
Requests/sec:   2148.20
Transfer/sec:    201.39KB

Not bad, but if we add println!("Something") inside or outside of tokio_threadpool::blocking block in the code i get benchmark with much lower latency and better Requests/sec:

wrk -t4 -c1000 -d10s --latency http://127.0.0.1:3000
Running 10s test @ http://127.0.0.1:3000
  4 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   268.47ms  175.47ms   2.00s    75.71%
    Req/Sec   820.25    217.47     1.33k    62.50%
  Latency Distribution
     50%  252.55ms
     75%  363.05ms
     90%  457.58ms
     99%  733.74ms
  32689 requests in 10.07s, 2.99MB read
  Socket errors: connect 0, read 0, write 0, timeout 40
Requests/sec:   3247.02
Transfer/sec:    304.41KB

@ubnt-intrepid
Copy link

ubnt-intrepid commented Nov 10, 2018

@goriunov The blocking API alone does not work as asynchronous computation as expected. You need to construct an instance of Future by using poll_fn as follows:

let db_filter = warp::path::index().and(pg).and_then(|db: PooledPg| {
     futures::future::poll_fn(move || {
          let result = futures::try_ready!(tokio_threadpool::blocking(|| { /* do some stuff */ }));
          result.map(Async::Ready).map_err(internal_server_error)
     })
})
.and_then(|_| Ok("Set data in DB"));

@ubnt-intrepid
Copy link

ubnt-intrepid commented Nov 10, 2018

@seanmonstar Finchers provides a helper function for building a Future from a function that may block the current thread. It seems to be good to introduce such a helper function also in warp, in order to avoid confusion like this thread.

@goriunov
Copy link

goriunov commented Nov 10, 2018

@ubnt-intrepid Thank you for fixing my code, i am trying to implement it and it looks like that right now.
I have got stuck with result.map(Async::Ready).map_err(e) handling error part abit.

 let index_from_db = warp::path::index()
    .and(pg)
    .and_then(|db: PooledPg| {
      futures::future::poll_fn(move || {
        let result = futures::try_ready!(tokio_threadpool::blocking(|| {
          let data = diesel::insert_into(posts::table)
            .values(&NewPost {
              title: String::from("Hello world"),
              body: String::from("Hope it works"),
            })
            .execute(&db);

          Ok(Async::Ready)
        }));
         // need some help to handle error always get compile error :(
        result.map(Async::Ready).map_err(e)
      })
    })
    .and_then(|_| Ok("Set data in DB"));

I can se u use internal_server_error would you mind to show how to implement that part to, Thank you very much :)

@ThePianoDentist
Copy link

ThePianoDentist commented Mar 30, 2020

Been trying this with warp 2.0

In examples above generating a connection just requires small modification of adding async move inside and_then

    let pg_conn = warp::any().map(move || pool.clone()).and_then(|pool: PgPool| async move{ match pool.get(){
        Ok(conn) => Ok(conn),
        Err(_) => Err(reject::custom(PgPoolError)),
    }});

will edit with any other alterations

@dakom
Copy link

dakom commented May 25, 2020

@ThePianoDentist - thanks for the update :)

If I understand correctly - this will still hold the DB connection across all the await points in the pipeline? e.g. if the next part is an async handler that, as part of its processing, sends off other reqwests or does async-style pausing stuff, the db connection will not be dropped and returned to the pool until the entire pipeline completes?

Of course that might be desirable - having an abstract way to get a connection and use it for a request's lifetime is probably the right thing to do most of the time - but just worth noting if the warp handler itself expects to only need the db part of the time, might be better to pass the PgPool clone all the way down and then only get() as needed within locally scoped blocks?

I'm not entirely sure I have a clear picture of how all this works - so consider this more a question for clarification than anything else :) Thanks!

@ThePianoDentist
Copy link

@dakom apologies. Im in same boat as you.

Using this to test/learn async rust. Nothing you've written sounds incorrect to me, but I'm also noob!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests