diff --git a/README.md b/README.md
index 388c0c8..b0304e0 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,94 @@ The structure of the repo is:
 All the tests were run using [MongoDB Atlas](https://www.mongodb.com/cloud/atlas?jmp=VLDB2019). Use code `VLDB2019` to get $150 credit to get started with MongoDB Atlas.
 
+## Sharded MongoDB Driver
+
+1. Create and activate a Python virtual environment.
+
+```bash
+mkdir ~/python_envs
+cd ~/python_envs
+python -m venv py-tpcc-env
+source ~/python_envs/py-tpcc-env/bin/activate
+```
+2. Install pymongo.
+
+```bash
+pip install pymongo
+```
+
+3. Print the default config to a file.
+
+```bash
+cd ~/py-tpcc/pytpcc
+python ./tpcc.py --print-config mongodb > mongodb.config
+```
+
+4. Edit the configuration in mongodb.config:
+   * Set `shards` to the number of shards in the cluster
+   * Set the MongoDB connection `uri` string
+   * Set the database `name`
+
+```ini
+# MongodbDriver Configuration File
+# Created 2025-10-08 14:18:24.378446
+[mongodb]
+
+# The mongodb connection string or URI
+uri = mongodb://user:pass@10.2.1.119:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true
+
+# Database name
+name = tpcc
+
+# If true, data will be denormalized using MongoDB schema design best practices
+denormalize = True
+
+# If true, transactions will not be used (benchmarking only)
+notransactions =
+
+# If true, all things to update will be fetched via findAndModify
+findandmodify = True
+
+# If true, aggregation queries will be used
+agg =
+
+# If true, we will allow secondary reads
+secondary_reads = True
+
+# If true, we will enable retryable writes
+retry_writes = True
+
+# If true, we will perform causal reads
+causal_consistency = True
+
+# If true, we will use only one 'unsharded' items collection
+no_global_items =
+
+# If > 0 then sharded
+shards = 3
+```
+
+5. Run pytpcc with `--warehouses=XXX`.
+
+   * Reset the database and load the data
+   ```bash
+   python ./tpcc.py --reset --no-execute --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
+   ```
+
+   * Only load the data
+   ```bash
+   python ./tpcc.py --no-execute --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
+   ```
+
+   * Execute the tests without loading data
+   ```bash
+   python ./tpcc.py --no-load --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
+   ```
+
+   * Execute the tests with loading
+   ```bash
+   python ./tpcc.py --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
+   ```
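A quick way to confirm that a sharded load actually split and distributed the warehouse chunks before starting a timed run is to count chunks per shard. The snippet below is a minimal sketch, not part of the driver or this patch; it assumes the example `uri` from `mongodb.config` above and a user allowed to read the `config` database (on Atlas, `sh.status()` in `mongosh` shows the same information).

```python
# Illustrative sanity check: count chunks per shard after a sharded load.
# Adjust the URI to match the `uri` value in mongodb.config.
import pymongo

client = pymongo.MongoClient("mongodb://user:pass@10.2.1.119:27017/admin")

pipeline = [
    {"$group": {"_id": "$shard", "chunks": {"$sum": 1}}},  # chunks owned per shard
    {"$sort": {"_id": 1}},
]
for doc in client["config"]["chunks"].aggregate(pipeline):
    print(doc["_id"], doc["chunks"])
```

With `--warehouses=21` and `shards = 3`, each shard should end up owning roughly a third of the chunks for the sharded collections.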
 
 ## Postgres JSONB Driver
 
@@ -88,4 +176,4 @@ postgres=# \l+
 
 # For any SQL command first use the database
 \c tpcc;
-```
\ No newline at end of file
+```
diff --git a/pytpcc/coordinator.py b/pytpcc/coordinator.py
index 412b2c2..3425f9c 100755
--- a/pytpcc/coordinator.py
+++ b/pytpcc/coordinator.py
@@ -38,7 +38,7 @@
 import execnet
 import worker
 import message
-from ConfigParser import SafeConfigParser
+from configparser import ConfigParser
 from pprint import pprint, pformat
 from util import *
 
@@ -80,7 +80,7 @@ def startLoading(scalParameters,args,config,channels):
     for w_id in range(scaleParameters.starting_warehouse, scaleParameters.ending_warehouse+1):
         idx = w_id % procs
         w_ids[idx].append(w_id)
-    print w_ids
+    print(w_ids)
 
     load_start=time.time()
     for i in range(len(channels)):
@@ -116,7 +116,7 @@ def startExecution(scaleParameters, args, config,channels):
 
     aparser = argparse.ArgumentParser(description='Python implementation of the TPC-C Benchmark')
     aparser.add_argument('system', choices=getDrivers(), help='Target system driver')
-    aparser.add_argument('--config', type=file,
+    aparser.add_argument('--config', type=str,
                          help='Path to driver configuration file')
    aparser.add_argument('--reset', action='store_true',
                          help='Instruct the driver to reset the contents of the database')
@@ -132,6 +132,8 @@
     aparser.add_argument('--clientprocs', default=1, type=int, metavar='N',
                          help='Number of processes on each client node.')
+    aparser.add_argument('--samewh', default=85, type=float, metavar='PP',
+                         help='Percent paying same warehouse')
     aparser.add_argument('--stop-on-error', action='store_true',
                          help='Stop the transaction execution when the driver throws an exception.')
     aparser.add_argument('--no-load', action='store_true',
@@ -153,15 +155,16 @@
     assert driver != None, "Failed to create '%s' driver" % args['system']
     if args['print_config']:
         config = driver.makeDefaultConfig()
-        print driver.formatConfig(config)
-        print
+        print(driver.formatConfig(config))
+        print()
         sys.exit(0)
 
     ## Load Configuration file
-    if args['config']:
-        logging.debug("Loading configuration file '%s'" % args['config'])
+    configFilePath = args['config']
+    if configFilePath:
+        logging.debug("Loading configuration file '%s'" % configFilePath)
         cparser = ConfigParser()
-        cparser.read(os.path.realpath(args['config'].name))
+        cparser.read(os.path.realpath(configFilePath))
         config = dict(cparser.items(args['system']))
     else:
         logging.debug("Using default configuration for %s" % args['system'])
@@ -171,6 +174,7 @@
         config['load'] = False
         config['execute'] = False
         if config['reset']: logging.info("Reseting database")
+        config['warehouses'] = args['warehouses']
         driver.loadConfig(config)
     logging.info("Initializing TPC-C benchmark using %s" % driver)
 
@@ -208,8 +212,8 @@
     if not args['no_execute']:
         results = startExecution(scaleParameters, args, config,channels)
         assert results
-        logging.info(results.show(load_time, driver, len(channels)))
-        print results.show(load_time, driver, len(channels))
+        logging.info(results.show(load_time, driver, len(channels), args['samewh']))
+        print(results.show(load_time, driver, len(channels), args['samewh']))
 
 ## IF
 
 ## MAIN
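For context on the new `--samewh` flag added above (and again in `tpcc.py` below): it sets the percentage of Payment transactions that pay through the customer's home warehouse. TPC-C specifies 85%, and pushing it toward 100 removes cross-warehouse (and therefore cross-shard) payments, which helps isolate the cost of distributed transactions. A rough sketch of the intended behaviour, with `random.randint` standing in for pytpcc's `rand` helpers:

```python
import random

def pick_payment_warehouse(w_id, num_warehouses, same_wh=85.0):
    """Illustrative sketch of how --samewh is applied in executor.generatePaymentParams.

    With probability same_wh percent the customer pays through the home
    warehouse w_id; otherwise a different warehouse is chosen at random.
    """
    x = random.randint(1, 100)
    if num_warehouses == 1 or x <= same_wh:
        return w_id  # local payment through the home warehouse
    remote = [w for w in range(1, num_warehouses + 1) if w != w_id]
    return random.choice(remote)

# --samewh=100 makes every payment local:
assert all(pick_payment_warehouse(3, 10, same_wh=100) == 3 for _ in range(1000))
```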
diff --git a/pytpcc/drivers/mongodbdriver.py b/pytpcc/drivers/mongodbdriver.py
index df8fe53..1a412be 100644
--- a/pytpcc/drivers/mongodbdriver.py
+++ b/pytpcc/drivers/mongodbdriver.py
@@ -39,10 +39,7 @@
 from time import sleep
 
 import pymongo
 from pymongo.client_session import TransactionOptions
-
-# Import TransactionOptions from pymongo.client_session or
-# pymongo.synchronous.client_session depending on the version of pymongo
-from pymongo.client_session import TransactionOptions
+from bson import MinKey
 
 import constants
 from .abstractdriver import AbstractDriver
@@ -197,16 +194,17 @@
 ## ==============================================
 class MongodbDriver(AbstractDriver):
     DEFAULT_CONFIG = {
-        "uri": ("The mongodb connection string or URI", "mongodb://localhost:27017"),
-        "name": ("Database name", "tpcc"),
-        "denormalize": ("If true, data will be denormalized using MongoDB schema design best practices", True),
-        "notransactions": ("If true, transactions will not be used (benchmarking only)", False),
-        "findandmodify": ("If true, all things to update will be fetched via findAndModify", True),
-        "agg": ("If true, aggregation queries will be used", False),
-        "secondary_reads": ("If true, we will allow secondary reads", True),
-        "retry_writes": ("If true, we will enable retryable writes", True),
-        "causal_consistency": ("If true, we will perform causal reads ", True),
-        "shards": ("If >1 then sharded", "1")
+        "uri": ("The mongodb connection string or URI", "mongodb://localhost:27017"),
+        "name": ("Database name", "tpcc"),
+        "denormalize": ("If true, data will be denormalized using MongoDB schema design best practices", True),
+        "notransactions": ("If true, transactions will not be used (benchmarking only)", False),
+        "findandmodify": ("If true, all things to update will be fetched via findAndModify", True),
+        "agg": ("If true, aggregation queries will be used", False),
+        "secondary_reads": ("If true, we will allow secondary reads", True),
+        "retry_writes": ("If true, we will enable retryable writes", True),
+        "causal_consistency": ("If true, we will perform causal reads", True),
+        "no_global_items": ("If true, we will use only one 'unsharded' items collection", False),
+        "shards": ("If > 0 then sharded", "0")
     }
     DENORMALIZED_TABLES = [
         constants.TABLENAME_ORDERS,
@@ -237,7 +235,8 @@ def __init__(self, ddl):
         self.output = open('results.json','a')
         self.result_doc = {}
         self.warehouses = 0
-        self.shards = 1
+        self.no_global_items = False
+        self.shards = 0
 
         ## Create member mapping to collections
         for name in constants.ALL_TABLES:
@@ -270,6 +269,7 @@ def loadConfig(self, config):
         self.warehouses = config['warehouses']
         self.find_and_modify = config['findandmodify'] == 'True'
         self.causal_consistency = config['causal_consistency'] == 'True'
+        self.no_global_items = config['no_global_items'] == 'True'
         self.retry_writes = config['retry_writes'] == 'True'
         self.secondary_reads = config['secondary_reads'] == 'True'
         self.agg = config['agg'] == 'True'
@@ -318,6 +318,12 @@ def loadConfig(self, config):
             logging.debug("Using denormalized data model")
 
         try:
+            # Reset the current database and set up a new database with a sharded configuration
+            if config["reset"] and self.shards > 0:
+                logging.info("Deleting the database and setting up a new sharded database '%s'", self.database.name)
+                self.setup_sharded_db(self.client, str(config['name']),
int(self.warehouses), self.shards) + return + if config["reset"]: logging.info("Deleting database '%s'", self.database.name) for name in constants.ALL_TABLES: @@ -357,6 +363,81 @@ def loadConfig(self, config): logging.error("Some general error (%s) when connected to %s: ", str(err), display_uri) print("Got some other error: %s" % str(err)) return + + @staticmethod + def setup_sharded_db(client: pymongo.MongoClient, db_name: str, num_warehouses: int, num_shards: int = 0): + if num_warehouses <= 0: + raise ValueError("Error: Invalid number of warehouses. num_warehouses must be > 0") + + admin = client.admin + config_db = client['config'] + + admin.command('balancerStop', 1) + db = client[db_name] + db.command('dropDatabase') + sleep(10) + + admin.command('enableSharding', db_name) + + admin.command('shardCollection', f'{db_name}.ITEM', key={'I_W_ID': 1, 'I_ID': 1}, unique=True) + + db['WAREHOUSE'].create_index([('W_ID', 1), ('W_TAX', 1)], unique=True) + admin.command('shardCollection', f'{db_name}.WAREHOUSE', key={'W_ID': 1}) + + db['DISTRICT'].create_index([('D_W_ID', 1), ('D_ID', 1), ('D_NEXT_O_ID', 1), ('D_TAX', 1)], unique=True) + admin.command('shardCollection', f'{db_name}.DISTRICT', key={'D_W_ID': 1, 'D_ID': 1}) + + admin.command('shardCollection', f'{db_name}.CUSTOMER', key={'C_W_ID': 1, 'C_D_ID': 1, 'C_ID': 1}, unique=True) + + admin.command('shardCollection', f'{db_name}.HISTORY', key={'H_W_ID': 1}) + + admin.command('shardCollection', f'{db_name}.STOCK', key={'S_W_ID': 1, 'S_I_ID': 1}, unique=True) + + db['NEW_ORDER'].create_index([('NO_W_ID', 1), ('NO_D_ID', 1), ('NO_O_ID', 1)], unique=True) + admin.command('shardCollection', f'{db_name}.NEW_ORDER', key={'NO_W_ID': 1, 'NO_D_ID': 1}) + + db['ORDERS'].create_index([('O_W_ID', 1), ('O_D_ID', 1), ('O_ID', 1), ('O_C_ID', 1)], unique=True) + admin.command('shardCollection', f'{db_name}.ORDERS', key={'O_W_ID': 1, 'O_D_ID': 1, 'O_ID': 1}) + + if num_shards <= 0: + logging.info("Warning: Shards argument is not positive. 
Getting shards from the cluster") + num_shards = config_db['shards'].count_documents({}) + + remainder = num_warehouses % num_shards + if remainder != 0: + raise ValueError(f"ERROR: Number of Warehouses ({num_warehouses}) is not a multiple of the number of shards ({num_shards})") + + wh_per_shard = num_warehouses // num_shards + logging.info(f"Using ({num_warehouses}) warehouses with {wh_per_shard} per shard") + + # Do splits + for i in range(1 + wh_per_shard, num_warehouses, wh_per_shard): + logging.info(f"Splitting at {i}") + admin.command('split', f'{db_name}.ITEM', middle={'I_W_ID': i, 'I_ID': MinKey()}) + admin.command('split', f'{db_name}.WAREHOUSE', middle={'W_ID': i}) + admin.command('split', f'{db_name}.HISTORY', middle={'H_W_ID': i}) + admin.command('split', f'{db_name}.DISTRICT', middle={'D_W_ID': i, 'D_ID': MinKey()}) + admin.command('split', f'{db_name}.CUSTOMER', middle={'C_W_ID': i, 'C_D_ID': MinKey(), 'C_ID': MinKey()}) + admin.command('split', f'{db_name}.STOCK', middle={'S_W_ID': i, 'S_I_ID': MinKey()}) + admin.command('split', f'{db_name}.NEW_ORDER', middle={'NO_W_ID': i, 'NO_D_ID': MinKey()}) + admin.command('split', f'{db_name}.ORDERS', middle={'O_W_ID': i, 'O_D_ID': MinKey(), 'O_ID': MinKey()}) + + # Do moves + shards = config_db['shards'].distinct('_id') + for i in range(num_shards): + key = i * wh_per_shard + 1 + shd = shards[i] + logging.info(f"Moving {key} to shard {shd}") + admin.command('moveChunk', f'{db_name}.ITEM', find={'I_W_ID': key, 'I_ID': MinKey()}, to=shd) + admin.command('moveChunk', f'{db_name}.WAREHOUSE', find={'W_ID': key}, to=shd) + admin.command('moveChunk', f'{db_name}.HISTORY', find={'H_W_ID': key}, to=shd) + admin.command('moveChunk', f'{db_name}.DISTRICT', find={'D_W_ID': key, 'D_ID': MinKey()}, to=shd) + admin.command('moveChunk', f'{db_name}.CUSTOMER', find={'C_W_ID': key, 'C_D_ID': MinKey(), 'C_ID': MinKey()}, to=shd) + admin.command('moveChunk', f'{db_name}.STOCK', find={'S_W_ID': key, 'S_I_ID': MinKey()}, to=shd) + admin.command('moveChunk', f'{db_name}.NEW_ORDER', find={'NO_W_ID': key, 'NO_D_ID': MinKey()}, to=shd) + admin.command('moveChunk', f'{db_name}.ORDERS', find={'O_W_ID': key, 'O_D_ID': MinKey(), 'O_ID': MinKey()}, to=shd) + + logging.info("Shard configuration succeeded") ## ---------------------------------------------- ## loadTuples @@ -402,10 +483,11 @@ def loadTuples(self, tableName, tuples): else: if tableName == constants.TABLENAME_ITEM: tuples3 = [] - if self.shards > 1: - ww = range(1,self.warehouses+1) + if self.shards > 0: + ww = range(1,self.warehouses+1, int(self.warehouses/self.shards)) else: ww = [0] + for t in tuples: for w in ww: t2 = list(t) @@ -415,7 +497,8 @@ def loadTuples(self, tableName, tuples): for t in tuples: tuple_dicts.append(dict([(columns[i], t[i]) for i in num_columns])) ## FOR - self.database[tableName].insert_many(tuple_dicts) + + self.database[tableName].insert_many(tuple_dicts, ordered=False) ## IF return @@ -423,10 +506,13 @@ def loadTuples(self, tableName, tuples): def loadFinishDistrict(self, w_id, d_id): if self.denormalize: logging.debug("Pushing %d denormalized ORDERS records for WAREHOUSE %d DISTRICT %d into MongoDB", len(self.w_orders), w_id, d_id) - self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values()) + self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values(), ordered=False) self.w_orders.clear() ## IF + def loadFinish(self): + logging.debug("Load finished") + def executeStart(self): """Optional callback before the execution for each client 
starts""" return None @@ -614,8 +700,10 @@ def _doNewOrderTxn(self, s, params): d_next_o_id = d["D_NEXT_O_ID"] # fetch matching items and see if they are all valid - if self.shards > 1: i_w_id = w_id + if self.shards > 0: i_w_id = w_id-(w_id-1)%(self.warehouses/self.shards) # get_i_w(w_id) else: i_w_id = 0 + if self.no_global_items: + i_w_id = 1 items = list(self.item.find({"I_ID": {"$in": i_ids}, "I_W_ID": i_w_id, "$comment": comment}, {"_id":0, "I_ID": 1, "I_PRICE": 1, "I_NAME": 1, "I_DATA": 1}, session=s)) @@ -628,8 +716,7 @@ def _doNewOrderTxn(self, s, params): #print constants.INVALID_ITEM_MESSAGE + ", Aborting transaction (ok for 1%)" return None ## IF - xxi_ids = tuple(map(lambda o: o['I_ID'], items)) - items = sorted(items, key=lambda x: xxi_ids.index(x['I_ID'])) + items = sorted(items, key=lambda x: i_ids.index(x['I_ID'])) # getWarehouseTaxRate w = self.warehouse.find_one({"W_ID": w_id, "$comment": comment}, {"_id":0, "W_TAX": 1}, session=s) @@ -668,7 +755,7 @@ def _doNewOrderTxn(self, s, params): ## If all of the items are at the same warehouse, then we'll issue a single ## request to get their information, otherwise we'll still issue a single request ## ---------------- - item_w_list = zip(i_ids, i_w_ids) + item_w_list = list(zip(i_ids, i_w_ids)) stock_project = {"_id":0, "S_I_ID": 1, "S_W_ID": 1, "S_QUANTITY": 1, "S_DATA": 1, "S_YTD": 1, "S_ORDER_CNT": 1, "S_REMOTE_CNT": 1, s_dist_col: 1} @@ -684,8 +771,7 @@ def _doNewOrderTxn(self, s, params): session=s)) ## IF assert len(all_stocks) == ol_cnt, "all_stocks len %d != ol_cnt %d" % (len(all_stocks), ol_cnt) - xxxi_ids = tuple(map(lambda o: (o['S_I_ID'], o['S_W_ID']), all_stocks)) - all_stocks = sorted(all_stocks, key=lambda x: xxxi_ids.index((x['S_I_ID'], x["S_W_ID"]))) + all_stocks = sorted(all_stocks, key=lambda x: item_w_list.index((x['S_I_ID'], x["S_W_ID"]))) ## ---------------- ## Insert Order Line, Stock Item Information @@ -784,7 +870,7 @@ def _doNewOrderTxn(self, s, params): if self.batch_writes: if not self.denormalize: - self.order_line.insert_many(order_line_writes, session=s) + self.order_line.insert_many(order_line_writes, ordered=False, session=s) self.stock.bulk_write(stock_writes, session=s) ## IF @@ -936,7 +1022,7 @@ def _doPaymentTxn(self, s, params): session=s) ## IF - search_fields = {"C_W_ID": w_id, "C_D_ID": d_id, "$comment": comment} + search_fields = {"C_W_ID": c_w_id, "C_D_ID": c_d_id, "$comment": comment} return_fields = {"C_BALANCE": 0, "C_YTD_PAYMENT": 0, "C_PAYMENT_CNT": 0} if c_id != None: @@ -1137,6 +1223,7 @@ def run_transaction_with_retries(self, txn_callback, name, params): sleep(txn_retry_counter * .1) logging.debug("txn retry number for %s: %d", name, txn_retry_counter) ## WHILE + def get_server_status(self): ss=self.client.admin.command('serverStatus') if "$configServerState" in ss: diff --git a/pytpcc/runtime/executor.py b/pytpcc/runtime/executor.py index c065e93..be5a5a9 100644 --- a/pytpcc/runtime/executor.py +++ b/pytpcc/runtime/executor.py @@ -44,10 +44,11 @@ class Executor: - def __init__(self, driver, scaleParameters, stop_on_error = False): + def __init__(self, driver, scaleParameters, stop_on_error = False, sameWH = 85): self.driver = driver self.scaleParameters = scaleParameters self.stop_on_error = stop_on_error + self.same_wh = sameWH ## DEF def execute(self, duration): @@ -76,8 +77,7 @@ def execute(self, duration): batch_result.abortTransaction(batch_txn_id) if self.stop_on_error: raise continue - - # This will happen on all failing 1% of the transactions + if val is None: 
                global_result.abortTransaction(global_txn_id, retries)
+                batch_result.abortTransaction(batch_txn_id, retries)
@@ -86,7 +86,7 @@
             batch_result.stopTransaction(batch_txn_id, retries)
             global_result.stopTransaction(global_txn_id, retries)
 
-            if time.time() - start_batch > 900: # every 15 minutes
+            if time.time() - start_batch > 1800: # every 30 minutes
                 batch_result.stopBenchmark()
                 logging.info(batch_result.show())
                 batch_result = results.Results()
@@ -221,7 +221,7 @@ def generatePaymentParams(self):
         h_date = datetime.now()
 
         ## 85%: paying through own warehouse (or there is only 1 warehouse)
-        if self.scaleParameters.warehouses == 1 or x <= 85:
+        if self.scaleParameters.warehouses == 1 or x <= self.same_wh:
             c_w_id = w_id
             c_d_id = d_id
         ## 15%: paying through another warehouse:
diff --git a/pytpcc/tpcc.py b/pytpcc/tpcc.py
index 98a8885..3405037 100755
--- a/pytpcc/tpcc.py
+++ b/pytpcc/tpcc.py
@@ -36,6 +36,7 @@
 import time
 import multiprocessing
 import subprocess
+import random
 
 from configparser import ConfigParser
 from pprint import pprint, pformat
@@ -55,12 +56,12 @@
 logging.getLogger('').addHandler(console)
 
 NOTIFY_PHASE_START_PATH = '/data/workdir/src/flamegraph/notify_phase_start.py'
-NOTIFY_PHASE_END_PATH = '/data/workdir/src/flamegraph/notify_phase_start.py'
+NOTIFY_PHASE_END_PATH = '/data/workdir/src/flamegraph/notify_phase_end.py'
 
 ## ==============================================
-## noftifyDsiOfPhaseStart
+## notifyDSIOfPhaseStart
 ## ==============================================
-def noftifyDsiOfPhaseStart(phasename):
+def notifyDSIOfPhaseStart(phasename):
     if os.path.isfile(NOTIFY_PHASE_START_PATH):
         output = subprocess.run(["python3", NOTIFY_PHASE_START_PATH, phasename], capture_output=True)
         if output.returncode != 0:
@@ -68,9 +69,9 @@ def noftifyDsiOfPhaseStart(phasename):
 ## DEF
 
 ## ==============================================
-## noftifyDsiOfPhaseStart
+## notifyDSIOfPhaseEnd
 ## ==============================================
-def noftifyDsiOfPhaseEnd(phasename):
+def notifyDSIOfPhaseEnd(phasename):
     if os.path.isfile(NOTIFY_PHASE_END_PATH):
         output = subprocess.run(["python3", NOTIFY_PHASE_END_PATH, phasename], capture_output=True)
         if output.returncode != 0:
@@ -99,7 +100,10 @@ def getDrivers():
 ## DEF
 
 ## ==============================================
-## startLoading
+## startLoading.
+# This intentionally uses a multiprocessing pool and intentionally starts new processes for each batch
+# because for long-running loads that take many hours, the connection between the child process and the parent process is lost
+# and the parent blocks indefinitely waiting for the result.
## ============================================== def startLoading(driverClass, scaleParameters, args, config): """ @@ -115,10 +119,6 @@ def startLoading(driverClass, scaleParameters, args, config): logging.debug(f"Total warehouses: {total_warehouses}") loader_results = [] - try: - del args['config'] - except KeyError: - logging.warning("Key 'config' not found in args") # Iterate through warehouses, processing them in batches of 'clients' for i in range(total_warehouses): @@ -199,10 +199,7 @@ def startExecution(driverClass, scaleParameters, args, config): logging.debug("Creating client pool with %d processes", args['clients']) pool = multiprocessing.Pool(args['clients']) debug = logging.getLogger().isEnabledFor(logging.DEBUG) - try: - del args['config'] - except KeyError: - print() + worker_results = [] for _ in range(args['clients']): r = pool.apply_async(executorFunc, (driverClass, scaleParameters, args, config, debug,)) @@ -236,7 +233,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): config['reset'] = False driver.loadConfig(config) - e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error']) + e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'], sameWH=args['samewh']) driver.executeStart() results = e.execute(args['duration']) driver.executeFinish() @@ -251,12 +248,14 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): aparser = argparse.ArgumentParser(description='Python implementation of the TPC-C Benchmark') aparser.add_argument('system', choices=getDrivers(), help='Target system driver') - aparser.add_argument('--config', type=open, + aparser.add_argument('--config', type=str, help='Path to driver configuration file') aparser.add_argument('--reset', action='store_true', help='Instruct the driver to reset the contents of the database') aparser.add_argument('--scalefactor', default=1, type=float, metavar='SF', help='Benchmark scale factor') + aparser.add_argument('--samewh', default=85, type=float, metavar='PP', + help='Percent paying same warehouse') aparser.add_argument('--warehouses', default=4, type=int, metavar='W', help='Number of Warehouses') aparser.add_argument('--duration', default=60, type=int, metavar='D', @@ -293,10 +292,11 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): sys.exit(0) ## Load Configuration file - if args['config']: - logging.debug("Loading configuration file '%s'", args['config']) + configFilePath = args['config'] + if configFilePath: + logging.debug("Loading configuration file '%s'", configFilePath) cparser = ConfigParser() - cparser.read(os.path.realpath(args['config'].name)) + cparser.read(os.path.realpath(configFilePath)) config = dict(cparser.items(args['system'])) else: logging.debug("Using default configuration for %s", args['system']) @@ -320,7 +320,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): load_time = None if not args['no_load']: logging.info("Loading TPC-C benchmark data using %s", (driver)) - noftifyDsiOfPhaseStart("TPC-C_load") + notifyDSIOfPhaseStart("TPC-C_load") load_start = time.time() if args['clients'] == 1: l = loader.Loader( @@ -335,14 +335,14 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): else: startLoading(driverClass, scaleParameters, args, config) load_time = time.time() - load_start - noftifyDsiOfPhaseEnd("TPC-C_load") + notifyDSIOfPhaseEnd("TPC-C_load") ## IF ## WORKLOAD DRIVER!!! 
if not args['no_execute']: - noftifyDsiOfPhaseStart("TPC-C_workload") + notifyDSIOfPhaseStart("TPC-C_workload") if args['clients'] == 1: - e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error']) + e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'], sameWH=args['samewh']) driver.executeStart() results = e.execute(args['duration']) driver.executeFinish() @@ -351,8 +351,8 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): assert results, "No results from execution for %d client!" % args['clients'] logging.info("Final Results") logging.info("Threads: %d", args['clients']) - logging.info(results.show(load_time, driver, args['clients'])) - noftifyDsiOfPhaseEnd("TPC-C_workload") + logging.info(results.show(load_time, driver, args['clients'], args['samewh'])) + notifyDSIOfPhaseEnd("TPC-C_workload") ## IF ## MAIN diff --git a/pytpcc/util/nurand.py b/pytpcc/util/nurand.py index 09fa55c..9dc429d 100644 --- a/pytpcc/util/nurand.py +++ b/pytpcc/util/nurand.py @@ -29,14 +29,13 @@ # OTHER DEALINGS IN THE SOFTWARE. # ----------------------------------------------------------------------- -import random +from .import rand def makeForLoad(): """Create random NURand constants, appropriate for loading the database.""" - cLast = random.randint(0, 255) - cId = random.randint(0, 1023) - orderLineItemId = random.randint(0, 8191) - return NURandC(cLast, cId, orderLineItemId) + cLast = rand.number(0, 255) + cId = rand.number(0, 1023) + orderLineItemId = rand.number(0, 8191) return NURandC(cLast, cId, orderLineItemId) def validCRun(cRun, cLoad): @@ -46,13 +45,13 @@ def validCRun(cRun, cLoad): def makeForRun(loadC): """Create random NURand constants for running TPC-C. TPC-C 2.1.6.1. (page 20) specifies the valid range for these constants.""" - cRun = random.randint(0, 255) + cRun = rand.number(0, 255) while validCRun(cRun, loadC.cLast) == False: - cRun = random.randint(0, 255) + cRun = rand.number(0, 255) assert validCRun(cRun, loadC.cLast) - cId = random.randint(0, 1023) - orderLineItemId = random.randint(0, 8191) + cId = rand.number(0, 1023) + orderLineItemId = rand.number(0, 8191) return NURandC(cRun, cId, orderLineItemId) class NURandC: diff --git a/pytpcc/util/results.py b/pytpcc/util/results.py index ff24f9f..248491f 100644 --- a/pytpcc/util/results.py +++ b/pytpcc/util/results.py @@ -26,6 +26,7 @@ import logging import time +import os from collections import Counter class Results: @@ -142,7 +143,7 @@ def append(self, r): def __str__(self): return self.show() - def show(self, load_time=None, driver=None, threads=1): + def show(self, load_time=None, driver=None, threads=1, samewh=85): if not self.start: return "Benchmark not started" if not self.stop: @@ -223,15 +224,19 @@ def show(self, load_time=None, driver=None, threads=1): result_doc['batch_writes'] = driver.batch_writes result_doc['find_and_modify'] = driver.find_and_modify result_doc['read_preference'] = driver.read_preference - result_doc['write_concern'] = driver.write_concern.document['w'] + result_doc['write_concern'] = str(driver.write_concern.document['w']) result_doc['causal'] = driver.causal_consistency + result_doc['no_global_items'] = driver.no_global_items result_doc['all_in_one_txn'] = driver.all_in_one_txn result_doc['retry_writes'] = driver.retry_writes result_doc['read_concern'] = driver.read_concern + result_doc['shards'] = driver.shards result_doc['total_retries'] = total_retries + result_doc['samewh'] = samewh result_doc['total'] = total_cnt 
result_doc['aborts'] = total_aborts - ret += "\n%s TpmC for %s %s thr %s txn %d WH: %d %d total %d durSec, batch %s %d retries %s%% %s fnM %s p50 %s p75 %s p90 %s p95 %s p99 %s max %s WC %s causal %s 10in1 %s retry %s %d %d" % ( + result_doc['instance'] = os.getenv('INSTANCE') + ret += "\n%s TpmC for %s %s thr %s txn %d WH: %d %d total %d durSec, batch %s %d retries %s%% %s fnM %s p50 %s p75 %s p90 %s p95 %s p99 %s max %s WC %s causal %s 10in1 %s retry %s %d %d correct %d noGlobalItems %s" % ( time.strftime("%Y-%m-%d %H:%M:%S"), ("normal", "denorm")[driver.denormalize], threads, @@ -246,7 +251,7 @@ def show(self, load_time=None, driver=None, threads=1): u"%6.2f" % (1000*lat[int(samples/100.0*99)]), u"%6.2f" % (1000.0*lat[-1]), str(driver.write_concern), ('false', 'true')[driver.causal_consistency], - ('false', 'true')[driver.all_in_one_txn], ('false', 'true')[driver.retry_writes],total_cnt,total_aborts) + ('false', 'true')[driver.all_in_one_txn], ('false', 'true')[driver.retry_writes],total_cnt,total_aborts, samewh, ('false', 'true')[driver.no_global_items]) driver.save_result(result_doc) print(result_doc) # PostgreSQL driver returns a shorter version of the summary without extra configuration data diff --git a/pytpcc/worker.py b/pytpcc/worker.py index d689307..6a3e55e 100755 --- a/pytpcc/worker.py +++ b/pytpcc/worker.py @@ -75,7 +75,7 @@ def loaderFunc(driverClass, scaleParameters, args, config, w_ids, debug): driver.loadFinish() except KeyboardInterrupt: return -1 - except (Exception, AssertionError), ex: + except (Exception, AssertionError) as ex: logging.warn("Failed to load data: %s" % (ex)) #if debug: traceback.print_exc(file=sys.stdout) @@ -96,7 +96,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): config['reset'] = False driver.loadConfig(config) - e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error']) + e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'], sameWH=args['samewh']) driver.executeStart() results = e.execute(args['duration']) driver.executeFinish() @@ -116,14 +116,14 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): w_ids=command.data[3] ## Create a handle to the target client driver at the client side - driverClass = createDriverClass(args['system']) - assert driverClass != None, "Failed to find '%s' class" % args['system'] - driver = driverClass(args['ddl']) - assert driver != None, "Failed to create '%s' driver" % args['system'] + driverClass = createDriverClass(args['system']) + assert driverClass != None, "Failed to find '%s' class" % args['system'] + driver = driverClass(args['ddl']) + assert driver != None, "Failed to create '%s' driver" % args['system'] - loaderFunc(driverClass,scaleParameters,args,config,w_ids,True) + loaderFunc(driverClass,scaleParameters,args,config,w_ids,True) m=message.Message(header=message.LOAD_COMPLETED) - channel.send(pickle.dumps(m,-1)) + channel.send(pickle.dumps(m,-1)) elif command.header==message.CMD_EXECUTE: scaleParameters=command.data[0] args=command.data[1] @@ -136,9 +136,9 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): driver = driverClass(args['ddl']) assert driver != None, "Failed to create '%s' driver" % args['system'] - results=executorFunc(driverClass,scaleParameters,args,config,True) - m=message.Message(header=message.EXECUTE_COMPLETED,data=results) - channel.send(pickle.dumps(m,-1)) + results=executorFunc(driverClass,scaleParameters,args,config,True) + 
m=message.Message(header=message.EXECUTE_COMPLETED,data=results) + channel.send(pickle.dumps(m,-1)) elif command.header==message.CMD_STOP: pass
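For background on the `nurand.py` change above: the constants produced by `makeForLoad`/`makeForRun` are the C values used by TPC-C's non-uniform random function, and `validCRun` enforces the run-time versus load-time constraint from clause 2.1.6.1 of the spec. The sketch below shows the standard NURand definition those constants feed into; it is illustrative only, not part of this patch, and assumes `rand.number(a, b)` behaves like an inclusive uniform `random.randint(a, b)`.

```python
import random

def nurand(a, x, y, c):
    """TPC-C NURand(A, x, y): non-uniform random integer in [x, y]."""
    return (((random.randint(0, a) | random.randint(x, y)) + c) % (y - x + 1)) + x

# Typical TPC-C uses (per the spec), one C constant per case:
#   customer last name index: NURand(255, 0, 999)     with cLast
#   customer id:              NURand(1023, 1, 3000)   with cId
#   order line item id:       NURand(8191, 1, 100000) with orderLineItemId
c_id = nurand(1023, 1, 3000, c=259)
print(c_id)
```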