Permalink
Browse files

Moved number of reconfig subplans to property

  • Loading branch information...
Aaron Elmore
Aaron Elmore committed Apr 4, 2014
2 parents 53721eb + afed61d commit 58e110e397cc952db1f70d7cf9ea3fbc33edaf48
View
@@ -166,6 +166,7 @@
<arg value="site.reconfig_replication_delay=${site.reconfig_replication_delay}" />
<arg value="site.reconfig_chunk_size_kb=${site.reconfig_chunk_size_kb}" />
<arg value="site.reconfig_async_chunk_size_kb=${site.reconfig_async_chunk_size_kb}" />
<arg value="site.reconfig_subplan_split=${site.reconfig_subplan_split}" />
<arg value="site.storage_mmap=${site.storage_mmap}" />
<arg value="site.storage_mmap_dir=${site.storage_mmap_dir}" />
<arg value="site.mr_map_blocking=${site.mr_map_blocking}" />
View
107 load.sh
@@ -0,0 +1,107 @@
#!/bin/bash
if [ $# -lt 6 ]
then
echo "Load benchmark in the cluster (EStore project)"
echo "Parameters"
echo "1) benchmark type (e.g. ycsb, voter)"
echo "2) skew (uniform, high-skew, low-skew)"
echo "3) number of tuples"
echo "4) number of partitions per server"
echo "2) initial server"
echo "3) number of servers"
echo "5) servers to skip (optional)"
echo
echo "Press ENTER to continue with default parameters"
read
exit
fi
if [ -z "$1" ]
then
bench="ycsb"
else
bench=$1
fi
if [ -z "$2" ]
then
skew="high-skew"
else
skew=$2
fi
if [[ $2 =~ '(uniform|high-skew|low-skew)' ]]
then
echo "incorrect skew"
exit
fi
if [ $2 = "uniform" ]
then
sed -i".bak" '/requestdistribution/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/skew_factor/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/num_hot_spots/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/percent_accesses_to_hot_spots/d' properties/benchmarks/ycsb.properties
echo "requestdistribution=uniform" >> properties/benchmarks/ycsb.properties
fi
if [ $2 = "low-skew" ]
then
sed -i".bak" '/requestdistribution/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/skew_factor/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/num_hot_spots/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/percent_accesses_to_hot_spots/d' properties/benchmarks/ycsb.properties
echo "requestdistribution=zipfian" >> properties/benchmarks/ycsb.properties
echo "skew_factor = 0.65" >> properties/benchmarks/ycsb.properties
fi
if [ $2 = "high-skew" ]
then
sed -i".bak" '/requestdistribution/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/skew_factor/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/num_hot_spots/d' properties/benchmarks/ycsb.properties
sed -i".bak" '/percent_accesses_to_hot_spots/d' properties/benchmarks/ycsb.properties
echo "requestdistribution=zipfian" >> properties/benchmarks/ycsb.properties
echo "skew_factor = 0.65" >> properties/benchmarks/ycsb.properties
echo "num_hot_spots=30" >> properties/benchmarks/ycsb.properties
echo "percent_accesses_to_hot_spots=0.80" >> properties/benchmarks/ycsb.properties
fi
if [ -z "$3" ]
then
tuples=60000000
else
tuples=$3
fi
sed -i".bak" '/num_records/d' properties/benchmarks/ycsb.properties
echo "num_records = $tuples" >> properties/benchmarks/ycsb.properties
if [ -z "$4" ]
then
part_per_server=6
else
part_per_server=$4
fi
if [ -z "$5" ]
then
init_server=5
else
init_server=$5
fi
if [ -z "$6" ]
then
num_servers=5
else
num_servers=$6
fi
skip_list="${@:7}"
partitions=$(( $part_per_server * $num_servers ))
python scripts/reconfiguration/plan-generator.py -t ycsb -s $tuples -p $partitions > plan.json
./prepare.sh $bench $init_server $num_servers $part_per_server $skip_list
ant hstore-benchmark -Dproject=$bench -Dglobal.hasher_plan=plan.json -Dglobal.hasher_class=edu.brown.hashing.TwoTieredRangeHasher -Dnoshutdown=true -Dnoexecute=true -Dsite.txn_restart_limit_sysproc=100 -Dsite.jvm_asserts=false -Dsite.reconfig_live=false | tee load.log
View
@@ -0,0 +1,49 @@
#!/bin/bash
if [ $# -lt 4 ]
then
echo "Allocates partitions starting from server da${init_server}"
echo "Parameters"
echo "1) benchmark type (e.g. tpcc, ycsb)"
echo "2) initial server"
echo "3) number of servers"
echo "4) number of partitions per server"
echo "5) servers to skip (optional)"
exit
fi
cmd="ant hstore-prepare -Dproject=$1 -Dhosts=\""
init_server=$2
num_servers=$3
part_per_server=$4
skip_list="${@:5}"
curr_host=0
curr_part=0
first_loop=""
for (( var=1; var<=$num_servers; var++ ))
do
end_part=`expr $curr_part + $part_per_server - 1`
curr_server=`expr $init_server + $curr_host`
if [[ $skip_list =~ $curr_server ]]
then
var=`expr $var - 1`
curr_host=`expr $curr_host + 1`
else
if [ "$curr_server" -lt "10" ]
then
cmd="${cmd}${first_loop}da0${curr_server}:${curr_host}:${curr_part}-${end_part}"
else
cmd="${cmd}${first_loop}da${curr_server}:${curr_host}:${curr_part}-${end_part}"
fi
curr_part=`expr $curr_part + $part_per_server`
curr_host=`expr $curr_host + 1`
first_loop=";"
fi
done
echo "$cmd\""
set -x
eval "$cmd\""
@@ -0,0 +1,15 @@
#!/bin/bash
if [ $# -eq 0 ]
then
echo ""
echo "enter site ID (two digits) and partition ID (three digits)"
echo "the site and the partitions must be local"
echo "example: ${0} 00 001 checks site 0 and partition 1"
echo ""
exit
fi
PID=$(top -n1 | grep -m1 java | perl -pe 's/\e\[?.*?[\@-~] ? ?//g' | cut -f1 -d' ')
NID=$(printf '%d' $(jstack $PID | grep -m1 "H${1}-${2}" | cut -d '=' -f 4 | cut -d ' ' -f 1))
top -H -n1 | grep -m1 $NID
@@ -25,10 +25,13 @@ def sinewave(steps, max_factor, frequency = 2.0):
print "ERROR: frequency must be greater than one"
exit(1)
for current in range(steps):
lastRate = 1.0
for current in range(1, steps+1):
ampl = (max_factor - 1.0) / 2.0
radians = math.pi * 2 * (frequency * float(current)/float(steps))
res.append(ampl * math.sin(radians + 1.5 * math.pi) + ampl + 1.0)
currentRate = ampl * math.sin(radians + 1.5 * math.pi) + ampl + 1.0
res.append(currentRate/lastRate)
lastRate = currentRate
return res
@@ -50,9 +53,12 @@ def linear(steps, max_factor):
print "ERROR: steps must be greater than one"
exit(1)
increment = (max_factor - 1.0) / float(steps)
for current in range(steps):
res.append(1.0 + float(current) * increment);
lastRate = 1.0
stepIncrement = (max_factor - 1.0) / (float(steps) - 1)
for current in range(1, steps+1):
currentRate = 1.0 + float(current) * stepIncrement
res.append(currentRate / lastRate);
lastRate = currentRate
return res
@@ -85,16 +91,16 @@ def spike(steps, max_factor, period, duration):
exit(1)
in_spike = False
last_spike = 0
for current in range(1,steps+1):
if not in_spike and current % period == 0:
if not in_spike and (current - last_spike) % period == 0:
in_spike = True
spike_started = current
res.append(max_factor)
elif in_spike and (current - spike_started) < duration:
res.append(max_factor)
elif in_spike and (current - spike_started) >= duration:
in_spike = False
res.append(1.0)
last_spike = current
res.append(1.0 / max_factor)
else:
res.append(1.0)
@@ -64,6 +64,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Random;
import org.apache.commons.lang.NotImplementedException;
import org.apache.log4j.Logger;
@@ -234,7 +235,9 @@
private static final UtilityWorkMessage UTIL_WORK_MSG = new UtilityWorkMessage();
private static final UpdateMemoryMessage STATS_WORK_MSG = new UpdateMemoryMessage();
private static long MIN_MS_BETWEEN_ASYNC_PULLS = 500;
private static int MIN_MS_BETWEEN_ASYNC_PULLS = 100;
private static int RAND_MS_BETWEEN_ASYNC_PULLS = 200;
private static Random rand = new Random();
private ReconfigurationState reconfig_state;
@@ -1237,15 +1240,14 @@ private boolean utilityWork() {
}
this.idle_click_count = 0;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
this.work_queue.offer(asyncRequestPullQueue.remove());
}
else if (reconfiguration_coordinator.getReconfigurationInProgress()
//&& asyncOutstanding.get() == false
&& reconfiguration_coordinator.queueAsyncPull()
&& this.scheduleAsyncPullQueue.isEmpty() == false
&& (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS || System.currentTimeMillis() > this.nextAsyncPullTimeMS )){
&& ((idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS && asyncOutstanding.get() == false) || System.currentTimeMillis() > this.nextAsyncPullTimeMS )){
if (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS) {
LOG.info(String.format(" ### Pulling and scheduling the next async pull from the scheduleAsyncPullQueue due to IDLE Clicks. Items : %s IdleCount:%s", scheduleAsyncPullQueue.size(),idle_click_count));
} else {
@@ -1257,7 +1259,7 @@ else if (reconfiguration_coordinator.getReconfigurationInProgress()
}
this.idle_click_count = 0;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
ScheduleAsyncPullRequestMessage pullMsg = scheduleAsyncPullQueue.poll();
if (pullMsg != null){
this.work_queue.offer(pullMsg);
@@ -6507,14 +6509,14 @@ public void receiveTuples(Long txnId, int oldPartitionId, int newPartitionId, St
if (moreDataComing) {
this.reconfiguration_tracker.markRangeAsPartiallyReceived(new ReconfigurationRange<Long>
(table_name, VoltType.BIGINT, minInclusive, maxExclusive, oldPartitionId, newPartitionId));
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
} else {
this.reconfiguration_tracker.markRangeAsReceived(new ReconfigurationRange<Long>
(table_name, VoltType.BIGINT, minInclusive, maxExclusive, oldPartitionId, newPartitionId));
if(isAsyncRequest){
LOG.info("Last chunk received for async request, unsetting async in progress");
asyncOutstanding.set(false);
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
}
}
@@ -803,6 +803,13 @@
)
public int reconfig_async_chunk_size_kb;
@ConfigProperty(
description="How many subplans should a reconfiguration be split into.",
defaultInt=1,
experimental=true
)
public int reconfig_subplan_split;
// ----------------------------------------------------------------------------
// Storage Options
// ----------------------------------------------------------------------------
@@ -135,7 +135,7 @@
private Map<Integer,Integer> livePullKBMap;
private List<ReconfigurationPlan> reconfigPlanQueue;
private int reconfig_split = 100;
private int reconfig_split = 1;
public static long STOP_COPY_TXNID = -2L;
@@ -196,6 +196,9 @@ public ReconfigurationCoordinator(HStoreSite hstore_site, HStoreConf hstore_conf
async_queue_pulls = hstore_conf.site.reconfig_async_pull;
live_pull = hstore_conf.site.reconfig_live;
//Default reconfig plan split
this.reconfig_split = hstore_conf.site.reconfig_subplan_split;
if (hstore_conf.site.reconfig_async == true){
LOG.debug("Updating transfer bytes");
ReconfigurationConstants.MAX_TRANSFER_BYTES =
@@ -611,7 +614,7 @@ public void receiveNextReconfigurationPlanFromLeader() {
executor.initReconfiguration(rplan, reconfigurationProtocol, ReconfigurationState.PREPARE, this.planned_partitions);
this.partitionStates.put(executor.getPartitionId(), ReconfigurationState.PREPARE);
}
FileUtil.appendEventToFile("RECONFIGURATION_NEXT_PLAN, siteId="+this.hstore_site.getSiteId());
FileUtil.appendEventToFile("RECONFIGURATION_NEXT_PLAN, siteId="+this.hstore_site.getSiteId() + " plansRemaining=" + this.reconfigPlanQueue.size());
} else {
LOG.error("Leader expected next reconfig plan, but planQueue was empty");
}
Oops, something went wrong.

0 comments on commit 58e110e

Please sign in to comment.