Run background jobs on Kubernetes from Ruby

KubeQueue

Installation

Add this line to your application's Gemfile:

gem 'kube_queue'

And then execute:

$ bundle

Or install it yourself as:

$ gem install kube_queue

Getting Started

Implement worker:

class TestWorker
  include KubeQueue::Worker

  job_name 'kube-queue-test'
  image "my-registry/my-image"
  container_name 'kube-queue-test'

  command 'bundle', 'exec', 'kube_queue', 'TestWorker', '-r', './test_worker.rb'

  def perform(payload)
    puts payload['message']
  end
end

Configure the Kubernetes client:

KubeQueue.kubernetes_configure do |client|
  client.url = ENV['K8S_URL']
  client.ssl_ca_file = ENV['K8S_CA_CERT_FILE']
  client.auth_token = File.read(ENV['K8S_TOKEN'])
end
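
The block above reads three environment variables. A minimal sketch of a pre-flight check, assuming you want to fail fast when one of them is missing or points at an unreadable file. The helper name missing_k8s_settings is hypothetical and not part of kube_queue:

```ruby
# Hypothetical helper (not part of kube_queue): returns the names of the
# settings that are unset or, for file paths, point at unreadable files.
REQUIRED_K8S_ENV = %w[K8S_URL K8S_CA_CERT_FILE K8S_TOKEN].freeze
FILE_K8S_ENV = %w[K8S_CA_CERT_FILE K8S_TOKEN].freeze

def missing_k8s_settings(env = ENV)
  REQUIRED_K8S_ENV.select do |name|
    value = env[name]
    next true if value.nil? || value.empty?

    FILE_K8S_ENV.include?(name) && !File.readable?(value)
  end
end
```

Calling missing_k8s_settings before KubeQueue.kubernetes_configure and stopping when the result is not empty gives a clearer error than a failed File.read on a nil path.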

and run:

TestWorker.enqueue(message: 'hello')

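# delayed execution (about 100 seconds from now); the payload hash needs
# braces because a positional argument follows it
TestWorker.enqueue_at({ message: 'hello' }, Time.now + 100)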
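
Note that enqueue receives a symbol key (message:) while perform reads payload['message'] with a string key. That is what you would see if the payload were serialized to JSON on its way to the container. The sketch below shows such a round trip with the standard library only; that kube_queue uses exactly this encoding is an assumption:

```ruby
require 'json'

# What the caller passes to enqueue.
payload = { message: 'hello' }

# Assumed transport step: serialize on enqueue, parse inside the container.
received = JSON.parse(JSON.generate(payload))

puts received['message']      # the string key is present after the round trip
puts received.key?(:message)  # the symbol key is not
```

If this assumption holds, workers should read the payload with string keys, as TestWorker does.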

ActiveJob Support

Add the following to config/application.rb:

Rails.application.config.active_job.queue_adapter = :kube_queue

Then put your job into app/jobs. For example:

# app/jobs/print_message_job.rb
class PrintMessageJob < ApplicationJob
  include KubeQueue::Worker

  worker_name 'print-message-job'
  image "your-registry/your-image"
  container_name 'your-container-name'

  def perform(payload)
    logger.info payload[:message]
  end
end

and run:

irb(main):001:0> job = PrintMessageJob.perform_later(message: 'hello, kubernetes!')
Enqueued PrintMessageJob (Job ID: 0bf15b35-62d8-4380-9173-99839ce735ff) to KubeQueue(default) with arguments: {:message=>"hello, kubernetes!"}
=> #<PrintMessageJob:0x00007fbfd00c7848 @arguments=[{:message=>"hello, kubernetes!"}], @job_id="0bf15b35-62d8-4380-9173-99839ce735ff", @queue_name="default", @priority=nil, @executions=0>
irb(main):002:0> job.status
=> #<K8s::Resource startTime="2019-08-12T15:56:37Z", active=1>
irb(main):003:0> job.status
=> #<K8s::Resource conditions=[{:type=>"Complete", :status=>"True", :lastProbeTime=>"2019-08-12T15:57:03Z", :lastTransitionTime=>"2019-08-12T15:57:03Z"}], startTime="2019-08-12T15:56:37Z", completionTime="2019-08-12T15:57:03Z", succeeded=1>

See more examples in the examples directory.

Run a job locally

bundle exec kube_queue runner JOB_NAME [PAYLOAD]

For more information, run kube_queue help or read here.

Advanced Tips

Get a job status

job = ComputePiJob.perform_later
job.status

Getting the status of a scheduled job is not supported yet.
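
The irb session in the ActiveJob section shows status reporting active=1 while the job runs and succeeded=1 once it completes. A minimal polling sketch built on that observation: wait_until_finished is a hypothetical helper, and it assumes the status object responds to succeeded and failed like the Kubernetes Job status fields of the same names:

```ruby
# Hypothetical helper (not part of kube_queue). Polls job.status until the
# Job reports a success or failure count, or until the timeout elapses.
def wait_until_finished(job, interval: 5, timeout: 300)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    status = job.status
    return :succeeded if status.succeeded.to_i.positive?
    return :failed if status.failed.to_i.positive?
    return :timeout if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
end
```

This treats the first failed pod as a failure, which is too strict when backoff_limit allows retries; in that case inspecting the status conditions would be the more accurate check.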

Check the generated manifest

# from class
puts ComputePiJob.manifest

# from instance
job = ComputePiJob.perform_later
puts job.manifest

Retry job

A Kubernetes Job has its own retry mechanism. Set backoff_limit and/or restart_policy to use it.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  backoff_limit 10
  restart_policy 'Never'
end
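
In the Kubernetes Job API these two settings correspond to spec.backoffLimit and spec.template.spec.restartPolicy. The sketch below builds a Job manifest by hand, using only the standard library, to show where each value lands. It illustrates the Kubernetes fields and is an assumption about, not a copy of, what ComputePiJob.manifest prints:

```ruby
require 'yaml'

# Hand-written Job manifest; field names follow the Kubernetes batch/v1 Job API.
manifest = {
  'apiVersion' => 'batch/v1',
  'kind' => 'Job',
  'metadata' => { 'name' => 'pi' },
  'spec' => {
    'backoffLimit' => 10,            # backoff_limit 10
    'template' => {
      'spec' => {
        'restartPolicy' => 'Never',  # restart_policy 'Never'
        'containers' => [
          {
            'name' => 'pi',
            'image' => 'perl',
            'command' => ['perl', '-Mbignum=bpi', '-wle', 'print bpi(2000)']
          }
        ]
      }
    }
  }
}

puts manifest.to_yaml
```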

For more information, see the official Kubernetes documentation.

Timeout

A Kubernetes Job has its own timeout mechanism. Set active_deadline_seconds to use it.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  active_deadline_seconds 300
end
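
Kubernetes measures activeDeadlineSeconds from the Job's startTime. As a worked example, the sketch below takes the startTime shown in the irb session in the ActiveJob section and computes when a 300 second deadline would expire; pairing that particular start time with this setting is purely illustrative:

```ruby
require 'time'

# startTime as reported by job.status in the irb session.
start_time = Time.iso8601('2019-08-12T15:56:37Z')
active_deadline_seconds = 300

# Past this moment Kubernetes would try to terminate a still-active Job.
deadline = start_time + active_deadline_seconds
puts deadline.iso8601
```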

For more information, see the official Kubernetes documentation.

Managing container resources

When you specify a Pod, you can optionally specify how much CPU and memory each container needs.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

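  # CPU is in cores; memory uses Kubernetes quantity suffixes (Mi = mebibytes).
  # A lowercase 'm' suffix means thousandths, so '100m' of memory is 0.1 bytes.
  cpu_limit '0.3'
  cpu_request '0.2'
  memory_limit '100Mi'
  memory_request '50Mi'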
end
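
Kubernetes parses these strings as resource quantities. A plain CPU number such as '0.3' equals 300 millicores ('300m'), and memory takes byte suffixes such as Mi (mebibytes) or M (megabytes). The lowercase m suffix always means thousandths, so on a memory value it denotes fractions of a byte. The sketch below converts a few common forms; parse_quantity is a hypothetical helper covering only a subset of the suffixes, and is not how kube_queue or Kubernetes implements parsing:

```ruby
# Hypothetical helper: converts a subset of Kubernetes quantity strings
# to a Rational base value (cores for CPU, bytes for memory).
QUANTITY_SUFFIXES = {
  'm' => Rational(1, 1000),
  '' => 1,
  'k' => 1000, 'M' => 1000**2, 'G' => 1000**3,
  'Ki' => 1024, 'Mi' => 1024**2, 'Gi' => 1024**3
}.freeze

def parse_quantity(text)
  match = /\A(\d+(?:\.\d+)?)(m|k|M|G|Ki|Mi|Gi)?\z/.match(text)
  raise ArgumentError, "unsupported quantity: #{text.inspect}" unless match

  Rational(match[1]) * QUANTITY_SUFFIXES.fetch(match[2].to_s)
end

puts parse_quantity('0.3') == parse_quantity('300m')  # CPU: same amount either way
puts parse_quantity('100Mi').to_i                     # memory in bytes
puts parse_quantity('100m').to_f                      # a fraction of one byte
```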

For more information, see the official Kubernetes documentation.

Use environment variable from ConfigMap/Secret

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  env_from_secret 'mysecret1', 'mysecret2'
  env_from_config_map 'myapp'
end
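
In a Pod spec, pulling every key of a Secret or ConfigMap into the environment is expressed with the container's envFrom list, using secretRef and configMapRef entries. A sketch of the container fragment these two settings would plausibly produce, built by hand; the exact output of kube_queue is an assumption here:

```ruby
require 'yaml'

secrets = %w[mysecret1 mysecret2]
config_maps = %w[myapp]

# One envFrom entry per referenced object, as in the Kubernetes container spec.
env_from = secrets.map { |name| { 'secretRef' => { 'name' => name } } } +
           config_maps.map { |name| { 'configMapRef' => { 'name' => name } } }

container = { 'name' => 'pi', 'image' => 'perl', 'envFrom' => env_from }
puts container.to_yaml
```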

Features

  • Add tests.
  • Support multiple Kubernetes client configurations.
  • Logging information.
  • Support getting CronJob status.

Development (on GCP/GKE)

setup:

# create service account and cluster role.
kubectl apply -f examples/k8s/service-account.yaml

# get ca.crt and token
kubectl get secret -n kube-system kube-queue-test-token-xxx -o jsonpath="{['data']['token']}" | base64 -d > secrets/token
kubectl get secret -n kube-system kube-queue-test-token-xxx -o jsonpath="{['data']['ca\.crt']}" | base64 -d > secrets/ca.crt

# build image
gcloud builds submit --config cloudbuild.yaml .

run:

K8S_URL=https://xx.xxx.xxx.xxx K8S_CA_CERT_FILE=$(pwd)/secrets/ca.crt K8S_TOKEN=$(pwd)/secrets/token IMAGE_NAME=gcr.io/your-project/kube-queue bin/console

irb(main):001:0> TestWorker.enqueue(message: 'hello, kubernetes!')