Permalink
Browse files

Initial commit. Gem contains UniqueJobs

  • Loading branch information...
0 parents commit fea7411d7852d6dfe5da08cc94a2ad59f19cae9c @krasnoukhov committed Aug 21, 2012
@@ -0,0 +1,17 @@
+*.gem
+*.rbc
+.bundle
+.config
+.yardoc
+Gemfile.lock
+InstalledFiles
+_yardoc
+coverage
+doc/
+lib/bundler/man
+pkg
+rdoc
+spec/reports
+test/tmp
+test/version_tmp
+tmp
@@ -0,0 +1,2 @@
+source 'https://rubygems.org'
+gemspec
22 LICENSE
@@ -0,0 +1,22 @@
+Copyright (c) 2012 Dmitry Krasnoukhov
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,50 @@
+# Additional sidekiq middleware
+
+This gem provides additional middleware for [Sidekiq](github.com/mperham/sidekiq/).
+
+Now it contains the following middlewares:
+
+* UniqueJobs (both client and server)
+
+## Installation
+
+Add this line to your application's Gemfile:
+
+ gem 'sidekiq-middleware'
+
+And then execute:
+
+ $ bundle
+
+Or install it yourself as:
+
+ $ gem install sidekiq-middleware
+
+## Usage
+
+For example (put this code in initialize section):
+
+ Sidekiq.configure_server do |config|
+ config.server_middleware do |chain|
+ chain.add Sidekiq::Middleware::Server::UniqueJobs
+ end
+ config.client_middleware do |chain|
+ chain.add Sidekiq::Middleware::Client::UniqueJobs
+ end
+ end
+
+ Sidekiq.configure_client do |config|
+ config.client_middleware do |chain|
+ chain.add Sidekiq::Middleware::Client::UniqueJobs
+ end
+ end
+
+See [Sidekiq Wiki](https://github.com/mperham/sidekiq/wiki/Middleware) for more details.
+
+## Contributing
+
+1. Fork it
+2. Create your feature branch (`git checkout -b my-new-feature`)
+3. Commit your changes (`git commit -am 'Added some feature'`)
+4. Push to the branch (`git push origin my-new-feature`)
+5. Create new Pull Request
@@ -0,0 +1,2 @@
+#!/usr/bin/env rake
+require "bundler/gem_tasks"
@@ -0,0 +1,3 @@
+require "sidekiq-middleware/version"
+require "sidekiq-middleware/server/unique_jobs"
+require "sidekiq-middleware/client/unique_jobs"
@@ -0,0 +1,50 @@
+require 'digest'
+
+module Sidekiq
+ module Middleware
+ module Client
+ class UniqueJobs
+ HASH_KEY_EXPIRATION = 30 * 60
+
+ def call(worker_class, item, queue)
+ enabled = worker_class.get_sidekiq_options['unique']
+
+ if enabled
+ unique = false
+
+ # Enabled unique scheduled
+ if enabled == :all && item.has_key?('at')
+ expiration = worker_class.get_sidekiq_options['expiration'] || (item['at'].to_i - Time.new.to_i)
+ payload = item.clone
+ payload.delete('at')
+ payload.delete('jid')
+ payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(Hash[payload.sort]))
+ else
+ expiration = worker_class.get_sidekiq_options['expiration'] || HASH_KEY_EXPIRATION
+ payload = item.clone
+ payload.delete('jid')
+ payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(Hash[payload.sort]))
+ end
+
+ Sidekiq.redis do |conn|
+ conn.watch(payload_hash)
+
+ if conn.get(payload_hash)
+ conn.unwatch
+ else
+ unique = conn.multi do
+ conn.setex(payload_hash, expiration, 1)
+ end
+ end
+ end
+
+ yield if unique
+ else
+ yield
+ end
+ end
+
+ end
+ end
+ end
+end
@@ -0,0 +1,41 @@
+module Sidekiq
+ module Middleware
+ module Server
+ class UniqueJobs
+
+ def call(worker_instance, item, queue)
+ forever = worker_instance.class.get_sidekiq_options['forever']
+
+ # Delete lock first if forever is set
+ # Used for jobs which may scheduling self in future
+ clear(worker_instance, item, queue) if forever
+
+ begin
+ yield
+ ensure
+ clear(worker_instance, item, queue) unless forever
+ end
+ end
+
+ def clear(worker_instance, item, queue)
+ enabled = worker_instance.class.get_sidekiq_options['unique']
+
+ # Enabled unique scheduled
+ if enabled == :all && item.has_key?('at')
+ payload = item.clone
+ payload.delete('at')
+ payload.delete('jid')
+ payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(Hash[payload.sort]))
+ else
+ payload = item.clone
+ payload.delete('jid')
+ payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(Hash[payload.sort]))
+ end
+
+ Sidekiq.redis { |conn| conn.del(payload_hash) }
+ end
+
+ end
+ end
+ end
+end
@@ -0,0 +1,5 @@
+module Sidekiq
+ module Middleware
+ VERSION = "0.0.1"
+ end
+end
@@ -0,0 +1,18 @@
+# -*- encoding: utf-8 -*-
+require File.expand_path('../lib/sidekiq-middleware/version', __FILE__)
+
+Gem::Specification.new do |gem|
+ gem.authors = ["Dmitry Krasnoukhov"]
+ gem.email = ["dmitry@krasnoukhov.com"]
+ gem.description = gem.summary = "Additional sidekiq middleware"
+ gem.homepage = "http://github.com/krasnoukhov/sidekiq-middleware"
+
+ gem.files = `git ls-files`.split($\)
+ gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
+ gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
+ gem.name = "sidekiq-middleware"
+ gem.require_paths = ["lib"]
+ gem.version = Sidekiq::Middleware::VERSION
+
+ gem.add_dependency 'sidekiq'
+end

0 comments on commit fea7411

Please sign in to comment.