Skip to content

Commit

Permalink
Make Telemetry.submit() non async
Browse files Browse the repository at this point in the history
We now defer all the flushing to the event loop itself, so that
the event loop just handles flushing.
  • Loading branch information
NullSoldier committed Feb 10, 2022
1 parent 2dfb7b5 commit bac0fe3
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 40 deletions.
6 changes: 3 additions & 3 deletions ironfish/src/metrics/metricsMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class MetricsMonitor {
this.memoryInterval = setInterval(() => this.refreshMemory(), this.memoryRefreshPeriodMs)
if (this.telemetry) {
this.memoryTelemetryInterval = setInterval(
() => void this.submitMemoryTelemetry(),
() => this.submitMemoryTelemetry(),
this.memoryTelemetryPeriodMs,
)
}
Expand Down Expand Up @@ -95,9 +95,9 @@ export class MetricsMonitor {
this.rss.value = memoryUsage.rss
}

private async submitMemoryTelemetry(): Promise<void> {
private submitMemoryTelemetry(): void {
if (this.telemetry) {
await this.telemetry.submitMemoryUsage(this.heapUsed.value, this.heapTotal.value)
this.telemetry.submitMemoryUsage(this.heapUsed.value, this.heapTotal.value)
}
}
}
2 changes: 1 addition & 1 deletion ironfish/src/mining/director.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ export class MiningDirector {

this.onNewBlock.emit(block)

await this.telemetry.submitBlockMined(block)
this.telemetry.submitBlockMined(block)

return MINED_RESULT.SUCCESS
}
Expand Down
2 changes: 1 addition & 1 deletion ironfish/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ export class IronfishNode {
await this.rpc.start()
}

await this.telemetry.submitNodeStarted()
this.telemetry.submitNodeStarted()
}

async waitForShutdown(): Promise<void> {
Expand Down
27 changes: 7 additions & 20 deletions ironfish/src/telemetry/telemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,42 +54,29 @@ describe('Telemetry', () => {

describe('submit', () => {
describe('when disabled', () => {
it('does nothing', async () => {
it('does nothing', () => {
const disabledTelemetry = mockTelemetry(false)
const currentPoints = disabledTelemetry['points']
await disabledTelemetry.submit(mockMetric)
disabledTelemetry.submit(mockMetric)
expect(disabledTelemetry['points']).toEqual(currentPoints)
})
})

describe('when submitting a metric without fields', () => {
it('throws an error', async () => {
it('throws an error', () => {
const metric: Metric = {
measurement: 'node',
name: 'memory',
fields: [],
}
await expect(telemetry.submit(metric)).rejects.toThrowError()
})
})

describe('when the queue max size has been reached', () => {
it('flushes the queue', async () => {
const flush = jest.spyOn(telemetry, 'flush')
const points = []
for (let i = 0; i < telemetry['MAX_QUEUE_SIZE']; i++) {
points.push(mockMetric)
}
telemetry['points'] = points

await telemetry.submit(mockMetric)
expect(flush).toHaveBeenCalled()
expect(() => telemetry.submit(metric)).toThrowError()
})
})

it('stores the metric', async () => {
it('stores the metric', () => {
const currentPointsLength = telemetry['points'].length
await telemetry.submit(mockMetric)
telemetry.submit(mockMetric)

const points = telemetry['points']
expect(points).toHaveLength(currentPointsLength + 1)
Expand Down Expand Up @@ -119,7 +106,7 @@ describe('Telemetry', () => {

it('submits telemetry to the pool', async () => {
const submitTelemetry = jest.spyOn(telemetry['pool'], 'submitTelemetry')
await telemetry.submit(mockMetric)
telemetry.submit(mockMetric)
await telemetry.flush()

expect(submitTelemetry).toHaveBeenCalled()
Expand Down
38 changes: 23 additions & 15 deletions ironfish/src/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ export class Telemetry {

start(): void {
if (this.enabled) {
this.flushInterval = setInterval(() => void this.flush(), this.FLUSH_INTERVAL)
void this.flushLoop()
}
}

async stop(): Promise<void> {
if (this.enabled) {
await this.submitNodeStopped()
this.submitNodeStopped()
await this.flush()
}

Expand All @@ -48,7 +48,15 @@ export class Telemetry {
}
}

async submit(metric: Metric): Promise<void> {
async flushLoop(): Promise<void> {
await this.flush()

this.flushInterval = setTimeout(() => {
void this.flushLoop()
}, this.FLUSH_INTERVAL)
}

submit(metric: Metric): void {
if (!this.enabled) {
return
}
Expand All @@ -67,16 +75,16 @@ export class Telemetry {
timestamp: metric.timestamp || new Date(),
tags,
})

if (this.points.length >= this.MAX_QUEUE_SIZE) {
await this.flush()
}
}

async flush(): Promise<void> {
const points = this.points
this.points = []

if (points.length === 0) {
return
}

try {
await this.pool.submitTelemetry(points)
this.logger.debug(`Submitted ${points.length} telemetry points`)
Expand All @@ -90,24 +98,24 @@ export class Telemetry {
}
}

async submitNodeStarted(): Promise<void> {
await this.submit({
submitNodeStarted(): void {
this.submit({
measurement: 'node',
name: 'started',
fields: [{ name: 'online', type: 'boolean', value: true }],
})
}

async submitNodeStopped(): Promise<void> {
await this.submit({
submitNodeStopped(): void {
this.submit({
measurement: 'node',
name: 'started',
fields: [{ name: 'online', type: 'boolean', value: false }],
})
}

async submitBlockMined(block: Block): Promise<void> {
await this.submit({
submitBlockMined(block: Block): void {
this.submit({
measurement: 'node',
name: 'block_mined',
fields: [
Expand All @@ -125,8 +133,8 @@ export class Telemetry {
})
}

async submitMemoryUsage(heapUsed: number, heapTotal: number): Promise<void> {
await this.submit({
submitMemoryUsage(heapUsed: number, heapTotal: number): void {
this.submit({
measurement: 'node',
name: 'memory',
fields: [
Expand Down

0 comments on commit bac0fe3

Please sign in to comment.