Skip to content

Commit

Permalink
fix: terminated session metrics are not deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
stfsy committed Jan 7, 2023
1 parent 587a21f commit d977b29
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 32 deletions.
72 changes: 45 additions & 27 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,32 @@ const meter = new MeterProvider({
meter.addMetricReader(exporter)

let up = 1
let aggregatedUserSessions = {}
// keep old and new state in array
// index -1 is always new
// other indices are old and metrics need to be removed
let aggregatedUserSessions = []
let singleUserSessions = []

meter.getMeter('default').createObservableGauge('user_sessions_currently_active', {
const userSessionsGauge = meter.getMeter('default').createObservableGauge('user_sessions_currently_active', {
description: 'Sum of sessions per user',
constantLabels: ['user']
}).addCallback((observer) => {
if (!aggregatedUserSessions) {
return
}
Object.keys(aggregatedUserSessions).forEach((key) => {
observer.observe(aggregatedUserSessions[key].length, {
// reset previous metrics first
const outdatedMetrics = aggregatedUserSessions.slice(0, -1)
outdatedMetrics.forEach(metric => {
Object.keys(metric).forEach((key) => {
observer.observe(0, {
user: key
})
})
})

aggregatedUserSessions = [aggregatedUserSessions.at(-1)]

// and update new metrics then
const newMetrics = aggregatedUserSessions.at(-1)
Object.keys(newMetrics).forEach((key) => {
observer.observe(newMetrics[key].length, {
user: key
})
})
Expand All @@ -61,19 +76,29 @@ meter.getMeter('default').createObservableGauge('each_session_currently_active',
description: 'Single sessions per user',
constantLabels: ['user', 'ip', 'tty']
}).addCallback((observer) => {
if (!aggregatedUserSessions) {
return
}
Object.entries(aggregatedUserSessions).forEach(([key, value]) => {
value.forEach((session) => {
const { from, tty } = session
observer.observe(1, {
user: key,
// reset previous metrics first
const outdatedMetrics = singleUserSessions.slice(0, -1)
outdatedMetrics.forEach((metric) => {
Object.entries(metric).forEach(([key, { user, from, tty }]) => {
observer.observe(0, {
ip: from,
user,
tty
})
})
})

singleUserSessions = [singleUserSessions.at(-1)]

// and update new metrics then
const newMetrics = singleUserSessions.at(-1)
newMetrics && Object.entries(newMetrics).forEach(([key, { user, from, tty }]) => {
observer.observe(1, {
ip: from,
user,
tty
})
})
})

meter.getMeter('default').createObservableGauge('up', {
Expand All @@ -93,24 +118,17 @@ const lookup = async () => {

const sessionsByUser = result.map(({ user, 'login@': login, from, tty }) => {
return Object.assign({}, { user, login, from, tty })
}).reduce((context, next) => {
})
singleUserSessions.push(sessionsByUser)

const aggregatedUserSessionsByUser = sessionsByUser.reduce((context, next) => {
if (!Array.isArray(context[next.user])) {
context[next.user] = []
}
context[next.user].push(next)
return context
}, {})

aggregatedUserSessions = {}
// update sessions with fresh data
// will override current data of active sessions
Object.entries(sessionsByUser).forEach(([key, value]) => {
if (!Array.isArray(aggregatedUserSessions[key])) {
aggregatedUserSessions[key] = []
}
aggregatedUserSessions[key].push(...value)
})

aggregatedUserSessions.push(aggregatedUserSessionsByUser)
} catch (e) {
up = 0
console.log(`Could not gather metrics. Will retry after ${interval} ms`, e)
Expand Down
26 changes: 21 additions & 5 deletions test/spec/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ require.cache[wResolvedPath] = {
pip pts/0 192.168.2.107 23:50 1.00s 0.15s 0.01s w
`)
} else if (returnOnePip3) {
console.log('return pip3')
return Promise.resolve(`23:50:13 up 1 day, 11:33, 1 user, load average: 0.08, 0.03, 0.01
USER TTY FROM LOGIN@ IDLE JCPU PCPU WHAT
pip3 pts/0 192.168.2.107 23:50 1.00s 0.15s 0.01s w
pip3 pts/0 192.168.2.111 23:50 1.00s 0.15s 0.01s w
`)
} else if (throwError) {
return Promise.reject('Err!')
Expand Down Expand Up @@ -68,6 +67,7 @@ const waitAndExpectMetricStrings = (waitMillis, ...expectedMetric) => {
} catch (e) {
console.error('Assertion error.', 'Full response was...')
console.error(body)
console.error('Expected to find', metric)
throw e
}
})
Expand Down Expand Up @@ -98,6 +98,7 @@ const waitAndDoNotExpectMetricStrings = (waitMillis, ...expectedMetric) => {
} catch (e) {
console.error('Assertion error.', 'Full response was...')
console.error(body)
console.error('Expected to NOT find', metric)
throw e
}
})
Expand Down Expand Up @@ -143,8 +144,23 @@ describe('WhatActiveUsersExporter', () => {
returnTwoPips = false
throwError = false
return waitAndExpectMetricStrings(500,
'each_session_currently_active{user="pip",ip="192.168.2.107",tty="pts/0"} 1',
'each_session_currently_active{user="pip",ip="192.168.2.107",tty="pts/1"} 1',
'each_session_currently_active{user="pipX",ip="44.33.22.1",tty="pts/2"} 1')
'each_session_currently_active{ip="192.168.2.107",user="pip",tty="pts/0"} 1',
'each_session_currently_active{ip="192.168.2.107",user="pip",tty="pts/1"} 1',
'each_session_currently_active{ip="44.33.22.1",user="pipX",tty="pts/2"} 1')
})

it('removes metric of terminated sessions', async () => {
returnOnePip3 = true
await waitAndExpectMetricStrings(2000, 'user_sessions_currently_active{user="pip3"} 1')
await waitAndExpectMetricStrings(0, 'each_session_currently_active{ip="192.168.2.111",user="pip3",tty="pts/0"} 1')
returnOnePip3 = false
returnTwoPips = false
throwError = false
await waitAndDoNotExpectMetricStrings(2000, 'user_sessions_currently_active{user="pip3"} 1')
await waitAndExpectMetricStrings(500,
'each_session_currently_active{ip="192.168.2.111",user="pip3",tty="pts/0"} 0',
'each_session_currently_active{ip="192.168.2.107",user="pip",tty="pts/0"} 1',
'each_session_currently_active{ip="192.168.2.107",user="pip",tty="pts/1"} 1',
'each_session_currently_active{ip="44.33.22.1",user="pipX",tty="pts/2"} 1')
})
})

0 comments on commit d977b29

Please sign in to comment.