Skip to content

Commit

Permalink
Merge branch '1096_gmr' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
mlucy committed Feb 21, 2014
2 parents d3205c7 + 2a80df5 commit eea0c65
Show file tree
Hide file tree
Showing 83 changed files with 2,933 additions and 2,426 deletions.
47 changes: 25 additions & 22 deletions drivers/javascript/ast.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class TermBase
# Parse out run options from connOrOptions object
if connOrOptions? and connOrOptions.constructor is Object
for own key of connOrOptions
unless key in ['connection', 'useOutdated', 'noreply', 'timeFormat', 'profile', 'durability']
throw new err.RqlDriverError "First argument to `run` must be an open connection or { connection: <connection>, useOutdated: <bool>, noreply: <bool>, timeFormat: <string>, profile: <bool>, durability: <string>}."
unless key in ['connection', 'useOutdated', 'noreply', 'timeFormat', 'groupFormat', 'profile', 'durability', 'batchConf']
throw new err.RqlDriverError "First argument to `run` must be an open connection or { connection: <connection>, useOutdated: <bool>, noreply: <bool>, timeFormat: <string>, groupFormat: <string>, profile: <bool>, durability: <string>}."
conn = connOrOptions.connection
opts = connOrOptions
else
Expand All @@ -68,7 +68,7 @@ class TermBase
# This only checks that the argument is of the right type, connection
# closed errors will be handled elsewhere
unless conn? and conn._start?
throw new err.RqlDriverError "First argument to `run` must be an open connection or { connection: <connection>, useOutdated: <bool>, noreply: <bool>, timeFormat: <string>, profile: <bool>, durability: <string>}."
throw new err.RqlDriverError "First argument to `run` must be an open connection or { connection: <connection>, useOutdated: <bool>, noreply: <bool>, timeFormat: <string>, groupFormat: <string>, profile: <bool>, durability: <string>}."

# We only require a callback if noreply isn't set
if not opts.noreply and typeof(cb) isnt 'function'
Expand Down Expand Up @@ -144,7 +144,6 @@ class RDBVal extends TermBase
upcase: ar () -> new Upcase {}, @
downcase: ar () -> new Downcase {}, @
isEmpty: ar () -> new IsEmpty {}, @
groupedMapReduce: varar(3, 4, (group, map, reduce) -> new GroupedMapReduce {}, @, funcWrap(group), funcWrap(map), funcWrap(reduce))
innerJoin: ar (other, predicate) -> new InnerJoin {}, @, other, predicate
outerJoin: ar (other, predicate) -> new OuterJoin {}, @, other, predicate
eqJoin: aropt (left_attr, right, opts) -> new EqJoin opts, @, funcWrap(left_attr), right
Expand All @@ -162,11 +161,11 @@ class RDBVal extends TermBase

forEach: ar (func) -> new ForEach {}, @, funcWrap(func)

groupBy: (attrs..., collector) ->
unless collector? and attrs.length >= 1
numArgs = attrs.length + (if collector? then 1 else 0)
throw new err.RqlDriverError "Expected 2 or more argument(s) but found #{numArgs}."
new GroupBy {}, @, attrs, collector
group: varar(1, null, (fields...) -> new Group {}, @, fields.map(funcWrap)...)
sum: varar(0, null, (fields...) -> new Sum {}, @, fields.map(funcWrap)...)
avg: varar(0, null, (fields...) -> new Avg {}, @, fields.map(funcWrap)...)
min: varar(0, null, (fields...) -> new Min {}, @, fields.map(funcWrap)...)
max: varar(0, null, (fields...) -> new Max {}, @, fields.map(funcWrap)...)

info: ar () -> new Info {}, @
sample: ar (count) -> new Sample {}, @, count
Expand Down Expand Up @@ -640,17 +639,25 @@ class IsEmpty extends RDBOp
tt: "IS_EMPTY"
mt: 'isEmpty'

class GroupedMapReduce extends RDBOp
tt: "GROUPED_MAP_REDUCE"
mt: 'groupedMapReduce'
class Group extends RDBOp
tt: "GROUP"
mt: 'group'

class GroupBy extends RDBOp
tt: "GROUPBY"
mt: 'groupBy'
class Sum extends RDBOp
tt: "SUM"
mt: 'sum'

class GroupBy extends RDBOp
tt: "GROUPBY"
mt: 'groupBy'
class Avg extends RDBOp
tt: "AVG"
mt: 'avg'

