Skip to content

Commit

Permalink
Clean up KafkaSource
Browse files Browse the repository at this point in the history
  • Loading branch information
ziliu committed Mar 16, 2016
1 parent 6684f17 commit 2c18e3f
Showing 1 changed file with 32 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -122,28 +123,28 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {

@Override
public List<WorkUnit> getWorkunits(SourceState state) {
Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();

this.kafkaWrapper = this.closer.register(KafkaWrapper.create(state));

List<KafkaTopic> topics = getFilteredTopics(state);
Map<String, State> topicSpecificStateMap = getTopicSpecificState(topics, state);

int numOfThreads =
state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads,
ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

long startTimeOfCreatingWorkUnits = System.currentTimeMillis();
LOG.info("Begin using thread pool to create work units for topic size " + topics.size());


int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

for (KafkaTopic topic : topics) {
threadPool.submit(new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())), workUnits));
threadPool.submit(new WorkUnitCreator(topic, state,
Optional.fromNullable(topicSpecificStateMap.get(topic.getName())), workUnits));
}

ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
LOG.info("Done using thread pool to create work units. Total time used for creating work units is "
+ (System.currentTimeMillis()-startTimeOfCreatingWorkUnits) + " milliseconds");
LOG.info(String.format("Created %s workunits in %d seconds", workUnits.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));

// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
Expand Down Expand Up @@ -203,7 +204,7 @@ private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>

// in case the previous offsets have not been set yet
getAllPreviousOffsets(state);

// For each partition that has a previous offset, create an empty WorkUnit for it if
// it is not in this.partitionsToBeProcessed.
for (Map.Entry<KafkaPartition, Long> entry : this.previousOffsets.entrySet()) {
Expand Down Expand Up @@ -352,9 +353,8 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
}


private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState state)
throws PreviousOffsetNotFoundException {
throws PreviousOffsetNotFoundException {

getAllPreviousOffsets(state);

Expand All @@ -367,10 +367,10 @@ private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState

// needs to be synchronized because this.previousOffsets must be initialized only once
private synchronized void getAllPreviousOffsets(SourceState state) {
if(this.doneGettingAllPreviousOffsets){
if (this.doneGettingAllPreviousOffsets) {
return;
}

this.previousOffsets.clear();
for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
Expand All @@ -383,7 +383,7 @@ private synchronized void getAllPreviousOffsets(SourceState state) {
}
}
}

this.doneGettingAllPreviousOffsets = true;
}

Expand Down Expand Up @@ -415,7 +415,7 @@ private synchronized boolean shouldMoveToLatestOffset(KafkaPartition partition,

// thread safe
private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset,
Optional<State> topicSpecificState) {
Optional<State> topicSpecificState) {
Offsets offsets = new Offsets();
offsets.setEarliestOffset(previousOffset);
offsets.setLatestOffset(previousOffset);
Expand Down Expand Up @@ -526,37 +526,31 @@ public boolean apply(KafkaTopic input) {
return this.topicNamePattern.matcher(input.getName()).matches();
}
}
private class WorkUnitCreator implements Runnable{

private class WorkUnitCreator implements Runnable {
private final KafkaTopic topic;
private final SourceState state;
private final Optional <State> topicSpecificState;
private final Optional<State> topicSpecificState;
private final Map<String, List<WorkUnit>> allTopicWorkUnits;
WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits){

WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits) {
this.topic = topic;
this.state = state;
this.topicSpecificState = topicSpecificState;
this.allTopicWorkUnits = workUnits;
}

@Override
public void run() {
try {
long curTime = System.currentTimeMillis();
LOG.info("Using thread pool to create work units for topic " + this.topic.getName());
List<WorkUnit> oneTopicWorkUnits =
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, topicSpecificState);
this.allTopicWorkUnits.put(this.topic.getName(), oneTopicWorkUnits);
LOG.info("Done creating work unit for " + this.topic.getName() + ", created work unit size is "
+ oneTopicWorkUnits.size() + ". Total time used for creating this topic is "
+ (System.currentTimeMillis() - curTime) + " milliseconds");
this.allTopicWorkUnits.put(this.topic.getName(),
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, topicSpecificState));
} catch (Throwable t) {
LOG.error("Caught error in creating work unit for " + this.topic.getName() + " : " + t.getMessage(), t);
LOG.error("Caught error in creating work unit for " + this.topic.getName(), t);
throw t;
}
}

}
}

0 comments on commit 2c18e3f

Please sign in to comment.