Skip to content

Commit

Permalink
Merge pull request #751 from tursodatabase/ns-path
Browse files Browse the repository at this point in the history
Add path based routing
  • Loading branch information
MarinPostma committed Dec 18, 2023
2 parents e8500de + 5773ed8 commit a321b86
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 1 deletion.
3 changes: 3 additions & 0 deletions libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub enum Error {
Anyhow(#[from] anyhow::Error),
#[error("Invalid host header: `{0}`")]
InvalidHost(String),
#[error("Invalid path in URI: `{0}`")]
InvalidPath(String),
#[error("Namespace `{0}` doesn't exist")]
NamespaceDoesntExist(String),
#[error("Namespace `{0}` already exists")]
Expand Down Expand Up @@ -134,6 +136,7 @@ impl IntoResponse for Error {
TooManyRequests => self.format_err(StatusCode::TOO_MANY_REQUESTS),
QueryError(_) => self.format_err(StatusCode::BAD_REQUEST),
InvalidHost(_) => self.format_err(StatusCode::BAD_REQUEST),
InvalidPath(_) => self.format_err(StatusCode::BAD_REQUEST),
NamespaceDoesntExist(_) => self.format_err(StatusCode::BAD_REQUEST),
PrimaryConnectionTimeout => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
NamespaceAlreadyExist(_) => self.format_err(StatusCode::BAD_REQUEST),
Expand Down
28 changes: 27 additions & 1 deletion libsql-server/src/http/user/db_factory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use axum::extract::FromRequestParts;
use axum::extract::{FromRequestParts, Path};
use hyper::http::request::Parts;
use hyper::HeaderMap;

Expand Down Expand Up @@ -64,6 +64,32 @@ pub fn namespace_from_headers(
}
}

pub struct MakeConnectionExtractorPath<D>(pub Arc<dyn MakeConnection<Connection = D>>);
#[async_trait::async_trait]
impl<F> FromRequestParts<AppState<F>>
for MakeConnectionExtractorPath<<F::Database as Database>::Connection>
where
F: MakeNamespace,
{
type Rejection = Error;

async fn from_request_parts(
parts: &mut Parts,
state: &AppState<F>,
) -> Result<Self, Self::Rejection> {
let auth = Authenticated::from_request_parts(parts, state).await?;
let Path((ns, _)) = Path::<(NamespaceName, String)>::from_request_parts(parts, state)
.await
.map_err(|e| Error::InvalidPath(e.to_string()))?;
Ok(Self(
state
.namespaces
.with_authenticated(ns, auth, |ns| ns.db.connection_maker())
.await?,
))
}
}

fn split_namespace(host: &str) -> crate::Result<NamespaceName> {
let (ns, _) = host.split_once('.').ok_or_else(|| {
Error::InvalidHost("host header should be in the format <namespace>.<...>".into())
Expand Down
33 changes: 33 additions & 0 deletions libsql-server/src/http/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::hrana;
use crate::http::user::db_factory::MakeConnectionExtractorPath;
use crate::http::user::types::HttpQuery;
use crate::metrics::LEGACY_HTTP_CALL;
use crate::namespace::{MakeNamespace, NamespaceStore};
Expand Down Expand Up @@ -195,6 +196,33 @@ async fn handle_fallback() -> impl IntoResponse {
(StatusCode::NOT_FOUND).into_response()
}

async fn handle_hrana_pipeline<F: MakeNamespace>(
AxumState(state): AxumState<AppState<F>>,
MakeConnectionExtractorPath(connection_maker): MakeConnectionExtractorPath<
<F::Database as Database>::Connection,
>,
auth: Authenticated,
axum::extract::Path((_, version)): axum::extract::Path<(String, String)>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let hrana_version = match version.as_str() {
"2" => hrana::Version::Hrana2,
"3" => hrana::Version::Hrana3,
_ => return Err(Error::InvalidPath("invalid hrana version".to_string())),
};
Ok(state
.hrana_http_srv
.handle_request(
connection_maker,
auth,
req,
hrana::http::Endpoint::Pipeline,
hrana_version,
hrana::Encoding::Json,
)
.await?)
}

/// Router wide state that each request has access too via
/// axum's `State` extractor.
pub(crate) struct AppState<F: MakeNamespace> {
Expand Down Expand Up @@ -387,6 +415,11 @@ where
hrana::Encoding::Protobuf,
)),
)
// turso dev routes
.route(
"/dev/:namespace/v:version/pipeline",
post(handle_hrana_pipeline),
)
.with_state(state);

// Merge the grpc based axum router into our regular http router
Expand Down
28 changes: 28 additions & 0 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use libsql_replication::rpc::replication::replication_log_client::ReplicationLog
use libsql_sys::wal::{Sqlite3WalManager, WalManager};
use parking_lot::Mutex;
use rusqlite::ErrorCode;
use serde::de::Visitor;
use serde::Deserialize;
use tokio::io::AsyncBufReadExt;
use tokio::sync::watch;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -109,6 +111,32 @@ impl fmt::Display for NamespaceName {
}
}

impl<'de> Deserialize<'de> for NamespaceName {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct V;

impl<'de> Visitor<'de> for V {
type Value = NamespaceName;

fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "a valid namespace name")
}

fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
NamespaceName::from_string(v).map_err(|e| E::custom(e))
}
}

deserializer.deserialize_string(V)
}
}

pub enum ResetOp {
Reset(NamespaceName),
Destroy(NamespaceName),
Expand Down

0 comments on commit a321b86

Please sign in to comment.