diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index a3b805e4f1c97..88a568e247f44 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -45,6 +45,7 @@
encodable
estab
failover
+ firehose
flushable
formatter
formatters
diff --git a/.idea/modules/plugin-river-twitter.iml b/.idea/modules/plugin-river-twitter.iml
index 51120027adb98..a5af7aa369d64 100644
--- a/.idea/modules/plugin-river-twitter.iml
+++ b/.idea/modules/plugin-river-twitter.iml
@@ -15,7 +15,7 @@
-
+
diff --git a/plugins/river/twitter/build.gradle b/plugins/river/twitter/build.gradle
index 4d39cacb8e263..7a5d40387199c 100644
--- a/plugins/river/twitter/build.gradle
+++ b/plugins/river/twitter/build.gradle
@@ -32,8 +32,8 @@ configurations {
dependencies {
compile project(':elasticsearch')
- compile('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
- distLib('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
+ compile('org.twitter4j:twitter4j-core:2.1.6') { transitive = false }
+ distLib('org.twitter4j:twitter4j-core:2.1.6') { transitive = false }
testCompile project(':test-testng')
diff --git a/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java
index 046bd29902d6a..44556cfec1159 100644
--- a/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java
+++ b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java
@@ -26,6 +26,7 @@
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -38,6 +39,7 @@
import twitter4j.*;
import java.net.URL;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +58,10 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
private final int dropThreshold;
+ private FilterQuery filterQuery;
+
+ private String streamType;
+
private final TwitterStream stream;
@@ -73,6 +79,84 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
Map twitterSettings = (Map) settings.settings().get("twitter");
user = XContentMapValues.nodeStringValue(twitterSettings.get("user"), null);
password = XContentMapValues.nodeStringValue(twitterSettings.get("password"), null);
+ streamType = XContentMapValues.nodeStringValue(twitterSettings.get("type"), "sample");
+ Map filterSettings = (Map) twitterSettings.get("filter");
+ if (filterSettings != null) {
+ filterQuery = new FilterQuery();
+ filterQuery.count(XContentMapValues.nodeIntegerValue(filterSettings.get("count"), 0));
+ Object tracks = filterSettings.get("tracks");
+ if (tracks != null) {
+ if (tracks instanceof List) {
+ List lTracks = (List) tracks;
+ filterQuery.track(lTracks.toArray(new String[lTracks.size()]));
+ } else {
+ filterQuery.track(Strings.commaDelimitedListToStringArray(tracks.toString()));
+ }
+ }
+ Object follow = filterSettings.get("follow");
+ if (follow != null) {
+ if (follow instanceof List) {
+ List lFollow = (List) follow;
+ int[] followIds = new int[lFollow.size()];
+ for (int i = 0; i < lFollow.size(); i++) {
+ Object o = lFollow.get(i);
+ if (o instanceof Number) {
+ followIds[i] = ((Number) o).intValue();
+ } else {
+ followIds[i] = Integer.parseInt(o.toString());
+ }
+ }
+ filterQuery.follow(followIds);
+ } else {
+ String[] ids = Strings.commaDelimitedListToStringArray(follow.toString());
+ int[] followIds = new int[ids.length];
+ for (int i = 0; i < ids.length; i++) {
+ followIds[i] = Integer.parseInt(ids[i]);
+ }
+ }
+ }
+ Object locations = filterSettings.get("locations");
+ if (locations != null) {
+ if (locations instanceof List) {
+ List lLocations = (List) locations;
+ double[][] dLocations = new double[lLocations.size()][];
+ for (int i = 0; i < lLocations.size(); i++) {
+ Object loc = lLocations.get(i);
+ double lat;
+ double lon;
+ if (loc instanceof List) {
+ List lLoc = (List) loc;
+ if (lLoc.get(0) instanceof Number) {
+ lat = ((Number) lLoc.get(0)).doubleValue();
+ } else {
+ lat = Double.parseDouble(lLoc.get(0).toString());
+ }
+ if (lLoc.get(1) instanceof Number) {
+ lon = ((Number) lLoc.get(1)).doubleValue();
+ } else {
+ lon = Double.parseDouble(lLoc.get(1).toString());
+ }
+ } else {
+ String[] sLoc = Strings.commaDelimitedListToStringArray(loc.toString());
+ lat = Double.parseDouble(sLoc[0]);
+ lon = Double.parseDouble(sLoc[1]);
+ }
+ dLocations[i] = new double[]{lat, lon};
+ }
+ filterQuery.locations(dLocations);
+ } else {
+ String[] sLocations = Strings.commaDelimitedListToStringArray(locations.toString());
+ double[][] dLocations = new double[sLocations.length / 2][];
+ int dCounter = 0;
+ for (int i = 0; i < sLocations.length; i++) {
+ double lat = Double.parseDouble(sLocations[i]);
+ double lon = Double.parseDouble(sLocations[++i]);
+ dLocations[dCounter++] = new double[]{lat, lon};
+ }
+ filterQuery.locations(dLocations);
+ }
+ }
+ }
}
logger.info("creating twitter stream river for [{}]", user);
@@ -80,7 +164,7 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
if (user == null || password == null) {
stream = null;
indexName = null;
- typeName = null;
+ typeName = "status";
bulkSize = 100;
dropThreshold = 10;
logger.warn("no user / password specified, disabling river...");
@@ -104,6 +188,9 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}
@Override public void start() {
+ if (stream == null) {
+ return;
+ }
logger.info("starting twitter stream");
try {
String mapping = XContentFactory.jsonBuilder().startObject().startObject(typeName)
@@ -122,7 +209,17 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}
}
currentRequest = client.prepareBulk();
- stream.sample();
+ if (streamType.equals("filter") || filterQuery != null) {
+ try {
+ stream.filter(filterQuery);
+ } catch (TwitterException e) {
+ logger.warn("failed to create filter stream based on query, disabling river....");
+ }
+ } else if (streamType.equals("firehose")) {
+ stream.firehose(0);
+ } else {
+ stream.sample();
+ }
}
@Override public void close() {