Missing reader.close ? #21

Closed
youngboy opened this issue Dec 24, 2017 · 3 comments

Comments

@youngboy

youngboy commented Dec 24, 2017

Hi, I just encountered an s3a read timeout error in the worker:

kafka-connect_1     | org.apache.kafka.connect.errors.ConnectException: An error has occurred when creating reader for file: s3a://XXXXX
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:208)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy.offer(SleepyPolicy.java:11)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:76)
kafka-connect_1     | 	at java.util.ArrayList.forEach(ArrayList.java:1249)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:73)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
kafka-connect_1     | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect_1     | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect_1     | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
kafka-connect_1     | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
kafka-connect_1     | 	at java.lang.Thread.run(Thread.java:745)
kafka-connect_1     | Caused by: java.io.InterruptedIOException: getFileStatus on s3a://XXXXXX: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:141)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:117)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1857)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:1820)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1761)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:583)
kafka-connect_1     | 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:914)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader.<init>(TextFileReader.java:39)
kafka-connect_1     | 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
kafka-connect_1     | 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
kafka-connect_1     | 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
kafka-connect_1     | 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:31)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makeReader(ReflectionUtils.java:19)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:205)
kafka-connect_1     | 	... 12 more
...

I had to increase the policy.fs.fs.s3a.connection.maximum setting to work around this temporarily.

But I guess calling reader.close is the right way to fix it, in SourceTask#poll: https://github.com/mmolimar/kafka-connect-fs/blob/master/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java#L74

More context: aws/aws-sdk-java#269 (comment)
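
For anyone hitting the same trace, here is a minimal sketch of the pattern being suggested: close each reader as soon as its records have been consumed, so that whatever it holds (an HTTP connection from the s3a pool, in this case) is released before the next file is opened. This is not the connector's actual code. PollSketch, TrackedReader and pollAll are hypothetical names, and in-memory strings stand in for files on S3.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a poll loop; not taken from kafka-connect-fs.
class PollSketch {

    // Reader that counts how many instances are still open,
    // mimicking connections checked out of a bounded pool.
    static class TrackedReader extends BufferedReader {
        static int open = 0;
        private boolean closed = false;

        TrackedReader(Reader in) {
            super(in);
            open++;
        }

        @Override
        public void close() throws IOException {
            if (!closed) {
                closed = true;
                open--;
            }
            super.close();
        }
    }

    // Reads every line of every "file"; try-with-resources closes each
    // reader even if readLine() throws part-way through.
    static List<String> pollAll(List<String> fileContents) {
        List<String> records = new ArrayList<>();
        for (String content : fileContents) {
            try (TrackedReader reader = new TrackedReader(new StringReader(content))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    records.add(line);
                }
            } catch (IOException ioe) {
                throw new UncheckedIOException(ioe);
            }
        }
        return records;
    }
}
```

After pollAll returns, TrackedReader.open is back to zero. Without the try-with-resources block the counter would grow by one per file, which is roughly how a fixed-size connection pool ends up exhausted.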

@mmolimar
Owner

Yes, you're right. I've applied the fix.
Could you try it again (after updating the source code)?

@youngboy
Author

With the latest fix, the error disappeared. Thanks!

@xiaobai5150

xiaobai5150 commented Oct 25, 2018

I think another place should also call reader.close: seek() in TextFileReader.java.

@Override
public void seek(Offset offset) {
    if (offset.getRecordOffset() < 0) {
        throw new IllegalArgumentException("Record offset must be greater than 0");
    }
    try {
        if (offset.getRecordOffset() < reader.getLineNumber()) {
            // Close the current reader before reopening the file,
            // otherwise the old stream is never released.
            this.reader.close();
            this.reader = null;
            this.reader = new LineNumberReader(new InputStreamReader(getFs().open(getFilePath())));
            currentLine = null;
        }
        while ((currentLine = reader.readLine()) != null) {
            if (reader.getLineNumber() - 1 == offset.getRecordOffset()) {
                this.offset.setOffset(reader.getLineNumber());
                return;
            }
        }
        this.offset.setOffset(reader.getLineNumber());
    } catch (IOException ioe) {
        throw new ConnectException("Error seeking file " + getFilePath(), ioe);
    }
}
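
The close-before-reopen step proposed above can be checked in isolation with a small, self-contained sketch. Everything here is an assumption made for illustration: SeekSketch, openReaders and currentLine() are hypothetical names, a String stands in for the file, and only the backwards-seek logic mirrors the snippet in this comment.

```java
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical, simplified reader; not the project's TextFileReader.
class SeekSketch {
    // Stand-in for the number of pooled connections currently in use.
    static int openReaders = 0;

    private final String content;
    private LineNumberReader reader;
    private String currentLine;

    SeekSketch(String content) {
        this.content = content;
        this.reader = open();
    }

    private LineNumberReader open() {
        openReaders++;
        return new LineNumberReader(new StringReader(content));
    }

    // Moves forward until the line with 0-based index recordOffset has been read.
    // A backwards seek closes the current reader first, then reopens the source.
    void seek(long recordOffset) {
        if (recordOffset < 0) {
            throw new IllegalArgumentException("Record offset must not be negative");
        }
        try {
            if (recordOffset < reader.getLineNumber()) {
                reader.close();      // release the old stream first
                openReaders--;
                reader = open();     // then start again from the top
                currentLine = null;
            }
            while ((currentLine = reader.readLine()) != null) {
                if (reader.getLineNumber() - 1 == recordOffset) {
                    return;
                }
            }
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    String currentLine() {
        return currentLine;
    }
}
```

Seeking forward and then back leaves openReaders at one. If the reader.close() call is dropped, each backwards seek leaves one more stream open.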
