In [None]:
# Load configuration.
# `%run` executes config.ipynb and makes its top-level names available in this
# notebook. Later cells use BASE_DIR, TPCDS_DATA, BEAM_DIR, CLASSPATH_DIR and
# BENCHMARKS_DIR without assigning them here, so they are presumably defined
# there — TODO confirm when changing config.ipynb.
%run config.ipynb

In [1]:
# Benchmark configuration
# Every (data size x runner config x JVM config) combination is repeated
# this many times.
runs_per_config = 10

# Data sizes handed to TPC-DS as `dataSize`; '10GB' is currently disabled.
data_sizes = ['1GB']  # , '10GB'

# JVM system property pointing java.util.logging at the shared
# logging.properties (BASE_DIR presumably comes from config.ipynb).
jul_logging = {'java.util.logging.config.file': f"{BASE_DIR}/logging.properties"}

# JVM system properties (passed as -D flags) that switch off the Spark web UI
# and the console progress bar.
spark_props = dict.fromkeys(('spark.ui.enabled', 'spark.ui.showConsoleProgress'), 'false')

def g1_java_opts(heap, compressed_oops=False, biased_locking=True):
    """Build the JVM flag list for a fixed-size G1 heap.

    Args:
        heap: heap size used for both ``-Xms`` and ``-Xmx`` (e.g. ``'6G'``).
        compressed_oops: when True, add ``-XX:+UseCompressedOops``. HotSpot
            already enables compressed oops by default for 64-bit heaps below
            ~32 GB, so this is mostly explicit documentation.
        biased_locking: when False, add ``-XX:-UseBiasedLocking``. Biased
            locking was deprecated and disabled by default in JDK 15 (JEP 374)
            and the flag is obsolete (ignored with a warning) from JDK 18.

    Returns:
        A new list of flags, always ending in ``-XX:+CrashOnOutOfMemoryError``
        so an OOM terminates the JVM instead of letting the run limp on.
    """
    opts = ['-XX:+UseG1GC', f'-Xms{heap}', f'-Xmx{heap}']
    if compressed_oops:
        opts.append('-XX:+UseCompressedOops')
    if not biased_locking:
        opts.append('-XX:-UseBiasedLocking')
    opts.append('-XX:+CrashOnOutOfMemoryError')
    return opts


# JVM configurations to benchmark; the key is appended to the runner config id
# to form the results directory name. Uncomment entries to add variants.
java_configs = {
    # 'default': ['-XX:+CrashOnOutOfMemoryError'],
    # '-6g-g1': g1_java_opts('6G'),
    # '-6g-g1-cops': g1_java_opts('6G', compressed_oops=True),
    # '-6g-g1-noBiasedLocking': g1_java_opts('6G', biased_locking=False),
    '-6g-g1-cops-noBiasedLocking': g1_java_opts('6G', compressed_oops=True, biased_locking=False),
    # '-4g-g1': g1_java_opts('4G'),
    # '-4g-g1-cops': g1_java_opts('4G', compressed_oops=True),
    # '-4g-g1-noBiasedLocking': g1_java_opts('4G', biased_locking=False),
    '-4g-g1-cops-noBiasedLocking': g1_java_opts('4G', compressed_oops=True, biased_locking=False),
    # '-2g-g1': g1_java_opts('2G'),
    # '-2g-g1-cops': g1_java_opts('2G', compressed_oops=True),
    # '-2g-g1-noBiasedLocking': g1_java_opts('2G', biased_locking=False),
    '-2g-g1-cops-noBiasedLocking': g1_java_opts('2G', compressed_oops=True, biased_locking=False),
}

