Skip to content

Commit

Permalink
Refactored queue system for the aggregator to now accept messages com…
Browse files Browse the repository at this point in the history
…ing into a processing queue, with priority support, including better configuration. Implemented delay queues with configurable message TTLs. Update ampqlib deps to latest. #173
  • Loading branch information
Maelstromeous committed Jun 5, 2022
1 parent 4d81127 commit ed9fc6f
Show file tree
Hide file tree
Showing 41 changed files with 510 additions and 313 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ module.exports = {
],

// eslint: Possible Errors
"no-console": "error",
"no-console": "warn",

// eslint: Best Practices
"curly": "error",
Expand Down
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"build": "tsc && cp -r src/constants/lattice dist/constants",
"build:watch": "tsc -w",
"wipe": "rm -rf dist",
"test": "echo \"Error: no test specified\" && exit 1",
"test": "echo \\\"Error: no tests specified\\\" && exit 1",
"lint": "eslint ./src --ext .ts",
"lint:fix": "eslint ./src --ext .ts --fix",
"lint:check": "prettier --check .",
Expand All @@ -27,8 +27,8 @@
},
"homepage": "https://github.com/ps2alerts/aggregator#readme",
"dependencies": {
"amqp-connection-manager": "^3.2.2",
"amqplib": "^0.8.0",
"amqp-connection-manager": "^4.1.3",
"amqplib": "^0.9.1",
"axios": "^0.21.1",
"body-parser": "^1.19.0",
"bufferutil": "^4.0.3",
Expand All @@ -50,8 +50,8 @@
"ws": "^7.4.3"
},
"devDependencies": {
"@types/amqp-connection-manager": "^2.0.10",
"@types/amqplib": "^0.5.17",
"@types/amqp-connection-manager": "^3.4.0",
"@types/amqplib": "^0.8.2",
"@types/ioredis": "^4.28.8",
"@types/lodash": "^4.14.168",
"@types/node": "^16.11.7",
Expand Down
10 changes: 5 additions & 5 deletions provisioning/admin-aggregator-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
set_fact:
message:
type: "instanceStart"
body:
data:
instanceId: "{{ 100000 | random }}"
world: "{{ world }}"
zone: "{{ zone }}"
Expand All @@ -87,23 +87,23 @@
set_fact:
message:
type: "instanceEnd"
body:
data:
instanceId: "{{ instanceId }}"
when: type == "end"

- name: Create endAll message body
set_fact:
message:
type: "endAll"
body:
data:
foo: "bar"
when: type == "endAll"

- name: Create activeInstances message body
set_fact:
message:
type: "{{ type }}"
body:
data:
foo: "bar"
when: type == "activeInstances"

Expand All @@ -126,7 +126,7 @@
msg: "{{ message }}"

- name: Publish Instance message
rabbitmq_publish:
community.rabbitmq.rabbitmq_publish:
queue: "{{ queue_name }}"
body: "{{ message | to_json }}"
proto: amqp
Expand Down
4 changes: 0 additions & 4 deletions provisioning/k8s/application-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ spec:
value: "{{ rabbitmq_pass }}"
- name: RABBITMQ_VHOST
value: "ps2alerts"
- name: RABBITMQ_API_QUEUE
value: "api-queue-production"
- name: RABBITMQ_API_QUEUE_DELAY
value: "api-queue-delay-production"
- name: REDIS_HOST
value: "{{ redis_host }}"
- name: REDIS_PASS
Expand Down
10 changes: 7 additions & 3 deletions src/config/census.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {CharacterManagerOptions, ClientOptions, Rest} from 'ps2census';
import {censusEnvironments} from '../constants/censusEnvironments';
import {pcWorldArray, WorldPC, WorldPS4EU, WorldPS4US} from '../constants/world';
import {CensusEnvironment} from '../types/CensusEnvironment';
import {TestCharacters} from '../constants/testCharacters';

export default class Census {
public static readonly characterManagerConfig: CharacterManagerOptions = {
Expand All @@ -25,13 +26,15 @@ export default class Census {
};

public readonly streamManagerConfig = {
worlds: this.getWorldsForEnvironment(),
characters: ['all'],
// worlds: this.getWorldsForEnvironment(),
worlds: ['19'],
characters: [TestCharacters.MAELSTROME26_JAEGER_VS],
logicalAndCharactersWithWorlds: true,
};

public readonly serviceID: string = get('CENSUS_SERVICE_ID');
public readonly censusEnvironment = this.getCensusEnvironment();
public readonly staleConnectionWatcherEnabled = false; // Turn off for debugging / development otherwise you'll get constantly rebooted.

/**
* @type {ClientOptions} Configuration for PS2 Census aggregator client
Expand All @@ -40,7 +43,8 @@ export default class Census {
streamManager: {
subscription: {
...this.streamManagerConfig,
eventNames: ['Death', 'FacilityControl', 'GainExperience', 'MetagameEvent', 'PlayerLogin', 'PlayerLogout', 'VehicleDestroy'],
// eventNames: ['Death', 'FacilityControl', 'GainExperience', 'MetagameEvent', 'PlayerLogin', 'PlayerLogout', 'VehicleDestroy'],
eventNames: ['FacilityControl', 'PlayerFacilityCapture', 'PlayerFacilityDefend'],
},
},
characterManager: Census.characterManagerConfig,
Expand Down
8 changes: 5 additions & 3 deletions src/config/rabbitmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ export default class RabbitMQ {
public readonly user = get('RABBITMQ_USER', 'guest');
public readonly pass = get('RABBITMQ_PASS', 'guest');
public readonly exchange = get('RABBITMQ_EXCHANGE', 'ps2alerts');
public readonly delayExchange = get('RABBITMQ_DELAY_EXCHANGE', 'ps2alerts-delay');
public readonly vhost = get('RABBITMQ_VHOST', '');
public readonly heartbeat = 10;
public readonly timeout = 5000;
public readonly apiQueueName = get('RABBITMQ_API_QUEUE', 'api-queue');
public readonly apiDelayQueueName = get('RABBITMQ_API_QUEUE_DELAY', 'api-queue-delay');
public readonly apiQueueName = `api-queue-${get('NODE_ENV')}`;
public readonly apiDelayQueueName = `api-queue-${get('NODE_ENV')}-delay`;
public readonly aggregatorQueueName = `aggregator-queue-${get('CENSUS_ENVIRONMENT')}`;
public readonly aggregatorDelayQueueName = `aggregator-queue-${get('CENSUS_ENVIRONMENT')}-delay`;
public readonly aggregatorPrefetch = 200;
}
5 changes: 5 additions & 0 deletions src/constants/ps2alertsAggregatorQueueEvents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// eslint-disable-next-line @typescript-eslint/naming-convention
export enum ps2alertsAggregatorQueueEvents {
DEATH = 'death',
METAGAMEEVENT = 'metagameEvent',
}
3 changes: 1 addition & 2 deletions src/constants/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const TYPES = {
deathAggregates: Symbol.for('deathAggregates'),
facilityControlAggregates: Symbol.for('facilityControlAggregates'),
globalVictoryAggregate: Symbol.for('globalVictoryAggregate'),
playerFacilityAggregates: Symbol.for('playerFacilityAggregates'),
populationAggregates: Symbol.for('populationAggregates'),
vehicleDestroyAggregates: Symbol.for('vehicleDestroyAggregates'),

Expand All @@ -27,8 +28,6 @@ const TYPES = {
rabbitMQSubscribers: Symbol.for('rabbitMQSubscribers'),
// Publishers
rabbitMQPublishers: Symbol.for('rabbitMQPublishers'),
apiMQPublisher: Symbol.for('apiMQPublisher'),
apiMQDelayPublisher: Symbol.for('apiMQDelayPublisher'),
// Handlers
adminMessageHandlers: Symbol.for('adminMessageHandlers'),

Expand Down
13 changes: 13 additions & 0 deletions src/data/AggregatorQueueMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/* eslint-disable @typescript-eslint/explicit-module-boundary-types,@typescript-eslint/no-explicit-any */
import {ps2alertsAggregatorQueueEvents} from '../constants/ps2alertsAggregatorQueueEvents';

export default class AggregatorQueueMessage {
public readonly type: ps2alertsAggregatorQueueEvents;
public readonly data: any;

constructor(type: ps2alertsAggregatorQueueEvents, data: any) {
this.type = type;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.data = data;
}
}
6 changes: 3 additions & 3 deletions src/data/ParsedQueueMessage.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
export default class ParsedQueueMessage {
public readonly type: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public readonly body: Record<string, any>;
public readonly data: Record<string, any>;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
constructor(type: string, body: Record<string, any>) {
constructor(type: string, data: Record<string, any>) {
this.type = type;
this.body = body;
this.data = data;
}
}
7 changes: 3 additions & 4 deletions src/handlers/aggregate/VehicleAggregateHandler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {inject, injectable} from 'inversify';
import {injectable} from 'inversify';
import AggregateHandlerInterface from '../../interfaces/AggregateHandlerInterface';
import VehicleDestroyEvent from '../census/events/VehicleDestroyEvent';
import {getLogger} from '../../logger';
import ApiMQPublisher from '../../services/rabbitmq/publishers/ApiMQPublisher';
import {TYPES} from '../../constants/types';
import VehicleDestroyLogic from '../../logics/VehicleDestroyLogic';
import {MQAcceptedPatterns} from '../../constants/MQAcceptedPatterns';
import ApiMQMessage from '../../data/ApiMQMessage';
Expand All @@ -16,8 +15,8 @@ export default class VehicleAggregateHandler implements AggregateHandlerInterfac
private static readonly logger = getLogger('VehicleAggregateHandler');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

public async handle(event: VehicleDestroyEvent): Promise<boolean> {
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/aggregate/VehicleDeathEventHandler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {inject, injectable} from 'inversify';
import {injectable} from 'inversify';
import {getLogger} from '../../logger';
import ApiMQMessage from '../../data/ApiMQMessage';
import {MQAcceptedPatterns} from '../../constants/MQAcceptedPatterns';
import ApiMQPublisher from '../../services/rabbitmq/publishers/ApiMQPublisher';
import {TYPES} from '../../constants/types';
import DeathEvent from '../census/events/DeathEvent';
import AggregateHandlerInterface from '../../interfaces/AggregateHandlerInterface';
import VehicleCharacterDeathLogic from '../../logics/VehicleCharacterDeathLogic';
Expand All @@ -16,8 +15,8 @@ export default class VehicleDeathEventHandler implements AggregateHandlerInterfa
private static readonly logger = getLogger('VehicleDeathEventHandler');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

/**
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/aggregate/global/GlobalCharacterAggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import AggregateHandlerInterface from '../../../interfaces/AggregateHandlerInterface';
import DeathEvent from '../../census/events/DeathEvent';
import {getLogger} from '../../../logger';
import {inject, injectable} from 'inversify';
import {TYPES} from '../../../constants/types';
import {injectable} from 'inversify';
import {Kill} from 'ps2census';
import ApiMQDelayPublisher from '../../../services/rabbitmq/publishers/ApiMQDelayPublisher';
import {MQAcceptedPatterns} from '../../../constants/MQAcceptedPatterns';
Expand All @@ -17,8 +16,8 @@ export default class GlobalCharacterAggregate implements AggregateHandlerInterfa
private static readonly logger = getLogger('GlobalCharacterAggregate');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

public async handle(event: DeathEvent): Promise<boolean> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/* eslint-disable @typescript-eslint/naming-convention */
import AggregateHandlerInterface from '../../../interfaces/AggregateHandlerInterface';
import {getLogger} from '../../../logger';
import {inject, injectable} from 'inversify';
import {TYPES} from '../../../constants/types';
import {injectable} from 'inversify';
import FactionUtils from '../../../utils/FactionUtils';
import FacilityControlEvent from '../../census/events/FacilityControlEvent';
import {MQAcceptedPatterns} from '../../../constants/MQAcceptedPatterns';
Expand All @@ -16,8 +15,8 @@ export default class GlobalFacilityControlAggregate implements AggregateHandlerI
private static readonly logger = getLogger('GlobalFacilityControlAggregate');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

public async handle(event: FacilityControlEvent): Promise<boolean> {
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/aggregate/global/GlobalFactionCombatAggregate.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import AggregateHandlerInterface from '../../../interfaces/AggregateHandlerInterface';
import DeathEvent from '../../census/events/DeathEvent';
import {getLogger} from '../../../logger';
import {inject, injectable} from 'inversify';
import {TYPES} from '../../../constants/types';
import {injectable} from 'inversify';
import FactionUtils from '../../../utils/FactionUtils';
import {Kill} from 'ps2census';
import {MQAcceptedPatterns} from '../../../constants/MQAcceptedPatterns';
Expand All @@ -17,8 +16,8 @@ export default class GlobalFactionCombatAggregate implements AggregateHandlerInt
private static readonly logger = getLogger('GlobalFactionCombatAggregate');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

public async handle(event: DeathEvent): Promise<boolean> {
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/aggregate/global/GlobalLoadoutAggregate.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import DeathEvent from '../../census/events/DeathEvent';
import {getLogger} from '../../../logger';
import {inject, injectable} from 'inversify';
import {TYPES} from '../../../constants/types';
import {injectable} from 'inversify';
import AggregateHandlerInterface from '../../../interfaces/AggregateHandlerInterface';
import {Kill} from 'ps2census';
import {MQAcceptedPatterns} from '../../../constants/MQAcceptedPatterns';
Expand All @@ -16,8 +15,8 @@ export default class GlobalLoadoutAggregate implements AggregateHandlerInterface
private static readonly logger = getLogger('GlobalLoadoutAggregate');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

public async handle(event: DeathEvent): Promise<boolean> {
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/aggregate/global/GlobalOutfitAggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import AggregateHandlerInterface from '../../../interfaces/AggregateHandlerInterface';
import DeathEvent from '../../census/events/DeathEvent';
import {getLogger} from '../../../logger';
import {inject, injectable} from 'inversify';
import {TYPES} from '../../../constants/types';
import {injectable} from 'inversify';
import {Kill} from 'ps2census';
import {MQAcceptedPatterns} from '../../../constants/MQAcceptedPatterns';
import ApiMQDelayPublisher from '../../../services/rabbitmq/publishers/ApiMQDelayPublisher';
Expand All @@ -17,8 +16,8 @@ export default class GlobalOutfitAggregate implements AggregateHandlerInterface<
private static readonly logger = getLogger('GlobalOutfitAggregate');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

public async handle(event: DeathEvent): Promise<boolean> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/* eslint-disable @typescript-eslint/naming-convention */
import AggregateHandlerInterface from '../../../interfaces/AggregateHandlerInterface';
import {getLogger} from '../../../logger';
import {inject, injectable} from 'inversify';
import {TYPES} from '../../../constants/types';
import {injectable} from 'inversify';
import ApiMQPublisher from '../../../services/rabbitmq/publishers/ApiMQPublisher';
import {MQAcceptedPatterns} from '../../../constants/MQAcceptedPatterns';
import FacilityControlEvent from '../../census/events/FacilityControlEvent';
Expand All @@ -16,8 +15,8 @@ export default class GlobalOutfitCapturesAggregate implements AggregateHandlerIn
private static readonly logger = getLogger('GlobalOutfitCapturesAggregate');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
private readonly apiMQPublisher: ApiMQPublisher,
private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}

public async handle(event: FacilityControlEvent): Promise<boolean> {
Expand Down
9 changes: 2 additions & 7 deletions src/handlers/aggregate/global/GlobalVictoryAggregate.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
import AggregateHandlerInterface from '../../../interfaces/AggregateHandlerInterface';
import {getLogger} from '../../../logger';
import {inject, injectable} from 'inversify';
import {TYPES} from '../../../constants/types';
import {injectable} from 'inversify';
import {MQAcceptedPatterns} from '../../../constants/MQAcceptedPatterns';
import MetagameTerritoryInstance from '../../../instances/MetagameTerritoryInstance';
import {Faction} from '../../../constants/faction';
import ApplicationException from '../../../exceptions/ApplicationException';
import ApiMQGlobalAggregateMessage from '../../../data/ApiMQGlobalAggregateMessage';
import moment from 'moment/moment';
import ApiMQDelayPublisher from '../../../services/rabbitmq/publishers/ApiMQDelayPublisher';
import ApiMQPublisher from '../../../services/rabbitmq/publishers/ApiMQPublisher';
import {Bracket} from '../../../constants/bracket';

@injectable()
export default class GlobalVictoryAggregate implements AggregateHandlerInterface<MetagameTerritoryInstance> {
private static readonly logger = getLogger('GlobalVictoryAggregate');

constructor(
@inject(TYPES.apiMQPublisher) private readonly apiMQPublisher: ApiMQPublisher,
@inject(TYPES.apiMQDelayPublisher) private readonly apiMQDelayPublisher: ApiMQDelayPublisher,
) {}
constructor(private readonly apiMQPublisher: ApiMQPublisher) {}

public async handle(event: MetagameTerritoryInstance): Promise<boolean> {
GlobalVictoryAggregate.logger.debug('GlobalVictoryAggregate.handle');
Expand Down

0 comments on commit ed9fc6f

Please sign in to comment.