Skip to content

Commit

Permalink
Add another unit-test for testing ES output
Browse files Browse the repository at this point in the history
- Small API for queering Elasticsearch
- Add a test for checking if an event of type redis was added correctly
- requested by elastic#7
  • Loading branch information
monicasarbu committed Apr 28, 2015
1 parent f3c3262 commit 194fc44
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 1 deletion.
105 changes: 105 additions & 0 deletions outputs/elasticsearch/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)

type Elasticsearch struct {
Url string

client *http.Client
}

type ESSearchResults struct {
Took int `json:"took"`
Shards json.RawMessage `json:"_shards"`
Hits ESHits `json:"hits"`
Aggs map[string]json.RawMessage `json:"aggregations"`
}

type ESHits struct {
Total int
Hits []json.RawMessage `json:"hits"`
}

func NewElasticsearch(url string) *Elasticsearch {
if len(url) == 0 {
url = "http://localhost:9200"
}
return &Elasticsearch{
Url: url,
client: &http.Client{},
}
}

// Generic request method. Returns the HTTP response that we get from ES.
// If ES returns an error HTTP code (>299), the error is non-nil and the
// response is also non-nil.
func (es *Elasticsearch) Request(method string, index string, path string,
data io.Reader) (*http.Response, error) {

url := fmt.Sprintf("%s/%s/%s", es.Url, index, path)

req, err := http.NewRequest(method, url, data)
if err != nil {
return nil, err
}

resp, err := es.client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode > 299 {
return resp, fmt.Errorf("ES returned an error: %s", resp.Status)
}

return resp, nil
}

// Refresh an index. Call this after doing inserts or creating/deleting
// indexes in unit tests.
func (es *Elasticsearch) Refresh(index string) (*http.Response, error) {
return es.Request("POST", index, "_refresh", nil)
}

func (es *Elasticsearch) DeleteIndex(index string) (*http.Response, error) {
path := fmt.Sprintf("%s/%s", es.Url, index)

req, err := http.NewRequest("DELETE", path, nil)
if err != nil {
return nil, err
}

resp, err := es.client.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}

func (es *Elasticsearch) Search(index string, params string, reqjson string) (*http.Response, error) {

path := fmt.Sprintf("%s/%s/_search%s", es.Url, index, params)

req, err := http.NewRequest("GET", path, strings.NewReader(reqjson))
if err != nil {
return nil, err
}

resp, err := es.client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode > 299 {
return resp, fmt.Errorf("ES returned an error: %s", resp.Status)
}

return resp, nil
}
69 changes: 68 additions & 1 deletion outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/elastic/libbeat/common"
"github.com/elastic/libbeat/outputs"
)

Expand All @@ -12,6 +17,8 @@ const elasticsearchPort = 9200

func createElasticsearchConnection() ElasticsearchOutput {

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

var elasticsearchOutput ElasticsearchOutput
elasticsearchOutput.Init(outputs.MothershipConfig{
Enabled: true,
Expand All @@ -21,7 +28,7 @@ func createElasticsearchConnection() ElasticsearchOutput {
Username: "",
Password: "",
Path: "",
Index: "packetbeat",
Index: index,
Protocol: "",
}, 10)

Expand Down Expand Up @@ -77,3 +84,63 @@ func TestTopologyInES(t *testing.T) {
t.Errorf("Failed to delete old IP of proxy2: %s", name2)
}
}

func TestEvents(t *testing.T) {
if testing.Short() {
t.Skip("Skipping events publish in short mode, because they require Elasticsearch")
}
ts := time.Now()

elasticsearchOutput := createElasticsearchConnection()

event := common.MapStr{}
event["type"] = "redis"
event["status"] = "OK"
event["responsetime"] = 34
event["dst_ip"] = "192.168.21.1"
event["dst_port"] = 6379
event["src_ip"] = "192.168.22.2"
event["src_port"] = 6378
event["agent"] = "appserver1"
r := common.MapStr{}
r["request"] = "MGET key1"
r["response"] = "value1"

index := fmt.Sprintf("%s-%d.%02d.%02d", elasticsearchOutput.Index, ts.Year(), ts.Month(), ts.Day())

es := NewElasticsearch("http://localhost:9200")

if es == nil {
t.Errorf("Failed to create Elasticsearch connection")
}
_, err := es.DeleteIndex(index)
if err != nil {
t.Errorf("Failed to delete index: %s", err)
}

err = elasticsearchOutput.PublishEvent(ts, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}

es.Refresh(index)

resp, err := es.Search(index, "?search?q=agent:appserver1", "{}")

if err != nil {
t.Errorf("Failed to query elasticsearch: %s", err)
}
defer resp.Body.Close()
objresp, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Failed to read body from response")
}
var search_res ESSearchResults
err = json.Unmarshal(objresp, &search_res)
if err != nil {
t.Errorf("Failed to unmarshal response: %s", err)
}
if search_res.Hits.Total != 1 {
t.Errorf("Too many results")
}
}

0 comments on commit 194fc44

Please sign in to comment.