Permalink
Browse files

Merge branch 'dual-reconfig' into benchmark-controller

  • Loading branch information...
Aaron Elmore
Aaron Elmore committed Apr 18, 2014
2 parents bd4fe09 + ced9841 commit 112a4aeef7d50712b076e440d1d4c8f11ab600a7
Showing with 1,235 additions and 781 deletions.
  1. +1 −0 build-common.xml
  2. +0 −2 properties/benchmarks/ycsb.properties
  3. +20 −11 scripts/reconfiguration/experiment-runner.py
  4. +2 −2 scripts/reconfiguration/local.sh
  5. +35 −0 scripts/reconfiguration/plans/ycsb-size100000-2.json
  6. +40 −6 scripts/reconfiguration/plotter.py
  7. +7 −7 scripts/reconfiguration/reconfig-test.sh
  8. +19 −4 scripts/reconfiguration/reconfiguration_experiments.py
  9. +67 −0 src/frontend/edu/brown/hashing/ExplicitHasher.java
  10. +110 −0 src/frontend/edu/brown/hashing/ExplicitPartitions.java
  11. +8 −5 src/frontend/edu/brown/hashing/PlannedHasher.java
  12. +25 −2 src/frontend/edu/brown/hashing/PlannedPartitions.java
  13. +27 −6 src/frontend/edu/brown/hashing/ReconfigurationPlan.java
  14. +9 −4 src/frontend/edu/brown/hashing/TwoTieredRangeHasher.java
  15. +54 −32 src/frontend/edu/brown/hashing/TwoTieredRangePartitions.java
  16. +0 −1 src/frontend/edu/brown/hstore/HStoreCoordinator.java
  17. +320 −309 src/frontend/edu/brown/hstore/PartitionExecutor.java
  18. +21 −0 src/frontend/edu/brown/hstore/conf/HStoreConf.java
  19. +316 −373 src/frontend/edu/brown/hstore/reconfiguration/ReconfigurationCoordinator.java
  20. +11 −6 src/frontend/edu/brown/hstore/reconfiguration/ReconfigurationTracking.java
  21. +3 −1 src/frontend/org/voltdb/compiler/VoltCompiler.java
  22. +121 −0 src/frontend/org/voltdb/sysprocs/ReconfigurationStatic.java
  23. +4 −0 src/frontend/org/voltdb/sysprocs/SysProcFragmentId.java
  24. +2 −1 tests/frontend/edu/brown/hashing/TestMultiTablePlannedPartitions.java
  25. +2 −1 tests/frontend/edu/brown/hashing/TestPlannedPartitions.java
  26. +3 −3 tests/frontend/edu/brown/hashing/TestTwoTieredRangePartitions.java
  27. +2 −1 tests/frontend/edu/brown/hstore/reconfiguration/TestReconfigurationPlanSplitter.java
  28. +3 −2 tests/frontend/edu/brown/hstore/reconfiguration/TestReconfigurationTracking.java
  29. +2 −1 tests/frontend/edu/brown/hstore/reconfiguration/TestReconfigurationTrackingMultiTable.java
  30. +1 −1 tools/hstore/fabric/abstractfabric.py
