Skip to content

Commit

Permalink
Feature/jetstream (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
oncicaradupopovici committed Dec 13, 2023
1 parent 1e82473 commit adf0957
Show file tree
Hide file tree
Showing 29 changed files with 692 additions and 84 deletions.
5 changes: 5 additions & 0 deletions .changeset/tidy-dolphins-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@totalsoft/message-bus': minor
---

Added JetStream messaging transport for message-bus
11 changes: 11 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
"cwd": "${fileDirname}",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
},
{
"type": "node",
"name": "Run current file",
"request": "launch",
"runtimeExecutable": "yarn",
"runtimeArgs": ["run", "ts-node"],
"args": ["${fileBasenameNoExtension}"],
"cwd": "${fileDirname}",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
}
]
}
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ A collection of plugins and other GraphQL utilities.

## Contributing guide
When using Visual Studio Code please follow these steps: [Editor Setup for VSCode](https://yarnpkg.com/getting-started/editor-sdks#vscode) (allows VSCode to read .zip yarn cache files and supports features like go-to-definition).

When using Visual Studio Code please use the extension [`Licenser`](https://marketplace.visualstudio.com/items?itemName=ymotongpoo.licenser) for applying the license header in files.
### - Build
```javascript
yarn install
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"eslint-config-prettier": "^8.5.0",
"eslint-plugin-prettier": "^4.0.0",
"prettier": "^2.6.2",
"ts-node": "^10.9.1",
"typescript": "^4.6.3"
}
}
15 changes: 13 additions & 2 deletions packages/message-bus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ By default the message bus uses the Nats streaming transport. When working with
```javascript
const { messageBus, useTransport, transport } = require('@totalsoft/message-bus');

useTransport(transport.rusi) //or transport.nats or whatever transport
useTransport(transport.rusi) //or transport.nats or transport.jetstream
const msgBus = messageBus() //now every message bus instance points to that transport
await msgBus.publish('some_subject', {});
```
Expand Down Expand Up @@ -89,7 +89,7 @@ const [topic, event] = await msgBus.sendCommandAndReceiveEvent(
Messaging__TopicPrefix="deprecated_please_use_Messaging__Env"
Messaging__Env="messaging_env"
Messaging__Source="your_service_name"
Messaging__Transport="nats_or_rusi"
Messaging__Transport="jetstream_nats_or_rusi"

NATS_URL="your_nats_url"
NATS_CLUSTER="your_nats_cluster"
Expand All @@ -113,4 +113,15 @@ RUSI_PUB_SUB_AckWaitTime="5000"
RUSI_RPC_MaxConcurrentMessages="1"
RUSI_RPC_AckWaitTime="5000"

JETSTREAM_URL,
JETSTREAM_CLIENT_ID,
JETSTREAM_COMMANDS_STREAM,
JETSTREAM_EVENTS_STREAM,
JETSTREAM_STREAM_PROCESSOR_MaxConcurrentMessages = '1',
JETSTREAM_STREAM_PROCESSOR_AckWaitTime = '5000000000', // 5 seconds
JETSTREAM_PUB_SUB_MaxConcurrentMessages = '100',
JETSTREAM_PUB_SUB_AckWaitTime = '5000000000', // 5 seconds
JETSTREAM_RPC_MaxConcurrentMessages = '1',
JETSTREAM_RPC_AckWaitTime = '5000000000' // 5 seconds


32 changes: 32 additions & 0 deletions packages/message-bus/__mocks__/nats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

import type nats from 'nats'

const natsMock: any = jest.createMockFromModule<jest.Mocked<typeof nats>>('nats')

const natsConsumerMock: any = {
info: jest.fn().mockImplementation(() => ({ config: {} })),
consume: jest.fn().mockResolvedValue({})
}

const jetStreamClientMock: any = {
publish: jest.fn().mockResolvedValue({}),
jetstreamManager: jest.fn().mockImplementation(() => ({
consumers: { add: jest.fn().mockResolvedValue({}) }
})),
consumers: {
get: jest.fn().mockResolvedValue(natsConsumerMock)
}
}
natsMock.connect = jest.fn().mockResolvedValue({
close: jest.fn().mockResolvedValue(undefined),
closed: jest.fn().mockReturnValue(new Promise(() => {})),
jetstream: jest.fn().mockImplementation(() => jetStreamClientMock)
})

natsMock.StringCodec = jest.fn().mockImplementation(() => ({
encode: jest.fn()
}))

export default { ...natsMock, __jetStreamClientMock: jetStreamClientMock, __natsConsumerMock: natsConsumerMock }
3 changes: 3 additions & 0 deletions packages/message-bus/__mocks__/node-nats-streaming.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

const nodeNatsStreaming: any = jest.createMockFromModule('node-nats-streaming')

export default nodeNatsStreaming
17 changes: 17 additions & 0 deletions packages/message-bus/__samples__/jetstream/publish.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

process.env.JETSTREAM_URL = 'localhost:4222'
import { ensureStreamsExist } from './util'
import { messageBus, useTransport, transport } from '../../src'

async function main() {
await ensureStreamsExist()

useTransport(transport.jetstream)
const msgBus = messageBus()
await msgBus.publish('events.my-event1', { asa: 'asa' })
await msgBus.transport.disconnect()
}

main()
11 changes: 11 additions & 0 deletions packages/message-bus/__samples__/jetstream/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!--
Copyright (c) TotalSoft.
This source code is licensed under the MIT license.
-->

# Jetstream samples

Before running the samples, you can start a Nats server docker container by running the following:
```
yarn docker-run-jestream
```
19 changes: 19 additions & 0 deletions packages/message-bus/__samples__/jetstream/subscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

import { ensureStreamsExist } from './util'
import { messageBus, useTransport, transport } from '../../src'

async function main() {
await ensureStreamsExist()

useTransport(transport.jetstream)
const msgBus = messageBus()

const sub = await msgBus.subscribe('events.my-event1', _msg=> Promise.resolve())
await new Promise(r => setTimeout(r, 20000))
await sub.unsubscribe()
await msgBus.transport.disconnect()
}

main()
25 changes: 25 additions & 0 deletions packages/message-bus/__samples__/jetstream/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

process.env.JETSTREAM_URL = 'localhost:4222'
process.env.Messaging__Env = ''
process.env.Messaging__TopicPrefix = ''
process.env.JETSTREAM_COMMANDS_STREAM = 'commands'
process.env.JETSTREAM_EVENTS_STREAM = 'events'
process.env.JETSTREAM_CLIENT_ID = 'rocket-samples'

process.env.JETSTREAM_STREAM_PROCESSOR_AckWaitTime = '5000000000000000'

import { transport } from '../../src'
import { JetstreamConnection } from '../../src/transport/jetstream/types'


export async function ensureStreamsExist() {
const jc = <JetstreamConnection>await transport.jetstream.connect()
const nc = jc._natsConnection
if (!nc) {
throw new Error('Nats connection not set')
}
const jsm = await nc.jetstreamManager()
await jsm.streams.add({ name: 'events', subjects: ['events.>', 'ch.events.>'] })
}
53 changes: 53 additions & 0 deletions packages/message-bus/__tests__/transport/jetstream.tests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

process.env.JETSTREAM_EVENTS_STREAM = 'events'

import nats from '../../__mocks__/nats'
import jetstream from '../../src/transport/jetstream'
import { serDes, SubscriptionOptions } from '../../src'

describe('Testing Jetstream transport', () => {
// afterEach(async () => {
// await jetstream.disconnect()
// jest.resetAllMocks()
// })

test('connections are opened once', async () => {
// arrange

// act
await Promise.all([jetstream.connect(), jetstream.connect(), jetstream.connect()])
await jetstream.connect()
await jetstream.connect()

// assert
expect(nats.connect).toHaveBeenCalledTimes(1)
})

test('publish a message', async () => {
// arrange
const subject = 'subject'
const envelope = { payload: {}, headers: {} }

// act
await jetstream.publish(subject, envelope, serDes)

// assert
expect(nats.connect).toBeCalled()
expect(nats.__jetStreamClientMock.publish).toBeCalled()
})

test('subscribe to a channel', async () => {
// arrange
const subject = 'events.my.event'
const handler = jest.fn()

// act
await jetstream.subscribe(subject, handler, SubscriptionOptions.PUB_SUB, serDes)

// assert
expect(nats.connect).toBeCalled()
expect(nats.__natsConsumerMock.consume).toBeCalled()
})
})
13 changes: 6 additions & 7 deletions packages/message-bus/__tests__/transport/nats.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { nats, serDes, SubscriptionOptions } from '../../src'
import { Connection } from '../../src/transport/types'

jest.mock('node-nats-streaming')

import { serDes, SubscriptionOptions } from '../../src'
import nats from '../../src/transport/nats'
import { NatsSubscription } from '../../src/transport/nats/types'
import nodeNatsStreaming from 'node-nats-streaming'


describe('Testing nats transport', () => {
let mockConnection: Connection = null
let mockConnection: nodeNatsStreaming.Stan | null = null
let mockSubscriptionOptions: nodeNatsStreaming.SubscriptionOptions | null = null

beforeEach(() => {
Expand Down Expand Up @@ -138,7 +137,7 @@ describe('Testing nats transport', () => {
const handler = jest.fn()

// act
const sub = await nats.subscribe(subject, handler, SubscriptionOptions.STREAM_PROCESSOR, serDes)
const sub = <NatsSubscription>await nats.subscribe(subject, handler, SubscriptionOptions.STREAM_PROCESSOR, serDes)
sub.unsubscribe?.call(sub)

// assert
Expand Down
12 changes: 6 additions & 6 deletions packages/message-bus/__tests__/transport/rusi.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { rusi, serDes, SubscriptionOptions } from '../../src'
import { serDes, SubscriptionOptions } from '../../src'
import rusi from '../../src/transport/rusi'
import { RusiConnection, RusiChannel, RusiSubscription } from '../../src/transport/rusi/types'

jest.mock('@grpc/grpc-js')
Expand All @@ -7,13 +8,12 @@ jest.mock('@grpc/proto-loader')
import * as protoLoader from '@grpc/proto-loader'
import * as grpcJs from '@grpc/grpc-js'
import { GrpcObject } from '@grpc/grpc-js'
import { Subscription } from '../../src/transport/types'

const mockProtoLoader = protoLoader as {
loadSync: typeof protoLoader.loadSync
}
const mockGrpcJs = grpcJs as {
loadPackageDefinition: typeof grpcJs.loadPackageDefinition
const mockGrpcJs = grpcJs as {
loadPackageDefinition: typeof grpcJs.loadPackageDefinition
}

describe('Testing rusi transport', () => {
Expand Down Expand Up @@ -161,11 +161,11 @@ describe('Testing rusi transport', () => {
const handler = jest.fn()

// act
const sub: Subscription = await rusi.subscribe(subject, handler, SubscriptionOptions.STREAM_PROCESSOR, serDes)
const sub = <RusiSubscription>await rusi.subscribe(subject, handler, SubscriptionOptions.STREAM_PROCESSOR, serDes)
await sub.unsubscribe?.call(sub)

// assert
expect((sub._call as RusiSubscription)?.cancel).toBeCalled()
expect(sub._call?.cancel).toBeCalled()
})

test('disconnect happens if the connection is open', async () => {
Expand Down
8 changes: 5 additions & 3 deletions packages/message-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
"lint": "eslint src/*.ts",
"clean": "rimraf dist",
"test": "jest --collectCoverage --passWithNoTests --verbose --silent=false --runInBand",
"tslint": "tslint -p tsconfig.json \"/**/*.ts\"",
"rusi-sample": "node src/___samples/rusi.js",
"copy-files": "copyfiles -u 1 src/**/*.proto dist/src/"
"copy-files": "copyfiles -u 1 src/**/*.proto dist/src/",
"docker-run-jestream": "docker run -p 4222:4222 -it --rm nats -js",
"docker-run-nats-streaming": "docker run -p 4222:4222 -p 8222:8222 -it --rm nats-streaming:alpine"
},
"repository": {
"type": "git",
Expand All @@ -36,6 +36,7 @@
"async-mutex": "^0.3.1",
"bluebird": "3.7.2",
"humps": "^2.0.1",
"nats": "^2.18.0",
"node-nats-streaming": "0.3.2",
"uuid": "^8.3.0"
},
Expand All @@ -53,6 +54,7 @@
"jest": "^28.1.0",
"rimraf": "^3.0.2",
"ts-jest": "^28.0.3",
"ts-node": "^10.9.1",
"typescript": "^4.7.2"
},
"jest": {
Expand Down
5 changes: 4 additions & 1 deletion packages/message-bus/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

export * from './messageBus'
export * from './types'
export * from './transport'
export { default as transport } from './transport'
export * from './envelope'
export { default as serDes } from './serDes'
export * from './topicRegistry'
20 changes: 4 additions & 16 deletions packages/message-bus/src/messageBus.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

import * as transport from './transport'
import transport from './transport'
import * as topicRegistry from './topicRegistry'
import defaultSerDes from './serDes'
import {
Context,
Envelope,
EnvelopeCustomizer,
isTransportType,
MessageBus,
MessageBusHandler,
SerDes,
SubscriptionOptions,
TransportType
} from './types'
import { Context, Envelope, EnvelopeCustomizer, MessageBus, MessageBusHandler, SerDes, SubscriptionOptions } from './types'
import { envelope } from './envelope'
import { Subscription, Transport } from './transport/types'

const { Messaging__Transport } = process.env
const envTransport: TransportType = isTransportType(Messaging__Transport) ? Messaging__Transport : 'nats'

let currentTransport: Transport = transport[envTransport]
const { Messaging__Transport = 'nats' } = process.env
let currentTransport: Transport = transport[Messaging__Transport] ?? transport.nats

export function useTransport(t: Transport) {
currentTransport = t
Expand Down
5 changes: 2 additions & 3 deletions packages/message-bus/src/serDes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ export function serialize(msg: any): string {
return data
}

export function deSerialize(data: String | Buffer): Envelope<any> {
const stringData: string = data.toString()
const msg = JSON.parse(stringData)
export function deSerialize(data: string): Envelope<any> {
const msg = JSON.parse(data)
const payload = msg.payload || humps.camelizeKeys(msg.Payload)
const headers = msg.headers || msg.Headers
const result = {
Expand Down

0 comments on commit adf0957

Please sign in to comment.