Permalink
Browse files

added a twitter dump reader

- also fixed bad fs backend storage init method name
  • Loading branch information...
1 parent b0aabd3 commit 7926b6ca7bc3d51d4338016f799391d71aecc366 Matthieu Morel committed May 12, 2011
@@ -42,7 +42,7 @@
private long blankCount = 0;
private String streamName;
- private LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();
+ protected LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();
private Set<io.s4.listener.EventHandler> handlers = new HashSet<io.s4.listener.EventHandler>();
public void setUserid(String userid) {
@@ -0,0 +1,66 @@
+package io.s4.example.twittertopiccount;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.InputStreamReader;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ * Adapter for injecting twitter data from a twitter dump file, rather than from a live http stream.
+ *
+ * The twitter dumps must be located in a directory in the file system
+ *
+ */
+public class TwitterFeedReader extends TwitterFeedListener {
+
+ String frequencyBySecond;
+ String twitterDumpsDir;
+ String twitterDumpsNamePattern = "\\A.+\\.gz\\z";
+
+ @Override
+ public void connectAndRead() throws Exception {
+ System.out.println("Reading files from dir " + twitterDumpsDir + " matching: " + twitterDumpsNamePattern);
+ File[] dumps = new File(twitterDumpsDir)
+ .listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.matches(twitterDumpsNamePattern);
+ }
+ });
+ for (File dump : dumps) {
+ System.out.println("Reading file : " + dump.getAbsolutePath());
+ GZIPInputStream gzipIs = new GZIPInputStream(new FileInputStream(
+ dump));
+ InputStreamReader isr = new InputStreamReader(gzipIs);
+ BufferedReader br = new BufferedReader(isr);
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ // only consider lines with twitter json-encoded data
+ if (line.startsWith("{")) {
+// System.out.println("Adding line : " + line);
+ messageQueue.add(line);
+ Thread.sleep((1000 / Integer.valueOf(frequencyBySecond)));
+ }
+ }
+ br.close();
+ }
+ System.out.println("OK, read all dump files. Exiting normally.");
+ System.exit(0);
+ }
+
+ public void setFrequencyBySecond(String frequencyBySecond) {
+ this.frequencyBySecond = frequencyBySecond;
+ }
+
+ public void setTwitterDumpsDir(String twitterDumpsDir) {
+ this.twitterDumpsDir = twitterDumpsDir;
+ }
+
+ public void setTwitterDumpsNamePattern(String twitterDumpsNamePattern) {
+ this.twitterDumpsNamePattern = twitterDumpsNamePattern;
+ }
+
+}
@@ -13,5 +13,20 @@
<property name="urlString" value="http://stream.twitter.com/1/statuses/sample.json"/>
<property name="streamName" value="RawStatus"/>
</bean>
+
+ <!--
+ // uncomment this for reading from twitter dumps rather than live stream
+ <bean id="twitterFeedReader" class="io.s4.example.twittertopiccount.TwitterFeedReader"
+ init-method="init">
+ <property name="streamName" value="RawStatus"/>
+ // twitter dumps are assumed to reside in a directory
+ <property name="twitterDumpsDir" value=""/>
+ // you may filter dump files through a regex on the file name. Here is the default filter, that may be omitted
+ <property name="twitterDumpsNamePattern" value="\A.+\.gz\z"/>
+ // injection rate
+ <property name="frequencyBySecond" value="1000"/>
+ </bean>
+ -->
+
</beans>
@@ -172,7 +172,7 @@
<property name="hasher" ref="hasher"/>
</bean>
- <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="checkStorageDir">
+ <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="init">
<!-- if not specified, default is <current_dir>/tmp/storage
<property name="storageRootPath" value="${storage_root_path}" /> -->
</bean>

0 comments on commit 7926b6c

Please sign in to comment.