Skip to content

Commit

Permalink
reuse temporal metric storage for sync storage (#1369)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed May 9, 2022
1 parent 02630e0 commit 54abc27
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 113 deletions.
21 changes: 5 additions & 16 deletions sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/common/key_value_iterable_view.h"
# include "opentelemetry/sdk/common/attributemap_hash.h"
# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h"
# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
# include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/state/metric_collector.h"
# include "opentelemetry/sdk/metrics/state/metric_storage.h"

# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"
# include "opentelemetry/sdk/metrics/view/view.h"
# include "opentelemetry/sdk/resource/resource.h"

# include <list>
# include <memory>
Expand All @@ -24,13 +22,6 @@ namespace sdk
{
namespace metrics
{

struct LastReportedMetrics
{
std::unique_ptr<AttributesHashMap> attributes_map;
opentelemetry::common::SystemTimestamp collection_ts;
};

class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{

Expand All @@ -43,7 +34,9 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor},
exemplar_reservoir_(exemplar_reservoir)
exemplar_reservoir_(exemplar_reservoir),
temporal_metric_storage_(instrument_descriptor)

{
create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> {
return std::move(
Expand Down Expand Up @@ -114,14 +107,10 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage

// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
// unreported metrics stash for all the collectors
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
unreported_metrics_;
// last reported metrics stash for all the collectors.
std::unordered_map<CollectorHandle *, LastReportedMetrics> last_reported_metrics_;
const AttributesProcessor *attributes_processor_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
TemporalMetricStorage temporal_metric_storage_;
};

} // namespace metrics
Expand Down
99 changes: 2 additions & 97 deletions sdk/src/metrics/state/sync_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,104 +25,9 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector,
// recordings
std::shared_ptr<AttributesHashMap> delta_metrics = std::move(attributes_hashmap_);
attributes_hashmap_.reset(new AttributesHashMap);
for (auto &col : collectors)
{
unreported_metrics_[col.get()].push_back(delta_metrics);
}

// Get the unreported metrics for the `collector` from `unreported metrics stash`
// since last collection, this will also cleanup the unreported metrics for `collector`
// from the stash.
auto present = unreported_metrics_.find(collector);
if (present == unreported_metrics_.end())
{
// no unreported metrics for the collector, return.
return true;
}
auto unreported_list = std::move(present->second);

// Iterate over the unreporter metrics for `collector` and store result in `merged_metrics`
std::unique_ptr<AttributesHashMap> merged_metrics(new AttributesHashMap);
for (auto &agg_hashmap : unreported_list)
{
agg_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, std::move(agg->Merge(aggregation)));
}
else
{
merged_metrics->Set(
attributes,
std::move(
DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation)));
merged_metrics->GetAllEnteries(
[](const MetricAttributes &attr, Aggregation &aggr) { return true; });
}
return true;
});
}
// Get the last reported metrics for the `collector` from `last reported metrics` stash
// - If the aggregation_temporarily for the collector is cumulative
// - Merge the last reported metrics with unreported metrics (which is in merged_metrics),
// Final result of merge would be in merged_metrics.
// - Move the final merge to the `last reported metrics` stash.
// - If the aggregation_temporarily is delta
// - Store the unreported metrics for `collector` (which is in merged_mtrics) to
// `last reported metrics` stash.

auto reported = last_reported_metrics_.find(collector);
if (reported != last_reported_metrics_.end())
{
last_collection_ts = last_reported_metrics_[collector].collection_ts;
auto last_aggr_hashmap = std::move(last_reported_metrics_[collector].attributes_map);
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
merged_metrics->Set(attributes,
DefaultAggregation::CreateAggregation(instrument_descriptor_));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
}
else
{
merged_metrics->GetAllEnteries(
[](const MetricAttributes &attr, Aggregation &aggr) { return true; });
last_reported_metrics_.insert(
std::make_pair(collector, LastReportedMetrics{std::move(merged_metrics), collection_ts}));
}

// Generate the MetricData from the final merged_metrics, and invoke callback over it.

AttributesHashMap *result_to_export = (last_reported_metrics_[collector]).attributes_map.get();
MetricData metric_data;
metric_data.instrument_descriptor = instrument_descriptor_;
metric_data.start_ts = last_collection_ts;
metric_data.end_ts = collection_ts;
result_to_export->GetAllEnteries(
[&metric_data](const MetricAttributes &attributes, Aggregation &aggregation) {
PointDataAttributes point_data_attr;
point_data_attr.point_data = aggregation.ToPoint();
point_data_attr.attributes = attributes;
metric_data.point_data_attr_.push_back(point_data_attr);
return true;
});
return callback(metric_data);
return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,
std::move(delta_metrics), callback);
}

} // namespace metrics
Expand Down

0 comments on commit 54abc27

Please sign in to comment.