Skip to content

Commit

Permalink
Added support to write directly to gzip
Browse files Browse the repository at this point in the history
  • Loading branch information
blastarr committed Jun 7, 2017
1 parent 86f7c05 commit 38b04b5
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

import twitter4j.JSONObject;
Expand All @@ -16,34 +20,46 @@
public class TwitterArchiveFileWriter implements
TwitterArchiveWriter
{
private final static Logger LOGGER = Logger.getLogger(TwitterArchiveFileWriter.class);
private final static Logger LOGGER = Logger.getLogger(
TwitterArchiveFileWriter.class);

private Calendar cal;
private NumberFormat nf;
private String archivePath;
private int fileSplits;
private final boolean zipit;

public TwitterArchiveFileWriter(
final String archivePath,
final int fileSplits ) {
cal = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
if (archivePath == null || archivePath.contains("temp") || archivePath.contains("tmp")) {
final int fileSplits,
final boolean zipit ) {
cal = Calendar.getInstance(
TimeZone.getTimeZone(
"GMT"));
if (archivePath == null || archivePath.contains(
"temp")
|| archivePath.contains(
"tmp")) {
this.archivePath = FileUtils.getTempDirectoryPath();
}
else {
this.archivePath = archivePath;
}

nf = NumberFormat.getIntegerInstance();
nf.setMinimumIntegerDigits(2);
nf.setGroupingUsed(false);
nf.setMinimumIntegerDigits(
2);
nf.setGroupingUsed(
false);

// Clamp file splits to range (1-4)
this.fileSplits = Math.min(
4,
Math.max(
1,
fileSplits));

this.zipit = zipit;
}

@Override
Expand All @@ -54,33 +70,89 @@ public void writeTweet(

File tweetFile = new File(
archivePath,
getArchiveFileNameForDate(status.getCreatedAt()));
getArchiveFileNameForDate(
status.getCreatedAt()));

String tweetLine = json.toString() + "\n";
LOGGER.info(tweetLine);
LOGGER.info(
tweetLine);

if (zipit) {
writeStringToZip(
tweetFile,
tweetLine);
}
else {
FileUtils.writeStringToFile(
tweetFile,
tweetLine,
true);
}
}

public void writeStringToZip(
File tweetFile,
String tweetLine )
throws IOException {
OutputStream out = null;
GzipCompressorOutputStream cos = null;

try {
out = FileUtils.openOutputStream(
tweetFile,
true);

cos = new GzipCompressorOutputStream(
out);

IOUtils.write(
tweetLine,
cos,
Charset.defaultCharset());
}
finally {
IOUtils.closeQuietly(
cos);
IOUtils.closeQuietly(
out);
}

FileUtils.writeStringToFile(
tweetFile,
tweetLine,
true);
}

private String getArchiveFileNameForDate(
Date date ) {
cal.setTime(date);
int year = cal.get(Calendar.YEAR);
int month = cal.get(Calendar.MONTH) + 1;
int day = cal.get(Calendar.DAY_OF_MONTH);
cal.setTime(
date);
int year = cal.get(
Calendar.YEAR);
int month = cal.get(
Calendar.MONTH) + 1;
int day = cal.get(
Calendar.DAY_OF_MONTH);

if (fileSplits == 1) {
return ("tweets-" + nf.format(year) + nf.format(month) + nf.format(day) + "p0" + ".json");
return ("tweets-" + nf.format(
year)
+ nf.format(
month)
+ nf.format(
day)
+ "p0" + ".json");
}

// Split the day; e.g. 00/08/16 if 8-hr split
int hh = cal.get(Calendar.HOUR_OF_DAY);
int hh = cal.get(
Calendar.HOUR_OF_DAY);
int fileSplitHours = 24 / fileSplits;
int part = (hh / fileSplitHours) + 1;

return ("tweets-" + nf.format(year) + nf.format(month) + nf.format(day) + "p" + part + ".json");
return ("tweets-" + nf.format(
year)
+ nf.format(
month)
+ nf.format(
day)
+ "p" + part + ".json"
+ (zipit ? ".gz" : ""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class TwitterStreamArchiver
private String accessToken;
private String accessSecret;
private String archivePath;
private boolean writeZipped = false;
private boolean init = false;

public TwitterStreamArchiver() {
Expand Down Expand Up @@ -96,6 +97,11 @@ public void init(
numProcessingThreads = Integer.parseInt(processingThreadsStr);
}

String writeZippedStr = twitterProps.getProperty("twitter.archive.writeZipped");
if (writeZippedStr != null) {
writeZipped = Boolean.parseBoolean(writeZippedStr);
}

init = true;
}

Expand All @@ -112,7 +118,8 @@ public void run()

TwitterArchiveWriter archiveWriter = new TwitterArchiveFileWriter(
archivePath,
fileSplits);
fileSplits,
writeZipped);

StatusListener statusListener = new TwitterLocationListener(
archiveWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ twitter.archive.frequencyMillis=5000
# Processing threads
twitter.archive.threads=4

# Zip format flag
twitter.archive.writeZipped=true

# TODO: Watch box configs?
# Currently using a single whole-world watch box.

0 comments on commit 38b04b5

Please sign in to comment.