# Runners under test, keyed by the name used in the results directory layout.
# Per runner:
#   path    - gradle project below `:runners:` whose classpath is assembled
#   class   - value passed to TPC-DS as the `--runner` option
#   props   - JVM system properties, passed as -Dkey=value
#   configs - named pipeline-option overrides, each benchmarked separately
benchmarks = {
    # Available from  https://github.com/mosche/beam/tree/reactor
    'reactor': {
        'path': 'reactor',
        'class': 'ReactorRunner',
        'props': jul_logging,
        'configs': {
            # previous revision: 1b68405c7cb7752fbd637615a5ed7412957dee1d
            # 'sync_fused_sdf': {'SDFMode':'SYNC', 'fuseSDF': 'true'},
            # 'async_fused_sdf': {'SDFMode':'ASYNC', 'fuseSDF': 'true'},
            # 'async_bp_fused_sdf': {'SDFMode':'ASYNC_WITH_BACKPRESSURE', 'fuseSDF': 'true'},
            # 'async_bp_unfused_sdf': {'SDFMode':'ASYNC_WITH_BACKPRESSURE', 'fuseSDF': 'false'},

            # previous revision: 2cbf83097476e2a0dba1f269908b2b564f79201d
            # 'sdf_subscriber_threads': {'SDFMode':'ASYNC'},

            # 'sdf_sync': {'SDFMode':'SYNC'},
            'sdf_async': {'SDFMode': 'ASYNC'},
        },
    },
    'spark': {
        'path': 'spark:3',
        'class': 'SparkRunner',
        'props': spark_props,
        # local1 / local4 -> sparkMaster local[1] / local[4]
        'configs': {f'local{n}': {'sparkMaster': f'local[{n}]'} for n in (1, 4)},
    },
    'sparkDS': {
        'path': 'spark:3',
        'class': 'SparkStructuredStreamingRunner',
        'props': spark_props,
        # local1 / local4 -> sparkMaster local[1] / local[4]
        'configs': {f'local{n}': {'sparkMaster': f'local[{n}]'} for n in (1, 4)},
    },
    'flink': {
        'path': 'flink:1.16',
        'class': 'FlinkRunner',
        'props': jul_logging,
        # Fails with OOM errors
        # 'collection1': {'flinkMaster': '[collection]', 'fasterCopy': 'true', 'parallelism': '1', 'maxParallelism': '1'},
        # 'collection4': {'flinkMaster': '[collection]', 'fasterCopy': 'true', 'parallelism': '4', 'maxParallelism': '4'},
        'configs': {
            f'local{n}': {
                'flinkMaster': '[local]',
                'fasterCopy': 'true',
                'parallelism': str(n),
                'maxParallelism': str(n),
            }
            for n in (1, 4)
        },
    },
    # Fails with OOM errors
    # 'direct': {
    #     'path': 'direct-java',
    #     'class': 'DirectRunner',
    #     'props': jul_logging,
    #     'configs':  {
    #     #    'default1': {'targetParallelism': '1'},
    #     #    'default4': {'targetParallelism': '4'},
    #        'minimal1': {'targetParallelism': '1', 'enforceImmutability': 'false', 'enforceEncodability': 'false'},
    #     }
    # }
}

# TPC-DS queries benchmarked by default (comma-separated query numbers).
DEFAULT_TPCDS_QUERIES = '3,7,10,25,26,29,35,40,42,43,52,55,69,79,83,84,87,93,96'


def tpcds_options(runner, data_size, path, queries=DEFAULT_TPCDS_QUERIES, data_directory=None):
    """Return the base TPC-DS pipeline options for a single benchmark run.

    Args:
        runner: Beam runner class name, e.g. ``'FlinkRunner'``.
        data_size: TPC-DS data size, e.g. ``'1GB'``.
        path: directory the suite writes its results into.
        queries: comma-separated query numbers to run; defaults to
            ``DEFAULT_TPCDS_QUERIES``.
        data_directory: location of the Parquet input data; ``None`` (the
            default) uses the notebook-level ``TPCDS_DATA`` setting.

    Returns:
        A new dict mapping option name to string value (key order is stable,
        so the rendered command line is reproducible).
    """
    return {
        'queries': queries,
        'tpcParallel': '1',
        'sourceType': 'PARQUET',
        # Resolved at call time so callers that never pass data_directory keep
        # using the value loaded from the configuration.
        'dataDirectory': TPCDS_DATA if data_directory is None else data_directory,
        'dataSize': data_size,
        'resultsDirectory': path,
        'runner': runner,
    }

In [5]:
import os

def prepare_classpath(runner_path):
    """Materialize the TPC-DS classpath for one Beam runner and return it.

    Recreates ``CLASSPATH_DIR`` from scratch, runs the Beam gradle task
    ``:sdks:java:testing:tpcds:cpcopy`` (from ``BEAM_DIR``) to copy the TPC-DS
    classpath built against ``:runners:<runner_path>`` into it, and joins the
    copied entries into a ``-cp`` string.

    Args:
        runner_path: gradle path of the runner below ``:runners:``,
            e.g. ``'spark:3'`` or ``'flink:1.16'``.

    Returns:
        The classpath, entries separated by ``os.pathsep``.

    Raises:
        subprocess.CalledProcessError: if the gradle invocation fails.
    """
    import shutil
    import subprocess

    # Start from an empty directory so jars of a previously prepared runner
    # cannot leak into this runner's classpath.
    shutil.rmtree(CLASSPATH_DIR, ignore_errors=True)

    build = subprocess.run(
        [
            'gradle', ':sdks:java:testing:tpcds:cpcopy', '-q', '--console=plain',
            f'-Ptarget={CLASSPATH_DIR}',
            f'-Ptpcds.runner=:runners:{runner_path}',
        ],
        cwd=BEAM_DIR,
        capture_output=True,
        text=True,
    )
    # Echo gradle's output into the cell; an uncaptured subprocess would write
    # to the kernel's terminal instead of the notebook.
    print(build.stdout, end='')
    print(build.stderr, end='')
    # Fail loudly: benchmarking against a missing or partial classpath would
    # silently produce garbage results.
    build.check_returncode()

    # Sorted for a deterministic class lookup order. Joined without a leading
    # separator: an empty classpath entry makes the JVM also search the
    # current working directory.
    return os.pathsep.join(
        os.path.join(CLASSPATH_DIR, jar) for jar in sorted(os.listdir(CLASSPATH_DIR))
    )

