This repository has been archived by the owner on Feb 24, 2021. It is now read-only.
forked from InfluxCommunity/influxdb-ruby
-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.rb
111 lines (95 loc) · 3.12 KB
/
core.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
module InfluxDB
module Query # :nodoc: all
# rubocop:disable Metrics/AbcSize
module Core
def ping
get "/ping"
end
# rubocop:disable Metrics/MethodLength
def query(query, opts = {})
precision = opts.fetch(:precision, config.time_precision)
denormalize = opts.fetch(:denormalize, config.denormalize)
url = full_url("/query", q: query, db: config.database, precision: precision)
series = fetch_series(get(url, parse: true))
if block_given?
series.each do |s|
yield s['name'], s['tags'], denormalize ? denormalize_series(s) : raw_values(s)
end
else
denormalize ? denormalized_series_list(series) : series
end
end
# rubocop:enable Metrics/MethodLength
# Example:
# write_points([
# {
# series: 'cpu',
# tags: { host: 'server_nl', regios: 'us' },
# values: {internal: 5, external: 6},
# timestamp: 1422568543702900257
# },
# {
# series: 'gpu',
# values: {value: 0.9999},
# }
# ])
def write_points(data, precision = nil, retention_policy = nil)
data = data.is_a?(Array) ? data : [data]
payload = generate_payload(data)
writer.write(payload, precision, retention_policy)
end
# Example:
# write_point('cpu', tags: {region: 'us'}, values: {internal: 60})
def write_point(series, data, precision = nil, retention_policy = nil)
write_points(data.merge(series: series), precision, retention_policy)
end
def write(data, precision, retention_policy = nil)
precision ||= config.time_precision
params = { db: config.database, precision: precision }
params[:rp] = retention_policy if retention_policy
url = full_url("/write", params)
post(url, data)
end
private
def denormalized_series_list(series)
series.map do |s|
{
'name' => s['name'],
'tags' => s['tags'],
'values' => denormalize_series(s)
}
end
end
def fetch_series(response)
response.fetch('results', [])
.fetch(0, {})
.fetch('series', [])
end
def generate_payload(data)
data.map do |point|
InfluxDB::PointValue.new(point).dump
end.join("\n")
end
def execute(query, options = {})
url = full_url("/query", q: query)
get(url, options)
end
def denormalize_series(series)
Array(series["values"]).map do |values|
Hash[series["columns"].zip(values)]
end
end
def raw_values(series)
series.select { |k, _| %w(columns values).include?(k) }
end
def full_url(path, params = {})
unless basic_auth?
params[:u] = config.username
params[:p] = config.password
end
query = params.map { |k, v| [CGI.escape(k.to_s), "=", CGI.escape(v.to_s)].join }.join("&")
URI::Generic.build(path: File.join(config.prefix, path), query: query).to_s
end
end
end
end