Permalink
Browse files

Made several changes for use with TPC-H implementation and col-stores…

… DBs:

- New benchmark states for cold and hot query runs.
- Can run queries serially (run each query in sequential order on one terminal)
  exactly once (a `latency' run) or for a set amount of time (looping through
  the query list until the timer expires).
- Added support for MonetDB in TPC-C.
- Can re-run a test using the .raw output. Submits identical queries to the
  system at the same time; for repeatability of tests.
- Can specify `groupings' of query weights in config file. A shorthand to
  make config management a little easier.
  • Loading branch information...
ben-reilly committed Dec 13, 2013
1 parent 7f8d589 commit e37103790ff0208ecc5d51c3b504e838d83a401c
View
Binary file not shown.
View
@@ -1,18 +1,18 @@
truncate table warehouse;
delete warehouse;
truncate table item;
delete item;
truncate table stock;
delete stock;
truncate table district;
delete district;
truncate table customer;
delete customer;
truncate table history;
delete history;
truncate table oorder;
delete oorder;
truncate table order_line;
delete order_line;
truncate table new_order;
delete new_order;
@@ -70,6 +70,26 @@ public void startMeasure() {
state = State.MEASURE;
}
public void startColdQuery() {
assert state == State.MEASURE;
state = State.COLD_QUERY;
}
public void startHotQuery() {
assert state == State.COLD_QUERY;
state = State.MEASURE;
}
public void signalLatencyComplete() {
assert state == State.MEASURE;
state = State.LATENCY_COMPLETE;
}
public void ackLatencyComplete() {
assert state == State.LATENCY_COMPLETE;
state = State.MEASURE;
}
public void startCoolDown() {
assert state == State.MEASURE;
state = State.DONE;

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -24,17 +24,16 @@
/** Efficiently stores a record of (start time, latency) pairs. */
public class LatencyRecord implements Iterable<LatencyRecord.Sample> {
/** Allocate int[] arrays of this length (262144 = 1 MB). */
static final int BLOCK_SIZE = 262144;
/** Allocate space for 500k samples at a time */
static final int ALLOC_SIZE = 500000;
/**
* Contains (start time, latency, transactionType, workerid, phaseid) pentiplets
* in microsecond form. The start times are "compressed" by encoding them as
* increments, starting from startNs. A 32-bit integer provides sufficient resolution
* for an interval of 2146 seconds, or 35 minutes.
*/
// TODO: Use a real variable length encoding?
private final ArrayList<long[]> values = new ArrayList<long[]>();
private final ArrayList<Sample[]> values = new ArrayList<Sample[]>();
private int nextIndex;
private final long startNs;
@@ -49,57 +48,54 @@ public LatencyRecord(long startNs) {
}
public void addLatency(int transType, long startNs, long endNs, int workerId, int phaseId) {
public void addLatency(int transType, long startNs, long endNs, int workerId, int phaseId) {
assert lastNs > 0;
assert lastNs - 500 <= startNs;
assert endNs >= startNs;
if (nextIndex >= BLOCK_SIZE - 5) { // barzan: I changed this!
if (nextIndex == ALLOC_SIZE) {
allocateChunk();
}
long[] chunk = values.get(values.size() - 1);
Sample[] chunk = values.get(values.size() - 1);
long startOffsetUs = ((startNs - lastNs + 500) / 1000);
assert startOffsetUs >= 0;
long startOffsetNs = (startNs - lastNs + 500);
assert startOffsetNs >= 0;
int latencyUs = (int) ((endNs - startNs + 500) / 1000);
assert latencyUs >= 0;
chunk[nextIndex] = transType;
chunk[nextIndex + 1] = startOffsetUs;
chunk[nextIndex + 2] = latencyUs;
chunk[nextIndex + 3] = workerId;
chunk[nextIndex + 4] = phaseId;
nextIndex += 5;
chunk[nextIndex] = new Sample(transType, startOffsetNs, latencyUs
, workerId, phaseId);
++nextIndex;
lastNs += startOffsetUs * 1000L;
lastNs += startOffsetNs;
}
private void allocateChunk() {
assert (values.isEmpty() && nextIndex == 0)
|| nextIndex >= BLOCK_SIZE - 5;
values.add(new long[BLOCK_SIZE]);
|| nextIndex == ALLOC_SIZE;
values.add(new Sample[ALLOC_SIZE]);
nextIndex = 0;
}
/** Returns the number of recorded samples. */
public int size() {
// Samples stored in full chunks
int samples = (values.size() - 1) * (BLOCK_SIZE / 5);
int samples = (values.size() - 1) * ALLOC_SIZE;
// Samples stored in the last not full chunk
samples += nextIndex / 5;
samples += nextIndex;
return samples;
}
/** Stores the start time and latency for a single sample. Immutable. */
public static final class Sample implements Comparable<Sample> {
public final int tranType;
public final long startNs;
public long startNs;
public final int latencyUs;
public final int workerId;
public final int phaseId;
public Sample(int tranType, long startNs, int latencyUs, int workerId, int phaseId) {
public Sample(int tranType, long startNs, int latencyUs, int workerId, int phaseId) {
this.tranType = tranType;
this.startNs = startNs;
this.latencyUs = latencyUs;
@@ -145,21 +141,23 @@ public boolean hasNext() {
@Override
public Sample next() {
long[] chunk = values.get(chunkIndex);
int tranType = (int) chunk[subIndex];
long offsetUs = chunk[subIndex + 1];
int latencyUs = (int) chunk[subIndex + 2];
int workerId = (int) chunk[subIndex + 3];
int phaseId = (int) chunk[subIndex + 4];
subIndex += 5;
if (subIndex >= BLOCK_SIZE - 5) {
Sample[] chunk = values.get(chunkIndex);
Sample s = chunk[subIndex];
// Iterate in chunk, and wrap to next one
++subIndex;
assert subIndex <= ALLOC_SIZE;
if (subIndex == ALLOC_SIZE) {
chunkIndex += 1;
subIndex = 0;
}
long startNs = lastIteratorNs + offsetUs * 1000L;
lastIteratorNs = startNs;
return new Sample(tranType, startNs, latencyUs, workerId, phaseId);
// Previously, s.startNs was just an offset from the previous
// value. Now we make it an absolute.
s.startNs += lastIteratorNs;
lastIteratorNs = s.startNs;
return s;
}
@Override
@@ -20,12 +20,15 @@
private final boolean rateLimited;
private final boolean disabled;
private final boolean serial;
private final boolean timed;
private final List<Double> weights;
private final int num_weights;
private int activeTerminals;
private int nextSerial;
Phase(String benchmarkName, int id, int t, int r, List<String> o, boolean rateLimited, boolean disabled, int activeTerminals, Arrival a) {
Phase(String benchmarkName, int id, int t, int r, List<String> o, boolean rateLimited, boolean disabled, boolean serial, boolean timed, int activeTerminals, Arrival a) {
ArrayList<Double> w = new ArrayList<Double>();
for (String s : o)
w.add(Double.parseDouble(s));
@@ -38,6 +41,9 @@
this.num_weights = this.weights.size();
this.rateLimited = rateLimited;
this.disabled = disabled;
this.serial = serial;
this.timed = timed;
this.nextSerial = 1;
this.activeTerminals = activeTerminals;
this.arrival=a;
}
@@ -50,6 +56,26 @@ public boolean isDisabled() {
return disabled;
}
public boolean isSerial() {
return serial;
}
public boolean isTimed() {
return timed;
}
public boolean isLatencyRun() {
return !timed && serial;
}
public boolean isThroughputRun() {
return !isLatencyRun();
}
public void resetSerial() {
this.nextSerial = 1;
}
public int getActiveTerminals() {
return activeTerminals;
}
@@ -80,14 +106,49 @@ public double totalWeight() {
* @return
*/
public int chooseTransaction() {
int randomPercentage = gen.nextInt(100) + 1;
return chooseTransaction(false);
}
public int chooseTransaction(boolean isColdQuery) {
if (isDisabled())
return -1;
if (isSerial()) {
int ret;
synchronized(this) {
ret = this.nextSerial;
// Serial runs should not execute queries with non-positive
// weights.
while (ret <= this.num_weights && weights.get(ret - 1).doubleValue() <= 0.0)
ret = ++this.nextSerial;
// If it's a cold execution, then we don't want to advance yet,
// since the hot run needs to execute the same query.
if (!isColdQuery) {
// For timed, serial executions, we're doing a QPS (query
// throughput) run, so we loop through the list multiple
// times. Note that we do the modulus before the increment
// so that we end up in the range [1,num_weights]
if (isTimed()) {
assert this.isThroughputRun();
this.nextSerial %= this.num_weights;
}
++this.nextSerial;
}
}
return ret;
}
else {
int randomPercentage = gen.nextInt((int)totalWeight()) + 1;
double weight = 0.0;
for (int i = 0; i < this.num_weights; i++) {
weight += weights.get(i).doubleValue();
if (randomPercentage <= weight) {
return i + 1;
}
} // FOR
}
return -1;
}
@@ -102,7 +163,14 @@ public String currentPhaseString() {
if (isDisabled()){
retString += "[Disabled= true]";
} else {
retString += "[Time= " + time + "] [Rate= " + (isRateLimited() ? rate : "unlimited") + "] [Arrival= " + arrival + "] [Ratios= " + getWeights() + "] [Active Workers=" + getActiveTerminals() + "]";
if (isLatencyRun()) {
retString += "[Serial= true] [Time= n/a] ";
}
else {
retString += "[Serial= " + (isSerial()? "true" : "false")
+ "] [Time= " + time + "] ";
}
retString += "[Rate= " + (isRateLimited() ? rate : "unlimited") + "] [Arrival= " + arrival + "] [Ratios= " + getWeights() + "] [Active Workers=" + getActiveTerminals() + "]";
}
return retString;
}
@@ -0,0 +1,33 @@
package com.oltpbenchmark;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.LinkedList;
import com.oltpbenchmark.types.State;
import com.oltpbenchmark.util.QueueLimitException;
import org.apache.log4j.Logger;
/**
* This class is used for keeping track of the procedures that have been
* submitted to the system when running a rate-limited benchmark.
* @author breilly
*/
public class SubmittedProcedure {
private final int type;
private final long startTime;
SubmittedProcedure(int type) {
this.type = type;
this.startTime = System.nanoTime();
}
SubmittedProcedure(int type, long startTime) {
this.type = type;
this.startTime = startTime;
}
public int getType() { return type; }
public long getStartTime() { return startTime; }
}
Oops, something went wrong.

0 comments on commit e371037

Please sign in to comment.