diff --git a/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OBaseGraphWorkload.java b/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OBaseGraphWorkload.java index 39dd1e915c1..308a2a39f9d 100644 --- a/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OBaseGraphWorkload.java +++ b/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OBaseGraphWorkload.java @@ -27,6 +27,7 @@ import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph; import com.tinkerpop.blueprints.impls.orient.OrientGraph; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; /** * CRUD implementation of the workload. @@ -36,8 +37,14 @@ public abstract class OBaseGraphWorkload extends OBaseWorkload { protected boolean tx = false; + protected OBaseGraphWorkload(final boolean tx) { + this.tx = tx; + } + public class OWorkLoadContext extends OBaseWorkload.OBaseWorkLoadContext { OrientBaseGraph graph; + OrientVertex lastVertexToConnect; + int lastVertexEdges; @Override public void init(ODatabaseIdentifier dbIdentifier) { @@ -51,6 +58,11 @@ public void close() { } } + @Override + protected OBaseWorkLoadContext getContext() { + return new OWorkLoadContext(); + } + protected OrientGraphNoTx getGraphNoTx(final ODatabaseIdentifier databaseIdentifier) { final ODatabase database = ODatabaseUtils.openDatabase(databaseIdentifier); if (database == null) diff --git a/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OGraphInsertWorkload.java b/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OGraphInsertWorkload.java index c34bbc8ed4f..9dd9263ddaa 100644 --- a/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OGraphInsertWorkload.java +++ b/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OGraphInsertWorkload.java @@ -24,8 +24,11 @@ import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.stresstest.ODatabaseIdentifier; import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph; +import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; import com.tinkerpop.blueprints.impls.orient.OrientVertex; +import java.util.List; + /** * CRUD implementation of the workload. * @@ -35,14 +38,17 @@ public class OGraphInsertWorkload extends OBaseGraphWorkload { static final String INVALID_FORM_MESSAGE = "GRAPH INSERT workload must be in form of F."; - private int vertices = 0; private int factor = 80; private OWorkLoadResult resultVertices = new OWorkLoadResult(); private OWorkLoadResult resultEdges = new OWorkLoadResult(); + public OGraphInsertWorkload() { + super(false); + } + @Override public String getName() { - return "GRAPHINSERT"; + return "GINSERT"; } @Override @@ -64,46 +70,59 @@ public void parseParameters(final String args) { } assignState(state, number, ' '); - if (vertices == 0) + if (resultVertices.total == 0) throw new IllegalArgumentException(INVALID_FORM_MESSAGE); } @Override public void execute(final int concurrencyLevel, final ODatabaseIdentifier databaseIdentifier) { - // TODO: aggregation, shortest path, hard path, neighbors, neighbors2, look also at XDBench - executeOperation(databaseIdentifier, vertices, concurrencyLevel, new OCallable() { - OrientVertex lastVertexToConnect; - int lastVertexEdges; + final List contexts = executeOperation(databaseIdentifier, resultVertices, concurrencyLevel, + new OCallable() { + @Override + public Void call(final OBaseWorkLoadContext context) { + final OWorkLoadContext graphContext = ((OWorkLoadContext) context); + final OrientBaseGraph graph = graphContext.graph; + + final OrientVertex v = graph.addVertex(null, "_id", resultVertices.current.get()); - @Override - public Void call(final OBaseWorkLoadContext context) { - final OrientBaseGraph graph = ((OBaseGraphWorkload.OWorkLoadContext) context).graph; + if (graphContext.lastVertexToConnect != null) { + v.addEdge("E", graphContext.lastVertexToConnect); + resultEdges.current.incrementAndGet(); - final OrientVertex v = graph.addVertex(null, "_id", resultVertices.current.get()); + graphContext.lastVertexEdges++; - if (lastVertexToConnect != null) { - v.addEdge("E", lastVertexToConnect); - resultEdges.current.incrementAndGet(); + if (graphContext.lastVertexEdges > factor) { + graphContext.lastVertexEdges = 0; + graphContext.lastVertexToConnect = v; + } + } else + graphContext.lastVertexToConnect = v; - lastVertexEdges++; + resultVertices.current.incrementAndGet(); - if (lastVertexEdges > factor) { - lastVertexEdges = 0; - lastVertexToConnect = v; + return null; } - } else - lastVertexToConnect = v; + }); - resultVertices.current.incrementAndGet(); + final OrientGraphNoTx graph = getGraphNoTx(databaseIdentifier); + try { + // CONNECTED ALL THE SUB GRAPHS + OrientVertex lastVertex = null; + for (OBaseWorkLoadContext context : contexts) { + if (lastVertex != null) + lastVertex.addEdge("E", ((OWorkLoadContext) context).lastVertexToConnect); - return null; + lastVertex = ((OWorkLoadContext) context).lastVertexToConnect; } - }); + } finally { + graph.shutdown(); + } + } @Override public String getPartialResult() { - return String.format("%d%% [Vertices: %d - Edges: %d]", ((100 * resultVertices.current.get() / vertices)), + return String.format("%d%% [Vertices: %d - Edges: %d]", ((100 * resultVertices.current.get() / resultVertices.total)), resultVertices.current.get(), resultEdges.current.get()); } @@ -111,18 +130,10 @@ public String getPartialResult() { public String getFinalResult() { final StringBuilder buffer = new StringBuilder(getErrors()); - buffer.append(String.format("\nCreated %d vertices and %d edges in %.3f secs", resultVertices.total, resultEdges.total, - resultVertices.totalTime / 1000f)); - - buffer.append(String.format( - "\n- Vertices Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th Perc: %.3fms - 99.9th Perc: %.3fms", - resultVertices.total * 1000 / (float) resultVertices.totalTime, resultVertices.avgNs / 1000000f, - resultVertices.percentileAvg, resultVertices.percentile99Ns / 1000000f, resultVertices.percentile99_9Ns / 1000000f)); + buffer.append(String.format("- Created %d vertices and %d edges in %.3f secs", resultVertices.current.get(), + resultEdges.current.get(), resultVertices.totalTime / 1000f)); - buffer.append(String.format( - "\n- Edges Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th Perc: %.3fms - 99.9th Perc: %.3fms", - resultEdges.total * 1000 / (float) resultEdges.totalTime, resultEdges.avgNs / 1000000f, resultEdges.percentileAvg, - resultEdges.percentile99Ns / 1000000f, resultEdges.percentile99_9Ns / 1000000f)); + buffer.append(resultVertices.toOutput()); return buffer.toString(); } @@ -142,7 +153,7 @@ private char assignState(final char state, final StringBuilder number, final cha number.append("0"); if (state == 'V') - vertices = Integer.parseInt(number.toString()); + resultVertices.total = Integer.parseInt(number.toString()); else if (state == 'F') factor = Integer.parseInt(number.toString()); @@ -156,7 +167,7 @@ protected OBaseWorkLoadContext getContext() { } public int getVertices() { - return vertices; + return resultVertices.total; } public int getFactor() { diff --git a/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OGraphShortestPathWorkload.java b/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OGraphShortestPathWorkload.java new file mode 100644 index 00000000000..bf97645a5fd --- /dev/null +++ b/graphdb/src/main/java/com/orientechnologies/orient/graph/stresstest/OGraphShortestPathWorkload.java @@ -0,0 +1,180 @@ +/* + * + * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://www.orientechnologies.com + * + */ +package com.orientechnologies.orient.graph.stresstest; + +import com.orientechnologies.common.util.OCallable; +import com.orientechnologies.orient.core.db.record.OIdentifiable; +import com.orientechnologies.orient.core.id.ORID; +import com.orientechnologies.orient.core.metadata.schema.OType; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.OCommandSQL; +import com.orientechnologies.orient.stresstest.ODatabaseIdentifier; +import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph; +import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * CRUD implementation of the workload. + * + * @author Luca Garulli + */ +public class OGraphShortestPathWorkload extends OBaseGraphWorkload { + + static final String INVALID_FORM_MESSAGE = "SHORTESTPATH workload must be in form of L."; + + private int limit = -1; + private OWorkLoadResult result = new OWorkLoadResult(); + private final AtomicLong totalDepth = new AtomicLong(); + private final AtomicLong maxDepth = new AtomicLong(); + private final AtomicLong notConnected = new AtomicLong(); + private final List startingVertices = new ArrayList(limit > -1 ? limit : 1000); + + public OGraphShortestPathWorkload() { + super(false); + } + + @Override + public String getName() { + return "GSP"; + } + + @Override + public void parseParameters(final String args) { + if (args == null) + return; + + final String ops = args.toUpperCase(); + char state = 'L'; + final StringBuilder number = new StringBuilder(); + + for (int pos = 0; pos < ops.length(); ++pos) { + final char c = ops.charAt(pos); + + if (c == ' ' || c == 'L') { + state = assignState(state, number, c); + } else if (c >= '0' && c <= '9') + number.append(c); + else + throw new IllegalArgumentException( + "Character '" + c + "' is not valid on " + getName() + " workload. " + INVALID_FORM_MESSAGE); + } + assignState(state, number, ' '); + + result.total = 1; + } + + @Override + public void execute(final int concurrencyLevel, final ODatabaseIdentifier databaseIdentifier) { + // RETRIEVE THE STARTING VERTICES + final OrientGraphNoTx g = getGraphNoTx(databaseIdentifier); + try { + for (OIdentifiable id : g.getRawGraph().browseClass("V")) { + startingVertices.add(id.getIdentity()); + if (limit > -1 && startingVertices.size() >= limit) + break; + } + } finally { + g.shutdown(); + } + result.total = startingVertices.size(); + + executeOperation(databaseIdentifier, result, concurrencyLevel, new OCallable() { + @Override + public Void call(final OBaseWorkLoadContext context) { + final OWorkLoadContext graphContext = ((OWorkLoadContext) context); + final OrientBaseGraph graph = graphContext.graph; + + for (int i = 0; i < startingVertices.size(); ++i) { + final Iterable commandResult = graph.command(new OCommandSQL("select shortestPath(?,?, 'both')")) + .execute(startingVertices.get(context.currentIdx), startingVertices.get(i)); + + for (OrientVertex v : commandResult) { + Collection depth = v.getRecord().field("shortestPath"); + if (depth != null && !depth.isEmpty()) { + totalDepth.addAndGet(depth.size()); + + long max = maxDepth.get(); + while (depth.size() > max) { + if (maxDepth.compareAndSet(max, depth.size())) + break; + max = maxDepth.get(); + } + } else + notConnected.incrementAndGet(); + } + } + result.current.incrementAndGet(); + + return null; + } + }); + } + + @Override + public String getPartialResult() { + return String.format("%d%% [Shortest paths blocks (block size=%d) executed: %d/%d]", + ((100 * result.current.get() / result.total)), startingVertices.size(), result.current.get(), startingVertices.size()); + } + + @Override + public String getFinalResult() { + final StringBuilder buffer = new StringBuilder(getErrors()); + + buffer.append(String.format("- Executed %d shortest paths in %.3f secs", result.current.get(), result.totalTime / 1000f)); + buffer.append(String.format("\n- Path depth: maximum %d, average %.3f, not connected %d", maxDepth.get(), + totalDepth.get() / (float) startingVertices.size() / (float) startingVertices.size(), notConnected.get())); + buffer.append(result.toOutput()); + + return buffer.toString(); + } + + @Override + public String getFinalResultAsJson() { + final ODocument json = new ODocument(); + + json.field("shortestPath", result.toJSON(), OType.EMBEDDED); + + return json.toString(); + } + + private char assignState(final char state, final StringBuilder number, final char c) { + if (number.length() == 0) + number.append("0"); + + if (state == 'L') + limit = Integer.parseInt(number.toString()); + + number.setLength(0); + return c; + } + + public int getShortestPaths() { + return result.total; + } + + public int getLimit() { + return limit; + } +} diff --git a/tools/src/main/java/com/orientechnologies/orient/stresstest/OConsoleProgressWriter.java b/tools/src/main/java/com/orientechnologies/orient/stresstest/OConsoleProgressWriter.java index aa98611b01e..4541ce6f3ff 100644 --- a/tools/src/main/java/com/orientechnologies/orient/stresstest/OConsoleProgressWriter.java +++ b/tools/src/main/java/com/orientechnologies/orient/stresstest/OConsoleProgressWriter.java @@ -31,7 +31,7 @@ public class OConsoleProgressWriter extends OSoftThread { final private OWorkload workload; - private String lastResult = null; + private String lastResult = null; public OConsoleProgressWriter(final OWorkload workload) { this.workload = workload; @@ -45,9 +45,13 @@ public void printMessage(final String message) { protected void execute() throws Exception { final String result = workload.getPartialResult(); if (lastResult == null || !lastResult.equals(result)) - System.out.print("\rStress test in progress " + result); + System.out.print("\r- Workload in progress " + result); lastResult = result; - Thread.sleep(20); + try { + Thread.sleep(300); + } catch (InterruptedException e) { + interruptCurrentOperation(); + } } @Override diff --git a/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTester.java b/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTester.java index 816bba0b823..3b4f2aa9a10 100644 --- a/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTester.java +++ b/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTester.java @@ -27,6 +27,8 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * The main class of the OStressTester. It is instantiated from the OStressTesterCommandLineParser and takes care of launching the @@ -42,23 +44,24 @@ public enum OMode { PLOCAL, MEMORY, REMOTE, DISTRIBUTED } - private int threadsNumber; - private ODatabaseIdentifier databaseIdentifier; - private int opsInTx; - private String outputResultFile; + private final int threadsNumber; + private final ODatabaseIdentifier databaseIdentifier; + private final int opsInTx; + private final String outputResultFile; + private final boolean keepDatabaseAfterTest; private OConsoleProgressWriter consoleProgressWriter; private static final OWorkloadFactory workloadFactory = new OWorkloadFactory(); - private OWorkload workload; + private List workloads = new ArrayList(); - public OStressTester(final OWorkload workload, ODatabaseIdentifier databaseIdentifier, int threadsNumber, int opsInTx, - String outputResultFile) throws Exception { - this.workload = workload; + public OStressTester(final List workloads, ODatabaseIdentifier databaseIdentifier, int threadsNumber, int opsInTx, + String outputResultFile, final boolean keepDatabaseAfterTest) throws Exception { + this.workloads = workloads; this.threadsNumber = threadsNumber; this.databaseIdentifier = databaseIdentifier; this.opsInTx = opsInTx; this.outputResultFile = outputResultFile; - consoleProgressWriter = new OConsoleProgressWriter(this.workload); + this.keepDatabaseAfterTest = keepDatabaseAfterTest; } public static void main(String[] args) { @@ -84,46 +87,63 @@ private int execute() throws Exception { // creates the temporary DB where to execute the test ODatabaseUtils.createDatabase(databaseIdentifier); - consoleProgressWriter.printMessage(String.format("Created database [%s].", databaseIdentifier.getUrl())); + System.out.println(String.format("Created database [%s].", databaseIdentifier.getUrl())); try { - new Thread(consoleProgressWriter).start(); + for (OWorkload workload : workloads) { + consoleProgressWriter = new OConsoleProgressWriter(workload); - consoleProgressWriter - .printMessage(String.format("Starting workload %s - concurrencyLevel=%d...", workload.getName(), threadsNumber)); + consoleProgressWriter.start(); - final long startTime = System.currentTimeMillis(); + consoleProgressWriter + .printMessage(String.format("\nStarting workload %s (concurrencyLevel=%d)...", workload.getName(), threadsNumber)); - workload.execute(threadsNumber, databaseIdentifier); + final long startTime = System.currentTimeMillis(); - final long endTime = System.currentTimeMillis(); + workload.execute(threadsNumber, databaseIdentifier); - consoleProgressWriter.sendShutdown(); + final long endTime = System.currentTimeMillis(); - System.out.println(String.format("\nTotal execution time: %.3f secs", ((float) (endTime - startTime) / 1000f))); + consoleProgressWriter.sendShutdown(); - System.out.println(workload.getFinalResult()); + System.out.println(String.format("\n- Total execution time: %.3f secs", ((float) (endTime - startTime) / 1000f))); + + System.out.println(workload.getFinalResult()); + } if (outputResultFile != null) - writeFile(workload); + writeFile(); } catch (Exception ex) { System.err.println("\nAn error has occurred while running the stress test: " + ex.getMessage()); returnCode = 1; } finally { // we don't need to drop the in-memory DB - if (databaseIdentifier.getMode() != OMode.MEMORY) { + if (keepDatabaseAfterTest || databaseIdentifier.getMode() == OMode.MEMORY) + consoleProgressWriter.printMessage(String.format("\nDatabase is available on [%s].", databaseIdentifier.getUrl())); + else { ODatabaseUtils.dropDatabase(databaseIdentifier); consoleProgressWriter.printMessage(String.format("\nDropped database [%s].", databaseIdentifier.getUrl())); } + } return returnCode; } - private void writeFile(final OWorkload workload) { + private void writeFile() { try { - OIOUtils.writeFile(new File(outputResultFile), workload.getFinalResultAsJson()); + final StringBuilder output = new StringBuilder(); + output.append("{\"result\":["); + int i = 0; + for (OWorkload workload : workloads) { + if (i++ > 0) + output.append(","); + workload.getFinalResultAsJson(); + } + output.append("]}"); + + OIOUtils.writeFile(new File(outputResultFile), output.toString()); } catch (IOException e) { System.err.println("\nError on writing the result file : " + e.getMessage()); } diff --git a/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTesterCommandLineParser.java b/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTesterCommandLineParser.java index ea162b0e32a..c8d1b406edd 100644 --- a/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTesterCommandLineParser.java +++ b/tools/src/main/java/com/orientechnologies/orient/stresstest/OStressTesterCommandLineParser.java @@ -24,9 +24,7 @@ import java.io.Console; import java.io.File; import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; +import java.util.*; /** * This is the parser of the command line arguments passed with the invocation of OStressTester. It contains a static method that - @@ -42,6 +40,7 @@ public class OStressTesterCommandLineParser { public final static String OPTION_MODE = "m"; public final static String OPTION_WORKLOAD = "w"; public final static String OPTION_TRANSACTIONS = "tx"; + public final static String OPTION_KEEP_DATABASE_AFTER_TEST = "k"; public final static String OPTION_OUTPUT_FILE = "o"; public static final String OPTION_PLOCAL_PATH = "d"; public final static String OPTION_ROOT_PASSWORD = "root-password"; @@ -53,7 +52,7 @@ public class OStressTesterCommandLineParser { public final static String OPTION_REMOTE_PORT = "remote-port"; public final static String MAIN_OPTIONS = OPTION_MODE + OPTION_CONCURRENCY - + OPTION_WORKLOAD + OPTION_TRANSACTIONS + OPTION_OUTPUT_FILE + OPTION_PLOCAL_PATH; + + OPTION_WORKLOAD + OPTION_TRANSACTIONS + OPTION_OUTPUT_FILE + OPTION_PLOCAL_PATH + OPTION_KEEP_DATABASE_AFTER_TEST; public static final String SYNTAX = "StressTester " + "\n\t-m mode (can be any of these: [plocal|memory|remote|distributed] )" + "\n\t-s operationSet" + "\n\t-t threadsNumber" @@ -85,15 +84,17 @@ public static OStressTester getStressTester(String[] args) throws Exception { final Map options = checkOptions(readOptions(args)); - String dbName = TEMP_DATABASE_NAME + new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()); - OStressTester.OMode mode = OStressTester.OMode.valueOf(options.get(OPTION_MODE).toUpperCase()); + final String dbName = TEMP_DATABASE_NAME + new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()); + final OStressTester.OMode mode = OStressTester.OMode.valueOf(options.get(OPTION_MODE).toUpperCase()); String rootPassword = options.get(OPTION_ROOT_PASSWORD); - String resultOutputFile = options.get(OPTION_OUTPUT_FILE); + final String resultOutputFile = options.get(OPTION_OUTPUT_FILE); String plocalPath = options.get(OPTION_PLOCAL_PATH); - int operationsPerTransaction = getNumber(options.get(OPTION_TRANSACTIONS), "transactions"); - int threadsNumber = getNumber(options.get(OPTION_CONCURRENCY), "concurrency"); - String remoteIp = options.get(OPTION_REMOTE_IP); - String workloadCfg = options.get(OPTION_WORKLOAD); + final int operationsPerTransaction = getNumber(options.get(OPTION_TRANSACTIONS), "transactions"); + final int threadsNumber = getNumber(options.get(OPTION_CONCURRENCY), "concurrency"); + final String remoteIp = options.get(OPTION_REMOTE_IP); + final String workloadCfg = options.get(OPTION_WORKLOAD); + final boolean keepDatabaseAfterTest = options.get(OPTION_KEEP_DATABASE_AFTER_TEST) != null + ? Boolean.parseBoolean(options.get(OPTION_KEEP_DATABASE_AFTER_TEST)) : false; int remotePort = 2424; if (plocalPath != null) { @@ -160,38 +161,45 @@ public static OStressTester getStressTester(String[] args) throws Exception { } } - final OWorkload workload = parseWorkload(workloadCfg); + final List workloads = parseWorkloads(workloadCfg); final ODatabaseIdentifier databaseIdentifier = new ODatabaseIdentifier(mode, dbName, rootPassword, remoteIp, remotePort, plocalPath); - return new OStressTester(workload, databaseIdentifier, threadsNumber, operationsPerTransaction, resultOutputFile); + return new OStressTester(workloads, databaseIdentifier, threadsNumber, operationsPerTransaction, resultOutputFile, + keepDatabaseAfterTest); } - private static OWorkload parseWorkload(final String workloadConfig) { + private static List parseWorkloads(final String workloadConfig) { if (workloadConfig == null || workloadConfig.isEmpty()) - throw new IllegalArgumentException("Workload parameter is mandatory. Syntax: "); - - String workloadName; - String workloadParams; - - final int pos = workloadConfig.indexOf(":"); - if (pos > -1) { - workloadName = workloadConfig.substring(0, pos); - workloadParams = workloadConfig.substring(pos + 1); - } else { - workloadName = workloadConfig; - workloadParams = null; - } + throw new IllegalArgumentException("Workload parameter is mandatory. Syntax: "); + + final List result = new ArrayList(); - final OWorkload workload = OStressTester.getWorkloadFactory().get(workloadName); - if (workload == null) - throw new IllegalArgumentException("Workload '" + workloadName + "' is not configured. Use one of the following: " - + OStressTester.getWorkloadFactory().getRegistered()); + final String[] parts = workloadConfig.split(","); + for (String part : parts) { + String workloadName; + String workloadParams; - workload.parseParameters(workloadParams); + final int pos = part.indexOf(":"); + if (pos > -1) { + workloadName = part.substring(0, pos); + workloadParams = part.substring(pos + 1); + } else { + workloadName = part; + workloadParams = null; + } + + final OWorkload workload = OStressTester.getWorkloadFactory().get(workloadName); + if (workload == null) + throw new IllegalArgumentException("Workload '" + workloadName + "' is not configured. Use one of the following: " + + OStressTester.getWorkloadFactory().getRegistered()); + workload.parseParameters(workloadParams); + + result.add(workload); + } - return workload; + return result; } private static int getNumber(String value, String option) throws IllegalArgumentException { diff --git a/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OBaseWorkload.java b/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OBaseWorkload.java index e6d602105f8..737dd467d76 100644 --- a/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OBaseWorkload.java +++ b/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OBaseWorkload.java @@ -46,13 +46,19 @@ public abstract class OBaseWorkLoadContext { public class OWorkLoadResult { public AtomicInteger current = new AtomicInteger(); - public int total; + public int total = 1; public long totalTime; public long avgNs; public int percentileAvg; public long percentile99Ns; public long percentile99_9Ns; + public String toOutput() { + return String.format("\n- Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th Perc: %.3fms - 99.9th Perc: %.3fms", + total * 1000 / (float) totalTime, avgNs / 1000000f, percentileAvg, percentile99Ns / 1000000f, + percentile99_9Ns / 1000000f); + } + public ODocument toJSON() { final ODocument json = new ODocument(); json.field("total", total); @@ -69,29 +75,30 @@ public ODocument toJSON() { protected static final long MAX_ERRORS = 100; protected List errors = new ArrayList(); - protected OWorkLoadResult executeOperation(final ODatabaseIdentifier dbIdentifier, final int operationTotal, + protected List executeOperation(final ODatabaseIdentifier dbIdentifier, final OWorkLoadResult result, final int concurrencyLevel, final OCallable callback) { - final OWorkLoadResult result = new OWorkLoadResult(); - - if (operationTotal == 0) - return result; + if (result.total == 0) + return null; - final int totalPerThread = operationTotal / concurrencyLevel; - final int totalPerLastThread = totalPerThread + operationTotal % concurrencyLevel; + final int totalPerThread = result.total / concurrencyLevel; + final int totalPerLastThread = totalPerThread + result.total % concurrencyLevel; - final ArrayList operationTiming = new ArrayList(operationTotal); - for (int i = 0; i < operationTotal; ++i) + final ArrayList operationTiming = new ArrayList(result.total); + for (int i = 0; i < result.total; ++i) operationTiming.add(null); + final List contexts = new ArrayList(concurrencyLevel); + final Thread[] thread = new Thread[concurrencyLevel]; for (int t = 0; t < concurrencyLevel; ++t) { final int currentThread = t; + final OBaseWorkLoadContext context = getContext(); + contexts.add(context); + thread[t] = new Thread(new Runnable() { @Override public void run() { - final OBaseWorkLoadContext context = getContext(); - context.threadId = currentThread; context.totalPerThread = context.threadId < concurrencyLevel - 1 ? totalPerThread : totalPerLastThread; @@ -150,7 +157,7 @@ public void run() { result.percentile99Ns = operationTiming.get((int) (operationTiming.size() * 99f / 100f)); result.percentile99_9Ns = operationTiming.get((int) (operationTiming.size() * 99.9f / 100f)); - return result; + return contexts; } protected abstract OBaseWorkLoadContext getContext(); diff --git a/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OCRUDWorkload.java b/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OCRUDWorkload.java index d47cf5f017e..2fb50d4b717 100644 --- a/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OCRUDWorkload.java +++ b/tools/src/main/java/com/orientechnologies/orient/stresstest/workload/OCRUDWorkload.java @@ -95,7 +95,7 @@ public void execute(final int concurrencyLevel, final ODatabaseIdentifier databa for (int i = 0; i < createsResult.total; ++i) records.add(null); - executeOperation(databaseIdentifier, createsResult.total, concurrencyLevel, new OCallable() { + executeOperation(databaseIdentifier, createsResult, concurrencyLevel, new OCallable() { @Override public Void call(final OBaseWorkLoadContext context) { final ODocument doc = createOperation(context.currentIdx); @@ -112,7 +112,7 @@ public Void call(final OBaseWorkLoadContext context) { if (records.size() != createsResult.total) throw new RuntimeException("Error on creating records: found " + records.size() + " but expected " + createsResult.total); - executeOperation(databaseIdentifier, readsResult.total, concurrencyLevel, new OCallable() { + executeOperation(databaseIdentifier, readsResult, concurrencyLevel, new OCallable() { @Override public Void call(final OBaseWorkLoadContext context) { readOperation(((OWorkLoadContext) context).getDb(), context.currentIdx); @@ -121,26 +121,24 @@ public Void call(final OBaseWorkLoadContext context) { } }); - updatesResult = executeOperation(databaseIdentifier, updatesResult.total, concurrencyLevel, - new OCallable() { - @Override - public Void call(final OBaseWorkLoadContext context) { - updateOperation(((OWorkLoadContext) context).getDb(), records.get(context.currentIdx)); - updatesResult.current.incrementAndGet(); - return null; - } - }); - - deletesResult = executeOperation(databaseIdentifier, deletesResult.total, concurrencyLevel, - new OCallable() { - @Override - public Void call(final OBaseWorkLoadContext context) { - deleteOperation(((OWorkLoadContext) context).getDb(), records.get(context.currentIdx)); - records.set(context.currentIdx, null); - deletesResult.current.incrementAndGet(); - return null; - } - }); + executeOperation(databaseIdentifier, updatesResult, concurrencyLevel, new OCallable() { + @Override + public Void call(final OBaseWorkLoadContext context) { + updateOperation(((OWorkLoadContext) context).getDb(), records.get(context.currentIdx)); + updatesResult.current.incrementAndGet(); + return null; + } + }); + + executeOperation(databaseIdentifier, deletesResult, concurrencyLevel, new OCallable() { + @Override + public Void call(final OBaseWorkLoadContext context) { + deleteOperation(((OWorkLoadContext) context).getDb(), records.get(context.currentIdx)); + records.set(context.currentIdx, null); + deletesResult.current.incrementAndGet(); + return null; + } + }); } protected void createSchema(ODatabaseIdentifier databaseIdentifier) { @@ -171,29 +169,17 @@ public String getPartialResult() { public String getFinalResult() { final StringBuilder buffer = new StringBuilder(getErrors()); - buffer.append(String.format( - "\nCreated %d records in %.3f secs - Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th Perc: %.3fms - 99.9th Perc: %.3fms", - createsResult.total, (createsResult.totalTime / 1000f), createsResult.total * 1000 / (float) createsResult.totalTime, - createsResult.avgNs / 1000000f, createsResult.percentileAvg, createsResult.percentile99Ns / 1000000f, - createsResult.percentile99_9Ns / 1000000f)); - - buffer.append(String.format( - "\nRead %d records in %.3f secs - Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th perc: %.3fms - 99.9th Perc: %.3fms", - readsResult.total, (readsResult.totalTime / 1000f), readsResult.total * 1000 / (float) readsResult.totalTime, - readsResult.avgNs / 1000000f, readsResult.percentileAvg, readsResult.percentile99Ns / 1000000f, - readsResult.percentile99_9Ns / 1000000f)); - - buffer.append(String.format( - "\nUpdated %d records in %.3f secs - Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th perc: %.3fms - 99.9th Perc: %.3fms", - updatesResult.total, (updatesResult.totalTime / 1000f), updatesResult.total * 1000 / (float) updatesResult.totalTime, - updatesResult.avgNs / 1000000f, updatesResult.percentileAvg, updatesResult.percentile99Ns / 1000000f, - updatesResult.percentile99_9Ns / 1000000f)); - - buffer.append(String.format( - "\nDeleted %d records in %.3f secs - Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th perc: %.3fms - 99.9th Perc: %.3fms", - deletesResult.total, (deletesResult.totalTime / 1000f), deletesResult.total * 1000 / (float) deletesResult.totalTime, - deletesResult.avgNs / 1000000f, deletesResult.percentileAvg, deletesResult.percentile99Ns / 1000000f, - deletesResult.percentile99_9Ns / 1000000f)); + buffer.append(String.format("- Created %d records in %.3f secs - %s", createsResult.total, (createsResult.totalTime / 1000f), + createsResult.toOutput())); + + buffer.append(String.format("\n- Read %d records in %.3f secs - %s", readsResult.total, (readsResult.totalTime / 1000f), + readsResult.toOutput())); + + buffer.append(String.format("\n- Updated %d records in %.3f secs - %s", updatesResult.total, (updatesResult.totalTime / 1000f), + updatesResult.toOutput())); + + buffer.append(String.format("\n- Deleted %d records in %.3f secs - %s", deletesResult.total, (deletesResult.totalTime / 1000f), + deletesResult.toOutput())); return buffer.toString(); }