Skip to content

Commit

Permalink
Merge pull request #16 from wotifgroup/parameterize-index-name
Browse files Browse the repository at this point in the history
Allow indexName to be parameterized.
  • Loading branch information
tallpsmith committed Jul 25, 2011
2 parents c6680ad + 8021895 commit b0f9f15
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 31 deletions.
9 changes: 5 additions & 4 deletions pom.xml
Expand Up @@ -36,13 +36,13 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>0.16.2</version>
<version>0.17.1</version>
</dependency>

<dependency>
<groupId>com.cloudera.flume</groupId>
<artifactId>flume</artifactId>
<version>0.9.3-cdh3u0</version>
<groupId>com.cloudera</groupId>
<artifactId>flume-core</artifactId>
<version>0.9.4-cdh3u1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -112,6 +112,7 @@
<includes>
<include>org.elasticsearch:elasticsearch</include>
<include>org.apache.lucene</include>
<include>org.apache.regexp</include>
</includes>
</artifactSet>
</configuration>
Expand Down
32 changes: 25 additions & 7 deletions src/main/java/org/elasticsearch/flume/ElasticSearchSink.java
Expand Up @@ -41,7 +41,7 @@ public class ElasticSearchSink extends EventSink.Base {
private Node node;
private Client client;
private String indexName = DEFAULT_INDEX_NAME;

private String indexPattern = null;
private String indexType = DEFAULT_LOG_TYPE;

private Charset charset = Charset.defaultCharset();
Expand All @@ -57,8 +57,6 @@ public class ElasticSearchSink extends EventSink.Base {

@Override
public void append(Event e) throws IOException {
// TODO strategize the name of the index, so that logs based on day can go to individula indexes, allowing simple cleanup
// by deleting older days indexes in ES
try {
XContentBuilder builder = jsonBuilder()
.startObject()
Expand All @@ -70,10 +68,7 @@ public void append(Event e) throws IOException {

addAttrs(builder, e.getAttrs());

client.prepareIndex(indexName, indexType, null)
.setSource(builder)
.execute()
.actionGet();
index(e, builder);
} catch (Exception ex) {
LOG.error("Error Processing event: {}", e.toString(), ex);
eventErrorCount.incrementAndGet();
Expand Down Expand Up @@ -137,6 +132,21 @@ private void addComplexField(XContentBuilder builder, String fieldName, XContent
}
}

private void index(Event e, XContentBuilder builder) {
String iName = indexName;
if (indexPattern != null) {
iName = e.escapeString(indexPattern);
}
client.prepareIndex(iName, indexType, null)
.setSource(builder)
.execute()
.actionGet();

if (!iName.equals(indexName)) {
client.admin().indices().prepareAliases().addAlias(iName, indexName).execute().actionGet();
}
}

@Override
public void close() throws IOException, InterruptedException {
super.close();
Expand Down Expand Up @@ -202,6 +212,14 @@ public void setIndexName(String indexName) {
this.indexName = indexName;
}

public String getIndexPattern() {
return indexPattern;
}

public void setIndexPattern(String indexPattern) {
this.indexPattern = indexPattern;
}

public String getIndexType() {
return indexType;
}
Expand Down
Expand Up @@ -9,9 +9,9 @@ class ElasticSearchSinkBuilder extends SinkFactory.SinkBuilder {
@Override
public EventSink build(Context context, String... argv) {

if (argv.length > 4) {
if (argv.length > 5) {
throw new IllegalArgumentException(
"usage: elasticSearchSink[([clusterName, indexName, esHostNames, indexType])");
"usage: elasticSearchSink[([clusterName, indexName, esHostNames, indexType, indexPattern])");
}

ElasticSearchSink sink = new ElasticSearchSink();
Expand All @@ -28,6 +28,9 @@ public EventSink build(Context context, String... argv) {
if (argv.length > 3) {
sink.setIndexType(argv[index++]);
}
if (argv.length > 4) {
sink.setIndexPattern(argv[index++]);
}
return sink;
}
}
68 changes: 50 additions & 18 deletions src/test/java/org/elasticsearch/flume/ElasticSearchSinkTest.java
Expand Up @@ -2,22 +2,23 @@

import static org.elasticsearch.client.Requests.refreshRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.fieldQuery;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.queryString;
import static org.elasticsearch.index.query.QueryBuilders.fieldQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.queryString;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.query.xcontent.XContentQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.search.SearchHit;
Expand All @@ -29,6 +30,7 @@
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.Event.Priority;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.reporter.ReportEvent;

public class ElasticSearchSinkTest {
Expand Down Expand Up @@ -87,7 +89,8 @@ public void appendDifferentTypesOfLogMessage() throws IOException, InterruptedEx
sink.append(event);
sink.append(new EventImpl("bleh foo baz bar".getBytes(), 1, Priority.WARN, System.nanoTime(), "notlocalhost"));
sink.append(new EventImpl("{\"key\":\"value\"}".getBytes(), 2, Priority.DEBUG, System.nanoTime(), "jsonbody"));
sink.append(new EventImpl("{\"key\":\"value\",\"complex\":{\"subkey\":\"subvalue\"}}".getBytes(), 3, Priority.DEBUG, System.nanoTime(), "complexjsonbody"));
sink.append(new EventImpl("{\"key\":\"value\",\"complex\":{\"subkey\":\"subvalue\"}}".getBytes(), 3, Priority.DEBUG,
System.nanoTime(), "complexjsonbody"));

sink.close();

Expand All @@ -106,7 +109,8 @@ public void appendDifferentTypesOfLogMessage() throws IOException, InterruptedEx
public void validateErrorCount() throws IOException, InterruptedException {
ElasticSearchSink sink = createAndOpenSink();

EventImpl invalidJsonEvent1 = new EventImpl("{ \"not json\" : no".getBytes(), 1, Priority.DEBUG, System.nanoTime(), "notlocalhost") ;
EventImpl invalidJsonEvent1 = new EventImpl("{ \"not json\" : no".getBytes(), 1, Priority.DEBUG, System.nanoTime(),
"notlocalhost");
sink.append(invalidJsonEvent1);
sink.close();

Expand All @@ -117,27 +121,51 @@ public void validateErrorCount() throws IOException, InterruptedException {

@Test
public void validateSinkIndexTypeConfiguration() throws IOException, InterruptedException {
ElasticSearchSink sink = createAndOpenSinkWithDefaultType();
EventSink sink = createAndOpenSink("", "log", "");

EventImpl event = new EventImpl("new index message".getBytes(), 1, Priority.WARN, System.nanoTime(), "notlocalhost");
sink.append(event);
sink.close();
searchClient.admin().indices().refresh(refreshRequest(INDEX_NAME)).actionGet();
SearchResponse response = searchClient.prepareSearch(INDEX_NAME).setTypes("log")
.setQuery(fieldQuery("message.text", "new")).execute().actionGet();
assertEquals("There should have been 1 search result for default index type", 1, response.getHits().getTotalHits());

assertSimpleTest(INDEX_NAME, "log", 1);
}

@Test
public void validateIndexNamePatternUsed() throws IOException, InterruptedException {
ElasticSearchSink sink = createAndOpenSink("", "log", "test_%Y-%m-%d");

sink.append(new EventImpl("new index message".getBytes(), 0, Priority.WARN, System.nanoTime(), "notlocalhost"));
sink.append(new EventImpl("new index message".getBytes(), TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS), Priority.WARN,
System.nanoTime(), "notlocalhost"));
sink.close();

assertSimpleTest("test_1970-01-01", "log", 1);
assertSimpleTest("test_1970-01-02", "log", 1);
assertSimpleTest(INDEX_NAME, "log", 2);
}

private ElasticSearchSink createAndOpenSinkWithDefaultType() throws IOException, InterruptedException {
return createAndOpenSink("");
private void assertSimpleTest(String indexName, String indexType, int hits) {
searchClient.admin().indices().refresh(refreshRequest(indexName)).actionGet();
SearchResponse response = searchClient.prepareSearch(indexName).setTypes(indexType)
.setQuery(fieldQuery("message.text", "new")).execute().actionGet();
assertEquals("There should have been " + hits + " search result for default index type", hits, response.getHits()
.getTotalHits());
}

private ElasticSearchSink createAndOpenSink() throws IOException, InterruptedException {
return createAndOpenSink(INDEX_TYPE);
return createAndOpenSink(INDEX_NAME, INDEX_TYPE, "");
}

private ElasticSearchSink createAndOpenSink(String indexType) throws IOException, InterruptedException {
private ElasticSearchSink createAndOpenSink(String indexName, String indexType, String indexPattern) throws IOException,
InterruptedException {
ElasticSearchSink sink = new ElasticSearchSink();
sink.setLocalOnly(true);
if (StringUtils.isNotBlank(indexName)) {
sink.setIndexName(indexName);
}
if (StringUtils.isNotBlank(indexPattern)) {
sink.setIndexPattern(indexPattern);
}
if (StringUtils.isNotBlank(indexType)) {
sink.setIndexType(indexType);
}
Expand Down Expand Up @@ -171,7 +199,7 @@ private void assertJsonBody(Event event) {
Map<String, Object> json = (Map<String, Object>) response.getHits().getAt(0).getSource().get("message");
assertEquals("value", json.get("key"));
}

@SuppressWarnings("unchecked")
private void assertComplexJsonBody(Event event) {
SearchResponse response = executeSearch(queryString("host:complexjsonbody"));
Expand All @@ -183,8 +211,12 @@ private void assertComplexJsonBody(Event event) {
assertEquals("subvalue", json.get("subkey"));
}

private SearchResponse executeSearch(XContentQueryBuilder query) {
return searchClient.prepareSearch(INDEX_NAME).setTypes(INDEX_TYPE)
private SearchResponse executeSearch(QueryBuilder query) {
return executeSearch(query, INDEX_NAME, INDEX_TYPE);
}

private SearchResponse executeSearch(QueryBuilder query, String indexName, String indexType) {
return searchClient.prepareSearch(indexName).setTypes(indexType)
.setQuery(query)
.execute()
.actionGet();
Expand Down

0 comments on commit b0f9f15

Please sign in to comment.