From 56cda2d8c1edfb968058d427f1fb8b0ba35289d4 Mon Sep 17 00:00:00 2001 From: snyk-bot Date: Tue, 9 Apr 2024 17:39:57 +0000 Subject: [PATCH 01/21] fix: upgrade com.amazonaws:aws-java-sdk-dynamodb from 1.12.678 to 1.12.681 Snyk has created this PR to upgrade com.amazonaws:aws-java-sdk-dynamodb from 1.12.678 to 1.12.681. See this package in Maven Repository: https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-dynamodb/ See this project in Snyk: https://app.snyk.io/org/nosqlbench/project/db3dfb82-467b-4263-94f8-28f933540a6d?utm_source=github&utm_medium=referral&page=upgrade-pr --- adapter-dynamodb/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter-dynamodb/pom.xml b/adapter-dynamodb/pom.xml index 59794a9c14..a48d81e492 100644 --- a/adapter-dynamodb/pom.xml +++ b/adapter-dynamodb/pom.xml @@ -43,7 +43,7 @@ com.amazonaws aws-java-sdk-dynamodb - 1.12.678 + 1.12.681 From 02b36a282da329af1777122715b970ed5bbfa073 Mon Sep 17 00:00:00 2001 From: ShaunakDas88 Date: Mon, 8 Apr 2024 11:59:43 -0700 Subject: [PATCH 02/21] initial version of neo4j driver adapter --- adapter-neo4j/pom.xml | 58 ++++++++++ .../adapter/neo4j/Neo4JAdapterUtils.java | 71 +++++++++++++ .../adapter/neo4j/Neo4JDriverAdapter.java | 53 ++++++++++ .../neo4j/Neo4JDriverAdapterLoader.java | 30 ++++++ .../adapter/neo4j/Neo4JOpMapper.java | 58 ++++++++++ .../nosqlbench/adapter/neo4j/Neo4JSpace.java | 100 ++++++++++++++++++ .../Neo4JAutoCommitOpDispenser.java | 42 ++++++++ .../opdispensers/Neo4JBaseOpDispenser.java | 67 ++++++++++++ .../opdispensers/Neo4JReadTxnOpDispenser.java | 40 +++++++ .../Neo4JWriteTxnOpDispenser.java | 42 ++++++++ .../neo4j/ops/NBExecutionException.java | 42 ++++++++ .../neo4j/ops/NBInterruptedException.java | 40 +++++++ .../adapter/neo4j/ops/NBTimeoutException.java | 42 ++++++++ .../adapter/neo4j/ops/Neo4JAutoCommitOp.java | 64 +++++++++++ .../adapter/neo4j/ops/Neo4JBaseOp.java | 52 +++++++++ .../adapter/neo4j/ops/Neo4JReadTxnOp.java | 66 ++++++++++++ .../adapter/neo4j/ops/Neo4JWriteTxnOp.java | 67 ++++++++++++ .../adapter/neo4j/types/Neo4JOpType.java | 36 +++++++ .../src/main/resources/activities/neo4j.yaml | 99 +++++++++++++++++ adapter-neo4j/src/main/resources/neo4j.md | 61 +++++++++++ nb5/pom.xml | 21 ++++ pom.xml | 1 + 22 files changed, 1152 insertions(+) create mode 100644 adapter-neo4j/pom.xml create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JAdapterUtils.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapter.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapterLoader.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JOpMapper.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JSpace.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JAutoCommitOpDispenser.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JReadTxnOpDispenser.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JWriteTxnOpDispenser.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBExecutionException.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBInterruptedException.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBTimeoutException.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JAutoCommitOp.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JBaseOp.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JReadTxnOp.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JWriteTxnOp.java create mode 100644 adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/types/Neo4JOpType.java create mode 100644 adapter-neo4j/src/main/resources/activities/neo4j.yaml create mode 100644 adapter-neo4j/src/main/resources/neo4j.md diff --git a/adapter-neo4j/pom.xml b/adapter-neo4j/pom.xml new file mode 100644 index 0000000000..94bbdee00a --- /dev/null +++ b/adapter-neo4j/pom.xml @@ -0,0 +1,58 @@ + + + + 4.0.0 + + adapter-neo4j + jar + + + mvn-defaults + io.nosqlbench + ${revision} + ../mvn-defaults + + + ${project.artifactId} + + An nosqlbench adapter driver module for the Neo4J/Aura database. + + + + + + io.nosqlbench + nb-annotations + ${revision} + compile + + + io.nosqlbench + adapters-api + ${revision} + compile + + + org.neo4j.driver + neo4j-java-driver + 5.18.0 + + + + + diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JAdapterUtils.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JAdapterUtils.java new file mode 100644 index 0000000000..ed60e8d645 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JAdapterUtils.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import org.apache.commons.lang3.StringUtils; + +import org.neo4j.driver.Record; +import org.neo4j.driver.exceptions.ClientException; + +import java.util.NoSuchElementException; + + +public class Neo4JAdapterUtils { + + /** + * Mask the digits in the given string with '*' + * + * @param unmasked The string to mask + * @return The masked string + */ + protected static String maskDigits(String unmasked) { + assert StringUtils.isNotBlank(unmasked) && StringUtils.isNotEmpty(unmasked); + int inputLength = unmasked.length(); + StringBuilder masked = new StringBuilder(inputLength); + for (char ch : unmasked.toCharArray()) { + if (Character.isDigit(ch)) { + masked.append("*"); + } else { + masked.append(ch); + } + } + return masked.toString(); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Value.html#asObject() + */ + public static Object[] getFieldForAllRecords(Record[] records, String fieldName) { + int n = records.length; + Object[] values = new Object[n]; + int idx; + for (int i = 0; i < n; i++) { + try { + idx = records[i].index(fieldName); + values[i] = records[i].get(idx).asObject(); + } + catch (NoSuchElementException e) { + throw e; + } + catch (ClientException e) { + throw e; + } + } + return values; + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapter.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapter.java new file mode 100644 index 0000000000..833fb86302 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapter.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp; +import io.nosqlbench.adapters.api.activityimpl.OpMapper; +import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.nb.annotations.Service; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.config.standard.NBConfigModel; +import io.nosqlbench.nb.api.config.standard.NBConfiguration; +import io.nosqlbench.nb.api.labels.NBLabels; + +import java.util.function.Function; + + +@Service(value = DriverAdapter.class, selector = "neo4j") +public class Neo4JDriverAdapter extends BaseDriverAdapter { + + public Neo4JDriverAdapter(NBComponent parentComponent, NBLabels labels) { + super(parentComponent, labels); + } + + @Override + public OpMapper getOpMapper() { + return new Neo4JOpMapper(this, getSpaceCache()); + } + + @Override + public Function getSpaceInitializer(NBConfiguration cfg) { + return (s) -> new Neo4JSpace(s, cfg); + } + + @Override + public NBConfigModel getConfigModel() { + return super.getConfigModel().add(Neo4JSpace.getConfigModel()); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapterLoader.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapterLoader.java new file mode 100644 index 0000000000..53f9098b00 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapterLoader.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.adapter.diag.DriverAdapterLoader; +import io.nosqlbench.nb.annotations.Service; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.labels.NBLabels; + +@Service(value = DriverAdapterLoader.class, selector = "neo4j") +public class Neo4JDriverAdapterLoader implements DriverAdapterLoader { + @Override + public Neo4JDriverAdapter load(NBComponent parent, NBLabels childLabels) { + return new Neo4JDriverAdapter(parent, childLabels); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JOpMapper.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JOpMapper.java new file mode 100644 index 0000000000..918d52f4d8 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JOpMapper.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.adapter.neo4j.opdispensers.*; +import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp; +import io.nosqlbench.adapter.neo4j.types.Neo4JOpType; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.activityimpl.OpMapper; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.nosqlbench.engine.api.templating.TypeAndTarget; + +import java.util.function.LongFunction; + + +public class Neo4JOpMapper implements OpMapper { + private final DriverSpaceCache cache; + private final Neo4JDriverAdapter adapter; + + public Neo4JOpMapper(Neo4JDriverAdapter adapter, DriverSpaceCache cache) { + this.adapter = adapter; + this.cache = cache; + } + + @Override + public OpDispenser apply(ParsedOp op) { + TypeAndTarget typeAndTarget = op.getTypeAndTarget(Neo4JOpType.class, String.class); + LongFunction spaceNameFunc = op.getAsFunctionOr("space", "default"); + LongFunction spaceFunc = l -> cache.get(spaceNameFunc.apply(l)); + return switch (typeAndTarget.enumId) { + case autocommit -> new Neo4JAutoCommitOpDispenser( + adapter, op, spaceFunc, typeAndTarget.enumId.getValue() + ); + case read_transaction -> new Neo4JReadTxnOpDispenser( + adapter, op, spaceFunc, typeAndTarget.enumId.getValue() + ); + case write_transaction -> new Neo4JWriteTxnOpDispenser( + adapter, op, spaceFunc, typeAndTarget.enumId.getValue() + ); + }; + } +} + diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JSpace.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JSpace.java new file mode 100644 index 0000000000..82e06a473d --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JSpace.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.nb.api.config.standard.*; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Driver; +import org.neo4j.driver.GraphDatabase; + +import java.util.Optional; + +public class Neo4JSpace implements AutoCloseable { + + private final static Logger logger = LogManager.getLogger(Neo4JSpace.class); + private final String space; + private Driver driver; + + public Neo4JSpace(String space, NBConfiguration cfg) { + this.space = space; + this.driver = initializeDriver(cfg); + driver.verifyConnectivity(); + } + + private Driver initializeDriver(NBConfiguration cfg) { + String dbURI = cfg.get("db_uri"); + Optional usernameOpt = cfg.getOptional("username"); + Optional passwordOpt = cfg.getOptional("password"); + String username; + String password; + // user has supplied both username and password + if (usernameOpt.isPresent() && passwordOpt.isPresent()) { + username = usernameOpt.get(); + password = passwordOpt.get(); + logger.info(this.space + ": Creating new Neo4J driver with [" + + "dbURI = " + dbURI + + ", username = " + username + + ", password = " + Neo4JAdapterUtils.maskDigits(password) + + "]" + ); + return GraphDatabase.driver(dbURI, AuthTokens.basic(username, password)); + } + // user has only supplied username + else if (usernameOpt.isPresent()) { + String error = "username is present, but password is not defined."; + logger.error(error); + throw new RuntimeException(error); + } + // user has only supplied password + else if (passwordOpt.isPresent()) { + String error = "password is present, but username is not defined."; + logger.error(error); + throw new RuntimeException(error); + } + // user has supplied neither + else { + logger.info(this.space + ": Creating new Neo4J driver with [" + + "dbURI = " + dbURI + + "]" + ); + return GraphDatabase.driver(dbURI); + } + } + + public static NBConfigModel getConfigModel() { + return ConfigModel.of(Neo4JSpace.class) + .add(Param.required("db_uri", String.class)) + .add(Param.optional("username", String.class)) + .add(Param.optional("password", String.class)) + .asReadOnly(); + } + + public Driver getDriver() { + return driver; + } + + @Override + public void close() throws Exception { + if (driver != null){ + driver.close(); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JAutoCommitOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JAutoCommitOpDispenser.java new file mode 100644 index 0000000000..f3899245e2 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JAutoCommitOpDispenser.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.ops.Neo4JAutoCommitOp; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapters.api.templating.ParsedOp; + +import org.neo4j.driver.async.AsyncSession; + +import java.util.function.LongFunction; + + +public class Neo4JAutoCommitOpDispenser extends Neo4JBaseOpDispenser { + + public Neo4JAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op, spaceFunc, requiredTemplateKey); + } + + @Override + public LongFunction createOpFunc() { + return l -> new Neo4JAutoCommitOp( + spaceFunc.apply(l).getDriver().session(AsyncSession.class), + queryFunc.apply(l) + ); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java new file mode 100644 index 0000000000..f79de7c4ec --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser; +import io.nosqlbench.adapters.api.templating.ParsedOp; + +import org.neo4j.driver.Query; + +import java.util.Collections; +import java.util.function.LongFunction; +import java.util.Map; + + +public abstract class Neo4JBaseOpDispenser extends BaseOpDispenser { + protected final LongFunction spaceFunc; + protected final LongFunction cypherFunc; + protected final LongFunction queryFunc; + protected final LongFunction paramFunc; + protected final LongFunction opFunc; + + public Neo4JBaseOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op); + this.spaceFunc = spaceFunc; + this.cypherFunc = op.getAsRequiredFunction(requiredTemplateKey); + this.paramFunc = createParamFunc(op); + this.queryFunc = createQueryFunc(); + this.opFunc = (LongFunction) createOpFunc(); + } + + private LongFunction createParamFunc(ParsedOp op) { + return op.getAsOptionalFunction("query_params", Map.class) + .orElse(l -> Collections.emptyMap()); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Query.html#withParameters(java.util.Map) + */ + private LongFunction createQueryFunc() { + return l -> new Query(cypherFunc.apply(l)).withParameters(paramFunc.apply(l)); + } + + public abstract LongFunction createOpFunc(); + + @Override + public Neo4JBaseOp apply(long cycle) { + return opFunc.apply(cycle); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JReadTxnOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JReadTxnOpDispenser.java new file mode 100644 index 0000000000..770d148cfc --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JReadTxnOpDispenser.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapter.neo4j.ops.Neo4JReadTxnOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.neo4j.driver.async.AsyncSession; + +import java.util.function.LongFunction; + + +public class Neo4JReadTxnOpDispenser extends Neo4JBaseOpDispenser { + public Neo4JReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op, spaceFunc, requiredTemplateKey); + } + + @Override + public LongFunction createOpFunc() { + return l -> new Neo4JReadTxnOp( + spaceFunc.apply(l).getDriver().session(AsyncSession.class), + queryFunc.apply(l) + ); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JWriteTxnOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JWriteTxnOpDispenser.java new file mode 100644 index 0000000000..cfb5d5aeaf --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JWriteTxnOpDispenser.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapter.neo4j.ops.Neo4JWriteTxnOp; +import io.nosqlbench.adapter.neo4j.types.Neo4JOpType; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.neo4j.driver.async.AsyncSession; + +import java.util.function.LongFunction; + + +public class Neo4JWriteTxnOpDispenser extends Neo4JBaseOpDispenser { + + public Neo4JWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op, spaceFunc, requiredTemplateKey); + } + + @Override + public LongFunction createOpFunc() { + return l -> new Neo4JWriteTxnOp( + spaceFunc.apply(l).getDriver().session(AsyncSession.class), + queryFunc.apply(l) + ); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBExecutionException.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBExecutionException.java new file mode 100644 index 0000000000..f2761a02f5 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBExecutionException.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import java.util.concurrent.ExecutionException; + +public class NBExecutionException extends RuntimeException { + private final ExecutionException exception; + + public NBExecutionException(ExecutionException e) { + this.exception = e; + } + + @Override + public String getMessage() { + return "Wrapped Exception: " + exception.getMessage(); + } + + @Override + public StackTraceElement[] getStackTrace() { + return exception.getStackTrace(); + } + + @Override + public synchronized Throwable getCause() { + return exception.getCause(); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBInterruptedException.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBInterruptedException.java new file mode 100644 index 0000000000..ede57b670b --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBInterruptedException.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +public class NBInterruptedException extends RuntimeException { + private final InterruptedException exception; + + public NBInterruptedException(InterruptedException e) { + this.exception = e; + } + + @Override + public String getMessage() { + return "Wrapped Exception: " + exception.getMessage(); + } + + @Override + public StackTraceElement[] getStackTrace() { + return exception.getStackTrace(); + } + + @Override + public synchronized Throwable getCause() { + return exception.getCause(); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBTimeoutException.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBTimeoutException.java new file mode 100644 index 0000000000..a83305f0d3 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBTimeoutException.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import java.util.concurrent.TimeoutException; + +public class NBTimeoutException extends RuntimeException { + private final TimeoutException exception; + + public NBTimeoutException(TimeoutException e) { + this.exception = e; + } + + @Override + public String getMessage() { + return "Wrapped Exception: " + exception.getMessage(); + } + + @Override + public StackTraceElement[] getStackTrace() { + return exception.getStackTrace(); + } + + @Override + public synchronized Throwable getCause() { + return exception.getCause(); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JAutoCommitOp.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JAutoCommitOp.java new file mode 100644 index 0000000000..7505553af7 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JAutoCommitOp.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Neo4JAutoCommitOp extends Neo4JBaseOp { + + public Neo4JAutoCommitOp(AsyncSession session, Query query) { + super(session, query); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#runAsync(java.lang.String,java.util.Map,org.neo4j.driver.TransactionConfig) + */ + @Override + public final Record[] apply(long value) { + try { + CompletionStage> resultStage = session.runAsync(query).thenComposeAsync( + cursor -> cursor.listAsync().whenComplete( + (records, throwable) -> { + if (throwable != null) { + session.closeAsync(); + } + } + ) + ); + List recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS); + return recordList.toArray(new Record[recordList.size()]); + } catch (ExecutionException exe) { + Throwable ee = exe.getCause(); + if (ee instanceof RuntimeException re) { + throw re; + } else throw new NBExecutionException(exe); + } catch (InterruptedException ie) { + throw new NBInterruptedException(ie); + } catch (TimeoutException e) { + throw new NBTimeoutException(e); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JBaseOp.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JBaseOp.java new file mode 100644 index 0000000000..261b5cf190 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JBaseOp.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + + +public abstract class Neo4JBaseOp implements CycleOp { + + protected final AsyncSession session; + protected final Query query; + + public Neo4JBaseOp(AsyncSession session, Query query) { + this.session = session; + this.query = query; + } + + /** + * In the child classes, this method will be responsible for: + * - using the Neo4J AsyncSession object to run the Neo4J Query + * - process the Result to get an array of Records + * - close the AsyncSession + * - Return the array of Records + * + * Session creation and closing is considered light-weight. Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Session.html#close() + */ + public abstract Record[] apply(long value); + + @Override + public String toString() { + return "Neo4JBaseOp(" + query.toString().getClass().getSimpleName() + ")"; + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JReadTxnOp.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JReadTxnOp.java new file mode 100644 index 0000000000..861e1d33dc --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JReadTxnOp.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Neo4JReadTxnOp extends Neo4JBaseOp{ + + public Neo4JReadTxnOp(AsyncSession session, Query query) { + super(session, query); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback) + */ + @Override + public final Record[] apply(long value) { + try { + CompletionStage> resultStage = session.executeReadAsync( + txn -> txn.runAsync(query).thenComposeAsync( + cursor -> cursor.listAsync().whenComplete( + (records, throwable) -> { + if (throwable != null) { + session.closeAsync(); + } + } + ) + ) + ); + List recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS); + return recordList.toArray(new Record[recordList.size()]); + } catch (ExecutionException exe) { + Throwable ee = exe.getCause(); + if (ee instanceof RuntimeException re) { + throw re; + } else throw new NBExecutionException(exe); + } catch (InterruptedException ie) { + throw new NBInterruptedException(ie); + } catch (TimeoutException e) { + throw new NBTimeoutException(e); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JWriteTxnOp.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JWriteTxnOp.java new file mode 100644 index 0000000000..1fec09fd60 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JWriteTxnOp.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Neo4JWriteTxnOp extends Neo4JBaseOp{ + + public Neo4JWriteTxnOp(AsyncSession session, Query query) { + super(session, query); + } + + /** + * References: + * - https://neo4j.com/docs/java-manual/current/async/ + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback) + */ + @Override + public final Record[] apply(long value) { + try { + CompletionStage> resultStage = session.executeWriteAsync( + txn -> txn.runAsync(query).thenComposeAsync( + cursor -> cursor.listAsync().whenComplete( + (records, throwable) -> { + if (throwable != null) { + session.closeAsync(); + } + } + ) + ) + ); + List recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS); + return recordList.toArray(new Record[recordList.size()]); + } catch (ExecutionException exe) { + Throwable ee = exe.getCause(); + if (ee instanceof RuntimeException re) { + throw re; + } else throw new NBExecutionException(exe); + } catch (InterruptedException ie) { + throw new NBInterruptedException(ie); + } catch (TimeoutException e) { + throw new NBTimeoutException(e); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/types/Neo4JOpType.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/types/Neo4JOpType.java new file mode 100644 index 0000000000..8c5040ade6 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/types/Neo4JOpType.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.types; + +public enum Neo4JOpType { + + autocommit("autocommit"), + + read_transaction("read_transaction"), + + write_transaction("write_transaction"); + + private final String value; + + Neo4JOpType(String value) { + this.value = value; + } + + public String getValue(){ + return value; + } +} diff --git a/adapter-neo4j/src/main/resources/activities/neo4j.yaml b/adapter-neo4j/src/main/resources/activities/neo4j.yaml new file mode 100644 index 0000000000..825d8c05b1 --- /dev/null +++ b/adapter-neo4j/src/main/resources/activities/neo4j.yaml @@ -0,0 +1,99 @@ +min_version: 5.21.1 +description: | + Sample Neo4J Driver workload for ANN. Responsible for resetting/creating schema, ingesting vector + data from HDF5 file format, and then performing ANN queries against the ingested data + + Template Variables: + TEMPLATE(datafile) + TEMPLATE(node_label,Node) + TEMPLATE(k,100) + TEMPLATE(batch_size) + TEMPLATE(delete_batch_size,5000) + +bindings: + id: ToString() + id_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),long->ToString()); + train_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train"); + train_vector_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train")); + test_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/test"); + relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datafile).hdf5", "/neighbors") + +blocks: + # TODO: Node deletion times out; attempt this in future: CREATE OR REPLACE DATABASE neo4j + reset-schema: + ops: + # Reference: https://support.neo4j.com/s/article/360059882854-Deleting-large-numbers-of-nodes#h_01H95CXNJ8TN4126T3Y01BRWKS + delete_nodes: + autocommit: | + MATCH (n) + CALL { WITH n + DETACH DELETE n + } IN TRANSACTIONS OF $delete_batch_size ROWS; + query_params: + delete_batch_size: TEMPLATE(delete_batch_size,5000) + drop_index: + autocommit: DROP INDEX $index_name IF EXISTS + query_params: + index_name: vector_index + + schema: + ops: + create_vector_index: + autocommit: | + CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node)) + ON (n.embedding) OPTIONS + {indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}} + query_params: + index_name: vector_index + dimension: TEMPLATE(dimension) + similarity_function: TEMPLATE(similarity_function,cosine) + + rampup: + ops: + insert_node: + write_transaction: | + CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector}) + query_params: + id: '{id}' + vector: '{train_vector}' + + rampup-batch: + ops: + # Reference: https://community.neo4j.com/t/unwind-multiple-arrays-to-set-property/59908/5 + insert_nodes: + write_transaction: | + WITH $id_list as ids, $vector_list as vectors + UNWIND RANGE(0, size(ids) - 1) as idx + CREATE (v:TEMPLATE(node_label,Node) {id: ids[idx], embedding: vectors[idx]}) + query_params: + id_list: '{id_batch}' + vector_list: '{train_vector_batch}' + + search: + ops: + search: + read_transaction: | + WITH $query_vector AS queryVector + CALL db.index.vector.queryNodes($index_name, $k, queryVector) + YIELD node + RETURN node.id + query_params: + query_vector: '{test_vector}' + index_name: vector_index + k: TEMPLATE(k,100) + verifier-init: | + relevancy = new io.nosqlbench.nb.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op); + for (int k in List.of(100)) { + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k)); + } + verifier: | + // result is a Record[] + values = io.nosqlbench.adapter.neo4j.Neo4JAdapterUtils.getFieldForAllRecords(result, "node.id") + ann = values.collect { it.toString().toInteger() }.toArray(new Integer[values.size()]) + knn = {relevant_indices} + relevancy.accept(knn, ann); + return true; diff --git a/adapter-neo4j/src/main/resources/neo4j.md b/adapter-neo4j/src/main/resources/neo4j.md new file mode 100644 index 0000000000..0a49784a58 --- /dev/null +++ b/adapter-neo4j/src/main/resources/neo4j.md @@ -0,0 +1,61 @@ +# neo4j driver adapter + +The neo4j driver adapter is a nb adapter for the Neo4J driver, an open source Java driver for connecting to and +performing operations on an instance of a Neo4J/Aura database. The driver is hosted on github at +https://github.com/neo4j/neo4j-java-driver. + +## activity parameters + +The following parameters must be supplied to the adapter at runtime in order to successfully connect to an +instance of the Neo4J/Aura database: + +* db_uri - the URI for the Neo4J instance for the driver to connect to. + +## Op Templates + +The Neo4J adapter supports three different op types: +- autocommit +- read_transaction +- write_transaction + +A good reference for when to use each is located at https://neo4j.com/docs/driver-manual/1.7/sessions-transactions/ + +For these different op types, users can specify appropriate Cypher queries to run against the database + + +## Examples +All examples provided are in the scope of leveraging Neo4J's vector index capabilities. Although, +arbitrary Cypher queries can be run for most involved graph modeling use cases, only a simple +vector search functionality has been properly worked through, currently. + + +```yaml +ops: + example_create_vector_index: + autocommit: | + CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node)) + ON (n.embedding) OPTIONS + {indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}} + query_params: + index_name: vector_index + dimension: TEMPLATE(dimension) + similarity_function: TEMPLATE(similarity_function,cosine) + + example_insert_node: + write_transaction: | + CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector}) + query_params: + id: '{id}' + vector: '{train_vector}' + + example_search: + read_transaction: | + WITH $query_vector AS queryVector + CALL db.index.vector.queryNodes($index_name, $k, queryVector) + YIELD node + RETURN node.id + query_params: + query_vector: '{test_vector}' + index_name: vector_index + k: TEMPLATE(k,100) +``` diff --git a/nb5/pom.xml b/nb5/pom.xml index 8ada316eb3..cae2bd24c5 100644 --- a/nb5/pom.xml +++ b/nb5/pom.xml @@ -111,6 +111,11 @@ adapter-mongodb ${revision} + + io.nosqlbench + adapter-neo4j + ${revision} + io.nosqlbench adapter-aws-opensearch @@ -331,6 +336,22 @@ + + adapter-neo4j + + + + ../adapter-neo4j/target + + + + + io.nosqlbench + adapter-neo4j + ${revision} + + + adapter-tcp diff --git a/pom.xml b/pom.xml index 587c1aaaf5..ddd9ff36ed 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ adapter-jdbc adapter-milvus adapter-mongodb + adapter-neo4j adapter-aws-opensearch adapter-cqld4 adapter-s4j From 99213b45562a96909a4ae627a797185ce0138853 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Wed, 3 Apr 2024 13:30:54 -0400 Subject: [PATCH 03/21] adding resolver for NBIO cache --- .../nb/api/nbio/ResolverForNBIOCache.java | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java new file mode 100644 index 0000000000..c0061a5660 --- /dev/null +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.nosqlbench.nb.api.nbio; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +public class ResolverForNBIOCache implements ContentResolver { + public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache(); + private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class); + //TODO: This needs to be set somehow - envvar, yaml setting, etc. + private static final String cache = "/tmp/nbio-cache"; + @Override + public List> resolve(URI uri) { + List> contents = new ArrayList<>(); + Path path = resolvePath(uri); + + if (path != null) { + contents.add(new PathContent(path)); + } + return contents; + } + + private Path resolvePath(URI uri) { + /* + * 1st time through this will just be the name of the file. On the second path it will include the full + * URI, including the scheme (eg file:/// or https://, etc.). Since we need to at least verify the + * existence of the remote file, and more typically compare checksums, we don't do anything until + * we get the full URI + */ + if (uri.getScheme() != null && !uri.getScheme().isEmpty() && + (uri.getScheme().equalsIgnoreCase("http") || + uri.getScheme().equalsIgnoreCase("https"))) { + + Path cachePath = Path.of(cache + uri.getPath()); + if (Files.isReadable(cachePath)) { + return pathFromLocalCache(cachePath, uri); + } + else { + return pathFromRemoteUrl(uri); + } + } + return null; + } + + private Path pathFromRemoteUrl(URI uri) { + URLContent urlContent = resolveURI(uri); + /* + * File is not in cache - next steps: + * 1. Download the file and put it in the cache + * 1a. If the remote file does not exist throw an exception + * 1b. If the download fails jump to step 6 + * 2. Download the checksum and put it in the cache + * 3. Generate a new checksum for the file + * 4. compare the checksums + * 5. If they match, return the path to the file in the cache + * 6. If they don't match/exception downloading repeat steps 1-5 up to a configurable number of times + * 6a. If the max attempts have been exceeded throw an exception and clean up the cache + */ + return null; + } + + private Path pathFromLocalCache(Path cachePath, URI uri) { + /* + * File is in cache - next steps: + * 1. Check "force update" option + * 1a. If true remove file from cache and go to "File is not in cache" operations + * 1b. If not specified default to false + * 2. Check for existence of remote file + * 2a. If the remote file does not exist generate warning message and return local file + * 3. Check "checksum verification" option (default = true) + * 3a. If false generate warning message and return local file + * 4. If a local checksum exists compare it against the remote checksum + * 4a. If none exists generate a new one and compare it against the remote checksum + * 5. If checksums match return the local file + * 6. If checksums do not match remove the local file and go to "File is not in cache" operations + */ + return null; + } + + private URLContent resolveURI(URI uri) { + return null; + } + + @Override + public List resolveDirectory(URI uri) { + List dirs = new ArrayList<>(); + + Path path = resolvePath(uri); + if (path!=null && Files.isDirectory(path)) { + dirs.add(path); + } + return dirs; + } +} From 2692a862731417ab96abae304949813222e8bb7f Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Thu, 4 Apr 2024 11:31:34 -0400 Subject: [PATCH 04/21] checking in code for handling locally cached file --- .../nb/api/nbio/ResolverForNBIOCache.java | 92 ++++++++++++++++++- 1 file changed, 87 insertions(+), 5 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index c0061a5660..5ab92c1411 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -20,9 +20,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; @@ -30,7 +37,11 @@ public class ResolverForNBIOCache implements ContentResolver { public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache(); private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class); //TODO: This needs to be set somehow - envvar, yaml setting, etc. - private static final String cache = "/tmp/nbio-cache"; + private static final String cache = "~/.nosqlbench/nbio-cache/"; + //TODO: This needs to be set through configuration at runtime + private boolean forceUpdate = false; + //TODO: This needs to be set through configuration at runtime + private boolean verifyChecksum = true; @Override public List> resolve(URI uri) { List> contents = new ArrayList<>(); @@ -48,11 +59,14 @@ private Path resolvePath(URI uri) { * URI, including the scheme (eg file:/// or https://, etc.). Since we need to at least verify the * existence of the remote file, and more typically compare checksums, we don't do anything until * we get the full URI - */ + * + * TODO: Need to handle situation where file is in the cache, we want to force update but the update fails. + * In this case we don't want to delete the local file because we need to return it. + * Suggestion: add enum type defining behavior (force update, for update IF condition x, do not update, etc.) + */ if (uri.getScheme() != null && !uri.getScheme().isEmpty() && (uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { - Path cachePath = Path.of(cache + uri.getPath()); if (Files.isReadable(cachePath)) { return pathFromLocalCache(cachePath, uri); @@ -96,11 +110,79 @@ private Path pathFromLocalCache(Path cachePath, URI uri) { * 5. If checksums match return the local file * 6. If checksums do not match remove the local file and go to "File is not in cache" operations */ - return null; + String checksumFileStr = cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256"; + if (!Files.isReadable(Path.of(checksumFileStr))) { + try { + Files.writeString(Path.of(checksumFileStr), generateSHA256Checksum(cachePath.toString())); + } catch (IOException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + if (forceUpdate) { + if (!cachePath.toFile().delete()) + logger.warn(() -> "Could not delete cached file " + cachePath); + if (!new File(checksumFileStr).delete()) + logger.warn(() -> "Could not delete cached checksum " + checksumFileStr); + return pathFromRemoteUrl(uri); + } + if (!verifyChecksum) { + logger.warn(() -> "Checksum verification is disabled, returning cached file " + cachePath); + return cachePath; + } + URLContent content = resolveURI(uri); + if (content == null) { + logger.warn(() -> "Remote file does not exist, returning cached file " + cachePath); + return cachePath; + } + String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; + URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); + if (checksum == null) { + logger.warn(() -> "Remote checksum file does not exist, returning cached file " + cachePath); + return cachePath; + } + try { + String localChecksum = Files.readString(Path.of(checksumFileStr)); + String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); + if (localChecksum.equals(remoteChecksum)) { + return cachePath; + } + else { + if (!cachePath.toFile().delete()) + logger.warn(() -> "Could not delete cached file " + cachePath); + return pathFromRemoteUrl(uri); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + try (InputStream is = new FileInputStream(filePath)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + md.update(buffer, 0, bytesRead); + } + } + byte[] digest = md.digest(); + StringBuilder sb = new StringBuilder(); + for (byte b : digest) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); } private URLContent resolveURI(URI uri) { - return null; + try { + URL url = uri.toURL(); + InputStream inputStream = url.openStream(); + logger.debug("Found accessible remote file at " + url); + return new URLContent(url, inputStream); + } catch (IOException e) { + logger.warn("Unable to find content at URI '" + uri + "', this often indicates a configuration error."); + return null; + } } @Override From dcd4b0fe72b7315d6f6120e161d6d1e568cb58e5 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Thu, 4 Apr 2024 15:37:52 -0400 Subject: [PATCH 05/21] checkpoint checkin --- .../nb/api/nbio/ResolverForNBIOCache.java | 86 +++++++++++++++---- 1 file changed, 71 insertions(+), 15 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 5ab92c1411..3034b92fb5 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -20,7 +20,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -39,9 +38,11 @@ public class ResolverForNBIOCache implements ContentResolver { //TODO: This needs to be set somehow - envvar, yaml setting, etc. private static final String cache = "~/.nosqlbench/nbio-cache/"; //TODO: This needs to be set through configuration at runtime - private boolean forceUpdate = false; + private final boolean forceUpdate = false; //TODO: This needs to be set through configuration at runtime - private boolean verifyChecksum = true; + private final boolean verifyChecksum = true; + //TODO: This needs to be set through configuration at runtime + private final int maxRetries = 3; @Override public List> resolve(URI uri) { List> contents = new ArrayList<>(); @@ -78,21 +79,76 @@ private Path resolvePath(URI uri) { return null; } + private boolean downloadFile(URI uri, Path cachePath) { + int retries = 0; + boolean success = false; + while (retries < maxRetries) { + try { + URLContent urlContent = resolveURI(uri); + if (urlContent != null) { + Files.copy(urlContent.getInputStream(), cachePath); + logger.debug("Downloaded remote file to cache at " + cachePath); + success = true; + break; + } else { + logger.error("Error downloading remote file to cache at " + cachePath + ", retrying..."); + retries++; + } + } catch (IOException e) { + logger.error("Error downloading remote file to cache at " + cachePath + ", retrying..."); + retries++; + } + } + return success; + } + private Path pathFromRemoteUrl(URI uri) { - URLContent urlContent = resolveURI(uri); /* * File is not in cache - next steps: * 1. Download the file and put it in the cache - * 1a. If the remote file does not exist throw an exception + * 1a. If the remote file does not exist log error and return null * 1b. If the download fails jump to step 6 - * 2. Download the checksum and put it in the cache - * 3. Generate a new checksum for the file + * 2. Download the checksum and store in memory + * 3. Generate a new checksum for the file and put it in the cache * 4. compare the checksums * 5. If they match, return the path to the file in the cache * 6. If they don't match/exception downloading repeat steps 1-5 up to a configurable number of times * 6a. If the max attempts have been exceeded throw an exception and clean up the cache */ - return null; + Path cachePath = Path.of(cache + uri.getPath()); + if (downloadFile(uri, cachePath)) { + String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; + try { + String localChecksumStr = generateSHA256Checksum(cachePath.toString()); + URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); + if (checksum == null) { + logger.warn("Remote checksum file does not exist"); + return cachePath; + } else { + Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256"); + Files.writeString(checksumPath, localChecksumStr); + logger.debug("Generated local checksum and saved to cache at " + checksumPath); + String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); + if (localChecksumStr.equals(remoteChecksum)) { + return cachePath; + } else { + if (!cachePath.toFile().delete()) + logger.warn(() -> "Could not delete cached file " + cachePath); + if (!checksumPath.toFile().delete()) + logger.warn(() -> "Could not delete cached checksum " + checksumPath); + throw new RuntimeException("Checksums do not match"); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + cleanupCache(); + throw new RuntimeException("Error downloading remote file to cache at " + cachePath); + } + } + + private void cleanupCache() { } private Path pathFromLocalCache(Path cachePath, URI uri) { @@ -110,10 +166,10 @@ private Path pathFromLocalCache(Path cachePath, URI uri) { * 5. If checksums match return the local file * 6. If checksums do not match remove the local file and go to "File is not in cache" operations */ - String checksumFileStr = cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256"; - if (!Files.isReadable(Path.of(checksumFileStr))) { + Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256"); + if (!Files.isReadable(checksumPath)) { try { - Files.writeString(Path.of(checksumFileStr), generateSHA256Checksum(cachePath.toString())); + Files.writeString(checksumPath, generateSHA256Checksum(cachePath.toString())); } catch (IOException | NoSuchAlgorithmException e) { throw new RuntimeException(e); } @@ -121,8 +177,8 @@ private Path pathFromLocalCache(Path cachePath, URI uri) { if (forceUpdate) { if (!cachePath.toFile().delete()) logger.warn(() -> "Could not delete cached file " + cachePath); - if (!new File(checksumFileStr).delete()) - logger.warn(() -> "Could not delete cached checksum " + checksumFileStr); + if (!checksumPath.toFile().delete()) + logger.warn(() -> "Could not delete cached checksum " + checksumPath); return pathFromRemoteUrl(uri); } if (!verifyChecksum) { @@ -141,7 +197,7 @@ private Path pathFromLocalCache(Path cachePath, URI uri) { return cachePath; } try { - String localChecksum = Files.readString(Path.of(checksumFileStr)); + String localChecksum = Files.readString(checksumPath); String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); if (localChecksum.equals(remoteChecksum)) { return cachePath; @@ -180,7 +236,7 @@ private URLContent resolveURI(URI uri) { logger.debug("Found accessible remote file at " + url); return new URLContent(url, inputStream); } catch (IOException e) { - logger.warn("Unable to find content at URI '" + uri + "', this often indicates a configuration error."); + logger.error("Unable to find content at URI '" + uri + "', this often indicates a configuration error."); return null; } } From e14761bb08ea53471ca16ad27fce297a48cb9730 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Fri, 5 Apr 2024 11:03:36 -0400 Subject: [PATCH 06/21] added resolver for nbio cache into flow, resolved download issues --- .../java/io/nosqlbench/nb/api/nbio/NBIO.java | 2 +- .../nb/api/nbio/NBIOResolverConditions.java | 21 ++++++++ .../nb/api/nbio/ResolverForClasspath.java | 8 +-- .../nb/api/nbio/ResolverForNBIOCache.java | 54 +++++++++++++++---- .../nosqlbench/nb/api/nbio/URIResolver.java | 13 ++++- .../nosqlbench/nb/api/nbio/URIResolvers.java | 4 ++ 6 files changed, 86 insertions(+), 16 deletions(-) create mode 100644 nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java index 04c320c821..68e3f1f9a6 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java @@ -163,7 +163,7 @@ public NBPathsAPI.GetPrefixes fileContent() { */ @Override public NBPathsAPI.GetPrefixes allContent() { - this.resolver = URIResolvers.inFS().inCP().inURLs(); + this.resolver = URIResolvers.inFS().inCP().inURLs().inNBIOCache(); return this; } diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java new file mode 100644 index 0000000000..9ea20fe0b9 --- /dev/null +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.nosqlbench.nb.api.nbio; + +public enum NBIOResolverConditions { +} diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java index b2d659e5ab..003aaa4368 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java @@ -101,9 +101,11 @@ public List> resolve(URI uri) { public List resolveDirectory(URI uri) { List path = resolvePaths(uri); List dirs = new ArrayList<>(); - for (Path dirpath : path) { - if (Files.isDirectory(dirpath)) { - dirs.add(dirpath); + if (path != null) { + for (Path dirpath : path) { + if (Files.isDirectory(dirpath)) { + dirs.add(dirpath); + } } } return dirs; diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 3034b92fb5..b4147e20e9 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -35,14 +35,15 @@ public class ResolverForNBIOCache implements ContentResolver { public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache(); private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class); + private static final String userHomeDirectory = System.getProperty("user.home"); //TODO: This needs to be set somehow - envvar, yaml setting, etc. - private static final String cache = "~/.nosqlbench/nbio-cache/"; + private static String cache = userHomeDirectory + "/.nosqlbench/nbio-cache/"; //TODO: This needs to be set through configuration at runtime - private final boolean forceUpdate = false; + private static boolean forceUpdate = false; //TODO: This needs to be set through configuration at runtime - private final boolean verifyChecksum = true; + private static boolean verifyChecksum = true; //TODO: This needs to be set through configuration at runtime - private final int maxRetries = 3; + private static int maxRetries = 3; @Override public List> resolve(URI uri) { List> contents = new ArrayList<>(); @@ -64,6 +65,7 @@ private Path resolvePath(URI uri) { * TODO: Need to handle situation where file is in the cache, we want to force update but the update fails. * In this case we don't want to delete the local file because we need to return it. * Suggestion: add enum type defining behavior (force update, for update IF condition x, do not update, etc.) + * See NBIOResolverConditions */ if (uri.getScheme() != null && !uri.getScheme().isEmpty() && (uri.getScheme().equalsIgnoreCase("http") || @@ -86,16 +88,17 @@ private boolean downloadFile(URI uri, Path cachePath) { try { URLContent urlContent = resolveURI(uri); if (urlContent != null) { + logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachePath); Files.copy(urlContent.getInputStream(), cachePath); - logger.debug("Downloaded remote file to cache at " + cachePath); + logger.info(() -> "Downloaded remote file to cache at " + cachePath); success = true; break; } else { - logger.error("Error downloading remote file to cache at " + cachePath + ", retrying..."); + logger.error(() -> "Error downloading remote file to cache at " + cachePath + ", retrying..."); retries++; } } catch (IOException e) { - logger.error("Error downloading remote file to cache at " + cachePath + ", retrying..."); + logger.error(() -> "Error downloading remote file to cache at " + cachePath + ", retrying..."); retries++; } } @@ -116,18 +119,19 @@ private Path pathFromRemoteUrl(URI uri) { * 6a. If the max attempts have been exceeded throw an exception and clean up the cache */ Path cachePath = Path.of(cache + uri.getPath()); + createCacheDir(cachePath); if (downloadFile(uri, cachePath)) { String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; try { String localChecksumStr = generateSHA256Checksum(cachePath.toString()); URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); if (checksum == null) { - logger.warn("Remote checksum file does not exist"); + logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist"); return cachePath; } else { Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256"); Files.writeString(checksumPath, localChecksumStr); - logger.debug("Generated local checksum and saved to cache at " + checksumPath); + logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); if (localChecksumStr.equals(remoteChecksum)) { return cachePath; @@ -148,6 +152,17 @@ private Path pathFromRemoteUrl(URI uri) { } } + private void createCacheDir(Path cachePath) { + Path dir = cachePath.getParent(); + if (!Files.exists(dir)) { + try { + Files.createDirectories(dir); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + private void cleanupCache() { } @@ -233,10 +248,10 @@ private URLContent resolveURI(URI uri) { try { URL url = uri.toURL(); InputStream inputStream = url.openStream(); - logger.debug("Found accessible remote file at " + url); + logger.debug(() -> "Found accessible remote file at " + url); return new URLContent(url, inputStream); } catch (IOException e) { - logger.error("Unable to find content at URI '" + uri + "', this often indicates a configuration error."); + logger.error(() -> "Unable to find content at URI '" + uri + "', this often indicates a configuration error."); return null; } } @@ -251,4 +266,21 @@ public List resolveDirectory(URI uri) { } return dirs; } + + public static void setCache(String cache) { + ResolverForNBIOCache.cache = cache; + } + + public static void setForceUpdate(boolean forceUpdate) { + ResolverForNBIOCache.forceUpdate = forceUpdate; + } + + public static void setVerifyChecksum(boolean verifyChecksum) { + ResolverForNBIOCache.verifyChecksum = verifyChecksum; + } + + public static void setMaxRetries(int maxRetries) { + ResolverForNBIOCache.maxRetries = maxRetries; + } + } diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java index 4aff35b696..90e2be2af4 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java @@ -36,7 +36,8 @@ public class URIResolver implements ContentResolver { private static final List EVERYWHERE = List.of( ResolverForURL.INSTANCE, ResolverForFilesystem.INSTANCE, - ResolverForClasspath.INSTANCE + ResolverForClasspath.INSTANCE, + ResolverForNBIOCache.INSTANCE ); private List extensions; @@ -87,6 +88,16 @@ public URIResolver inCP() { return this; } + /** + * Include resources within the NBIO cache or download them if they are not found. + * + * @return this URISearch + */ + public URIResolver inNBIOCache() { + loaders.add(ResolverForNBIOCache.INSTANCE); + return this; + } + public List> resolve(String uri) { return resolve(URI.create(uri)); } diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java index c1a4c852b6..5c08d8eb5c 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java @@ -52,4 +52,8 @@ public static URIResolver inURLs() { public static URIResolver inClasspath() { return new URIResolver().inCP(); } + + public static URIResolver inNBIOCache() { + return new URIResolver().inNBIOCache(); + } } From 3845fd825693b7bd54cb204112b96b4a4136f1d4 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Fri, 5 Apr 2024 13:53:33 -0400 Subject: [PATCH 07/21] fixed issue returning dirs --- .../io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index b4147e20e9..3703199027 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -25,6 +25,7 @@ import java.io.InputStream; import java.net.URI; import java.net.URL; +import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; import java.security.MessageDigest; @@ -129,7 +130,7 @@ private Path pathFromRemoteUrl(URI uri) { logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist"); return cachePath; } else { - Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256"); + Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); Files.writeString(checksumPath, localChecksumStr); logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); @@ -181,7 +182,7 @@ private Path pathFromLocalCache(Path cachePath, URI uri) { * 5. If checksums match return the local file * 6. If checksums do not match remove the local file and go to "File is not in cache" operations */ - Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256"); + Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); if (!Files.isReadable(checksumPath)) { try { Files.writeString(checksumPath, generateSHA256Checksum(cachePath.toString())); @@ -260,8 +261,8 @@ private URLContent resolveURI(URI uri) { public List resolveDirectory(URI uri) { List dirs = new ArrayList<>(); - Path path = resolvePath(uri); - if (path!=null && Files.isDirectory(path)) { + Path path = Path.of(cache + uri.getPath()); + if (Files.isDirectory(path)) { dirs.add(path); } return dirs; From 45ac75b90b471c59a87b0cd3e5f65c8de1d2befb Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Tue, 9 Apr 2024 09:02:08 -0400 Subject: [PATCH 08/21] added command line option to use nbio cache --- .../java/io/nosqlbench/engine/cli/NBCLI.java | 1 + .../io/nosqlbench/engine/cli/NBCLIOptions.java | 9 +++++++++ .../java/io/nosqlbench/nb/api/nbio/NBIO.java | 17 ++++++++++++++++- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java index 0561cb6159..00334b1da3 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java @@ -220,6 +220,7 @@ public Integer applyDirect(final String[] args) { NBCLI.logger = LogManager.getLogger("NBCLI"); NBIO.addGlobalIncludes(options.wantsIncludes()); + NBIO.setUseNBIOCache(options.wantsToUseNBIOCache()); if (options.wantsBasicHelp()) { System.out.println(this.loadHelpFile("basic.md")); diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java index 8b7d4de32c..acf77d0a75 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java @@ -137,6 +137,7 @@ public class NBCLIOptions { private static final String DEFAULT_CONSOLE_PATTERN = "TERSE"; private static final String DEFAULT_LOGFILE_PATTERN = "VERBOSE"; private final static String ENABLE_DEDICATED_VERIFICATION_LOGGER = "--enable-dedicated-verification-logging"; + private final static String USE_NBIO_CACHE = "--use-nbio-cache"; // private static final String DEFAULT_CONSOLE_LOGGING_PATTERN = "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"; @@ -206,6 +207,7 @@ public class NBCLIOptions { private String metricsLabelSpec = ""; private String wantsToCatResource = ""; private long heartbeatIntervalMs = 10000; + private boolean useNBIOCache = false; public boolean wantsLoggedMetrics() { return this.wantsConsoleMetrics; @@ -651,6 +653,10 @@ private void parseAllOptions(final String[] args) { this.heartbeatIntervalMs = Long.parseLong(this.readWordOrThrow(arglist, "heartbeat interval in ms")); break; + case USE_NBIO_CACHE: + arglist.removeFirst(); + this.useNBIOCache = true; + break; default: nonincludes.addLast(arglist.removeFirst()); } @@ -812,6 +818,9 @@ public String getSessionName() { public NBLogLevel getConsoleLogLevel() { return this.consoleLevel; } + public boolean wantsToUseNBIOCache() { + return this.useNBIOCache; + } private String readWordOrThrow(final LinkedList arglist, final String required) { if (null == arglist.peekFirst()) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java index 68e3f1f9a6..e983cd5278 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java @@ -43,6 +43,8 @@ public class NBIO implements NBPathsAPI.Facets { private static String[] globalIncludes = new String[0]; + private static boolean useNBIOCache; + public synchronized static void addGlobalIncludes(String[] globalIncludes) { NBIO.globalIncludes = globalIncludes; } @@ -163,7 +165,11 @@ public NBPathsAPI.GetPrefixes fileContent() { */ @Override public NBPathsAPI.GetPrefixes allContent() { - this.resolver = URIResolvers.inFS().inCP().inURLs().inNBIOCache(); + if (useNBIOCache) { + this.resolver = URIResolvers.inFS().inCP().inNBIOCache(); + } else { + this.resolver = URIResolvers.inFS().inCP().inURLs(); + } return this; } @@ -628,4 +634,13 @@ public String toString() { ", extensionSets=" + extensionSets + '}'; } + + public boolean useNBIOCache() { + return useNBIOCache; + } + + public static void setUseNBIOCache(boolean wantsToUseNBIOCache) { + useNBIOCache = wantsToUseNBIOCache; + } + } From e8e67bcdd6083cc976158cf6516cf327cad37e6c Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Tue, 9 Apr 2024 12:09:15 -0400 Subject: [PATCH 09/21] added command line configuration options --- .../java/io/nosqlbench/engine/cli/NBCLI.java | 20 +++++++++++ .../nosqlbench/engine/cli/NBCLIOptions.java | 36 +++++++++++++++++++ .../java/io/nosqlbench/nb/api/nbio/NBIO.java | 17 +++++++++ .../io/nosqlbench/nb/api/nbio/NBPathsAPI.java | 8 +++++ .../nb/api/nbio/ResolverForNBIOCache.java | 18 ++++------ 5 files changed, 87 insertions(+), 12 deletions(-) diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java index 00334b1da3..eca8b8c582 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java @@ -49,6 +49,7 @@ import io.nosqlbench.engine.core.metadata.MarkdownFinder; import io.nosqlbench.nb.annotations.Service; import io.nosqlbench.nb.annotations.ServiceSelector; +import io.nosqlbench.nb.api.nbio.ResolverForNBIOCache; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.config.ConfigurationFactory; @@ -221,6 +222,25 @@ public Integer applyDirect(final String[] args) { NBIO.addGlobalIncludes(options.wantsIncludes()); NBIO.setUseNBIOCache(options.wantsToUseNBIOCache()); + if(options.wantsToUseNBIOCache()) { + logger.info(() -> "Configuring options for NBIO Cache"); + logger.info(() -> "Setting NBIO Cache Force Update to " + options.wantsNbioCacheForceUpdate()); + ResolverForNBIOCache.setForceUpdate(options.wantsNbioCacheForceUpdate()); + logger.info(() -> "Setting NBIO Cache Verify Checksum to " + options.wantsNbioCacheVerify()); + ResolverForNBIOCache.setVerifyChecksum(options.wantsNbioCacheVerify()); + if (options.getNbioCacheDir() != null) { + logger.info(() -> "Setting NBIO Cache directory to " + options.getNbioCacheDir()); + ResolverForNBIOCache.setCacheDir(options.getNbioCacheDir()); + } + if (options.getNbioCacheMaxRetries() != null) { + try { + ResolverForNBIOCache.setMaxRetries(Integer.parseInt(options.getNbioCacheMaxRetries())); + logger.info(() -> "Setting NBIO Cache max retries to " + options.getNbioCacheMaxRetries()); + } catch (NumberFormatException e) { + logger.error("Invalid value for nbio-cache-max-retries: " + options.getNbioCacheMaxRetries()); + } + } + } if (options.wantsBasicHelp()) { System.out.println(this.loadHelpFile("basic.md")); diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java index acf77d0a75..53a646fd87 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java @@ -138,6 +138,10 @@ public class NBCLIOptions { private static final String DEFAULT_LOGFILE_PATTERN = "VERBOSE"; private final static String ENABLE_DEDICATED_VERIFICATION_LOGGER = "--enable-dedicated-verification-logging"; private final static String USE_NBIO_CACHE = "--use-nbio-cache"; + private final static String NBIO_CACHE_FORCE_UPDATE = "--nbio-cache-force-update"; + private final static String NBIO_CACHE_NO_VERIFY = "--nbio-cache-no-verify"; + private final static String NBIO_CACHE_DIR = "--nbio-cache-dir"; + private final static String NBIO_CACHE_MAX_RETRIES = "--nbio-cache-max-retries"; // private static final String DEFAULT_CONSOLE_LOGGING_PATTERN = "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"; @@ -208,6 +212,10 @@ public class NBCLIOptions { private String wantsToCatResource = ""; private long heartbeatIntervalMs = 10000; private boolean useNBIOCache = false; + private boolean nbioCacheForceUpdate = false; + private boolean nbioCacheVerify = true; + private String nbioCachDir; + private String nbioCacheMaxRetries; public boolean wantsLoggedMetrics() { return this.wantsConsoleMetrics; @@ -657,6 +665,22 @@ private void parseAllOptions(final String[] args) { arglist.removeFirst(); this.useNBIOCache = true; break; + case NBIO_CACHE_FORCE_UPDATE: + arglist.removeFirst(); + this.nbioCacheForceUpdate = true; + break; + case NBIO_CACHE_NO_VERIFY: + arglist.removeFirst(); + this.nbioCacheVerify = false; + break; + case NBCLIOptions.NBIO_CACHE_DIR: + arglist.removeFirst(); + this.nbioCachDir = this.readWordOrThrow(arglist, "a NBIO cache directory"); + break; + case NBIO_CACHE_MAX_RETRIES: + arglist.removeFirst(); + this.nbioCacheMaxRetries = this.readWordOrThrow(arglist, "the maximum number of attempts to fetch a resource from the cache"); + break; default: nonincludes.addLast(arglist.removeFirst()); } @@ -821,6 +845,18 @@ public NBLogLevel getConsoleLogLevel() { public boolean wantsToUseNBIOCache() { return this.useNBIOCache; } + public boolean wantsNbioCacheForceUpdate() { + return nbioCacheForceUpdate; + } + public boolean wantsNbioCacheVerify() { + return nbioCacheVerify; + } + public String getNbioCacheDir() { + return nbioCachDir; + } + public String getNbioCacheMaxRetries() { + return nbioCacheMaxRetries; + } private String readWordOrThrow(final LinkedList arglist, final String required) { if (null == arglist.peekFirst()) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java index e983cd5278..87badc54ae 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java @@ -160,6 +160,15 @@ public NBPathsAPI.GetPrefixes fileContent() { return this; } + /** + * {@inheritDoc} + */ + @Override + public NBPathsAPI.GetPrefixes cachedContent() { + this.resolver = URIResolvers.inNBIOCache(); + return this; + } + /** * {@inheritDoc} */ @@ -349,6 +358,14 @@ public static NBPathsAPI.GetPrefixes remote() { return new NBIO().remoteContent(); } + /** + * Return content from the NBIO cache. If the content is not in the cache look for it in the given + * URL and put it in the cache. + * + * @return this builder + */ + public static NBPathsAPI.GetPrefixes cached() { return new NBIO().cachedContent(); } + /** * {@inheritDoc} diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java index d3bb764317..75e65af023 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java @@ -67,6 +67,14 @@ interface GetSource { */ GetPrefixes fileContent(); + /** + * Return content from the NBIO cache. If the content is not in the cache look for it in the given + * URL and put it in the cache. + * + * @return this builder + */ + GetPrefixes cachedContent(); + /** * Return content from everywhere, from remote URls, or from the file system and then the internal * bundled content if not found in the file system first. diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 3703199027..0a42e42536 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -25,7 +25,6 @@ import java.io.InputStream; import java.net.URI; import java.net.URL; -import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; import java.security.MessageDigest; @@ -36,14 +35,9 @@ public class ResolverForNBIOCache implements ContentResolver { public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache(); private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class); - private static final String userHomeDirectory = System.getProperty("user.home"); - //TODO: This needs to be set somehow - envvar, yaml setting, etc. - private static String cache = userHomeDirectory + "/.nosqlbench/nbio-cache/"; - //TODO: This needs to be set through configuration at runtime + private static String cacheDir = System.getProperty("user.home") + "/.nosqlbench/nbio-cache/"; private static boolean forceUpdate = false; - //TODO: This needs to be set through configuration at runtime private static boolean verifyChecksum = true; - //TODO: This needs to be set through configuration at runtime private static int maxRetries = 3; @Override public List> resolve(URI uri) { @@ -71,7 +65,7 @@ private Path resolvePath(URI uri) { if (uri.getScheme() != null && !uri.getScheme().isEmpty() && (uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { - Path cachePath = Path.of(cache + uri.getPath()); + Path cachePath = Path.of(cacheDir + uri.getPath()); if (Files.isReadable(cachePath)) { return pathFromLocalCache(cachePath, uri); } @@ -119,7 +113,7 @@ private Path pathFromRemoteUrl(URI uri) { * 6. If they don't match/exception downloading repeat steps 1-5 up to a configurable number of times * 6a. If the max attempts have been exceeded throw an exception and clean up the cache */ - Path cachePath = Path.of(cache + uri.getPath()); + Path cachePath = Path.of(cacheDir + uri.getPath()); createCacheDir(cachePath); if (downloadFile(uri, cachePath)) { String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; @@ -261,15 +255,15 @@ private URLContent resolveURI(URI uri) { public List resolveDirectory(URI uri) { List dirs = new ArrayList<>(); - Path path = Path.of(cache + uri.getPath()); + Path path = Path.of(cacheDir + uri.getPath()); if (Files.isDirectory(path)) { dirs.add(path); } return dirs; } - public static void setCache(String cache) { - ResolverForNBIOCache.cache = cache; + public static void setCacheDir(String cacheDir) { + ResolverForNBIOCache.cacheDir = cacheDir; } public static void setForceUpdate(boolean forceUpdate) { From ddb8d4d5508c8944914513ae6e3e46ab2b54a416 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Wed, 10 Apr 2024 08:15:49 -0400 Subject: [PATCH 10/21] javadoc --- .../nb/api/nbio/ResolverForNBIOCache.java | 83 ++++++++++--------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 0a42e42536..95a03261ec 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -39,6 +39,7 @@ public class ResolverForNBIOCache implements ContentResolver { private static boolean forceUpdate = false; private static boolean verifyChecksum = true; private static int maxRetries = 3; + @Override public List> resolve(URI uri) { List> contents = new ArrayList<>(); @@ -50,18 +51,16 @@ public List> resolve(URI uri) { return contents; } + /** + * This method is used to resolve the path of a given URI. + * It first checks if the URI has a scheme (http or https) and if it does, it tries to resolve the path from the cache. + * If the file is not in the cache, it tries to download it from the remote URL. + * If the URI does not have a scheme, it returns null. + * + * @param uri the URI to resolve the path for + * @return the resolved Path object, or null if the URI does not have a scheme or the path could not be resolved + */ private Path resolvePath(URI uri) { - /* - * 1st time through this will just be the name of the file. On the second path it will include the full - * URI, including the scheme (eg file:/// or https://, etc.). Since we need to at least verify the - * existence of the remote file, and more typically compare checksums, we don't do anything until - * we get the full URI - * - * TODO: Need to handle situation where file is in the cache, we want to force update but the update fails. - * In this case we don't want to delete the local file because we need to return it. - * Suggestion: add enum type defining behavior (force update, for update IF condition x, do not update, etc.) - * See NBIOResolverConditions - */ if (uri.getScheme() != null && !uri.getScheme().isEmpty() && (uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { @@ -100,19 +99,19 @@ private boolean downloadFile(URI uri, Path cachePath) { return success; } + /** + * This method is used to download a file from a remote URL and store it in a local cache. + * It first creates the cache directory if it doesn't exist. + * Then it tries to download the file and if successful, it generates a SHA256 checksum for the downloaded file. + * It then compares the generated checksum with the remote checksum. + * If the checksums match, it returns the path to the cached file. + * If the checksums don't match or if there was an error during the download, it cleans up the cache and throws a RuntimeException. + * + * @param uri the URI of the remote file to download + * @return the Path to the downloaded file in the local cache + * @throws RuntimeException if there was an error during the download or if the checksums don't match + */ private Path pathFromRemoteUrl(URI uri) { - /* - * File is not in cache - next steps: - * 1. Download the file and put it in the cache - * 1a. If the remote file does not exist log error and return null - * 1b. If the download fails jump to step 6 - * 2. Download the checksum and store in memory - * 3. Generate a new checksum for the file and put it in the cache - * 4. compare the checksums - * 5. If they match, return the path to the file in the cache - * 6. If they don't match/exception downloading repeat steps 1-5 up to a configurable number of times - * 6a. If the max attempts have been exceeded throw an exception and clean up the cache - */ Path cachePath = Path.of(cacheDir + uri.getPath()); createCacheDir(cachePath); if (downloadFile(uri, cachePath)) { @@ -142,7 +141,7 @@ private Path pathFromRemoteUrl(URI uri) { throw new RuntimeException(e); } } else { - cleanupCache(); + cleanupCache(cachePath); throw new RuntimeException("Error downloading remote file to cache at " + cachePath); } } @@ -158,24 +157,30 @@ private void createCacheDir(Path cachePath) { } } - private void cleanupCache() { + private void cleanupCache(Path cachePath) { + if (!cachePath.toFile().delete()) + logger.warn(() -> "Could not delete cached file " + cachePath); + Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); + if (!checksumPath.toFile().delete()) + logger.warn(() -> "Could not delete cached checksum " + checksumPath); } + /** + * This method is used to retrieve a file from the local cache. + * It first checks if the file exists in the cache and if a checksum file is present. + * If the checksum file is not present, it generates a new one. + * If the "force update" option is enabled, it deletes the cached file and downloads it from the remote URL. + * If the "checksum verification" option is enabled, it compares the local checksum with the remote checksum. + * If the checksums match, it returns the path to the cached file. + * If the checksums don't match, it deletes the cached file and downloads it from the remote URL. + * If the remote file or checksum does not exist, it returns the cached file. + * + * @param cachePath the Path to the cached file + * @param uri the URI of the remote file + * @return the Path to the cached file + * @throws RuntimeException if there was an error during the checksum comparison or if the checksums don't match + */ private Path pathFromLocalCache(Path cachePath, URI uri) { - /* - * File is in cache - next steps: - * 1. Check "force update" option - * 1a. If true remove file from cache and go to "File is not in cache" operations - * 1b. If not specified default to false - * 2. Check for existence of remote file - * 2a. If the remote file does not exist generate warning message and return local file - * 3. Check "checksum verification" option (default = true) - * 3a. If false generate warning message and return local file - * 4. If a local checksum exists compare it against the remote checksum - * 4a. If none exists generate a new one and compare it against the remote checksum - * 5. If checksums match return the local file - * 6. If checksums do not match remove the local file and go to "File is not in cache" operations - */ Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); if (!Files.isReadable(checksumPath)) { try { From 59b2502e11f813bbef590b4f511f26cfda528980 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Wed, 10 Apr 2024 10:21:02 -0400 Subject: [PATCH 11/21] reorganized logic around main resolver conditions --- .../nb/api/nbio/NBIOResolverConditions.java | 4 + .../nb/api/nbio/ResolverForNBIOCache.java | 156 ++++++++++-------- 2 files changed, 92 insertions(+), 68 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java index 9ea20fe0b9..7f3e338bcc 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java @@ -18,4 +18,8 @@ package io.nosqlbench.nb.api.nbio; public enum NBIOResolverConditions { + UPDATE_AND_VERIFY, + UPDATE_NO_VERIFY, + LOCAL_VERIFY, + LOCAL_NO_VERIFY } diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 95a03261ec..1b51bb4896 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -75,7 +75,7 @@ private Path resolvePath(URI uri) { return null; } - private boolean downloadFile(URI uri, Path cachePath) { + private boolean downloadFile(URI uri, Path cachePath, URLContent checksum) { int retries = 0; boolean success = false; while (retries < maxRetries) { @@ -85,8 +85,10 @@ private boolean downloadFile(URI uri, Path cachePath) { logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachePath); Files.copy(urlContent.getInputStream(), cachePath); logger.info(() -> "Downloaded remote file to cache at " + cachePath); - success = true; - break; + if(checksum != null && verifyChecksum(cachePath, checksum)) { + success = true; + break; + } } else { logger.error(() -> "Error downloading remote file to cache at " + cachePath + ", retrying..."); retries++; @@ -99,6 +101,24 @@ private boolean downloadFile(URI uri, Path cachePath) { return success; } + private boolean verifyChecksum(Path cachePath, URLContent checksum) { + try { + String localChecksumStr = generateSHA256Checksum(cachePath.toString()); + Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); + Files.writeString(checksumPath, localChecksumStr); + logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); + String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); + if (localChecksumStr.equals(remoteChecksum)) { + return true; + } else { + logger.warn(() -> "checksums do not match for " + checksumPath + " and " + checksum); + return false; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * This method is used to download a file from a remote URL and store it in a local cache. * It first creates the cache directory if it doesn't exist. @@ -114,35 +134,11 @@ private boolean downloadFile(URI uri, Path cachePath) { private Path pathFromRemoteUrl(URI uri) { Path cachePath = Path.of(cacheDir + uri.getPath()); createCacheDir(cachePath); - if (downloadFile(uri, cachePath)) { - String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; - try { - String localChecksumStr = generateSHA256Checksum(cachePath.toString()); - URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); - if (checksum == null) { - logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist"); - return cachePath; - } else { - Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); - Files.writeString(checksumPath, localChecksumStr); - logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); - String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); - if (localChecksumStr.equals(remoteChecksum)) { - return cachePath; - } else { - if (!cachePath.toFile().delete()) - logger.warn(() -> "Could not delete cached file " + cachePath); - if (!checksumPath.toFile().delete()) - logger.warn(() -> "Could not delete cached checksum " + checksumPath); - throw new RuntimeException("Checksums do not match"); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - cleanupCache(cachePath); - throw new RuntimeException("Error downloading remote file to cache at " + cachePath); + if (!verifyChecksum) { + return execute(NBIOResolverConditions.UPDATE_NO_VERIFY, cachePath, uri); + } + else { + return execute(NBIOResolverConditions.UPDATE_AND_VERIFY, cachePath, uri); } } @@ -165,6 +161,51 @@ private void cleanupCache(Path cachePath) { logger.warn(() -> "Could not delete cached checksum " + checksumPath); } + private Path execute(NBIOResolverConditions condition, Path cachePath, URI uri) { + String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; + URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); + switch(condition) { + case UPDATE_AND_VERIFY: + if (checksum == null) { + logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist. Proceeding without verification"); + } + if (downloadFile(uri, cachePath, checksum)) { + return cachePath; + } else { + throw new RuntimeException("Error downloading remote file to cache at " + cachePath); + } + case UPDATE_NO_VERIFY: + logger.warn(() -> "Checksum verification is disabled, downloading remote file to cache at " + cachePath); + if (downloadFile(uri, cachePath, null)) { + return cachePath; + } else { + throw new RuntimeException("Error downloading remote file to cache at " + cachePath); + } + case LOCAL_VERIFY: + if (checksum == null) { + logger.warn(() -> "Remote checksum file does not exist, returning cached file " + cachePath); + return cachePath; + } + try { + String localChecksum = Files.readString(getOrCreateChecksum(cachePath)); + String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); + if (localChecksum.equals(remoteChecksum)) { + return cachePath; + } + else { + logger.warn(() -> "Checksums do not match, rehydrating cache " + cachePath); + return pathFromRemoteUrl(uri); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + case LOCAL_NO_VERIFY: + return cachePath; + default: + throw new RuntimeException("Invalid NBIO Cache condition"); + } + } + /** * This method is used to retrieve a file from the local cache. * It first checks if the file exists in the cache and if a checksum file is present. @@ -181,50 +222,29 @@ private void cleanupCache(Path cachePath) { * @throws RuntimeException if there was an error during the checksum comparison or if the checksums don't match */ private Path pathFromLocalCache(Path cachePath, URI uri) { - Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); - if (!Files.isReadable(checksumPath)) { - try { - Files.writeString(checksumPath, generateSHA256Checksum(cachePath.toString())); - } catch (IOException | NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - } + if (forceUpdate) { - if (!cachePath.toFile().delete()) - logger.warn(() -> "Could not delete cached file " + cachePath); - if (!checksumPath.toFile().delete()) - logger.warn(() -> "Could not delete cached checksum " + checksumPath); return pathFromRemoteUrl(uri); } if (!verifyChecksum) { logger.warn(() -> "Checksum verification is disabled, returning cached file " + cachePath); - return cachePath; - } - URLContent content = resolveURI(uri); - if (content == null) { - logger.warn(() -> "Remote file does not exist, returning cached file " + cachePath); - return cachePath; - } - String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; - URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); - if (checksum == null) { - logger.warn(() -> "Remote checksum file does not exist, returning cached file " + cachePath); - return cachePath; + return execute(NBIOResolverConditions.LOCAL_NO_VERIFY, cachePath, uri); + } else { + return execute(NBIOResolverConditions.LOCAL_VERIFY, cachePath, uri); } - try { - String localChecksum = Files.readString(checksumPath); - String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); - if (localChecksum.equals(remoteChecksum)) { - return cachePath; - } - else { - if (!cachePath.toFile().delete()) - logger.warn(() -> "Could not delete cached file " + cachePath); - return pathFromRemoteUrl(uri); + + } + + private Path getOrCreateChecksum(Path cachePath) { + Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); + if (!Files.isReadable(checksumPath)) { + try { + Files.writeString(checksumPath, generateSHA256Checksum(cachePath.toString())); + } catch (IOException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - throw new RuntimeException(e); } + return checksumPath; } private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException { From 17a206f613c9c5baf49b2abef15b424ff0f9be0a Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Wed, 10 Apr 2024 11:00:04 -0400 Subject: [PATCH 12/21] removed breaking test --- virtdata-lib-vectors/pom.xml | 6 -- .../lib/vectors/dnn/DNN_Symbolic_Tests.java | 95 ------------------- 2 files changed, 101 deletions(-) delete mode 100644 virtdata-lib-vectors/src/test/java/io/nosqlbench/virtdata/lib/vectors/dnn/DNN_Symbolic_Tests.java diff --git a/virtdata-lib-vectors/pom.xml b/virtdata-lib-vectors/pom.xml index b579fb27fe..cc81640792 100644 --- a/virtdata-lib-vectors/pom.xml +++ b/virtdata-lib-vectors/pom.xml @@ -47,12 +47,6 @@ ${revision} - - org.apfloat - apfloat - 1.13.0 - - org.matheclipse matheclipse-core diff --git a/virtdata-lib-vectors/src/test/java/io/nosqlbench/virtdata/lib/vectors/dnn/DNN_Symbolic_Tests.java b/virtdata-lib-vectors/src/test/java/io/nosqlbench/virtdata/lib/vectors/dnn/DNN_Symbolic_Tests.java deleted file mode 100644 index 3b7828fc27..0000000000 --- a/virtdata-lib-vectors/src/test/java/io/nosqlbench/virtdata/lib/vectors/dnn/DNN_Symbolic_Tests.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2024 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.virtdata.lib.vectors.dnn; - -import org.junit.jupiter.api.Test; -import org.matheclipse.core.eval.ExprEvaluator; -import org.matheclipse.core.expression.F; -import org.matheclipse.core.interfaces.IExpr; -import org.matheclipse.core.interfaces.ISymbol; -import org.matheclipse.core.interfaces.IAST; - -public class DNN_Symbolic_Tests { - - @Test - public void testExactRepresentation() { - ExprEvaluator util = new ExprEvaluator(false, (short)10); - - // Convert an expression to the internal Java form: - // Note: single character identifiers are case sensitive - // (the "D()" function identifier must be written as upper case - // character) - String javaForm = util.toJavaForm("D(sin(x)*cos(x),x)"); - // prints: D(Times(Sin(x),Cos(x)),x) - System.out.println("Out[1]: " + javaForm.toString()); - - // Use the Java form to create an expression with F.* static - // methods: - ISymbol x = F.Dummy("x"); - IAST function = F.D(F.Times(F.Sin(x), F.Cos(x)), x); - IExpr result = util.eval(function); - // print: Cos(x)^2-Sin(x)^2 - System.out.println("Out[2]: " + result.toString()); - - // Note "diff" is an alias for the "D" function - result = util.eval("diff(sin(x)*cos(x),x)"); - // print: Cos(x)^2-Sin(x)^2 - System.out.println("Out[3]: " + result.toString()); - - // evaluate the last result (% contains "last answer") - result = util.eval("%+cos(x)^2"); - // print: 2*Cos(x)^2-Sin(x)^2 - System.out.println("Out[4]: " + result.toString()); - - // evaluate an Integrate[] expression - result = util.eval("integrate(sin(x)^5,x)"); - // print: 2/3*Cos(x)^3-1/5*Cos(x)^5-Cos(x) - System.out.println("Out[5]: " + result.toString()); - - // set the value of a variable "a" to 10 - result = util.eval("a=10"); - // print: 10 - System.out.println("Out[6]: " + result.toString()); - - // do a calculation with variable "a" - result = util.eval("a*3+b"); - // print: 30+b - System.out.println("Out[7]: " + result.toString()); - - // Do a calculation in "numeric mode" with the N() function - // Note: single character identifiers are case sensistive - // (the "N()" function identifier must be written as upper case - // character) - result = util.eval("N(sinh(5))"); - // print: 74.20321057778875 - System.out.println("Out[8]: " + result.toString()); - - // define a function with a recursive factorial function definition. - // Note: fac(0) is the stop condition. - result = util.eval("fac(x_Integer):=x*fac(x-1);fac(0)=1"); - // now calculate factorial of 10: - result = util.eval("fac(10)"); - // print: 3628800 - System.out.println("Out[9]: " + result.toString()); - - function = F.Function(F.Divide(F.Gamma(F.Plus(F.C1, F.Slot1)), F.Gamma(F.Plus(F.C1, F.Slot2)))); - // eval function ( Gamma(1+#1)/Gamma(1+#2) ) & [23,20] - result = util.evalFunction(function, "23", "20"); - // print: 10626 - System.out.println("Out[10]: " + result.toString()); - } -} From a96fa4e486ac89b0b1f0abbeff9771d77030db22 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Wed, 10 Apr 2024 12:21:47 -0400 Subject: [PATCH 13/21] fixed bug on checksum check --- .../java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 1b51bb4896..f9b18543c3 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -85,7 +85,7 @@ private boolean downloadFile(URI uri, Path cachePath, URLContent checksum) { logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachePath); Files.copy(urlContent.getInputStream(), cachePath); logger.info(() -> "Downloaded remote file to cache at " + cachePath); - if(checksum != null && verifyChecksum(cachePath, checksum)) { + if(checksum == null || verifyChecksum(cachePath, checksum)) { success = true; break; } From 5021a00fb42be3468a0f694176d210e69b04a2ea Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Wed, 10 Apr 2024 16:55:29 -0400 Subject: [PATCH 14/21] fixed file download code --- .../nb/api/nbio/ResolverForNBIOCache.java | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index f9b18543c3..11eba8a830 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -20,11 +20,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; +import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.security.MessageDigest; @@ -80,10 +81,13 @@ private boolean downloadFile(URI uri, Path cachePath, URLContent checksum) { boolean success = false; while (retries < maxRetries) { try { - URLContent urlContent = resolveURI(uri); - if (urlContent != null) { + if (this.remoteFileExists(uri)) { logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachePath); - Files.copy(urlContent.getInputStream(), cachePath); + ReadableByteChannel channel = Channels.newChannel(uri.toURL().openStream()); + FileOutputStream outputStream = new FileOutputStream(cachePath.toFile()); + outputStream.getChannel().transferFrom(channel, 0, Long.MAX_VALUE); + outputStream.close(); + channel.close(); logger.info(() -> "Downloaded remote file to cache at " + cachePath); if(checksum == null || verifyChecksum(cachePath, checksum)) { success = true; @@ -276,6 +280,17 @@ private URLContent resolveURI(URI uri) { } } + private boolean remoteFileExists(URI uri) { + try { + HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection(); + connection.setRequestMethod("HEAD"); + int responseCode = connection.getResponseCode(); + return responseCode == HttpURLConnection.HTTP_OK; + } catch (Exception e) { + return false; // Error occurred or file does not exist + } + } + @Override public List resolveDirectory(URI uri) { List dirs = new ArrayList<>(); From aa6326ffd8d8bd04255b9f0c216c85a91b798376 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Thu, 11 Apr 2024 14:51:08 -0400 Subject: [PATCH 15/21] Updated code to simply append .sha256 to complete filename or url --- .../nosqlbench/engine/cli/NBCLIOptions.java | 6 +- .../nb/api/nbio/ResolverForNBIOCache.java | 92 +++++++++---------- 2 files changed, 49 insertions(+), 49 deletions(-) diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java index 53a646fd87..dd2970ee40 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java @@ -214,7 +214,7 @@ public class NBCLIOptions { private boolean useNBIOCache = false; private boolean nbioCacheForceUpdate = false; private boolean nbioCacheVerify = true; - private String nbioCachDir; + private String nbioCacheDir; private String nbioCacheMaxRetries; public boolean wantsLoggedMetrics() { @@ -675,7 +675,7 @@ private void parseAllOptions(final String[] args) { break; case NBCLIOptions.NBIO_CACHE_DIR: arglist.removeFirst(); - this.nbioCachDir = this.readWordOrThrow(arglist, "a NBIO cache directory"); + this.nbioCacheDir = this.readWordOrThrow(arglist, "a NBIO cache directory"); break; case NBIO_CACHE_MAX_RETRIES: arglist.removeFirst(); @@ -852,7 +852,7 @@ public boolean wantsNbioCacheVerify() { return nbioCacheVerify; } public String getNbioCacheDir() { - return nbioCachDir; + return nbioCacheDir; } public String getNbioCacheMaxRetries() { return nbioCacheMaxRetries; diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 11eba8a830..39ea76a7cd 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -65,9 +65,9 @@ private Path resolvePath(URI uri) { if (uri.getScheme() != null && !uri.getScheme().isEmpty() && (uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { - Path cachePath = Path.of(cacheDir + uri.getPath()); - if (Files.isReadable(cachePath)) { - return pathFromLocalCache(cachePath, uri); + Path cachedFilePath = Path.of(cacheDir + uri.getPath()); + if (Files.isReadable(cachedFilePath)) { + return pathFromLocalCache(cachedFilePath, uri); } else { return pathFromRemoteUrl(uri); @@ -76,39 +76,39 @@ private Path resolvePath(URI uri) { return null; } - private boolean downloadFile(URI uri, Path cachePath, URLContent checksum) { + private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum) { int retries = 0; boolean success = false; while (retries < maxRetries) { try { if (this.remoteFileExists(uri)) { - logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachePath); + logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachedFilePath); ReadableByteChannel channel = Channels.newChannel(uri.toURL().openStream()); - FileOutputStream outputStream = new FileOutputStream(cachePath.toFile()); + FileOutputStream outputStream = new FileOutputStream(cachedFilePath.toFile()); outputStream.getChannel().transferFrom(channel, 0, Long.MAX_VALUE); outputStream.close(); channel.close(); - logger.info(() -> "Downloaded remote file to cache at " + cachePath); - if(checksum == null || verifyChecksum(cachePath, checksum)) { + logger.info(() -> "Downloaded remote file to cache at " + cachedFilePath); + if(checksum == null || verifyChecksum(cachedFilePath, checksum)) { success = true; break; } } else { - logger.error(() -> "Error downloading remote file to cache at " + cachePath + ", retrying..."); + logger.error(() -> "Error downloading remote file to cache at " + cachedFilePath + ", retrying..."); retries++; } } catch (IOException e) { - logger.error(() -> "Error downloading remote file to cache at " + cachePath + ", retrying..."); + logger.error(() -> "Error downloading remote file to cache at " + cachedFilePath + ", retrying..."); retries++; } } return success; } - private boolean verifyChecksum(Path cachePath, URLContent checksum) { + private boolean verifyChecksum(Path cachedFilePath, URLContent checksum) { try { - String localChecksumStr = generateSHA256Checksum(cachePath.toString()); - Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); + String localChecksumStr = generateSHA256Checksum(cachedFilePath.toString()); + Path checksumPath = Path.of(cachedFilePath + ".sha256"); Files.writeString(checksumPath, localChecksumStr); logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); @@ -136,18 +136,18 @@ private boolean verifyChecksum(Path cachePath, URLContent checksum) { * @throws RuntimeException if there was an error during the download or if the checksums don't match */ private Path pathFromRemoteUrl(URI uri) { - Path cachePath = Path.of(cacheDir + uri.getPath()); - createCacheDir(cachePath); + Path cachedFilePath = Path.of(cacheDir + uri.getPath()); + createCacheDir(cachedFilePath); if (!verifyChecksum) { - return execute(NBIOResolverConditions.UPDATE_NO_VERIFY, cachePath, uri); + return execute(NBIOResolverConditions.UPDATE_NO_VERIFY, cachedFilePath, uri); } else { - return execute(NBIOResolverConditions.UPDATE_AND_VERIFY, cachePath, uri); + return execute(NBIOResolverConditions.UPDATE_AND_VERIFY, cachedFilePath, uri); } } - private void createCacheDir(Path cachePath) { - Path dir = cachePath.getParent(); + private void createCacheDir(Path cachedFilePath) { + Path dir = cachedFilePath.getParent(); if (!Files.exists(dir)) { try { Files.createDirectories(dir); @@ -157,54 +157,54 @@ private void createCacheDir(Path cachePath) { } } - private void cleanupCache(Path cachePath) { - if (!cachePath.toFile().delete()) - logger.warn(() -> "Could not delete cached file " + cachePath); - Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); + private void cleanupCache(Path cachedFilePath) { + if (!cachedFilePath.toFile().delete()) + logger.warn(() -> "Could not delete cached file " + cachedFilePath); + Path checksumPath = Path.of(cachedFilePath + ".sha256"); if (!checksumPath.toFile().delete()) logger.warn(() -> "Could not delete cached checksum " + checksumPath); } - private Path execute(NBIOResolverConditions condition, Path cachePath, URI uri) { - String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256"; + private Path execute(NBIOResolverConditions condition, Path cachedFilePath, URI uri) { + String remoteChecksumFileStr = uri.getPath() + ".sha256"; URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); switch(condition) { case UPDATE_AND_VERIFY: if (checksum == null) { logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist. Proceeding without verification"); } - if (downloadFile(uri, cachePath, checksum)) { - return cachePath; + if (downloadFile(uri, cachedFilePath, checksum)) { + return cachedFilePath; } else { - throw new RuntimeException("Error downloading remote file to cache at " + cachePath); + throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath); } case UPDATE_NO_VERIFY: - logger.warn(() -> "Checksum verification is disabled, downloading remote file to cache at " + cachePath); - if (downloadFile(uri, cachePath, null)) { - return cachePath; + logger.warn(() -> "Checksum verification is disabled, downloading remote file to cache at " + cachedFilePath); + if (downloadFile(uri, cachedFilePath, null)) { + return cachedFilePath; } else { - throw new RuntimeException("Error downloading remote file to cache at " + cachePath); + throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath); } case LOCAL_VERIFY: if (checksum == null) { - logger.warn(() -> "Remote checksum file does not exist, returning cached file " + cachePath); - return cachePath; + logger.warn(() -> "Remote checksum file does not exist, returning cached file " + cachedFilePath); + return cachedFilePath; } try { - String localChecksum = Files.readString(getOrCreateChecksum(cachePath)); + String localChecksum = Files.readString(getOrCreateChecksum(cachedFilePath)); String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); if (localChecksum.equals(remoteChecksum)) { - return cachePath; + return cachedFilePath; } else { - logger.warn(() -> "Checksums do not match, rehydrating cache " + cachePath); + logger.warn(() -> "Checksums do not match, rehydrating cache " + cachedFilePath); return pathFromRemoteUrl(uri); } } catch (IOException e) { throw new RuntimeException(e); } case LOCAL_NO_VERIFY: - return cachePath; + return cachedFilePath; default: throw new RuntimeException("Invalid NBIO Cache condition"); } @@ -220,30 +220,30 @@ private Path execute(NBIOResolverConditions condition, Path cachePath, URI uri) * If the checksums don't match, it deletes the cached file and downloads it from the remote URL. * If the remote file or checksum does not exist, it returns the cached file. * - * @param cachePath the Path to the cached file + * @param cachedFilePath the Path to the cached file * @param uri the URI of the remote file * @return the Path to the cached file * @throws RuntimeException if there was an error during the checksum comparison or if the checksums don't match */ - private Path pathFromLocalCache(Path cachePath, URI uri) { + private Path pathFromLocalCache(Path cachedFilePath, URI uri) { if (forceUpdate) { return pathFromRemoteUrl(uri); } if (!verifyChecksum) { - logger.warn(() -> "Checksum verification is disabled, returning cached file " + cachePath); - return execute(NBIOResolverConditions.LOCAL_NO_VERIFY, cachePath, uri); + logger.warn(() -> "Checksum verification is disabled, returning cached file " + cachedFilePath); + return execute(NBIOResolverConditions.LOCAL_NO_VERIFY, cachedFilePath, uri); } else { - return execute(NBIOResolverConditions.LOCAL_VERIFY, cachePath, uri); + return execute(NBIOResolverConditions.LOCAL_VERIFY, cachedFilePath, uri); } } - private Path getOrCreateChecksum(Path cachePath) { - Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().lastIndexOf('.')) + ".sha256"); + private Path getOrCreateChecksum(Path cachedFilePath) { + Path checksumPath = Path.of(cachedFilePath + ".sha256"); if (!Files.isReadable(checksumPath)) { try { - Files.writeString(checksumPath, generateSHA256Checksum(cachePath.toString())); + Files.writeString(checksumPath, generateSHA256Checksum(cachedFilePath.toString())); } catch (IOException | NoSuchAlgorithmException e) { throw new RuntimeException(e); } From 26bb89f2be09ee2fec64208a8cdcfa963ad4d679 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Thu, 11 Apr 2024 14:59:48 -0400 Subject: [PATCH 16/21] moved checksum path generation to separate method --- .../nosqlbench/nb/api/nbio/ResolverForNBIOCache.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index 39ea76a7cd..a2e70bcebe 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -108,7 +108,7 @@ private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum) private boolean verifyChecksum(Path cachedFilePath, URLContent checksum) { try { String localChecksumStr = generateSHA256Checksum(cachedFilePath.toString()); - Path checksumPath = Path.of(cachedFilePath + ".sha256"); + Path checksumPath = checksumPath(cachedFilePath); Files.writeString(checksumPath, localChecksumStr); logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); @@ -160,7 +160,7 @@ private void createCacheDir(Path cachedFilePath) { private void cleanupCache(Path cachedFilePath) { if (!cachedFilePath.toFile().delete()) logger.warn(() -> "Could not delete cached file " + cachedFilePath); - Path checksumPath = Path.of(cachedFilePath + ".sha256"); + Path checksumPath = checksumPath(cachedFilePath); if (!checksumPath.toFile().delete()) logger.warn(() -> "Could not delete cached checksum " + checksumPath); } @@ -240,7 +240,7 @@ private Path pathFromLocalCache(Path cachedFilePath, URI uri) { } private Path getOrCreateChecksum(Path cachedFilePath) { - Path checksumPath = Path.of(cachedFilePath + ".sha256"); + Path checksumPath = checksumPath(cachedFilePath); if (!Files.isReadable(checksumPath)) { try { Files.writeString(checksumPath, generateSHA256Checksum(cachedFilePath.toString())); @@ -251,6 +251,10 @@ private Path getOrCreateChecksum(Path cachedFilePath) { return checksumPath; } + private Path checksumPath(Path cachedFilePath) { + return Path.of(cachedFilePath + ".sha256"); + } + private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException { MessageDigest md = MessageDigest.getInstance("SHA-256"); try (InputStream is = new FileInputStream(filePath)) { From 3ee54432d00254325aef63dbb01fae0c16664904 Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Thu, 11 Apr 2024 20:14:52 -0400 Subject: [PATCH 17/21] strip control characters from checksums --- .../io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java index a2e70bcebe..52355e9d3a 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -111,7 +111,7 @@ private boolean verifyChecksum(Path cachedFilePath, URLContent checksum) { Path checksumPath = checksumPath(cachedFilePath); Files.writeString(checksumPath, localChecksumStr); logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); - String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); + String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes())); if (localChecksumStr.equals(remoteChecksum)) { return true; } else { @@ -123,6 +123,10 @@ private boolean verifyChecksum(Path cachedFilePath, URLContent checksum) { } } + private static String stripControlCharacters(String input) { + return input.replaceAll("[\\p{Cntrl}]+$", ""); + } + /** * This method is used to download a file from a remote URL and store it in a local cache. * It first creates the cache directory if it doesn't exist. @@ -192,7 +196,7 @@ private Path execute(NBIOResolverConditions condition, Path cachedFilePath, URI } try { String localChecksum = Files.readString(getOrCreateChecksum(cachedFilePath)); - String remoteChecksum = new String(checksum.getInputStream().readAllBytes()); + String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes())); if (localChecksum.equals(remoteChecksum)) { return cachedFilePath; } From baefd5f178cfe88ae12c3c3bce0927a4ac055dbc Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Thu, 11 Apr 2024 22:53:45 -0500 Subject: [PATCH 18/21] gitignore cache directories --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 8fa3f2952d..0318a188f2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +**/cache/** exported_docs.zip .nosqlbench/** workspaces/** From cfba3bd6f9f4598a6e1a300ff02b2361877004cd Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Thu, 11 Apr 2024 23:06:17 -0500 Subject: [PATCH 19/21] align to API change --- .../adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java | 2 +- pom.xml | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java index f79de7c4ec..ce852c8861 100644 --- a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java @@ -61,7 +61,7 @@ private LongFunction createQueryFunc() { public abstract LongFunction createOpFunc(); @Override - public Neo4JBaseOp apply(long cycle) { + public Neo4JBaseOp getOp(long cycle) { return opFunc.apply(cycle); } } diff --git a/pom.xml b/pom.xml index bdda03576c..0337b089dc 100644 --- a/pom.xml +++ b/pom.xml @@ -70,8 +70,7 @@ adapter-amqp adapter-jdbc - adapter-milvus - adapter-mongodb + adapter-neo4j adapter-aws-opensearch From 26747550c407ae65d680efe57a697ecdcf44b1c5 Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Thu, 11 Apr 2024 23:23:45 -0500 Subject: [PATCH 20/21] upgrade polyglot dependency --- mvn-defaults/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mvn-defaults/pom.xml b/mvn-defaults/pom.xml index 8c13ca342a..d0c368e961 100644 --- a/mvn-defaults/pom.xml +++ b/mvn-defaults/pom.xml @@ -362,7 +362,7 @@ org.graalvm.polyglot polyglot - 23.1.0 + 23.1.2 org.graalvm.polyglot From e7f34844a572e846472c276c228f05acdb1c7c26 Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Thu, 11 Apr 2024 23:24:03 -0500 Subject: [PATCH 21/21] disable inactive matheclipse dependency --- virtdata-lib-vectors/pom.xml | 76 ++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/virtdata-lib-vectors/pom.xml b/virtdata-lib-vectors/pom.xml index cc81640792..72597e7a04 100644 --- a/virtdata-lib-vectors/pom.xml +++ b/virtdata-lib-vectors/pom.xml @@ -47,46 +47,46 @@ ${revision} - - org.matheclipse - matheclipse-core - 3.1.0-SNAPSHOT - - - org.slf4j - * - - - org.apache.logging.log4j - * - - - org.apache.log4j - * - - + + + + + + + + + + + + + + + + + + - - - org.matheclipse - matheclipse-gpl - 3.1.0-SNAPSHOT - - - org.slf4j - * - - - org.apache.logging.log4j - * - - - org.apache.log4j - * - - + + + + + + + + + + + + + + + + + + + - +