Skip to content

Commit

Permalink
Merge pull request #31 from moleculerjs/connection_flag
Browse files Browse the repository at this point in the history
Add connection flag
  • Loading branch information
icebob committed Dec 28, 2021
2 parents 4a111cf + 3f3b4b6 commit 35afe71
Show file tree
Hide file tree
Showing 9 changed files with 934 additions and 1,040 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
<a name="v0.1.1"></a>

# [0.4.2](https://github.com/moleculerjs/moleculer-channels/compare/v0.1.0...v0.1.1)

- Added Typescript support
- Added `connection` that prevents publishing events before the adapter is connected

<a name="v0.1.0"></a>

# v0.1.0 (2021-10-17)

First public version.
1,897 changes: 874 additions & 1,023 deletions package-lock.json

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,24 @@
"amqplib": "^0.8.0",
"benchmarkify": "^3.0.0",
"coveralls": "^3.1.1",
"eslint": "^8.0.1",
"eslint": "^8.5.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettier": "^4.0.0",
"eslint-plugin-promise": "^5.1.0",
"eslint-plugin-promise": "^5.2.0",
"eslint-plugin-security": "^1.4.0",
"ioredis": "^4.28.0",
"jest": "^27.2.5",
"jest-cli": "^27.2.5",
"ioredis": "^4.28.2",
"jest": "^27.4.5",
"jest-cli": "^27.4.5",
"kafkajs": "^1.15.0",
"kleur": "^4.1.4",
"moleculer": "^0.14.17",
"moleculer": "^0.14.18",
"moleculer-repl": "^0.6.6",
"msgpack5": "^5.3.2",
"nats": "^2.2.0",
"nodemon": "^2.0.13",
"nats": "^2.4.0",
"nodemon": "^2.0.15",
"npm-check": "^5.9.2",
"prettier": "^2.4.1"
"prettier": "^2.5.1"
},
"jest": {
"testEnvironment": "node",
Expand Down
9 changes: 7 additions & 2 deletions src/adapters/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

const BaseAdapter = require("./base");
const _ = require("lodash");
const { MoleculerError } = require("moleculer").Errors;
const { MoleculerError, MoleculerRetryableError } = require("moleculer").Errors;
const C = require("../constants");

let Amqplib;
Expand Down Expand Up @@ -94,7 +94,7 @@ class AmqpAdapter extends BaseAdapter {
* @type {Map<string,SubscriptionEntry>}
*/
this.subscriptions = new Map();
this.connected = false;

this.stopping = false;
this.connectAttempt = 0;
this.connectionCount = 0; // To detect reconnections
Expand Down Expand Up @@ -235,6 +235,7 @@ class AmqpAdapter extends BaseAdapter {
.then(() => {
this.connection = null;
this.channel = null;
this.connected = false;
resolve();
})
.catch(reject);
Expand Down Expand Up @@ -507,6 +508,10 @@ class AmqpAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

// Available options: http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
const messageOptions = _.defaultsDeep(
{},
Expand Down
5 changes: 4 additions & 1 deletion src/adapters/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class BaseAdapter {
* @type {Map<string, string[]>}
*/
this.activeMessages = new Map();

/** @type {Boolean} Flag indicating the adapter's connection status */
this.connected = false;
}

/**
Expand Down Expand Up @@ -119,7 +122,7 @@ class BaseAdapter {
*/
metricsIncrement(metricName, chan) {
if (!this.broker.isMetricsEnabled()) return;

this.broker.metrics.increment(metricName, {
channel: chan.name,
group: chan.group
Expand Down
13 changes: 11 additions & 2 deletions src/adapters/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

const BaseAdapter = require("./base");
const _ = require("lodash");
const { MoleculerError } = require("moleculer").Errors;
const { MoleculerError, MoleculerRetryableError } = require("moleculer").Errors;
const C = require("../constants");
/** Name of the partition where an error occurred while processing the message */
const HEADER_ORIGINAL_PARTITION = "x-original-partition";
Expand Down Expand Up @@ -171,6 +171,8 @@ class KafkaAdapter extends BaseAdapter {
await this.producer.connect();

this.logger.info("Kafka adapter is connected.");

this.connected = true;
}

/**
Expand Down Expand Up @@ -201,7 +203,10 @@ class KafkaAdapter extends BaseAdapter {
// Release the pointers
this.consumers = new Map();
})
.then(() => resolve())
.then(() => {
this.connected = false;
resolve();
})
.catch(err => reject(err));
} else {
this.logger.warn(
Expand Down Expand Up @@ -498,6 +503,10 @@ class KafkaAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

this.logger.debug(`Publish a message to '${channelName}' topic...`, payload, opts);

const data = opts.raw ? payload : this.serializer.serialize(payload);
Expand Down
9 changes: 9 additions & 0 deletions src/adapters/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
const BaseAdapter = require("./base");
const _ = require("lodash");
const C = require("../constants");
const { MoleculerRetryableError } = require("moleculer").Errors;

let NATS;

Expand Down Expand Up @@ -130,6 +131,8 @@ class NatsAdapter extends BaseAdapter {
this.manager = await this.connection.jetstreamManager();

this.client = this.connection.jetstream(); // JetStreamOptions

this.connected = true;
}

/**
Expand All @@ -149,6 +152,8 @@ class NatsAdapter extends BaseAdapter {
} catch (error) {
this.logger.error("Error while closing NATS JetStream connection.", error);
}

this.connected = false;
}

/**
Expand Down Expand Up @@ -426,6 +431,10 @@ class NatsAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

try {
// Remap headers into JetStream format
if (opts.headers) {
Expand Down
13 changes: 11 additions & 2 deletions src/adapters/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

const _ = require("lodash");
const BaseAdapter = require("./base");
const { ServiceSchemaError } = require("moleculer").Errors;
const { ServiceSchemaError, MoleculerRetryableError } = require("moleculer").Errors;
const C = require("../constants");
/** Redis generated ID of the message that was not processed properly*/
const HEADER_ORIGINAL_ID = "x-original-id";
Expand Down Expand Up @@ -149,6 +149,8 @@ class RedisAdapter extends BaseAdapter {
this.nackedName,
await this.createRedisClient(this.nackedName, this.opts.redis)
);

this.connected = true;
}

/**
Expand All @@ -171,7 +173,10 @@ class RedisAdapter extends BaseAdapter {
// Release the pointers
this.clients = new Map();
})
.then(() => resolve())
.then(() => {
this.connected = false;
resolve();
})
.catch(err => reject(err));
} else {
this.logger.warn(
Expand Down Expand Up @@ -670,6 +675,10 @@ class RedisAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

this.logger.debug(`Publish a message to '${channelName}' channel...`, payload, opts);

const clientPub = this.clients.get(this.pubName);
Expand Down
2 changes: 1 addition & 1 deletion test/integration/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ describe("Integration tests", () => {
};
// ---- ^ SETUP ^ ---

const numMessages = 20
const numMessages = 20;

await Promise.all(
_.times(numMessages, () => broker.sendToChannel("test.balanced.topic", msg))
Expand Down

0 comments on commit 35afe71

Please sign in to comment.