Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,443 changes: 1,749 additions & 694 deletions package-lock.json

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@
"license": "ISC",
"dependencies": {
"lodash": "^4.17.21",
"uuid": "^8.3.2"
},
"devDependencies": {
"@globalid/eslint-plugin": "^1.3.1",
"@types/amqplib": "^0.8.2",
"@types/lodash": "^4.14.185",
"@types/node": "^18.7.18",
"@types/uuid": "^8.3.4",
"@typescript-eslint/eslint-plugin": "^5.38.0",
"rimraf": "^3.0.2",
"ts-node": "^10.9.1"
"uuid": "^9.0.0"
},
"peerDependencies": {
"amqplib": "*",
"class-transformer": "*",
"class-validator": "*",
"class-validator-jsonschema": "*",
"reflect-metadata": "*"
},
"devDependencies": {
"@globalid/eslint-plugin": "^1.3.1",
"@types/amqplib": "^0.10.1",
"@types/lodash": "^4.14.191",
"@types/node": "^18.14.0",
"@types/uuid": "^9.0.0",
"@typescript-eslint/eslint-plugin": "^5.53.0",
"rimraf": "^4.1.2",
"ts-node": "^10.9.1"
}
}
4 changes: 2 additions & 2 deletions src/integration.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { ClassConstructor } from 'class-transformer'
import { loggers, LoggerI } from './logger'
import { registerRejectableErrors, CustomRejectableErrorI } from './lib/errors'
import { registerRejectableErrors, CustomErrorI } from './lib/errors'
import { IrisChannels, CustomChannelClassesI } from './lib/asyncapi/schema/channels'

export interface IrisIntegrationI {
customLoggerClass?: ClassConstructor<LoggerI>
rejectableErrors?: CustomRejectableErrorI[]
rejectableErrors?: CustomErrorI[]
asyncapi?: {
customChannelClasses?: CustomChannelClassesI
}
Expand Down
7 changes: 7 additions & 0 deletions src/lib/amqp.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,10 @@ export function cloneAmqpMsgProperties(msg: amqplib.ConsumeMessage): amqplib.Mes

return msgProperties
}

