Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
if: "!contains(github.event.pull_request.labels.*.name, 'disable-integration-tests')"
runs-on: ubuntu-24.04
env:
SCYLLA_VERSION: release:2025.2
SCYLLA_VERSION: release:2026.1
strategy:
fail-fast: false
matrix:
Expand Down
259 changes: 175 additions & 84 deletions benchmarks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,36 @@

dirname = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirname)
sys.path.append(os.path.join(dirname, '..'))
sys.path.append(os.path.join(dirname, ".."))

import cassandra
from cassandra.cluster import Cluster
from cassandra.io.asyncorereactor import AsyncoreConnection

log = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
handler.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)
log.addHandler(handler)

logging.getLogger('cassandra').setLevel(logging.WARN)
logging.getLogger("cassandra").setLevel(logging.WARN)

_log_levels = {
'CRITICAL': logging.CRITICAL,
'ERROR': logging.ERROR,
'WARN': logging.WARNING,
'WARNING': logging.WARNING,
'INFO': logging.INFO,
'DEBUG': logging.DEBUG,
'NOTSET': logging.NOTSET,
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARN": logging.WARNING,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"NOTSET": logging.NOTSET,
}

have_libev = False
supported_reactors = [AsyncoreConnection]
try:
from cassandra.io.libevreactor import LibevConnection

have_libev = True
supported_reactors.append(LibevConnection)
except cassandra.DependencyException as exc:
Expand All @@ -60,6 +63,7 @@
have_asyncio = False
try:
from cassandra.io.asyncioreactor import AsyncioConnection

have_asyncio = True
supported_reactors.append(AsyncioConnection)
except (ImportError, SyntaxError):
Expand All @@ -68,6 +72,7 @@
have_twisted = False
try:
from cassandra.io.twistedreactor import TwistedConnection

have_twisted = True
supported_reactors.append(TwistedConnection)
except ImportError as exc:
Expand All @@ -78,27 +83,32 @@
TABLE = "testtable"

COLUMN_VALUES = {
'int': 42,
'text': "'42'",
'float': 42.0,
'uuid': uuid.uuid4(),
'timestamp': "'2016-02-03 04:05+0000'"
"int": 42,
"text": "'42'",
"float": 42.0,
"uuid": uuid.uuid4(),
"timestamp": "'2016-02-03 04:05+0000'",
}


def setup(options):
log.info("Using 'cassandra' package from %s", cassandra.__path__)

cluster = Cluster(options.hosts, schema_metadata_enabled=False, token_metadata_enabled=False)
cluster = Cluster(
options.hosts, schema_metadata_enabled=False, token_metadata_enabled=False
)
try:
session = cluster.connect()

log.debug("Creating keyspace...")
try:
session.execute("""
session.execute(
"""
CREATE KEYSPACE %s
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
""" % options.keyspace)
WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': '2' }
"""
% options.keyspace
)

log.debug("Setting keyspace...")
except cassandra.AlreadyExists:
Expand All @@ -125,7 +135,9 @@ def setup(options):


def teardown(options):
cluster = Cluster(options.hosts, schema_metadata_enabled=False, token_metadata_enabled=False)
cluster = Cluster(
options.hosts, schema_metadata_enabled=False, token_metadata_enabled=False
)
session = cluster.connect()
if not options.keep_data:
session.execute("DROP KEYSPACE " + options.keyspace)
Expand All @@ -138,17 +150,18 @@ def benchmark(thread_class):
setup(options)
log.info("==== %s ====" % (conn_class.__name__,))

kwargs = {'metrics_enabled': options.enable_metrics,
'connection_class': conn_class}
kwargs = {
"metrics_enabled": options.enable_metrics,
"connection_class": conn_class,
}
if options.protocol_version:
kwargs['protocol_version'] = options.protocol_version
kwargs["protocol_version"] = options.protocol_version
cluster = Cluster(options.hosts, **kwargs)
session = cluster.connect(options.keyspace)

log.debug("Sleeping for two seconds...")
time.sleep(2.0)


