Skip to content

Commit

Permalink
Implement group / ungroup
Browse files Browse the repository at this point in the history
  • Loading branch information
lbguilherme committed Mar 8, 2020
1 parent 6c9b6ab commit a3a1fe8
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 78 deletions.
2 changes: 1 addition & 1 deletion src/driver/dsl.cr
Expand Up @@ -42,7 +42,7 @@ module RethinkDB
end

def self.convert_type(x : Array)
x.map { |y| convert_type(y).as(ReQL::Term::Type) }.as(ReQL::Term::Type)
ReQL::MakeArrayTerm.new(x.map { |y| convert_type(y).as(ReQL::Term::Type) }).as(ReQL::Term::Type)
end

def self.convert_type(x : Hash)
Expand Down
2 changes: 1 addition & 1 deletion src/reql/executor/field_func.cr
Expand Up @@ -20,7 +20,7 @@ module ReQL
raise QueryLogicError.new("Function expects 1 arguments, but only #{args.size} available")
end

obj = args[0].hash_value
obj = args[0].as_datum.hash_value

if obj.has_key? @field
obj[@field]
Expand Down
31 changes: 31 additions & 0 deletions src/reql/executor/group_stream.cr
@@ -0,0 +1,31 @@
require "./stream"

module ReQL
struct GroupStream < Stream
class InternalData
property channel = Channel(Datum?).new(16)
property started = false
end

def initialize
@internal = InternalData.new
end

def <<(value)
@internal.channel.send(value)
end

def start_reading
raise QueryLogicError.new "Cannot consume group stream more than once" if @internal.started
@internal.started = true
end

def next_val : Datum?
@internal.channel.receive
end

def finish_reading
@internal.channel.close
end
end
end
2 changes: 1 addition & 1 deletion src/reql/executor/js_func.cr
Expand Up @@ -20,7 +20,7 @@ module ReQL
LibDuktape.config_buffer(ctx, -1, @bytecode.to_unsafe, @bytecode.size)
LibDuktape.load_function(ctx)
args.each do |arg|
ctx.push_datum arg
ctx.push_datum arg.as_datum
end
LibDuktape.call(ctx, args.size)

Expand Down
6 changes: 6 additions & 0 deletions src/reql/term.cr
Expand Up @@ -410,4 +410,10 @@ module ReQL
end
end
{% end %}

class MakeArrayTerm < Term
def inspect(io)
@args.inspect(io)
end
end
end
75 changes: 75 additions & 0 deletions src/reql/terms/group.cr
@@ -0,0 +1,75 @@
require "../term"

module ReQL
class GroupTerm < Term
infix_inspect "group"

def check
expect_args 2, 3
end
end

class Evaluator
def eval_term(term : GroupTerm)
target = eval(term.args[0])
group_func = eval(term.args[1]).as_function
aggregation_func = term.args.size >= 3 ? eval(term.args[2]).as_function : nil

stream_table = {} of Datum => GroupStream
result_channel = Channel({Datum, Datum} | Exception).new

group_count = 0

target.each do |value|
value = value.as_datum
group = group_func.eval(self, {value}).as_datum
stream = stream_table[group]?
unless stream
stream = stream_table[group] = GroupStream.new
group_count += 1
spawn do
begin
func = aggregation_func
if func
evaluator = Evaluator.new(@manager, @worker)
evaluator.vars = @vars.dup
evaluator.now = @now

result_channel.send({
group,
func.eval(evaluator, {stream.not_nil!}).as_datum,
})
else
result_channel.send({
group,
stream.not_nil!.as_datum,
})
end
rescue exception
result_channel.send(exception)
end
end
end
stream.not_nil! << value
end

stream_table.each_value &.<<(nil)

result = [] of Hash(String, Datum)

group_count.times do
pair = result_channel.receive
if pair.is_a? Exception
raise pair
end

result << {
"group" => pair[0],
"reduction" => pair[1],
}
end

Datum.new(result)
end
end
end
23 changes: 18 additions & 5 deletions src/reql/terms/table_list.cr
Expand Up @@ -23,12 +23,25 @@ module ReQL
raise "BUG: Wrong number of arguments"
end

db = @manager.databases[db_name]?
unless db
raise QueryLogicError.new "Database `#{db_name}` does not exist."
end
if db_name == "rethinkdb"
Datum.new([
"server_config",
"server_status",
"db_config",
"table_config",
"table_status",
"stats",
"current_issues",
"jobs",
])
else
db = @manager.databases[db_name]?
unless db
raise QueryLogicError.new "Database `#{db_name}` does not exist."
end

