/
opentsdb.rb
97 lines (83 loc) · 2.98 KB
/
opentsdb.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "socket"
# This output allows you to pull metrics from your logs and ship them to
# opentsdb. Opentsdb is an open source tool for storing and graphing metrics.
#
class LogStash::Outputs::Opentsdb < LogStash::Outputs::Base
config_name "opentsdb"
# The address of the opentsdb server.
config :host, :validate => :string, :default => "localhost"
# The port to connect on your graphite server.
config :port, :validate => :number, :default => 4242
# The metric(s) to use. This supports dynamic strings like %{source_host}
# for metric names and also for values. This is an array field with key
# of the metric name, value of the metric value, and multiple tag,values . Example:
# [source,ruby]
# [
# "%{host}/uptime",
# %{uptime_1m} " ,
# "hostname" ,
# "%{host}
# "anotherhostname" ,
# "%{host}
# ]
#
# The value will be coerced to a floating point value. Values which cannot be
# coerced will zero (0)
config :metrics, :validate => :array, :required => true
def register
connect
end # def register
def connect
# TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory.
begin
@socket = TCPSocket.new(@host, @port)
rescue Errno::ECONNREFUSED => e
@logger.warn("Connection refused to opentsdb server, sleeping...",
:host => @host, :port => @port)
sleep(2)
retry
end
end # def connect
public
def receive(event)
# Opentsdb message format: put metric timestamp value tagname=tagvalue tag2=value2\n
# Catch exceptions like ECONNRESET and friends, reconnect on failure.
begin
name = metrics[0]
value = metrics[1]
tags = metrics[2..-1]
# The first part of the message
message = ['put',
event.sprintf(name),
event.sprintf("%{+%s}"),
event.sprintf(value),
].join(" ")
# If we have have tags we need to add it to the message
event_tags = []
unless tags.nil?
Hash[*tags.flatten].each do |tag_name,tag_value|
# Interprete variables if neccesary
real_tag_name = event.sprintf(tag_name)
real_tag_value = event.sprintf(tag_value)
event_tags << [real_tag_name , real_tag_value ].join('=')
end
message+=' '+event_tags.join(' ')
end
# TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory.
begin
@socket.puts(message)
rescue Errno::EPIPE, Errno::ECONNRESET => e
@logger.warn("Connection to opentsdb server died",
:exception => e, :host => @host, :port => @port)
sleep(2)
connect
end
# TODO(sissel): resend on failure
# TODO(sissel): Make 'resend on failure' tunable; sometimes it's OK to
# drop metrics.
end # @metrics.each
end # def receive
end # class LogStash::Outputs::Opentsdb