Skip to content

Commit

Permalink
FIx #1201: Break down KaizenHarvester into simpler pieces
Browse files Browse the repository at this point in the history
Introduce KaizenQuery class to support different methods to store queries that Kaizen needs to process
  • Loading branch information
singhpratyush committed May 29, 2017
1 parent d202df9 commit 905f096
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 43 deletions.
89 changes: 46 additions & 43 deletions src/org/loklak/harvester/strategy/KaizenHarvester.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -40,26 +39,25 @@ public class KaizenHarvester implements Harvester {
private final int SUGGESTIONS_COUNT;
private final int SUGGESTIONS_RANDOM;
private final int PLACE_RADIUS;
private final int QUERIES_LIMIT;
private final boolean VERBOSE;
private final DateFormat dateToString = new SimpleDateFormat("yyyy-MM-dd");

private Random random;

private HashSet<String> queries = new HashSet<>();
private KaizenQueries queries = null;
private ExecutorService executorService = Executors.newFixedThreadPool(1);

private Twitter twitter = null;

public KaizenHarvester() {
public KaizenHarvester(KaizenQueries queries) {
BACKEND = DAO.getConfig("backend", "http://loklak.org");
SUGGESTIONS_COUNT = DAO.getConfig("harvester.kaizen.suggestions_count", 1000);
SUGGESTIONS_RANDOM = DAO.getConfig("harvester.kaizen.suggestions_random", 5);
PLACE_RADIUS = DAO.getConfig("harvester.kaizen.place_radius", 5);
QUERIES_LIMIT = DAO.getConfig("harvester.kaizen.queries_limit", 500);
VERBOSE = DAO.getConfig("harvester.kaizen.verbose", true);

random = new Random();
this.queries = queries;

TwitterFactory twitterFactory = TwitterAPI.getAppTwitterFactory();

Expand All @@ -72,24 +70,16 @@ public KaizenHarvester() {
"client.twitterConsumerKey, client.twitterConsumerSecret)");
}

private void addQuery(String query) {
if (QUERIES_LIMIT > 0 && queries.size() > QUERIES_LIMIT)
return;

if (queries.contains(query))
return;

if (VERBOSE)
DAO.log("Adding '" + query + "' to queries");

queries.add(query);
public KaizenHarvester() {
this(KaizenQueries.getDefaultKaizenQueries(DAO.getConfig("harvester.kaizen.queries_limit", 500)));
}

private void grabInformation(Timeline timeline) {
String query = timeline.getQuery();
if (VERBOSE)
if (VERBOSE) {
DAO.log("Kaizen is going to grab more information" +
(query != null ? " from results of '" + query + "'" : ""));
}

Date oldestTweetDate = null;

Expand All @@ -102,25 +92,28 @@ private void grabInformation(Timeline timeline) {
oldestTweetDate = message.getCreatedAt();
}

for (String user : message.getMentions())
addQuery("from:" + user);
for (String user : message.getMentions()) {
this.queries.addQuery("from:" + user);
}

for (String hashtag : message.getHashtags())
addQuery(hashtag);
for (String hashtag : message.getHashtags()) {
this.queries.addQuery(hashtag);
}

String place = message.getPlaceName();
if (!place.isEmpty())
addQuery("near:\"" + message.getPlaceName() + "\" within:" + PLACE_RADIUS + "mi");
if (!place.isEmpty()) {
this.queries.addQuery("near:\"" + message.getPlaceName() + "\" within:" + PLACE_RADIUS + "mi");
}
}

if (query != null && oldestTweetDate != null) {
String oldestTweetDateStr = dateToString.format(oldestTweetDate);
int startIndex = query.indexOf("until:");
if (startIndex == -1) {
addQuery(query + " until:" + oldestTweetDateStr);
this.queries.addQuery(query + " until:" + oldestTweetDateStr);
} else {
int endIndex = startIndex + 16; // until:yyyy-MM-dd = 16
addQuery(query.replace(query.substring(startIndex + 6, endIndex), oldestTweetDateStr));
this.queries.addQuery(query.replace(query.substring(startIndex + 6, endIndex), oldestTweetDateStr));
}
}
}
Expand All @@ -131,11 +124,11 @@ private void pushToBackend(Timeline timeline) {
}

private int harvestMessages() {
if (VERBOSE)
DAO.log(queries.size() + " available queries, Harvest season!");
if (VERBOSE) {
DAO.log(this.queries.getSize() + " available queries, Harvest season!");
}

String query = queries.iterator().next();
queries.remove(query);
String query = this.queries.getQuery();

if (VERBOSE)
DAO.log("Kaizen is going to harvest messages with query '" + query + "'");
Expand Down Expand Up @@ -166,15 +159,19 @@ private int harvestMessages() {

private void grabTrending() {
try {
if (VERBOSE)
if (VERBOSE) {
DAO.log("Kaizen is going to get trending topics ...");
}

for (Location location : twitter.trends().getAvailableTrends())
for (Trend trend : twitter.trends().getPlaceTrends(location.getWoeid()).getTrends())
addQuery(trend.getQuery());
for (Location location : twitter.trends().getAvailableTrends()) {
for (Trend trend : twitter.trends().getPlaceTrends(location.getWoeid()).getTrends()) {
this.queries.addQuery(trend.getQuery());
}
}
} catch (TwitterException e) {
if (e.getErrorCode() != 88)
if (e.getErrorCode() != 88) {
DAO.severe(e);
}
}
}

Expand All @@ -188,17 +185,19 @@ private void grabSuggestions() {
"desc", "retrieval_next", 0, null, "now",
"retrieval_next", SUGGESTIONS_RANDOM);

if (VERBOSE)
if (VERBOSE) {
DAO.log("Backend gave us " + suggestedQueries.size() + " suggested queries");
}

for (QueryEntry query : suggestedQueries) {
addQuery(query.getQuery());
this.queries.addQuery(query.getQuery());
}

if (suggestedQueries.size() == 0) {
if (VERBOSE)
DAO.log("It looks like backend doesn't have any suggested queries. "+
if (VERBOSE) {
DAO.log("It looks like backend doesn't have any suggested queries. " +
"Grabbing relevant context from backend collected messages ...");
}

Timeline timeline = SearchServlet.search(BACKEND, "", Timeline.Order.CREATED_AT, "cache",
SUGGESTIONS_RANDOM, 0, SearchServlet.backend_hash, 60000);
Expand All @@ -213,14 +212,18 @@ private void grabSuggestions() {
grabTrending();
}

@Override
public int harvest() {
protected boolean shallHarvest() {
float targetProb = random.nextFloat();
float prob = 0.5F;
if (QUERIES_LIMIT > 0) {
prob = queries.size() / (float)QUERIES_LIMIT;
if (this.queries.getMaxSize() > 0) {
prob = queries.getSize() / (float)queries.getMaxSize();
}
if (!queries.isEmpty() && targetProb < prob) {
return !this.queries.isEmpty() && targetProb < prob;
}

@Override
public int harvest() {
if (this.shallHarvest()) {
return harvestMessages();
}

Expand Down
58 changes: 58 additions & 0 deletions src/org/loklak/harvester/strategy/KaizenQueries.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.loklak.harvester.strategy;

import java.util.HashSet;

/**
* KaizenQueries are objects that holds the query strings for KaizenHarvester.
*/
public abstract class KaizenQueries {

public abstract boolean addQuery(String query);

public abstract String getQuery();

public abstract int getSize();

public abstract int getMaxSize();

public boolean isEmpty() {
return this.getSize() == 0;
}

public static KaizenQueries getDefaultKaizenQueries(int qLimit) {
return new KaizenQueries() {

HashSet<String> queries = new HashSet<>();
int queryLimit = qLimit;

@Override
public boolean addQuery(String query) {
if (this.queryLimit > 0 && this.queries.size() > this.queryLimit)
return false;

if (queries.contains(query))
return false;

this.queries.add(query);
return true;
}

@Override
public String getQuery() {
String query = this.queries.iterator().next();
this.queries.remove(query);
return query;
}

@Override
public int getSize() {
return this.queries.size();
}

@Override
public int getMaxSize() {
return this.queryLimit;
}
};
}
}

0 comments on commit 905f096

Please sign in to comment.