Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
koudelka committed Apr 25, 2015
0 parents commit 3f4910c
Show file tree
Hide file tree
Showing 21 changed files with 682 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/_build
/deps
erl_crash.dump
*.ez
riemann.log*
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2015 Michael Shapiro

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Riemann
=======

Riemann is (surprise!) a [Riemann](http://riemann.io) client for [Elixir](http://elixir-lang.org).

## Usage
Riemann's interface is dead simple, you get `Riemann.send/1` and `Riemann.send_async/1`, blocking and non-blocking (just `call` and `cast`).

## Example
```elixir
Riemann.send(service: "my awesome app", metric: 5.0, attributes: [build: "7543"])

Riemann.send([
[service: "my awesome app req", metric: 1, attributes: [build: "7543"]],
[service: "things in queue", metric: 100, attributes: [build: "7543"]]
])

```

## Configuration
Just toss this snippet into your environment's config:

```elixir
config :riemann, :address,
host: "127.0.0.1",
port: 5555
```

## Caveats
- Only TCP is supported, read [this](http://riemann.io/howto.html#what-protocol-should-i-use-to-talk-to-riemann).
- I haven't implemented querying yet.

If you want UDP or querying, feel free to submit a PR (with tests 👺) or bug me to implement them.

## License

See the LICENSE file. (spoiler: it's MIT)
5 changes: 5 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config

import_config "#{Mix.env}.exs"
5 changes: 5 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use Mix.Config

config :riemann, :address,
host: "127.0.0.1",
port: 5555
5 changes: 5 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use Mix.Config

config :riemann, :address,
host: "127.0.0.1",
port: 6666
44 changes: 44 additions & 0 deletions lib/riemann.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Riemann do
use Application
alias Riemann.Worker
alias Riemann.Proto.Msg
alias Riemann.Proto.Event

def start(_type, _args) do
import Supervisor.Spec, warn: false

pool_options = [
name: {:local, Worker.pool_name},
worker_module: Riemann.Worker,
size: 6,
max_overflow: 10
]

address = Application.get_env(:riemann, :address)
worker_options = [
host: address[:host],
port: address[:port]
]

children = [
:poolboy.child_spec(Worker.pool_name, pool_options, worker_options)
]

opts = [strategy: :one_for_one]
Supervisor.start_link(children, opts)
end

def send(events) do
do_send(events, :sync)
end

def send_async(events) do
do_send(events, :async)
end

defp do_send(events, sync) do
[events: Event.list_to_events(events)]
|> Msg.new
|> Msg.send(sync)
end
end
11 changes: 11 additions & 0 deletions lib/riemann/helpers/attribute.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Riemann.Helpers.Attribute do
defmacro __using__(_opts) do
quote do
def build(attributes) do
Enum.map(attributes, fn {k, v} ->
new(key: to_string(k), value: to_string(v))
end)
end
end
end
end
43 changes: 43 additions & 0 deletions lib/riemann/helpers/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Riemann.Helpers.Event do
defmacro __using__(_opts) do
quote do
alias Riemann.Proto.Attribute

{:ok, hostname} = :inet.gethostname
@hostname :erlang.list_to_binary(hostname)

# is_list(hd()) detects when it's a list of events, since keyword events are also lists
# [[service: "a", metric: 1], [service: "b", metric: 2]]
def list_to_events(events) when is_list(hd(events)) do
Enum.map(events, &build/1)
end

# [service: "a", metric: 1]
def list_to_events(event) do
[event] |> list_to_events
end

def build(event) do
event = Dict.merge([host: @hostname, time: now], event)

event = case Dict.get(event, :attributes) do
nil -> event
a -> Dict.put(event, :attributes, Attribute.build(a))
end

case Dict.get(event, :metric) do
i when is_integer(i) -> Dict.put(event, :metric_sint64, i)
f when is_float(f) -> Dict.put(event, :metric_d, f)
nil -> raise ArgumentError, "no metric provided for event #{inspect event}"
end
|> new
end

defp now do
{mega_secs, secs, _} = :erlang.now
(mega_secs * 1_000_000) + secs
end

end
end
end
15 changes: 15 additions & 0 deletions lib/riemann/helpers/msg.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Riemann.Helpers.Msg do
defmacro __using__(_opts) do
quote do

def send(msg, :sync) do
:poolboy.transaction(Riemann.Worker.pool_name, &GenServer.call(&1, {:send_msg, msg}))
end

def send(msg, :async) do
:poolboy.transaction(Riemann.Worker.pool_name, &GenServer.cast(&1, {:send_msg, msg}))
end

end
end
end
12 changes: 12 additions & 0 deletions lib/riemann/proto.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Riemann.Proto do
# this is broken, "from" isn't executed in the context of this module. :(
# @proto_file Path.expand("riemann.proto", __DIR__)
# @external_resource @proto_file
# use Protobuf, from: @proto_file

alias Riemann.Helpers
use Protobuf, from: Path.expand("riemann.proto", __DIR__)
use_in "Event", Helpers.Event
use_in "Msg", Helpers.Msg
use_in "Attribute", Helpers.Attribute
end
48 changes: 48 additions & 0 deletions lib/riemann/riemann.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// From https://github.com/aphyr/riemann-java-client/blob/c6fe3537cd81341710fe27802641f34e8b639a5a/src/main/proto/riemann/proto.proto
option java_package = "com.aphyr.riemann";
option java_outer_classname = "Proto";

// Deprecated; state was used by early versions of the protocol, but not any
// more.
message State {
optional int64 time = 1;
optional string state = 2;
optional string service = 3;
optional string host = 4;
optional string description = 5;
optional bool once = 6;
repeated string tags = 7;
optional float ttl = 8;
}

message Event {
optional int64 time = 1;
optional string state = 2;
optional string service = 3;
optional string host = 4;
optional string description = 5;
repeated string tags = 7;
optional float ttl = 8;
repeated Attribute attributes = 9;

optional sint64 metric_sint64 = 13;
optional double metric_d = 14;
optional float metric_f = 15;
}

message Query {
optional string string = 1;
}

message Msg {
optional bool ok = 2;
optional string error = 3;
repeated State states = 4;
optional Query query = 5;
repeated Event events = 6;
}

message Attribute {
required string key = 1;
optional string value = 2;
}
126 changes: 126 additions & 0 deletions lib/riemann/worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
defmodule Riemann.Worker do
use GenServer
require Logger
alias Riemann.Proto.Msg

defmodule State do
defstruct tcp: nil, host: nil, port: nil, from: nil
end

@ok_msg Msg.new(ok: true) |> Msg.encode
def ok_msg, do: @ok_msg

@pool_name :worker_pool
def pool_name, do: @pool_name

def start_link(state) do
GenServer.start_link(__MODULE__, state, [])
end

def init([host: host, port: port]) do
:timer.apply_after(0, GenServer, :call, [self, :connect])
{:ok, %State{host: host, port: port}}
end


def handle_call(:connect, _from, state) do
{reply, tcp} = case connect(state) do
{:ok, tcp} -> {:ok, tcp}
{:error, error} -> {{:error, error}, nil}
end
{:reply, reply, %{state | tcp: tcp}}
end

def handle_call({:send_msg, msg}, from, state) do
case connect(state) do
{:ok, tcp} ->
:ok = :gen_tcp.send(tcp, Msg.encode(msg))
# we'll reply once we get an ok from the server
{:noreply, %{state | tcp: tcp, from: from}}
error ->
{:reply, error, %{state | tcp: nil}}
end
end

# used for testing
def handle_call(:state, _from, state) do
{:reply, state, state}
end

def handle_call(_msg, _from, state) do
{:reply, :unknown_msg, state}
end


def handle_cast({:send_msg, msg}, state) do
case connect(state) do
{:ok, tcp} ->
:ok = :gen_tcp.send(tcp, Msg.encode(msg))
{:noreply, %{state | tcp: tcp}}
_error ->
{:noreply, %{state | tcp: nil}}
end
end

# used for testing
def handle_cast(:disconnect, %State{tcp: tcp} = state) do
disconnect(tcp)
{:noreply, %{state | tcp: nil}}
end

def handle_cast(_msg, state) do
{:noreply, state}
end


def handle_info({:tcp, _port, @ok_msg}, %State{from: from} = state) when is_tuple(from) do
GenServer.reply(from, :ok)
handle_info({:tcp, _port, @ok_msg}, %{state | from: nil})
end

def handle_info({:tcp, _port, @ok_msg}, state) do
{:noreply, state}
end

# connection dropped while waiting for a reply
def handle_info({:tcp_closed, port}, %State{from: from} = state) when is_tuple(from) do
GenServer.reply(from, {:error, :tcp_closed})
handle_info({:tcp_closed, port}, %{state | from: nil})
end

# connection dropped, but we weren't waiting for a reply
def handle_info({:tcp_closed, _port}, state) do
{:noreply, %{state | tcp: nil}}
end


def terminate(_reason, %State{tcp: tcp}) when is_port(tcp) do
disconnect(tcp)
end

def code_change(_old_vsn, state, _extra) do
{:ok, state}
end

defp connect(%State{tcp: tcp}) when is_port(tcp) do
{:ok, tcp}
end

defp connect(%State{host: host, port: port}) do
case connect(host, port) do
{:error, error} ->
Logger.error "Couldn't connect to Riemann server #{host}:#{port}, got #{inspect error}"
{:error, error}
ok -> ok
end
end

defp connect(host, port) do
# riemann message lengths are four bytes up front
:gen_tcp.connect(:erlang.binary_to_list(host), port, [:binary, nodelay: true, packet: 4, active: true])
end

defp disconnect(tcp) do
:gen_tcp.close(tcp)
end
end
Loading

0 comments on commit 3f4910c

Please sign in to comment.