This repository has been archived by the owner on Sep 26, 2021. It is now read-only.
/
yup.rb
121 lines (102 loc) · 3.42 KB
/
yup.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
require 'uri'
require 'rubygems'
require 'eventmachine'
require 'logger'
require 'yajl'
require 'tmpdir'
require 'yup/version'
require 'yup/request_forwarder'
require 'yup/request_handler'
require 'yup/state'
# Top-level namespace holding process-wide settings and the entry points that
# start the EventMachine listener.
#
# Three ways to start are provided:
# * Yup.run            - listener only, no persistent state (state is nil).
# * Yup.run_with_bdb   - listener plus a forked consumer backed by State::BDB.
# * Yup.run_with_redis - listener plus a forked consumer backed by State::Redis.
# Yup.run_with_state picks between the last two based on config[:persistent].
#
# All entry points take a Hash +config+ and read these keys:
#   :listen_host (default 'localhost'), :listen_port (default 8080),
#   :status_code (default 200), :forward_to (no default), :timeout (default 60)
# and, for the stateful variants, :persistent (passed to the state backend).
module Yup
  # Delay before a resend; defaults to 60.0.
  # NOTE(review): presumably seconds, going by the setter's parameter name.
  @@resend_delay = 60.0

  def self.resend_delay
    @@resend_delay
  end

  def self.resend_delay=(seconds)
    @@resend_delay = seconds
  end

  # Watermark threshold; defaults to 100.
  # NOTE(review): its unit/meaning is defined by the code that reads it,
  # which is not part of this file.
  @@watermark = 100

  def self.watermark
    @@watermark
  end

  def self.watermark=(value)
    @@watermark = value
  end

  # Logger shared by the whole process; writes to STDOUT by default.
  @@logger = Logger.new(STDOUT)

  def self.logger
    @@logger
  end

  def self.logger=(logger)
    @@logger = logger
  end

  # Flag read by the forwarding code; defaults to false.
  @@retry_unless_2xx = false

  def self.retry_unless_2xx
    @@retry_unless_2xx
  end

  def self.retry_unless_2xx=(bool)
    @@retry_unless_2xx = bool
  end

  # Starts the listener without any persistent state and blocks inside the
  # EventMachine reactor.
  #
  # @param config [Hash] see the module documentation for the keys read
  def self.run(config)
    host, port, status_code, forward_to, timeout = server_settings(config)

    EM.run do
      EM.start_server(host, port, RequestHandler, forward_to, status_code, nil, timeout)
      logger.info { "listening on #{host}:#{port}" }
    end
  end

  # Dispatches to the stateful runner matching the type that
  # State.state_type reports for config[:persistent]. Aborts the process when
  # the type is neither :bdb nor :redis.
  #
  # @param config [Hash] see the module documentation for the keys read
  def self.run_with_state(config)
    case State.state_type(config[:persistent])
    when :bdb
      run_with_bdb(config)
    when :redis
      run_with_redis(config)
    else
      abort "Unknown scheme of persistent queue."
    end
  end

  # Starts the listener with a State::BDB backend. A consumer process is
  # forked to run State::BDB::RequestForwarder#run_loop, and a unix domain
  # server is opened on a per-process path under Dir.tmpdir for
  # State::BDB::FeedbackHandler.
  #
  # @param config [Hash] see the module documentation for the keys read
  def self.run_with_bdb(config)
    require 'yup/state/bdb' # loaded lazily so the backend is only needed when used

    host, port, status_code, forward_to, timeout = server_settings(config)
    dbpath           = config[:persistent]
    feedback_channel = File.join(Dir.tmpdir, "yupd-#{$$}-feedback")
    state            = State::BDB.new(dbpath, forward_to, feedback_channel)

    start_consumer(state) do
      State::BDB::RequestForwarder.new(state, forward_to, timeout).run_loop
    end

    EM.run do
      EM.start_unix_domain_server(feedback_channel, State::BDB::FeedbackHandler, state)
      logger.info { "Feedback through #{feedback_channel}" }
      EM.start_server(host, port, RequestHandler, forward_to, status_code, state, timeout)
      logger.info { "Listening on #{host}:#{port}" }
    end
  end

  # Starts the listener with a State::Redis backend. A consumer process is
  # forked to run State::Redis::RequestForwarder#run_loop.
  #
  # @param config [Hash] see the module documentation for the keys read
  def self.run_with_redis(config)
    require 'yup/state/redis' # loaded lazily so the backend is only needed when used

    host, port, status_code, forward_to, timeout = server_settings(config)
    dbpath = config[:persistent]
    state  = State::Redis.new(dbpath, forward_to)

    start_consumer(state) do
      State::Redis::RequestForwarder.new(state, forward_to, timeout).run_loop
    end

    EM.run do
      EM.start_server(host, port, RequestHandler, forward_to, status_code, state, timeout)
      logger.info { "Listening on #{host}:#{port}" }
    end
  end

  # Extracts the listener settings shared by every entry point, applying the
  # defaults.
  #
  # @param config [Hash]
  # @return [Array] host, port, status_code, forward_to, timeout (in that order)
  def self.server_settings(config)
    [
      config[:listen_host] || 'localhost',
      config[:listen_port] || 8080,
      config[:status_code] || 200,
      config[:forward_to],
      config[:timeout] || 60
    ]
  end
  private_class_method :server_settings

  # Forks a child process that runs the given block, then installs TERM and
  # INT handlers in the parent which kill the child, dispose of +state+ and
  # exit with status 0.
  #
  # @param state [#dispose] state object released on shutdown
  # @yield body of the consumer process
  # @return [Integer] pid of the forked consumer
  def self.start_consumer(state)
    # With a block, fork returns only in the parent; the child exits once the
    # block finishes.
    consumer_pid = Process.fork { yield }

    # A proc (not a lambda) so the signal number passed by trap is ignored.
    shutdown = proc do
      # Report the pid of the forked consumer, not $$ (this supervising process).
      log_from_trap { "Terminating consumer #{consumer_pid}" }
      Process.kill("KILL", consumer_pid)
      state.dispose()
      exit 0
    end

    Signal.trap("TERM", &shutdown)
    Signal.trap("INT", &shutdown)
    consumer_pid
  end
  private_class_method :start_consumer

  # Writes an info message from inside a signal handler. Logger takes a lock
  # internally, which Ruby forbids in trap context, so the write is done on a
  # short-lived thread and joined to keep the original ordering.
  def self.log_from_trap(&message)
    Thread.new { logger.info(&message) }.join
  end
  private_class_method :log_from_trap
end