From b38723cb786de3f6fdbde3e0dfae50f71a49176e Mon Sep 17 00:00:00 2001 From: Josef Erben Date: Mon, 27 Jul 2020 09:06:24 +0200 Subject: [PATCH] Get rid of Obj.magic and store WorkableJob.t --- src/queue/queue_core.ml | 67 ++++++++++++------------ src/queue/queue_service.ml | 91 +++++++++++++++------------------ src/queue/queue_service_repo.ml | 2 +- src/queue/queue_sig.ml | 2 +- test/test-unit/test_queue.ml | 1 + 5 files changed, 76 insertions(+), 87 deletions(-) diff --git a/src/queue/queue_core.ml b/src/queue/queue_core.ml index aac02618c..8072c1636 100644 --- a/src/queue/queue_core.ml +++ b/src/queue/queue_core.ml @@ -1,5 +1,6 @@ open Base +(** This is the description of a job. A job dispatch is a job description and some arguments/input. *) module Job = struct let default_tries = 5 @@ -35,6 +36,33 @@ module Job = struct let set_retry_delay retry_delay job = { job with retry_delay } end +(** A workable job can process a job instance that is persisted. We can not store the job directly because of the polymorphic type ('a Job.t). *) +module WorkableJob = struct + type t = { + name : string; + with_context : Core.Ctx.t -> Core.Ctx.t; + work : Core.Ctx.t -> input:string option -> (unit, string) Result.t Lwt.t; + failed : Core.Ctx.t -> (unit, string) Result.t Lwt.t; + max_tries : int; + retry_delay : Utils.Time.duration; + } + [@@deriving show, fields] + + let of_job job = + let name = Job.name job in + let with_context = Job.with_context job in + let work ctx ~input = + match (Job.string_to_input job) input with + | Error msg -> Lwt_result.fail msg + | Ok input -> (Job.handle job) ctx ~input + in + let failed = Job.failed job in + let max_tries = Job.max_tries job in + let retry_delay = Job.retry_delay job in + { name; with_context; work; failed; max_tries; retry_delay } +end + +(** This is the actual job instance that is derived from the job description ['a Job.t] and some input. This needs to be serialized and persisted for persistent job queues. *) module JobInstance = struct module Status = struct type t = Pending | Succeeded | Failed [@@deriving yojson, show, eq] @@ -89,9 +117,11 @@ module JobInstance = struct let should_run ~job ~job_instance ~now = let tries = job_instance.tries in - let max_tries = Job.max_tries job in + let max_tries = WorkableJob.max_tries job in let start_at = job_instance.start_at in - let retry_delay = Job.retry_delay job |> Utils.Time.duration_to_span in + let retry_delay = + WorkableJob.retry_delay job |> Utils.Time.duration_to_span + in let earliest_retry_at = if Option.is_some job_instance.last_ran_at then Ptime.add_span now retry_delay |> Option.value ~default:now @@ -105,36 +135,3 @@ module JobInstance = struct let is_pending = is_pending job_instance in is_pending && has_tries_left && is_after_delay && is_after_retry_delay end - -(* TODO turn these into types *) -module type JOB_WORKER = sig - val work : Core.Ctx.t -> input:string -> (unit, string) Result.t Lwt.t - - val failed : Core.Ctx.t -> (unit, string) Result.t Lwt.t - - val with_context : Core.Ctx.t -> Core.Ctx.t - - val name : string - - val max_tries : int - - val retry_delay : Utils.Time.duration -end - -module JobWorkerManager : sig - val register_worker : (module JOB_WORKER) -> unit - - val work : Core.Ctx.t -> JobInstance.t -> unit Lwt.t -end = struct - let registered_workers : - (string, (module JOB_WORKER), Base.String.comparator_witness) Base.Map.t - ref = - ref @@ Map.empty (module String) - - let register_worker (module JobWorker : JOB_WORKER) = - registered_workers := - Map.add_exn !registered_workers ~key:JobWorker.name - ~data:(module JobWorker) - - let work _ _ = failwith "TODO" -end diff --git a/src/queue/queue_service.ml b/src/queue/queue_service.ml index d070f45a7..cc2e46a6f 100644 --- a/src/queue/queue_service.ml +++ b/src/queue/queue_service.ml @@ -3,11 +3,10 @@ open Base let ( let* ) = Lwt.bind module Job = Queue_core.Job +module WorkableJob = Queue_core.WorkableJob module JobInstance = Queue_core.JobInstance -(* TODO think about how to avoid Obj.magic *) -(* We have this problem due to the polymorphic type 'a Job.t and mutable state *) -let registered_jobs = Caml.Obj.magic (ref []) +let registered_jobs : WorkableJob.t list ref = ref [] let stop_schedule : (unit -> unit) option ref = ref None @@ -31,7 +30,7 @@ module MakePolling Lwt_result.return () let register_jobs _ ~jobs = - registered_jobs := jobs; + registered_jobs := jobs |> List.map ~f:WorkableJob.of_job; Lwt.return () let dispatch ctx ~job ?delay input = @@ -51,61 +50,52 @@ module MakePolling "QUEUE: Failure while enqueuing job instance: " ^ msg) |> Lwt.map Result.ok_or_failwith - let run_job ctx input_string ~job ~job_instance = + let run_job ctx input ~job ~job_instance = let job_instance_id = JobInstance.id job_instance in - match Job.string_to_input job input_string with - | Error msg -> - Log.err (fun m -> - m "QUEUE: Unexpected input %s found for job instance %a %s" - (Option.value ~default:"-" input_string) - JobInstance.pp job_instance msg); - Lwt.return None - | Ok input -> ( + let* result = + Lwt.catch + (fun () -> WorkableJob.work job ctx ~input) + (fun exn -> + let exn_string = Exn.to_string exn in + Lwt.return + @@ Error + ( "Exception caught while running job, this is a bug in your \ + job handler, make sure to not throw exceptions " ^ exn_string + )) + in + match result with + | Error msg -> ( + Logs.err (fun m -> + m "QUEUE: Failure while running job instance %a %s" JobInstance.pp + job_instance msg); let* result = Lwt.catch - (fun () -> Job.handle job ctx ~input) + (fun () -> WorkableJob.failed job ctx) (fun exn -> let exn_string = Exn.to_string exn in Lwt.return @@ Error - ( "Exception caught while running job, this is a bug in \ - your job handler, make sure to not throw exceptions " - ^ exn_string )) + ( "Exception caught while cleaning up job, this is a bug in \ + your job failure handler, make sure to not throw \ + exceptions " ^ exn_string )) in match result with - | Error msg -> ( + | Error msg -> Logs.err (fun m -> - m "QUEUE: Failure while running job instance %a %s" + m + "QUEUE: Failure while run failure handler for job instance \ + %a %s" JobInstance.pp job_instance msg); - let* result = - Lwt.catch - (fun () -> Job.failed job ctx) - (fun exn -> - let exn_string = Exn.to_string exn in - Lwt.return - @@ Error - ( "Exception caught while cleaning up job, this is a \ - bug in your job failure handler, make sure to not \ - throw exceptions " ^ exn_string )) - in - match result with - | Error msg -> - Logs.err (fun m -> - m - "QUEUE: Failure while run failure handler for job \ - instance %a %s" - JobInstance.pp job_instance msg); - Lwt.return None - | Ok () -> - Logs.err (fun m -> - m "QUEUE: Failure while cleaning up job instance %a" - Uuidm.pp job_instance_id); - Lwt.return None ) + Lwt.return None | Ok () -> - Logs.debug (fun m -> - m "QUEUE: Successfully ran job instance %a" Uuidm.pp + Logs.err (fun m -> + m "QUEUE: Failure while cleaning up job instance %a" Uuidm.pp job_instance_id); - Lwt.return @@ Some () ) + Lwt.return None ) + | Ok () -> + Logs.debug (fun m -> + m "QUEUE: Successfully ran job instance %a" Uuidm.pp job_instance_id); + Lwt.return @@ Some () let update ctx ~job_instance = Repo.update ctx ~job_instance @@ -121,7 +111,7 @@ module MakePolling let job_instance = match job_run_status with | None -> - if JobInstance.tries job_instance >= Job.max_tries job then + if JobInstance.tries job_instance >= WorkableJob.max_tries job then JobInstance.set_failed job_instance else job_instance | Some () -> JobInstance.set_succeeded job_instance @@ -138,7 +128,7 @@ module MakePolling let work_queue ctx ~jobs = let* pending_job_instances = - Repo.find_pending ctx + Repo.find_workable ctx |> Lwt_result.map_err (fun msg -> "QUEUE: Failure while finding pending job instances " ^ msg) |> Lwt.map Result.ok_or_failwith @@ -153,7 +143,8 @@ module MakePolling | job_instance :: job_instances -> ( let job = List.find jobs ~f:(fun job -> - job |> Job.name |> String.equal (JobInstance.name job_instance)) + job |> WorkableJob.name + |> String.equal (JobInstance.name job_instance)) in match job with | None -> loop job_instances jobs @@ -168,7 +159,7 @@ module MakePolling (* Combine all context middleware functions of registered jobs to get the context the jobs run with*) let combined_context_fn = jobs - |> List.map ~f:Job.with_context + |> List.map ~f:WorkableJob.with_context |> List.fold_left ~init:Fn.id ~f:Fn.compose in (* This function run every second, the request context gets created here with each tick *) diff --git a/src/queue/queue_service_repo.ml b/src/queue/queue_service_repo.ml index 9cebf805a..f24a58b76 100644 --- a/src/queue/queue_service_repo.ml +++ b/src/queue/queue_service_repo.ml @@ -28,7 +28,7 @@ module MakeMemory (Repo : Data.Repo.Sig.SERVICE) : Queue_sig.REPO = struct state := Map.set !state ~key:id ~data:job_instance; Lwt_result.return () - let find_pending _ = + let find_workable _ = let all_job_instances = List.map !ordered_ids ~f:(fun id -> Map.find !state id) in diff --git a/src/queue/queue_sig.ml b/src/queue/queue_sig.ml index 5da530776..f9973cc5d 100644 --- a/src/queue/queue_sig.ml +++ b/src/queue/queue_sig.ml @@ -7,7 +7,7 @@ module type REPO = sig val enqueue : Core.Ctx.t -> job_instance:JobInstance.t -> (unit, string) Result.t Lwt.t - val find_pending : Core.Ctx.t -> (JobInstance.t list, string) Result.t Lwt.t + val find_workable : Core.Ctx.t -> (JobInstance.t list, string) Result.t Lwt.t val update : Core.Ctx.t -> job_instance:JobInstance.t -> (unit, string) Result.t Lwt.t diff --git a/test/test-unit/test_queue.ml b/test/test-unit/test_queue.ml index fd87c7e98..99ac41382 100644 --- a/test/test-unit/test_queue.ml +++ b/test/test-unit/test_queue.ml @@ -11,6 +11,7 @@ let should_run_job _ () = () |> Sihl.Queue.set_max_tries 3 |> Sihl.Queue.set_retry_delay Sihl.Utils.Time.OneMinute + |> Sihl.Queue.Core.WorkableJob.of_job in let job_instance = Sihl.Queue.Core.JobInstance.create ~input:None ~name:"foo" ~start_at:now