diff --git a/README.md b/README.md
index c807700..93b9e69 100644
--- a/README.md
+++ b/README.md
@@ -4,13 +4,13 @@
 
 ### High Level
 
-An autoscaler for Jitsi instances (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`), which are deployed in one of the following ways:
+An autoscaler for Jitsi instances (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`, `skynet`), which are deployed in one of the following ways:
 
 * as a parameterized Nomad batch job
 * as an Instance in Oracle Cloud
 * as a Droplet in Digital Ocean
 * custom deployment model
 
-The autoscaler manages multiple `groups` of instances, each having a `type` (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`) and being deployed in a specific `cloud` (`oracle`, `digitalocean`, `custom`).
+The autoscaler manages multiple `groups` of instances, each having a `type` (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`, `skynet`) and being deployed in a specific `cloud` (`oracle`, `digitalocean`, `custom`).
 
 The autoscaler knows the Jitsi instances' status and communicates with them via the [jitsi-autoscaler-sidecar](https://github.com/jitsi/jitsi-autoscaler-sidecar), which needs to be co-located on each Jitsi instance. The sidecar periodically checks in with the autoscaler via a REST call and sends its status.
diff --git a/src/app.ts b/src/app.ts
index bcc51f7..b969e9c 100644
--- a/src/app.ts
+++ b/src/app.ts
@@ -136,6 +136,7 @@ const lockManager: LockManager = new LockManager(logger, {
 });
 
 const instanceGroupManager = new InstanceGroupManager({
+    audit,
     redisClient,
     redisScanCount: config.RedisScanCount,
     initialGroupList: config.GroupList,
@@ -216,6 +217,7 @@ const jobManager = new JobManager({
     metricsLoop,
     autoscalerProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
     launcherProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
+    reportProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
     sanityLoopProcessingTimeoutMs: config.SanityProcessingTimoutMs,
 });
 
@@ -556,13 +558,13 @@
     body('options.scaleUpPeriodsCount').optional().isInt({ min: 0 }).withMessage('Value must be positive'),
     body('options.scaleDownPeriodsCount').optional().isInt({ min: 0 }).withMessage('Value must be positive'),
     body('instanceType').custom(async (value) => {
-        if (!(await validator.supportedInstanceType(value))) {
-            throw new Error('Instance type not supported. Use jvb, jigasi, nomad, jibri or sip-jibri instead');
+        if (!validator.supportedInstanceType(value)) {
+            throw new Error('Instance type not supported. Use jvb, jigasi, nomad, jibri, sip-jibri or skynet instead');
         }
         return true;
     }),
     body('direction').custom(async (value) => {
-        if (!(await validator.supportedScalingDirection(value))) {
+        if (!validator.supportedScalingDirection(value)) {
             throw new Error('Scaling direction not supported. Use up or down instead');
         }
         return true;
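For illustration, a `skynet` group definition might look like the sketch below. The field names follow the `InstanceGroup` and `ScalingOptions` interfaces touched by this diff, but the group name, URL, and all numbers are invented, and `Partial`/the cast are used because both interfaces have more required fields than this diff shows:

```ts
import { InstanceGroup, ScalingOptions } from './instance_group';

// Hypothetical skynet group (illustrative values only). Per
// GroupTypeToGroupMetricKey, the endpoint behind metricsUrl is expected to
// return JSON containing a `queueSize` key, e.g. { "queueSize": 42 }.
const exampleGroup: Partial<InstanceGroup> = {
    name: 'skynet-example',
    type: 'skynet',
    cloud: 'custom',
    metricsUrl: 'https://skynet.example.com/metrics',
    scalingOptions: {
        minDesired: 1,
        maxDesired: 10,
        desiredCount: 1,
        scaleUpThreshold: 50, // summary queueSize at or above this adds instances
        scaleDownThreshold: 5, // summary queueSize below this removes instances
        scalePeriod: 60, // seconds per measurement period
        scaleUpPeriodsCount: 2,
        scaleDownPeriodsCount: 4,
    } as ScalingOptions, // cast: ScalingOptions has fields not shown in this diff
};
```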
diff --git a/src/audit.ts b/src/audit.ts
index d93a8d0..b38629c 100644
--- a/src/audit.ts
+++ b/src/audit.ts
@@ -15,6 +15,7 @@ export interface GroupAudit {
     timestamp?: number | string;
     autoScalerActionItem?: AutoScalerActionItem;
     launcherActionItem?: LauncherActionItem;
+    groupMetricValue?: number;
 }
 
 export interface AutoScalerActionItem {
@@ -37,6 +38,7 @@ export interface LauncherActionItem {
 export interface GroupAuditResponse {
     lastLauncherRun: string;
     lastAutoScalerRun: string;
+    lastGroupMetricValue: number;
     lastReconfigureRequest: string;
     lastScaleMetrics?: Array<number>;
     autoScalerActionItems?: AutoScalerActionItem[];
@@ -222,6 +224,18 @@
         return updateResponse;
     }
 
+    async updateLastGroupMetricValue(ctx: Context, groupName: string, groupMetricValue: number): Promise<boolean> {
+        const value: GroupAudit = {
+            groupMetricValue,
+            groupName,
+            type: 'last-group-metric-value',
+        };
+        const updateResponse = this.setGroupValue(groupName, value);
+        ctx.logger.info(`Updated last group metric for group ${groupName}`);
+
+        return updateResponse;
+    }
+
     async updateLastAutoScalerRun(ctx: Context, groupName: string, scaleMetrics: Array<number>): Promise<boolean> {
         const updateLastAutoScalerStart = process.hrtime();
 
@@ -345,6 +359,7 @@
         const groupAuditResponse: GroupAuditResponse = {
             lastLauncherRun: 'unknown',
             lastAutoScalerRun: 'unknown',
+            lastGroupMetricValue: null,
             lastReconfigureRequest: 'unknown',
             lastScaleMetrics: [],
         };
@@ -356,6 +371,9 @@
             case 'last-launcher-run':
                 groupAuditResponse.lastLauncherRun = new Date(groupAudit.timestamp).toISOString();
                 break;
+            case 'last-group-metric-value':
+                groupAuditResponse.lastGroupMetricValue = groupAudit.groupMetricValue;
+                break;
             case 'last-autoScaler-run':
                 groupAuditResponse.lastScaleMetrics = groupAudit.autoScalerActionItem
                     ? groupAudit.autoScalerActionItem.scaleMetrics
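To make the new audit field concrete, here is an invented illustration of a group audit response for a metrics-driven group; `lastGroupMetricValue` surfaces whatever `updateLastGroupMetricValue` recorded on the most recent successful fetch, via the `'last-group-metric-value'` case above:

```ts
import { GroupAuditResponse } from './audit';

// All values invented for illustration.
const exampleAudit: GroupAuditResponse = {
    lastLauncherRun: '2024-01-01T12:00:00.000Z',
    lastAutoScalerRun: '2024-01-01T12:00:30.000Z',
    lastGroupMetricValue: 42, // most recent value pulled from the group's metricsUrl
    lastReconfigureRequest: 'unknown',
    lastScaleMetrics: [40, 42, 45],
};
```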
diff --git a/src/autoscaler.ts b/src/autoscaler.ts
index 2e04a4b..adca0bf 100644
--- a/src/autoscaler.ts
+++ b/src/autoscaler.ts
@@ -2,7 +2,7 @@ import { InstanceMetric, InstanceTracker } from './instance_tracker';
 import CloudManager from './cloud_manager';
 import Redlock from 'redlock';
 import Redis from 'ioredis';
-import InstanceGroupManager, { InstanceGroup } from './instance_group';
+import InstanceGroupManager, { GroupMetric, InstanceGroup } from './instance_group';
 import LockManager from './lock_manager';
 import { Context } from './context';
 import Audit from './audit';
@@ -75,13 +75,24 @@
             group.scalingOptions.scaleUpPeriodsCount,
             group.scalingOptions.scaleDownPeriodsCount,
         );
-        const metricInventoryPerPeriod: Array<Array<InstanceMetric>> =
-            await this.instanceTracker.getMetricInventoryPerPeriod(
-                ctx,
-                group.name,
-                maxPeriodCount,
-                group.scalingOptions.scalePeriod,
-            );
+
+        let metricInventoryPerPeriod = [];
+
+        if (group.metricsUrl) {
+            metricInventoryPerPeriod = await this.instanceGroupManager.getGroupMetricInventoryPerPeriod(
+                ctx,
+                group.name,
+                maxPeriodCount,
+                group.scalingOptions.scalePeriod,
+            );
+        } else {
+            metricInventoryPerPeriod = await this.instanceTracker.getMetricInventoryPerPeriod(
+                ctx,
+                group.name,
+                maxPeriodCount,
+                group.scalingOptions.scalePeriod,
+            );
+        }
 
         const scaleMetrics = await this.updateDesiredCountIfNeeded(ctx, group, count, metricInventoryPerPeriod);
         await this.audit.updateLastAutoScalerRun(ctx, group.name, scaleMetrics);
@@ -96,7 +107,7 @@
         ctx: Context,
         group: InstanceGroup,
         count: number,
-        metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
+        metricInventoryPerPeriod: Array<Array<InstanceMetric | GroupMetric>>,
     ): Promise<Array<number>> {
         ctx.logger.debug(
             `[AutoScaler] Begin desired count adjustments for group ${group.name} with ${count} instances and current desired count ${group.scalingOptions.desiredCount}`,
@@ -191,6 +202,7 @@
             case 'jigasi':
             case 'nomad':
             case 'JVB':
+            case 'skynet':
                 // in the case of JVB scale up only if value (average stress level) is above or equal to threshold
                 return (
                     (count < group.scalingOptions.maxDesired && value >= group.scalingOptions.scaleUpThreshold) ||
@@ -209,6 +221,7 @@
             case 'jigasi':
             case 'nomad':
             case 'JVB':
+            case 'skynet':
                 // in the case of JVB scale down only if value (average stress level) is below threshold
                 return count > group.scalingOptions.minDesired && value < group.scalingOptions.scaleDownThreshold;
         }
diff --git a/src/group_report.ts b/src/group_report.ts
index 587616d..91c167b 100644
--- a/src/group_report.ts
+++ b/src/group_report.ts
@@ -153,6 +153,7 @@
             case 'jigasi':
             case 'nomad':
             case 'JVB':
+            case 'skynet':
                 // @TODO: implement JVB instance counting
                 break;
         }
@@ -211,7 +212,7 @@
             instanceReport.scaleStatus = 'IN USE';
         }
         if (
-            instanceState.status.jigasiStatus &&
+            instanceState.status.nomadStatus &&
             !instanceState.status.nomadStatus.eligibleForScheduling
         ) {
             instanceReport.scaleStatus = 'GRACEFUL SHUTDOWN';
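The threshold rules that `skynet` groups now share with `jigasi`/`nomad`/`JVB` can be restated standalone, as in the sketch below. Note the scale-up `return` in the hunk above is truncated after the `||`, so only its first clause is reproduced here:

```ts
// Scale up when under maxDesired and the summary metric meets the threshold.
function wantsScaleUp(count: number, value: number, maxDesired: number, scaleUpThreshold: number): boolean {
    return count < maxDesired && value >= scaleUpThreshold;
}

// Scale down when over minDesired and the summary metric is below the threshold.
function wantsScaleDown(count: number, value: number, minDesired: number, scaleDownThreshold: number): boolean {
    return count > minDesired && value < scaleDownThreshold;
}

// Example: a skynet group with maxDesired = 10 and scaleUpThreshold = 50,
// running 8 instances at an average queueSize of 55, wants to scale up.
console.log(wantsScaleUp(8, 55, 10, 50)); // true
console.log(wantsScaleDown(8, 55, 1, 5)); // false
```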
diff --git a/src/instance_group.ts b/src/instance_group.ts
index a7d54e4..43d1676 100644
--- a/src/instance_group.ts
+++ b/src/instance_group.ts
@@ -1,5 +1,7 @@
 import Redis from 'ioredis';
 import { Context } from './context';
+import got from 'got';
+import Audit from './audit';
 
 export interface ScalingOptions {
     minDesired: number;
@@ -18,6 +20,16 @@ export interface InstanceGroupTags {
     [id: string]: string;
 }
 
+export interface GroupMetric {
+    groupName: string;
+    timestamp: number;
+    value: number;
+}
+
+export const GroupTypeToGroupMetricKey: { [id: string]: string } = {
+    skynet: 'queueSize',
+};
+
 export interface InstanceGroup {
     id: string;
     name: string;
@@ -31,6 +43,7 @@ export interface InstanceGroup {
     enableScheduler: boolean;
     enableUntrackedThrottle: boolean;
     enableReconfiguration?: boolean;
+    metricsUrl?: string;
     gracePeriodTTLSec: number;
     protectedTTLSec: number;
     scalingOptions: ScalingOptions;
@@ -39,6 +52,7 @@ export interface InstanceGroup {
 }
 
 export interface InstanceGroupManagerOptions {
+    audit: Audit;
     redisClient: Redis.Redis;
     redisScanCount: number;
     initialGroupList: Array<InstanceGroup>;
@@ -48,6 +62,7 @@
 
 export default class InstanceGroupManager {
     private readonly GROUPS_HASH_NAME = 'allgroups';
+    private readonly audit: Audit;
     private redisClient: Redis.Redis;
     private readonly redisScanCount: number;
     private readonly initialGroupList: Array<InstanceGroup>;
@@ -55,6 +70,7 @@
     private readonly sanityJobsIntervalSeconds: number;
 
     constructor(options: InstanceGroupManagerOptions) {
+        this.audit = options.audit;
         this.redisClient = options.redisClient;
         this.redisScanCount = options.redisScanCount;
         this.initialGroupList = options.initialGroupList;
@@ -83,6 +99,10 @@
         return this.initialGroupList;
     }
 
+    private getGroupMetricsKey(groupName: string): string {
+        return `gmetric:${groupName}`;
+    }
+
     async existsAtLeastOneGroup(): Promise<boolean> {
         let cursor = '0';
         do {
@@ -274,6 +294,73 @@
         return !(result !== null && result.length > 0);
     }
 
+    async fetchGroupMetrics(ctx: Context, groupName: string): Promise<boolean> {
+        try {
+            const group = await this.getInstanceGroup(groupName);
+            if (!group) {
+                throw new Error(`Group ${groupName} not found, failed to report on group metrics`);
+            }
+
+            if (!group.metricsUrl) {
+                ctx.logger.debug(`Group ${groupName} has no metrics url, skipping metrics fetching`);
+                return false;
+            }
+
+            const metrics: { [id: string]: number } = await got(group.metricsUrl).json();
+
+            const key: string = Object.keys(metrics).find((key) => {
+                return GroupTypeToGroupMetricKey[group.type] === key;
+            });
+
+            const metricsObject: GroupMetric = {
+                groupName,
+                timestamp: Date.now(),
+                value: metrics[key],
+            };
+
+            // store the group metrics
+            await this.redisClient.zadd(
+                this.getGroupMetricsKey(groupName),
+                metricsObject.timestamp,
+                JSON.stringify(metricsObject),
+            );
+
+            await this.audit.updateLastGroupMetricValue(ctx, groupName, metricsObject.value);
+        } catch (e) {
+            ctx.logger.error(`Failed to report group metrics for group ${groupName}`, e);
+            return false;
+        }
+
+        return true;
+    }
+
+    async getGroupMetricInventoryPerPeriod(
+        ctx: Context,
+        groupName: string,
+        periodsCount: number,
+        periodDurationSeconds: number,
+    ): Promise<Array<Array<GroupMetric>>> {
+        const metricPoints: Array<Array<GroupMetric>> = [];
+        const metricsKey = this.getGroupMetricsKey(groupName);
+        const now = Date.now();
+        const items: string[] = await this.redisClient.zrange(metricsKey, 0, -1);
+
+        for (let periodIdx = 0; periodIdx < periodsCount; periodIdx++) {
+            metricPoints[periodIdx] = [];
+        }
+
+        items.forEach((item) => {
+            const itemJson: GroupMetric = JSON.parse(item);
+            const periodIdx = Math.floor((now - itemJson.timestamp) / (periodDurationSeconds * 1000));
+
+            if (periodIdx >= 0 && periodIdx < periodsCount) {
+                metricPoints[periodIdx].push(itemJson);
+            }
+        });
+
+        return metricPoints;
+    }
+
     async setGroupJobsCreationGracePeriod(): Promise<boolean> {
         return this.setValue(`groupJobsCreationGracePeriod`, this.processingIntervalSeconds);
     }
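A worked example of the bucketing in `getGroupMetricInventoryPerPeriod`: each stored sample lands in the period index given by its age divided by the period length, and samples older than `periodsCount` periods are dropped. The timestamps and values below are invented; the period length is assumed to be 60 seconds:

```ts
const now = Date.now();
const samples = [
    { groupName: 'skynet-example', timestamp: now - 30_000, value: 12 }, // 30s old
    { groupName: 'skynet-example', timestamp: now - 90_000, value: 18 }, // 90s old
    { groupName: 'skynet-example', timestamp: now - 310_000, value: 25 }, // 310s old
];

for (const s of samples) {
    // Same arithmetic as the forEach above, with periodDurationSeconds = 60.
    const periodIdx = Math.floor((now - s.timestamp) / (60 * 1000));
    console.log(`value ${s.value} falls into period ${periodIdx}`); // 0, 1, 5
}
```

With `periodsCount = 5`, the 310-second-old sample (period index 5) falls outside the window and is ignored.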
diff --git a/src/instance_tracker.ts b/src/instance_tracker.ts
index c6bd4ca..5205f33 100644
--- a/src/instance_tracker.ts
+++ b/src/instance_tracker.ts
@@ -2,7 +2,7 @@ import { Context } from './context';
 import Redis from 'ioredis';
 import ShutdownManager from './shutdown_manager';
 import Audit from './audit';
-import { InstanceGroup } from './instance_group';
+import { GroupMetric, InstanceGroup } from './instance_group';
 
 /* eslint-disable */
 function isEmpty(obj: any) {
@@ -82,6 +82,7 @@ export interface JigasiStatus {
     // largest_conference: number;
     graceful_shutdown: boolean;
 }
+
 export interface InstanceDetails {
     instanceId: string;
     instanceType: string;
@@ -352,21 +353,49 @@
     async getSummaryMetricPerPeriod(
         ctx: Context,
         group: InstanceGroup,
-        metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
+        metricInventoryPerPeriod: Array<Array<InstanceMetric | GroupMetric>>,
         periodCount: number,
     ): Promise<Array<number>> {
         switch (group.type) {
             case 'jibri':
             case 'sip-jibri':
-                return this.getAvailableMetricPerPeriod(ctx, metricInventoryPerPeriod, periodCount);
+                return this.getAvailableMetricPerPeriod(
+                    ctx,
+                    metricInventoryPerPeriod as Array<Array<InstanceMetric>>,
+                    periodCount,
+                );
             case 'nomad':
             case 'jigasi':
             case 'JVB':
-                return this.getAverageMetricPerPeriod(ctx, metricInventoryPerPeriod, periodCount);
+                return this.getAverageMetricPerPeriod(
+                    ctx,
+                    metricInventoryPerPeriod as Array<Array<InstanceMetric>>,
+                    periodCount,
+                );
+            case 'skynet':
+                return this.getSkynetGroupMetricPerPeriod(
+                    ctx,
+                    metricInventoryPerPeriod as Array<Array<GroupMetric>>,
+                    periodCount,
+                );
         }
         return;
     }
 
+    async getSkynetGroupMetricPerPeriod(
+        ctx: Context,
+        metricInventoryPerPeriod: Array<Array<GroupMetric>>,
+        periodCount: number,
+    ): Promise<Array<number>> {
+        ctx.logger.debug(`Getting skynet group metric per period for ${periodCount} periods`, {
+            metricInventoryPerPeriod,
+        });
+
+        return metricInventoryPerPeriod
+            .slice(0, periodCount)
+            .map((groupMetrics: Array<GroupMetric>) => groupMetrics[0].value);
+    }
+
     async getAvailableMetricPerPeriod(
         ctx: Context,
         metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
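One caveat worth noting: `getSkynetGroupMetricPerPeriod` summarizes each period by reading `groupMetrics[0].value`, which throws if a period bucket is empty (no sample was fetched during that window). A defensive variant — an assumption of mine, not what this diff ships — could zero-fill empty buckets instead:

```ts
import { GroupMetric } from './instance_group';

// Sketch: same slice-and-summarize shape as getSkynetGroupMetricPerPeriod,
// but empty buckets yield 0 rather than a TypeError on groupMetrics[0].
function summarizeGroupMetricPerPeriod(inventory: Array<Array<GroupMetric>>, periodCount: number): Array<number> {
    return inventory.slice(0, periodCount).map((bucket) => (bucket.length > 0 ? bucket[0].value : 0));
}
```

Whether zero (or the previous period's value) is the right fill depends on how the thresholds are tuned, so the diff's fail-fast behavior may well be intentional.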
diff --git a/src/job_manager.ts b/src/job_manager.ts
index a71cb9c..c94e640 100644
--- a/src/job_manager.ts
+++ b/src/job_manager.ts
@@ -24,12 +24,14 @@ export interface JobManagerOptions {
     metricsLoop: MetricsLoop;
     autoscalerProcessingTimeoutMs: number;
     launcherProcessingTimeoutMs: number;
+    reportProcessingTimeoutMs: number;
     sanityLoopProcessingTimeoutMs: number;
 }
 
 export enum JobType {
     Autoscale = 'AUTOSCALE',
     Launch = 'LAUNCH',
+    FetchGroupMetrics = 'FETCH_GROUP_METRICS',
     Sanity = 'SANITY',
 }
 
 const jobCreateFailureCounter = new promClient.Counter({
@@ -40,6 +42,7 @@
 });
 jobCreateFailureCounter.labels(JobType.Autoscale).inc(0);
 jobCreateFailureCounter.labels(JobType.Launch).inc(0);
+jobCreateFailureCounter.labels(JobType.FetchGroupMetrics).inc(0);
 jobCreateFailureCounter.labels(JobType.Sanity).inc(0);
 
 const jobCreateTotalCounter = new promClient.Counter({
@@ -49,6 +52,7 @@
 });
 jobCreateTotalCounter.labels(JobType.Autoscale).inc(0);
 jobCreateTotalCounter.labels(JobType.Launch).inc(0);
+jobCreateTotalCounter.labels(JobType.FetchGroupMetrics).inc(0);
 jobCreateTotalCounter.labels(JobType.Sanity).inc(0);
 
 const jobProcessFailureCounter = new promClient.Counter({
@@ -58,6 +62,7 @@
 });
 jobProcessFailureCounter.labels(JobType.Autoscale).inc(0);
 jobProcessFailureCounter.labels(JobType.Launch).inc(0);
+jobProcessFailureCounter.labels(JobType.FetchGroupMetrics).inc(0);
 jobProcessFailureCounter.labels(JobType.Sanity).inc(0);
 
 const jobProcessTotalCounter = new promClient.Counter({
@@ -67,6 +72,7 @@
 });
 jobProcessTotalCounter.labels(JobType.Autoscale).inc(0);
 jobProcessTotalCounter.labels(JobType.Launch).inc(0);
+jobProcessTotalCounter.labels(JobType.FetchGroupMetrics).inc(0);
 jobProcessTotalCounter.labels(JobType.Sanity).inc(0);
 
 const queueErrorCounter = new promClient.Counter({
@@ -94,6 +100,7 @@ export default class JobManager {
     private sanityLoop: SanityLoop;
     private metricsLoop: MetricsLoop;
     private jobQueue: Queue;
+    private reportProcessingTimeoutMs: number;
     private autoscalerProcessingTimeoutMs: number;
     private launcherProcessingTimeoutMs: number;
     private sanityLoopProcessingTimeoutMs: number;
@@ -109,6 +116,7 @@
         this.metricsLoop = options.metricsLoop;
         this.autoscalerProcessingTimeoutMs = options.autoscalerProcessingTimeoutMs;
         this.launcherProcessingTimeoutMs = options.launcherProcessingTimeoutMs;
+        this.reportProcessingTimeoutMs = options.reportProcessingTimeoutMs;
         this.sanityLoopProcessingTimeoutMs = options.sanityLoopProcessingTimeoutMs;
 
         this.jobQueue = this.createQueue(JobManager.jobQueueName, options.queueRedisOptions);
@@ -173,6 +181,14 @@
                     done,
                 );
                 break;
+            case JobType.FetchGroupMetrics:
+                this.processJob(
+                    ctx,
+                    job,
+                    (ctx, group) => this.instanceGroupManager.fetchGroupMetrics(ctx, group),
+                    done,
+                );
+                break;
             case JobType.Sanity:
                 this.processJob(
                     ctx,
@@ -304,6 +320,14 @@
         }
 
         const instanceGroupNames = await this.instanceGroupManager.getAllInstanceGroupNames(ctx);
+
+        await this.createJobs(
+            ctx,
+            instanceGroupNames,
+            this.jobQueue,
+            JobType.FetchGroupMetrics,
+            this.reportProcessingTimeoutMs,
+        );
         await this.createJobs(
             ctx,
             instanceGroupNames,
diff --git a/src/nomad_instance_manager.ts b/src/nomad_instance_manager.ts
index 57ca203..1947379 100644
--- a/src/nomad_instance_manager.ts
+++ b/src/nomad_instance_manager.ts
@@ -92,16 +92,12 @@
         switch (status) {
             case 'pending':
                 return 'PROVISIONING';
-                break;
             case 'running':
                 return 'RUNNING';
-                break;
             case 'stopped':
                 return 'SHUTDOWN';
-                break;
             case 'dead':
                 return 'SHUTDOWN';
-                break;
         }
         return 'Unknown';
     }
diff --git a/src/validator.ts b/src/validator.ts
index c43b607..c6f10c1 100644
--- a/src/validator.ts
+++ b/src/validator.ts
@@ -57,23 +57,11 @@
         return count + instanceGroup.scalingOptions.desiredCount <= max;
     }
 
-    async supportedInstanceType(instanceType: string): Promise<boolean> {
-        return (
-            instanceType !== null &&
-            instanceType !== '' &&
-            (instanceType.toLowerCase() == 'jibri' ||
-                instanceType.toLowerCase() == 'sip-jibri' ||
-                instanceType.toLowerCase() == 'jigasi' ||
-                instanceType.toLowerCase() == 'nomad' ||
-                instanceType.toLowerCase() == 'jvb')
-        );
+    supportedInstanceType(instanceType: string): boolean {
+        return ['jibri', 'sip-jibri', 'jigasi', 'nomad', 'jvb', 'skynet'].includes(instanceType.toLowerCase());
     }
 
-    async supportedScalingDirection(direction: string): Promise<boolean> {
-        return (
-            direction !== null &&
-            direction !== '' &&
-            (direction.toLowerCase() == 'up' || direction.toLowerCase() == 'down')
-        );
+    supportedScalingDirection(direction: string): boolean {
+        return ['up', 'down'].includes(direction.toLowerCase());
     }
 }
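A quick behavioral sketch of the now-synchronous validators (the `Validator` constructor arguments are elided here; `validator` stands in for a configured instance):

```ts
import Validator from './validator';

declare const validator: Validator; // assume an instance configured elsewhere

validator.supportedInstanceType('skynet'); // true — newly accepted
validator.supportedInstanceType('SIP-Jibri'); // true — matching is case-insensitive
validator.supportedInstanceType('ligasi'); // false
validator.supportedScalingDirection('up'); // true
validator.supportedScalingDirection('sideways'); // false
```

One behavioral difference from the removed implementations: the old versions explicitly checked for `null`/empty strings, while the new ones will throw a `TypeError` on a missing value when calling `toLowerCase()`. Inside the `body(...).custom(...)` validators in `app.ts` a thrown error is still reported as a validation failure, so the request is rejected either way.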