Skip to content

Commit

Permalink
Merge pull request #22 from mmolimar/fix/close_reader
Browse files Browse the repository at this point in the history
Close reader in source task
  • Loading branch information
mmolimar committed Dec 25, 2017
2 parents 015296d + 5f1d0b5 commit 032dbd9
Showing 1 changed file with 1 addition and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ public List<SourceRecord> poll() throws InterruptedException {
final List<SourceRecord> results = new ArrayList<>();
List<FileMetadata> files = filesToProcess();
files.forEach(metadata -> {
try {
try (FileReader reader = policy.offer(metadata, context.offsetStorageReader())) {
log.info("Processing records for file {}", metadata);
FileReader reader = policy.offer(metadata, context.offsetStorageReader());
while (reader.hasNext()) {
results.add(convert(metadata, reader.currentOffset(), reader.next()));
}
Expand Down

0 comments on commit 032dbd9

Please sign in to comment.