Skip to content

Commit

Permalink
refactor: Abstracted registering of new aggregators into a Harvester …
Browse files Browse the repository at this point in the history
…class that is responsible for starting, stopping, updating all registered aggregators. (#1994)
  • Loading branch information
bizob2828 committed Feb 8, 2024
1 parent d1a3e13 commit 1fb85a6
Show file tree
Hide file tree
Showing 43 changed files with 751 additions and 812 deletions.
263 changes: 46 additions & 217 deletions lib/agent.js
Expand Up @@ -39,6 +39,7 @@ const {
maybeAddQueueAttributes
} = require('./util/attributes')
const synthetics = require('./synthetics')
const Harvester = require('./harvester')

// Map of valid states to whether or not data collection is valid
const STATES = {
Expand Down Expand Up @@ -77,7 +78,7 @@ function Agent(config) {
this._state = 'stopped'
this.config = config
this.environment = require('./environment')
this.version = this.config.version
this.version = config.version

if (config.serverless_mode.enabled) {
this.collector = new ServerlessCollector(this)
Expand All @@ -87,6 +88,7 @@ function Agent(config) {

this.mapper = new MetricMapper()
this.metricNameNormalizer = new MetricNormalizer(this.config, 'metric name')
this.harvester = new Harvester()

this.metrics = new MetricAggregator(
{
Expand All @@ -95,12 +97,13 @@ function Agent(config) {
mapper: this.mapper,
normalizer: this.metricNameNormalizer
},
this.collector
this.collector,
this.harvester
)

this.metrics.on('starting metric_data data send.', this._beforeMetricDataSend.bind(this))

this.spanEventAggregator = createSpanEventAggregator(config, this.collector, this.metrics)
this.spanEventAggregator = createSpanEventAggregator(config, this)

this.transactionNameNormalizer = new MetricNormalizer(this.config, 'transaction name')
// Segment term based tx renaming for MGI mitigation.
Expand All @@ -114,45 +117,53 @@ function Agent(config) {
this.transactionEventAggregator = new TransactionEventAggregator(
{
periodMs: config.event_harvest_config.report_period_ms,
limit: config.event_harvest_config.harvest_limits.analytic_event_data
limit: config.event_harvest_config.harvest_limits.analytic_event_data,
config,
enabled: (config) => config.transaction_events.enabled

Check warning on line 122 in lib/agent.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

'config' is already declared in the upper scope on line 70 column 16
},
this.collector,
this.metrics
this
)

this.customEventAggregator = new CustomEventAggregator(
{
periodMs: config.event_harvest_config.report_period_ms,
limit: config.event_harvest_config.harvest_limits.custom_event_data
limit: config.event_harvest_config.harvest_limits.custom_event_data,
metricNames: NAMES.CUSTOM_EVENTS,
config,
enabled: (config) => config.custom_insights_events.enabled

Check warning on line 133 in lib/agent.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

'config' is already declared in the upper scope on line 70 column 16
},
this.collector,
this.metrics
this
)

const errorTraceAggregator = new ErrorTraceAggregator(
{
periodMs: DEFAULT_HARVEST_INTERVAL_MS,
limit: MAX_ERROR_TRACES_DEFAULT
limit: MAX_ERROR_TRACES_DEFAULT,
config,
enabled: (config) => config.error_collector.enabled && config.collect_errors

Check warning on line 143 in lib/agent.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

'config' is already declared in the upper scope on line 70 column 16
},
this.collector
this.collector,
this.harvester
)

const errorEventAggregator = new ErrorEventAggregator(
{
periodMs: config.event_harvest_config.report_period_ms,
limit: config.event_harvest_config.harvest_limits.error_event_data
limit: config.event_harvest_config.harvest_limits.error_event_data,
config,
enabled: (config) => config.error_collector.enabled && config.error_collector.capture_events

Check warning on line 154 in lib/agent.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

'config' is already declared in the upper scope on line 70 column 16
},
this.collector,
this.metrics
this
)

this.logs = new LogAggregator(
{
periodMs: config.event_harvest_config.report_period_ms,
limit: config.event_harvest_config.harvest_limits.log_event_data
limit: config.event_harvest_config.harvest_limits.log_event_data,
config,
enabled: (config) =>

Check warning on line 164 in lib/agent.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

'config' is already declared in the upper scope on line 70 column 16
config.application_logging.enabled && config.application_logging.forwarding.enabled
},
this.collector,
this.metrics,
this
)

Expand All @@ -164,11 +175,13 @@ function Agent(config) {
this.traces = new TransactionTraceAggregator(
{
periodMs: DEFAULT_HARVEST_INTERVAL_MS,
config: this.config,
config,
isAsync: !config.serverless_mode.enabled,
method: 'transaction_sample_data'
method: 'transaction_sample_data',
enabled: (config) => config.transaction_tracer.enabled && config.collect_traces

Check warning on line 181 in lib/agent.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

'config' is already declared in the upper scope on line 70 column 16
},
this.collector
this.collector,
this.harvester
)
this.transactionSampler = new AdaptiveSampler({
agent: this,
Expand All @@ -179,12 +192,14 @@ function Agent(config) {

this.queries = new QueryTraceAggregator(
{
config: this.config,
config,
periodMs: DEFAULT_HARVEST_INTERVAL_MS,
method: 'sql_trace_data',
isAsync: !config.serverless_mode.enabled
isAsync: !config.serverless_mode.enabled,
enabled: (config) => config.slow_sql.enabled

Check warning on line 199 in lib/agent.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

'config' is already declared in the upper scope on line 70 column 16
},
this.collector
this.collector,
this.harvester
)

// Set up all the configuration events the agent needs to listen for.
Expand Down Expand Up @@ -276,190 +291,15 @@ Agent.prototype.start = function start(callback) {
* @param {Function} callback The callback to invoke when all data types have been sent.
*/
Agent.prototype.forceHarvestAll = function forceHarvestAll(callback) {
const agent = this
const promises = []

const metricPromise = new Promise((resolve) => {
agent.metrics.once('finished metric_data data send.', function onMetricsFinished() {
resolve()
})
agent.metrics.send()
})

promises.push(metricPromise)

// TODO: plumb config through to aggregators so they can do their own checking.
if (
agent.config.distributed_tracing.enabled &&
agent.config.span_events.enabled &&
!agent.spanEventAggregator.isStream // Not valid to send on streaming aggregator
) {
const spanPromise = new Promise((resolve) => {
agent.spanEventAggregator.once(
'finished span_event_data data send.',
function onSpansFinished() {
resolve()
}
)
agent.spanEventAggregator.send()
})

promises.push(spanPromise)
}

if (agent.config.custom_insights_events.enabled) {
const customEventPromise = new Promise((resolve) => {
agent.customEventAggregator.once(
'finished custom_event_data data send.',
function onCustomEventsFinished() {
resolve()
}
)
agent.customEventAggregator.send()
})

promises.push(customEventPromise)
}

if (agent.config.transaction_events.enabled) {
const transactionEventPromise = new Promise((resolve) => {
agent.transactionEventAggregator.once(
'finished analytic_event_data data send.',
function onTransactionEventsFinished() {
resolve()
}
)
agent.transactionEventAggregator.send()
})

promises.push(transactionEventPromise)
}

if (agent.config.transaction_tracer.enabled && agent.config.collect_traces) {
const transactionTracePromise = new Promise((resolve) => {
agent.traces.once('finished transaction_sample_data data send.', function onTracesFinished() {
resolve()
})
agent.traces.send()
})

promises.push(transactionTracePromise)
}

if (agent.config.slow_sql.enabled) {
const sqlTracePromise = new Promise((resolve) => {
agent.queries.once('finished sql_trace_data data send.', function onSqlTracesFinished() {
resolve()
})
agent.queries.send()
})

promises.push(sqlTracePromise)
}

const errorCollectorEnabled = agent.config.error_collector && agent.config.error_collector.enabled

if (errorCollectorEnabled && agent.config.collect_errors) {
const errorTracePromise = new Promise((resolve) => {
agent.errors.traceAggregator.once(
'finished error_data data send.',
function onErrorTracesFinished() {
resolve()
}
)
agent.errors.traceAggregator.send()
})

promises.push(errorTracePromise)
}

if (errorCollectorEnabled && agent.config.error_collector.capture_events) {
const errorEventPromise = new Promise((resolve) => {
agent.errors.eventAggregator.once(
'finished error_event_data data send.',
function onErrorEventsFinished() {
resolve()
}
)
agent.errors.eventAggregator.send()
})

promises.push(errorEventPromise)
}

if (
agent.config.application_logging.enabled &&
agent.config.application_logging.forwarding.enabled
) {
const logEventPromise = new Promise((resolve) => {
agent.logs.once('finished log_event_data data send.', function onLogEventsFinished() {
resolve()
})
agent.logs.send()
})

promises.push(logEventPromise)
}

Promise.all(promises).then(() => {
// Get out of the promise so callback errors aren't treated as
// promise rejections.
setImmediate(callback)
})
}

Agent.prototype.stopAggregators = function stopAggregators() {
this.metrics.stop()
this.errors.stop()
this.traces.stop()
this.queries.stop()
this.spanEventAggregator.stop()
this.transactionEventAggregator.stop()
this.customEventAggregator.stop()
this.logs.stop()
this.harvester.clear(callback)
}

Agent.prototype.startStreaming = function startStreaming() {
if (
this.spanEventAggregator.isStream &&
this.config.distributed_tracing.enabled &&
this.config.span_events.enabled
) {
if (this.spanEventAggregator.isStream && this.spanEventAggregator.enabled) {
this.spanEventAggregator.start()
}
}

Agent.prototype.startAggregators = function startAggregators() {
this.metrics.start()
this.errors.start()
if (this.config.transaction_tracer.enabled && this.config.collect_traces) {
this.traces.start()
}

if (this.config.slow_sql.enabled) {
this.queries.start()
}

if (this.config.distributed_tracing.enabled && this.config.span_events.enabled) {
this.spanEventAggregator.start()
}

if (this.config.transaction_events.enabled) {
this.transactionEventAggregator.start()
}

if (this.config.custom_insights_events.enabled) {
this.customEventAggregator.start()
}

if (
this.config.application_logging.enabled &&
this.config.application_logging.forwarding.enabled
) {
this.logs.start()
}
}

/**
* Completes any final setup upon full connection to New Relic
* servers and sets the agent state to 'started'.
Expand All @@ -468,8 +308,7 @@ Agent.prototype.startAggregators = function startAggregators() {
* @param {Function} callback callback function that executes after harvest completes (now if immediate, otherwise later)
*/
Agent.prototype.onConnect = function onConnect(shouldImmediatelyHarvest, callback) {
this._reconfigureAggregators(this.config)

this.harvester.update(this.config)
generateLoggingSupportMetrics(this)

if (this.config.certificates && this.config.certificates.length > 0) {
Expand All @@ -481,20 +320,9 @@ Agent.prototype.onConnect = function onConnect(shouldImmediatelyHarvest, callbac
this.setState('started')
}

Agent.prototype._reconfigureAggregators = function _reconfigureAggregators(config) {
this.metrics.reconfigure(config)
this.errors.reconfigure(config)
this.traces.reconfigure(config)
this.queries.reconfigure(config)
this.spanEventAggregator.reconfigure(config)
this.transactionEventAggregator.reconfigure(config)
this.customEventAggregator.reconfigure(config)
this.logs.reconfigure(config)
}

Agent.prototype._scheduleHarvests = function _scheduleHarvests(shouldImmediatelyHarvest, callback) {
if (!shouldImmediatelyHarvest) {
this.startAggregators()
this.harvester.start()
setImmediate(callback)
return
}
Expand All @@ -514,7 +342,7 @@ Agent.prototype._scheduleHarvests = function _scheduleHarvests(shouldImmediately
logger.info(`Starting initial ${INITIAL_HARVEST_DELAY_MS}ms harvest.`)

agent.forceHarvestAll(function afterAllAggregatorsSend() {
agent.startAggregators()
agent.harvester.start()
callback()
})
}, INITIAL_HARVEST_DELAY_MS)
Expand Down Expand Up @@ -550,7 +378,7 @@ Agent.prototype.stop = function stop(callback) {

this.setState('stopping')

this.stopAggregators()
this.harvester.stop()

sampler.stop()

Expand Down Expand Up @@ -585,7 +413,8 @@ Agent.prototype._resetErrors = function resetErrors() {

// TODO: is this still necessary?
// Likely do more direct with new config
this.errors.reconfigure(this.config)
this.errors.traceAggregator.reconfigure(this.config)
this.errors.eventAggregator.reconfigure(this.config)
}

/**
Expand Down

0 comments on commit 1fb85a6

Please sign in to comment.