Skip to content

Commit

Permalink
feat(data_point): implement base type for ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
kimburgess committed Dec 12, 2019
1 parent 9df3479 commit 5ba215c
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
50 changes: 50 additions & 0 deletions spec/flux/data_point_spec.cr
@@ -0,0 +1,50 @@
require "../spec_helper"

describe Flux::DataPoint do
describe ".new" do
it "detects missing fields" do
expect_raises(ArgumentError) do
Flux::DataPoint.new "foo"
end
end

it "maps fields into a FieldSet" do
point = Flux::DataPoint.new "foo", a: 1, b: "test", c: 0.0, d: false
point.fields.should eq({
:a => 1,
:b => "test",
:c => 0.0,
:d => false
})
end
end

describe "#tag" do
it "allows tagging of points" do
point = Flux::DataPoint.new "foo", a: 1
point.tag :test, "bar"
point.tags.should eq({:test => "bar"})
end
end

describe "#to_s" do
time = Time.now
ts = time.to_unix

it "serializes a simple point to line protocol" do
point = Flux::DataPoint.new "foo", time, a: 1
point.to_s.should eq("foo a=1 #{ts}")
end

it "serializes a multi-field point to line protocol" do
point = Flux::DataPoint.new "foo", time, a: 1, b: true
point.to_s.should eq("foo a=1,b=t #{ts}")
end

it "serializes when tags are present" do
point = Flux::DataPoint.new "foo", time, a: 1
point.tag :test, "bar"
point.to_s.should eq("foo,test=bar a=1 #{ts}")
end
end
end
64 changes: 64 additions & 0 deletions src/flux/data_point.cr
@@ -0,0 +1,64 @@
# Model and serialization tools for InfluxDB data points.
struct Flux::DataPoint
alias TagSet = Hash(Symbol, String | Symbol)

alias FieldSet = Hash(Symbol, Float32 | Float64 | Int32 | Int64 | Bool | String)

getter measurement : String

getter(tags) { TagSet.new }

getter fields : FieldSet

getter timestamp : Time

def initialize(@measurement, @timestamp = Time.now, @tags = nil, **fields : **T) forall T
raise ArgumentError.new "data points must include at least one field" \
if fields.empty?

@fields = FieldSet.new
{% for k in T %}
@fields[{{k.symbolize}}] = fields[{{k.symbolize}}]
{% end %}
end

def tag(key, value)
tags[key] = value
end

def to_s(io : IO)
io << @measurement

@tags.try(
&.each do |k, v|
io << ','
io << k
io << '='
io << v
end
)

io << ' '

fields.join(',', io) do |(k, v), field|
field << k
field << '='
case v
when String
field << '"'
field << v
field << '"'
when true
field << 't'
when false
field << 'f'
else
field << v
end
end

io << ' '

io << timestamp.to_unix
end
end

0 comments on commit 5ba215c

Please sign in to comment.