In [None]:
import subprocess

def build_options(config, runner, data_size, path):
    """Render TPC-DS pipeline options as ``--name=value`` arguments.

    Starts from ``tpcds_options(runner, data_size, path)`` and layers
    ``config`` on top, so runner-specific settings override the defaults while
    keeping the default options' order.
    """
    merged = dict(tpcds_options(runner, data_size, path))
    merged.update(config)
    return [f'--{name}={setting}' for name, setting in merged.items()]

def build_props(props):
    """Turn a mapping of JVM system properties into ``-Dname=value`` flags."""
    flags = []
    for name, setting in props.items():
        flags.append(f'-D{name}={setting}')
    return flags

def run_tpcds(runner, java_opts, config, data_size, path, classpath=None):
    """Run the Beam TPC-DS suite once and capture its output under ``path``.

    Launches ``org.apache.beam.sdk.tpcds.BeamTpcds`` in a fresh JVM wrapped in
    ``time -l -o <path>/timings.txt``. The JVM's stdout goes to
    ``<path>/queries.txt`` and its stderr to ``<path>/stderr.txt``.

    Args:
        runner: entry of ``benchmarks``; its ``'props'`` become ``-D`` flags and
            its ``'class'`` the TPC-DS ``--runner`` option.
        java_opts: extra JVM flags, e.g. a value of ``java_configs``.
        config: runner-specific pipeline options overriding the TPC-DS defaults.
        data_size: TPC-DS data size, e.g. ``'1GB'``.
        path: results directory for this run; created if missing.
        classpath: JVM classpath. ``None`` (the default) falls back to the
            notebook-global ``classpath`` assigned by the driver loop.

    Returns:
        The ``subprocess.CompletedProcess`` of the ``time`` invocation.
    """
    if classpath is None:
        # Backward-compatible fallback to the hidden global set by the driver.
        classpath = globals()['classpath']
    os.makedirs(path, exist_ok=True)
    props = build_props(runner['props'])
    options = build_options(config, runner['class'], data_size, path)
    # NOTE(review): `time -l -o FILE` is the BSD/macOS /usr/bin/time syntax;
    # GNU time on Linux reports resource usage with `-v` instead — confirm for
    # the benchmark host.
    command = (
        ['time', '-l', '-o', f"{path}/timings.txt", 'java', '-cp', classpath]
        + java_opts
        + props
        + ['org.apache.beam.sdk.tpcds.BeamTpcds']
        + options
    )
    with open(f"{path}/queries.txt", 'w') as stdout, open(f"{path}/stderr.txt", 'w') as stderr:
        result = subprocess.run(command, stderr=stderr, stdout=stdout)
    # Surface failures (e.g. a JVM aborted by -XX:+CrashOnOutOfMemoryError);
    # otherwise a failed run is indistinguishable from a successful one here.
    if result.returncode != 0:
        print(f"WARNING: run in {path} exited with code {result.returncode}; see {path}/stderr.txt")
    return result

# Drive every (runner, data size, runner config, JVM config, repetition)
# combination. Results land in
#   BENCHMARKS_DIR/<data_size>/<runner>/<config_id><jvm suffix>/<run_id>
# and a run whose directory already exists is skipped, so re-running this cell
# resumes an interrupted benchmark.
# NOTE(review): run_tpcds creates the directory before launching the JVM, so
# an interrupted run leaves a (partial) directory behind and is skipped on
# resume — delete such directories by hand.
for name, runner in benchmarks.items():
    # Collect the outstanding runs first so the gradle classpath build below is
    # skipped entirely for runners whose results are all present.
    pending = []
    for data_size in data_sizes:
        for config_id, config in runner['configs'].items():
            for suffix, java_opts in java_configs.items():
                for run_id in range(runs_per_config):
                    path = f"{BENCHMARKS_DIR}/{data_size}/{name}/{config_id}{suffix}/{run_id}"
                    label = f"iteration {run_id} with {config} ({config_id}{suffix}) on runner {name} [{data_size}]"
                    if os.path.exists(path):
                        print(f"Skipping {label}")
                    else:
                        pending.append((label, java_opts, config, data_size, path))
    if not pending:
        continue
    # Assigned as a notebook global: run_tpcds reads `classpath` when it is not
    # passed explicitly.
    classpath = prepare_classpath(runner['path'])
    for label, java_opts, config, data_size, path in pending:
        print(f"Running {label}")
        run_tpcds(runner, java_opts, config, data_size, path)