Skip to content

Commit

Permalink
Merge pull request #1939 from nosqlbench/neo4j_synchronous
Browse files Browse the repository at this point in the history
Add synchronous analogues of autocommit, write_transaction, read_transaction ops to Neo4J driver adapter
  • Loading branch information
jshook committed May 7, 2024
2 parents b34d42c + 97d590e commit 4a29660
Show file tree
Hide file tree
Showing 17 changed files with 335 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,22 @@ public OpDispenser<? extends Neo4JBaseOp> apply(ParsedOp op) {
LongFunction<String> spaceNameFunc = op.getAsFunctionOr("space", "default");
LongFunction<Neo4JSpace> spaceFunc = l -> cache.get(spaceNameFunc.apply(l));
return switch (typeAndTarget.enumId) {
case autocommit -> new Neo4JAutoCommitOpDispenser(
case sync_autocommit -> new Neo4JSyncAutoCommitOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case read_transaction -> new Neo4JReadTxnOpDispenser(
case async_autocommit -> new Neo4JAsyncAutoCommitOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case write_transaction -> new Neo4JWriteTxnOpDispenser(
case sync_read_transaction -> new Neo4JSyncReadTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case async_read_transaction -> new Neo4JAsyncReadTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case sync_write_transaction -> new Neo4JSyncWriteTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case async_write_transaction -> new Neo4JAsyncWriteTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@
package io.nosqlbench.adapter.neo4j.opdispensers;

import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JWriteTxnOp;
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
import io.nosqlbench.adapters.api.templating.ParsedOp;

import org.neo4j.driver.async.AsyncSession;

import java.util.function.LongFunction;


public class Neo4JWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
public class Neo4JAsyncAutoCommitOpDispenser extends Neo4JBaseOpDispenser {

public Neo4JWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
public Neo4JAsyncAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}

@Override
public LongFunction<Neo4JWriteTxnOp> createOpFunc() {
return l -> new Neo4JWriteTxnOp(
public LongFunction<Neo4JAsyncAutoCommitOp> createOpFunc() {
return l -> new Neo4JAsyncAutoCommitOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@
package io.nosqlbench.adapter.neo4j.opdispensers;

import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncReadTxnOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;

import org.neo4j.driver.async.AsyncSession;

import java.util.function.LongFunction;


public class Neo4JAutoCommitOpDispenser extends Neo4JBaseOpDispenser {

public Neo4JAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
public class Neo4JAsyncReadTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JAsyncReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}

@Override
public LongFunction<Neo4JAutoCommitOp> createOpFunc() {
return l -> new Neo4JAutoCommitOp(
public LongFunction<Neo4JAsyncReadTxnOp> createOpFunc() {
return l -> new Neo4JAsyncReadTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@

import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JReadTxnOp;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncWriteTxnOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;

import org.neo4j.driver.async.AsyncSession;

import java.util.function.LongFunction;


public class Neo4JReadTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
public class Neo4JAsyncWriteTxnOpDispenser extends Neo4JBaseOpDispenser {

public Neo4JAsyncWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}

@Override
public LongFunction<Neo4JReadTxnOp> createOpFunc() {
return l -> new Neo4JReadTxnOp(
public LongFunction<Neo4JAsyncWriteTxnOp> createOpFunc() {
return l -> new Neo4JAsyncWriteTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.adapter.neo4j.opdispensers;

import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;

import org.neo4j.driver.Session;

import java.util.function.LongFunction;


public class Neo4JSyncAutoCommitOpDispenser extends Neo4JBaseOpDispenser {

public Neo4JSyncAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}

@Override
public LongFunction<Neo4JSyncAutoCommitOp> createOpFunc() {
return l -> new Neo4JSyncAutoCommitOp(
spaceFunc.apply(l).getDriver().session(Session.class),
queryFunc.apply(l)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.adapter.neo4j.opdispensers;

import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncReadTxnOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;

import org.neo4j.driver.Session;

import java.util.function.LongFunction;


public class Neo4JSyncReadTxnOpDispenser extends Neo4JBaseOpDispenser {

public Neo4JSyncReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}

@Override
public LongFunction<Neo4JSyncReadTxnOp> createOpFunc() {
return l -> new Neo4JSyncReadTxnOp(
spaceFunc.apply(l).getDriver().session(Session.class),
queryFunc.apply(l)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.adapter.neo4j.opdispensers;

import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncWriteTxnOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;

import org.neo4j.driver.Session;

import java.util.function.LongFunction;


public class Neo4JSyncWriteTxnOpDispenser extends Neo4JBaseOpDispenser {

public Neo4JSyncWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}

@Override
public LongFunction<Neo4JSyncWriteTxnOp> createOpFunc() {
return l -> new Neo4JSyncWriteTxnOp(
spaceFunc.apply(l).getDriver().session(Session.class),
queryFunc.apply(l)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Neo4JAutoCommitOp extends Neo4JBaseOp {
public class Neo4JAsyncAutoCommitOp extends Neo4JBaseOp {
private final AsyncSession session;

public Neo4JAutoCommitOp(AsyncSession session, Query query) {
super(session, query);
public Neo4JAsyncAutoCommitOp(AsyncSession session, Query query) {
super(query);
this.session = session;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Neo4JReadTxnOp extends Neo4JBaseOp{
public class Neo4JAsyncReadTxnOp extends Neo4JBaseOp{
private final AsyncSession session;

public Neo4JReadTxnOp(AsyncSession session, Query query) {
super(session, query);
public Neo4JAsyncReadTxnOp(AsyncSession session, Query query) {
super(query);
this.session = session;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Neo4JWriteTxnOp extends Neo4JBaseOp{
public class Neo4JAsyncWriteTxnOp extends Neo4JBaseOp{
private final AsyncSession session;

public Neo4JWriteTxnOp(AsyncSession session, Query query) {
super(session, query);
public Neo4JAsyncWriteTxnOp(AsyncSession session, Query query) {
super(query);
this.session = session;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,17 @@


public abstract class Neo4JBaseOp implements CycleOp<Record[]> {

protected final AsyncSession session;
protected final Query query;

public Neo4JBaseOp(AsyncSession session, Query query) {
this.session = session;
public Neo4JBaseOp(Query query) {
this.query = query;
}

/**
* In the child classes, this method will be responsible for:
* - using the Neo4J AsyncSession object to run the Neo4J Query
* - using the Neo4J Session/AsyncSession object to run the Neo4J Query
* - process the Result to get an array of Records
* - close the AsyncSession
* - close the Session/AsyncSession
* - Return the array of Records
*
* Session creation and closing is considered light-weight. Reference:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.adapter.neo4j.ops;

import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;

import java.util.List;


public class Neo4JSyncAutoCommitOp extends Neo4JBaseOp {
private final Session session;

public Neo4JSyncAutoCommitOp(Session session, Query query) {
super(query);
this.session = session;
}

@Override
public final Record[] apply(long value) {
List<Record> recordList = session.run(query).list();
if (session.isOpen()) {
session.close();
}
return recordList.toArray(new Record[recordList.size()]);
}
}

0 comments on commit 4a29660

Please sign in to comment.