Skip to content

Commit d2505d3

Browse files
authored
MergeV, MergeE, & Option Steps (#214)
* First pass * Added JanusGraph in memory mode to docker compose enviornment * Added mergeV as a start step * Relaxed property step keys to Into<GValue> from &str support converting a TraversalBuilder into GValue via Bytecode * Added JanusGraph custom vertex id tests * Implemented From<HashMap<GKey, GValue>> for MergeVertexStep to support literal maps being defined for mergeV steps * Implemented Option step for mergeV * Added healthcheck for JG & wait with timeout for docker compose up in GH Action * Use Docker Compose v2 via "docker compose" vs v1's "docker-compose" in order to leverage v2's wait flag * Combine merge v custom id test cases * Better handle tests being reran * Added merge_v_tests feature for tests * Formatting * FIxed if condition * Corrected cargo.toml merge_test feature * Changed GH Action if statement formatting * Increased docker compose timeout time for healthy service check * Fixed imports for merge_test module. Moved merge tests into their own module * Use drop vertices test utility function * Drop vertices for test_merge_v_options * Added mergeV step to anonymous traversals * Implemented mergeE step * Added mergeV and mergeE to Bytecode WRITE_OPERATORS * Implemented travsal test based on reference doc combo mergeV and mergeE * Support literal options for choose step and added test * Rewrote side effect and expose via GraphTraversal * Implemented support for Columns in By Step * Expose properties() step in an anonymouse traversal * Also update property_many, property_with_cardinality, and property_many_with_cardinality to take a Key that impls Into<GValue> instead of just &str * If a request responds with a websocket error mark the conneciton as invalid * Map additional tungstenite errors to GremlinError::WebSocketAsync * Arc tungstenite::Error into WebSocketAsync error to maintain Async error enum back to caller * Expose healthcheck interval setting on async connection pool * Formatting * Map mobc pool errors to type that would invalidate connection * Exploratory logging * Added uuid to connection instance logging * Update mobc and make its idle connection behavior the same as the rd2d sync pool * 0.8 mobc does not treat async-std feature as mutually exclusive from tokio * Include tokio/sync for mobc compilation in async-std-runtime feature * Implemented None step * Implemented iterator() method on returned remote stream to consume stream for only Null terminated traversals * Trial connection multiplexing for non-credential configured clients * Formatting * Removed internal channel bounding * Revert "Removed internal channel bounding" This reverts commit 7500820. * Revert "Formatting" This reverts commit 8082d3b. * Revert "Trial connection multiplexing for non-credential configured clients" This reverts commit 5db2b58. * Revert "Added uuid to connection instance logging" This reverts commit a342d8c. * Revert "Exploratory logging" This reverts commit 03c24fe. * Formatting * Switched command to docker compose in coverage GH Action Workflow * Corrected non-async tungstenite Error to using #[from] * Added running cargo test with no async feature enabled
1 parent d7ce0e4 commit d2505d3

38 files changed

+1408
-92
lines changed

.github/workflows/coverage.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
- uses: actions/checkout@v2
2424
- name: Starting Gremlin Servers
2525
run: |
26-
docker-compose -f ./docker-compose/docker-compose.yaml up -d
26+
docker compose -f ./docker-compose/docker-compose.yaml up -d
2727
env:
2828
GREMLIN_SERVER: ${{ matrix.gremlin-server }}
2929

.github/workflows/test.yml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
- uses: actions/checkout@v2
2424
- name: Starting Gremlin Servers
2525
run: |
26-
docker-compose -f ./docker-compose/docker-compose.yaml up -d
26+
docker compose -f ./docker-compose/docker-compose.yaml up -d --wait --wait-timeout 90
2727
env:
2828
GREMLIN_SERVER: ${{ matrix.gremlin-server }}
2929

@@ -41,13 +41,40 @@ jobs:
4141
with:
4242
command: fmt
4343
args: --all -- --check
44+
- name: Run cargo test with blocking client
45+
if: matrix.gremlin-server == '3.5.7'
46+
uses: actions-rs/cargo@v1
47+
with:
48+
command: test
49+
args: --manifest-path gremlin-client/Cargo.toml
4450
- name: Run cargo test with tokio
51+
if: matrix.gremlin-server == '3.5.7'
4552
uses: actions-rs/cargo@v1
4653
with:
4754
command: test
4855
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime
4956
- name: Run cargo test with async-std
57+
if: matrix.gremlin-server == '3.5.7'
5058
uses: actions-rs/cargo@v1
5159
with:
5260
command: test
5361
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime
62+
# MergeV as a step doesn't exist in 3.5.x, so selectively run those tests
63+
- name: Run cargo test with blocking client
64+
if: matrix.gremlin-server != '3.5.7'
65+
uses: actions-rs/cargo@v1
66+
with:
67+
command: test
68+
args: --manifest-path gremlin-client/Cargo.toml --features=merge_tests
69+
- name: Run cargo test with tokio
70+
if: matrix.gremlin-server != '3.5.7'
71+
uses: actions-rs/cargo@v1
72+
with:
73+
command: test
74+
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime,merge_tests
75+
- name: Run cargo test with async-std
76+
if: matrix.gremlin-server != '3.5.7'
77+
uses: actions-rs/cargo@v1
78+
with:
79+
command: test
80+
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime,merge_tests

docker-compose/docker-compose.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,16 @@ services:
1818
command : ["conf/gremlin-server-credentials.yaml"]
1919
ports:
2020
- "8183:8182"
21+
janusgraph:
22+
image: janusgraph/janusgraph:latest
23+
environment:
24+
- janusgraph.graph.set-vertex-id=true
25+
- janusgraph.graph.allow-custom-vid-types=true
26+
- JANUS_PROPS_TEMPLATE=inmemory
27+
ports:
28+
- "8184:8182"
29+
healthcheck:
30+
test: ["CMD", "bin/gremlin.sh", "-e", "scripts/remote-connect.groovy"]
31+
interval: 10s
32+
timeout: 30s
33+
retries: 3

gremlin-client/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ readme = "README.md"
1515
[features]
1616

1717
default = []
18+
merge_tests = []
1819

1920

2021

2122
async_gremlin = ["futures","mobc","async-tungstenite","async-trait","url","pin-project-lite"]
2223

2324
async_std = ["async-std-runtime"]
24-
tokio-runtime = ["async_gremlin","tokio","mobc/tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"]
25-
async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","mobc/async-std","async-tls","rustls","webpki"]
25+
tokio-runtime = ["async_gremlin","tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"]
26+
async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","tokio/sync", "mobc/async-std","async-tls","rustls","webpki"]
2627

2728
derive = ["gremlin-derive"]
2829

@@ -57,7 +58,7 @@ thiserror = "1.0.20"
5758

5859

5960

60-
mobc = {version = "0.7", optional = true, default-features=false, features = ["unstable"] }
61+
mobc = {version = "0.8", optional = true }
6162
url = {version = "2.1.0", optional = true}
6263
futures = { version = "0.3.1", optional = true}
6364
pin-project-lite = { version = "0.2", optional = true}

gremlin-client/src/aio/client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ impl GremlinClient {
5959
let pool = Pool::builder()
6060
.get_timeout(opts.pool_get_connection_timeout)
6161
.max_open(pool_size as u64)
62+
.health_check_interval(opts.pool_healthcheck_interval)
63+
//Makes max idle connections equal to max open, matching the behavior of the sync pool r2d2
64+
.max_idle(0)
6265
.build(manager);
6366

6467
Ok(GremlinClient {

gremlin-client/src/aio/connection.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod tokio_use {
2121
pub use tokio_native_tls::TlsStream;
2222
}
2323

24+
use futures::TryFutureExt;
2425
#[cfg(feature = "tokio-runtime")]
2526
use tokio_use::*;
2627

@@ -144,6 +145,7 @@ impl Conn {
144145
tls::connector(&opts),
145146
websocket_config,
146147
)
148+
.map_err(|e| Arc::new(e))
147149
.await?
148150
};
149151
#[cfg(feature = "tokio-runtime")]
@@ -153,6 +155,7 @@ impl Conn {
153155
tls::connector(&opts),
154156
websocket_config,
155157
)
158+
.map_err(|e| Arc::new(e))
156159
.await?
157160
};
158161

@@ -190,6 +193,18 @@ impl Conn {
190193
.await
191194
.expect("It should contain the response")
192195
.map(|r| (r, receiver))
196+
.map_err(|e| {
197+
//If there's been an websocket layer error, mark the connection as invalid
198+
match e {
199+
GremlinError::WebSocket(_)
200+
| GremlinError::WebSocketAsync(_)
201+
| GremlinError::WebSocketPoolAsync(_) => {
202+
self.valid = false;
203+
}
204+
_ => {}
205+
}
206+
e
207+
})
193208
}
194209

195210
pub fn is_valid(&self) -> bool {
@@ -222,7 +237,7 @@ fn sender_loop(
222237
if let Err(e) = sink.send(Message::Binary(msg.2)).await {
223238
let mut sender = guard.remove(&msg.1).unwrap();
224239
sender
225-
.send(Err(GremlinError::from(e)))
240+
.send(Err(GremlinError::from(Arc::new(e))))
226241
.await
227242
.expect("Failed to send error");
228243
}
@@ -257,8 +272,9 @@ fn receiver_loop(
257272
match stream.next().await {
258273
Some(Err(error)) => {
259274
let mut guard = requests.lock().await;
275+
let error = Arc::new(error);
260276
for s in guard.values_mut() {
261-
match s.send(Err(GremlinError::from(&error))).await {
277+
match s.send(Err(error.clone().into())).await {
262278
Ok(_r) => {}
263279
Err(_e) => {}
264280
}

gremlin-client/src/aio/error.rs

Lines changed: 0 additions & 15 deletions
This file was deleted.

gremlin-client/src/aio/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ pub(crate) mod connection;
33
pub(crate) mod pool;
44
mod result;
55

6-
mod error;
76
pub(crate) mod process;
87
pub use client::GremlinClient;
98
pub use process::traversal::AsyncTerminator;

gremlin-client/src/aio/process/traversal/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::GremlinResult;
66
use core::task::Context;
77
use core::task::Poll;
88
use futures::Stream;
9+
use futures::StreamExt;
910
use std::marker::PhantomData;
1011
use std::pin::Pin;
1112

@@ -29,6 +30,17 @@ impl<T> RemoteTraversalStream<T> {
2930
}
3031
}
3132
}
33+
34+
impl RemoteTraversalStream<crate::structure::Null> {
35+
pub async fn iterate(&mut self) -> GremlinResult<()> {
36+
while let Some(response) = self.next().await {
37+
//consume the entire stream, returning any errors
38+
response?;
39+
}
40+
Ok(())
41+
}
42+
}
43+
3244
impl<T: FromGValue> Stream for RemoteTraversalStream<T> {
3345
type Item = GremlinResult<T>;
3446

gremlin-client/src/connection.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{net::TcpStream, time::Duration};
1+
use std::{net::TcpStream, sync::Arc, time::Duration};
22

33
use crate::{GraphSON, GremlinError, GremlinResult};
44
use native_tls::TlsConnector;
@@ -120,6 +120,16 @@ impl ConnectionOptionsBuilder {
120120
self
121121
}
122122

123+
/// Only applicable to async client. By default a connection is checked on each return to the pool (None)
124+
/// This allows setting an interval of how often it is checked on return.
125+
pub fn pool_healthcheck_interval(
126+
mut self,
127+
pool_healthcheck_interval: Option<Duration>,
128+
) -> Self {
129+
self.0.pool_healthcheck_interval = pool_healthcheck_interval;
130+
self
131+
}
132+
123133
/// Both the sync and async pool providers use a default of 30 seconds,
124134
/// Async pool interprets `None` as no timeout. Sync pool maps `None` to the default value
125135
pub fn pool_connection_timeout(mut self, pool_connection_timeout: Option<Duration>) -> Self {
@@ -170,6 +180,7 @@ pub struct ConnectionOptions {
170180
pub(crate) host: String,
171181
pub(crate) port: u16,
172182
pub(crate) pool_size: u32,
183+
pub(crate) pool_healthcheck_interval: Option<Duration>,
173184
pub(crate) pool_get_connection_timeout: Option<Duration>,
174185
pub(crate) credentials: Option<Credentials>,
175186
pub(crate) ssl: bool,
@@ -254,6 +265,7 @@ impl Default for ConnectionOptions {
254265
port: 8182,
255266
pool_size: 10,
256267
pool_get_connection_timeout: Some(Duration::from_secs(30)),
268+
pool_healthcheck_interval: None,
257269
credentials: None,
258270
ssl: false,
259271
tls_options: None,

0 commit comments

Comments
 (0)