/
test.rb
131 lines (113 loc) · 3.5 KB
/
test.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
require 'thread'
require 'webrick'
require './lib/lpxc.rb'
# Make an unhandled exception in any background thread abort the whole run
# instead of being lost.
Thread.abort_on_exception = true

# Endpoint the tests post to. It is also exported through ENV
# (presumably read by Lpxc as its default URL -- confirm in lib/lpxc.rb).
LOGPLEX_URL = URI('http://localhost:5000/logs')
ENV['LOGPLEX_URL'] = LOGPLEX_URL.to_s

# Select the test-case base class.
#
# Ruby 1.8.7 keeps using test/unit. Every other Ruby loads minitest and the
# base class is chosen by feature detection rather than by an exact
# RUBY_VERSION whitelist: Minitest 5 renamed MiniTest::Unit::TestCase to
# Minitest::Test, while the Minitest 4 bundled with 1.9.3/2.0.0 only has the
# old name. This lets the suite load on Rubies newer than 2.0.0 instead of
# raising "Unsupported ruby version".
if RUBY_VERSION == "1.8.7"
  require 'test/unit'
  LpxcTestBase = Test::Unit::TestCase
else
  require 'minitest/autorun'
  LpxcTestBase = defined?(Minitest::Test) ? Minitest::Test : MiniTest::Unit::TestCase
end
# Test-only reader for the @clients instance variable held on Lpxc itself;
# test_auto_multiplexing uses it to flush and count the clients. #:nodoc:
class << Lpxc #:nodoc:
  attr_reader :clients
end
# In-process WEBrick server that records the body of every request made to
# LOGPLEX_URL.path, so tests can inspect what was sent. #:nodoc:
class TestServer #:nodoc:
  # Request bodies captured since the last #start, in arrival order.
  attr_reader :results

  # Shut the server down and wait for its thread to finish.
  #
  # Joining matters because every test starts a new server on the same port:
  # without it, #stop could return while the previous listener is still
  # bound, and the next #start would race against it.
  def stop
    @server.shutdown
    @server_thread.join
  end

  # Start serving on +port+ (defaults to the port of LOGPLEX_URL) in a
  # background thread and return +self+ so calls can be chained.
  def start(port=LOGPLEX_URL.port)
    @results = []
    @server = WEBrick::HTTPServer.new(
      :Logger => WEBrick::Log.new("/dev/null"),
      :AccessLog => [],
      :Port => port)
    @server.mount_proc(LOGPLEX_URL.path) {|req, _res| @results << req.body}
    @server_thread = Thread.new {@server.start}
    # Do not return until the accept loop is running: WEBrick only acts on
    # a shutdown request while its status is :Running, so a #stop issued
    # before that point would leave the server alive (and #stop's join
    # waiting). The alive? check avoids spinning if the thread died early.
    sleep(0.01) while @server_thread.alive? && @server.status != :Running
    self
  end
end
# Tests for Lpxc. Every test runs against a fresh TestServer listening on the
# LOGPLEX_URL port; assertions inspect the request bodies it captured. #:nodoc:
class LpxcTest < LpxcTestBase #:nodoc:
  # Start a capture server before each test.
  def setup
    @test_server = TestServer.new.start
  end

  # Stop the capture server after each test.
  def teardown
    @test_server.stop
  end

  # A single message sent with batch_size 1 arrives at the server in the
  # expected wire format.
  def test_integration
    c = Lpxc.new(
      :flush_interval => 0.2,
      :batch_size => 1,
      :max_reqs_per_conn => 1,
      :request_queue => SizedQueue.new(1))
    c.puts('hello world', 't.123')
    c.wait
    expected = /66 <190>1 [0-9T:\+\-\.]+ myhost t.123 lpxc - - hello world/
    # assert_match (rather than assert with =~) reports the pattern and the
    # actual value, including the case where no request arrived at all.
    assert_match(expected, @test_server.results[0])
  end

  # batch_size messages are delivered as exactly one request containing all
  # of them.
  def test_integration_batching
    batch_size = 100
    c = Lpxc.new(
      :request_queue => SizedQueue.new(1),
      :flush_interval => 10,
      :batch_size => batch_size
    )
    batch_size.times do
      c.puts('hello world', 't.123')
    end
    c.wait
    assert_equal(1, @test_server.results.length)
    assert_equal(batch_size, @test_server.results[0].scan(/hello\sworld/).count)
  end

  # The private fmt method renders a message hash as one framed line. The
  # leading 66 equals the character length of the text that follows it.
  def test_fmt
    c = Lpxc.new
    t = Time.now.utc
    actual = c.send(:fmt, {:t => t, :token => 't.123', :msg => 'hello world'})
    ts = t.strftime("%Y-%m-%dT%H:%M:%S+00:00")
    expected = "66 <190>1 #{ts} myhost t.123 lpxc - - hello world"
    assert_equal(expected, actual)
  end

  # Two messages with the same token and batch_size 2 produce one request.
  def test_request_queue_with_single_token
    reqs = SizedQueue.new(1)
    c = Lpxc.new(:request_queue => reqs, :batch_size => 2)
    c.puts('hello world', 't.123')
    c.puts('hello world', 't.123')
    c.wait
    assert_equal(1, @test_server.results.length)
  end

  # Two messages with different tokens produce two requests.
  def test_request_queue_with_many_tokens
    reqs = SizedQueue.new(2)
    c = Lpxc.new(:request_queue => reqs, :batch_size => 2)
    c.puts('hello world', 't.123')
    c.puts('hello world', 't.124')
    c.wait
    assert_equal(2, @test_server.results.length)
  end

  # An explicit flush sends a batch that has not reached batch_size.
  def test_flushing_the_client
    reqs = SizedQueue.new(1)
    # Make the batch size much larger than the input.
    c = Lpxc.new(:request_queue => reqs, :batch_size => 10)
    c.puts('hello world', 't.123')
    c.flush
    c.wait
    assert_equal(1, @test_server.results.length)
  end

  # The class-level Lpxc.puts takes a URL per message. Messages for the two
  # servers must end up in two clients, and the second server must receive
  # two requests: one holding both "second" messages, one holding "third".
  def test_auto_multiplexing
    ts2 = TestServer.new.start(5001)
    Lpxc.puts("hello first", LOGPLEX_URL.to_s)
    Lpxc.puts("hello second", 'http://token:second@localhost:5001/logs')
    Lpxc.puts("second again", 'http://token:second@localhost:5001/logs')
    Lpxc.puts("hello third", 'http://token:third@localhost:5001/logs')
    # Flush every client, then pause so the requests can reach the servers
    # before asserting (presumably delivery is asynchronous -- confirm).
    Lpxc.clients.each {|_, c| c.flush }; sleep 1
    assert_equal(2, Lpxc.clients.length)
    assert_equal(1, @test_server.results.length)
    assert_equal(2, ts2.results.length)
    assert_match(/hello second/, ts2.results.first)
    assert_match(/second again/, ts2.results.first)
    assert_match(/hello third/, ts2.results.last)
  ensure
    # ts2 is nil if starting the second server raised; guarding keeps that
    # original error from being replaced by a NoMethodError on nil.
    ts2.stop if ts2
  end
end