Skip to content

Commit

Permalink
Implement queries as jobs (listable on rethinkdb.jobs)
Browse files Browse the repository at this point in the history
  • Loading branch information
lbguilherme committed Mar 8, 2020
1 parent f4789f8 commit 0c01c69
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/driver/local_connection.cr
Expand Up @@ -34,7 +34,7 @@ module RethinkDB

def run(term : ReQL::Term::Type, runopts : RunOpts) : RethinkDB::Cursor | RethinkDB::Datum
evaluator = ReQL::Evaluator.new(@manager, @worker)
result = evaluator.eval term
result = @manager.job_manager.run_query(evaluator, term)

case result
when ReQL::Stream
Expand Down
22 changes: 22 additions & 0 deletions src/reql/jobs/job.cr
@@ -0,0 +1,22 @@
require "uuid"

abstract class ReQL::Job
getter id = UUID.random
getter start_time = Time.utc
getter running = true

def initialize(@job_manager : JobManager)
@job_manager.lock.synchronize do
@job_manager.jobs[@id] = self
end
end

def finish_job
@running = false
@job_manager.lock.synchronize do
@job_manager.jobs.delete(@id)
end
end

abstract def type : String
end
10 changes: 10 additions & 0 deletions src/reql/jobs/job_manager.cr
@@ -0,0 +1,10 @@
require "./query_job"

class ReQL::JobManager
getter jobs = Hash(UUID, Job).new
getter lock = Mutex.new

def run_query(evaluator : Evaluator, term : Term::Type)
QueryJob.new(self, evaluator, term).result
end
end
57 changes: 57 additions & 0 deletions src/reql/jobs/query_job.cr
@@ -0,0 +1,57 @@
require "./job"
require "../executor/stream"

class ReQL::QueryJob < ReQL::Job
@future_result : Concurrent::Future(ReQL::AbstractValue)?
getter term

def initialize(job_manager, @evaluator : Evaluator, @term : Term::Type)
super(job_manager)

@future_result = future do
begin
result = @evaluator.eval(term)
if result.is_a? ReQL::Stream
ResultStream.new(result, self)
else
finish_job
result
end
rescue error
finish_job
raise error
end
end
end

def result
@future_result.not_nil!.get
end

def type : String
"query"
end

struct ResultStream < ReQL::Stream
@stream : Box(Stream)

def initialize(stream : Stream, @job : ReQL::QueryJob)
@stream = Box(Stream).new(stream)
end

def start_reading
raise QueryLogicError.new "Query terminated." unless @job.running
@stream.object.start_reading
end

def next_val
raise QueryLogicError.new "Query terminated." unless @job.running
@stream.object.next_val
end

def finish_reading
@stream.object.finish_reading
@job.finish_job
end
end
end
2 changes: 2 additions & 0 deletions src/storage/manager.cr
@@ -1,5 +1,6 @@
require "./table/*"
require "./kv"
require "../reql/jobs/job_manager"

module Storage
class Manager
Expand All @@ -9,6 +10,7 @@ module Storage
property kv : KeyValueStore
property lock = Mutex.new
getter start_time = Time.utc
getter job_manager = ReQL::JobManager.new

class Database
property info : KeyValueStore::DatabaseInfo
Expand Down
24 changes: 23 additions & 1 deletion src/storage/table/virtual_jobs_table.cr
Expand Up @@ -7,10 +7,32 @@ module Storage
end

def get(key)
nil
arr = key.array_value rescue return nil
return nil if arr.size != 2
type = arr[0].string_value rescue return nil
id = UUID.new(arr[1].string_value) rescue return nil
job = @manager.job_manager.lock.synchronize { @manager.job_manager.jobs[id]? }
return nil unless job
return nil unless job.type == type
encode(job)
end

private def encode(job : ReQL::Job)
ReQL::Datum.new({
"id" => [job.type, job.id.to_s],
"duration_sec" => (Time.utc - job.start_time).total_seconds,
"type" => job.type,
"info" => job.is_a?(ReQL::QueryJob) ? {
"query" => job.term.inspect,
} : nil,
}).hash_value
end

def scan(&block : Hash(String, ReQL::Datum) ->)
jobs = @manager.job_manager.lock.synchronize { @manager.job_manager.jobs.dup }
jobs.each_value do |job|
yield encode(job)
end
end
end
end

0 comments on commit 0c01c69

Please sign in to comment.