Skip to content

Commit

Permalink
Use custom mx config section
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbede committed Feb 17, 2020
1 parent d7b9678 commit cf904c8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 17 deletions.
15 changes: 12 additions & 3 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,6 @@ module.exports = {
// the connections.
connections: 10,

// How many parallel connections to open against a MX per Sending IP
connectionsPerMX: 10,

// Throttling applies per connection in a process
// throttling: '100 messages/second', // max messages per minute, hour or second

Expand Down Expand Up @@ -486,5 +483,17 @@ module.exports = {
*/
},

// Receiving MX specific configuration
// The keys are RegExp's for matching the MX
mxConfig: {
// default is required
default: {
// How many parallel connections per IP to use against a receiving mailserver
// This connection limit is shared across the whole ZoneMTA instance
// Connections are locked by Receiving IP and Sending IP
maxConnections: 50
}
},

pluginsPath: './plugins'
};
39 changes: 37 additions & 2 deletions lib/mta-locker.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,42 @@
const log = require('npmlog');
const config = require('wild-config');

class MTALocker {
constructor() {
this.counters = new Map();

// precompile regex
this.precompiledRegex = [];

let precompileRegex = () => {
let regexConfig = config.mxConfig;
for (let regKey in regexConfig) {
this.precompiledRegex.push({
regexp: new RegExp(regKey),
config: regexConfig[regKey]
});
}
log.verbose("MTALocker", "Keys precompiled");
}
precompileRegex();

config.on('reload', () => {
log.verbose("MTALocker", "Reloading MTA configs");
this.precompiledRegex = [];
precompileRegex();
});
}

_getMXConfig(mx) {
if (this.precompiledRegex.length > 0) {
for (let pRegex of this.precompiledRegex) {
if (pRegex.regexp.test(mx)) {
return pRegex.config;
}
}
}

return config.mxConfig.default;
}

// helper to get correct counter Map
Expand Down Expand Up @@ -42,14 +76,15 @@ class MTALocker {
log.verbose("MTALocker", "MTA released with '"+key+"'");
}

isFree(key, maxConnections) {
isFree(key, mx) {
let config = this._getMXConfig(mx);
let sum = 0;
this.counters.forEach(counters=>{
if(counters.has(key)){
sum += counters.get(key);
}
})
return sum < maxConnections;
return sum < config.maxConnections;
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/queue-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ class QueueServer {
client.send({
req: data.req,
response: {
isFree: this.mtaLocker.isFree(data.key, client.zone.connectionsPerMX)
isFree: this.mtaLocker.isFree(data.key, data.mx)
}
});
break;
Expand Down
26 changes: 15 additions & 11 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -907,17 +907,21 @@ class Sender extends EventEmitter {
mxOptions.connectHook = (opts, connOpts, cb) => {
plugins.handler.runHooks('sender:connect', [delivery, connOpts], cb);

let lockKey = [connOpts.host, connOpts.localAddress].join(":");
this.queue.sendCommand({ cmd: 'ISFREEMTA', key: lockKey }, (err, data) => {
if (!data.isFree) {
ignoreMXHosts.push(connOpts.host); // Ignoring for the next try
let err = new Error("mta locked");
err.category = "locked";
cb(err);
} else {
cb();
}
});
if (opts.mx.length > 0) {
let lockKey = [connOpts.host, connOpts.localAddress].join(":");
this.queue.sendCommand({cmd: 'ISFREEMTA', key: lockKey, mx: opts.mx[0].exchange}, (err, data) => {
if (!data.isFree) {
ignoreMXHosts.push(connOpts.host); // Ignoring for the next try
let err = new Error("mta locked");
err.category = "locked";
cb(err);
} else {
cb();
}
});
} else {
cb();
}
}

mxOptions.connectError = (err, opts, connOpts) => {
Expand Down

0 comments on commit cf904c8

Please sign in to comment.