Support s3a hadoop file system.
Added tests to run through both s3n and s3a.
Updated hadoop-aws and jets3t packages.
Space committed Aug 26, 2015
1 parent 5a1ea07 commit e00fac6
Showing 14 changed files with 93 additions and 49 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -15,6 +15,7 @@ integration: build
@mkdir -p $(TEST_HOME)
@tar -xzf $(JAR_FILE) -C $(TEST_HOME)
@cp $(TEST_CONFIG)/* $(TEST_HOME)
+@[ ! -e $(CONFIG)/core-site.xml ] && jar uf $(TEST_HOME)/secor-*.jar -C $(TEST_CONFIG) core-site.xml
@[ ! -e $(CONFIG)/jets3t.properties ] && jar uf $(TEST_HOME)/secor-*.jar -C $(TEST_CONFIG) jets3t.properties
cd $(TEST_HOME) && ./scripts/run_tests.sh

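Note: the new line mirrors the existing jets3t.properties injection: when the deployment config directory supplies no core-site.xml of its own, the test core-site.xml (which points s3a at the fakes3 endpoint, see below) is baked into the jar used by the integration tests.
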
23 changes: 20 additions & 3 deletions pom.xml
@@ -99,7 +99,7 @@
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
-<version>0.7.1</version>
+<version>0.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -153,13 +153,30 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-core</artifactId>
-<version>1.2.1</version>
+<artifactId>hadoop-common</artifactId>
+<version>2.7.0</version>
</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-aws</artifactId>
+    <version>2.7.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>net.java.dev.jets3t</groupId>
+            <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
<dependency>
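Note: hadoop-aws 2.7.0 declares its own jets3t and Jackson dependencies; the exclusions presumably keep the jets3t 0.9.4 pinned above and the project's existing Jackson version authoritative on the classpath.
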
1 change: 1 addition & 0 deletions src/main/config/log4j.dev.properties
@@ -5,6 +5,7 @@ log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.Target=System.err
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n

1 change: 1 addition & 0 deletions src/main/config/log4j.prod.properties
@@ -5,6 +5,7 @@ log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=WARN
+log4j.appender.CONSOLE.Target=System.err
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n

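Note: routing the console appender to System.err appears intended to keep stdout clean for scripted consumers such as the integration tests below.
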
5 changes: 5 additions & 0 deletions src/main/config/secor.common.properties
@@ -21,6 +21,7 @@
secor.kafka.topic_filter=.*

# AWS authentication credentials.
+# Leave empty if using IAM role-based authentication with s3a filesystem.
aws.access.key=
aws.secret.key=

@@ -40,6 +41,10 @@ aws.secret.key=
aws.region=
aws.endpoint=

+# Hadoop filesystem to use. Choices are s3n or s3a.
+# See https://wiki.apache.org/hadoop/AmazonS3 for details.
+secor.s3.filesystem=s3n

# Zookeeper config.
zookeeper.session.timeout.ms=3000
zookeeper.sync.time.ms=200
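Note: the default stays s3n, so existing deployments are unaffected. The setting can also be supplied per run as a system property (the updated run_tests.sh below passes -Dsecor.s3.filesystem=...). With s3a and empty keys, hadoop-aws can fall back to its own credential chain, which is what makes the IAM-role option mentioned above possible.
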
5 changes: 5 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -135,6 +135,8 @@ public int getMessagesPerSecond() {
return getInt("secor.messages.per.second");
}

+public String getS3FileSystem() { return getString("secor.s3.filesystem"); }

public String getS3Bucket() {
return getString("secor.s3.bucket");
}
@@ -143,6 +145,9 @@ public String getS3Path() {
return getString("secor.s3.path");
}

+public String getS3Prefix() {
+    return getS3FileSystem() + "://" + getS3Bucket() + "/" + getS3Path();
+}
public String getLocalPath() {
return getString("secor.local.path");
}
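
For illustration, the new prefix just glues the configured scheme, bucket, and path together. A standalone sketch of the same logic, not part of this commit, with all values hypothetical:

    // Mirrors SecorConfig.getS3Prefix(); values are made up for illustration.
    public class S3PrefixSketch {
        static String s3Prefix(String fileSystem, String bucket, String path) {
            return fileSystem + "://" + bucket + "/" + path;
        }

        public static void main(String[] args) {
            // secor.s3.filesystem=s3a, secor.s3.bucket=my-bucket, secor.s3.path=raw_logs
            System.out.println(s3Prefix("s3a", "my-bucket", "raw_logs"));  // s3a://my-bucket/raw_logs
        }
    }
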
6 changes: 2 additions & 4 deletions src/main/java/com/pinterest/secor/common/PartitionFinalizer.java
@@ -86,8 +86,6 @@ private String[] getFinalizedUptoPartitions(String topic) throws Exception {
}

private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception {
-final String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path();
-
LOG.info("Finalize up to (but not include) {}, dim: {}",
uptoPartitions, uptoPartitions.length);

@@ -98,7 +96,7 @@ private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception {
// Stop at the first partition which already has the SUCCESS file
for (int i = 0; i < mLookbackPeriods; i++) {
LOG.info("Looking for partition: " + Arrays.toString(previous));
-LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, previous,
+LogFilePath logFilePath = new LogFilePath(mConfig.getS3Prefix(), topic, previous,
mConfig.getGeneration(), 0, 0, mFileExtension);
String logFileDir = logFilePath.getLogFileDir();
if (FileUtil.exists(logFileDir)) {
@@ -165,7 +163,7 @@ private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception {
}

// Generate the SUCCESS file at the end
-LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, current,
+LogFilePath logFilePath = new LogFilePath(mConfig.getS3Prefix(), topic, current,
mConfig.getGeneration(), 0, 0, mFileExtension);
String logFileDir = logFilePath.getLogFileDir();
String successFilePath = logFileDir + "/_SUCCESS";
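Both the lookback probe and the final _SUCCESS marker now honor the configured scheme. A sketch, not part of this commit, of the marker path under s3a (bucket, topic, partition, and extension hypothetical):

    import com.pinterest.secor.common.LogFilePath;

    // Hypothetical values; mirrors the finalizer's LogFilePath call above.
    public class SuccessMarkerSketch {
        public static void main(String[] args) {
            LogFilePath logFilePath = new LogFilePath("s3a://my-bucket/raw_logs", "events",
                    new String[]{"dt=2015-08-25"}, 1, 0, 0, ".gz");
            // Expected shape: s3a://my-bucket/raw_logs/events/dt=2015-08-25/_SUCCESS
            System.out.println(logFilePath.getLogFileDir() + "/_SUCCESS");
        }
    }
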
8 changes: 2 additions & 6 deletions src/main/java/com/pinterest/secor/tools/LogFileVerifier.java
@@ -48,16 +48,12 @@ public LogFileVerifier(SecorConfig config, String topic) throws IOException {
new HashMap<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>>();
}

-private String getPrefix() {
-    return "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path();
-}
-
private String getTopicPrefix() {
-return getPrefix() + "/" + mTopic;
+return mConfig.getS3Prefix() + "/" + mTopic;
}

private void populateTopicPartitionToOffsetToFiles() throws IOException {
-String prefix = getPrefix();
+String prefix = mConfig.getS3Prefix();
String topicPrefix = getTopicPrefix();
String[] paths = FileUtil.listRecursively(topicPrefix);
for (String path : paths) {
3 changes: 1 addition & 2 deletions src/main/java/com/pinterest/secor/uploader/HadoopS3UploadManager.java
@@ -44,8 +44,7 @@ public HadoopS3UploadManager(SecorConfig config) {
}

public Handle<?> upload(LogFilePath localPath) throws Exception {
-String s3Prefix = "s3n://" + mConfig.getS3Bucket() + "/" + mConfig.getS3Path();
-LogFilePath s3Path = localPath.withPrefix(s3Prefix);
+LogFilePath s3Path = localPath.withPrefix(mConfig.getS3Prefix());
final String localLogFilename = localPath.getLogFilePath();
final String s3LogFilename = s3Path.getLogFilePath();
LOG.info("uploading file {} to {}", localLogFilename, s3LogFilename);
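Note: upload() is otherwise unchanged; withPrefix() swaps the local parent directory for the configured prefix, so the same local file now lands under s3a:// or s3n:// according to secor.s3.filesystem.
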
27 changes: 17 additions & 10 deletions src/main/java/com/pinterest/secor/util/FileUtil.java
@@ -18,6 +18,7 @@

import com.pinterest.secor.common.SecorConfig;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,19 +34,25 @@
* @author Pawel Garbacki (pawel@pinterest.com)
*/
public class FileUtil {
-private static SecorConfig mConfig = null;
+private static Configuration mConf = new Configuration(true);

public static void configure(SecorConfig config) {
-    mConfig = config;
+    if (config != null) {
+        if (config.getAwsAccessKey().isEmpty() != config.getAwsSecretKey().isEmpty()) {
+            throw new IllegalArgumentException(
+                "Must specify both aws.access.key and aws.secret.key or neither.");
+        }
+        if (!config.getAwsAccessKey().isEmpty()) {
+            mConf.set(Constants.ACCESS_KEY, config.getAwsAccessKey());
+            mConf.set(Constants.SECRET_KEY, config.getAwsSecretKey());
+            mConf.set("fs.s3n.awsAccessKeyId", config.getAwsAccessKey());
+            mConf.set("fs.s3n.awsSecretAccessKey", config.getAwsSecretKey());
+        }
+    }
}

public static FileSystem getFileSystem(String path) throws IOException {
-    Configuration conf = new Configuration();
-    if (mConfig != null) {
-        conf.set("fs.s3n.awsAccessKeyId", mConfig.getAwsAccessKey());
-        conf.set("fs.s3n.awsSecretAccessKey", mConfig.getAwsSecretKey());
-    }
-    return FileSystem.get(URI.create(path), conf);
+    return FileSystem.get(URI.create(path), mConf);
}

public static String[] list(String path) throws IOException {
@@ -56,7 +63,7 @@ public static String[] list(String path) throws IOException {
if (statuses != null) {
for (FileStatus status : statuses) {
Path statusPath = status.getPath();
-if (path.startsWith("s3://") || path.startsWith("s3n://")) {
+if (path.startsWith("s3://") || path.startsWith("s3n://") || path.startsWith("s3a://")) {
paths.add(statusPath.toUri().toString());
} else {
paths.add(statusPath.toUri().getPath());
@@ -119,7 +126,7 @@ public static long getModificationTimeMsRecursive(String path) throws IOException {
for (FileStatus fileStatus : statuses) {
Path statusPath = fileStatus.getPath();
String stringPath;
-if (path.startsWith("s3://") || path.startsWith("s3n://")) {
+if (path.startsWith("s3://") || path.startsWith("s3n://") || path.startsWith("s3a://")) {
stringPath = statusPath.toUri().toString();
} else {
stringPath = statusPath.toUri().getPath();
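The net effect: one shared Configuration carries credentials for both schemes, and the scheme of each path picks the filesystem implementation. A sketch, not part of this commit, with hypothetical bucket and keys:

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.s3a.Constants;

    // Hypothetical bucket and keys; mirrors FileUtil's shared-configuration approach.
    public class FileSystemSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration(true);
            conf.set(Constants.ACCESS_KEY, "AKIAEXAMPLE");     // read by s3a
            conf.set(Constants.SECRET_KEY, "exampleSecret");
            conf.set("fs.s3n.awsAccessKeyId", "AKIAEXAMPLE");  // read by s3n
            conf.set("fs.s3n.awsSecretAccessKey", "exampleSecret");
            // The URI scheme selects the implementation, e.g. S3AFileSystem for s3a://.
            FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/raw_logs"), conf);
            System.out.println(fs.getClass().getName());
        }
    }
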
49 changes: 28 additions & 21 deletions src/main/scripts/run_tests.sh
@@ -25,7 +25,8 @@
# mvn package
# mkdir /tmp/test
# cd /tmp/test
-# tar -zxvf ~/git/optimus/secor/target/secor-0.1-SNAPSHOT-bin.tar.gz
+# tar -zxvf ~/git/optimus/secor/target/secor-0.2-SNAPSHOT-bin.tar.gz
#
+# # copy Hadoop native libs to lib/, or change HADOOP_NATIVE_LIB_PATH to point to them
# ./scripts/run_tests.sh
#
@@ -52,6 +53,9 @@ declare -A READER_WRITERS
READER_WRITERS[json]=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory
READER_WRITERS[binary]=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory

+# Hadoop supports multiple implementations of the s3 filesystem
+S3_FILESYSTEMS=${S3_FILESYSTEMS:-s3a s3n}

# The minimum wait time is one minute plus delta. Secor is configured to upload files older than
# one minute and we need to make sure that everything ends up on s3 before starting verification.
WAIT_TIME=${SECOR_WAIT_TIME:-120}
@@ -400,26 +404,29 @@ check_for_native_libs
stop_s3
start_s3

-for key in ${!READER_WRITERS[@]}; do
-    MESSAGE_TYPE=${key}
-    ADDITIONAL_OPTS=-Dsecor.file.reader.writer.factory=${READER_WRITERS[${key}]}
-    echo "********************************************************"
-    echo "Running tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${key}]}"
-    post_and_verify_test
-    if [ ${MESSAGE_TYPE} = "binary" ]; then
-        # Testing finalizer in partition mode
-        post_and_finalizer_verify_test hr
-        post_and_finalizer_verify_test dt
-    fi
-    start_from_non_zero_offset_test
-    move_offset_back_test
-    if [ ${MESSAGE_TYPE} = "json" ]; then
-        post_and_verify_compressed_test
-    elif [ -z ${SKIP_COMPRESSED_BINARY} ]; then
-        post_and_verify_compressed_test
-    else
-        echo "Skipping compressed tests for ${MESSAGE_TYPE}"
-    fi
+for fkey in ${S3_FILESYSTEMS}; do
+    FILESYSTEM_TYPE=${fkey}
+    for key in ${!READER_WRITERS[@]}; do
+        MESSAGE_TYPE=${key}
+        ADDITIONAL_OPTS="-Dsecor.s3.filesystem=${FILESYSTEM_TYPE} -Dsecor.file.reader.writer.factory=${READER_WRITERS[${key}]}"
+        echo "********************************************************"
+        echo "Running tests for Message Type: ${MESSAGE_TYPE} and ReaderWriter: ${READER_WRITERS[${key}]} using filesystem: ${FILESYSTEM_TYPE}"
+        post_and_verify_test
+        if [ ${MESSAGE_TYPE} = "binary" ]; then
+            # Testing finalizer in partition mode
+            post_and_finalizer_verify_test hr
+            post_and_finalizer_verify_test dt
+        fi
+        start_from_non_zero_offset_test
+        move_offset_back_test
+        if [ ${MESSAGE_TYPE} = "json" ]; then
+            post_and_verify_compressed_test
+        elif [ -z ${SKIP_COMPRESSED_BINARY} ]; then
+            post_and_verify_compressed_test
+        else
+            echo "Skipping compressed tests for ${MESSAGE_TYPE}"
+        fi
+    done
done

stop_s3
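
Note: the suite now runs once per filesystem, roughly doubling its runtime; it can be narrowed through the new variable, e.g. S3_FILESYSTEMS=s3a ./scripts/run_tests.sh.
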
7 changes: 7 additions & 0 deletions src/test/config/core-site.xml
@@ -0,0 +1,7 @@
+<configuration>
+    <property>
+        <name>fs.s3a.endpoint</name>
+        <description>For testing override the endpoint to fakes3</description>
+        <value>http://localhost:5000</value>
+    </property>
+</configuration>
1 change: 1 addition & 0 deletions src/test/config/jets3t.properties
@@ -1,3 +1,4 @@
s3service.https-only=false
s3service.s3-endpoint-http-port=5000
s3service.s3-endpoint=localhost
s3service.disable-dns-buckets=true
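
Note: together these files point both filesystems at the fakes3 server the tests start on port 5000: core-site.xml overrides fs.s3a.endpoint for s3a, while jets3t.properties does the same for the jets3t-backed s3n client (disable-dns-buckets keeps the bucket name in the path, so localhost addressing works).
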
5 changes: 2 additions & 3 deletions src/test/java/com/pinterest/secor/uploader/UploaderTest.java
@@ -124,8 +124,7 @@ public void testUploadFiles() throws Exception {
Mockito.when(
mOffsetTracker.getTrueCommittedOffsetCount(mTopicPartition))
.thenReturn(11L);
-Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket");
-Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir");
+Mockito.when(mConfig.getS3Prefix()).thenReturn("s3a://some_bucket/some_s3_parent_dir");

HashSet<LogFilePath> logFilePaths = new HashSet<LogFilePath>();
logFilePaths.add(mLogFilePath);
@@ -142,7 +141,7 @@ public void testUploadFiles() throws Exception {
FileUtil.moveToS3(
    "/some_parent_dir/some_topic/some_partition/some_other_partition/"
    + "10_0_00000000000000000010",
-   "s3n://some_bucket/some_s3_parent_dir/some_topic/some_partition/"
+   "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/"
+ "some_other_partition/10_0_00000000000000000010");
Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
Mockito.verify(mZookeeperConnector).setCommittedOffsetCount(
