diff --git a/README.md b/README.md
index 1337fa8..414f4b9 100644
--- a/README.md
+++ b/README.md
@@ -115,6 +115,30 @@
 it. If you don't want more than one job running at a time then set this to 1.
 
+Beyond this global setting, you can adjust the maximum number of workers for each
+individual Resque Job type by overriding the `max_workers` class method on the job.
+If you override it, the value returned by that method takes precedence over the
+global value.
+
+```ruby
+class ResourceIntensiveJob
+  extend Resque::Kubernetes::Job
+
+  def self.perform
+    # ...
+  end
+
+  def self.job_manifest
+    # ...
+  end
+
+  def self.max_workers
+    # Simply return an integer value, or do something more complicated if needed.
+    105
+  end
+end
+```
+
 ## To Do
 
 - We probably need better namespace support, particularly for reaping
diff --git a/lib/resque/kubernetes/job.rb b/lib/resque/kubernetes/job.rb
index 0e459a5..db33a44 100644
--- a/lib/resque/kubernetes/job.rb
+++ b/lib/resque/kubernetes/job.rb
@@ -49,6 +49,32 @@ def before_enqueue_kubernetes_job(*_)
         apply_kubernetes_job
       end
 
+      protected
+
+      # Return the maximum number of workers to auto-scale the job to.
+      #
+      # While the number of active Kubernetes Jobs is less than this number,
+      # the gem will add new Jobs to auto-scale the workers.
+      #
+      # By default, this returns `Resque::Kubernetes.max_workers` from the gem
+      # configuration. You may override this method to return any other value,
+      # either as a simple integer or with some complex logic.
+      #
+      # Example:
+      #   def max_workers
+      #     # A simple integer
+      #     105
+      #   end
+      #
+      # Example:
+      #   def max_workers
+      #     # Scale based on time of day
+      #     Time.now.hour < 8 ? 15 : 5
+      #   end
+      def max_workers
+        Resque::Kubernetes.max_workers
+      end
+
       private
 
       def jobs_client
@@ -98,7 +124,7 @@ def jobs_maxed?(name, namespace)
           namespace: namespace
         )
         running = resque_jobs.reject { |job| job.spec.completions == job.status.succeeded }
-        running.size == Resque::Kubernetes.max_workers
+        running.size == max_workers
       end
 
       def adjust_manifest(manifest)
diff --git a/spec/resque/kubernetes/job_spec.rb b/spec/resque/kubernetes/job_spec.rb
index 2280dfd..8bf9fed 100644
--- a/spec/resque/kubernetes/job_spec.rb
+++ b/spec/resque/kubernetes/job_spec.rb
@@ -99,145 +99,59 @@ def initialize(hash)
       subject.before_enqueue_kubernetes_job
     end
 
-    context "when the maximum number of matching, working jobs is met" do
-      before do
-        allow(Resque::Kubernetes).to receive(:max_workers).and_return(1)
-        allow(jobs_client).to receive(:get_jobs).and_return([working_job])
-      end
-
-      it "does not try to create a new job" do
-        expect(Kubeclient::Resource).not_to receive(:new)
-        subject.before_enqueue_kubernetes_job
-      end
-    end
-
-    context "when matching, completed jobs exist" do
-      before do
-        allow(Resque::Kubernetes).to receive(:max_workers).and_return(2)
-        allow(jobs_client).to receive(:get_jobs).and_return([done_job, working_job])
-      end
-
-      it "creates a new job using the provided job manifest" do
-        expect(jobs_client).to receive(:create_job)
-        subject.before_enqueue_kubernetes_job
-      end
-    end
-
-    context "when more job workers can be launched" do
-      let(:job) { double("job") }
+    shared_examples "max workers" do
+      context "when the maximum number of matching, working jobs is met" do
+        let(:workers) { 1 }
 
-      before do
-        allow(Resque::Kubernetes).to receive(:max_workers).and_return(10)
-        allow(jobs_client).to receive(:get_jobs).and_return([])
-        allow(Kubeclient::Resource).to receive(:new).and_return(job)
-      end
-
-      it "creates a new job using the provided job manifest" do
-        expect(jobs_client).to receive(:create_job)
-        subject.before_enqueue_kubernetes_job
-      end
-
-      it "labels the job and the pod" do
-        manifest = hash_including(
-          "metadata" => hash_including(
-            "labels" => hash_including(
-              "resque-kubernetes" => "job"
-            )
-          ),
-          "spec" => hash_including(
-            "template" => hash_including(
-              "metadata" => hash_including(
-                "labels" => hash_including(
-                  "resque-kubernetes" => "pod"
-                )
-              )
-            )
-          )
-        )
-        expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
-        subject.before_enqueue_kubernetes_job
-      end
+        before do
+          allow(jobs_client).to receive(:get_jobs).and_return([working_job])
+        end
 
-      it "label the job to group it based on the provided name in the manifest" do
-        manifest = hash_including(
-          "metadata" => hash_including(
-            "labels" => hash_including(
-              "resque-kubernetes-group" => "thing"
-            )
-          )
-        )
-        expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
-        subject.before_enqueue_kubernetes_job
+        it "does not try to create a new job" do
+          expect(Kubeclient::Resource).not_to receive(:new)
+          subject.before_enqueue_kubernetes_job
+        end
       end
 
-      it "updates the job name to make it unique" do
-        manifest = hash_including(
-          "metadata" => hash_including(
-            "name" => match(/^thing-[a-z0-9]{5}$/)
-          )
-        )
-        expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
-        subject.before_enqueue_kubernetes_job
-      end
+      context "when matching, completed jobs exist" do
+        let(:workers) { 2 }
 
-      context "when the restart policy is included" do
         before do
-          manifest = subject.default_manifest.dup
-          manifest["spec"]["template"]["spec"]["restartPolicy"] = "Always"
-          allow(subject).to receive(:job_manifest).and_return(manifest)
+          allow(jobs_client).to receive(:get_jobs).and_return([done_job, working_job])
         end
 
-        it "retains it" do
-          manifest = hash_including(
-            "spec" => hash_including(
-              "template" => hash_including(
-                "spec" => hash_including(
-                  "restartPolicy" => "Always"
-                )
-              )
-            )
-          )
-          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
+        it "creates a new job using the provided job manifest" do
+          expect(jobs_client).to receive(:create_job)
          subject.before_enqueue_kubernetes_job
        end
      end
 
-      context "when the restart policy is not set" do
-        it "ensures it is set to OnFailure" do
-          manifest = hash_including(
-            "spec" => hash_including(
-              "template" => hash_including(
-                "spec" => hash_including(
-                  "restartPolicy" => "OnFailure"
-                )
-              )
-            )
-          )
-          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
-          subject.before_enqueue_kubernetes_job
-        end
-      end
+      context "when more job workers can be launched" do
+        let(:job) { double("job") }
+        let(:workers) { 10 }
 
-      context "when TERM_ON_EMPTY environment is included" do
        before do
-          manifest = subject.default_manifest.dup
-          manifest["spec"]["template"]["spec"]["containers"][0]["env"] = [
-            {"name" => "TERM_ON_EMPTY", "value" => "true"}
-          ]
-          allow(subject).to receive(:job_manifest).and_return(manifest)
+          allow(jobs_client).to receive(:get_jobs).and_return([])
+          allow(Kubeclient::Resource).to receive(:new).and_return(job)
        end
 
-        it "ensures it is set to 1" do
+        it "creates a new job using the provided job manifest" do
+          expect(jobs_client).to receive(:create_job)
+          subject.before_enqueue_kubernetes_job
+        end
+
+        it "labels the job and the pod" do
          manifest = hash_including(
-            "spec" => hash_including(
+            "metadata" => hash_including(
+              "labels" => hash_including(
+                "resque-kubernetes" => "job"
+              )
+            ),
+            "spec" => hash_including(
              "template" => hash_including(
-                "spec" => hash_including(
-                  "containers" => array_including(
-                    hash_including(
-                      "env" => array_including(
-                        hash_including("name" => "TERM_ON_EMPTY", "value" => "1")
-                      )
-                    )
+                "metadata" => hash_including(
+                  "labels" => hash_including(
+                    "resque-kubernetes" => "pod"
                  )
                )
              )
@@ -246,60 +160,167 @@ def initialize(hash)
          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
          subject.before_enqueue_kubernetes_job
        end
-      end
 
-      context "when TERM_ON_EMPTY environment is not set" do
-        it "ensures it is set to 1" do
+        it "labels the job to group it based on the provided name in the manifest" do
          manifest = hash_including(
-            "spec" => hash_including(
-              "template" => hash_including(
-                "spec" => hash_including(
-                  "containers" => array_including(
-                    hash_including(
-                      "env" => array_including(
-                        hash_including("name" => "TERM_ON_EMPTY", "value" => "1")
-                      )
-                    )
-                  )
-                )
+            "metadata" => hash_including(
+              "labels" => hash_including(
+                "resque-kubernetes-group" => "thing"
              )
            )
          )
          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
          subject.before_enqueue_kubernetes_job
        end
-      end
 
-      context "when the namespace is not included" do
-        it "sets it to 'default'" do
+        it "updates the job name to make it unique" do
          manifest = hash_including(
            "metadata" => hash_including(
-              "namespace" => "default"
+              "name" => match(/^thing-[a-z0-9]{5}$/)
            )
          )
          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
          subject.before_enqueue_kubernetes_job
        end
-      end
 
-      context "when the namespace is set" do
-        before do
-          manifest = subject.default_manifest.dup
-          manifest["metadata"]["namespace"] = "staging"
-          allow(subject).to receive(:job_manifest).and_return(manifest)
+      context "when the restart policy is included" do
+        before do
+          manifest = subject.default_manifest.dup
+          manifest["spec"]["template"]["spec"]["restartPolicy"] = "Always"
+          allow(subject).to receive(:job_manifest).and_return(manifest)
+        end
+
+        it "retains it" do
+          manifest = hash_including(
+            "spec" => hash_including(
+              "template" => hash_including(
+                "spec" => hash_including(
+                  "restartPolicy" => "Always"
+                )
+              )
+            )
+          )
+          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
+          subject.before_enqueue_kubernetes_job
+        end
      end
 
-        it "retains it" do
-          manifest = hash_including(
-            "metadata" => hash_including(
-              "namespace" => "staging"
-            )
-          )
-          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
-          subject.before_enqueue_kubernetes_job
+      context "when the restart policy is not set" do
+        it "ensures it is set to OnFailure" do
+          manifest = hash_including(
+            "spec" => hash_including(
+              "template" => hash_including(
+                "spec" => hash_including(
+                  "restartPolicy" => "OnFailure"
+                )
+              )
+            )
+          )
+          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
+          subject.before_enqueue_kubernetes_job
+        end
      end
+
+      context "when TERM_ON_EMPTY environment is included" do
+        before do
+          manifest = subject.default_manifest.dup
+          manifest["spec"]["template"]["spec"]["containers"][0]["env"] = [
+            {"name" => "TERM_ON_EMPTY", "value" => "true"}
+          ]
+          allow(subject).to receive(:job_manifest).and_return(manifest)
+        end
+
+        it "ensures it is set to 1" do
+          manifest = hash_including(
+            "spec" => hash_including(
+              "template" => hash_including(
+                "spec" => hash_including(
+                  "containers" => array_including(
+                    hash_including(
+                      "env" => array_including(
+                        hash_including("name" => "TERM_ON_EMPTY", "value" => "1")
+                      )
+                    )
+                  )
+                )
+              )
+            )
+          )
+          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
+          subject.before_enqueue_kubernetes_job
+        end
+      end
+
+      context "when TERM_ON_EMPTY environment is not set" do
+        it "ensures it is set to 1" do
+          manifest = hash_including(
+            "spec" => hash_including(
+              "template" => hash_including(
+                "spec" => hash_including(
+                  "containers" => array_including(
+                    hash_including(
+                      "env" => array_including(
+                        hash_including("name" => "TERM_ON_EMPTY", "value" => "1")
+                      )
+                    )
+                  )
+                )
+              )
+            )
+          )
+          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
+          subject.before_enqueue_kubernetes_job
+        end
+      end
+
+      context "when the namespace is not included" do
+        it "sets it to 'default'" do
+          manifest = hash_including(
+            "metadata" => hash_including(
+              "namespace" => "default"
+            )
+          )
+          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
+          subject.before_enqueue_kubernetes_job
+        end
+      end
+
+      context "when the namespace is set" do
+        before do
+          manifest = subject.default_manifest.dup
+          manifest["metadata"]["namespace"] = "staging"
+          allow(subject).to receive(:job_manifest).and_return(manifest)
+        end
+
+        it "retains it" do
+          manifest = hash_including(
+            "metadata" => hash_including(
+              "namespace" => "staging"
+            )
+          )
+          expect(Kubeclient::Resource).to receive(:new).with(manifest).and_return(job)
+          subject.before_enqueue_kubernetes_job
+        end
+      end
+
+      end
+    end
+
+    context "for the gem-global max_workers setting" do
+      before do
+        allow(Resque::Kubernetes).to receive(:max_workers).and_return(workers)
+      end
+
+      include_examples "max workers"
+    end
+
+    context "for the job-specific max_workers setting" do
+      before do
+        allow(Resque::Kubernetes).to receive(:max_workers).and_return(0)
+        allow(subject).to receive(:max_workers).and_return(workers)
      end
+      include_examples "max workers"
    end
  end
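A note for readers trying out the per-job override this patch introduces: the sketch below is not part of the patch. It shows one way a job class might compute `max_workers` dynamically. The class name, queue name, and thresholds are invented for illustration; `Resque.size` is the standard Resque call for reading a queue's current length, and any job class that does not define `max_workers` still falls back to the global `Resque::Kubernetes.max_workers`.

```ruby
require "resque"
require "resque/kubernetes"

# Hypothetical job class using the per-job max_workers override added by this
# patch. Names and numbers here are illustrative only.
class BulkImportJob
  extend Resque::Kubernetes::Job

  @queue = :bulk_imports

  def self.perform(batch_id)
    # ... process the batch ...
  end

  def self.job_manifest
    # ... return the Kubernetes Job manifest hash for this worker ...
  end

  # Scale the worker ceiling with the backlog: roughly one worker per 100
  # queued jobs, clamped between 1 and 20. This value takes precedence over
  # the global Resque::Kubernetes.max_workers for this job class only.
  def self.max_workers
    backlog = Resque.size(@queue)
    [[backlog / 100, 1].max, 20].min
  end
end
```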