Skip to content

Commit

Permalink
mysql support
Browse files Browse the repository at this point in the history
  • Loading branch information
sfisol committed Apr 20, 2024
1 parent 654f77f commit 4355bd9
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 41 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ auth = ["jsonwebtoken", "quick-error", "rand", "rust-argon2"]
actix_validation = []
rs256_jwks = ["alcoholic_jwt", "awc"]
default = ["pgsql", "auth", "tracing"]
pgsql = ["diesel", "diesel-derive-newtype", "r2d2"]
multidb = ["pgsql", "weighted-rs"]
pgsql = ["diesel/postgres", "diesel-derive-newtype", "r2d2"]
multidb = ["weighted-rs"]
mysql = ["diesel/mysql", "diesel-derive-newtype", "r2d2"]
swagger = ["paperclip"]
rabbit = ["amiquip", "crossbeam-channel"]
prometheus = []
Expand All @@ -42,7 +43,7 @@ bytes = "1"
chrono = { version = "0.4", features = ["serde"] }
crossbeam-channel = { version = "0.5", optional = true }
derive_more = "0.99"
diesel = { version = "2.1", features = ["postgres", "chrono", "r2d2"], optional = true }
diesel = { version = "2.1", features = ["chrono", "r2d2"], optional = true }
diesel-derive-newtype = { version = "2.1", optional = true }
dotenv = "0.15"
futures = "0.3"
Expand Down
20 changes: 11 additions & 9 deletions src/db_pool/async_queries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::convert;

use actix_web::web;
use diesel::{pg::PgConnection, Connection};
use diesel::Connection;

use super::DbConnection;

