Skip to content

Commit

Permalink
StressTest: implemented MT shortest path
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jun 29, 2016
1 parent 4b99f50 commit 274baf3
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 154 deletions.
Expand Up @@ -27,6 +27,7 @@
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph; import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import com.tinkerpop.blueprints.impls.orient.OrientGraph; import com.tinkerpop.blueprints.impls.orient.OrientGraph;
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;


/** /**
* CRUD implementation of the workload. * CRUD implementation of the workload.
Expand All @@ -36,8 +37,14 @@
public abstract class OBaseGraphWorkload extends OBaseWorkload { public abstract class OBaseGraphWorkload extends OBaseWorkload {
protected boolean tx = false; protected boolean tx = false;


protected OBaseGraphWorkload(final boolean tx) {
this.tx = tx;
}

public class OWorkLoadContext extends OBaseWorkload.OBaseWorkLoadContext { public class OWorkLoadContext extends OBaseWorkload.OBaseWorkLoadContext {
OrientBaseGraph graph; OrientBaseGraph graph;
OrientVertex lastVertexToConnect;
int lastVertexEdges;


@Override @Override
public void init(ODatabaseIdentifier dbIdentifier) { public void init(ODatabaseIdentifier dbIdentifier) {
Expand All @@ -51,6 +58,11 @@ public void close() {
} }
} }


@Override
protected OBaseWorkLoadContext getContext() {
return new OWorkLoadContext();
}

protected OrientGraphNoTx getGraphNoTx(final ODatabaseIdentifier databaseIdentifier) { protected OrientGraphNoTx getGraphNoTx(final ODatabaseIdentifier databaseIdentifier) {
final ODatabase database = ODatabaseUtils.openDatabase(databaseIdentifier); final ODatabase database = ODatabaseUtils.openDatabase(databaseIdentifier);
if (database == null) if (database == null)
Expand Down
Expand Up @@ -24,8 +24,11 @@
import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.stresstest.ODatabaseIdentifier; import com.orientechnologies.orient.stresstest.ODatabaseIdentifier;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph; import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
import com.tinkerpop.blueprints.impls.orient.OrientVertex; import com.tinkerpop.blueprints.impls.orient.OrientVertex;


import java.util.List;

/** /**
* CRUD implementation of the workload. * CRUD implementation of the workload.
* *
Expand All @@ -35,14 +38,17 @@ public class OGraphInsertWorkload extends OBaseGraphWorkload {


static final String INVALID_FORM_MESSAGE = "GRAPH INSERT workload must be in form of <vertices>F<connection-factor>."; static final String INVALID_FORM_MESSAGE = "GRAPH INSERT workload must be in form of <vertices>F<connection-factor>.";


private int vertices = 0;
private int factor = 80; private int factor = 80;
private OWorkLoadResult resultVertices = new OWorkLoadResult(); private OWorkLoadResult resultVertices = new OWorkLoadResult();
private OWorkLoadResult resultEdges = new OWorkLoadResult(); private OWorkLoadResult resultEdges = new OWorkLoadResult();


public OGraphInsertWorkload() {
super(false);
}

@Override @Override
public String getName() { public String getName() {
return "GRAPHINSERT"; return "GINSERT";
} }


@Override @Override
Expand All @@ -64,65 +70,70 @@ public void parseParameters(final String args) {
} }
assignState(state, number, ' '); assignState(state, number, ' ');


if (vertices == 0) if (resultVertices.total == 0)
throw new IllegalArgumentException(INVALID_FORM_MESSAGE); throw new IllegalArgumentException(INVALID_FORM_MESSAGE);
} }


@Override @Override
public void execute(final int concurrencyLevel, final ODatabaseIdentifier databaseIdentifier) { public void execute(final int concurrencyLevel, final ODatabaseIdentifier databaseIdentifier) {
// TODO: aggregation, shortest path, hard path, neighbors, neighbors2, look also at XDBench final List<OBaseWorkLoadContext> contexts = executeOperation(databaseIdentifier, resultVertices, concurrencyLevel,
executeOperation(databaseIdentifier, vertices, concurrencyLevel, new OCallable<Void, OBaseWorkLoadContext>() { new OCallable<Void, OBaseWorkLoadContext>() {
OrientVertex lastVertexToConnect; @Override
int lastVertexEdges; public Void call(final OBaseWorkLoadContext context) {
final OWorkLoadContext graphContext = ((OWorkLoadContext) context);
final OrientBaseGraph graph = graphContext.graph;

final OrientVertex v = graph.addVertex(null, "_id", resultVertices.current.get());


@Override if (graphContext.lastVertexToConnect != null) {
public Void call(final OBaseWorkLoadContext context) { v.addEdge("E", graphContext.lastVertexToConnect);
final OrientBaseGraph graph = ((OBaseGraphWorkload.OWorkLoadContext) context).graph; resultEdges.current.incrementAndGet();


final OrientVertex v = graph.addVertex(null, "_id", resultVertices.current.get()); graphContext.lastVertexEdges++;


if (lastVertexToConnect != null) { if (graphContext.lastVertexEdges > factor) {
v.addEdge("E", lastVertexToConnect); graphContext.lastVertexEdges = 0;
resultEdges.current.incrementAndGet(); graphContext.lastVertexToConnect = v;
}
} else
graphContext.lastVertexToConnect = v;


lastVertexEdges++; resultVertices.current.incrementAndGet();


if (lastVertexEdges > factor) { return null;
lastVertexEdges = 0;
lastVertexToConnect = v;
} }
} else });
lastVertexToConnect = v;


resultVertices.current.incrementAndGet(); final OrientGraphNoTx graph = getGraphNoTx(databaseIdentifier);
try {
// CONNECTED ALL THE SUB GRAPHS
OrientVertex lastVertex = null;
for (OBaseWorkLoadContext context : contexts) {
if (lastVertex != null)
lastVertex.addEdge("E", ((OWorkLoadContext) context).lastVertexToConnect);


return null; lastVertex = ((OWorkLoadContext) context).lastVertexToConnect;
} }
}); } finally {
graph.shutdown();
}

} }


@Override @Override
public String getPartialResult() { public String getPartialResult() {
return String.format("%d%% [Vertices: %d - Edges: %d]", ((100 * resultVertices.current.get() / vertices)), return String.format("%d%% [Vertices: %d - Edges: %d]", ((100 * resultVertices.current.get() / resultVertices.total)),
resultVertices.current.get(), resultEdges.current.get()); resultVertices.current.get(), resultEdges.current.get());
} }


@Override @Override
public String getFinalResult() { public String getFinalResult() {
final StringBuilder buffer = new StringBuilder(getErrors()); final StringBuilder buffer = new StringBuilder(getErrors());


buffer.append(String.format("\nCreated %d vertices and %d edges in %.3f secs", resultVertices.total, resultEdges.total, buffer.append(String.format("- Created %d vertices and %d edges in %.3f secs", resultVertices.current.get(),
resultVertices.totalTime / 1000f)); resultEdges.current.get(), resultVertices.totalTime / 1000f));

buffer.append(String.format(
"\n- Vertices Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th Perc: %.3fms - 99.9th Perc: %.3fms",
resultVertices.total * 1000 / (float) resultVertices.totalTime, resultVertices.avgNs / 1000000f,
resultVertices.percentileAvg, resultVertices.percentile99Ns / 1000000f, resultVertices.percentile99_9Ns / 1000000f));


buffer.append(String.format( buffer.append(resultVertices.toOutput());
"\n- Edges Throughput: %.3f/sec - Avg: %.3fms/op (%dth percentile) - 99th Perc: %.3fms - 99.9th Perc: %.3fms",
resultEdges.total * 1000 / (float) resultEdges.totalTime, resultEdges.avgNs / 1000000f, resultEdges.percentileAvg,
resultEdges.percentile99Ns / 1000000f, resultEdges.percentile99_9Ns / 1000000f));


return buffer.toString(); return buffer.toString();
} }
Expand All @@ -142,7 +153,7 @@ private char assignState(final char state, final StringBuilder number, final cha
number.append("0"); number.append("0");


if (state == 'V') if (state == 'V')
vertices = Integer.parseInt(number.toString()); resultVertices.total = Integer.parseInt(number.toString());
else if (state == 'F') else if (state == 'F')
factor = Integer.parseInt(number.toString()); factor = Integer.parseInt(number.toString());


Expand All @@ -156,7 +167,7 @@ protected OBaseWorkLoadContext getContext() {
} }


public int getVertices() { public int getVertices() {
return vertices; return resultVertices.total;
} }


public int getFactor() { public int getFactor() {
Expand Down
@@ -0,0 +1,180 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.graph.stresstest;

import com.orientechnologies.common.util.OCallable;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.metadata.schema.OType;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.OCommandSQL;
import com.orientechnologies.orient.stresstest.ODatabaseIdentifier;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* CRUD implementation of the workload.
*
* @author Luca Garulli
*/
public class OGraphShortestPathWorkload extends OBaseGraphWorkload {

static final String INVALID_FORM_MESSAGE = "SHORTESTPATH workload must be in form of L<limit>.";

private int limit = -1;
private OWorkLoadResult result = new OWorkLoadResult();
private final AtomicLong totalDepth = new AtomicLong();
private final AtomicLong maxDepth = new AtomicLong();
private final AtomicLong notConnected = new AtomicLong();
private final List<ORID> startingVertices = new ArrayList<ORID>(limit > -1 ? limit : 1000);

public OGraphShortestPathWorkload() {
super(false);
}

@Override
public String getName() {
return "GSP";
}

@Override
public void parseParameters(final String args) {
if (args == null)
return;

final String ops = args.toUpperCase();
char state = 'L';
final StringBuilder number = new StringBuilder();

for (int pos = 0; pos < ops.length(); ++pos) {
final char c = ops.charAt(pos);

if (c == ' ' || c == 'L') {
state = assignState(state, number, c);
} else if (c >= '0' && c <= '9')
number.append(c);
else
throw new IllegalArgumentException(
"Character '" + c + "' is not valid on " + getName() + " workload. " + INVALID_FORM_MESSAGE);
}
assignState(state, number, ' ');

result.total = 1;
}

@Override
public void execute(final int concurrencyLevel, final ODatabaseIdentifier databaseIdentifier) {
// RETRIEVE THE STARTING VERTICES
final OrientGraphNoTx g = getGraphNoTx(databaseIdentifier);
try {
for (OIdentifiable id : g.getRawGraph().browseClass("V")) {
startingVertices.add(id.getIdentity());
if (limit > -1 && startingVertices.size() >= limit)
break;
}
} finally {
g.shutdown();
}
result.total = startingVertices.size();

executeOperation(databaseIdentifier, result, concurrencyLevel, new OCallable<Void, OBaseWorkLoadContext>() {
@Override
public Void call(final OBaseWorkLoadContext context) {
final OWorkLoadContext graphContext = ((OWorkLoadContext) context);
final OrientBaseGraph graph = graphContext.graph;

for (int i = 0; i < startingVertices.size(); ++i) {
final Iterable<OrientVertex> commandResult = graph.command(new OCommandSQL("select shortestPath(?,?, 'both')"))
.execute(startingVertices.get(context.currentIdx), startingVertices.get(i));

for (OrientVertex v : commandResult) {
Collection depth = v.getRecord().field("shortestPath");
if (depth != null && !depth.isEmpty()) {
totalDepth.addAndGet(depth.size());

long max = maxDepth.get();
while (depth.size() > max) {
if (maxDepth.compareAndSet(max, depth.size()))
break;
max = maxDepth.get();
}
} else
notConnected.incrementAndGet();
}
}
result.current.incrementAndGet();

return null;
}
});
}

@Override
public String getPartialResult() {
return String.format("%d%% [Shortest paths blocks (block size=%d) executed: %d/%d]",
((100 * result.current.get() / result.total)), startingVertices.size(), result.current.get(), startingVertices.size());
}

@Override
public String getFinalResult() {
final StringBuilder buffer = new StringBuilder(getErrors());

buffer.append(String.format("- Executed %d shortest paths in %.3f secs", result.current.get(), result.totalTime / 1000f));
buffer.append(String.format("\n- Path depth: maximum %d, average %.3f, not connected %d", maxDepth.get(),
totalDepth.get() / (float) startingVertices.size() / (float) startingVertices.size(), notConnected.get()));
buffer.append(result.toOutput());

return buffer.toString();
}

@Override
public String getFinalResultAsJson() {
final ODocument json = new ODocument();

json.field("shortestPath", result.toJSON(), OType.EMBEDDED);

return json.toString();
}

private char assignState(final char state, final StringBuilder number, final char c) {
if (number.length() == 0)
number.append("0");

if (state == 'L')
limit = Integer.parseInt(number.toString());

number.setLength(0);
return c;
}

public int getShortestPaths() {
return result.total;
}

public int getLimit() {
return limit;
}
}
Expand Up @@ -31,7 +31,7 @@
public class OConsoleProgressWriter extends OSoftThread { public class OConsoleProgressWriter extends OSoftThread {


final private OWorkload workload; final private OWorkload workload;
private String lastResult = null; private String lastResult = null;


public OConsoleProgressWriter(final OWorkload workload) { public OConsoleProgressWriter(final OWorkload workload) {
this.workload = workload; this.workload = workload;
Expand All @@ -45,9 +45,13 @@ public void printMessage(final String message) {
protected void execute() throws Exception { protected void execute() throws Exception {
final String result = workload.getPartialResult(); final String result = workload.getPartialResult();
if (lastResult == null || !lastResult.equals(result)) if (lastResult == null || !lastResult.equals(result))
System.out.print("\rStress test in progress " + result); System.out.print("\r- Workload in progress " + result);
lastResult = result; lastResult = result;
Thread.sleep(20); try {
Thread.sleep(300);
} catch (InterruptedException e) {
interruptCurrentOperation();
}
} }


@Override @Override
Expand Down

0 comments on commit 274baf3

Please sign in to comment.