
Add work/result batching; use blocking sockets when possible.

fixes #47.  fixes #46
commit e96c86bc058ba6d5d5b21d8190e19d232e15f716 1 parent 28ade3c
Darrell Bishop authored March 28, 2013
36  README.rst
@@ -232,9 +232,11 @@ runs a benchmark scenario::
                                       [--os-cacert <ca-certificate>] [--insecure]
                                       [-S STORAGE_URL] [-T TOKEN] [-c COUNT]
                                       [-u COUNT] [-o COUNT] [--workers COUNT]
-                                     [-q] [--profile] [--noop] [-k]
+                                     [--batch-size COUNT] [-q] [--profile]
+                                     [--noop] [-k]
+                                     [--connect-timeout CONNECT_TIMEOUT]
+                                     [--network-timeout NETWORK_TIMEOUT]
                                       [-s STATS_FILE] [-r] [--pctile PERCENTILE]
-
   ...


@@ -244,6 +246,7 @@ previously-run benchmark scenario::
   $ ssbench-master report-scenario -h
   usage: ssbench-master report-scenario [-h] -s STATS_FILE [-f REPORT_FILE]
                                         [--pctile PERCENTILE] [-r RPS_HISTOGRAM]
+                                        [--profile]

   ...

@@ -404,6 +407,35 @@ command.  Simply use the ``--workers COUNT`` option to ``ssbench-master``::
     ssbench-master report-scenario -s /tmp/ssbench-results/Small_test_scenario.2013-02-20.091016.stat


+Scalability and Throughput
+--------------------------
+
+Assuming the Swift cluster being benchmarked is not the bottleneck, the
+scalability of ssbench may be increased by
+
+- Running up to one ``ssbench-worker`` process per CPU core on any number of
+  benchmarking servers.
+- Increasing the default ``--batch-size`` parameter (defaults to 1) on both the
+  ``ssbench-master`` and ``ssbench-worker`` command-lines.  Note that if you
+  are running everything on one server and using the ``--workers`` argument to
+  ``ssbench-master``, the ``--batch-size`` parameter passed to
+  ``ssbench-master`` will be passed on to the automatically-started
+  ``ssbench-worker`` processes.
+- For optimal scalability, the user-count (concurrency) should be greater than
+  and also an even multiple of both the batch-size and number of
+  ``ssbench-worker`` processes.
+
+As a simple example, on my quad-core MacBook Pro, I get around **9,800** requests
+per second with ``--noop`` (see below) with this command-line (a
+``--batch-size`` of 1)::
+
+  $ ssbench-master run-scenario ... -u 24 -o 30000 --workers 3 --noop
+
+But with a ``--batch-size`` of 8, I can get around **19,500** requests per second::
+
+  $ ssbench-master run-scenario ... -u 24 -o 30000 --workers 3 --noop --batch-size 8
+
+
 HTTPS on OS X
 -------------
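The tuning guideline added above (user-count greater than, and an even multiple of, both ``--batch-size`` and the number of workers) is simple arithmetic. A minimal sketch, not part of this commit, using a hypothetical ``check_tuning`` helper::

  def check_tuning(user_count, batch_size, worker_count):
      # Return warnings for a proposed -u / --batch-size / --workers combination.
      warnings = []
      for name, value in (('batch-size', batch_size),
                          ('worker count', worker_count)):
          if user_count <= value:
              warnings.append('user-count %d is not greater than %s %d'
                              % (user_count, name, value))
          elif user_count % value:
              warnings.append('user-count %d is not an even multiple of %s %d'
                              % (user_count, name, value))
      return warnings

  # The README example: -u 24 --workers 3 --batch-size 8 satisfies both rules.
  print check_tuning(24, 8, 3)   # []
  print check_tuning(24, 7, 3)   # flags batch-size 7 (24 is not a multiple of 7)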
 
42  bin/ssbench-master
@@ -49,6 +49,14 @@ def run_scenario(master, args):
             if args.user_count != 'value from scenario' else None
     operation_count = int(args.op_count) \
             if args.op_count != 'value from scenario' else None
+
+    # Sanity-check batch_size
+    if args.batch_size > user_count:
+        logging.warning('--batch-size %d was > --user-count %d; using '
+                        '--batch-size %d', args.batch_size, user_count,
+                        user_count)
+        args.batch_size = user_count
+
     if args.noop:
         scenario_class = ScenarioNoop
         logging.info('NOTE: --noop was specified; not testing Swift.')
@@ -83,7 +91,8 @@ def run_scenario(master, args):
                 'ssbench-worker', '--zmq-host', zmq_host,
                 '--zmq-work-port', str(args.zmq_work_port),
                 '--zmq-results-port', str(args.zmq_results_port),
-                '--concurrency', str(users_per_worker)]
+                '--concurrency', str(users_per_worker),
+                '--batch-size', str(args.batch_size)]
             if args.profile:
                 worker_cmd = worker_cmd + [
                     '--profile-count',
@@ -112,8 +121,13 @@ def run_scenario(master, args):
                                       storage_url=args.storage_url,
                                       token=args.token, noop=args.noop,
                                       with_profiling=args.profile,
-                                      keep_objects=args.keep_objects)
+                                      keep_objects=args.keep_objects,
+                                      batch_size=args.batch_size)
+        logging.debug('  dumping %d results to %r', len(results), args.stats_file)
+        dump_start = time.time()
         pickle.dump([scenario, results], args.stats_file)
+        logging.debug('  done dumping results (took %.2fs)',
+                      time.time() - dump_start)
     finally:
         # Make sure any local spawned workers get killed
         if worker_count:
@@ -131,11 +145,14 @@ def run_scenario(master, args):
                                  int(os.environ['SUDO_GID']))

     if not args.no_default_report:
+        report_start = time.time()
         args.stats_file.close()
         args.stats_file = stats_file_path
         args.report_file = sys.stdout
         args.rps_histogram = None
         report_scenario(master, args)
+        logging.debug('  scenario report took %.2fs',
+                      time.time() - report_start)
     else:
         args.stats_file.close()

@@ -150,6 +167,11 @@ def run_scenario(master, args):


 def report_scenario(master, args):
+    if args.profile:
+        import cProfile
+        prof = cProfile.Profile()
+        prof.enable()
+
     if args.stats_file.endswith('.gz'):
         args.stats_file = GzipFile(args.stats_file, 'rb')
     else:
@@ -163,6 +185,12 @@ def report_scenario(master, args):
         # Note: not explicitly closing here in case it's redirected to STDOUT
         # (i.e. "-")

+    if args.profile:
+        prof.disable()
+        prof_output_path = '/tmp/report_scenario.%d.prof' % os.getpid()
+        prof.dump_stats(prof_output_path)
+        logging.info('PROFILED report_scenario to %s', prof_output_path)
+

 if __name__ == "__main__":
     arg_parser = argparse.ArgumentParser(
@@ -317,6 +345,13 @@ if __name__ == "__main__":
         help='Spawn COUNT local ssbench-worker processes just for this '
         'run. To workers on other hosts, they must be started manually.')
     run_scenario_arg_parser.add_argument(
+        '--batch-size', metavar='COUNT', type=int,
+        default=1,
+        help='Send bench jobs to workers in batches of this size to '
+        'increase benchmarking throughput; for best results, '
+        'user-count should be greater than and an even multiple of '
+        'both batch-size and worker count.')
+    run_scenario_arg_parser.add_argument(
         '-q', '--quiet', action='store_true', default=False,
         help='Suppress most output (including progress characters during '
         'run).')
@@ -370,6 +405,9 @@ if __name__ == "__main__":
         '-r', '--rps-histogram', type=argparse.FileType('w'),
         help='Also write a CSV file with requests completed per second '
         'histogram data')
+    report_scenario_arg_parser.add_argument(
+        '--profile', action='store_true', default=False,
+        help='Profile the report generation.')
     report_scenario_arg_parser.set_defaults(func=report_scenario)

     args = arg_parser.parse_args(sys.argv[1:])
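The new ``--profile`` option for ``report-scenario`` follows the same cProfile pattern ssbench already uses when profiling benchmark runs: enable a profiler around the work, then dump the stats to a file under ``/tmp``. A minimal standalone sketch of that pattern (the ``generate_report`` stand-in and ``profiled`` helper are illustrative only, not ssbench code)::

  import cProfile
  import os


  def generate_report():
      # Stand-in for the real report-generation work.
      return sum(i * i for i in xrange(100000))


  def profiled(func):
      # Wrap func in the enable/disable/dump_stats pattern used above.
      prof = cProfile.Profile()
      prof.enable()
      try:
          return func()
      finally:
          prof.disable()
          prof_output_path = '/tmp/%s.%d.prof' % (func.__name__, os.getpid())
          prof.dump_stats(prof_output_path)
          # Inspect later with:  python -m pstats <prof_output_path>


  if __name__ == '__main__':
      profiled(generate_report)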
8  bin/ssbench-worker
@@ -40,6 +40,12 @@ if __name__ == "__main__":
                             'provide.')
     arg_parser.add_argument('--retries', default=10, type=int,
                             help='Maximum number of times to retry a job.')
+    arg_parser.add_argument(
+        '--batch-size', metavar='COUNT', type=int, default=1,
+        help='Send back bench results in batches of this size to '
+        'increase benchmarking throughput; for best results, '
+        'this should match the --batch-size specified in the ssbench-master '
+        'command-line.')
     arg_parser.add_argument('-p', '--profile-count', type=int, metavar='COUNT',
                             default=0,
                             help='Profile %(metavar)s work jobs, starting '
@@ -56,5 +62,5 @@ if __name__ == "__main__":
     worker = Worker(args.zmq_host, args.zmq_work_port, args.zmq_results_port,
                     args.worker_id, args.retries,
                     profile_count=args.profile_count,
-                    concurrency=args.concurrency)
+                    concurrency=args.concurrency, batch_size=args.batch_size)
     worker.go()
158  ssbench/master.py
@@ -76,41 +76,42 @@ def __init__(self, zmq_bind_ip=None, zmq_work_port=None,
         self.network_timeout = network_timeout
         self.quiet = quiet

-    def process_result_to(self, job, processor, label=''):
-        result = msgpack.loads(job)
-        logging.debug('RESULT: %13s %s/%-17s %s/%s %s',
-                      result['type'], result['container'], result['name'],
-                      '%7.4f' % result.get('first_byte_latency')
-                      if result.get('first_byte_latency', None) else ' (none)',
-                      '%7.4f' % result.get('last_byte_latency')
-                      if result.get('last_byte_latency', None) else '(none) ',
-                      result.get('trans_id', ''))
-        if label and not self.quiet:
-            if 'exception' in result:
-                sys.stderr.write('X')
-            elif result.get('first_byte_latency', None) is not None:
-                if result['first_byte_latency'] < 1:
-                    sys.stderr.write('.')
-                elif result['first_byte_latency'] < 3:
-                    sys.stderr.write('o')
-                elif result['first_byte_latency'] < 10:
-                    sys.stderr.write('O')
-                else:
-                    sys.stderr.write('*')
-            else:
-                if result['last_byte_latency'] < 1:
-                    sys.stderr.write('_')
-                elif result['last_byte_latency'] < 3:
-                    sys.stderr.write('|')
-                elif result['last_byte_latency'] < 10:
-                    sys.stderr.write('^')
+    def process_results_to(self, results, processor, label=''):
+        for result in results:
+            logging.debug('RESULT: %13s %s/%-17s %s/%s %s',
+                        result['type'], result['container'], result['name'],
+                        '%7.4f' % result.get('first_byte_latency')
+                        if result.get('first_byte_latency', None) else ' (none)',
+                        '%7.4f' % result.get('last_byte_latency')
+                        if result.get('last_byte_latency', None) else '(none) ',
+                        result.get('trans_id', ''))
+            if label and not self.quiet:
+                if 'exception' in result:
+                    sys.stderr.write('X')
+                elif result.get('first_byte_latency', None) is not None:
+                    if result['first_byte_latency'] < 1:
+                        sys.stderr.write('.')
+                    elif result['first_byte_latency'] < 3:
+                        sys.stderr.write('o')
+                    elif result['first_byte_latency'] < 10:
+                        sys.stderr.write('O')
+                    else:
+                        sys.stderr.write('*')
                 else:
-                    sys.stderr.write('@')
-            sys.stderr.flush()
-        processor(result)
+                    if result['last_byte_latency'] < 1:
+                        sys.stderr.write('_')
+                    elif result['last_byte_latency'] < 3:
+                        sys.stderr.write('|')
+                    elif result['last_byte_latency'] < 10:
+                        sys.stderr.write('^')
+                    else:
+                        sys.stderr.write('@')
+                sys.stderr.flush()
+            processor(result)

     def do_a_run(self, concurrency, job_generator, result_processor,
-                 auth_kwargs, mapper_fn=None, label='', noop=False):
+                 auth_kwargs, mapper_fn=None, label='', noop=False,
+                 batch_size=1):
         if label and not self.quiet:
             print >>sys.stderr, label + """
   X    work job raised an exception
@@ -123,50 +124,62 @@ def do_a_run(self, concurrency, job_generator, result_processor,
   ^  < 10s last-byte-latency  (CREATE or UPDATE)
   @ >= 10s last-byte-latency  (CREATE or UPDATE)
             """.rstrip()
-        poller = zmq.Poller()
-        poller.register(self.results_pull, zmq.POLLIN)
-        active = 0
-        for job in job_generator:
+
+        def _job_decorator(raw_job):
             if mapper_fn is not None:
-                work_job = mapper_fn(job)
+                work_job = mapper_fn(raw_job)
                 if not work_job:
                     if noop:
-                        job['container'] = 'who_cares'
-                        job['name'] = 'who_cares'
+                        work_job = raw_job
+                        work_job['container'] = 'who_cares'
+                        work_job['name'] = 'who_cares'
                     else:
-                        logging.warning('Unable to fill in job %r', job)
-                        continue
-                else:
-                    job = work_job
-            job['auth_kwargs'] = auth_kwargs
-            job['connect_timeout'] = self.connect_timeout
-            job['network_timeout'] = self.network_timeout
+                        logging.warning('Unable to fill in job %r', raw_job)
+                        return None
+            else:
+                work_job = raw_job
+            work_job['auth_kwargs'] = auth_kwargs
+            work_job['connect_timeout'] = self.connect_timeout
+            work_job['network_timeout'] = self.network_timeout
+            return work_job
+
+        active = 0
+        for raw_job in job_generator:
+            work_job = _job_decorator(raw_job)
+            if not work_job:
+                logging.warning('Unable to fill in job %r', raw_job)
+                continue
+
+            send_q = [work_job]

             logging.debug('active: %d\tconcurrency: %d', active, concurrency)
-            timeout = 0
             if active >= concurrency:
-                timeout = None
-            socks = dict(poller.poll(timeout))
-            if self.results_pull in socks \
-                    and socks[self.results_pull] == zmq.POLLIN:
-                result_job = self.results_pull.recv()
-                self.process_result_to(result_job, result_processor,
-                                       label=label)
-                active -= 1
-            self.work_push.send(msgpack.dumps(job))
-            active += 1
+                result_jobs_raw = self.results_pull.recv()
+                result_jobs = msgpack.loads(result_jobs_raw)
+                self.process_results_to(result_jobs, result_processor,
+                                        label=label)
+                active -= len(result_jobs)
+
+            while len(send_q) < min(batch_size, concurrency - active):
+                try:
+                    work_job = _job_decorator(job_generator.next())
+                    send_q.append(work_job)
+                except StopIteration:
+                    break
+
+            logging.debug('len(send_q): %d', len(send_q))
+            self.work_push.send(msgpack.dumps(send_q))
+            active += len(send_q)

         # Drain the results
         logging.debug('All jobs sent; awaiting results...')
         while active > 0:
             logging.debug('Draining results: active = %d', active)
-            socks = dict(poller.poll())
-            if self.results_pull in socks \
-                    and socks[self.results_pull] == zmq.POLLIN:
-                result_job = self.results_pull.recv()
-                self.process_result_to(result_job, result_processor,
-                                       label=label)
-                active -= 1
+            result_jobs_raw = self.results_pull.recv()
+            result_jobs = msgpack.loads(result_jobs_raw)
+            self.process_results_to(result_jobs, result_processor,
+                                    label=label)
+            active -= len(result_jobs)
         if label and not self.quiet:
             sys.stderr.write('\n')
             sys.stderr.flush()
@@ -184,7 +197,7 @@ def kill_workers(self, timeout=5):
             # all the workers have died.  Also, gevent.Timeout() doesn't seem
             # to work here?!
             signal.alarm(int(timeout))
-            self.work_push.send(msgpack.dumps({'type': 'PING'}))
+            self.work_push.send(msgpack.dumps([{'type': 'PING'}]))
             socks = dict(poller.poll(timeout * 1500))
             if self.results_pull in socks \
                     and socks[self.results_pull] == zmq.POLLIN:
@@ -192,7 +205,7 @@ def kill_workers(self, timeout=5):
                 result = msgpack.loads(result_packed)
                 logging.info('Heard from worker id=%d; sending SUICIDE',
                             result['worker_id'])
-                self.work_push.send(msgpack.dumps({'type': 'SUICIDE'}))
+                self.work_push.send(msgpack.dumps([{'type': 'SUICIDE'}]))
                 gevent.sleep(0.1)
             else:
                 break
@@ -200,7 +213,8 @@ def kill_workers(self, timeout=5):

     def run_scenario(self, scenario, auth_url, user, key, auth_version,
                      os_options, cacert, insecure, storage_url, token,
-                     noop=False, with_profiling=False, keep_objects=False):
+                     noop=False, with_profiling=False, keep_objects=False,
+                     batch_size=1):
         """
         Runs a CRUD scenario, given cluster parameters and a Scenario object.

@@ -220,6 +234,7 @@ def run_scenario(self, scenario, auth_url, user, key, auth_version,
         :param noop: Run in no-op mode?
         :param with_profiing: Profile the run?
         :param keep_objects: Keep uploaded objects instead of deleting them?
+        :param batch_size: Send this many bench jobs per packet to workers
         :param returns: Collected result records from workers
         """

@@ -274,7 +289,8 @@ def run_scenario(self, scenario, auth_url, user, key, auth_version,
                          'concurrent workers)', scenario.user_count)

             self.do_a_run(scenario.user_count, scenario.initial_jobs(),
-                          run_state.handle_initialization_result, auth_kwargs)
+                          run_state.handle_initialization_result, auth_kwargs,
+                          batch_size=batch_size)

         logging.info('Starting benchmark run (up to %d concurrent '
                      'workers)', scenario.user_count)
@@ -288,7 +304,7 @@ def run_scenario(self, scenario, auth_url, user, key, auth_version,
         self.do_a_run(scenario.user_count, scenario.bench_jobs(),
                       run_state.handle_run_result, auth_kwargs,
                       mapper_fn=run_state.fill_in_job,
-                      label='Benchmark Run:', noop=noop)
+                      label='Benchmark Run:', noop=noop, batch_size=batch_size)
         if with_profiling:
             prof.disable()
             prof_output_path = '/tmp/do_a_run.%d.prof' % os.getpid()
@@ -300,8 +316,8 @@ def run_scenario(self, scenario, auth_url, user, key, auth_version,
             self.do_a_run(scenario.user_count,
                           run_state.cleanup_object_infos(),
                           lambda *_: None,
-                          auth_kwargs,
-                          mapper_fn=_gen_cleanup_job)
+                          auth_kwargs, mapper_fn=_gen_cleanup_job,
+                          batch_size=batch_size)
         elif keep_objects:
             logging.info('NOT deleting any objects due to -k/--keep-objects')
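The heart of the ``do_a_run`` change above is that jobs and results now cross the ZMQ sockets as msgpack'd lists rather than one dict per message, and the poller is dropped in favor of a plain blocking ``recv`` once ``concurrency`` jobs are in flight. A simplified, ZMQ-free sketch of that send/receive bookkeeping (the ``run_batched`` function and its fake in-memory wire are illustrative only, not ssbench code)::

  import collections


  def run_batched(jobs, batch_size, concurrency, worker):
      # 'wire' stands in for the work_push/results_pull sockets; each entry is
      # one batched result message (a list), like msgpack.dumps(result_q).
      wire = collections.deque()
      active = 0
      results = []
      job_iter = iter(jobs)
      for job in job_iter:
          send_q = [job]
          if active >= concurrency:
              # Blocking "recv" of one batched result message.
              result_batch = wire.popleft()
              results.extend(result_batch)
              active -= len(result_batch)
          # Top up the outgoing batch without exceeding batch_size or concurrency.
          while len(send_q) < min(batch_size, concurrency - active):
              try:
                  send_q.append(job_iter.next())
              except StopIteration:
                  break
          # "Send" the whole batch in one message; the fake worker answers in kind.
          wire.append([worker(j) for j in send_q])
          active += len(send_q)
      # Drain the remaining batched results.
      while active > 0:
          result_batch = wire.popleft()
          results.extend(result_batch)
          active -= len(result_batch)
      return results


  if __name__ == '__main__':
      done = run_batched(range(20), batch_size=4, concurrency=8,
                         worker=lambda job: job * job)
      print len(done), 'results'   # 20 results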
 
22  ssbench/tests/test_worker.py
@@ -282,10 +282,10 @@ def test_handle_upload_object(self):
         }).once
         self.time_expectation.once
         self.result_queue.should_receive('put').with_args(
-            msgpack.dumps(worker.add_dicts(
+            worker.add_dicts(
                 object_info, worker_id=self.worker_id,
                 first_byte_latency=0.492393, last_byte_latency=8.23283,
-                trans_id='abcdef', completed_at=self.stub_time)),
+                trans_id='abcdef', completed_at=self.stub_time),
         ).once
         self.mock_worker.handle_upload_object(object_info)

@@ -305,10 +305,10 @@ def test_handle_delete_object(self):
             'x-trans-id': '9bjkk',
         }).once
         self.result_queue.should_receive('put').with_args(
-            msgpack.dumps(worker.add_dicts(
+            worker.add_dicts(
                 object_info, worker_id=self.worker_id,
                 first_byte_latency=0.94932, last_byte_latency=8.3273,
-                trans_id='9bjkk', completed_at=self.stub_time)),
+                trans_id='9bjkk', completed_at=self.stub_time),
         ).once
         self.mock_worker.handle_delete_object(object_info)

@@ -331,10 +331,10 @@ def test_handle_update_object(self):
             'x-trans-id': 'biejs',
         }).once
         self.result_queue.should_receive('put').with_args(
-            msgpack.dumps(worker.add_dicts(
+            worker.add_dicts(
                 object_info, worker_id=self.worker_id,
                 completed_at=self.stub_time, trans_id='biejs',
-                first_byte_latency=4.45, last_byte_latency=23.283)),
+                first_byte_latency=4.45, last_byte_latency=23.283),
         ).once

         self.mock_worker.handle_update_object(object_info)
@@ -357,10 +357,10 @@ def test_handle_get_object(self):
             'x-trans-id': 'bies',
         }).once
         self.result_queue.should_receive('put').with_args(
-            msgpack.dumps(worker.add_dicts(
+            worker.add_dicts(
                 object_info, worker_id=self.worker_id,
                 completed_at=self.stub_time, trans_id='bies',
-                first_byte_latency=5.33, last_byte_latency=9.99)),
+                first_byte_latency=5.33, last_byte_latency=9.99),
         ).once

         self.mock_worker.handle_get_object(object_info)
@@ -376,7 +376,7 @@ def test_dispatching_socket_exception(self):
         ).once
         got = []
         self.result_queue.should_receive('put').replace_with(
-            lambda value: got.append(msgpack.loads(value))).once
+            lambda value: got.append(value)).once

         self.mock_worker.handle_job(info)
         assert_equal(1, len(got), repr(got))
@@ -396,7 +396,7 @@ def test_dispatching_client_exception(self):
         ).once
         got = []
         self.result_queue.should_receive('put').replace_with(
-            lambda value: got.append(msgpack.loads(value))).once
+            lambda value: got.append(value)).once

         self.mock_worker.handle_job(info)
         assert_equal(1, len(got), repr(got))
@@ -416,7 +416,7 @@ def test_dispatching_value_error_exception(self):
         ).once
         got = []
         self.result_queue.should_receive('put').replace_with(
-            lambda value: got.append(msgpack.loads(value))).once
+            lambda value: got.append(value)).once

         self.mock_worker.handle_job(info)
         assert_equal(1, len(got), repr(got))
71  ssbench/worker.py
@@ -100,12 +100,13 @@ def create(self, is_initial=False):

 class Worker:
     def __init__(self, zmq_host, zmq_work_port, zmq_results_port, worker_id,
-                 max_retries, profile_count=0, concurrency=256):
+                 max_retries, profile_count=0, concurrency=256, batch_size=1):
         work_endpoint = 'tcp://%s:%d' % (zmq_host, zmq_work_port)
         results_endpoint = 'tcp://%s:%d' % (zmq_host, zmq_results_port)
         self.worker_id = worker_id
         self.max_retries = max_retries
         self.profile_count = profile_count
+        self.batch_size = batch_size

         soft_nofile, hard_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)
         nofile_target = 1024
@@ -150,42 +151,57 @@ def go(self):
         logging.debug('Worker %s starting...', self.worker_id)
         gevent.spawn(self._result_writer)
         pool = gevent.pool.Pool(self.concurrency)
-        job = self.work_pull.recv()
+        jobs = self.work_pull.recv()
         if self.profile_count:
             import cProfile
             prof = cProfile.Profile()
             prof.enable()
         gotten = 1
-        while job:
-            job_data = msgpack.loads(job)
-            if 'container' in job_data:
-                logging.debug('WORK: %13s %s/%-17s',
-                            job_data['type'], job_data['container'],
-                            job_data['name'])
-            else:
-                logging.debug('CMD: %13s', job_data['type'])
-            if job_data['type'] == 'SUICIDE':
-                logging.info('Got SUICIDE; closing sockets and exiting.')
-                self.work_pull.close()
-                self.results_push.close()
-                os._exit(88)
-            pool.spawn(self.handle_job, job_data)
-            if self.profile_count and gotten == self.profile_count:
-                prof.disable()
-                prof_output_path = '/tmp/worker_go.%d.prof' % os.getpid()
-                prof.dump_stats(prof_output_path)
-                logging.info('PROFILED worker go() to %s', prof_output_path)
-            job = self.work_pull.recv()
-            gotten += 1
+        self.spawned = 0
+        while jobs:
+            job_data = msgpack.loads(jobs)
+            for job_datum in job_data:
+                if 'container' in job_datum:
+                    logging.debug('WORK: %13s %s/%-17s',
+                                job_datum['type'], job_datum['container'],
+                                job_datum['name'])
+                else:
+                    logging.debug('CMD: %13s', job_datum['type'])
+                if job_datum['type'] == 'SUICIDE':
+                    logging.info('Got SUICIDE; closing sockets and exiting.')
+                    self.work_pull.close()
+                    self.results_push.close()
+                    os._exit(88)
+                pool.spawn(self.handle_job, job_datum)
+                self.spawned += 1
+                if self.profile_count and gotten >= self.profile_count:
+                    prof.disable()
+                    prof_output_path = '/tmp/worker_go.%d.prof' % os.getpid()
+                    prof.dump_stats(prof_output_path)
+                    logging.info('PROFILED worker go() to %s', prof_output_path)
+                    self.profile_count = None
+                gotten += 1
+            jobs = self.work_pull.recv()

     def _result_writer(self):
         while True:
             result = self.result_queue.get()
+
+            result_q = [result]
+            while len(result_q) < self.batch_size:
+                try:
+                    result = self.result_queue.get(timeout=1)
+                    result_q.append(result)
+                except gevent.queue.Empty:
+                    # timed out, go ahead and send
+                    break
+
             if self.results_push.closed:
                 logging.warning('_result_writer: exiting due to closed '
                                 'socket!')
                 break
-            self.results_push.send(result)
+            self.spawned -= len(result_q)
+            self.results_push.send(msgpack.dumps(result_q))

     def handle_job(self, job_data):
         # Dispatch type to a handler, if possible
@@ -352,9 +368,10 @@ def put_results(self, *args, **kwargs):
                    add_dicts())
         :returns: (nothing)
         """
-        self.result_queue.put(
-            msgpack.dumps(add_dicts(*args, completed_at=time.time(),
-                                    worker_id=self.worker_id, **kwargs)))
+        self.result_queue.put(add_dicts(*args,
+                                        completed_at=time.time(),
+                                        worker_id=self.worker_id,
+                                        **kwargs))

     def _put_results_from_response(self, object_info, resp_headers):
         self.put_results(
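On the worker side, ``_result_writer`` above batches in the opposite direction: it blocks for the first result, then keeps topping up the batch with one-second queue timeouts so a slow trickle of results still gets flushed promptly. A standalone sketch of that shape, using the standard-library ``Queue`` instead of gevent's queue, with a ``None`` sentinel and a plain ``send`` callable added here for the demo (the real code sends ``msgpack.dumps(result_q)`` on a ZMQ socket and stops when the socket is closed)::

  import Queue


  def drain_in_batches(result_queue, batch_size, send):
      while True:
          result = result_queue.get()          # block for the first result
          if result is None:                   # demo-only sentinel: stop the writer
              return
          result_q = [result]
          while len(result_q) < batch_size:
              try:
                  nxt = result_queue.get(timeout=1)
              except Queue.Empty:
                  break                        # timed out; send what we have
              if nxt is None:
                  send(result_q)
                  return
              result_q.append(nxt)
          send(result_q)


  if __name__ == '__main__':
      q = Queue.Queue()
      for i in range(5):
          q.put({'worker_id': 0, 'value': i})
      q.put(None)
      sent = []
      drain_in_batches(q, batch_size=2, send=sent.append)
      print [len(batch) for batch in sent]     # [2, 2, 1]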
