Skip to content

Commit

Permalink
add pipeline feature
Browse files Browse the repository at this point in the history
  • Loading branch information
thefab committed Feb 26, 2015
1 parent 49deff6 commit 7a786bc
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion tornadis/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def get_parameters():
parser.add_argument('-b', '--batch-size',
help="Number of request to send in parallel",
type=int, default=None)
parser.add_argument('-P', '--pipeline',
help="Pipeline requests (honnor batch-size if set)",
action="store_true")
parser.add_argument('-d', '--data-size', default=2,
help="Data size of SET/GET value in bytes (default 2)",
type=int)
Expand Down Expand Up @@ -83,12 +86,45 @@ def multiple_set(self, client_number):
client_number)
self.response_count += resp_count

@tornado.gen.coroutine
def _call_pipeline(self, client, pipeline, client_number):
print "Send {} pipelined requests " \
"with client {}".format(pipeline.number_of_stacked_calls,
client_number)
responses = yield client.call(pipeline)
resp_count = len(responses)
print "Received {} pipelined responses " \
"with client {}".format(resp_count, client_number)
raise tornado.gen.Return(resp_count)

@tornado.gen.coroutine
def pipelined_multiple_set(self, client_number):
client = tornadis.Client()
print "Connect client", client_number
yield client.connect()
print "Client", client_number, "connected"
pipeline_size = self.params.batch_size or self.requests_per_client
print pipeline_size
pipeline = tornadis.Pipeline()
for _ in range(0, self.requests_per_client):
pipeline.stack_call("SET", "benchmark-key", self.value)
if pipeline.number_of_stacked_calls >= pipeline_size:
resp_count = yield self._call_pipeline(client, pipeline,
client_number)
self.response_count += resp_count
pipeline = tornadis.Pipeline()
if pipeline.number_of_stacked_calls > 0:
resp_count = yield self._call_pipeline(client, pipeline,
client_number)
self.response_count += resp_count

def stop_loop(self, future):
excep = future.exception()
if self.response_count == self.request_count:
loop = tornado.ioloop.IOLoop.instance()
loop.stop()
if excep is not None:
print excep
raise(excep)


Expand All @@ -104,7 +140,10 @@ def main():
print "Max requests per client:", benchmark.requests_per_client
before = datetime.datetime.now()
for client_number in xrange(params.clients):
future = benchmark.multiple_set(client_number)
if params.pipeline:
future = benchmark.pipelined_multiple_set(client_number)
else:
future = benchmark.multiple_set(client_number)
loop.add_future(future, benchmark.stop_loop)
loop.start()
after = datetime.datetime.now()
Expand Down

0 comments on commit 7a786bc

Please sign in to comment.