Skip to content

Commit

Permalink
Merge pull request #1472 from lehni/feature/better-transactions
Browse files Browse the repository at this point in the history
Add a better way to handle transactions by binding them to models
  • Loading branch information
lehni committed Sep 6, 2017
2 parents f18d348 + 0ce1fa9 commit 12c3e3a
Show file tree
Hide file tree
Showing 5 changed files with 496 additions and 155 deletions.
157 changes: 64 additions & 93 deletions lib/dao.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,45 @@ function errorModelNotFound(idValue) {
return error;
}

function invokeConnectorMethod(connector, method, Model, args, options, cb) {
var dataSource = Model.getDataSource();
// If the DataSource is a transaction and no transaction object is provide in
// the options yet, add it to the options, see: DataSource#transaction()
var opts = dataSource.isTransaction && !options.transaction ? Object.assign(
options, {transaction: dataSource.currentTransaction}) : options;
var optionsSupported = connector[method].length >= args.length + 3;
var transaction = opts.transaction;
if (transaction) {
if (!optionsSupported) {
return process.nextTick(function() {
cb(new Error(g.f(
'The connector does not support {{method}} within a transaction', method)));
});
}
// transaction isn't always a Transaction instance. Some tests provide a
// string to test if options get passed through, so check for ensureActive:
if (transaction.ensureActive && !transaction.ensureActive(cb)) {
return;
}
}
var modelName = Model.modelName;
var fullArgs;
if (!optionsSupported && method === 'count') {
// NOTE: The old count() signature is irregular, with `where` coming last:
// [modelName, cb, where]
var where = args[0];
fullArgs = [modelName, cb, where];
} else {
// Standard signature: [modelName, ...args, (opts, ) cb]
fullArgs = [modelName].concat(args);
if (optionsSupported) {
fullArgs.push(opts);
}
fullArgs.push(cb);
}
connector[method].apply(connector, fullArgs);
}

