Skip to content

Commit

Permalink
Merge b6c48d1 into 394ba20
Browse files Browse the repository at this point in the history
  • Loading branch information
Marko Obrovac committed Feb 11, 2015
2 parents 394ba20 + b6c48d1 commit beae24d
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 86 deletions.
161 changes: 78 additions & 83 deletions lib/dbutils.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,31 @@ dbu.schemaTypeToCQLType = function(schemaType) {
*/
function generateSetConvertor (convObj) {
if (!convObj) {
return;
return {
write: function(arr) {
// Default to-null conversion for empty sets
if (!Array.isArray(arr) || arr.length === 0) {
return null;
} else {
return arr;
}
},
// XXX: Should we convert null to the empty array here?
read: null
};
}
var res = {
write: null,
read: null
};
if (convObj.write) {
res.write = function (valArray) {
return valArray.map(convObj.write);
if (!Array.isArray(valArray) || valArray.length === 0) {
// Empty set is equivalent to null in Cassandra
return null;
} else {
return valArray.map(convObj.write);
}
};
}
if (convObj.read) {
Expand Down Expand Up @@ -376,9 +392,7 @@ dbu.makeSchemaInfo = function makeSchemaInfo(schema) {
// this is a set-typed attribute
type = set_type[1];
// generate the convertors only if the underlying type has them defined
if (dbu.conversions[type]) {
psi.conversions[att] = generateSetConvertor(dbu.conversions[type]);
}
psi.conversions[att] = generateSetConvertor(dbu.conversions[type]);
} else if (dbu.conversions[type]) {
// this is regular type and conversion methods are defined for it
psi.conversions[att] = dbu.conversions[type];
Expand Down Expand Up @@ -428,33 +442,6 @@ dbu.makeSchemaInfo = function makeSchemaInfo(schema) {
return psi;
};

/*
 * Converts query parameters from JS object/values into
 * serializable Cassandra values (based on schema.conversions)
 *
 * @param {Schema} schema the schema to consult for conversion methods
 * @param {Array} keys the list of parameter keys to look up in the schema for conversion
 * @param {Array} params the parameter values to convert
 * @returns {Array} a new array containing converted values, or the original
 *                  params array if no conversion information is available
 * @throws {Error} if keys and params have different lengths
 */
dbu.convertParams = function convertParams (schema, keys, params) {
    // Nothing to do without a schema, conversion table, or any keys to map
    if (!(schema && schema.conversions && keys && params && keys.length)) {
        return params;
    }
    if (keys.length !== params.length) {
        throw new Error('dbu.convertParams(): keys and params arrays are of different sizes!');
    }
    var convObj = schema.conversions;
    var converted = [];
    keys.forEach(function (key, idx) {
        var val = params[idx];
        // Run the write convertor whenever one exists and the value is
        // actually present. Compare against null/undefined explicitly so
        // that legitimate falsy values (0, '', false) are still converted;
        // a plain truthiness test would silently skip them.
        if (key && convObj[key] && convObj[key].write
                && val !== null && val !== undefined) {
            converted.push(convObj[key].write(val));
        } else {
            converted.push(val);
        }
    });
    return converted;
};

/**
* Converts a result row from Cassandra to JS values
Expand All @@ -477,11 +464,19 @@ dbu.convertRow = function convertRow (row, schema) {
*/

dbu.buildCondition = function buildCondition (pred, schema) {
function convert(key, val) {
var convObj = schema.conversions[key];
if (convObj && convObj.write) {
return convObj.write(val);
} else {
return val;
}
}

var params = [];
var typeHints = [];
var keys = [];
var conjunctions = [];
for (var predKey in pred) {
Object.keys(pred).forEach(function(predKey) {
var cql = '';
var predObj = pred[predKey];
cql += dbu.cassID(predKey);
Expand All @@ -491,61 +486,53 @@ dbu.buildCondition = function buildCondition (pred, schema) {
} else if (predObj === null || predObj.constructor !== Object) {
// Default to equality
cql += ' = ?';
params.push(predObj);
params.push(convert(predKey, predObj));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
} else {
var predKeys = Object.keys(predObj);
if (predKeys.length === 1) {
var predOp = predKeys[0];
var predArg = predObj[predOp];
// TODO: Combine the repetitive cases here
switch (predOp.toLowerCase()) {
case 'eq':
cql += ' = ?';
params.push(predArg);
params.push(convert(predKey, predArg));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
break;
case 'lt':
cql += ' < ?';
params.push(predArg);
params.push(convert(predKey, predArg));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
break;
case 'gt':
cql += ' > ?';
params.push(predArg);
params.push(convert(predKey, predArg));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
break;
case 'le':
cql += ' <= ?';
params.push(predArg);
params.push(convert(predKey, predArg));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
break;
case 'ge':
cql += ' >= ?';
params.push(predArg);
params.push(convert(predKey, predArg));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
break;
case 'neq':
case 'ne':
cql += ' != ?';
params.push(predArg);
params.push(convert(predKey, predArg));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
break;
case 'between':
cql += ' >= ?' + ' AND ';
params.push(predArg[0]);
params.push(convert(predKey, predArg[0]));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
cql += dbu.cassID(predKey) + ' <= ?';
params.push(predArg[1]);
params.push(convert(predKey, predArg[1]));
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[predKey]));
keys.push(predKey);
break;
default: throw new Error ('Operator ' + predOp + ' not supported!');
}
Expand All @@ -554,10 +541,9 @@ dbu.buildCondition = function buildCondition (pred, schema) {
}
}
conjunctions.push(cql);
}
});
return {
query: conjunctions.join(' AND '),
keys: keys,
params: params,
typeHints: typeHints
};
Expand All @@ -571,34 +557,45 @@ dbu.buildPutQuery = function(req, keyspace, table, schema) {
throw new Error('Table not found!');
}

// Convert the attributes
var attributes = req.attributes;
var conversions = schema.conversions || {};

// XXX: should we require non-null secondary index entries too?
var indexKVMap = {};
schema.iKeys.forEach(function(key) {
if (req.attributes[key] === undefined) {
if (attributes[key] === undefined) {
throw new Error("Index attribute " + JSON.stringify(key) + " missing in "
+ JSON.stringify(req) + "; schema: " + JSON.stringify(schema, null, 2));
} else {
indexKVMap[key] = req.attributes[key];
indexKVMap[key] = attributes[key];
}
});

var keys = [];
var paramKeys = [];
var nonIndexKeys = [];
var params = [];
var typeHints = [];
var placeholders = [];
for (var key in req.attributes) {
var val = req.attributes[key];
var haveNonIndexNonNullValue = false;
Object.keys(attributes).forEach(function(key) {
var val = attributes[key];
if (val !== undefined && schema.attributes[key]) {
if (!schema.iKeyMap[key]) {
keys.push(key);
nonIndexKeys.push(key);
// Convert the parameter value
var conversionObj = conversions[key];
if (conversionObj && conversionObj.write) {
val = conversionObj.write(val);
}
if (val !== null) {
haveNonIndexNonNullValue = true;
}
params.push(val);
typeHints.push(dbu.schemaTypeToCQLType(schema.attributes[key]));
paramKeys.push(key);
}
placeholders.push('?');
}
}
});

var using = '';
var usingParams = [];
Expand All @@ -613,7 +610,7 @@ dbu.buildPutQuery = function(req, keyspace, table, schema) {

// switch between insert & update / upsert
// - insert for 'if not exists', or when no non-primary-key attributes are
// specified
// specified, or they are all null (as Cassandra does not distinguish the two)
// - update when any non-primary key attributes are supplied
// - Need to verify that all primary key members are supplied as well,
// else error.
Expand All @@ -627,18 +624,17 @@ dbu.buildPutQuery = function(req, keyspace, table, schema) {
var condRes = dbu.buildCondition(indexKVMap, schema);

var cond = '';
if (!keys.length || req.if === 'not exists') {
if (!haveNonIndexNonNullValue || req.if === 'not exists') {
if (req.if === 'not exists') {
cond = ' if not exists ';
}
var proj = schema.iKeys.concat(keys).map(dbu.cassID).join(',');
var proj = schema.iKeys.concat(nonIndexKeys).map(dbu.cassID).join(',');
cql = 'insert into ' + dbu.cassID(keyspace) + '.' + dbu.cassID(table)
+ ' (' + proj + ') values (';
cql += placeholders.join(',') + ')' + cond + using;
params = condRes.params.concat(params, usingParams);
typeHints = condRes.typeHints.concat(typeHints, usingTypeHints);
paramKeys = condRes.keys.concat(paramKeys, usingParamsKeys);
} else if ( keys.length ) {
} else if (nonIndexKeys.length) {
var condParams = [];
var condTypeHints = [];
var condParamKeys = [];
Expand All @@ -651,21 +647,20 @@ dbu.buildPutQuery = function(req, keyspace, table, schema) {
condParamKeys = condResult.keys;
}

var updateProj = keys.map(dbu.cassID).join(' = ?,') + ' = ? ';
var updateProj = nonIndexKeys.map(dbu.cassID).join(' = ?,') + ' = ? ';
cql += 'update ' + dbu.cassID(keyspace) + '.' + dbu.cassID(table)
+ using + ' set ' + updateProj + ' where ';
cql += condRes.query + cond;
params = usingParams.concat(params, condRes.params, condParams);
typeHints = usingTypeHints.concat(typeHints, condRes.typeHints, condTypeHints);
paramKeys = usingParamsKeys.concat(paramKeys, condRes.keys, condParamKeys);

} else {
throw new Error("Can't Update or Insert");
}

return {
query: cql,
params: dbu.convertParams(schema, paramKeys, params),
params: params,
typeHints: typeHints
};
};
Expand Down Expand Up @@ -701,12 +696,6 @@ dbu.buildGetQuery = function(keyspace, req, consistency, table, schema) {
req.limit = undefined;
}

for ( var item in req.attributes ) {
// req should not have non key attributes
if (!schema.iKeyMap[item]) {
throw new Error("Request attributes need to be key attributes");
}
}

if (req.distinct) {
proj = 'distinct ' + proj;
Expand All @@ -715,15 +704,21 @@ dbu.buildGetQuery = function(keyspace, req, consistency, table, schema) {
var cql = 'select ' + proj + ' from '
+ dbu.cassID(keyspace) + '.' + dbu.cassID(table);

var params = [];
var paramKeys = [];
// Build up the condition
if (req.attributes) {
var params = [];
var attributes = req.attributes;
if (attributes) {
Object.keys(attributes).forEach(function(key) {
// req should not have non key attributes
if (!schema.iKeyMap[key]) {
throw new Error("All request attributes need to be key attributes. Bad attribute: "
+ key);
}
});
cql += ' where ';
var condResult = dbu.buildCondition(req.attributes, schema);
var condResult = dbu.buildCondition(attributes, schema);
cql += condResult.query;
params = condResult.params;
paramKeys = condResult.keys;
}

if (req.order) {
Expand Down Expand Up @@ -772,7 +767,7 @@ dbu.buildGetQuery = function(keyspace, req, consistency, table, schema) {
cql += ' limit ' + req.limit;
}

return {query: cql, params: dbu.convertParams(schema, paramKeys, params)};
return {query: cql, params: params};
};

module.exports = dbu;

0 comments on commit beae24d

Please sign in to comment.