In [None]:
import os
import pickle

import pandas as pd
import numpy as np

from collections import OrderedDict     

import matplotlib.pyplot as plt

In [None]:
%matplotlib inline

In [None]:
plt.rcParams.update({'font.size': 14})

## Collected data

In [None]:
%env MLPERF_HPC_ROOT=/home/lukasd/src/mlperf

#%env MLPERF_COSMO_DATA_BENCHMARK_TIMESTAMP=data_benchmark/2020-08-05_23-48-09_daint101/
#%env MLPERF_COSMO_DATA_BENCHMARK_TIMESTAMP=data_benchmark/2020-08-05_00-21-54_daint101/

%env MLPERF_COSMO_DATA_BENCHMARK_TIMESTAMP=data_benchmark/2020-08-06_15-25-54_daint101

In [None]:
%%bash

echo "## Data benchmark experiments ##"
set -x
tree ${MLPERF_HPC_ROOT}/cosmoflow-benchmark/results/${MLPERF_COSMO_DATA_BENCHMARK_TIMESTAMP}
set +x

In [None]:
def load_config(result_dir):
    config_file = os.path.join(result_dir, 'config.pkl')
    with open(config_file, 'rb') as f:
        return pickle.load(f)

def load_result(result_dir):
    history_file = os.path.join(result_dir, 'data_benchmark_history.csv')
    history = pd.read_csv(history_file)
    for col in ['local_times', 'global_times']:
        history[col] = history[col].apply(lambda x: np.fromstring(x.strip('[]'), dtype=float, sep=' '))
    return history

def get_num_samples(config, ranks):
    dconf = config['data']
    n = dconf['n_train'] + dconf['n_valid']
    if not dconf['shard']:
        n *= ranks
    return n


### Data from a single data benchmark run

In [None]:
result_dir = os.path.expandvars('${MLPERF_HPC_ROOT}/cosmoflow-benchmark/results/${MLPERF_COSMO_DATA_BENCHMARK_TIMESTAMP}/gpu-n%i-inter2-intra12') % 2
load_config(result_dir)

In [None]:
load_result(result_dir)

## Evaluation utilities

In [None]:
# The following functions filter config records for relevant entries 
# (rewrite them to analyze performance over different config variables)
def get_rank_samples(ranks, config):
    return OrderedDict(ranks=ranks,
                       samples=get_num_samples(config, ranks))    


def get_rank_samples_device_props(ranks, config):
    return OrderedDict(ranks=ranks,
                       samples=get_num_samples(config, ranks),
                       **config['device'])    


# compute per-run performance metrics on selection of epochs
def compute_epoch_selection_summary(config_param, benchmark_history):
    if isinstance(benchmark_history, pd.Series):
        local_times = benchmark_history['local_times']
        global_times = benchmark_history['global_times']
    else:
        local_times = np.vstack(benchmark_history['local_times'].to_numpy())
        global_times = np.vstack(benchmark_history['global_times'].to_numpy())

    n_ranks =  config_param['ranks']
    n_samples = config_param['samples']
    
    local_throughputs = n_samples / (n_ranks*local_times)
    global_throughputs = n_samples / global_times

    return dict(total_time_mean=np.mean(global_times),
                total_time_std=np.std(global_times),
                local_throughput_mean=np.mean(local_throughputs),
                local_throughput_std=np.std(local_throughputs),
                total_throughput_mean=np.mean(global_throughputs),
                total_throughput_std=np.std(global_throughputs))

# compute per-run performance metrics
def compute_data_benchmark_summary(config_param, benchmark_history):

    initial_epoch_stats = compute_epoch_selection_summary(config_param, 
                                                          benchmark_history.loc[0])
    if benchmark_history.shape[0] > 1:
        later_epoch_stats = compute_epoch_selection_summary(config_param,
                                                            benchmark_history[benchmark_history['epoch'] > 0])
    else:
        later_epoch_stats = {}
    return initial_epoch_stats, later_epoch_stats
    
    