class Min extends RDBOp
tt: "MIN"
mt: 'min'

class Max extends RDBOp
tt: "MAX"
mt: 'max'

class InnerJoin extends RDBOp
tt: "INNER_JOIN"
Expand Down Expand Up @@ -997,10 +1004,6 @@ rethinkdb.do = varar 1, null, (args...) ->

rethinkdb.branch = ar (test, trueBranch, falseBranch) -> new Branch {}, test, trueBranch, falseBranch

rethinkdb.count = {'COUNT': true}
rethinkdb.sum = ar (attr) -> {'SUM': attr}
rethinkdb.avg = ar (attr) -> {'AVG': attr}

rethinkdb.asc = (attr) -> new Asc {}, funcWrap(attr)
rethinkdb.desc = (attr) -> new Desc {}, funcWrap(attr)

Expand Down
7 changes: 6 additions & 1 deletion drivers/javascript/net.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ aropt = util.aropt
deconstructDatum = util.deconstructDatum
mkAtom = util.mkAtom
mkErr = util.mkErr
mkSeq = util.mkSeq

class Connection extends events.EventEmitter
DEFAULT_HOST: 'localhost'
Expand Down Expand Up @@ -257,6 +256,12 @@ class Connection extends events.EventEmitter
val: r.expr(opts.durability).build()
query.global_optargs.push(pair)

if opts.batchConf?
pair =
key: 'batch_conf'
val: r.expr(opts.batchConf).build()
query.global_optargs.push(pair)

# Save callback
if (not opts.noreply?) or !opts.noreply
@outstandingCallbacks[token] = {cb:cb, root:term, opts:opts}
Expand Down
9 changes: 9 additions & 0 deletions drivers/javascript/util.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ convertPseudotype = (obj, opts) ->
obj
else
throw new err.RqlDriverError "Unknown timeFormat run option #{opts.timeFormat}."
when 'GROUPED_DATA'
switch opts.groupFormat
when 'native', undefined
# Don't convert the data into a map: the group keys could be objects, and objects don't work as map keys in JS
obj['data']
when 'raw'
obj
else
throw new err.RqlDriverError "Unknown groupFormat run option #{opts.groupFormat}."
else
# Regular object or unknown pseudo type
obj
Expand Down
2 changes: 1 addition & 1 deletion drivers/python/rethinkdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file includes all public facing Python API functions

from .net import connect, Connection, Cursor, protobuf_implementation
from .query import js, json, error, do, row, table, db, db_create, db_drop, db_list, table_create, table_drop, table_list, branch, count, sum, avg, asc, desc, eq, ne, le, ge, lt, gt, any, all, add, sub, mul, div, mod, type_of, info, time, monday, tuesday, wednesday, thursday, friday, saturday, sunday, january, february, march, april, may, june, july, august, september, october, november, december, iso8601, epoch_time, now, literal, make_timezone, and_, or_, not_, object
from .query import js, json, error, do, row, table, db, db_create, db_drop, db_list, table_create, table_drop, table_list, branch, asc, desc, eq, ne, le, ge, lt, gt, any, all, add, sub, mul, div, mod, type_of, info, time, monday, tuesday, wednesday, thursday, friday, saturday, sunday, january, february, march, april, may, june, july, august, september, october, november, december, iso8601, epoch_time, now, literal, make_timezone, and_, or_, not_, object
from .errors import RqlError, RqlClientError, RqlCompileError, RqlRuntimeError, RqlDriverError
from .ast import expr, exprJSON, RqlQuery
import rethinkdb.docs
94 changes: 69 additions & 25 deletions drivers/python/rethinkdb/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,18 @@ def limit(self, index):
def reduce(self, func):
    """Build a `Reduce` term on this query, passing `func` through `func_wrap` first."""
    wrapped_func = func_wrap(func)
    return Reduce(self, wrapped_func)

def sum(self, *args):
    """Build a `Sum` term on this query; each argument is passed through `func_wrap` first."""
    wrapped_args = [func_wrap(arg) for arg in args]
    return Sum(self, *wrapped_args)

def avg(self, *args):
    """Build an `Avg` term on this query; each argument is passed through `func_wrap` first."""
    wrapped_args = [func_wrap(arg) for arg in args]
    return Avg(self, *wrapped_args)