DataAccessObject._forDB = function(data) {
if (!(this.getDataSource().isRelational && this.getDataSource().isRelational())) {
return data;
Expand Down Expand Up @@ -365,7 +404,6 @@ DataAccessObject.create = function(data, options, cb) {
obj.trigger('create', function(createDone) {
obj.trigger('save', function(saveDone) {
var _idName = idName(Model);
var modelName = Model.modelName;
var val = removeUndefined(obj.toObject(true));
function createCallback(err, id, rev) {
if (id) {
Expand Down Expand Up @@ -431,12 +469,8 @@ DataAccessObject.create = function(data, options, cb) {
};
Model.notifyObserversOf('persist', context, function(err) {
if (err) return cb(err);

if (connector.create.length === 4) {
connector.create(modelName, obj.constructor._forDB(context.data), options, createCallback);
} else {
connector.create(modelName, obj.constructor._forDB(context.data), createCallback);
}
invokeConnectorMethod(connector, 'create', Model, [obj.constructor._forDB(context.data)],
options, createCallback);
});
}, obj, cb);
}, obj, cb);
Expand Down Expand Up @@ -579,8 +613,6 @@ DataAccessObject.upsert = function(data, options, cb) {
Model.applyProperties(update, inst);
Model = Model.lookupModel(update);

var connector = self.getConnector();

if (doValidate === false) {
callConnector();
} else {
Expand Down Expand Up @@ -611,11 +643,7 @@ DataAccessObject.upsert = function(data, options, cb) {
};
Model.notifyObserversOf('persist', context, function(err) {
if (err) return done(err);
if (connector.updateOrCreate.length === 4) {
connector.updateOrCreate(Model.modelName, update, options, done);
} else {
connector.updateOrCreate(Model.modelName, update, done);
}
invokeConnectorMethod(connector, 'updateOrCreate', Model, [update], options, done);
});
}
function done(err, data, info) {
Expand Down Expand Up @@ -728,7 +756,6 @@ DataAccessObject.upsertWithWhere = function(where, data, options, cb) {
var self = this;
var Model = this;
var connector = Model.getConnector();
var modelName = Model.modelName;
var query = {where: where};
var context = {
Model: Model,
Expand Down Expand Up @@ -795,7 +822,7 @@ DataAccessObject.upsertWithWhere = function(where, data, options, cb) {
};
Model.notifyObserversOf('persist', context, function(err) {
if (err) return done(err);
connector.upsertWithWhere(modelName, ctx.where, update, options, done);
invokeConnectorMethod(connector, 'upsertWithWhere', Model, [ctx.where, update], options, done);
});
}
function done(err, data, info) {
Expand Down Expand Up @@ -944,8 +971,6 @@ DataAccessObject.replaceOrCreate = function replaceOrCreate(data, options, cb) {
Model.applyProperties(update, inst);
Model = Model.lookupModel(update);

var connector = self.getConnector();

if (options.validate === false) {
return callConnector();
}
Expand All @@ -972,7 +997,7 @@ DataAccessObject.replaceOrCreate = function replaceOrCreate(data, options, cb) {
};
Model.notifyObserversOf('persist', context, function(err) {
if (err) return done(err);
connector.replaceOrCreate(Model.modelName, context.data, options, done);
invokeConnectorMethod(connector, 'replaceOrCreate', Model, [context.data], options, done);
});
}
function done(err, data, info) {
Expand Down Expand Up @@ -1097,7 +1122,6 @@ DataAccessObject.findOrCreate = function findOrCreate(query, data, options, cb)
var connector = Model.getConnector();

function _findOrCreate(query, data, currentInstance) {
var modelName = self.modelName;
function findOrCreateCallback(err, data, created) {
if (err) return cb(err);
var context = {
Expand Down Expand Up @@ -1155,12 +1179,8 @@ DataAccessObject.findOrCreate = function findOrCreate(query, data, options, cb)

Model.notifyObserversOf('persist', context, function(err) {
if (err) return cb(err);

if (connector.findOrCreate.length === 5) {
connector.findOrCreate(modelName, query, self._forDB(context.data), options, findOrCreateCallback);
} else {
connector.findOrCreate(modelName, query, self._forDB(context.data), findOrCreateCallback);
}
invokeConnectorMethod(connector, 'findOrCreate', Model, [query, self._forDB(context.data)],
options, findOrCreateCallback);
});
}

Expand Down Expand Up @@ -1776,7 +1796,7 @@ DataAccessObject._coerce = function(where, options) {
}
} else {
if (val != null) {
const allowExtendedOperators = self._allowExtendedOperators(options);
var allowExtendedOperators = self._allowExtendedOperators(options);
if (operator === null && val instanceof RegExp) {
// Normalize {name: /A/} to {name: {regexp: /A/}}
operator = 'regexp';
Expand Down Expand Up @@ -1952,8 +1972,8 @@ DataAccessObject.find = function find(query, options, cb) {
cb(err);
} else if (Array.isArray(data)) {
memory.define({
properties: self.dataSource.definitions[self.modelName].properties,
settings: self.dataSource.definitions[self.modelName].settings,
properties: self.dataSource.definitions[modelName].properties,
settings: self.dataSource.definitions[modelName].settings,
model: self,
});

Expand Down Expand Up @@ -1993,11 +2013,7 @@ DataAccessObject.find = function find(query, options, cb) {
}

var geoCallback = options.notify === false ? geoCallbackWithoutNotify : geoCallbackWithNotify;
if (connector.all.length === 4) {
connector.all(self.modelName, {}, options, geoCallback);
} else {
connector.all(self.modelName, {}, geoCallback);
}
invokeConnectorMethod(connector, 'all', self, [{}], options, geoCallback);
}
// already handled
return cb.promise;
Expand Down Expand Up @@ -2100,11 +2116,7 @@ DataAccessObject.find = function find(query, options, cb) {
};

if (options.notify === false) {
if (connector.all.length === 4) {
connector.all(self.modelName, query, options, allCb);
} else {
connector.all(self.modelName, query, allCb);
}
invokeConnectorMethod(connector, 'all', self, [query], options, allCb);
} else {
var context = {
Model: this,
Expand All @@ -2114,10 +2126,7 @@ DataAccessObject.find = function find(query, options, cb) {
};
this.notifyObserversOf('access', context, function(err, ctx) {
if (err) return cb(err);

connector.all.length === 4 ?
connector.all(self.modelName, ctx.query, options, allCb) :
connector.all(self.modelName, ctx.query, allCb);
invokeConnectorMethod(connector, 'all', self, [ctx.query], options, allCb);
});
}
return cb.promise;
Expand Down Expand Up @@ -2258,11 +2267,7 @@ DataAccessObject.destroyAll = function destroyAll(where, options, cb) {
};

if (whereIsEmpty(where)) {
if (connector.destroyAll.length === 4) {
connector.destroyAll(Model.modelName, {}, options, done);
} else {
connector.destroyAll(Model.modelName, {}, done);
}
invokeConnectorMethod(connector, 'destroyAll', Model, [{}], options, done);
} else {
try {
// Support an optional where object
Expand All @@ -2276,11 +2281,7 @@ DataAccessObject.destroyAll = function destroyAll(where, options, cb) {
});
}

if (connector.destroyAll.length === 4) {
connector.destroyAll(Model.modelName, where, options, done);
} else {
connector.destroyAll(Model.modelName, where, done);
}
invokeConnectorMethod(connector, 'destroyAll', Model, [where], options, done);
}

function done(err, info) {
Expand Down Expand Up @@ -2444,17 +2445,7 @@ DataAccessObject.count = function(where, options, cb) {
};
this.notifyObserversOf('access', context, function(err, ctx) {
if (err) return cb(err);
where = ctx.query.where;

if (connector.count.length <= 3) {
// Old signature, please note where is the last
// count(model, cb, where)
connector.count(Model.modelName, cb, where);
} else {
// New signature
// count(model, where, options, cb)
connector.count(Model.modelName, where, options, cb);
}
invokeConnectorMethod(connector, 'count', Model, [ctx.query.where], options, cb);
});
return cb.promise;
};
Expand Down Expand Up @@ -2507,7 +2498,6 @@ DataAccessObject.prototype.save = function(options, cb) {

var inst = this;
var connector = inst.getConnector();
var modelName = Model.modelName;

var context = {
Model: Model,
Expand Down Expand Up @@ -2591,12 +2581,8 @@ DataAccessObject.prototype.save = function(options, cb) {

Model.notifyObserversOf('persist', context, function(err) {
if (err) return cb(err);

if (connector.save.length === 4) {
connector.save(modelName, inst.constructor._forDB(data), options, saveCallback);
} else {
connector.save(modelName, inst.constructor._forDB(data), saveCallback);
}
invokeConnectorMethod(connector, 'save', Model, [Model._forDB(data)],
options, saveCallback);
});
}, data, cb);
}, data, cb);
Expand Down Expand Up @@ -2770,12 +2756,7 @@ DataAccessObject.updateAll = function(where, data, options, cb) {
};
Model.notifyObserversOf('persist', context, function(err, ctx) {
if (err) return cb(err);

if (connector.update.length === 5) {
connector.update(Model.modelName, where, data, options, updateCallback);
} else {
connector.update(Model.modelName, where, data, updateCallback);
}
invokeConnectorMethod(connector, 'update', Model, [where, data], options, updateCallback);
});
}
return cb.promise;
Expand Down Expand Up @@ -2907,11 +2888,7 @@ DataAccessObject.prototype.remove =
});
}

if (connector.destroy.length === 4) {
connector.destroy(inst.constructor.modelName, id, options, destroyCallback);
} else {
connector.destroy(inst.constructor.modelName, id, destroyCallback);
}
invokeConnectorMethod(connector, 'destroy', Model, [id], options, destroyCallback);
}, null, cb);
}
return cb.promise;
Expand Down Expand Up @@ -3040,7 +3017,6 @@ DataAccessObject.replaceById = function(id, data, options, cb) {
if (isPKMissing(Model, cb))
return cb.promise;

var model = Model.modelName;
var hookState = {};

if (id !== data[pkName]) {
Expand Down Expand Up @@ -3155,8 +3131,8 @@ DataAccessObject.replaceById = function(id, data, options, cb) {
options: options,
};
Model.notifyObserversOf('persist', ctx, function(err) {
connector.replaceById(model, id,
inst.constructor._forDB(context.data), options, replaceCallback);
invokeConnectorMethod(connector, 'replaceById', Model, [id, Model._forDB(context.data)],
options, replaceCallback);
});
}
}
Expand Down Expand Up @@ -3216,7 +3192,6 @@ function(data, options, cb) {

var allowExtendedOperators = Model._allowExtendedOperators(options);
var strict = this.__strict;
var model = Model.modelName;
var hookState = {};

// Convert the data to be plain object so that update won't be confused
Expand Down Expand Up @@ -3352,13 +3327,9 @@ function(data, options, cb) {
options: options,
};
Model.notifyObserversOf('persist', ctx, function(err) {
if (connector.updateAttributes.length === 5) {
connector.updateAttributes(model, getIdValue(inst.constructor, inst),
inst.constructor._forDB(context.data), options, updateAttributesCallback);
} else {
connector.updateAttributes(model, getIdValue(inst.constructor, inst),
inst.constructor._forDB(context.data), updateAttributesCallback);
}
invokeConnectorMethod(connector, 'updateAttributes', Model,
[getIdValue(Model, inst), Model._forDB(context.data)],
options, updateAttributesCallback);
});
}, data, cb);
}, data, cb);
Expand Down

0 comments on commit 12c3e3a

Please sign in to comment.