Skip to content

Commit

Permalink
Restructure the Query struct in the rust library (#4145)
Browse files Browse the repository at this point in the history
  • Loading branch information
DelSkayn committed Jun 11, 2024
1 parent b9b2974 commit 1e0eddc
Show file tree
Hide file tree
Showing 17 changed files with 228 additions and 164 deletions.
2 changes: 0 additions & 2 deletions lib/src/api/engine/any/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ impl Surreal<Any> {
engine: PhantomData,
address: address.into_endpoint(),
capacity: 0,
client: PhantomData,
waiter: self.waiter.clone(),
response_type: PhantomData,
}
Expand Down Expand Up @@ -297,7 +296,6 @@ pub fn connect(address: impl IntoEndpoint) -> Connect<Any, Surreal<Any>> {
engine: PhantomData,
address: address.into_endpoint(),
capacity: 0,
client: PhantomData,
waiter: Arc::new(watch::channel(None)),
response_type: PhantomData,
}
Expand Down
10 changes: 4 additions & 6 deletions lib/src/api/engine/any/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use flume::Receiver;
use reqwest::ClientBuilder;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
Expand Down Expand Up @@ -231,15 +230,14 @@ impl Connection for Any {
EndpointKind::Unsupported(v) => return Err(Error::Scheme(v).into()),
}

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features,
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/src/api/engine/any/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,14 @@ impl Connection for Any {
EndpointKind::Unsupported(v) => return Err(Error::Scheme(v).into()),
}

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features,
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
1 change: 0 additions & 1 deletion lib/src/api/engine/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ impl Surreal<Db> {
engine: PhantomData,
address: address.into_endpoint(),
capacity: 0,
client: PhantomData,
waiter: self.waiter.clone(),
response_type: PhantomData,
}
Expand Down
10 changes: 4 additions & 6 deletions lib/src/api/engine/local/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
Expand Down Expand Up @@ -64,15 +63,14 @@ impl Connection for Db {
features.insert(ExtraFeatures::Backup);
features.insert(ExtraFeatures::LiveQueries);

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features,
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/src/api/engine/local/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,14 @@ impl Connection for Db {
let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries);

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features,
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
1 change: 0 additions & 1 deletion lib/src/api/engine/remote/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl Surreal<Client> {
engine: PhantomData,
address: address.into_endpoint(),
capacity: 0,
client: PhantomData,
waiter: self.waiter.clone(),
response_type: PhantomData,
}
Expand Down
10 changes: 4 additions & 6 deletions lib/src/api/engine/remote/http/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use reqwest::header::HeaderMap;
use reqwest::ClientBuilder;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
Expand Down Expand Up @@ -73,15 +72,14 @@ impl Connection for Client {
let mut features = HashSet::new();
features.insert(ExtraFeatures::Backup);

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features,
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/src/api/engine/remote/http/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,14 @@ impl Connection for Client {

conn_rx.into_recv_async().await??;

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features: HashSet::new(),
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
1 change: 0 additions & 1 deletion lib/src/api/engine/remote/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ impl Surreal<Client> {
engine: PhantomData,
address: address.into_endpoint(),
capacity: 0,
client: PhantomData,
waiter: self.waiter.clone(),
response_type: PhantomData,
}
Expand Down
10 changes: 4 additions & 6 deletions lib/src/api/engine/remote/ws/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
Expand Down Expand Up @@ -150,15 +149,14 @@ impl Connection for Client {
let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries);

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features,
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/src/api/engine/remote/ws/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,14 @@ impl Connection for Client {
let mut features = HashSet::new();
features.insert(ExtraFeatures::LiveQueries);

Ok(Surreal {
router: Arc::new(OnceLock::with_value(Router {
Ok(Surreal::new_from_router_waiter(
Arc::new(OnceLock::with_value(Router {
features,
sender: route_tx,
last_id: AtomicI64::new(0),
})),
waiter: Arc::new(watch::channel(Some(WaitFor::Connection))),
engine: PhantomData,
})
Arc::new(watch::channel(Some(WaitFor::Connection))),
))
})
}

Expand Down
42 changes: 26 additions & 16 deletions lib/src/api/method/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,19 @@ macro_rules! into_future {
Resource::Edges(edges) => return Err(Error::LiveOnEdges(edges).into()),
},
}
let query = Query {
client: client.clone(),
query: vec![Ok(vec![Statement::Live(stmt)])],
bindings: Ok(Default::default()),
register_live_queries: false,
};
let query = Query::new(
client.clone(),
vec![Statement::Live(stmt)],
Default::default(),
false,
);
let id: Value = query.await?.take(0)?;
let rx = register::<Client>(router, id.clone()).await?;
Ok(Stream {
Ok(Stream::new(
Surreal::new_from_router_waiter(client.router.clone(), client.waiter.clone()),
id,
rx: Some(rx),
client: Surreal {
router: client.router.clone(),
waiter: client.waiter.clone(),
engine: PhantomData,
},
response_type: PhantomData,
engine: PhantomData,
})
Some(rx),
))
})
}
};
Expand Down Expand Up @@ -177,6 +171,22 @@ pub struct Stream<'r, C: Connection, R> {
pub(crate) response_type: PhantomData<R>,
}

impl<'r, C: Connection, R> Stream<'r, C, R> {
pub(crate) fn new(
client: Surreal<Any>,
id: Value,
rx: Option<Receiver<dbs::Notification>>,
) -> Self {
Self {
id,
rx,
client,
response_type: PhantomData,
engine: PhantomData,
}
}
}

macro_rules! poll_next {
($notification:ident => $body:expr) => {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
13 changes: 9 additions & 4 deletions lib/src/api/method/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;

use self::query::ValidQuery;

/// Query statistics
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
Expand Down Expand Up @@ -262,7 +264,6 @@ where
engine: PhantomData,
address: address.into_endpoint(),
capacity: 0,
client: PhantomData,
waiter: Arc::new(watch::channel(None)),
response_type: PhantomData,
}
Expand Down Expand Up @@ -662,11 +663,15 @@ where
/// # }
/// ```
pub fn query(&self, query: impl opt::IntoQuery) -> Query<C> {
Query {
let inner = query.into_query().map(|x| ValidQuery {
client: Cow::Borrowed(self),
query: vec![query.into_query()],
bindings: Ok(Default::default()),
query: x,
bindings: Default::default(),
register_live_queries: true,
});

Query {
inner,
}
}

Expand Down
Loading

0 comments on commit 1e0eddc

Please sign in to comment.