Datum.new(db.tables.keys)
Datum.new(db.tables.keys)
end
end
end
end
21 changes: 21 additions & 0 deletions src/reql/terms/ungroup.cr
@@ -0,0 +1,21 @@
require "../term"

module ReQL
class UngroupTerm < Term
infix_inspect "ungroup"

def check
expect_args 1
end
end

class Evaluator
def eval_term(term : UngroupTerm)
if term.args[0].is_a? GroupTerm
eval(term.args[0])
else
raise QueryLogicError.new("Cannot ungroup() something that is not a group()")
end
end
end
end
65 changes: 65 additions & 0 deletions src/reql/transformers/base.cr
@@ -0,0 +1,65 @@
require "../term"

abstract struct ReQL::Transformer
def initialize(@root_term : ReQL::Term::Type)
end

def transform
should_transform(@root_term) ? transform(@root_term) : @root_term
end

protected def should_transform(hsh : Hash) : Bool
hsh.each_value do |value|
return true if should_transform(value)
end
false
end

protected def should_transform(val : Bool | String | Bytes | Float64 | Int64 | Int32 | Time | Nil) : Bool
false
end

protected def should_transform(term : ReQL::Term) : Bool
term.args.any? { |arg| should_transform arg }
end

protected def transform(hsh : Hash) : ReQL::Term::Type
result = {} of String => ReQL::Term::Type
hsh.each do |(k, v)|
result[k] = transform(v)
end
result.as(ReQL::Term::Type)
end

protected def transform(val : Bool | String | Bytes | Float64 | Int64 | Int32 | Time | Nil) : ReQL::Term::Type
val
end

protected def transform(term : ReQL::Term) : ReQL::Term::Type
term = term.dup
term.args = term.args.map { |e| transform(e).as(ReQL::Term::Type) }
term
end
end

abstract struct ReQL::TransformerWithVisitor < ReQL::Transformer
def initialize(root_term : ReQL::Term::Type)
super(root_term)
visit(root_term)
finish_visit
end

protected def visit(hsh : Hash)
hsh.each_value { |value| visit value }
end

protected def visit(val : Bool | String | Bytes | Float64 | Int64 | Int32 | Time | Nil)
end

protected def visit(term : ReQL::Term)
term.args.each { |arg| visit arg }
end

protected def finish_visit
end
end
2 changes: 1 addition & 1 deletion src/reql/transformers/group_transformer.cr
@@ -1,5 +1,5 @@
require "../term"
require "./transformer"
require "./base"

struct ReQL::GroupTransformer < ReQL::Transformer
protected def should_transform(term : ReQL::GroupTerm)
Expand Down
2 changes: 1 addition & 1 deletion src/reql/transformers/simplify_variables_transformer.cr
@@ -1,5 +1,5 @@
require "../term"
require "./transformer_with_visitor"
require "./base"

struct ReQL::SimplifyVariablesTransformer < ReQL::TransformerWithVisitor
@variables = Set(Int64).new
Expand Down
45 changes: 1 addition & 44 deletions src/reql/transformers/transformer.cr
@@ -1,51 +1,8 @@
require "../term"
require "./*"

abstract struct ReQL::Transformer
def self.transform(term : ReQL::Term::Type)
term = ReQL::GroupTransformer.new(term).transform
term = ReQL::SimplifyVariablesTransformer.new(term).transform
end

def initialize(@root_term : ReQL::Term::Type)
end

def transform
should_transform(@root_term) ? transform(@root_term) : @root_term
end

protected def should_transform(hsh : Hash) : Bool
hsh.each_value do |value|
return true if should_transform(value)
end
false
end

protected def should_transform(val : Bool | String | Bytes | Float64 | Int64 | Int32 | Time | Nil) : Bool
false
end

protected def should_transform(term : ReQL::Term) : Bool
term.args.any? { |arg| should_transform arg }
end

protected def transform(hsh : Hash) : ReQL::Term::Type
result = {} of String => ReQL::Term::Type
hsh.each do |(k, v)|
result[k] = transform(v)
end
result.as(ReQL::Term::Type)
end

protected def transform(val : Bool | String | Bytes | Float64 | Int64 | Int32 | Time | Nil) : ReQL::Term::Type
val
end

protected def transform(term : ReQL::Term) : ReQL::Term::Type
term = term.dup
term.args = term.args.map { |e| transform(e).as(ReQL::Term::Type) }
term
end
end

require "./transformer_with_visitor"
require "./*"
24 changes: 0 additions & 24 deletions src/reql/transformers/transformer_with_visitor.cr

This file was deleted.

0 comments on commit a3a1fe8

Please sign in to comment.