Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions api/metrics/+opentelemetry/+metrics/AsynchronousInstrument.m
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
Callbacks % Callback function, called at each data export
end

properties (Constant, Hidden)
DefaultTimeout = seconds(30)
end

properties (Access=private)
Proxy % Proxy object to interface C++ code
end
Expand All @@ -29,16 +33,42 @@
end

methods
function addCallback(obj, callback)
function addCallback(obj, callback, optionnames, optionvalues)
% ADDCALLBACK Add a callback function
% ADDCALLBACK(INST, CALLBACK) adds a callback function to
% collect metrics at every export. CALLBACK is specified as a
% collect metrics at every export. CALLBACK is specified as a
% function handle, and must accept no input and return one
% output of type opentelemetry.metrics.ObservableResult.
%
% ADDCALLBACK(INST, CALLBACK, "Timeout", TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% positive duration scalar.
%
% See also REMOVECALLBACK, OPENTELEMETRY.METRICS.OBSERVABLERESULT
arguments
obj
callback
end
arguments (Repeating)
optionnames
optionvalues
end

if isa(callback, "function_handle")
obj.Proxy.addCallback(callback);
% parse name-value pairs
validnames = "Timeout";
timeout = obj.DefaultTimeout;
for i = 1:length(optionnames)
try
validatestring(optionnames{i}, validnames);
catch
continue
end
timeout = optionvalues{i};
end
timeout = obj.mustBeScalarPositiveDurationTimeout(timeout);
obj.Proxy.addCallback(callback, milliseconds(timeout));
% append to Callbacks property
if isempty(obj.Callbacks)
obj.Callbacks = callback;
Expand All @@ -47,7 +77,7 @@ function addCallback(obj, callback)
else
obj.Callbacks = [obj.Callbacks, {callback}];
end
end
end
end

function removeCallback(obj, callback)
Expand Down Expand Up @@ -78,4 +108,12 @@ function removeCallback(obj, callback)
end
end
end

methods (Static)
function timeout = mustBeScalarPositiveDurationTimeout(timeout)
if ~(isscalar(timeout) && isa(timeout, "duration") && timeout > 0)
timeout = opentelemetry.metrics.AsynchronousInstrument.DefaultTimeout;
end
end
end
end
61 changes: 43 additions & 18 deletions api/metrics/+opentelemetry/+metrics/Meter.m
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@
histogram = opentelemetry.metrics.Histogram(HistogramProxy, name, description, unit);
end

function obscounter = createObservableCounter(obj, callback, name, description, unit)
function obscounter = createObservableCounter(obj, callback, name, ...
description, unit, timeout)
% CREATEOBSERVABLECOUNTER Create an observable counter
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK, NAME) creates an
% observable counter with the specified callback function
Expand All @@ -117,9 +118,14 @@
% output of type opentelemetry.metrics.ObservableResult.
% The counter's value can only increase but not decrease.
%
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK NAME, DESCRIPTION, UNIT)
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT)
% also specifies a description and a unit.
%
% C = CREATEOBSERVABLECOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT, TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% duration.
%
% See also OPENTELEMETRY.METRICS.OBSERVABLERESULT,
% CREATEOBSERVABLEUPDOWNCOUNTER, CREATEOBSERVABLEGAUGE, CREATECOUNTER
arguments
Expand All @@ -128,17 +134,20 @@
name
description = ""
unit = ""
timeout = opentelemetry.metrics.ObservableCounter.DefaultTimeout
end

[callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit);
id = obj.Proxy.createObservableCounter(name, description, unit, callback);
[callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout);
id = obj.Proxy.createObservableCounter(name, description, unit, ...
callback, milliseconds(timeout));
ObservableCounterproxy = libmexclass.proxy.Proxy("Name", ...
"libmexclass.opentelemetry.ObservableCounterProxy", "ID", id);
obscounter = opentelemetry.metrics.ObservableCounter(ObservableCounterproxy, name, description, unit, callback);
end

