From 0c01c6936201f4f3ed1c39fd4727763b1113a369 Mon Sep 17 00:00:00 2001 From: Guilherme Bernal Date: Sun, 8 Mar 2020 00:50:54 -0300 Subject: [PATCH] Implement queries as jobs (listable on rethinkdb.jobs) --- src/driver/local_connection.cr | 2 +- src/reql/jobs/job.cr | 22 ++++++++++ src/reql/jobs/job_manager.cr | 10 +++++ src/reql/jobs/query_job.cr | 57 +++++++++++++++++++++++++ src/storage/manager.cr | 2 + src/storage/table/virtual_jobs_table.cr | 24 ++++++++++- 6 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 src/reql/jobs/job.cr create mode 100644 src/reql/jobs/job_manager.cr create mode 100644 src/reql/jobs/query_job.cr diff --git a/src/driver/local_connection.cr b/src/driver/local_connection.cr index 1f5a5d9..d3f4071 100644 --- a/src/driver/local_connection.cr +++ b/src/driver/local_connection.cr @@ -34,7 +34,7 @@ module RethinkDB def run(term : ReQL::Term::Type, runopts : RunOpts) : RethinkDB::Cursor | RethinkDB::Datum evaluator = ReQL::Evaluator.new(@manager, @worker) - result = evaluator.eval term + result = @manager.job_manager.run_query(evaluator, term) case result when ReQL::Stream diff --git a/src/reql/jobs/job.cr b/src/reql/jobs/job.cr new file mode 100644 index 0000000..c1633cc --- /dev/null +++ b/src/reql/jobs/job.cr @@ -0,0 +1,22 @@ +require "uuid" + +abstract class ReQL::Job + getter id = UUID.random + getter start_time = Time.utc + getter running = true + + def initialize(@job_manager : JobManager) + @job_manager.lock.synchronize do + @job_manager.jobs[@id] = self + end + end + + def finish_job + @running = false + @job_manager.lock.synchronize do + @job_manager.jobs.delete(@id) + end + end + + abstract def type : String +end diff --git a/src/reql/jobs/job_manager.cr b/src/reql/jobs/job_manager.cr new file mode 100644 index 0000000..e2c1e63 --- /dev/null +++ b/src/reql/jobs/job_manager.cr @@ -0,0 +1,10 @@ +require "./query_job" + +class ReQL::JobManager + getter jobs = Hash(UUID, Job).new + getter lock = Mutex.new + + def run_query(evaluator : Evaluator, term : Term::Type) + QueryJob.new(self, evaluator, term).result + end +end diff --git a/src/reql/jobs/query_job.cr b/src/reql/jobs/query_job.cr new file mode 100644 index 0000000..afb2807 --- /dev/null +++ b/src/reql/jobs/query_job.cr @@ -0,0 +1,57 @@ +require "./job" +require "../executor/stream" + +class ReQL::QueryJob < ReQL::Job + @future_result : Concurrent::Future(ReQL::AbstractValue)? + getter term + + def initialize(job_manager, @evaluator : Evaluator, @term : Term::Type) + super(job_manager) + + @future_result = future do + begin + result = @evaluator.eval(term) + if result.is_a? ReQL::Stream + ResultStream.new(result, self) + else + finish_job + result + end + rescue error + finish_job + raise error + end + end + end + + def result + @future_result.not_nil!.get + end + + def type : String + "query" + end + + struct ResultStream < ReQL::Stream + @stream : Box(Stream) + + def initialize(stream : Stream, @job : ReQL::QueryJob) + @stream = Box(Stream).new(stream) + end + + def start_reading + raise QueryLogicError.new "Query terminated." unless @job.running + @stream.object.start_reading + end + + def next_val + raise QueryLogicError.new "Query terminated." unless @job.running + @stream.object.next_val + end + + def finish_reading + @stream.object.finish_reading + @job.finish_job + end + end +end diff --git a/src/storage/manager.cr b/src/storage/manager.cr index e75197d..1e803ad 100644 --- a/src/storage/manager.cr +++ b/src/storage/manager.cr @@ -1,5 +1,6 @@ require "./table/*" require "./kv" +require "../reql/jobs/job_manager" module Storage class Manager @@ -9,6 +10,7 @@ module Storage property kv : KeyValueStore property lock = Mutex.new getter start_time = Time.utc + getter job_manager = ReQL::JobManager.new class Database property info : KeyValueStore::DatabaseInfo diff --git a/src/storage/table/virtual_jobs_table.cr b/src/storage/table/virtual_jobs_table.cr index cb382ea..2ff8225 100644 --- a/src/storage/table/virtual_jobs_table.cr +++ b/src/storage/table/virtual_jobs_table.cr @@ -7,10 +7,32 @@ module Storage end def get(key) - nil + arr = key.array_value rescue return nil + return nil if arr.size != 2 + type = arr[0].string_value rescue return nil + id = UUID.new(arr[1].string_value) rescue return nil + job = @manager.job_manager.lock.synchronize { @manager.job_manager.jobs[id]? } + return nil unless job + return nil unless job.type == type + encode(job) + end + + private def encode(job : ReQL::Job) + ReQL::Datum.new({ + "id" => [job.type, job.id.to_s], + "duration_sec" => (Time.utc - job.start_time).total_seconds, + "type" => job.type, + "info" => job.is_a?(ReQL::QueryJob) ? { + "query" => job.term.inspect, + } : nil, + }).hash_value end def scan(&block : Hash(String, ReQL::Datum) ->) + jobs = @manager.job_manager.lock.synchronize { @manager.job_manager.jobs.dup } + jobs.each_value do |job| + yield encode(job) + end end end end