# Generate the query
if options.read:
query = "SELECT * FROM {0} WHERE thekey = '{{key}}'".format(TABLE)
Expand All @@ -166,13 +179,19 @@ def benchmark(thread_class):
per_thread = options.num_ops // options.threads
threads = []

log.debug("Beginning {0}...".format('reads' if options.read else 'inserts'))
log.debug("Beginning {0}...".format("reads" if options.read else "inserts"))
start = time.time()
try:
for i in range(options.threads):
thread = thread_class(
i, session, query, values, per_thread,
cluster.protocol_version, options.profile)
i,
session,
query,
values,
per_thread,
cluster.protocol_version,
options.profile,
)
thread.daemon = True
threads.append(thread)

Expand All @@ -192,73 +211,144 @@ def benchmark(thread_class):
log.info("Total time: %0.2fs" % total)
log.info("Average throughput: %0.2f/sec" % (options.num_ops / total))
if options.enable_metrics:
stats = getStats()['cassandra']
log.info("Connection errors: %d", stats['connection_errors'])
log.info("Write timeouts: %d", stats['write_timeouts'])
log.info("Read timeouts: %d", stats['read_timeouts'])
log.info("Unavailables: %d", stats['unavailables'])
log.info("Other errors: %d", stats['other_errors'])
log.info("Retries: %d", stats['retries'])

request_timer = stats['request_timer']
stats = getStats()["cassandra"]
log.info("Connection errors: %d", stats["connection_errors"])
log.info("Write timeouts: %d", stats["write_timeouts"])
log.info("Read timeouts: %d", stats["read_timeouts"])
log.info("Unavailables: %d", stats["unavailables"])
log.info("Other errors: %d", stats["other_errors"])
log.info("Retries: %d", stats["retries"])

request_timer = stats["request_timer"]
log.info("Request latencies:")
log.info(" min: %0.4fs", request_timer['min'])
log.info(" max: %0.4fs", request_timer['max'])
log.info(" mean: %0.4fs", request_timer['mean'])
log.info(" stddev: %0.4fs", request_timer['stddev'])
log.info(" median: %0.4fs", request_timer['median'])
log.info(" 75th: %0.4fs", request_timer['75percentile'])
log.info(" 95th: %0.4fs", request_timer['95percentile'])
log.info(" 98th: %0.4fs", request_timer['98percentile'])
log.info(" 99th: %0.4fs", request_timer['99percentile'])
log.info(" 99.9th: %0.4fs", request_timer['999percentile'])
log.info(" min: %0.4fs", request_timer["min"])
log.info(" max: %0.4fs", request_timer["max"])
log.info(" mean: %0.4fs", request_timer["mean"])
log.info(" stddev: %0.4fs", request_timer["stddev"])
log.info(" median: %0.4fs", request_timer["median"])
log.info(" 75th: %0.4fs", request_timer["75percentile"])
log.info(" 95th: %0.4fs", request_timer["95percentile"])
log.info(" 98th: %0.4fs", request_timer["98percentile"])
log.info(" 99th: %0.4fs", request_timer["99percentile"])
log.info(" 99.9th: %0.4fs", request_timer["999percentile"])


def parse_options():
parser = OptionParser()
parser.add_option('-H', '--hosts', default='127.0.0.1',
help='cassandra hosts to connect to (comma-separated list) [default: %default]')
parser.add_option('-t', '--threads', type='int', default=1,
help='number of threads [default: %default]')
parser.add_option('-n', '--num-ops', type='int', default=10000,
help='number of operations [default: %default]')
parser.add_option('--asyncore-only', action='store_true', dest='asyncore_only',
help='only benchmark with asyncore connections')
parser.add_option('--asyncio-only', action='store_true', dest='asyncio_only',
help='only benchmark with asyncio connections')
parser.add_option('--libev-only', action='store_true', dest='libev_only',
help='only benchmark with libev connections')
parser.add_option('--twisted-only', action='store_true', dest='twisted_only',
help='only benchmark with Twisted connections')
parser.add_option('-m', '--metrics', action='store_true', dest='enable_metrics',
help='enable and print metrics for operations')
parser.add_option('-l', '--log-level', default='info',
help='logging level: debug, info, warning, or error')
parser.add_option('-p', '--profile', action='store_true', dest='profile',
help='Profile the run')
parser.add_option('--protocol-version', type='int', dest='protocol_version', default=4,
help='Native protocol version to use')
parser.add_option('-c', '--num-columns', type='int', dest='num_columns', default=2,
help='Specify the number of columns for the schema')
parser.add_option('-k', '--keyspace', type='str', dest='keyspace', default=KEYSPACE,
help='Specify the keyspace name for the schema')
parser.add_option('--keep-data', action='store_true', dest='keep_data', default=False,
help='Keep the data after the benchmark')
parser.add_option('--column-type', type='str', dest='column_type', default='text',
help='Specify the column type for the schema (supported: int, text, float, uuid, timestamp)')
parser.add_option('--read', action='store_true', dest='read', default=False,
help='Read mode')