#[cfg(not(feature = "multidb"))]
use super::Pool;
Expand All @@ -16,7 +18,7 @@ pub use serwus_derive::Canceled;
#[cfg(not(feature = "multidb"))]
pub async fn async_query<F, I, E>(db_pool: Pool, query_func: F) -> Result<I, E>
where
F: FnOnce(&mut PgConnection) -> Result<I, E> + Send + 'static,
F: FnOnce(&mut DbConnection) -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: From<actix_web::error::BlockingError> + From<r2d2::Error> + std::fmt::Debug + Send + 'static,
{
Expand All @@ -33,7 +35,7 @@ where
#[cfg(not(feature = "multidb"))]
pub async fn async_transaction<F, I, E>(db_pool: Pool, query_func: F) -> Result<I, E>
where
F: FnOnce(&mut PgConnection) -> Result<I, E> + Send + 'static,
F: FnOnce(&mut DbConnection) -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: From<actix_web::error::BlockingError>
+ From<r2d2::Error>
Expand All @@ -53,11 +55,11 @@ where

/// Performs query to database in currently open transaction (as blocking task)
pub async fn async_query_in_trans<F, I, E>(
mut connection: PgConnection,
mut connection: DbConnection,
query_func: F,
) -> Result<I, E>
where
F: FnOnce(&mut PgConnection) -> Result<I, E> + Send + 'static,
F: FnOnce(&mut DbConnection) -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: From<actix_web::error::BlockingError> + From<r2d2::Error> + std::fmt::Debug + Send + 'static,
{
Expand All @@ -75,7 +77,7 @@ where
/// Use async_read_transaction to perform query in real read-only mode.
pub async fn async_read_query<F, I, E>(db_pool: MultiPool, query_func: F) -> Result<I, E>
where
F: FnOnce(&mut PgConnection) -> Result<I, E> + Send + 'static,
F: FnOnce(&mut DbConnection) -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: From<actix_web::error::BlockingError> + From<r2d2::Error> + std::fmt::Debug + Send + 'static,
{
Expand All @@ -92,7 +94,7 @@ where
#[cfg(feature = "multidb")]
pub async fn async_write_query<F, I, E>(db_pool: MultiPool, query_func: F) -> Result<I, E>
where
F: FnOnce(&mut PgConnection) -> Result<I, E> + Send + 'static,
F: FnOnce(&mut DbConnection) -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: From<actix_web::error::BlockingError> + From<r2d2::Error> + std::fmt::Debug + Send + 'static,
{
Expand All @@ -109,7 +111,7 @@ where
#[cfg(feature = "multidb")]
pub async fn async_read_transaction<F, I, E>(db_pool: MultiPool, query_func: F) -> Result<I, E>
where
F: FnOnce(&mut PgConnection) -> Result<I, E> + Send + 'static,
F: FnOnce(&mut DbConnection) -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: From<actix_web::error::BlockingError>
+ From<r2d2::Error>
Expand All @@ -136,7 +138,7 @@ where
#[cfg(feature = "multidb")]
pub async fn async_write_transaction<F, I, E>(db_pool: MultiPool, query_func: F) -> Result<I, E>
where
F: FnOnce(&mut PgConnection) -> Result<I, E> + Send + 'static,
F: FnOnce(&mut DbConnection) -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: From<actix_web::error::BlockingError>
+ From<r2d2::Error>
Expand Down
20 changes: 20 additions & 0 deletions src/db_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,23 @@ pub mod multi;

mod async_queries;
pub use async_queries::*;

#[cfg(feature = "pgsql")]
use diesel::pg::PgConnection;
#[cfg(all(feature = "mysql", not(feature = "pgsql")))]
use diesel::mysql::MysqlConnection;

#[cfg(feature = "pgsql")]
pub type DbConnection = PgConnection;
#[cfg(all(feature = "mysql", not(feature = "pgsql")))]
pub type DbConnection = MysqlConnection;

#[cfg(feature = "pgsql")]
use diesel::pg::Pg;
#[cfg(all(feature = "mysql", not(feature = "pgsql")))]
use diesel::mysql::Mysql;

#[cfg(feature = "pgsql")]
pub type Db = Pg;
#[cfg(all(feature = "mysql", not(feature = "pgsql")))]
pub type Db = Mysql;
7 changes: 4 additions & 3 deletions src/db_pool/single.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use diesel::pg::PgConnection;
use diesel::r2d2::ConnectionManager;
use std::env;
use log::{error, info};

use crate::threads::num_threads;

pub type Pool = diesel::r2d2::Pool<ConnectionManager<PgConnection>>;
use super::DbConnection;

pub type Pool = diesel::r2d2::Pool<ConnectionManager<DbConnection>>;

/// Init pool of N connections to single database, where N is number of threads (but not less than 2).
///
Expand All @@ -23,7 +24,7 @@ pub fn init_default_pool() -> Result<Pool, r2d2::Error> {
pub fn init_pool(size: usize) -> Result<Pool, r2d2::Error> {
info!("Connecting to database");

let manager = ConnectionManager::<PgConnection>::new(default_database_url());
let manager = ConnectionManager::<DbConnection>::new(default_database_url());

let max_size = if env::var("TEST").is_ok() && size > 2 {
2
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub mod containers;
pub mod utils;

pub mod auth;
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
pub mod db_pool;

pub mod server;
Expand All @@ -54,14 +54,14 @@ pub mod return_logged;

pub mod threads;

#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
pub mod pagination;

pub mod logger;

/// Re-export of `web` from `actix-web` or from `paperclip` if swagger feature enabled.
#[cfg(not(feature = "swagger"))]
pub use actix_web::web;
/// Re-export of `web` from `actix-web` or from `paperclip` if swagger feature enabled.
#[cfg(feature = "swagger")]
pub use paperclip::actix::web;

Expand Down
17 changes: 10 additions & 7 deletions src/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use diesel::{
pg::Pg, prelude::*, query_builder::*, query_dsl::methods::LoadQuery, sql_types::BigInt,
prelude::*, query_builder::*, query_dsl::methods::LoadQuery, sql_types::BigInt,
};

use crate::containers::ListResponse;
use crate::db_pool::{Db, DbConnection};



pub trait Paginate: Sized {
fn paginate(self, page: i64) -> Paginated<Self>;
Expand Down Expand Up @@ -41,10 +44,10 @@ impl<T> Paginated<T> {

pub fn load_and_count_pages<'a, U>(
self,
conn: &mut PgConnection,
conn: &mut DbConnection,
) -> QueryResult<ListResponse<U>>
where
Self: LoadQuery<'a, PgConnection, (U, i64)>,
Self: LoadQuery<'a, DbConnection, (U, i64)>,
{
let per_page = self.per_page;
let page = self.page;
Expand All @@ -69,13 +72,13 @@ impl<T: Query> Query for Paginated<T> {
type SqlType = (T::SqlType, BigInt);
}

impl<T> RunQueryDsl<PgConnection> for Paginated<T> {}
impl<T> RunQueryDsl<DbConnection> for Paginated<T> {}

impl<T> QueryFragment<Pg> for Paginated<T>
impl<T> QueryFragment<Db> for Paginated<T>
where
T: QueryFragment<Pg>,
T: QueryFragment<Db>,
{
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Db>) -> QueryResult<()> {
out.push_sql("SELECT *, COUNT(*) OVER () FROM (");
self.query.walk_ast(out.reborrow())?;
out.push_sql(") t LIMIT ");
Expand Down
24 changes: 12 additions & 12 deletions src/server/app_data.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
use log::info;

use actix_web::Error;
use futures::future::{Future, ok as fut_ok};
use serde::Serialize;
use std::pin::Pin;

#[cfg(feature = "pgsql")]
use super::db_pool;
#[cfg(any(feature = "pgsql", feature = "mysql"))]
use crate::db_pool;

#[cfg(feature = "prometheus")]
use super::prometheus::AsPrometheus;
Expand All @@ -17,48 +17,48 @@ use super::stats::StatsPresenter;
/// AppData ready to use if you need only default database connection.
#[derive(Clone)]
pub struct DefaultAppData {
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
pub db_pool: db_pool::Pool,
}

#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
pub fn default_app_data() -> DefaultAppData {
info!("Connecting to database");
let db_pool = db_pool::init_default_pool().unwrap();

DefaultAppData { db_pool }
}

#[cfg(not(feature = "pgsql"))]
#[cfg(all(not(feature = "pgsql"), not(feature = "mysql")))]
pub fn default_app_data() -> DefaultAppData {
DefaultAppData { }
}


#[derive(Serialize)]
pub struct DefaultServiceStats {
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
db_connection: bool,
}

impl StatsPresenter<DefaultServiceStats> for DefaultAppData {
fn is_ready(&self) -> Pin<Box<dyn Future<Output=Result<bool, Error>>>> {
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
let res = self.db_pool.get().is_ok();

#[cfg(not(feature = "pgsql"))]
#[cfg(all(not(feature = "pgsql"), not(feature = "mysql")))]
let res = false;

Box::pin(fut_ok(res))
}

fn get_stats(&self) -> Pin<Box<dyn Future<Output=Result<DefaultServiceStats, Error>>>> {
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
let db_connection = self.db_pool.get().is_ok();

let fut = fut_ok(
DefaultServiceStats {
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
db_connection,
}
);
Expand All @@ -72,7 +72,7 @@ impl AsPrometheus for DefaultServiceStats {
fn as_prometheus(&self) -> Vec<String> {
#![allow(clippy::vec_init_then_push)]
let mut out = Vec::new();
#[cfg(feature = "pgsql")]
#[cfg(any(feature = "pgsql", feature = "mysql"))]
out.push(format!("db_connection {}", self.db_connection as i32));
out
}
Expand Down
4 changes: 0 additions & 4 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ use actix_web::web;
use paperclip::actix::{web, OpenApiExt};

use super::threads;

#[cfg(feature = "pgsql")]
use super::db_pool;

use super::logger;

pub use app_data::{DefaultAppData, default_app_data};
Expand Down

0 comments on commit 4355bd9

Please sign in to comment.