Skip to content

Commit

Permalink
Make Telemetry.submit() non async (#979)
Browse files Browse the repository at this point in the history
We now defer all the flushing to the event loop itself, so that
the event loop just handles flushing.
  • Loading branch information
NullSoldier committed Feb 10, 2022
1 parent 2dfb7b5 commit 99ac487
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 40 deletions.
6 changes: 3 additions & 3 deletions ironfish/src/metrics/metricsMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class MetricsMonitor {
this.memoryInterval = setInterval(() => this.refreshMemory(), this.memoryRefreshPeriodMs)
if (this.telemetry) {
this.memoryTelemetryInterval = setInterval(
() => void this.submitMemoryTelemetry(),
() => this.submitMemoryTelemetry(),
this.memoryTelemetryPeriodMs,
)
}
Expand Down Expand Up @@ -95,9 +95,9 @@ export class MetricsMonitor {
this.rss.value = memoryUsage.rss
}

private async submitMemoryTelemetry(): Promise<void> {
private submitMemoryTelemetry(): void {
if (this.telemetry) {
await this.telemetry.submitMemoryUsage(this.heapUsed.value, this.heapTotal.value)
this.telemetry.submitMemoryUsage(this.heapUsed.value, this.heapTotal.value)
}
}
}
2 changes: 1 addition & 1 deletion ironfish/src/mining/director.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ export class MiningDirector {

this.onNewBlock.emit(block)

await this.telemetry.submitBlockMined(block)
this.telemetry.submitBlockMined(block)

return MINED_RESULT.SUCCESS
}
Expand Down
2 changes: 1 addition & 1 deletion ironfish/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ export class IronfishNode {
await this.rpc.start()
}

await this.telemetry.submitNodeStarted()
this.telemetry.submitNodeStarted()
}

async waitForShutdown(): Promise<void> {
Expand Down
27 changes: 7 additions & 20 deletions ironfish/src/telemetry/telemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,42 +54,29 @@ describe('Telemetry', () => {

describe('submit', () => {
describe('when disabled', () => {
it('does nothing', async () => {
it('does nothing', () => {
const disabledTelemetry = mockTelemetry(false)
const currentPoints = disabledTelemetry['points']
await disabledTelemetry.submit(mockMetric)
disabledTelemetry.submit(mockMetric)
expect(disabledTelemetry['points']).toEqual(currentPoints)
})
})

describe('when submitting a metric without fields', () => {
it('throws an error', async () => {
it('throws an error', () => {
const metric: Metric = {
measurement: 'node',
name: 'memory',
fields: [],
}
await expect(telemetry.submit(metric)).rejects.toThrowError()
})
})

describe('when the queue max size has been reached', () => {
it('flushes the queue', async () => {
const flush = jest.spyOn(telemetry, 'flush')
const points = []
for (let i = 0; i < telemetry['MAX_QUEUE_SIZE']; i++) {
points.push(mockMetric)
}
telemetry['points'] = points

await telemetry.submit(mockMetric)
expect(flush).toHaveBeenCalled()
expect(() => telemetry.submit(metric)).toThrowError()
})
})

it('stores the metric', async () => {
it('stores the metric', () => {
const currentPointsLength = telemetry['points'].length
await telemetry.submit(mockMetric)
telemetry.submit(mockMetric)

const points = telemetry['points']
expect(points).toHaveLength(currentPointsLength + 1)
Expand Down Expand Up @@ -119,7 +106,7 @@ describe('Telemetry', () => {

it('submits telemetry to the pool', async () => {
const submitTelemetry = jest.spyOn(telemetry['pool'], 'submitTelemetry')
await telemetry.submit(mockMetric)
telemetry.submit(mockMetric)
await telemetry.flush()

expect(submitTelemetry).toHaveBeenCalled()
Expand Down
38 changes: 23 additions & 15 deletions ironfish/src/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ export class Telemetry {

start(): void {
if (this.enabled) {
this.flushInterval = setInterval(() => void this.flush(), this.FLUSH_INTERVAL)
void this.flushLoop()
}
}

async stop(): Promise<void> {
if (this.enabled) {
await this.submitNodeStopped()
this.submitNodeStopped()
await this.flush()
}

Expand All @@ -48,7 +48,15 @@ export class Telemetry {
}
}

async submit(metric: Metric): Promise<void> {
async flushLoop(): Promise<void> {
await this.flush()

this.flushInterval = setTimeout(() => {
void this.flushLoop()
}, this.FLUSH_INTERVAL)
}

submit(metric: Metric): void {
if (!this.enabled) {
return
}
Expand All @@ -67,16 +75,16 @@ export class Telemetry {
timestamp: metric.timestamp || new Date(),
tags,
})

if (this.points.length >= this.MAX_QUEUE_SIZE) {
await this.flush()
}
}

async flush(): Promise<void> {
const points = this.points
this.points = []

if (points.length === 0) {
return
}

try {
await this.pool.submitTelemetry(points)
this.logger.debug(`Submitted ${points.length} telemetry points`)
Expand All @@ -90,24 +98,24 @@ export class Telemetry {
}
}

async submitNodeStarted(): Promise<void> {
await this.submit({
submitNodeStarted(): void {
this.submit({
measurement: 'node',
name: 'started',
fields: [{ name: 'online', type: 'boolean', value: true }],
})
}

async submitNodeStopped(): Promise<void> {
await this.submit({
submitNodeStopped(): void {
this.submit({
measurement: 'node',
name: 'started',
fields: [{ name: 'online', type: 'boolean', value: false }],
})
}

async submitBlockMined(block: Block): Promise<void> {
await this.submit({
submitBlockMined(block: Block): void {
this.submit({
measurement: 'node',
name: 'block_mined',
fields: [
Expand All @@ -125,8 +133,8 @@ export class Telemetry {
})
}

async submitMemoryUsage(heapUsed: number, heapTotal: number): Promise<void> {
await this.submit({
submitMemoryUsage(heapUsed: number, heapTotal: number): void {
this.submit({
measurement: 'node',
name: 'memory',
fields: [
Expand Down

0 comments on commit 99ac487

Please sign in to comment.