# Compute ideal vs. effective weak scaling relative to single node
def compute_scaling(stats):
    stats['local_ideal'] = stats.ranks.apply(lambda r: 1.) * stats['local_throughput_mean'].loc[0]
    stats['local_eff']   = stats.apply(lambda result: result['local_throughput_mean']/result['local_ideal'], axis=1)

    stats['total_ideal'] = stats.ranks * stats['total_throughput_mean'].loc[0]
    stats['total_eff']   = stats.apply(lambda result: result['total_throughput_mean']/result['total_ideal'], axis=1)    

    return stats
    


def get_experimental_results(paths, ranks, extract_config_params, extract_results, compute_extra_metrics=None):
    """
    Loops over ranks-paths-pairs and computes throughput metrics.
    Configuration parameters and results to keep can be specified with `extract_config_params`, `extract_results`.
    Extra metrics on first and higher epochs can be added to DataFrames in `compute_extra_metrics`.
    Returns results in a dataframe.
    """    
    configs, results = [], []
    for (path,r) in zip(paths,ranks):
        result_dir = path
        configs.append(load_config(result_dir))
        results.append(load_result(result_dir))

    config_params = [extract_config_params(r,c) for (r,c) in zip(ranks, configs)]

    for config in config_params:
        if config.keys() != config_params[0].keys():
            raise RuntimeError('Extracted config keys must be the same for all experiments.')
    config_keys = list(config_params[0].keys())
            
    init_summaries, later_summaries = \
        zip(*[compute_data_benchmark_summary(conf,res) for (conf,res) in zip(config_params, results)])

    results = [extract_results(r) for r in results]
    
    # Merge config and summary (result for double checking)
    init_stats  = pd.DataFrame.from_records([{**conf, **res, **summ} for (conf, res, summ) in zip(config_params, results, init_summaries )])
    later_stats = pd.DataFrame.from_records([{**conf, **res, **summ} for (conf, res, summ) in zip(config_params, results, later_summaries)])
    
    if compute_extra_metrics is not None:
        init_stats  = compute_extra_metrics(init_stats)
        later_stats = compute_extra_metrics(later_stats)
    
#     stats = init_stats.merge(later_stats, on=['ranks','samples'], suffixes=('_init','_later'))
    stats = init_stats.merge(later_stats, on=config_keys, suffixes=('_init','_later'))

    return stats

# Ignore results (except for computed throughput metrics)
def discard_results(result):
    return {}

# Standard weak scaling metrics
def get_scaling_results(path_pattern, ranks):
    paths = [path_pattern % r for r in ranks]
    return get_experimental_results(paths, ranks, get_rank_samples, discard_results, compute_scaling)

# Extract device config parameters in addition to above metrics
def get_device_prop_results(paths, ranks):
    return get_experimental_results(paths, ranks, get_rank_samples_device_props, discard_results)


# Data loading benchmark

Weak scaling of data loading with 256 training samples & 64 validation samples per rank (one rank per node), up to 1024 nodes. 

In [None]:
results = get_scaling_results(
    os.path.expandvars('${MLPERF_HPC_ROOT}/cosmoflow-benchmark/results/${MLPERF_COSMO_DATA_BENCHMARK_TIMESTAMP}/gpu-n%i-inter2-intra12'),
    ranks=np.array([2**i for i in range(11)]))
results[['ranks', 'samples', 'total_time_mean_init',  'local_throughput_mean_init',  'total_throughput_mean_init',  'local_ideal_init',  'local_eff_init',  'total_ideal_init',  'total_eff_init']] 
results[['ranks', 'samples', 'total_time_mean_later', 'local_throughput_mean_later', 'total_throughput_mean_later', 'local_ideal_later', 'local_eff_later', 'total_ideal_later', 'total_eff_later']]

In [None]:
results[['ranks', 'samples', 'total_time_mean_init',  'local_throughput_mean_init',  'total_throughput_mean_init',  'local_ideal_init',  'local_eff_init',  'total_ideal_init',  'total_eff_init']] 

In [None]:
results[['ranks', 'samples', 'total_time_mean_later', 'local_throughput_mean_later', 'total_throughput_mean_later', 'local_ideal_later', 'local_eff_later', 'total_ideal_later', 'total_eff_later']]

In [None]:
fig, (ax0, ax1) = plt.subplots(nrows=2, sharex=True, figsize=(8,8),
                               gridspec_kw=dict(height_ratios=[.8, .2], hspace=0))

