Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added timezone support

  • Loading branch information...
commit 4616cf0d9464f69efd3521f3fab1aff92ad6b7a6 1 parent a0bcbdd
@frsyuki frsyuki authored
View
1  Rakefile
@@ -16,6 +16,7 @@ begin
gemspec.add_dependency "sequel", "~> 3.26.0"
gemspec.add_dependency "aws-sdk", "~> 1.1.1"
gemspec.add_dependency "perfectqueue", "~> 0.7.0"
+ gemspec.add_dependency "tzinfo", "~> 0.3.29"
gemspec.test_files = Dir["test/**/*.rb", "test/**/*.sh"]
gemspec.files = Dir["bin/**/*", "lib/**/*"]
gemspec.executables = ['perfectsched']
View
25 lib/perfectsched/backend.rb
@@ -3,15 +3,16 @@ module PerfectSched
class Task
- def initialize(id, time, cron, delay, data)
+ def initialize(id, time, cron, delay, data, timezone=nil)
@id = id
@time = time
@cron = cron
@delay = delay
@data = data
+ @timezone = timezone
end
- attr_reader :id, :time, :cron, :delay, :data
+ attr_reader :id, :time, :cron, :delay, :data, :timezone
end
@@ -33,14 +34,15 @@ def finish(token, next_time)
end
# => true (success) or nil (already exists)
- def add(id, cron, delay, data, start_time)
- first_time = @croncalc.next_time(cron, start_time.to_i)
+ def add(id, cron, delay, data, start_time, timezone=nil)
+ timezone = TZInfo::Timezone.get(timezone).name if timezone # normalize
+ first_time = @croncalc.next_time(cron, start_time.to_i, timezone)
timeout = first_time + delay
- add_checked(id, cron, delay, data, first_time, timeout)
+ add_checked(id, cron, delay, data, first_time, timeout, timezone)
end
# => true (success) or nil (already exists)
- def add_checked(id, cron, delay, data, next_time, timeout)
+ def add_checked(id, cron, delay, data, next_time, timeout, timezone)
end
# => true (success) or false (not found, canceled or finished)
@@ -48,19 +50,20 @@ def delete(id)
end
# => true (success) or false (not found)
- def modify(id, cron, delay, data)
+ def modify(id, cron, delay, data, timezone)
cron = cron.strip
- @croncalc.next_time(cron, 0)
- modify_checked(id, cron, delay, data)
+ @croncalc.next_time(cron, 0, timezone)
+ modify_checked(id, cron, delay, data, timezone)
end
- def modify_checked(id, cron, delay, data)
+ def modify_checked(id, cron, delay, data, timezone)
end
# => true (success) or false (not found)
def modify_sched(id, cron, delay)
+ cron_, delay_, data_, timezone = get(id)
cron = cron.strip
- @croncalc.next_time(cron, 0)
+ @croncalc.next_time(cron, 0, timezone)
modify_sched_checked(id, cron, delay)
end
View
68 lib/perfectsched/backend/rdb.rb
@@ -15,37 +15,25 @@ def initialize(uri, table)
}
end
- private
- #def init_db(type)
- # sql = ''
- # case type
- # when /mysql/i
- # sql << "CREATE TABLE IF NOT EXISTS `#{@table}` ("
- # sql << " id VARCHAR(256) NOT NULL,"
- # sql << " timeout INT NOT NULL,"
- # sql << " next_time INT NOT NULL,"
- # sql << " cron VARCHAR(128) NOT NULL,"
- # sql << " delay INT NOT NULL,"
- # sql << " data BLOB NOT NULL,"
- # sql << " PRIMARY KEY (id)"
- # sql << ") ENGINE=INNODB;"
- # else
- # sql << "CREATE TABLE IF NOT EXISTS `#{@table}` ("
- # sql << " id VARCHAR(256) NOT NULL,"
- # sql << " timeout INT NOT NULL,"
- # sql << " next_time INT NOT NULL,"
- # sql << " cron VARCHAR(128) NOT NULL,"
- # sql << " delay INT NOT NULL,"
- # sql << " data BLOB NOT NULL,"
- # sql << " PRIMARY KEY (id)"
- # sql << ");"
- # end
- # # TODO index
- # connect {
- # @db.run sql
- # }
- #end
+ def create_tables
+ sql = ''
+ sql << "CREATE TABLE IF NOT EXISTS `#{@table}` ("
+ sql << " id VARCHAR(256) NOT NULL,"
+ sql << " timeout INT NOT NULL,"
+ sql << " next_time INT NOT NULL,"
+ sql << " cron VARCHAR(128) NOT NULL,"
+ sql << " delay INT NOT NULL,"
+ sql << " data BLOB NOT NULL,"
+ sql << " timezone VARCHAR(256) NULL,"
+ sql << " PRIMARY KEY (id)"
+ sql << ");"
+ # TODO index
+ connect {
+ @db.run sql
+ }
+ end
+ private
def connect(&block)
begin
block.call
@@ -56,8 +44,8 @@ def connect(&block)
public
def list(&block)
- @db.fetch("SELECT id, timeout, next_time, cron, delay, data FROM `#{@table}` ORDER BY timeout ASC") {|row|
- yield row[:id], row[:cron], row[:delay], row[:data], row[:next_time], row[:timeout]
+ @db.fetch("SELECT id, timeout, next_time, cron, delay, data, timezone FROM `#{@table}` ORDER BY timeout ASC") {|row|
+ yield row[:id], row[:cron], row[:delay], row[:data], row[:next_time], row[:timeout], row[:timezone]
}
end
@@ -67,12 +55,12 @@ def acquire(timeout, now=Time.now.to_i)
connect {
while true
rows = 0
- @db.fetch("SELECT id, timeout, next_time, cron, delay, data FROM `#{@table}` WHERE timeout <= ? ORDER BY timeout ASC LIMIT #{MAX_SELECT_ROW};", now) {|row|
+ @db.fetch("SELECT id, timeout, next_time, cron, delay, data, timezone FROM `#{@table}` WHERE timeout <= ? ORDER BY timeout ASC LIMIT #{MAX_SELECT_ROW};", now) {|row|
n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND timeout=?;", timeout, row[:id], row[:timeout]].update
salt = timeout
if n > 0
- return [row[:id],salt], Task.new(row[:id], row[:next_time], row[:cron], row[:delay], row[:data])
+ return [row[:id],salt], Task.new(row[:id], row[:next_time], row[:cron], row[:delay], row[:data], row[:timezone])
end
rows += 1
@@ -92,10 +80,10 @@ def finish(token, next_time, timeout)
}
end
- def add_checked(id, cron, delay, data, next_time, timeout)
+ def add_checked(id, cron, delay, data, next_time, timeout, timezone)
connect {
begin
- n = @db["INSERT INTO `#{@table}` (id, timeout, next_time, cron, delay, data) VALUES (?, ?, ?, ?, ?, ?);", id, timeout, next_time, cron, delay, data].insert
+ n = @db["INSERT INTO `#{@table}` (id, timeout, next_time, cron, delay, data, timezone) VALUES (?, ?, ?, ?, ?, ?, ?);", id, timeout, next_time, cron, delay, data, timezone].insert
return true
rescue Sequel::DatabaseError
return nil
@@ -112,16 +100,16 @@ def delete(id)
def get(id)
connect {
- @db.fetch("SELECT id, timeout, next_time, cron, delay, data FROM `#{@table}` WHERE id=?;", id) {|row|
- return row[:cron], row[:delay], row[:data]
+ @db.fetch("SELECT id, timeout, next_time, cron, delay, data, timezone FROM `#{@table}` WHERE id=?;", id) {|row|
+ return row[:cron], row[:delay], row[:data], row[:timezone]
}
return nil
}
end
- def modify_checked(id, cron, delay, data)
+ def modify_checked(id, cron, delay, data, timezone)
connect {
- n = @db["UPDATE `#{@table}` SET cron=?, delay=?, data=? WHERE id=?;", cron, delay, data, id].update
+ n = @db["UPDATE `#{@table}` SET cron=?, delay=?, data=?, timezone=? WHERE id=?;", cron, delay, data, timezone, id].update
return n > 0
}
end
View
31 lib/perfectsched/backend/simpledb.rb
@@ -28,7 +28,7 @@ def use_consistent_read(b=true)
def list(&block)
rows = 0
- @domain.items.select('timeout', 'next_time', 'cron', 'delay', 'data',
+ @domain.items.select('timeout', 'next_time', 'cron', 'delay', 'data', 'timezone',
:where => "timeout > '#{int_encode(0)}'", # required by SimpleDB
:order => [:timeout, :asc],
:consistent_read => @consistent_read,
@@ -40,9 +40,10 @@ def list(&block)
cron = attrs['cron'].first
delay = int_decode(attrs['delay'].first)
data = attrs['data'].first
+ timezone = attrs['timezone'].first
timeout = int_decode(attrs['timeout'].first)
- yield id, cron, delay, data, next_time, timeout
+ yield id, cron, delay, data, next_time, timeout, timezone
}
end
@@ -51,7 +52,7 @@ def list(&block)
def acquire(timeout, now=Time.now.to_i)
while true
rows = 0
- @domain.items.select('timeout', 'next_time', 'cron', 'delay', 'data',
+ @domain.items.select('timeout', 'next_time', 'cron', 'delay', 'data', 'timezone',
:where => "timeout <= '#{int_encode(now)}'",
:order => [:timeout, :asc],
:consistent_read => @consistent_read,
@@ -67,9 +68,10 @@ def acquire(timeout, now=Time.now.to_i)
cron = attrs['cron'].first
delay = int_decode(attrs['delay'].first)
data = attrs['data'].first
+ timezone = attrs['timezone'].first
salt = int_encode(timeout)
- return [id,salt], Task.new(id, next_time, cron, delay, data)
+ return [id,salt], Task.new(id, next_time, cron, delay, data, timezone)
rescue AWS::SimpleDB::Errors::ConditionalCheckFailed, AWS::SimpleDB::Errors::AttributeDoesNotExist
end
@@ -93,11 +95,17 @@ def finish(token, next_time, timeout)
end
end
- def add_checked(id, cron, delay, data, next_time, timeout)
+ def add_checked(id, cron, delay, data, next_time, timeout, timezone)
begin
- @domain.items[id].attributes.replace('timeout'=>int_encode(timeout), 'next_time'=>int_encode(next_time),
- 'cron'=>cron, 'delay'=>int_encode(delay), 'data'=>data,
- :unless=>'timeout')
+ hash = {}
+ hash['timeout'] = int_encode(timeout)
+ hash['next_time'] = int_encode(next_time)
+ hash['cron'] = cron
+ hash['delay'] = int_encode(delay)
+ hash['data'] = data
+ hash['timezone'] = timezone if timezone
+ hash[:unless] = 'timeout'
+ @domain.items[id].attributes.replace()
return true
rescue AWS::SimpleDB::Errors::ConditionalCheckFailed, AWS::SimpleDB::Errors::ExistsAndExpectedValue
return nil
@@ -122,14 +130,15 @@ def get(id)
end
delay = int_decode(attrs['delay'].first)
data = attrs['data'].first
- return cron, delay, data
+ timezone = attrs['timezone'].first
+ return cron, delay, data, timezone
end
- def modify_checked(id, cron, delay, data)
+ def modify_checked(id, cron, delay, data, timezone)
unless get(id)
return false
end
- @domain.items[id].attributes.replace('cron'=>cron, 'delay'=>int_encode(delay), 'data'=>data)
+ @domain.items[id].attributes.replace('cron'=>cron, 'delay'=>int_encode(delay), 'data'=>data, 'timezone'=>timezone)
return true
end
View
23 lib/perfectsched/command/perfectsched.rb
@@ -52,6 +52,10 @@
add_conf[:delay] = i
}
+op.on('-t', '--timezone NAME', 'Set timezone (default: localtime)') {|s|
+ add_conf[:timezone] = s
+}
+
op.on('-s', '--start UNIXTIME', 'Start time to run a schedule (default: now)', Integer) {|i|
add_conf[:start] = i
}
@@ -143,13 +147,13 @@
timeout: 300
poll_interval: 1
backend:
- database: "mysql://user:password@localhost/mydb"
+ database: "mysql2://user:password@localhost/mydb"
table: "perfectsched"
#simpledb: your-simpledb-domain-name-for-scheduler
#aws_key_id: "AWS_ACCESS_KEY_ID"
#aws_secret_key: "AWS_SECRET_ACCESS_KEY"
queue:
- database: "mysql://user:password@localhost/mydb"
+ database: "mysql2://user:password@localhost/mydb"
table: "perfectqueue"
#simpledb: your-simpledb-domain-name-for-queue
#aws_key_id: "AWS_ACCESS_KEY_ID"
@@ -228,12 +232,12 @@
case type
when :list
- format = "%26s %20s %8s %26s %26s %s"
- puts format % ["id", "schedule", "delay", "next time", "next run", "data"]
- time_format = "%Y-%m-%d %H:%M:%S %z"
+ format = "%26s %18s %8s %20s %20s %20s %s"
+ puts format % ["id", "schedule", "delay", "next time", "next run", "timezone", "data"]
+ time_format = "%Y-%m-%d %H:%M:%S"
n = 0
- backend.list {|id,cron,delay,data,next_time,timeout|
- puts format % [id, cron, delay, Time.at(next_time).strftime(time_format), Time.at(timeout).strftime(time_format), data]
+ backend.list {|id,cron,delay,data,next_time,timeout,timezone|
+ puts format % [id, cron, delay, Time.at(next_time).utc.strftime(time_format), Time.at(timeout).utc.strftime(time_format), timezone, data]
n += 1
}
puts "#{n} entries."
@@ -252,8 +256,9 @@
data = add_conf[:data]
delay = add_conf[:delay]
start = add_conf[:start] || Time.now.to_i
+ timezone = add_conf[:timezone]
- added = backend.add(id, cron, delay, data, start)
+ added = backend.add(id, cron, delay, data, start, timezone)
if added
puts "Schedule id=#{id} is added."
else
@@ -262,7 +267,7 @@
end
when :modify_sched, :modify_delay, :modify_data
- cron, delay, data = backend.get(id)
+ cron, delay, data, timezone = backend.get(id)
unless cron
puts "Schedule id=#{id} does not exist."
exit 1
View
11 lib/perfectsched/croncalc.rb
@@ -5,16 +5,19 @@ module PerfectSched
class CronCalc
def initialize
require 'cron-spec'
+ require 'tzinfo'
# TODO optimize
end
- def next_time(cron, time)
- t = Time.at(time)
+ def next_time(cron, time, timezone)
tab = CronSpec::CronSpecification.new(cron)
+ tz = TZInfo::Timezone.get(timezone) if timezone
while true
- t += 60
+ time += 60
+ t = Time.at(time)
+ t = tz.utc_to_local(t.utc) if tz
if tab.is_specification_in_effect?(t)
- return t.to_i
+ return time
end
# FIXME break
end
View
2  lib/perfectsched/engine.rb
@@ -42,7 +42,7 @@ def process(token, task)
@queue.submit(id, task.data)
# ignore already exists error
- next_time = @croncalc.next_time(task.cron, task.time)
+ next_time = @croncalc.next_time(task.cron, task.time, task.timezone)
next_run = next_time + task.delay
@backend.finish(token, next_time, next_run)
View
29 test/backend_test.rb
@@ -17,7 +17,9 @@ def clean_backend
def open_backend
#PerfectSched::SimpleDBBackend.new(ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY'], 'perfectsched-test-1').use_consistent_read
- PerfectSched::RDBBackend.new(DB_URI, "perfectdb_test")
+ db = PerfectSched::RDBBackend.new(DB_URI, "perfectdb_test")
+ db.create_tables
+ db
end
it 'acquire' do
@@ -176,7 +178,7 @@ def open_backend
assert_equal 10, delay
assert_equal 'data2', data
- ok = db1.modify(key, "* * * * 2", 20, "data3")
+ ok = db1.modify(key, "* * * * 2", 20, "data3", nil)
assert_equal true, ok
cron, delay, data = db1.get(key)
@@ -184,5 +186,28 @@ def open_backend
assert_equal 20, delay
assert_equal 'data3', data
end
+
+ it 'timezone' do
+ clean_backend
+
+ db1 = open_backend
+ time = 1323820800 # 2011-12-14 00:00:00 UTC
+
+ ok = db1.add(@key_prefix+'test1', "0 0 * * *", 0, '', time-60, 'UTC')
+ assert_equal true, ok
+
+ ok = db1.add(@key_prefix+'test2', "0 0 * * *", 0, '', time-60, 'Asia/Tokyo')
+ assert_equal true, ok
+
+ token, task = db1.acquire(time+86400, time)
+ assert_not_equal nil, task
+ assert_equal @key_prefix+'test1', task.id
+ assert_equal time, task.time
+
+ token, task = db1.acquire(time+54000+86400, time+54000)
+ assert_not_equal nil, task
+ assert_equal @key_prefix+'test2', task.id
+ assert_equal time+54000, task.time
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.