/
out_ktmemqueue.rb
46 lines (36 loc) · 1.14 KB
/
out_ktmemqueue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
module Fluent
class KtMemQueueOutput < BufferedOutput
Fluent::Plugin.register_output('ktmemqueue', self)
attr_reader :host, :port, :inputkey, :expire
def initialize
super
require 'memcached'
require 'msgpack'
end
def configure(conf)
super
@host = conf.has_key?('host') ? conf['host'] : 'localhost'
@port = conf.has_key?('port') ? conf['port'].to_i : 11211
@inputkey = conf.has_key?('inputkey') ? conf['inputkey'].to_s : 'fluent-plugin-ktmemqueue'
@expire = conf.has_key?('expire') ? conf['expire'].to_i : 0
end
def start
super
@connect = @host.to_s + ":" + @port.to_s
@memcached = Memcached.new(@connect)
@memcached.set @inputkey, "fluent-plugin-ktmemqueue start", @expire, false
end
def shutdown
@memcached.set @inputkey, "fluent-plugin-ktmemqueue end", @expire, false
@memcached.quit
end
def format(tag, time, record)
[tag, record].to_msgpack
end
def write(chunk)
chunk.msgpack_each do |tag, record|
@memcached.set @inputkey, tag.to_s + " " + record.to_s, @expire, false
end
end
end
end