ax0.set_title('Initial data loading (first epoch)')
ax0.plot(results.ranks, results.local_throughput_mean_init, 'o-',  ms=8, label='Per-rank throughput', color='tab:blue')
ax0.fill_between(results.ranks, results.local_throughput_mean_init - results.local_throughput_std_init, results.local_throughput_mean_init + results.local_throughput_std_init, alpha=0.2, color='tab:blue')
ax0.plot(results.ranks, results.total_throughput_mean_init, '^-', ms=8, label='Total throughput', color='tab:orange')
ax0.fill_between(results.ranks, results.total_throughput_mean_init - results.total_throughput_std_init, results.total_throughput_mean_init + results.total_throughput_std_init, alpha=0.2, color='tab:orange')
ax0.plot(results.ranks, results.local_ideal_init, '--',  label='Per-rank ideal', color='tab:blue')
ax0.plot(results.ranks, results.total_ideal_init, '--', label='Total ideal', color='tab:orange')
ax0.set_ylabel('Data loading throughput [samples/s]')
ax0.set_yscale('log')
ax0.legend(loc=0)
ax0.grid()

# Scaling efficiency
ax1.plot(results.ranks, results.local_eff_init, 'o-', ms=8, color='tab:blue')
ax1.plot(results.ranks, results.total_eff_init, '^-', ms=8, color='tab:orange')
ax1.set_xlabel('Number of workers')
ax1.set_ylabel('Efficiency')
ax1.set_ylim(bottom=0.4)
ax1.yaxis.set_major_locator(plt.MultipleLocator(0.1))
ax1.set_xscale('log')
ax1.set_xticks(results.ranks)
ax1.xaxis.set_major_formatter(plt.ScalarFormatter())
ax1.grid()

# Customize y-axis
throughput_ticks = np.array([(1.*scale, 3.*scale) for scale in np.logspace(0,4,5)]).flatten()[1:-1] #[100, 300, 1000, 3000, 10000, 30000, 100000]
ax0.set_yticks(throughput_ticks)
ax0.yaxis.set_major_formatter(plt.ScalarFormatter())

ax1.set_yticks(np.linspace(0.5, 1., 6))
ax1.yaxis.set_major_formatter(plt.ScalarFormatter())

plt.tight_layout()

In [None]:
fig, (ax0, ax1) = plt.subplots(nrows=2, sharex=True, figsize=(8,8),
                               gridspec_kw=dict(height_ratios=[.8, .2], hspace=0))

ax0.set_title('Repeated data loading (later epochs)')
ax0.plot(results.ranks, results.local_throughput_mean_later, 'o-',  ms=8, label='Per-rank throughput', color='tab:red')
ax0.fill_between(results.ranks, results.local_throughput_mean_later - results.local_throughput_std_later, results.local_throughput_mean_later + results.local_throughput_std_later, alpha=0.2, color='tab:red')
ax0.plot(results.ranks, results.total_throughput_mean_later, '^-', ms=8, label='Total throughput', color='tab:cyan')
ax0.fill_between(results.ranks, results.total_throughput_mean_later - results.total_throughput_std_later, results.total_throughput_mean_later + results.total_throughput_std_later, alpha=0.2, color='tab:cyan')
ax0.plot(results.ranks, results.local_ideal_later, '--',  label='Local ideal', color='tab:red')
ax0.plot(results.ranks, results.total_ideal_later, '--', label='Total ideal', color='tab:cyan')
ax0.set_ylabel('Data loading throughput [samples/s]')
ax0.set_yscale('log')
ax0.legend(loc=0)
ax0.grid()

# Scaling efficiency
ax1.plot(results.ranks, results.local_eff_later, 'o-', ms=8, color='tab:red')
ax1.plot(results.ranks, results.total_eff_later, '^-', ms=8, color='tab:cyan')
ax1.set_xlabel('Number of workers')
ax1.set_ylabel('Efficiency')
ax1.set_ylim(bottom=0.4)
ax1.yaxis.set_major_locator(plt.MultipleLocator(0.1))
ax1.set_xscale('log')
ax1.set_xticks(results.ranks)
ax1.xaxis.set_major_formatter(plt.ScalarFormatter())
ax1.grid()

