Skip to content

Commit

Permalink
refactor write worker for better performance
Browse files Browse the repository at this point in the history
  • Loading branch information
lbguilherme committed Mar 8, 2020
1 parent d7ee2cf commit d969206
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 109 deletions.
5 changes: 4 additions & 1 deletion src/driver/local_connection.cr
Expand Up @@ -2,6 +2,7 @@ require "socket"
require "../crypto"
require "../storage/manager"
require "../reql/evaluator"
require "../reql/worker"
require "./connection"

module RethinkDB
Expand All @@ -10,13 +11,15 @@ module RethinkDB

def initialize(data_path : String)
@manager = Storage::Manager.new data_path
@worker = ReQL::Worker.new
end

def on_error(&block : Exception, ReQL::Term::Type ->)
@error_callback = block
end

def close
@worker.close
@manager.close
end

Expand All @@ -30,7 +33,7 @@ module RethinkDB
end

def run(term : ReQL::Term::Type, runopts : RunOpts) : RethinkDB::Cursor | RethinkDB::Datum
evaluator = ReQL::Evaluator.new(@manager)
evaluator = ReQL::Evaluator.new(@manager, @worker)
result = evaluator.eval term

case result
Expand Down
27 changes: 24 additions & 3 deletions src/reql/evaluator.cr
@@ -1,15 +1,36 @@
require "./executor/*"
require "./error"
require "./term"
require "./executor/*"
require "./helpers/*"
require "./term"
require "./worker"

module ReQL
class Evaluator
property vars = {} of Int64 => Datum
property table_writers = [] of TableWriter
property now = Time.utc

def initialize(@manager : Storage::Manager)
def initialize(@manager : Storage::Manager, @worker : Worker? = nil)
end

def perform_writes
worker = @worker

unless worker
raise QueryLogicError.new "Cannot perform writes on this context."
end

writer = TableWriter.new
@table_writers << writer

begin
yield writer, worker
ensure
@table_writers.pop
end

@table_writers.last?.try &.merge(writer)
writer.summary
end

def eval(arr : Array) : AbstractValue
Expand Down
File renamed without changes.
9 changes: 5 additions & 4 deletions src/reql/terms/db_create.cr
Expand Up @@ -19,10 +19,11 @@ module ReQL

db_config = @manager.get_table("rethinkdb", "db_config").not_nil!

writter = TableWriter.new
writter.insert(db_config, {
"name" => Datum.new(name),
})
perform_writes do |writer|
writer.insert(db_config, {
"name" => Datum.new(name),
})
end

Datum.new(Hash(String, Datum::Type).new)
end
Expand Down
42 changes: 13 additions & 29 deletions src/reql/terms/delete.cr
Expand Up @@ -14,43 +14,27 @@ module ReQL
def eval_term(term : DeleteTerm)
source = eval(term.args[0])

writter = TableWriter.new

if source.is_array?
channel = Channel({Storage::AbstractTable, Datum}).new(128)
wait_group = WaitGroup.new
jobs = 0
consumers = 0

source.each do |obj|
row = obj.as_row
wait_group.add
channel.send({row.table, row.key})
jobs += 1

if Math.max(1, Math.log(jobs, 1.1).floor - 3) > consumers
consumers += 1
spawn start_deleter_worker(channel, wait_group, writter)
perform_writes do |writer, worker|
if source.is_array?
wait_group = WaitGroup.new
source.each do |obj|
row = obj.as_row
wait_group.add
worker.delete(wait_group, writer, row.table, row.key)
end
wait_group.wait
else
row = source.as_row
writer.delete(row.table, row.key)
end

wait_group.wait
channel.close
else
row = source.as_row
writter.delete(row.table, row.key)
end

@table_writers.last?.try &.merge(writter)

writter.summary
end
end
end

private def start_deleter_worker(channel, wait_group, writter)
private def start_deleter_worker(channel, wait_group, writer)
while pair = channel.receive?
writter.delete(pair[0], pair[1])
writer.delete(pair[0], pair[1])
wait_group.done
end
end
11 changes: 1 addition & 10 deletions src/reql/terms/for_each.cr
Expand Up @@ -14,20 +14,11 @@ module ReQL
target = eval(term.args[0])
func = eval(term.args[1]).as_function

writter = TableWriter.new
@table_writers << writter

begin
perform_writes do
target.each do |e|
func.eval(self, {e.as_datum})
end
ensure
@table_writers.pop
end

@table_writers.last?.try &.merge(writter)

writter.summary
end
end
end
9 changes: 3 additions & 6 deletions src/reql/terms/index_create.cr
Expand Up @@ -33,12 +33,9 @@ module ReQL
multi = Datum.new(term.options["multi"]).bool_value
end

writter = TableWriter.new
writter.create_index(storage, name, function, multi)

@table_writers.last?.try &.merge(writter)

writter.summary
perform_writes do |writer|
writer.create_index(storage, name, function, multi)
end
end
end
end
37 changes: 7 additions & 30 deletions src/reql/terms/insert.cr
@@ -1,6 +1,6 @@
require "uuid"
require "../term"
require "../../utils/waitgroup.cr"
require "../../utils/wait_group.cr"