def min(self, *args):
    """Build a `Min` term on this query; each argument is passed through `func_wrap` first."""
    wrapped_args = [func_wrap(arg) for arg in args]
    return Min(self, *wrapped_args)

def max(self, *args):
    """Build a `Max` term on this query; each argument is passed through `func_wrap` first."""
    wrapped_args = [func_wrap(arg) for arg in args]
    return Max(self, *wrapped_args)

def map(self, func):
    """Build a `Map` term on this query, passing `func` through `func_wrap` first."""
    wrapped_func = func_wrap(func)
    return Map(self, wrapped_func)

Expand Down Expand Up @@ -409,13 +421,8 @@ def eq_join(self, left_attr, other, index=()):
def zip(self):
    """Build a `Zip` term whose only argument is this query."""
    zipped = Zip(self)
    return zipped

def grouped_map_reduce(self, grouping, mapping, data_collector):
return GroupedMapReduce(self, func_wrap(grouping), func_wrap(mapping),
func_wrap(data_collector))

def group_by(self, arg1, arg2, *rest):
args = [arg1, arg2] + list(rest)
return GroupBy(self, list(args[:-1]), args[-1])
def group(self, *args):
    """Build a `Group` term on this query; each argument is passed through `func_wrap` first."""
    wrapped_args = [func_wrap(arg) for arg in args]
    return Group(self, *wrapped_args)

def for_each(self, mapping):
    """Build a `ForEach` term on this query, passing `mapping` through `func_wrap` first."""
    wrapped_mapping = func_wrap(mapping)
    return ForEach(self, wrapped_mapping)
Expand Down Expand Up @@ -596,6 +603,24 @@ def reql_type_time_to_datetime(obj):
else:
return datetime.datetime.utcfromtimestamp(obj['epoch_time'])

# Python only allows immutable built-in types to be hashed, such as for keys in a dict
# This means we can't use lists or dicts as keys in grouped data objects, so we convert
# them to tuples and frozensets, respectively.
# This may make it a little harder for users to work with converted grouped data, unless
# they do a simple iteration over the result
def recursively_make_hashable(obj):
    """Return a hashable stand-in for ``obj`` so it can serve as a dict key.

    Dicts become frozensets of ``(key, value)`` pairs and lists become
    tuples; the conversion is applied recursively to dict values and list
    items. Anything else is returned unchanged.
    """
    if isinstance(obj, dict):
        pairs = ((key, recursively_make_hashable(value)) for key, value in obj.items())
        return frozenset(pairs)
    if isinstance(obj, list):
        return tuple(recursively_make_hashable(item) for item in obj)
    return obj

def reql_type_grouped_data_to_object(obj):
    """Convert a GROUPED_DATA pseudo-type object into a native dict.

    ``obj['data']`` is iterated as ``(group, value)`` pairs. Each group is
    passed through ``recursively_make_hashable`` so it can be used as a key
    of the returned dict.

    Raises:
        RqlDriverError: if ``obj`` has no ``'data'`` field.
    """
    if 'data' in obj:
        return dict((recursively_make_hashable(group), value) for group, value in obj['data'])

    raise RqlDriverError('pseudo-type GROUPED_DATA object %s does not have the expected field "data".' % py_json.dumps(obj))

# This class handles the conversion of RQL terminal types in both directions
# Going to the server though it does not support R_ARRAY or R_OBJECT as those
# are alternately handled by the MakeArray and MakeObject nodes. Why do this?
Expand Down Expand Up @@ -631,51 +656,58 @@ def compose(self, args, optargs):
return repr(self.data)

@staticmethod
def _convert_pseudotype(obj, time_format):
def _convert_pseudotype(obj, format_opts):
reql_type = obj.get('$reql_type$')
if reql_type is not None:
if reql_type == 'TIME':
if time_format == 'native':
time_format = format_opts.get('time_format')
if time_format is None or time_format == 'native':
# Convert to native python datetime object
return reql_type_time_to_datetime(obj)
elif time_format != 'raw':
raise RqlDriverError("Unknown time_format run option \"%s\"." % time_format)
elif reql_type == 'GROUPED_DATA':
group_format = format_opts.get('group_format')
if group_format is None or group_format == 'native':
return reql_type_grouped_data_to_object(obj)
elif group_format != 'raw':
raise RqlDriverError("Unknown group_format run option \"%s\"." % group_format)
else:
raise RqlDriverError("Unknown pseudo-type %s" % reql_type)
# If there was no pseudotype, or the relevant format option (time or group) is 'raw', return the original object
return obj

