Skip to content

Commit

Permalink
improve rolling top words to take advantage of improved serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Dec 2, 2011
1 parent b0805c8 commit 98915c3
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 8 deletions.
1 change: 0 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
[org.clojure/clojure-contrib "1.2.0"]
[org.twitter4j/twitter4j-core "2.2.5-SNAPSHOT"]
[org.twitter4j/twitter4j-stream "2.2.5-SNAPSHOT"]
[com.googlecode.json-simple/json-simple "1.1"]
]

:dev-dependencies [[storm "0.6.0"]
Expand Down
8 changes: 3 additions & 5 deletions src/jvm/storm/starter/bolt/MergeObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;


public class MergeObjects implements IBasicBolt {
Expand Down Expand Up @@ -66,7 +65,7 @@ public void prepare(Map stormConf, TopologyContext context) {
public void execute(Tuple tuple, BasicOutputCollector collector) {


List<List> merging = (List) JSONValue.parse(tuple.getString(0));
List<List> merging = (List) tuple.getValue(0);
for(List pair : merging) {

Integer existingIndex = _find(pair.get(0));
Expand All @@ -93,9 +92,8 @@ public int compare(List o1, List o2) {

long currentTime = System.currentTimeMillis();
if(_lastTime==null || currentTime >= _lastTime + 2000) {
String fullRankings = JSONValue.toJSONString(_rankings);
collector.emit(new Values(fullRankings));
LOG.info("Rankings: " + fullRankings);
collector.emit(new Values(_rankings));
LOG.info("Rankings: " + _rankings);
_lastTime = currentTime;
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/jvm/storm/starter/bolt/RankObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;


public class RankObjects implements IBasicBolt {
Expand Down Expand Up @@ -90,7 +89,7 @@ public int compare(List o1, List o2) {

long currentTime = System.currentTimeMillis();
if(_lastTime==null || currentTime >= _lastTime + 2000) {
collector.emit(new Values(JSONValue.toJSONString(_rankings)));
collector.emit(new Values(_rankings));
_lastTime = currentTime;
}
}
Expand Down

0 comments on commit 98915c3

Please sign in to comment.