Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ public String getMessageParserClass() {
return getString("secor.message.parser.class");
}

public String getUploaderClass() {
return getString("secor.upload.class", "com.pinterest.secor.uploader.Uploader");
}

public String getUploadManagerClass() {
return getString("secor.upload.manager.class");
}
Expand Down Expand Up @@ -463,6 +467,10 @@ public String getString(String name) {
return mProperties.getString(name);
}

public String getString(String name, String defaultValue) {
return mProperties.getString(name, defaultValue);
}

public int getInt(String name) {
checkProperty(name);
return mProperties.getInt(name);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/pinterest/secor/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ private void init() throws Exception {
FileRegistry fileRegistry = new FileRegistry(mConfig);
UploadManager uploadManager = ReflectionUtil.createUploadManager(mConfig.getUploadManagerClass(), mConfig);

mUploader = new Uploader(mConfig, mOffsetTracker, fileRegistry, uploadManager);
mUploader = ReflectionUtil.createUploader(mConfig.getUploaderClass());
mUploader.init(mConfig, mOffsetTracker, fileRegistry, uploadManager);
mMessageWriter = new MessageWriter(mConfig, mOffsetTracker, fileRegistry);
mMessageParser = ReflectionUtil.createMessageParser(mConfig.getMessageParserClass(), mConfig);
mMessageTransformer = ReflectionUtil.createMessageTransformer(mConfig.getMessageTransformerClass(), mConfig);
Expand Down
36 changes: 28 additions & 8 deletions src/main/java/com/pinterest/secor/uploader/Uploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,25 @@ public class Uploader {
private ZookeeperConnector mZookeeperConnector;
private UploadManager mUploadManager;

public Uploader(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
UploadManager uploadManager) {
this(config, offsetTracker, fileRegistry, uploadManager,
new ZookeeperConnector(config));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the javadoc to this 'init' method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, even though there was no JavaDoc on the previous constructor ... but I can deduct what is what from the code :-)

/**
* Init the Uploader with its dependent objects.
*
* @param config Secor configuration
* @param offsetTracker Tracker of the current offset of topics partitions
* @param fileRegistry Registry of log files on a per-topic and per-partition basis
* @param uploadManager Manager of the physical upload of log files to the remote repository
*/
public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
UploadManager uploadManager) {
init(config, offsetTracker, fileRegistry, uploadManager,
new ZookeeperConnector(config));
}

// For testing use only.
public Uploader(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
UploadManager uploadManager,
ZookeeperConnector zookeeperConnector) {
public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
UploadManager uploadManager,
ZookeeperConnector zookeeperConnector) {
mConfig = config;
mOffsetTracker = offsetTracker;
mFileRegistry = fileRegistry;
Expand Down Expand Up @@ -164,7 +173,7 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception {
}
}

private void trimFiles(TopicPartition topicPartition, long startOffset) throws Exception {
protected void trimFiles(TopicPartition topicPartition, long startOffset) throws Exception {
Collection<LogFilePath> paths = mFileRegistry.getPaths(topicPartition);
for (LogFilePath path : paths) {
trim(path, startOffset);
Expand Down Expand Up @@ -201,6 +210,17 @@ private void checkTopicPartition(TopicPartition topicPartition) throws Exception
}
}

/**
* Apply the Uploader policy for pushing partition files to the underlying storage.
*
* For each of the partitions of the file registry, apply the policy for flushing
* them to the underlying storage.
*
* This method could be subclassed to provide an alternate policy. The custom uploader
* class name would need to be specified in the secor.upload.class.
*
* @throws Exception if any error occurs while appying the policy
*/
public void applyPolicy() throws Exception {
Collection<TopicPartition> topicPartitions = mFileRegistry.getTopicPartitions();
for (TopicPartition topicPartition : topicPartitions) {
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/pinterest/secor/util/ReflectionUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package com.pinterest.secor.util;

import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.FileWriter;
Expand All @@ -25,6 +27,7 @@
import com.pinterest.secor.transformer.MessageTransformer;
import com.pinterest.secor.uploader.UploadManager;

import com.pinterest.secor.uploader.Uploader;
import org.apache.hadoop.io.compress.CompressionCodec;

/**
Expand Down Expand Up @@ -60,6 +63,26 @@ public static UploadManager createUploadManager(String className,
return (UploadManager) clazz.getConstructor(SecorConfig.class).newInstance(config);
}

/**
* Create an Uploader from its fully qualified class name.
*
* The class passed in by name must be assignable to Uploader.
* See the secor.upload.class config option.
*
* @param className The class name of a subclass of Uploader
* @return an UploadManager instance with the runtime type of the class passed by name
* @throws Exception
*/
public static Uploader createUploader(String className) throws Exception {
Class<?> clazz = Class.forName(className);
if (!Uploader.class.isAssignableFrom(clazz)) {
throw new IllegalArgumentException(String.format("The class '%s' is not assignable to '%s'.",
className, Uploader.class.getName()));
}

return (Uploader) clazz.newInstance();
}

/**
* Create a MessageParser from it's fully qualified class name.
* The class passed in by name must be assignable to MessageParser and have 1-parameter constructor accepting a SecorConfig.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public TestUploader(SecorConfig config, OffsetTracker offsetTracker,
FileRegistry fileRegistry,
UploadManager uploadManager,
ZookeeperConnector zookeeperConnector) {
super(config, offsetTracker, fileRegistry, uploadManager, zookeeperConnector);
init(config, offsetTracker, fileRegistry, uploadManager, zookeeperConnector);
mReader = Mockito.mock(FileReader.class);
}

Expand Down