Many small clean ups, tracking for pulls in hevent, and some exp runner log clean up
Aaron Elmore committed Apr 18, 2014
1 parent d863e9d commit ced9841e4fe988af15c88729e73fad2c04d2482d
@@ -850,7 +850,7 @@ def saveCSVResults(inst, fabric, args, benchmark, partitions, filename):
contents = fabric.get_file(inst, filename)
if len(contents) > 0:
# We'll prefix the name with the number of partitions
suffix =''
suffix =''
if args['exp_suffix']:
suffix = "-%s" % (args['exp_suffix'])
localName = "%s-%02dp%s-%s" % (benchmark, partitions, suffix, os.path.basename(filename))
@@ -984,26 +984,35 @@ def createFabricHandle(name, env):
## reconfiguration
## ==============================================
def sweepReconfiguration(client_inst, fabric, args, benchmark, partitions):
LOG.info("Sweeping the reconfiguration events")
LOG.info("Sweeping the reconfiguration events ")
swept_inst = set()
filename = "hevent.log"
suffix =''
if args['exp_suffix']:
suffix = "-%s" % (args['exp_suffix'])
localName = "%s-%02dp%s-%s" % (benchmark, partitions, suffix, os.path.basename(filename))
resultsDir = os.path.join(args['results_dir'], args['exp_type'])
localFile = os.path.join(resultsDir, localName)
if os.path.exists(localFile):
LOG.info("Removing local event file %s" % localFile)
os.remove(localFile)
for inst in fabric.getRunningInstances():
sweepHevent(inst, client_inst, fabric, args, benchmark, partitions)
if str(inst) not in swept_inst:
LOG.info("Swepts :%s " % ",".join(swept_inst))
sweepHevent(inst, client_inst, fabric, args, benchmark, partitions, localFile, filename)
swept_inst.add(str(inst))
else:
LOG.info("Skipping inst:%s due to already swept"% inst)
def sweepHevent(inst, client_inst, fabric, args, benchmark, partitions):
def sweepHevent(inst, client_inst, fabric, args, benchmark, partitions, localFile, filename):
LOG.info("Sweeping the event file for inst %s" % inst)
filename = "hevent.log"
complete_filename = os.path.join(fabric.hstore_dir, filename)
LOG.info("Going to retrieve remote reconfiguration event file '%s'" % complete_filename)
if inst != client_inst:
contents = fabric.touch_file(inst, complete_filename)
contents = fabric.get_file(inst, complete_filename)
if len(contents) > 0:
# Prefix the name with the number of partitions
suffix =''
if args['exp_suffix']:
suffix = "-%s" % (args['exp_suffix'])
localName = "%s-%02dp%s-%s" % (benchmark, partitions, suffix, os.path.basename(filename))
resultsDir = os.path.join(args['results_dir'], args['exp_type'])
localFile = os.path.join(resultsDir, localName)
with open(localFile, "a") as f:
f.write(contents)
LOG.info("Saved reconfiguration events to '%s'" % os.path.realpath(localFile))
@@ -12,14 +12,14 @@ function onexit() {
DATA_DIR="out"
FABRIC_TYPE="ssh"
FIRST_PARAM_OFFSET=0
FIRST_PARAM_OFFSET=1
EXP_TYPES=( \
"reconfig-localhost --partitions=2"
)
#for b in smallbank tpcc seats; do
for b in ycsb; do
for b in tpcc; do
# for b in seats; do
PARAMS=( \
--no-update \
@@ -0,0 +1,35 @@
{
"partition_plans": {
"1": {
"tables": {
"usertable": {
"partitions": {
"0": "0-50000",
"1": "50000-100000"
}
}
}
},
"2": {
"tables": {
"usertable": {
"partitions": {
"1": "0-100000"
}
}
}
},
"3": {
"tables": {
"usertable": {
"partitions": {
"0": "0-10000",
"1": "10000-100000"
}
}
}
}
},
"default_table": "usertable"
}
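The plan file above describes three partition plans for YCSB's usertable, each mapping partition ids to key ranges of the form "start-end". A rough sketch of how such a plan could be interpreted, assuming half-open ranges; the file name and the owner_partition helper below are illustrative, not part of the commit:

    import json

    def owner_partition(plan, key, table="usertable"):
        # Assumes ranges are half-open: "0-50000" covers keys 0..49999.
        for partition, key_range in plan["tables"][table]["partitions"].items():
            start, end = (int(x) for x in key_range.split("-"))
            if start <= key < end:
                return int(partition)
        return None

    with open("partition_plan.json") as f:      # hypothetical file name
        plans = json.load(f)["partition_plans"]

    print(owner_partition(plans["1"], 60000))   # 1: key falls in "50000-100000"
    print(owner_partition(plans["2"], 60000))   # 1: single partition owns "0-100000"
    print(owner_partition(plans["3"], 60000))   # 1: key falls in "10000-100000"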
@@ -97,6 +97,10 @@ def getReconfigEvents(hevent_log):
event = "INIT"
elif "END" in line:
event = "END"
elif "ASYNC_PULL_REQUESTED" in line:
event = "ASYNC_PULL_REQUESTED"
elif "LIVE_PULL_REQUESTED" in line:
event = "LIVE_PULL_REQUESTED"
else:
event = "UNKNOWN"
if event != "UNKNOWN":
@@ -139,6 +143,9 @@ def addReconfigEvent(df, reconfig_events):
df['RECONFIG'] = ''
df['IN_RECONFIG'] = False
df['MISSING_DATA'] = False
df['ASYNC_PULLS'] = 0
df['LIVE_PULLS'] = 0
start, end = getReconfigStartEnd(reconfig_events)
if not reconfig_events:
return
@@ -150,17 +157,26 @@ def addReconfigEvent(df, reconfig_events):
ts += 1000
#find the index last row that has a smaller physical TS
_i = df[(df['TIMESTAMP'] <= (ts) ) ][-1:].index
#LATENCY.isnull()
#if we have new event set, otherwise append
if df.RECONFIG[_i] == "":
df.RECONFIG[_i] = event[1]
if "ASYNC_PULL_REQUESTED" == event[1]:
df.ASYNC_PULLS[_i] = df.ASYNC_PULLS[_i] + 1
elif "LIVE_PULL_REQUESTED" == event[1]:
df.LIVE_PULLS[_i] = df.LIVE_PULLS[_i] + 1
else:
df.RECONFIG[_i] = df.RECONFIG[_i] + "-" + event[1]
#LATENCY.isnull()
#if we have new event set, otherwise append
if df.RECONFIG[_i] == "":
df.RECONFIG[_i] = event[1]
else:
df.RECONFIG[_i] = df.RECONFIG[_i] + "-" + event[1]
df['IN_RECONFIG'][(df.TIMESTAMP >= start-1000) & (df.TIMESTAMP <= end)] = True
df['MISSING_DATA'] = df.LATENCY.isnull()
df['DOWNTIME'] = df['MISSING_DATA'].sum()
df['RECONFIG_TIME'] = end-start
print df['ASYNC_PULLS'].values
print df['LIVE_PULLS'].values
print df['IN_RECONFIG'].values
#df.groupby('IN_RECONFIG')['LATENCY','LATENCY_50','LATENCY_95','LATENCY_99','THROUGHPUT'].mean()
def getIntStats(interval_file):
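With the change above, pull requests are tallied per interval row: each event increments the counter on the last row whose TIMESTAMP is at or before the event's (shifted) timestamp. A small self-contained pandas sketch of that bucketing, with made-up interval data (using .loc for the increments rather than chained indexing):

    import pandas as pd

    df = pd.DataFrame({"TIMESTAMP": [0, 1000, 2000, 3000]})
    df["ASYNC_PULLS"] = 0
    df["LIVE_PULLS"] = 0

    # (timestamp, event) pairs; values are made up for illustration.
    events = [(1500, "ASYNC_PULL_REQUESTED"), (2500, "LIVE_PULL_REQUESTED")]
    for ts, event in events:
        _i = df[df["TIMESTAMP"] <= ts][-1:].index   # last interval at or before ts
        if event == "ASYNC_PULL_REQUESTED":
            df.loc[_i, "ASYNC_PULLS"] += 1
        elif event == "LIVE_PULL_REQUESTED":
            df.loc[_i, "LIVE_PULLS"] += 1

    print(df)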
@@ -383,6 +399,21 @@ def plotResults(args, files, ax):
plotGraph(args)
def plotReconfigs(args, d, ax):
ymean = np.mean(ax.get_ylim())
y1 = ymean + ax.get_ylim()[1]*0.1
y2 = ymean - ax.get_ylim()[1]*0.1
lives = d[d.LIVE_PULLS>0]['LIVE_PULLS']
asyncs = d[d.ASYNC_PULLS>0]['ASYNC_PULLS']
label = 'Live Pull'
for z in lives.iteritems():
plot.plot(z[0],y1,'^',ms=z[1]*2,color='black',label=label)
label = None
label = 'Async Pull'
for z in asyncs.iteritems():
plot.plot(z[0],y2,'D',ms=z[1]*2,color='blue',label=label)
label = None
## ==============================================
## tsd
## ==============================================
@@ -416,6 +447,7 @@ def plotTSD(args, files, ax):
reconfig_events = getReconfigEvents(_file.replace("interval_res.csv", "hevent.log"))
if reconfig_events:
addReconfigEvent(df, reconfig_events)
if len(df[df.RECONFIG.str.contains('TXN')]) == 1:
ax.axvline(df[df.RECONFIG.str.contains('TXN')].index[0], color=color, lw=1.5, linestyle="--",label=init_legend)
if any(df.RECONFIG.str.contains('END')):
@@ -441,6 +473,8 @@ def plotTSD(args, files, ax):
if args.type == "line":
#plot the line with the same color
ax.plot(df.index, data[name], color=color,label=name,ls=linestyle, lw=2.0)
if reconfig_events:
plotReconfigs(args, df, ax)
x+=1 # FOR
plotFrame = pandas.DataFrame(data=data)
if args.type == "line":
@@ -12,20 +12,20 @@ function onexit() {
DATA_DIR="out"
FABRIC_TYPE="ssh"
FIRST_PARAM_OFFSET=1
FIRST_PARAM_OFFSET=0
EXP_TYPES=( \
"reconfig-.5 --partitions=2" \
"reconfig-1 --partitions=2" \
"reconfig-2 --partitions=2" \
"reconfig-4 --partitions=2" \
# "reconfig-.5 --partitions=2" \
# "reconfig-1 --partitions=2" \
"reconfig-slow --partitions=2 --benchmark-size=100000" \
# "reconfig-4 --partitions=2" \
# "reconfig-2 --partitions=4" \
"stopcopy-2 --partitions=2" \
# "stopcopy-2 --partitions=2" \
# "stopcopy-2 --partitions=4" \
)
#for b in smallbank tpcc seats; do
for b in tpcc ycsb; do
for b in ycsb; do
# for b in seats; do
PARAMS=( \
--no-update \
@@ -19,9 +19,10 @@
"stopcopy-tpcc-small",
"reconfig-fast",
"stopcopy-fast",
"reconfig-slow",
]
RECONFIG_CLIENT_COUNT = 5
RECONFIG_CLIENT_COUNT = 1
def updateReconfigurationExperimentEnv(fabric, args, benchmark, partitions ):
partitions_per_site = fabric.env["hstore.partitions_per_site"]
@@ -168,8 +169,8 @@ def updateReconfigurationExperimentEnv(fabric, args, benchmark, partitions ):
fabric.env["client.count"] = RECONFIG_CLIENT_COUNT
fabric.env["client.blocking"] = True
fabric.env["client.output_response_status"] = True
fabric.env["client.threads_per_host"] = min(50, int(partitions * 4))
args["reconfig"] = None
fabric.env["client.threads_per_host"] = min(50, int(partitions * 4))
args["reconfig"] = None
if 'reconfig-tpcc-small' in args['exp_type'] or 'stopcopy-tpcc-small' in args['exp_type']:
fabric.env["client.count"] = RECONFIG_CLIENT_COUNT
@@ -181,10 +182,24 @@ def updateReconfigurationExperimentEnv(fabric, args, benchmark, partitions ):
fabric.env["client.output_txn_profiling_combine"] = True
fabric.env["client.output_txn_counters"] = "txncounters.csv"
fabric.env["client.threads_per_host"] = 50 #partitions * 2 # max(1, int(partitions/2))
fabric.env["client.txnrate"] = 10
fabric.env["client.txnrate"] = 10
fabric.env["site.reconfig_chunk_size_kb"] = 20048
fabric.env["site.reconfig_async_chunk_size_kb"] = 2048
fabric.env["site.commandlog_enable"] = False
fabric.env["benchmark.neworder_multip"] = False
fabric.env["benchmark.payment_multip"] = False
if 'reconfig-slow' == args['exp_type']:
fabric.env["client.blocking_concurrent"] = 1 # * int(partitions/8)
fabric.env["client.count"] = 1
fabric.env["client.blocking"] = True
fabric.env["client.output_response_status"] = True
fabric.env["client.threads_per_host"] = 2
fabric.env["site.reconfig_chunk_size_kb"] = 2048
fabric.env["site.reconfig_async_chunk_size_kb"] = 2048
fabric.env["site.commandlog_enable"] = False
fabric.env["client.txnrate"] = 100
#fabric.env["site.reconfig_async"] = False
fabric.env["benchmark.loadthreads"] = 1
fabric.env["benchmark.requestdistribution"] = "uniform"
@@ -252,7 +252,7 @@ public ReconfigurationTable(PartitionedTable<T> old_table, PartitionedTable<T> n
//Check if we should split
if (conf == null)
return reconfiguration_range;
long currentMax = conf.site.reconfig_max_transfer_bytes;
long currentMax = Math.min(conf.site.reconfig_max_transfer_bytes, conf.site.reconfig_chunk_size_kb*1024);
if (currentMax <= 1)
return reconfiguration_range;
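The Math.min above caps the per-range transfer size by the configured chunk size as well as the existing reconfig_max_transfer_bytes setting. Roughly, with hypothetical numbers (2048 KB matches the chunk size set for reconfig-slow earlier in this commit):

    reconfig_max_transfer_bytes = 100 * 1024 * 1024   # 100 MB, hypothetical
    reconfig_chunk_size_kb = 2048                      # 2 MB chunks
    current_max = min(reconfig_max_transfer_bytes, reconfig_chunk_size_kb * 1024)
    print(current_max)                                 # 2097152 -> the chunk size now bounds splitting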
@@ -907,7 +907,6 @@ public void reconfigurationControlMsg(RpcController controller,
if(request.getReconfigControlType() == ReconfigurationControlType.PULL_RECEIVED){
hstore_site.getReconfigurationCoordinator().deleteTuples(request);
} else if(request.getReconfigControlType() == ReconfigurationControlType.CHUNK_RECEIVED){
hstore_site.getReconfigurationCoordinator().queueAsyncDataRequestMessageToWorkQueue(request.getDestPartition());
//TODO : Have to delete tuples for the chunk received messages as well
//hstore_site.getReconfigurationCoordinator().deleteTuples(request);
} else if(request.getReconfigControlType() == ReconfigurationControlType.RECONFIGURATION_DONE) {