Skip to content

Commit

Permalink
Refactor internal interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
yuemori committed Aug 15, 2019
1 parent 2e54a93 commit edc9919
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 30 deletions.
8 changes: 5 additions & 3 deletions lib/active_job/adapters/kube_queue_adapter.rb
Expand Up @@ -10,11 +10,12 @@ class KubeQueueAdapter
class << self
# Interface for ActiveJob 4.2
def enqueue(job)
KubeQueue.executor.enqueue(job, ActiveJob::Arguments.serialize(job.arguments))
KubeQueue.executor.enqueue(job)
end

def enqueue_at(job, timestamp)
KubeQueue.executor.enqueue_at(job, ActiveJob::Arguments.serialize(job.arguments), timestamp)
job.scheduled_at = timestamp
KubeQueue.executor.enqueue(job)
end
end

Expand All @@ -24,7 +25,8 @@ def enqueue(job)
end

def enqueue_at(job, timestamp)
KubeQueueAdapter.enqueue_at(job, timestamp)
job.scheduled_at = timestamp
KubeQueueAdapter.enqueue(job)
end
end
end
Expand Down
15 changes: 6 additions & 9 deletions lib/kube_queue/executor.rb
Expand Up @@ -2,15 +2,12 @@

module KubeQueue
class Executor
def enqueue(job, payload)
manifest = ManifestBuilder.new(job, payload).build_job
KubeQueue.client.create_job(manifest)
end

def enqueue_at(job, payload, timestamp)
cron = Time.at(timestamp).utc.strftime("%M %H %d %m %w")
manifest = ManifestBuilder.new(job, payload).build_cron_job(cron)
KubeQueue.client.create_cron_job(manifest)
def enqueue(job)
if job.scheduled_at
KubeQueue.client.create_cron_job(job.manifest)
else
KubeQueue.client.create_job(job.manifest)
end
end
end
end
9 changes: 4 additions & 5 deletions lib/kube_queue/manifest_builder.rb
Expand Up @@ -5,25 +5,24 @@ module KubeQueue
class ManifestBuilder
attr_reader :job

def initialize(job, payload = nil)
def initialize(job)
@job = job
@payload = payload
end

def spec
job.job_spec
end

def payload
@payload ? JSON.generate(@payload, quirks_mode: true) : nil
JSON.generate(job.serialized_payload, quirks_mode: true)
end

def build_job
YAML.safe_load(ERB.new(job.template, nil, "-").result(binding))
YAML.safe_load(ERB.new(job.read_template, nil, "-").result(binding))
end

def build_cron_job(cron)
template = YAML.safe_load(ERB.new(job.template, nil, "-").result(binding))
template = YAML.safe_load(ERB.new(job.read_template, nil, "-").result(binding))

{
apiVersion: "batch/v1beta1",
Expand Down
50 changes: 39 additions & 11 deletions lib/kube_queue/worker.rb
Expand Up @@ -17,11 +17,7 @@ def list

KubeQueue.client.list_job(job_spec.job_class, namespace).map do |res|
worker = KubeQueue.fetch_worker(res.metadata.annotations['kube-queue-job-class'])
job_id = res.metadata.annotations['kube-queue-job-id']
payload = deserialize_annotation_payload(res.metadata.annotations['kube-queue-message-payload'])

job = worker.new(*payload)
job.job_id = job_id
job.resource = res
job
end
Expand All @@ -34,11 +30,10 @@ def find(job_id)

res = KubeQueue.client.get_job(name, namespace)
worker = KubeQueue.fetch_worker(res.metadata.annotations['kube-queue-job-class'])
job_id = res.metadata.annotations['kube-queue-job-id']
payload = deserialize_annotation_payload(res.metadata.annotations['kube-queue-message-payload'])

payload = deserialize_annotation_payload(res.annotations['kube-queue-job-payload'])

job = worker.new(*payload)
job.job_id = job_id
job.resource = res
job
end
Expand All @@ -56,6 +51,10 @@ def read_template
File.read(@template || File.expand_path('../../../template/job.yaml', __FILE__))
end

def manifest
new.manifest
end

private

def deserialize_annotation_payload(payload)
Expand All @@ -81,19 +80,26 @@ def deserialize_annotation_payload(payload)
end
end

def template
def read_template
self.class.read_template
end

def job_spec
self.class.job_spec
end

attr_accessor :job_id, :arguments, :resource
attr_accessor :job_id, :scheduled_at
attr_reader :arguments, :resource

alias_method :payload, :arguments

def initialize(*arguments)
# Compatibility for ActiveJob interface
super
if method(__method__).super_method.arity.zero?
super()
else
super
end

@arguments = arguments
@job_id = SecureRandom.uuid
Expand Down Expand Up @@ -131,10 +137,32 @@ def reload!
load_target
end

def manifest
if scheduled_at
cron = Time.at(scheduled_at).utc.strftime("%M %H %d %m %w")
ManifestBuilder.new(self).build_cron_job(cron)
else
ManifestBuilder.new(self).build_job
end
end

def serialized_payload
if defined?(ActiveJob::Arguments)
ActiveJob::Arguments.serialize(arguments)
else
arguments
end
end

def resource=(resource)
@resource = resource
self.job_id = resource.metadata.annotations['kube-queue-job-id']
end

private

def load_target
@resource = KubeQueue.client.get_job(job_spec.namespace, job_spec.job_name(job_id))
self.resource = KubeQueue.client.get_job(job_spec.namespace, job_spec.job_name(job_id))
@loaded = true
end
end
Expand Down
4 changes: 2 additions & 2 deletions template/job.yaml
Expand Up @@ -2,9 +2,9 @@ apiVersion: batch/v1
kind: Job
metadata:
annotations:
kube-queue-message-payload: '<%= payload %>'
kube-queue-job-class: "<%= spec.job_class %>"
kube-queue-job-id: "<%= job.job_id %>"
kube-queue-job-payload: '<%= payload %>'
name: "<%= spec.job_name(job.job_id) %>"
namespace: <%= spec.namespace %>
labels:
Expand All @@ -16,9 +16,9 @@ spec:
template:
metadata:
annotations:
kube-queue-message-payload: '<%= payload %>'
kube-queue-job-class: "<%= spec.job_class %>"
kube-queue-job-id: "<%= job.job_id %>"
kube-queue-job-payload: '<%= payload %>'
labels:
kube-queue-job: "true"
kube-queue-worker-name: "<%= spec.worker_name %>"
Expand Down

0 comments on commit edc9919

Please sign in to comment.