Add per group statistics in rosbag2_performance_benchmarking #1306

Merged
@@ -77,42 +77,41 @@ def __process_test(compression_selected,
                    compression_threads_selected,
                    max_bagfile_size_selected):
     for storage_cfg_name, data in splitted_data.items():
-        cache_samples = {}
-        for sample in data:
+        cache_size_pub_groups_percentage = {}
+        for pub_groups in data:
             # Single sample contains multiple rows
-            if len(sample) != len(producers_config_publishers['publisher_groups']):
+            if len(pub_groups) != len(producers_config_publishers['publisher_groups']):
                 raise RuntimeError('Invalid number of records in results detected.')

             # These parameters are same for all rows in sample
             # (multiple publishers in publisher group)
-            if sample[0]['compression'] != compression_selected:
+            if pub_groups[0]['compression'] != compression_selected:
                 continue
-            if int(sample[0]['compression_queue']) != compression_queue_size_selected:
+            if int(pub_groups[0]['compression_queue']) != compression_queue_size_selected:
                 continue
-            if int(sample[0]['compression_threads']) != compression_threads_selected:
+            if int(pub_groups[0]['compression_threads']) != compression_threads_selected:
                 continue
-            if int(sample[0]['max_bagfile_size']) != max_bagfile_size_selected:
+            if int(pub_groups[0]['max_bagfile_size']) != max_bagfile_size_selected:
                 continue

-            if sample[0]['cache_size'] not in cache_samples.keys():
-                cache_samples.update({sample[0]['cache_size']: []})
+            if pub_groups[0]['cache_size'] not in cache_size_pub_groups_percentage.keys():
+                cache_size_pub_groups_percentage.update({pub_groups[0]['cache_size']: []})

-            # TODO(piotr.jaroszek) WARNING, currently results in 'total_produced' column
-            # are correct (per publisher group), but 'total_recorded' is already summed
-            # for all the publisher groups!
-            sample_total_produced = 0
-            for row in sample:
-                sample_total_produced += int(row['total_produced'])
-            cache_samples[sample[0]['cache_size']].append(
-                int(sample[0]['total_recorded_count'])/sample_total_produced)
+            for pub_group in pub_groups:
+                if pub_group['cache_size'] not in cache_size_pub_groups_percentage.keys():
+                    cache_size_pub_groups_percentage.update({pub_group['cache_size']: []})
+                total_produced = int(pub_group['total_produced'])
+                total_recorded = int(pub_group['total_recorded_count'])
+                cache_size_pub_groups_percentage[pub_group['cache_size']].append(
+                    total_recorded / total_produced)

         cache_recorded_percentage_stats = {
             cache: {
-                'avg': statistics.mean(samples),
-                'min': min(samples),
-                'max': max(samples)
+                'avg': statistics.mean(pub_groups),
+                'min': min(pub_groups),
+                'max': max(pub_groups)
             }
-            for cache, samples in cache_samples.items()
+            for cache, pub_groups in cache_size_pub_groups_percentage.items()
         }
         cache_data_per_storage_conf.update(
             {storage_cfg_name: cache_recorded_percentage_stats}
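
Note on the reworked aggregation above: instead of dividing a bag-wide 'total_recorded_count' by the sum of every group's 'total_produced' (the problem the removed TODO warned about), the loop now appends one recorded/produced ratio per publisher group, keyed by cache size. A minimal self-contained sketch of that aggregation, assuming hypothetical input rows (the 'rows' list and its values are illustrative only):

import statistics

# Hypothetical per-publisher-group result rows, as parsed from a results file.
rows = [
    {'cache_size': '10000', 'total_produced': '1000', 'total_recorded_count': '950'},
    {'cache_size': '10000', 'total_produced': '2000', 'total_recorded_count': '1800'},
    {'cache_size': '100000', 'total_produced': '1000', 'total_recorded_count': '990'},
]

# One recorded/produced ratio per publisher group, grouped by cache size.
ratios_by_cache = {}
for row in rows:
    ratios_by_cache.setdefault(row['cache_size'], []).append(
        int(row['total_recorded_count']) / int(row['total_produced']))

# Reduce each cache-size bucket to avg/min/max, as the report code above does.
stats = {
    cache: {'avg': statistics.mean(r), 'min': min(r), 'max': max(r)}
    for cache, r in ratios_by_cache.items()
}
print(stats['10000'])  # {'avg': 0.925, 'min': 0.9, 'max': 0.95}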
@@ -144,6 +143,7 @@ def __process_test(compression_selected,
             percent_recorded['min'],
             percent_recorded['avg'],
             percent_recorded['max']))
+    print('======================== end of report ========================')

 [
     __process_test(
@@ -50,6 +50,31 @@ int get_message_count_from_metadata(const std::string & uri)
   return total_recorded_count;
 }

+size_t get_messages_recorded(
+  const rosbag2_storage::BagMetadata & metadata,
+  const PublisherGroupConfig & pub_config)
+{
+  size_t messages_recorded = 0;
+  for (unsigned int i = 0; i < pub_config.count; ++i) {
+    const std::string topic_without_node_name =
+      pub_config.topic_root + "_" + std::to_string(i + 1);
+
+    for (const auto & topic_info : metadata.topics_with_message_count) {
+      const auto & topic_name_with_node_name = topic_info.topic_metadata.name;
+      if (topic_name_with_node_name.size() < topic_without_node_name.size()) {
+        continue;
+      }
+      if (topic_name_with_node_name.compare(
+          topic_name_with_node_name.size() - topic_without_node_name.size(),
+          topic_without_node_name.size(), topic_without_node_name) == 0)
+      {
+        messages_recorded += topic_info.message_count;
+      }
+    }
+  }
+  return messages_recorded;
+}
+
 /// Based on configuration and metadata from completed benchmark, write results
 void write_benchmark_results(
   const std::vector<PublisherGroupConfig> & publisher_groups_config,
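
The new get_messages_recorded helper computes a per-group recorded count: each of the group's publishers writes to a topic ending in '<topic_root>_<i+1>', while the names stored in the bag metadata carry a node-name prefix, so the helper matches topics by suffix via std::string::compare and sums their message counts. A rough Python equivalent of that matching logic, with hypothetical topic names and counts:

def messages_recorded(topics_with_message_count, topic_root, count):
    """Sum message counts for topics whose names end with '<topic_root>_<i>'."""
    recorded = 0
    for i in range(1, count + 1):
        suffix = topic_root + '_' + str(i)
        recorded += sum(
            n for name, n in topics_with_message_count if name.endswith(suffix))
    return recorded

# Hypothetical metadata entries: node-name prefix plus per-publisher topic name.
topics = [('/benchmark_node/test_topic_1', 100), ('/benchmark_node/test_topic_2', 90)]
print(messages_recorded(topics, 'test_topic', 2))  # 190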
@@ -87,7 +112,8 @@ void write_benchmark_results(
     output_file << std::endl;
   }

-  int total_recorded_count = get_message_count_from_metadata(bag_config.storage_options.uri);
+  rosbag2_storage::MetadataIo metadata_io;
+  rosbag2_storage::BagMetadata metadata = metadata_io.read_metadata(bag_config.storage_options.uri);

   for (const auto & c : publisher_groups_config) {
     output_file << bag_config.storage_options.storage_id << " ";
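
Reading the bag metadata once with rosbag2_storage::MetadataIo before the per-group loop means the per-topic counts in metadata.topics_with_message_count (parsed from the bag's metadata.yaml) can be reused for every publisher group instead of re-reading the file each iteration.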
@@ -102,12 +128,10 @@ void write_benchmark_results(
     output_file << bag_config.compression_queue_size << " ";
     output_file << bag_config.compression_threads << " ";

-    // TODO(adamdbrw) - this is a result for the entire group,
-    // but we don't yet have per-group stats.
-    // For now, these need to be summed for each group
     auto total_messages_produced = c.producer_config.max_count * c.count;
+    size_t total_messages_recorded = get_messages_recorded(metadata, c);
     output_file << total_messages_produced << " ";
-    output_file << total_recorded_count << " ";
+    output_file << total_messages_recorded << " ";
     output_file << std::fixed; // Fix the number of decimal digits
     output_file << std::setprecision(2); // to 2
     output_file << std::setw(4) << producer_cpu_usage << " ";
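
With the per-group count in place, both sides of the recorded ratio now refer to the same publisher group: for example, a group with max_count = 1000 messages per publisher and count = 5 publishers reports total_messages_produced = 5000, and total_messages_recorded sums only that group's topics, which is what lets the Python report drop the old "total_recorded is summed over all groups" workaround.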
@@ -86,42 +86,41 @@ def __process_test(compression_selected,
                    compression_threads_selected,
                    max_bagfile_size_selected):
     for storage_cfg_name, data in splitted_data.items():
-        cache_samples = {}
-        for sample in data:
+        cache_size_pub_groups_percentage = {}
+        for pub_groups in data:
             # Single sample contains multiple rows
-            if len(sample) != len(producers_config_publishers['publisher_groups']):
+            if len(pub_groups) != len(producers_config_publishers['publisher_groups']):
                 raise RuntimeError('Invalid number of records in results detected.')

             # These parameters are same for all rows in sample
             # (multiple publishers in publisher group)
-            if sample[0]['compression'] != compression_selected:
+            if pub_groups[0]['compression'] != compression_selected:
                 continue
-            if int(sample[0]['compression_queue']) != compression_queue_size_selected:
+            if int(pub_groups[0]['compression_queue']) != compression_queue_size_selected:
                 continue
-            if int(sample[0]['compression_threads']) != compression_threads_selected:
+            if int(pub_groups[0]['compression_threads']) != compression_threads_selected:
                 continue
-            if int(sample[0]['max_bagfile_size']) != max_bagfile_size_selected:
+            if int(pub_groups[0]['max_bagfile_size']) != max_bagfile_size_selected:
                 continue

-            if sample[0]['cache_size'] not in cache_samples.keys():
-                cache_samples.update({sample[0]['cache_size']: []})
+            if pub_groups[0]['cache_size'] not in cache_size_pub_groups_percentage.keys():
+                cache_size_pub_groups_percentage.update({pub_groups[0]['cache_size']: []})

-            # TODO(piotr.jaroszek) WARNING, currently results in 'total_produced' column
-            # are correct (per publisher group), but 'total_recorded' is already summed
-            # for all the publisher groups!
-            sample_total_produced = 0
-            for row in sample:
-                sample_total_produced += int(row['total_produced'])
-            cache_samples[sample[0]['cache_size']].append(
-                int(sample[0]['total_recorded_count']) / sample_total_produced)
+            for pub_group in pub_groups:
+                if pub_group['cache_size'] not in cache_size_pub_groups_percentage.keys():
+                    cache_size_pub_groups_percentage.update({pub_group['cache_size']: []})
+                total_produced = int(pub_group['total_produced'])
+                total_recorded = int(pub_group['total_recorded_count'])
+                cache_size_pub_groups_percentage[pub_group['cache_size']].append(
+                    total_recorded / total_produced)

         cache_recorded_percentage_stats = {
             cache: {
-                'avg': statistics.mean(samples),
-                'min': min(samples),
-                'max': max(samples)
+                'avg': statistics.mean(pub_groups),
+                'min': min(pub_groups),
+                'max': max(pub_groups)
             }
-            for cache, samples in cache_samples.items()
+            for cache, pub_groups in cache_size_pub_groups_percentage.items()
         }
         cache_data_per_storage_conf.update(
             {storage_cfg_name: cache_recorded_percentage_stats}
@@ -153,6 +152,7 @@ def __process_test(compression_selected,
             percent_recorded['min'],
             percent_recorded['avg'],
             percent_recorded['max']))
+    print('======================== end of report ========================')
     return result

 results = [
@@ -302,19 +302,18 @@ def load_results(self):
         """Process the benchmark results and generates a report."""
         self.find_benchmark_directories()
         for (benchmark, producer), result in self.results.items():
-            print('read_results')
+            print('Reading results for ' + benchmark + ' ' + producer)
             if not result.directory:
                 print('No results found for ' + benchmark + ' ' + producer)
             else:
-                print('Generate report for: ' + result.directory)
+                print('Generating report for: ' + result.directory)
                 report = Report(result.directory)
                 result.reports = report.generate()

     def check_expectations(self):
         """Check the generated report against the expected results and stores the result."""
         self.load_results()

-        print(self.results.items())
         for (benchmark, producer), result in self.results.items():
             if not result.reports:
                 print('No results found for ' + benchmark + ' ' + producer)