export function hasClientContext(msg: amqplib.ConsumeMessage): boolean {
const lookupKey = `properties.headers[${MESSAGE_HEADERS.MESSAGE.SESSION_ID}]`
const hasSession = <string | undefined>_.get(msg, lookupKey)

return hasSession !== undefined
}
2 changes: 1 addition & 1 deletion src/lib/asyncapi/class_validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class AsyncapiClassValidator {

private fixNullableReferenceObject(prop: SchemaObject & { nullable?: boolean }): void {
// nullable ref. by v3.1.0 spec
const ref = { $ref: (<ReferenceObject>prop).$ref }
const ref = { $ref: (<ReferenceObject>(<unknown>prop)).$ref }
delete prop.nullable
prop.anyOf = [ref, { type: 'null' }]

Expand Down
51 changes: 5 additions & 46 deletions src/lib/asyncapi/interfaces_openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
/**
* Taken from @NestJS/swagger https://github.com/nestjs/swagger
*/

import { SchemaObject as SchemaObjectOpenapi } from 'openapi3-ts'

export interface OpenAPIObject {
openapi: string
info: InfoObject
Expand Down Expand Up @@ -185,44 +188,8 @@ export interface ReferenceObject {
$ref: string
}

export interface SchemaObject {
nullable?: boolean
discriminator?: DiscriminatorObject
readOnly?: boolean
writeOnly?: boolean
xml?: XmlObject
externalDocs?: ExternalDocumentationObject
example?: any
examples?: any[] | Record<string, any>
deprecated?: boolean
type?: string
allOf?: (SchemaObject | ReferenceObject)[]
oneOf?: (SchemaObject | ReferenceObject)[]
anyOf?: (SchemaObject | ReferenceObject)[]
not?: SchemaObject | ReferenceObject
items?: SchemaObject | ReferenceObject
properties?: Record<string, SchemaObject | ReferenceObject>
additionalProperties?: SchemaObject | ReferenceObject | boolean
patternProperties?: SchemaObject | ReferenceObject | any
description?: string
format?: string
default?: any
title?: string
multipleOf?: number
maximum?: number
exclusiveMaximum?: boolean
minimum?: number
exclusiveMinimum?: boolean
maxLength?: number
minLength?: number
pattern?: string
maxItems?: number
minItems?: number
uniqueItems?: boolean
maxProperties?: number
minProperties?: number
required?: string[]
enum?: any[]
export type SchemaObject = Omit<SchemaObjectOpenapi, 'type'> & {
type?: string | string[]
}

export type SchemasObject = Record<string, SchemaObject>
Expand All @@ -232,14 +199,6 @@ export interface DiscriminatorObject {
mapping?: Record<string, string>
}

export interface XmlObject {
name?: string
namespace?: string
prefix?: string
attribute?: boolean
wrapped?: boolean
}

export type SecuritySchemeType = 'apiKey' | 'http' | 'oauth2' | 'openIdConnect'

export interface SecuritySchemeObject {
Expand Down
9 changes: 3 additions & 6 deletions src/lib/asyncapi/schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class AsyncapiSchema {
IsString()(<object>message.prototype, '__keep__')
}

private isOurMessage<T extends Function>(message: T): boolean {
private isIrisMessage<T extends Function>(message: T): boolean {
if (internallyDefinedMessages.includes(message)) {
return true
}
Expand All @@ -84,12 +84,9 @@ export class AsyncapiSchema {
}

private decorateMessageWithIrisAdditionalProperties<T extends Function>(message: T): void {
const isInternalMessage = this.isOurMessage(message)
const isIrisMessage = this.isIrisMessage(message)
JSONSchema({
additionalProperties: {
generatedClass: isInternalMessage,
isGeneratedClass: isInternalMessage,
},
'x-iris-generated': isIrisMessage,
})(message)
}
}
4 changes: 4 additions & 0 deletions src/lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ export class Connection {
return this.connection === undefined && this.connectPromise === undefined && this.reconnectHelper === undefined && this.disconnectPromise === undefined
}

public isReconnecting(): boolean {
return this.connection === undefined && this.reconnectHelper !== undefined
}

public setDoAutoReconnect(autoReconnect: boolean): void {
this.doAutoReconnect = autoReconnect
}
Expand Down
23 changes: 18 additions & 5 deletions src/lib/consume.error.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import _ from 'lodash'
import * as amqplib from 'amqplib'
import * as messageHandler from './message_handler'
import * as messageI from './message.interfaces'
Expand All @@ -6,8 +7,7 @@ import * as errors from './errors'
import * as consumeRetry from './consume.retry'
import * as consumeAck from './consume.ack'
import { MESSAGE_HEADERS, MANAGED_EXCHANGES } from './constants'
import { getTemporaryChannel, cloneAmqpMsgProperties } from './amqp.helper'
import _ from 'lodash'
import { getTemporaryChannel, cloneAmqpMsgProperties, hasClientContext } from './amqp.helper'

const { ERROR } = MANAGED_EXCHANGES

Expand Down Expand Up @@ -36,7 +36,7 @@ export async function handleConsumeError(

if (reject) {
consumeAck.safeAckMsg(msg, channel, 'reject', false)
await sendErrorMessage(msg, error)
await handleRejectableError(msg, error)
} else {
const enqueued = await consumeRetry.enqueueWithBackoff(msg, handler, msgMeta.processedConfig, error)
if (enqueued) {
Expand All @@ -47,8 +47,21 @@ export async function handleConsumeError(
}
}

async function sendErrorMessage(msg: amqplib.ConsumeMessage, error: Error): Promise<void> {
logger.errorDetails('Publishing Error message')
async function handleRejectableError(msg: amqplib.ConsumeMessage, error: Error): Promise<void> {
const notifyClient = errors.shouldNotifyClient(error, msg)

if (notifyClient === false) {
logger.debug('Not publishing error message')

return
}

if (hasClientContext(msg)) {
logger.debug('Publishing error message')
} else {
logger.errorDetails('Publishing error message even though msg does not have client context')
}

try {
const msgProperties = cloneAmqpMsgProperties(msg)
const { headers } = msgProperties
Expand Down
2 changes: 1 addition & 1 deletion src/lib/consume.retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export async function enqueueWithBackoff(
headers[MESSAGE_HEADERS.REQUEUE.ORIGINAL_EXCHANGE] = msg.fields.exchange
headers[MESSAGE_HEADERS.REQUEUE.ORIGINAL_ROUTING_KEY] = msg.fields.routingKey
headers[MESSAGE_HEADERS.REQUEUE.MAX_RETRIES] = msgMeta.maxRetry ?? connection.getConfig().maxMessageRetryCount
headers[MESSAGE_HEADERS.REQUEUE.NOTIFY_CLIENT] = errors.shouldNotifyFrontend(error)
headers[MESSAGE_HEADERS.REQUEUE.NOTIFY_CLIENT] = errors.shouldNotifyClient(error, msg)
headers[MESSAGE_HEADERS.MESSAGE.SERVER_TIMESTAMP] = Date.now()

headers[MESSAGE_HEADERS.REQUEUE.ERROR_CODE] = error.constructor.name
Expand Down
68 changes: 54 additions & 14 deletions src/lib/errors.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import _ from 'lodash'
import * as amqplib from 'amqplib'
import * as classValidator from 'class-validator'
import { ClassConstructor } from 'class-transformer'
import { hasClientContext } from './amqp.helper'

export const UnauthorizedError = class UnauthorizedException extends Error {}
export const ForbiddenError = class ForbiddenException extends Error {}

export interface CustomRejectableErrorI {
export interface CustomErrorI {
errorClass: ClassConstructor<Error>
errorType: ErrorTypeE
// when registring custom errors, this flag
// can be used to override `notifyClient` flag
// on error itself.
alwaysNotifyClient?: true
}

const REJECTABLE_ERRORS: CustomRejectableErrorI[] = []
const REJECTABLE_ERRORS: CustomErrorI[] = []

export interface ErrorMessageI {
errorType: ErrorTypeE
Expand All @@ -31,12 +38,36 @@ export enum ErrorTypeE {

export abstract class MsgError extends Error {
errorType: ErrorTypeE = ErrorTypeE.INTERNAL_SERVER_ERROR
notifyFrontend: boolean = true

/**
* Whether error should be propagated back to the client.
* If not set, IRIS will do this when client's context is
* available.
*/
notifyClient?: boolean = undefined

constructor(msg: string)
constructor(msg: string, notifyFrontend: boolean)
constructor(msg: string, notifyFrontend?: boolean) {
constructor(msg: string, notifyClient: boolean)
constructor(msg: string, notifyClient?: boolean) {
super(msg)
this.notifyFrontend = notifyFrontend ?? true
if (notifyClient !== undefined) {
this.notifyClient = notifyClient
}
}

/**
* Whether error should be propagated back to the client.
* If not set, IRIS will do this when client's context is
* available.
*/
public setNotifyClient(notifyClient: boolean): MsgError {
this.notifyClient = notifyClient

return this
}

public trhow(): never {
throw this
}

public getMessage(): string {
Expand All @@ -49,15 +80,16 @@ export abstract class MsgError extends Error {
* the message is automatically rejected, no retry/enqueue
* mechanism is used.
*/

export class RejectMsgError extends MsgError {
errorType: ErrorTypeE = ErrorTypeE.BAD_REQUEST
}

export class InvalidObjectConverionError extends RejectMsgError {
validationErrors: classValidator.ValidationError[]

constructor(errorDetails: classValidator.ValidationError[], notifyFrontend: boolean = true) {
super('InvalidObjectCoversion', notifyFrontend)
constructor(errorDetails: classValidator.ValidationError[]) {
super('InvalidObjectCoversion')
this.validationErrors = errorDetails
}

Expand Down Expand Up @@ -107,8 +139,16 @@ export function getErrorMessage(error: Error): ErrorMessageI {
}
}

export function shouldNotifyFrontend(error: Error): boolean {
return error instanceof MsgError ? error.notifyFrontend : true
export function shouldNotifyClient(error: Error, msg: amqplib.ConsumeMessage): boolean {
const customRejectableError = getIfRejectableError(error)

if (customRejectableError?.alwaysNotifyClient === true) {
return true
}

const explicit = error instanceof MsgError ? error.notifyClient : undefined

return explicit ?? hasClientContext(msg)
}

export function isRejectableError(error: Error): boolean {
Expand All @@ -127,13 +167,13 @@ export function isRejectableError(error: Error): boolean {
return false
}

export function registerRejectableErrors(errorClasses: CustomRejectableErrorI[]): void {
errorClasses.forEach(errorClass => {
REJECTABLE_ERRORS.push(errorClass)
export function registerRejectableErrors(errorClasses: CustomErrorI[]): void {
errorClasses.reverse().forEach(errorClass => {
REJECTABLE_ERRORS.unshift(errorClass)
})
}

function getIfRejectableError(error: Error): CustomRejectableErrorI | undefined {
function getIfRejectableError(error: Error): CustomErrorI | undefined {
return REJECTABLE_ERRORS.find(({ errorClass }) => {
return error instanceof errorClass
})
Expand Down
2 changes: 1 addition & 1 deletion src/lib/message_handler.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const MessageHandler =
}

function manageAutoDecoratedArguments(target: Object, propertyKey: string | symbol): Object {
const methodArgs = <typeof Function[]>Reflect.getMetadata('design:paramtypes', target, propertyKey)
const methodArgs = <(typeof Function)[]>Reflect.getMetadata('design:paramtypes', target, propertyKey)
const targetMessage: Object[] = methodArgs.filter(message.isMessageDecoratedClass)

if (targetMessage.length !== 1) {
Expand Down