function obsudcounter = createObservableUpDownCounter(obj, callback, name, description, unit)
function obsudcounter = createObservableUpDownCounter(obj, callback, ...
name, description, unit, timeout)
% CREATEOBSERVABLEUPDOWNCOUNTER Create an observable UpDownCounter
% C = CREATEOBSERVABLEUPDOWNCOUNTER(M, CALLBACK, NAME)
% creates an observable UpDownCounter with the specified
Expand All @@ -149,7 +158,12 @@
%
% C = CREATEOBSERVABLEUPDOWNCOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT)
% also specifies a description and a unit.
%
%
% C = CREATEOBSERVABLEUPDOWNCOUNTER(M, CALLBACK, NAME, DESCRIPTION, UNIT, TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% duration.
%
% See also OPENTELEMETRY.METRICS.OBSERVABLERESULT,
% CREATEOBSERVABLECOUNTER, CREATEOBSERVABLEGAUGE, CREATEUPDOWNCOUNTER
arguments
Expand All @@ -158,18 +172,21 @@
name
description = ""
unit = ""
timeout = opentelemetry.metrics.ObservableUpDownCounter.DefaultTimeout
end

[callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit);
id = obj.Proxy.createObservableUpDownCounter(name, description, unit, callback);
[callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout);
id = obj.Proxy.createObservableUpDownCounter(name, description, ...
unit, callback, milliseconds(timeout));
ObservableUpDownCounterproxy = libmexclass.proxy.Proxy("Name", ...
"libmexclass.opentelemetry.ObservableUpDownCounterProxy", "ID", id);
obsudcounter = opentelemetry.metrics.ObservableUpDownCounter(...
ObservableUpDownCounterproxy, name, description, unit, callback);
end

function obsgauge = createObservableGauge(obj, callback, name, description, unit)
function obsgauge = createObservableGauge(obj, callback, name, ...
description, unit, timeout)
% CREATEOBSERVABLEGAUGE Create an observable gauge
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK, NAME) creates an
% observable gauge with the specified callback function
Expand All @@ -179,9 +196,14 @@
% A gauge's value can increase or decrease but it should
% never be summed in aggregation.
%
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK NAME, DESCRIPTION, UNIT)
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK, NAME, DESCRIPTION, UNIT)
% also specifies a description and a unit.
%
%
% C = CREATEOBSERVABLEGAUGE(M, CALLBACK, NAME, DESCRIPTION, UNIT, TIMEOUT)
% also specifies the maximum time before callback is timed
% out and its results not get recorded. TIMEOUT must be a
% positive duration scalar.
%
% See also OPENTELEMETRY.METRICS.OBSERVABLERESULT,
% CREATEOBSERVABLECOUNTER, CREATEOBSERVABLEUPDOWNCOUNTER
arguments
Expand All @@ -190,11 +212,13 @@
name
description = ""
unit = ""
timeout = opentelemetry.metrics.ObservableGauge.DefaultTimeout
end

[callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit);
id = obj.Proxy.createObservableGauge(name, description, unit, callback);
[callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout);
id = obj.Proxy.createObservableGauge(name, description, unit, ...
callback, milliseconds(timeout));
ObservableGaugeproxy = libmexclass.proxy.Proxy("Name", ...
"libmexclass.opentelemetry.ObservableGaugeProxy", "ID", id);
obsgauge = opentelemetry.metrics.ObservableGauge(...
Expand All @@ -211,10 +235,11 @@
unit = mustBeScalarString(unit);
end

function [callback, name, description, unit] = processAsynchronousInputs(...
callback, name, description, unit)
function [callback, name, description, unit, timeout] = processAsynchronousInputs(...
callback, name, description, unit, timeout)
[name, description, unit] = processSynchronousInputs(name, description, unit);
if ~isa(callback, "function_handle")
callback = []; % callback is invalid, set to empty double
end
timeout = opentelemetry.metrics.AsynchronousInstrument.mustBeScalarPositiveDurationTimeout(timeout);
end
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@

#pragma once

#include <chrono>

#include "MatlabDataArray.hpp"
#include "mex.hpp"

namespace libmexclass::opentelemetry {
struct AsynchronousCallbackInput
{
AsynchronousCallbackInput(const matlab::data::Array& fh,
const std::chrono::milliseconds& timeout,
const std::shared_ptr<matlab::engine::MATLABEngine> eng)
: FunctionHandle(fh), MexEngine(eng) {}
: FunctionHandle(fh), Timeout(timeout), MexEngine(eng) {}

matlab::data::Array FunctionHandle;
std::chrono::milliseconds Timeout;
const std::shared_ptr<matlab::engine::MATLABEngine> MexEngine;
};
} // namespace libmexclass::opentelemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#pragma once

#include <list>
#include <chrono>

#include "opentelemetry-matlab/metrics/AsynchronousCallbackInput.h"

Expand All @@ -25,7 +26,7 @@ class AsynchronousInstrumentProxy : public libmexclass::proxy::Proxy {

// This method should ideally be an overloaded version of addCallback. However, addCallback is a registered
// method and REGISTER_METHOD macro doesn't like overloaded methods. Rename to avoid overloading.
void addCallback_helper(const matlab::data::Array& callback);
void addCallback_helper(const matlab::data::Array& callback, const std::chrono::milliseconds& timeout);

void removeCallback(libmexclass::proxy::method::Context& context);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2023-2024 The MathWorks, Inc.

#pragma once
#include <chrono>

#include "libmexclass/proxy/Proxy.h"

Expand All @@ -21,7 +22,7 @@ class AsynchronousInstrumentProxyFactory {

std::shared_ptr<libmexclass::proxy::Proxy> create(AsynchronousInstrumentType type,
const matlab::data::Array& callback, const std::string& name, const std::string& description,
const std::string& unit);
const std::string& unit, const std::chrono::milliseconds& timeout);

private:

Expand Down
8 changes: 5 additions & 3 deletions api/metrics/src/AsynchronousInstrumentProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ namespace libmexclass::opentelemetry {


void AsynchronousInstrumentProxy::addCallback(libmexclass::proxy::method::Context& context){
addCallback_helper(context.inputs[0]);
matlab::data::TypedArray<double> timeout_mda = context.inputs[1];
addCallback_helper(context.inputs[0], std::chrono::milliseconds(static_cast<int64_t>(timeout_mda[0])));
}

void AsynchronousInstrumentProxy::addCallback_helper(const matlab::data::Array& callback){
AsynchronousCallbackInput arg(callback, MexEngine);
void AsynchronousInstrumentProxy::addCallback_helper(const matlab::data::Array& callback,
const std::chrono::milliseconds& timeout){
AsynchronousCallbackInput arg(callback, timeout, MexEngine);
CallbackInputs.push_back(arg);
CppInstrument->AddCallback(MeasurementFetcher::Fetcher, static_cast<void*>(&CallbackInputs.back()));
}
Expand Down
5 changes: 3 additions & 2 deletions api/metrics/src/AsynchronousInstrumentProxyFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

namespace libmexclass::opentelemetry {
std::shared_ptr<libmexclass::proxy::Proxy> AsynchronousInstrumentProxyFactory::create(AsynchronousInstrumentType type,
const matlab::data::Array& callback, const std::string& name, const std::string& description, const std::string& unit) {
const matlab::data::Array& callback, const std::string& name, const std::string& description, const std::string& unit,
const std::chrono::milliseconds& timeout) {
std::shared_ptr<libmexclass::proxy::Proxy> proxy;
switch(type) {
case AsynchronousInstrumentType::ObservableCounter:
Expand All @@ -31,7 +32,7 @@ std::shared_ptr<libmexclass::proxy::Proxy> AsynchronousInstrumentProxyFactory::c
}
// add callback
if (!callback.isEmpty()) {
std::static_pointer_cast<AsynchronousInstrumentProxy>(proxy)->addCallback_helper(callback);
std::static_pointer_cast<AsynchronousInstrumentProxy>(proxy)->addCallback_helper(callback, timeout);
}
return proxy;
}
Expand Down
12 changes: 12 additions & 0 deletions api/metrics/src/MeasurementFetcher.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2023-2024 The MathWorks, Inc.

#include <chrono>

#include "MatlabDataArray.hpp"
#include "mex.hpp"
#include "cppmex/detail/mexErrorDispatch.hpp"
Expand Down Expand Up @@ -30,11 +32,21 @@ void MeasurementFetcher::Fetcher(metrics_api::ObserverResult observer_result, vo
nostd::shared_ptr<metrics_api::ObserverResultT<double>>>(observer_result))
{
auto arg = static_cast<AsynchronousCallbackInput*>(in);
auto callback_timeout = arg->Timeout;
const std::chrono::seconds property_timeout(1); // for getProperty, use a fixed timeout of 1 second, should be sufficient
auto future = arg->MexEngine->fevalAsync(u"opentelemetry.metrics.collectObservableMetrics",
arg->FunctionHandle);
try {
auto status = future.wait_for(callback_timeout);
if (status != std::future_status::ready) {
return;
}
matlab::data::ObjectArray resultobj = future.get();
auto futureresult = arg->MexEngine->getPropertyAsync(resultobj, 0, u"Results");
status = futureresult.wait_for(property_timeout);
if (status != std::future_status::ready) {
return;
}
matlab::data::CellArray resultdata = futureresult.get();
size_t n = resultdata.getNumberOfElements();
size_t i = 0;
Expand Down
4 changes: 3 additions & 1 deletion api/metrics/src/MeterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ void MeterProxy::createAsynchronous(libmexclass::proxy::method::Context& context
matlab::data::StringArray unit_mda = context.inputs[2];
std::string unit = static_cast<std::string>(unit_mda[0]);
matlab::data::Array callback_mda = context.inputs[3];
matlab::data::TypedArray<double> timeout_mda = context.inputs[4];
auto timeout = std::chrono::milliseconds(static_cast<int64_t>(timeout_mda[0])); // milliseconds

AsynchronousInstrumentProxyFactory proxyfactory(CppMeter, MexEngine);
auto proxy = proxyfactory.create(type, callback_mda, name, description, unit);
auto proxy = proxyfactory.create(type, callback_mda, name, description, unit, timeout);

// obtain a proxy ID
libmexclass::proxy::ID proxyid = libmexclass::proxy::ProxyManager::manageProxy(proxy);
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 3 additions & 1 deletion test/performance/traceTest.m
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
classdef traceTest < matlab.perftest.TestCase
% performance tests for tracing

% Copyright 2023-2024 The MathWorks, Inc.

properties
OtelConfigFile
JsonFile
Expand All @@ -17,7 +19,7 @@
methods (TestClassSetup)
function setupOnce(testCase)
testdir = fileparts(mfilename("fullpath"));
addpath(fullfile(testdir, "..")); % add directory where common setup and teardown code lives
addpath(fullfile(testdir, "..", "utils")); % add directory where common setup and teardown code lives
commonSetupOnce(testCase);

% create a global tracer provider
Expand Down
4 changes: 2 additions & 2 deletions test/tbaggage.m
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
classdef tbaggage < matlab.unittest.TestCase
% tests for creating and manipulating baggage object

% Copyright 2023 The MathWorks, Inc.
% Copyright 2023-2024 The MathWorks, Inc.

properties
BaggageKeys
Expand All @@ -15,7 +15,7 @@ function setupOnce(testCase)

% set up path
if ~isempty(otelroot)
addpath(otelroot);
testCase.applyFixture(matlab.unittest.fixtures.PathFixture(otelroot));
end

testCase.BaggageKeys = ["userId", "serverNode", "isProduction"];
Expand Down
Loading