diff --git a/pom.xml b/pom.xml index 347e6fa..2fbd51c 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ com.sksamuel.elasticsearch elasticsearch-river-redis jar - 0.90.1.2 + 1.0.2 elasticsearch-river-redis A river plugin for elastic search that offers an easy way to index redis pubsub https://github.com/sksamuel/elasticsearch-river-redis @@ -118,11 +118,8 @@ 1.8.3 4.11 - 1.2.17 - 1.9.5 - 1.6.6 @@ -165,7 +162,7 @@ org.elasticsearch elasticsearch - 0.90.1 + 1.0.2 diff --git a/src/main/java/org/elasticsearch/plugin/river/redis/RedisIndexer.java b/src/main/java/org/elasticsearch/plugin/river/redis/RedisIndexer.java index 30f61d8..5310840 100644 --- a/src/main/java/org/elasticsearch/plugin/river/redis/RedisIndexer.java +++ b/src/main/java/org/elasticsearch/plugin/river/redis/RedisIndexer.java @@ -21,7 +21,7 @@ class RedisIndexer implements Runnable { private final String index; private final boolean json; private final String messageField; - private BlockingQueue queue = new LinkedBlockingQueue(); + private BlockingQueue queue = new LinkedBlockingQueue<>(); public RedisIndexer(Client client, String index, boolean json, String messageField) { this.client = client; diff --git a/src/test/java/org/elasticsearch/plugin/river/redis/RedisRiverIntTest.java b/src/test/java/org/elasticsearch/plugin/river/redis/RedisRiverIntTest.java index f50a81c..3205731 100644 --- a/src/test/java/org/elasticsearch/plugin/river/redis/RedisRiverIntTest.java +++ b/src/test/java/org/elasticsearch/plugin/river/redis/RedisRiverIntTest.java @@ -3,12 +3,9 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.action.support.QuerySourceBuilder; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalSettingsPerparer; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -21,7 +18,7 @@ import static org.elasticsearch.client.Requests.countRequest; import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.elasticsearch.index.query.QueryBuilders.fieldQuery; +import static org.elasticsearch.index.query.QueryBuilders.queryString; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -65,16 +62,6 @@ public void setupElasticAndRedis() throws Exception { logger.debug("... connected"); logger.debug("Starting local elastic..."); - Tuple initialSettings = InternalSettingsPerparer.prepareSettings(globalSettings, true); - - if (!initialSettings.v2().configFile().exists()) { - FileSystemUtils.mkdirs(initialSettings.v2().configFile()); - } - - if (!initialSettings.v2().logsFile().exists()) { - FileSystemUtils.mkdirs(initialSettings.v2().logsFile()); - } - node = nodeBuilder().local(true).settings(globalSettings).node(); logger.info("Create river [{}]", river); @@ -106,15 +93,17 @@ public void connectRiverAndSendMessages() throws InterruptedException { Thread.sleep(1000); refreshIndex(); + QuerySourceBuilder builder = new QuerySourceBuilder(); + builder.setQuery(queryString(field + ":" + msg)); + logger.debug("Counting [index={}, type={}, field={}, msg={}]", new Object[]{index, channel, field, msg}); - CountResponse resp = - node.client().count(countRequest(index).types(channel).query(fieldQuery(field, msg))).actionGet(); + CountResponse resp = node.client().count(countRequest(index).types(channel).source(builder)).actionGet(); assertEquals(1, resp.getCount()); msg = "coldplay"; logger.debug("Counting [index={}, type={}, field={}, msg={}]", new Object[]{index, channel, field, msg}); - resp = node.client().count(countRequest(index).types(channel).query(fieldQuery(field, msg))).actionGet(); + resp = node.client().count(countRequest(index).types(channel).source(builder)).actionGet(); assertEquals(0, resp.getCount()); logger.debug("Publishing message [channel={}]", channel); @@ -124,7 +113,7 @@ public void connectRiverAndSendMessages() throws InterruptedException { refreshIndex(); logger.debug("Counting [index={}, type={}, field={}, msg={}]", new Object[]{index, channel, field, msg}); - resp = node.client().count(countRequest(index).types(channel).query(fieldQuery(field, msg))).actionGet(); + resp = node.client().count(countRequest(index).types(channel).source(builder)).actionGet(); assertEquals(1, resp.getCount()); shutdown();