# Customize y-axis
throughput_ticks = np.array([(1.*scale, 3.*scale) for scale in np.logspace(2,4,3)]).flatten() #[100, 300, 1000, 3000, 10000, 30000, 100000]
ax0.set_yticks(throughput_ticks)
ax0.yaxis.set_major_formatter(plt.ScalarFormatter())

ax1.set_yticks(np.linspace(0.5, 1., 6))
ax1.yaxis.set_major_formatter(plt.ScalarFormatter())

plt.tight_layout()

## Discussion

For 256 training and 64 validation samples per node and batch size 4, we see near optimal weak scaling of throughput up to 64 nodes in the first epoch, from where the efficiency decays to 50 % at 1024. The locally measured throughput (based on local epoch time) follows closely the globally measured throughput (overall epoch time) so that the performance can be associated with the distributed filesystem. This is also consistent with the reduction of the local throughput. The loss in global throughput cannot be due to the global synchronization mechanism used to define the global epoch time (then the gap local-global would be bigger) and neither other data transfer components as the later epochs show much higher throughput.

For higher epochs, we observe a near optimal local epoch time as the data is now cached in memory on each node. The reduced global efficiency is due to the overhead that Horovod adds for synchronizing to define the global epoch end. This should disappear when running the benchmark with more data per node (longer epoch times).

For both, the initial and the higher epochs we observe a stable duration (low standard deviation), which is reflected in the displayed throughput values.

This measurement was conducted at local batch size 4, however, submission candidates for Cosmoflow often use a value of 1 or 2 for this, which may lead to different results. 

Comparison to weak scaling of training results (an actual SGD-step adds extra work both per-node - local gradient computation - and globally - hovorod.allreduce() of local gradients). The number of samples do not agree due to different number of validation samples chosen (64 vs. 256). Nevertheless, from the data benchmark results of the initial epoch and the weak scaling of training above (only higher epochs displayed) we can conclude that if the data set does not fit in memory the training performance of a Cosmoflow neural network will be reduced by the filesystem bottleneck by a factor of 2x and even more above ~600 samples/s (64 nodes in data benchmark, ~> 32 nodes in the training benchmark, where the dataset is held in memory). This may be both due to latency effects (low node count) and throughput limits (high node count) of filesystem operations. To fit the full 5.1 TB in memory on 64 GB RAM nodes, at least 80 GPU nodes are needed, but to also train Cosmoflow, the number of GPU nodes (memory) required might actually be significantly higher.

# Optimizing device parameters (thread pools)