@staticmethod
def _recursively_convert_pseudotypes(obj, time_format):
def _recursively_convert_pseudotypes(obj, format_opts):
if isinstance(obj, dict):
for (key, value) in obj.iteritems():
obj[key] = Datum._recursively_convert_pseudotypes(value, time_format)
obj = Datum._convert_pseudotype(obj, time_format)
obj[key] = Datum._recursively_convert_pseudotypes(value, format_opts)
obj = Datum._convert_pseudotype(obj, format_opts)
elif isinstance(obj, list):
for i in xrange(len(obj)):
obj[i] = Datum._recursively_convert_pseudotypes(obj[i], time_format)
obj[i] = Datum._recursively_convert_pseudotypes(obj[i], format_opts)
return obj

@staticmethod
def deconstruct(datum, time_format='native'):
def deconstruct(datum, format_opts={}):
d_type = datum.type
if d_type == p.Datum.R_JSON:
obj = py_json.loads(datum.r_str)
return Datum._recursively_convert_pseudotypes(obj, time_format)
return Datum._recursively_convert_pseudotypes(obj, format_opts)
elif d_type == p.Datum.R_OBJECT:
obj = { }
for pair in datum.r_object:
obj[pair.key] = Datum.deconstruct(pair.val, time_format)
obj[pair.key] = Datum.deconstruct(pair.val, format_opts)

# Thanks to "pseudo-types" we can't yet be quite sure if this object is meant to
# be an object or something else. We need a second layer of type switching, this
# time on an obfuscated field "$reql_type$" rather than the datum type field we
# already switched on.
Datum._convert_pseudotype(obj, time_format)
Datum._convert_pseudotype(obj, format_opts)
return obj
elif d_type == p.Datum.R_ARRAY:
array = datum.r_array
return [Datum.deconstruct(e, time_format) for e in array]
return [Datum.deconstruct(e, format_opts) for e in array]
elif d_type == p.Datum.R_STR:
return datum.r_str
elif d_type == p.Datum.R_NUM:
Expand Down Expand Up @@ -967,6 +999,22 @@ class Reduce(RqlMethodQuery):
tt = p.Term.REDUCE
st = 'reduce'

# Query term constructed by the `sum` method; `tt` is set to the `p.Term.SUM` constant.
# NOTE(review): `st` looks like the method name used when printing the query — confirm.
class Sum(RqlMethodQuery):
tt = p.Term.SUM
st = 'sum'

# Query term constructed by the `avg` method; `tt` is set to the `p.Term.AVG` constant.
# NOTE(review): `st` looks like the method name used when printing the query — confirm.
class Avg(RqlMethodQuery):
tt = p.Term.AVG
st = 'avg'

# Query term constructed by the `min` method; `tt` is set to the `p.Term.MIN` constant.
# NOTE(review): `st` looks like the method name used when printing the query — confirm.
class Min(RqlMethodQuery):
tt = p.Term.MIN
st = 'min'

# Query term constructed by the `max` method; `tt` is set to the `p.Term.MAX` constant.
# NOTE(review): `st` looks like the method name used when printing the query — confirm.
class Max(RqlMethodQuery):
tt = p.Term.MAX
st = 'max'

# Query term constructed by the `map` method; `tt` is set to the `p.Term.MAP` constant.
# NOTE(review): `st` looks like the method name used when printing the query — confirm.
class Map(RqlMethodQuery):
tt = p.Term.MAP
st = 'map'
Expand Down Expand Up @@ -1019,13 +1067,9 @@ class IsEmpty(RqlMethodQuery):
tt = p.Term.IS_EMPTY
st = 'is_empty'

class GroupedMapReduce(RqlMethodQuery):
tt = p.Term.GROUPED_MAP_REDUCE
st = 'grouped_map_reduce'

class GroupBy(RqlMethodQuery):
tt = p.Term.GROUPBY
st = 'group_by'
# Query term constructed by the `group` method; `tt` is set to the `p.Term.GROUP` constant.
# NOTE(review): `st` looks like the method name used when printing the query — confirm.
class Group(RqlMethodQuery):
tt = p.Term.GROUP
st = 'group'

class InnerJoin(RqlMethodQuery):
tt = p.Term.INNER_JOIN
Expand Down

0 comments on commit eea0c65

Please sign in to comment.