Skip to content

Commit

Permalink
Updated river for elastic 1.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Apr 4, 2014
1 parent 893838c commit 212d291
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 25 deletions.
7 changes: 2 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<groupId>com.sksamuel.elasticsearch</groupId>
<artifactId>elasticsearch-river-redis</artifactId>
<packaging>jar</packaging>
<version>0.90.1.2</version>
<version>1.0.2</version>
<name>elasticsearch-river-redis</name>
<description>A river plugin for elastic search that offers an easy way to index redis pubsub</description>
<url>https://github.com/sksamuel/elasticsearch-river-redis</url>
Expand Down Expand Up @@ -118,11 +118,8 @@
<commons.beanutils.version>1.8.3</commons.beanutils.version>

<junit.version>4.11</junit.version>

<log4j.version>1.2.17</log4j.version>

<mockito.version>1.9.5</mockito.version>

<slf4j.version>1.6.6</slf4j.version>
</properties>

Expand Down Expand Up @@ -165,7 +162,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>0.90.1</version>
<version>1.0.2</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RedisIndexer implements Runnable {
private final String index;
private final boolean json;
private final String messageField;
private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>();
private BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();

public RedisIndexer(Client client, String index, boolean json, String messageField) {
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPerparer;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -21,7 +18,7 @@
import static org.elasticsearch.client.Requests.countRequest;
import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.fieldQuery;
import static org.elasticsearch.index.query.QueryBuilders.queryString;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -65,16 +62,6 @@ public void setupElasticAndRedis() throws Exception {
logger.debug("... connected");

logger.debug("Starting local elastic...");
Tuple<Settings, Environment> initialSettings = InternalSettingsPerparer.prepareSettings(globalSettings, true);

if (!initialSettings.v2().configFile().exists()) {
FileSystemUtils.mkdirs(initialSettings.v2().configFile());
}

if (!initialSettings.v2().logsFile().exists()) {
FileSystemUtils.mkdirs(initialSettings.v2().logsFile());
}

node = nodeBuilder().local(true).settings(globalSettings).node();

logger.info("Create river [{}]", river);
Expand Down Expand Up @@ -106,15 +93,17 @@ public void connectRiverAndSendMessages() throws InterruptedException {
Thread.sleep(1000);
refreshIndex();

QuerySourceBuilder builder = new QuerySourceBuilder();
builder.setQuery(queryString(field + ":" + msg));

logger.debug("Counting [index={}, type={}, field={}, msg={}]", new Object[]{index, channel, field, msg});
CountResponse resp =
node.client().count(countRequest(index).types(channel).query(fieldQuery(field, msg))).actionGet();
CountResponse resp = node.client().count(countRequest(index).types(channel).source(builder)).actionGet();
assertEquals(1, resp.getCount());

msg = "coldplay";

logger.debug("Counting [index={}, type={}, field={}, msg={}]", new Object[]{index, channel, field, msg});
resp = node.client().count(countRequest(index).types(channel).query(fieldQuery(field, msg))).actionGet();
resp = node.client().count(countRequest(index).types(channel).source(builder)).actionGet();
assertEquals(0, resp.getCount());

logger.debug("Publishing message [channel={}]", channel);
Expand All @@ -124,7 +113,7 @@ public void connectRiverAndSendMessages() throws InterruptedException {
refreshIndex();

logger.debug("Counting [index={}, type={}, field={}, msg={}]", new Object[]{index, channel, field, msg});
resp = node.client().count(countRequest(index).types(channel).query(fieldQuery(field, msg))).actionGet();
resp = node.client().count(countRequest(index).types(channel).source(builder)).actionGet();
assertEquals(1, resp.getCount());

shutdown();
Expand Down

0 comments on commit 212d291

Please sign in to comment.