Skip to content

Commit

Permalink
Refactoring client
Browse files Browse the repository at this point in the history
  • Loading branch information
mcorbin committed Feb 4, 2017
1 parent 5fffce3 commit 9953547
Show file tree
Hide file tree
Showing 17 changed files with 948 additions and 529 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
riemann.log
riemann-*
11 changes: 11 additions & 0 deletions Makefile
@@ -1,3 +1,5 @@
RIEMANN_VERSION = 0.2.12

all: install

install:
Expand All @@ -7,8 +9,17 @@ install:
test:
go test

integ-test:
make integ ; make clean

integ:
./integration.sh $(RIEMANN_VERSION)

clean:
go clean ./...
rm -f riemann-$(RIEMANN_VERSION).tar.bz2
rm -rf riemann-$(RIEMANN_VERSION)
rm -f riemann.PID

nuke:
go clean -i ./...
Expand Down
81 changes: 64 additions & 17 deletions README.md
Expand Up @@ -6,8 +6,8 @@ A Go client library for [Riemann](https://github.com/riemann/riemann).

Features:
* Idiomatic concurrency
* Sending events, state updates, queries.
* Feature parity with the reference implementation written in Ruby.
* Sending events, queries.
* Support tcp and udp client.

This client is a fork of Goryman, a Riemann go client written by Christopher Gilbert. Thanks and full credit to him!

Expand Down Expand Up @@ -39,11 +39,22 @@ import (
)
```

Next, we'll need to establish a new client:
Next, we'll need to establish a new client. The parameter is the connection timeout duration. You can use a TCP client:


```go
c := riemanngo.NewTcpClient("127.0.0.1:5555")
err := c.Connect(5)
if err != nil {
panic(err)
}
```

Or a UDP client:

```go
c := riemanngo.NewGorymanClient("localhost:5555")
err := c.Connect()
c := riemanngo.NewUdpClient("127.0.0.1:5555")
err := c.Connect(5)
if err != nil {
panic(err)
}
Expand All @@ -55,31 +66,67 @@ Don't forget to close the client connection when you're done:
defer c.Close()
```

Just like the Riemann Ruby client, the client sends small events over UDP by default. TCP is used for queries and large events. There is no acknowledgement of UDP packets, but they are roughly an order of magnitude faster than TCP. We assume both TCP and UDP are listening on the same port.

Sending events is easy ([list of valid event properties](http://riemann.io/concepts.html)):

```go
err = c.SendEvent(&riemanngo.Event{
Service: "moargore",
Metric: 100,
Tags: []string{"nonblocking"},
})
if err != nil {
panic(err)
result, err := riemanngo.SendEvent(c, &riemanngo.Event{
Service: "hello",
Metric: 100,
Tags: []string{"riemann ftw"},
})
```

The host name and time in events will automatically be replaced with the hostname of the server and the current time if none is specified.

You can also send batch of events:

```go
events = []riemanngo.Event {
riemanngo.Event{
Service: "hello",
Metric: 100,
Tags: []string{"hello"},
},
riemanngo.Event{
Service: "goodbye",
Metric: 200,
Tags: []string{"goodbye"},
},
}
```

You can also query events:
You can also query the Riemann index (using the TCP client):

```go
events, err := c.QueryEvents("host = \"goryman\"")
events, err := c.QueryEvents("service = \"hello\"")
if err != nil {
panic(err)
}
```

The host name and time in events will automatically be replaced with the hostname of the server and the current time if none is specified.
## Tests

You can lauch tests using

```
make test
```

and integration tests using:

```
make integ-test
```

This command will download Riemann, start it, launch integration tests, and kill it.

You can also use:

```
go test -tags=integration
```

if you already have a Riemann instance listening on localhost

## Copyright

Expand Down
118 changes: 34 additions & 84 deletions client.go
@@ -1,112 +1,62 @@
// A Riemann client for Go, featuring concurrency, sending events and state updates, queries,
// and feature parity with the reference implementation written in Ruby.
// A Riemann client for Go, featuring concurrency, sending events and state updates, queries
//
// Copyright (C) 2014 by Christopher Gilbert <christopher.john.gilbert@gmail.com>
package riemanngo

import (
"net"
"time"

pb "github.com/golang/protobuf/proto"
"github.com/riemann/riemann-go-client/proto"
)

// GorymanClient is a client library to send events to Riemann
type GorymanClient struct {
udp *UdpTransport
tcp *TcpTransport
addr string
// Client is an interface to a generic client
type Client interface {
Send(message *proto.Msg) (*proto.Msg, error)
Connect(timeout int32) error
Close() error
}

// NewGorymanClient - Factory
func NewGorymanClient(addr string) *GorymanClient {
return &GorymanClient{
addr: addr,
}
// IndexClient is an interface to a generic Client for index queries
type IndexClient interface {
QueryIndex(q string) ([]Event, error)
}

// Connect creates a UDP and TCP connection to a Riemann server
func (c *GorymanClient) Connect() error {
udp, err := net.DialTimeout("udp", c.addr, time.Second*5)
if err != nil {
return err
}
tcp, err := net.DialTimeout("tcp", c.addr, time.Second*5)
if err != nil {
return err
}
c.udp = NewUdpTransport(udp)
c.tcp = NewTcpTransport(tcp)
return nil
// request encapsulates a request to send to the Riemann server
type request struct {
message *proto.Msg
response_ch chan response
}

// Close the connection to Riemann
func (c *GorymanClient) Close() error {
if nil == c.udp && nil == c.tcp {
return nil
}
err := c.udp.Close()
if err != nil {
return err
}
return c.tcp.Close()
// response encapsulates a response from the Riemann server
type response struct {
message *proto.Msg
err error
}

// Send an event
func (c *GorymanClient) SendEvent(e *Event) error {
// Send an event using a client
func SendEvent(c Client, e *Event) (*proto.Msg, error) {
epb, err := EventToProtocolBuffer(e)
if err != nil {
return err
return nil, err
}

message := &proto.Msg{}
message.Events = append(message.Events, epb)

_, err = c.sendMaybeRecv(message)
return err
msg, err := c.Send(message)
return msg, err
}

// Send a state update
func (c *GorymanClient) SendState(s *State) error {
spb, err := StateToProtocolBuffer(s)
if err != nil {
return err
// Send multiple events using a client
func SendEvents(c Client, e *[]Event) (*proto.Msg, error) {
var events []*proto.Event
for _, elem := range *e {
epb, err := EventToProtocolBuffer(&elem)
if err != nil {
return nil, err
}
events = append(events, epb)
}

message := &proto.Msg{}
message.States = append(message.States, spb)

_, err = c.sendMaybeRecv(message)
return err
}

// Query the server for events
func (c *GorymanClient) QueryEvents(q string) ([]Event, error) {
query := &proto.Query{}
query.String_ = pb.String(q)

message := &proto.Msg{}
message.Query = query

response, err := c.sendRecv(message)
if err != nil {
return nil, err
}

return ProtocolBuffersToEvents(response.GetEvents()), nil
}
message.Events = events

// Send and receive data from Riemann
func (c *GorymanClient) sendRecv(m *proto.Msg) (*proto.Msg, error) {
return c.tcp.SendRecv(m)
}

// Send and maybe receive data from Riemann
func (c *GorymanClient) sendMaybeRecv(m *proto.Msg) (*proto.Msg, error) {
_, err := c.udp.SendMaybeRecv(m)
if err != nil {
return c.tcp.SendMaybeRecv(m)
}
return nil, nil
msg, err := c.Send(message)
return msg, err
}
23 changes: 23 additions & 0 deletions integration.sh
@@ -0,0 +1,23 @@
#!/bin/bash

set -u

RIEMANN_VERSION=$1
RIEMANN_URL=https://github.com/riemann/riemann/releases/download/${RIEMANN_VERSION}/riemann-${RIEMANN_VERSION}.tar.bz2

echo "Download Riemann"
wget ${RIEMANN_URL}
echo "Untar Riemann"
tar xjf riemann-${RIEMANN_VERSION}.tar.bz2
echo "Launch Riemann"
riemann-${RIEMANN_VERSION}/bin/riemann &
RIEMANN_PID=$!
sleep 10
echo "Launch tests"
echo
echo
go test -tags=integration
echo
echo
echo "Stop Riemann"
kill -9 ${RIEMANN_PID}

0 comments on commit 9953547

Please sign in to comment.