module ReQL
class InsertTerm < Term
Expand All @@ -16,8 +16,6 @@ module ReQL
table = eval(term.args[0]).as_table
datum = eval(term.args[1])

writter = TableWriter.new

docs = case
when array = datum.array_or_set_value?
array.map do |e|
Expand All @@ -33,35 +31,14 @@ module ReQL
raise QueryLogicError.new("Expected type OBJECT but found #{datum.reql_type}")
end

channel = Channel(Hash(String, Datum)).new(128)
wait_group = WaitGroup.new
jobs = 0
consumers = 0

docs.each do |obj|
wait_group.add
channel.send(obj)
jobs += 1

if Math.max(1, Math.log(jobs, 1.1).floor - 3) > consumers
consumers += 1
spawn start_inserter_worker(channel, wait_group, writter, table.storage)
perform_writes do |writer, worker|
wait_group = WaitGroup.new
docs.each do |obj|
wait_group.add
@worker.not_nil!.insert(wait_group, writer, table.storage, obj)
end
wait_group.wait
end

wait_group.wait
channel.close

@table_writers.last?.try &.merge(writter)

writter.summary
end
end
end

private def start_inserter_worker(channel, wait_group, writter, storage)
while obj = channel.receive?
writter.insert(storage, obj)
wait_group.done
end
end
9 changes: 3 additions & 6 deletions src/reql/terms/table_create.cr
Expand Up @@ -69,12 +69,9 @@ module ReQL

table_config = @manager.get_table("rethinkdb", "table_config").as(Storage::VirtualTableConfigTable)

writter = TableWriter.new
writter.create_table(table_config, descriptor)

@table_writers.last?.try &.merge(writter)

writter.summary
perform_writes do |writer|
writer.create_table(table_config, descriptor)
end
end
end
end
34 changes: 15 additions & 19 deletions src/reql/terms/update.cr
Expand Up @@ -15,33 +15,29 @@ module ReQL
source = eval(term.args[0])
value = eval(term.args[1])

writter = TableWriter.new

if source.is_array?
source.each do |obj|
row = obj.as_row
writter.atomic_update(row.table, row.key) do |old|
perform_writes do |writer|
if source.is_array?
source.each do |obj|
row = obj.as_row
writer.atomic_update(row.table, row.key) do |old|
if value.is_a?(Func)
value.eval(self, {Datum.new(old)}).hash_value
else
value.hash_value
end
end
end
else
row = source.as_row
writer.atomic_update(row.table, row.key) do |old|
if value.is_a?(Func)
value.eval(self, {Datum.new(old)}).hash_value
else
value.hash_value
end
end
end
else
row = source.as_row
writter.atomic_update(row.table, row.key) do |old|
if value.is_a?(Func)
value.eval(self, {Datum.new(old)}).hash_value
else
value.hash_value
end
end
end

@table_writers.last?.try &.merge(writter)

writter.summary
end
end
end
58 changes: 58 additions & 0 deletions src/reql/worker.cr
@@ -0,0 +1,58 @@
require "./helpers/table_writer"
require "../utils/wait_group"
require "../storage/table/abstract_table"
require "./executor/datum"

class ReQL::Worker
WORKER_COUNT = 128
CHANNEL_SIZE = 10 * WORKER_COUNT

@inserter = Channel({wait_group: WaitGroup, table_writer: TableWriter, table: Storage::AbstractTable, row: Hash(String, Datum)}).new(CHANNEL_SIZE)
@deleter = Channel({wait_group: WaitGroup, table_writer: TableWriter, table: Storage::AbstractTable, key: Datum}).new(CHANNEL_SIZE)
@closer = Channel(WaitGroup).new(WORKER_COUNT)

def initialize
WORKER_COUNT.times do
spawn worker
end
end

def close
wait_group = WaitGroup.new
WORKER_COUNT.times do
wait_group.add
@closer.send(wait_group)
end
wait_group.wait

@inserter.close
@closer.close
end

def insert(wait_group, table_writer, table, row)
@inserter.send({wait_group: wait_group, table_writer: table_writer, table: table, row: row})
end

def delete(wait_group, table_writer, table, key)
@deleter.send({wait_group: wait_group, table_writer: table_writer, table: table, key: key})
end

private def worker
loop do
select
when wait_group = @closer.receive
wait_group.done
return
when tuple = @inserter.receive
tuple[:table_writer].insert(tuple[:table], tuple[:row])
tuple[:wait_group].done
when tuple = @deleter.receive
tuple[:table_writer].delete(tuple[:table], tuple[:key])
tuple[:wait_group].done
end
end
rescue error
STDERR.puts error.inspect_with_backtrace
worker
end
end
2 changes: 1 addition & 1 deletion src/storage/kv.cr
@@ -1,7 +1,7 @@
require "uuid"
require "file_utils"
require "rocksdb"
require "../reql/helpers/table_writter"
require "../reql/helpers/table_writer"
require "../reql/executor/func"
require "../reql/executor/reql_func"
require "../reql/terms/var"
Expand Down
File renamed without changes.

0 comments on commit d969206

Please sign in to comment.