parser.add_option(
"-H",
"--hosts",
default="127.0.0.1",
help="cassandra hosts to connect to (comma-separated list) [default: %default]",
)
parser.add_option(
"-t",
"--threads",
type="int",
default=1,
help="number of threads [default: %default]",
)
parser.add_option(
"-n",
"--num-ops",
type="int",
default=10000,
help="number of operations [default: %default]",
)
parser.add_option(
"--asyncore-only",
action="store_true",
dest="asyncore_only",
help="only benchmark with asyncore connections",
)
parser.add_option(
"--asyncio-only",
action="store_true",
dest="asyncio_only",
help="only benchmark with asyncio connections",
)
parser.add_option(
"--libev-only",
action="store_true",
dest="libev_only",
help="only benchmark with libev connections",
)
parser.add_option(
"--twisted-only",
action="store_true",
dest="twisted_only",
help="only benchmark with Twisted connections",
)
parser.add_option(
"-m",
"--metrics",
action="store_true",
dest="enable_metrics",
help="enable and print metrics for operations",
)
parser.add_option(
"-l",
"--log-level",
default="info",
help="logging level: debug, info, warning, or error",
)
parser.add_option(
"-p", "--profile", action="store_true", dest="profile", help="Profile the run"
)
parser.add_option(
"--protocol-version",
type="int",
dest="protocol_version",
default=4,
help="Native protocol version to use",
)
parser.add_option(
"-c",
"--num-columns",
type="int",
dest="num_columns",
default=2,
help="Specify the number of columns for the schema",
)
parser.add_option(
"-k",
"--keyspace",
type="str",
dest="keyspace",
default=KEYSPACE,
help="Specify the keyspace name for the schema",
)
parser.add_option(
"--keep-data",
action="store_true",
dest="keep_data",
default=False,
help="Keep the data after the benchmark",
)
parser.add_option(
"--column-type",
type="str",
dest="column_type",
default="text",
help="Specify the column type for the schema (supported: int, text, float, uuid, timestamp)",
)
parser.add_option(
"--read", action="store_true", dest="read", default=False, help="Read mode"
)

options, args = parser.parse_args()

options.hosts = options.hosts.split(',')
options.hosts = options.hosts.split(",")

level = options.log_level.upper()
try:
log.setLevel(_log_levels[level])
except KeyError:
log.warning("Unknown log level specified: %s; specify one of %s", options.log_level, _log_levels.keys())
log.warning(
"Unknown log level specified: %s; specify one of %s",
options.log_level,
_log_levels.keys(),
)

if options.asyncore_only:
options.supported_reactors = [AsyncoreConnection]
Expand All @@ -283,8 +373,9 @@ def parse_options():


class BenchmarkThread(Thread):

def __init__(self, thread_num, session, query, values, num_queries, protocol_version, profile):
def __init__(
self, thread_num, session, query, values, num_queries, protocol_version, profile
):
Thread.__init__(self)
self.thread_num = thread_num
self.session = session
Expand All @@ -304,4 +395,4 @@ def run_query(self, key, **kwargs):
def finish_profile(self):
if self.profiler:
self.profiler.disable()
self.profiler.dump_stats('profile-%d' % self.thread_num)
self.profiler.dump_stats("profile-%d" % self.thread_num)
Loading
Loading