In the following, we run the data benchmark on single nodes with varying values of `inter_op_parallelism_threads` and `intra_op_parallelism_threads` (see https://github.com/tensorflow/tensorflow/blob/b21bc388a142c2c15a57af59f9d57ca3413f0c07/tensorflow/core/protobuf/config.proto#L379).

In [None]:
#%env MLPERF_COSMO_DATA_BENCHMARK_DEVICE_PROP_TIMESTAMP=data_benchmark/2020-08-05_11-31-33_daint101/
%env MLPERF_COSMO_DATA_BENCHMARK_DEVICE_PROP_TIMESTAMP=data_benchmark/2020-08-05_19-54-10_daint101/

path_pattern = os.path.expandvars('${MLPERF_HPC_ROOT}/cosmoflow-benchmark/results/${MLPERF_COSMO_DATA_BENCHMARK_DEVICE_PROP_TIMESTAMP}/gpu-n%i-inter%i-intra%i')

ranks_range = [1]
inter_threads = [0, 1, 2, 3, 4]
intra_threads = [0, 6, 12, 24, 36, 48]

paths = []
ranks = []

for rank in ranks_range:
    for inter in inter_threads:
        for intra in intra_threads:
            paths.append(path_pattern % (rank, inter, intra))
            ranks.append(rank)

results = get_device_prop_results(
     paths,ranks)
results

In [None]:
# 2D-heat maps of single-node throughput over inter_threads/intra_threads-domain
fig, (ax0, ax1) = plt.subplots(ncols=2, sharex=True, figsize=(14,7))

ax0.set_title('first epoch')
image = results[['inter_threads','intra_threads','total_throughput_mean_init']].to_numpy().reshape((len(inter_threads), len(intra_threads), 3))[:,:,2]
pos = ax0.contourf( intra_threads, inter_threads, image,cmap='hot')
ax0.set_ylabel('inter_op_threads')
ax0.set_yticks(inter_threads)
ax0.set_xlabel('intra_op_threads')
ax0.set_xticks(intra_threads)
ax0.grid()
fig.colorbar(pos, ax=ax0)

ax1.set_title('later epochs')
image = results[['inter_threads','intra_threads','total_throughput_mean_later']].to_numpy().reshape((len(inter_threads), len(intra_threads), 3))[:,:,2]
pos = ax1.contourf( intra_threads, inter_threads, image,cmap='hot')
ax1.set_ylabel('inter_op_threads')
ax1.set_yticks(inter_threads)
ax1.set_xlabel('intra_op_threads')
ax1.set_xticks(intra_threads)
ax1.grid()
fig.colorbar(pos, ax=ax1)

fig.suptitle('single-node throughput in data loading benchmark [samples/s]')

plt.tight_layout()

# Group-by inter_threads-/intra_threads-line plots
def group_by_plot(ax, group_by_key, x_var, y_var, title=None, x_label=None, y_label=None):
    for key, grp in results.groupby([group_by_key], sort=False):
        ax.plot(grp[x_var], grp[y_var], '^-',  ms=8, label=key)
        
    grouped_by_x = results[[x_var,y_var]].groupby([x_var], sort=False)[y_var]
    grouped_by_x_mean = grouped_by_x.mean()
    grouped_by_x_std = grouped_by_x.std()
    ax.plot(grouped_by_x_mean.reset_index()[x_var], grouped_by_x_mean, '--',  ms=8, label='mean', color='black')
    ax.fill_between(grouped_by_x_std.reset_index()[x_var], grouped_by_x_mean - grouped_by_x_std, grouped_by_x_mean + grouped_by_x_std, alpha=0.2, color='black', label='+/-std')

    
    ax.set_xticks(grp[x_var])
    ax.set_xlabel(x_var if x_label is None else x_label)
    ax.set_ylabel(y_var if y_label is None else y_label)

    ax.legend(title=group_by_key, loc='center right', bbox_to_anchor=(1.4, 0.5))
    ax.grid()
    if title is not None:
        ax.set_title(title)

        
fig, axs = plt.subplots(nrows=1, ncols=2, sharey=True, figsize=(16,4))

fig.suptitle('initial data loading (first epoch)')

group_by_plot(axs[0], 'intra_threads', 'inter_threads', 'total_throughput_mean_init', y_label='throughput [samples/s]')
group_by_plot(axs[1], 'inter_threads', 'intra_threads', 'total_throughput_mean_init', y_label='throughput [samples/s]')

fig.tight_layout()

fig, axs = plt.subplots(nrows=1, ncols=2, sharey=True, figsize=(16,4))

fig.suptitle('repeated data loading (later epochs)')

group_by_plot(axs[0], 'intra_threads', 'inter_threads', 'total_throughput_mean_later', y_label='throughput [samples/s]')
group_by_plot(axs[1], 'inter_threads', 'intra_threads', 'total_throughput_mean_later', y_label='throughput [samples/s]')

fig.tight_layout()
plt.show()

## Discussion

(CPU specification see below)

- `inter_threads` (left column): Setting this to 1 (number of sockets) generally leads to worse performance than leaving it at 0. For `intra_threads` in {12, 24}, the best performance is obtained at `inter_threads` = 0, although, higher values should be tested.
- `intra_threads` (right column): The first epoch shows that `intra_threads` should be set to no more than the max. number of running threads (24), whereas the repeated data loading measurements demonstrate that also should not be smaller than the core number (12) on the machine.

The heatmaps suggest that the configurations (12,0) and (24,2) (or a mix of the two components) may yield optimal performance. This measurement was done with exclusive access to the data during for each benchmark process, but should be repeated with the actual model training. The same experiment could also be repeated on the multi-core partition if one is interested in the pure I/O-performance.

This topic has also been discussed [here](https://groups.google.com/a/tensorflow.org/d/msg/discuss/UQx7G9Gs7tI/ZYDcbEPkBQAJ).