
Exiting WorkerSinkTask due to unrecoverable exception #36

Open
ramyogi opened this issue Dec 20, 2019 · 17 comments

@ramyogi
Contributor

ramyogi commented Dec 20, 2019

When there is a problem with a message and Solr cannot index it and returns an exception, we end up in the state below. How do we resolve this, skip the message, and move forward? Even if I restart, the task stays stuck on the same message.
My connector configuration already contains the following, and it is still stuck:
behavior.on.malformed.documents=warn
solr.commit.within=100
errors.tolerance=all
errors.log.enable=true

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)

@ramyogi
Contributor Author

ramyogi commented Dec 20, 2019

Here is the exception thrown by the Solr client. Ideally this should not occur, but when it does we need a way to proceed. Please suggest an approach, or I can take a look at your code and provide a pull request.

Caused by: org.apache.solr.client.solrj.impl.CloudSolrClient$RouteException: Error from server at http://localhost/solr/realtimeindexing_shard6_replica_n7: Exception writing document id 12345678 to the index; possible analysis error: Document contains at least one immense term in field="abc" (whose UTF8 encoding is longer than the max length 32766), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '[49, 48, 46, 49, 49, 48, 51, 47, 80, 104, 121, 115, 82, 101, 118, 76, 101, 116, 116, 46, 57, 51, 46, 49, 51, 48, 54, 48, 51, 80]...', original message: bytes can be at most 32766 in length; got 38490. Perhaps the document has an indexed string field (solr.StrField) which is too large
at org.apache.solr.client.solrj.impl.CloudSolrClient.getRouteException(CloudSolrClient.java:125)
at org.apache.solr.client.solrj.impl.CloudSolrClient.getRouteException(CloudSolrClient.java:46)
at org.apache.solr.client.solrj.impl.BaseCloudSolrClient.directUpdate(BaseCloudSolrClient.java:549)
at org.apache.solr.client.solrj.impl.BaseCloudSolrClient.sendRequest(BaseCloudSolrClient.java:1037)

@ramyogi
Contributor Author

ramyogi commented Dec 20, 2019

The reason is that only the exceptions below are caught:

catch (SolrServerException | IOException ex) {
  throw new RetriableException(ex);
}

The SolrCloud exception above (CloudSolrClient$RouteException) is a different type, so it is not handled.
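A minimal sketch of what a broader catch could look like. The dropInvalidDocuments flag and the write() wrapper are hypothetical, not part of this connector, and it assumes the failure surfaces as a SolrException (the parent class of CloudSolrClient$RouteException):

import java.io.IOException;
import java.util.Collection;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrWriteSketch {
  private static final Logger log = LoggerFactory.getLogger(SolrWriteSketch.class);

  private final SolrClient client;
  private final boolean dropInvalidDocuments; // hypothetical config flag

  public SolrWriteSketch(SolrClient client, boolean dropInvalidDocuments) {
    this.client = client;
    this.dropInvalidDocuments = dropInvalidDocuments;
  }

  void write(String collection, Collection<SolrInputDocument> batch) {
    try {
      client.add(collection, batch);
    } catch (SolrServerException | IOException ex) {
      // Transient / infrastructure problems: let Connect retry the batch.
      throw new RetriableException(ex);
    } catch (SolrException ex) {
      // Document-level failures such as the "immense term" error land here.
      if (dropInvalidDocuments) {
        log.error("Dropping batch of {} documents rejected by Solr: {}", batch.size(), ex.getMessage());
      } else {
        throw ex;
      }
    }
  }
}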

@jcustenborder
Owner

Isn't this fatal though?

Caused by: org.apache.solr.client.solrj.impl.CloudSolrClient$RouteException: Error from server at http://localhost/solr/realtimeindexing_shard6_replica_n7: Exception writing document id 12345678 to the index; possible analysis error: Document contains at least one immense term in field="abc" (whose UTF8 encoding is longer than the max length 32766), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '[49, 48, 46, 49, 49, 48, 51, 47, 80, 104, 121, 115, 82, 101, 118, 76, 101, 116, 116, 46, 57, 51, 46, 49, 51, 48, 54, 48, 51, 80]...', original message: bytes can be at most 32766 in length; got 38490. Perhaps the document has an indexed string field (solr.StrField) which is too large

@ramyogi
Contributor Author

ramyogi commented Dec 21, 2019

Correct, that is the cause. But this should be retried and then routed to a dead letter queue so that subsequent messages can be processed. In the current situation the task is stuck and not moving at all.

@jcustenborder
Owner

That's not how the dead letter topic works unfortunately. It's only for deserialization errors.

@ramyogi
Contributor Author

ramyogi commented Dec 24, 2019

How can this be resolved when a message contains an unrecoverable error? Ideally we would just ignore that message and proceed. Because I am running Solr with a managed schema, some messages arrive with unexpected data. Any suggestion you could provide would be very helpful.

@ramyogi
Contributor Author

ramyogi commented Dec 30, 2019

The Elasticsearch sink plugin provides an option to drop invalid messages and keep going, so we could probably add a similar flag here and catch this kind of exception to move forward. Right now the task is stuck and the partition offset is not advancing, so we need some way of handling this.

catch (ConnectException convertException) {
  if (dropInvalidMessage) {
    log.error(
        "Can't convert record from topic {} with partition {} and offset {}. "
            + "Error message: {}",
        sinkRecord.topic(),
        sinkRecord.kafkaPartition(),
        sinkRecord.kafkaOffset(),
        convertException.getMessage()
    );
  } else {
    throw convertException;
  }
}

@hartmut-co-uk

+1
This is a valid situation. I'd also welcome an option to 'unblock', e.g. log and skip (the proposed behavior.on.malformed.documents=warn).

@hartmut-co-uk

@ramyogi did you find a solution, settle on an alternative, or fork the connector?

@jcustenborder
Owner

We could potentially add support for something like this. The concern I have is that most, if not all, of the examples were problems caused by failing infrastructure, meaning we'd fail on the next message anyway.

@hartmut-co-uk

hartmut-co-uk commented Oct 21, 2020

Hmm, valid concern!
In the case of failing infrastructure, timeouts, network problems, or other temporary issues, it certainly wouldn't make sense to skip messages or move them to a dead letter queue.
Would something more fine-grained be feasible?
TBH I'm not familiar with the Solr Java client library, so I don't know about its exception/error behaviour.

@jcustenborder
Owner

I think it boils down to a limitation of the Solr API. This connector specifically uses add(Collection<SolrInputDocument> docs) to index documents, which is the fastest way to write data to Solr: each batch of records delivered by a poll is converted and sent in one request. Do you all know of a way for me to figure out which document failed? That would be immensely helpful in this use case. The alternative is to use add(SolrInputDocument doc), which comes with the warning: "Adds a single document. Many SolrClient implementations have drastically slower indexing performance when documents are added individually. Document batching generally leads to better indexing performance and should be used whenever possible." Without being able to figure out which document(s) failed, I'd have to report the entire batch as failed.
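One possible approach (a sketch, not the connector's current behaviour): send the batch first, and only fall back to adding documents one at a time when the batch request fails, in order to isolate the offending document(s). It assumes the failure surfaces as a SolrException, which CloudSolrClient$RouteException extends:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;

public class BatchWithFallback {
  /** Returns the documents Solr rejected; everything else ends up indexed. */
  public static List<SolrInputDocument> addWithFallback(
      SolrClient client, String collection, Collection<SolrInputDocument> batch)
      throws SolrServerException, IOException {
    try {
      client.add(collection, batch);           // fast path: one request for the whole batch
      return Collections.emptyList();
    } catch (SolrException batchFailure) {
      List<SolrInputDocument> rejected = new ArrayList<>();
      for (SolrInputDocument doc : batch) {    // slow path: isolate the bad document(s)
        try {
          client.add(collection, doc);
        } catch (SolrException docFailure) {
          rejected.add(doc);                   // caller decides: log the id, DLQ, or fail
        }
      }
      return rejected;
    }
  }
}

The slow path is only paid when a batch actually fails, so normal indexing keeps the batched add(Collection) performance.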

@ramyogi7283

Yes Jeremy, we should be able to log the document's unique id when this situation occurs. Right now this kind of error leaves the task completely stuck and the offset does not move at all; we have to manually delete the record and resume the Kafka Connect process. I am looking forward to your suggestion, then I can prepare a PR.

@jcustenborder
Owner

The issue is that it's not going to be a single document, it's going to be a batch of documents. If we send 50 documents to Solr and one is bad, which one is it?

@hartmut-co-uk

Is there a config setting to change the batch size?
For a recovery process (still involving manual work) one could:

  1. stop/delete the connect task
  2. restart with batch size = 1
  3. process until the task is again stuck at the exact record causing the failure
  4. restart with 'skip on error' enabled
  5. stop and set the config back to its initial state (batch size > 1, 'skipOnError' = false)

Or, as an alternative, move the entire failing batch to a DLQ topic for further manual analysis / manual 'replay'.
It depends on the use case, but it might be preferable to having the entire sink task freeze and block.

@jcustenborder
Owner

Yes Jeremy, we should be able to log the document's unique id when this situation occurs. Right now this kind of error leaves the task completely stuck and the offset does not move at all; we have to manually delete the record and resume the Kafka Connect process. I am looking forward to your suggestion, then I can prepare a PR.

The problem is that I'm unclear on how to determine which document is the offending one. If we dumped something to the log, we would have to dump the whole batch, which could be 500 documents depending on your batch size. The part I really don't like is throwing away a batch over a single document or two.

Is there a config setting to change the batch size?
For a recovery process (still involving manual work) one could:

  1. stop/delete the connect task
  2. restart with batch size = 1
  3. process until the task is again stuck at the exact record causing the failure
  4. restart with 'skip on error' enabled
  5. stop and set the config back to its initial state (batch size > 1, 'skipOnError' = false)

You can do this with max.poll.records. It's a standard Kafka consumer setting. You might need to set it at the worker level depending on the Kafka Connect version.
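For example (the per-connector override assumes Kafka Connect 2.3+ with the override policy enabled on the worker):

# worker.properties: applies to all connectors on this worker
consumer.max.poll.records=1
# or per connector (requires connector.client.config.override.policy=All on the worker,
# available since Kafka Connect 2.3 / KIP-458), in the connector config:
consumer.override.max.poll.records=1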

Or, as an alternative, move the entire failing batch to a DLQ topic for further manual analysis / manual 'replay'.
It depends on the use case, but it might be preferable to having the entire sink task freeze and block.

Unfortunately DLQ functionality only covers serialization errors at the moment. There is no API to produce messages to a DLQ from a connector; I'd literally have to create a producer to do so, and that would mean pulling in all of the producer settings.

@hartmut-co-uk

Unfortunately DLQ functionality only covers serialization errors at the moment. There is no API to produce messages to a DLQ from a connector; I'd literally have to create a producer to do so, and that would mean pulling in all of the producer settings.

Note: Kafka 2.6 shipped KIP-610, which brings exactly this capability.
Though with batching and the given error scenario the limitation is the same: the entire batch would have to be 'skipped' and sent to the DLQ.
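For reference, a sketch of what using the KIP-610 reporter could look like in a sink task on a Kafka Connect 2.6+ worker. The index() method here is a hypothetical per-record Solr write, not this connector's code:

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class Kip610Sketch extends SinkTask {
  private ErrantRecordReporter reporter;

  @Override
  public void start(Map<String, String> props) {
    try {
      // May be null if the connector has no DLQ / error reporting configured.
      reporter = context.errantRecordReporter();
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
      reporter = null; // running against a pre-2.6 Connect runtime
    }
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      try {
        index(record); // hypothetical per-record write to Solr
      } catch (Exception e) {
        if (reporter != null) {
          reporter.report(record, e); // send the bad record to the DLQ and keep going
        } else {
          throw new ConnectException(e); // old behaviour: fail the task
        }
      }
    }
  }

  protected abstract void index(SinkRecord record); // placeholder for the actual Solr write
}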

The part I really don't like is throwing away a batch over a single document or two.

Yes, that would probably apply to most use cases.