View
@@ -166,6 +166,7 @@
<arg value="site.reconfig_replication_delay=${site.reconfig_replication_delay}" />
<arg value="site.reconfig_chunk_size_kb=${site.reconfig_chunk_size_kb}" />
<arg value="site.reconfig_async_chunk_size_kb=${site.reconfig_async_chunk_size_kb}" />
<arg value="site.reconfig_subplan_split=${site.reconfig_subplan_split}" />
<arg value="site.storage_mmap=${site.storage_mmap}" />
<arg value="site.storage_mmap_dir=${site.storage_mmap_dir}" />
<arg value="site.mr_map_blocking=${site.mr_map_blocking}" />
@@ -8,8 +8,6 @@ builder = edu.brown.benchmark.ycsb.YCSBProjectBuilder
# only add the number of tuples defined in 'num_records'
# Specify the distribution for key selection
#requestdistribution=custom
#requestdistribution=uniform
requestdistribution=zipfian
# Override operation weights
@@ -850,7 +850,7 @@ def saveCSVResults(inst, fabric, args, benchmark, partitions, filename):
contents = fabric.get_file(inst, filename)
if len(contents) > 0:
# We'll prefix the name with the number of partitions
suffix =''
suffix =''
if args['exp_suffix']:
suffix = "-%s" % (args['exp_suffix'])
localName = "%s-%02dp%s-%s" % (benchmark, partitions, suffix, os.path.basename(filename))
@@ -984,26 +984,35 @@ def createFabricHandle(name, env):
## reconfiguration
## ==============================================
def sweepReconfiguration(client_inst, fabric, args, benchmark, partitions):
LOG.info("Sweeping the reconfiguration events")
LOG.info("Sweeping the reconfiguration events ")
swept_inst = set()
filename = "hevent.log"
suffix =''
if args['exp_suffix']:
suffix = "-%s" % (args['exp_suffix'])
localName = "%s-%02dp%s-%s" % (benchmark, partitions, suffix, os.path.basename(filename))
resultsDir = os.path.join(args['results_dir'], args['exp_type'])
localFile = os.path.join(resultsDir, localName)
if os.path.exists(localFile):
LOG.info("Removing local event file %s" % localFile)
os.remove(localFile)
for inst in fabric.getRunningInstances():
sweepHevent(inst, client_inst, fabric, args, benchmark, partitions)
if str(inst) not in swept_inst:
LOG.info("Swepts :%s " % ",".join(swept_inst))
sweepHevent(inst, client_inst, fabric, args, benchmark, partitions, localFile, filename)
swept_inst.add(str(inst))
else:
LOG.info("Skipping inst:%s due to already swept"% inst)
def sweepHevent(inst, client_inst, fabric, args, benchmark, partitions):
def sweepHevent(inst, client_inst, fabric, args, benchmark, partitions, localFile, filename):
LOG.info("Sweeping the event file for inst %s" % inst)
filename = "hevent.log"
complete_filename = os.path.join(fabric.hstore_dir, filename)
LOG.info("Going to retrieve remote reconfiguration event file '%s'" % complete_filename)
if inst != client_inst:
contents = fabric.touch_file(inst, complete_filename)
contents = fabric.get_file(inst, complete_filename)
if len(contents) > 0:
# Prefix the name with the number of partitions
suffix =''
if args['exp_suffix']:
suffix = "-%s" % (args['exp_suffix'])
localName = "%s-%02dp%s-%s" % (benchmark, partitions, suffix, os.path.basename(filename))
resultsDir = os.path.join(args['results_dir'], args['exp_type'])
localFile = os.path.join(resultsDir, localName)
with open(localFile, "a") as f:
f.write(contents)
LOG.info("Saved reconfiguration events to '%s'" % os.path.realpath(localFile))
@@ -12,14 +12,14 @@ function onexit() {
DATA_DIR="out"
FABRIC_TYPE="ssh"
FIRST_PARAM_OFFSET=0
FIRST_PARAM_OFFSET=1
EXP_TYPES=( \
"reconfig-localhost --partitions=2"
)
#for b in smallbank tpcc seats; do
for b in ycsb; do
for b in tpcc; do
# for b in seats; do
PARAMS=( \
--no-update \
@@ -0,0 +1,35 @@
{
"partition_plans": {
"1": {
"tables": {
"usertable": {
"partitions": {
"0": "0-50000",
"1": "50000-100000"
}
}
}
},
"2": {
"tables": {
"usertable": {
"partitions": {
"1": "0-100000"
}
}
}
},
"3": {
"tables": {
"usertable": {
"partitions": {
"0": "0-10000",
"1": "10000-100000"
}
}
}
}
},
"default_table": "usertable"
}
@@ -97,6 +97,10 @@ def getReconfigEvents(hevent_log):
event = "INIT"
elif "END" in line:
event = "END"
elif "ASYNC_PULL_REQUESTED" in line:
event = "ASYNC_PULL_REQUESTED"
elif "LIVE_PULL_REQUESTED" in line:
event = "LIVE_PULL_REQUESTED"
else:
event = "UNKNOWN"
if event != "UNKNOWN":
@@ -139,6 +143,9 @@ def addReconfigEvent(df, reconfig_events):
df['RECONFIG'] = ''
df['IN_RECONFIG'] = False
df['MISSING_DATA'] = False
df['ASYNC_PULLS'] = 0
df['LIVE_PULLS'] = 0
start, end = getReconfigStartEnd(reconfig_events)
if not reconfig_events:
return
@@ -150,17 +157,26 @@ def addReconfigEvent(df, reconfig_events):
ts += 1000
#find the index last row that has a smaller physical TS
_i = df[(df['TIMESTAMP'] <= (ts) ) ][-1:].index
#LATENCY.isnull()
#if we have new event set, otherwise append
if df.RECONFIG[_i] == "":
df.RECONFIG[_i] = event[1]
if "ASYNC_PULL_REQUESTED" == event[1]:
df.ASYNC_PULLS[_i] = df.ASYNC_PULLS[_i] + 1
elif "LIVE_PULL_REQUESTED" == event[1]:
df.LIVE_PULLS[_i] = df.LIVE_PULLS[_i] + 1
else:
df.RECONFIG[_i] = df.RECONFIG[_i] + "-" + event[1]
#LATENCY.isnull()
#if we have new event set, otherwise append
if df.RECONFIG[_i] == "":
df.RECONFIG[_i] = event[1]
else:
df.RECONFIG[_i] = df.RECONFIG[_i] + "-" + event[1]
df['IN_RECONFIG'][(df.TIMESTAMP >= start-1000) & (df.TIMESTAMP <= end)] = True
df['MISSING_DATA'] = df.LATENCY.isnull()
df['DOWNTIME'] = df['MISSING_DATA'].sum()
df['RECONFIG_TIME'] = end-start
print df['ASYNC_PULLS'].values
print df['LIVE_PULLS'].values
print df['IN_RECONFIG'].values
#df.groupby('IN_RECONFIG')['LATENCY','LATENCY_50','LATENCY_95','LATENCY_99','THROUGHPUT'].mean()
def getIntStats(interval_file):
@@ -383,6 +399,21 @@ def plotResults(args, files, ax):
plotGraph(args)
def plotReconfigs(args, d, ax):
ymean = np.mean(ax.get_ylim())
y1 = ymean + ax.get_ylim()[1]*0.1
y2 = ymean - ax.get_ylim()[1]*0.1
lives = d[d.LIVE_PULLS>0]['LIVE_PULLS']
asyncs = d[d.ASYNC_PULLS>0]['ASYNC_PULLS']
label = 'Live Pull'
for z in lives.iteritems():
plot.plot(z[0],y1,'^',ms=z[1]*2,color='black',label=label)
label = None
label = 'Async Pull'
for z in asyncs.iteritems():
plot.plot(z[0],y2,'D',ms=z[1]*2,color='blue',label=label)
label = None
## ==============================================
## tsd
## ==============================================
@@ -416,6 +447,7 @@ def plotTSD(args, files, ax):
reconfig_events = getReconfigEvents(_file.replace("interval_res.csv", "hevent.log"))
if reconfig_events:
addReconfigEvent(df, reconfig_events)
if len(df[df.RECONFIG.str.contains('TXN')]) == 1:
ax.axvline(df[df.RECONFIG.str.contains('TXN')].index[0], color=color, lw=1.5, linestyle="--",label=init_legend)
if any(df.RECONFIG.str.contains('END')):
@@ -441,6 +473,8 @@ def plotTSD(args, files, ax):
if args.type == "line":
#plot the line with the same color
ax.plot(df.index, data[name], color=color,label=name,ls=linestyle, lw=2.0)
if reconfig_events:
plotReconfigs(args, df, ax)
x+=1 # FOR
plotFrame = pandas.DataFrame(data=data)
if args.type == "line":
@@ -12,20 +12,20 @@ function onexit() {
DATA_DIR="out"
FABRIC_TYPE="ssh"
FIRST_PARAM_OFFSET=1
FIRST_PARAM_OFFSET=0
EXP_TYPES=( \
"reconfig-.5 --partitions=2" \
"reconfig-1 --partitions=2" \
"reconfig-2 --partitions=2" \
"reconfig-4 --partitions=2" \
# "reconfig-.5 --partitions=2" \
# "reconfig-1 --partitions=2" \
"reconfig-slow --partitions=2 --benchmark-size=100000" \
# "reconfig-4 --partitions=2" \
# "reconfig-2 --partitions=4" \
"stopcopy-2 --partitions=2" \
# "stopcopy-2 --partitions=2" \
# "stopcopy-2 --partitions=4" \
)
#for b in smallbank tpcc seats; do
for b in tpcc ycsb; do
for b in ycsb; do
# for b in seats; do
PARAMS=( \
--no-update \
@@ -19,9 +19,10 @@
"stopcopy-tpcc-small",
"reconfig-fast",
"stopcopy-fast",
"reconfig-slow",
]
RECONFIG_CLIENT_COUNT = 5
RECONFIG_CLIENT_COUNT = 1
def updateReconfigurationExperimentEnv(fabric, args, benchmark, partitions ):
partitions_per_site = fabric.env["hstore.partitions_per_site"]
@@ -168,8 +169,8 @@ def updateReconfigurationExperimentEnv(fabric, args, benchmark, partitions ):
fabric.env["client.count"] = RECONFIG_CLIENT_COUNT
fabric.env["client.blocking"] = True
fabric.env["client.output_response_status"] = True
fabric.env["client.threads_per_host"] = min(50, int(partitions * 4))
args["reconfig"] = None
fabric.env["client.threads_per_host"] = min(50, int(partitions * 4))
args["reconfig"] = None
if 'reconfig-tpcc-small' in args['exp_type'] or 'stopcopy-tpcc-small' in args['exp_type']:
fabric.env["client.count"] = RECONFIG_CLIENT_COUNT
@@ -181,10 +182,24 @@ def updateReconfigurationExperimentEnv(fabric, args, benchmark, partitions ):
fabric.env["client.output_txn_profiling_combine"] = True
fabric.env["client.output_txn_counters"] = "txncounters.csv"
fabric.env["client.threads_per_host"] = 50 #partitions * 2 # max(1, int(partitions/2))
fabric.env["client.txnrate"] = 10
fabric.env["client.txnrate"] = 10
fabric.env["site.reconfig_chunk_size_kb"] = 20048
fabric.env["site.reconfig_async_chunk_size_kb"] = 2048
fabric.env["site.commandlog_enable"] = False
fabric.env["benchmark.neworder_multip"] = False
fabric.env["benchmark.payment_multip"] = False
if 'reconfig-slow' == args['exp_type']:
fabric.env["client.blocking_concurrent"] = 1 # * int(partitions/8)
fabric.env["client.count"] = 1
fabric.env["client.blocking"] = True
fabric.env["client.output_response_status"] = True
fabric.env["client.threads_per_host"] = 2
fabric.env["site.reconfig_chunk_size_kb"] = 2048
fabric.env["site.reconfig_async_chunk_size_kb"] = 2048
fabric.env["site.commandlog_enable"] = False
fabric.env["client.txnrate"] = 100
#fabric.env["site.reconfig_async"] = False
fabric.env["benchmark.loadthreads"] = 1
fabric.env["benchmark.requestdistribution"] = "uniform"
@@ -0,0 +1,67 @@
package edu.brown.hashing;
import java.io.File;
import java.io.IOException;
import org.voltdb.CatalogContext;
import org.voltdb.catalog.CatalogType;
import org.voltdb.catalog.Database;
/**
 * A hasher that maps values to partitions according to an explicit
 * (plan-driven) partitioning scheme, and that can be reconfigured by
 * switching to a new partition phase or plan at runtime.
 */
public interface ExplicitHasher {

    /**
     * Combine multiple values into a single key and hash it.
     * The result should be uniformly distributed (or at least good
     * enough for what we need).
     *
     * @param values the values to combine
     * @return the combined hash
     */
    int multiValueHash(Object values[]);

    /** Two-argument convenience form of {@link #multiValueHash(Object[])}. */
    int multiValueHash(Object val0, Object val1);

    /** Primitive-int convenience form of {@link #multiValueHash(Object[])}. */
    int multiValueHash(int... values);

    /**
     * Return the number of partitions that this hasher can map values to.
     *
     * @return the partition count
     */
    int getNumPartitions();

    /**
     * Initialize this hasher from the given catalog context.
     *
     * @param catalogContext the catalog to initialize from
     */
    void init(CatalogContext catalogContext);

    /**
     * Hash the given value based on the partition count.
     *
     * @param value the value to hash
     * @return the target partition
     */
    int hash(Object value);

    /**
     * Hash the given value that is derived from a particular catalog object.
     *
     * @param value the value to hash
     * @param catalog_item the catalog object the value belongs to
     * @return the target partition
     */
    int hash(Object value, CatalogType catalog_item);

    /**
     * Hash the given value using a specific partition count.
     *
     * @param value the value to hash
     * @param num_partitions the partition count to hash against
     * @return the target partition
     */
    int hash(Object value, int num_partitions);

    /**
     * Load the hasher's state from the given file.
     *
     * @param input_path file to read from
     * @param catalog_db database catalog used to resolve references
     * @throws IOException if the file cannot be read
     */
    void load(File input_path, Database catalog_db) throws IOException;

    /**
     * Persist the hasher's state to the given file.
     *
     * @param output_path file to write to
     * @throws IOException if the file cannot be written
     */
    void save(File output_path) throws IOException;

    /** Serialize this hasher's state as a JSON string. */
    String toJSONString();

    /**
     * Switch to a named partition phase, producing the plan describing the
     * required data movement.
     *
     * @param partition_plan identifier of the phase to switch to
     * @return the reconfiguration plan for the transition
     * @throws Exception if the phase change fails
     */
    ReconfigurationPlan changePartitionPhase(String partition_plan) throws Exception;

    /**
     * Switch to a new partition plan loaded from a JSON file, producing the
     * plan describing the required data movement.
     *
     * @param partition_json_file path to the JSON plan file
     * @return the reconfiguration plan for the transition
     * @throws Exception if the plan change fails
     */
    ReconfigurationPlan changePartitionPlan(String partition_json_file) throws Exception;

    /** Expose the explicit partition mapping backing this hasher. */
    ExplicitPartitions getPartitions();
}
Oops, something went wrong.

0 comments on commit 112a4ae

Please sign in to comment.