-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix gcs load retry logic #170
Conversation
} catch (ConnectException ex) { | ||
throw new ConnectException("Failed to write rows to GCS", ex); | ||
} catch (InterruptedException ex) { | ||
throw new ConnectException("Thread interrupted while batch writing to GCS.", ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it easy at this point to indicate the name of the connector in the log? Or some identifying mark so we can know which connector is generating the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, I'll look into it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mtagle you might look to see if Kafka connect now includes connector info in the log context (MDC). If so, SREs just need to include a new pre-defined variable.
Yup. https://issues.apache.org/jira/browse/KAFKA-3816 .. I recommend just using this once we get to 2.3, since it looks like this is when it was added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so I can't give the connector name but I can give the name of the table it's attempting to get data into. That is hopefully helpful?
logger.info("Batch loaded {} rows", rows.size()); | ||
} | ||
else { | ||
throw new ConnectException(String.format("Failed to load %d rows into GCS within %d attempts.", rows.size(), retries)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here and in line 128 (logger.info("Batch loaded...). It would be nice to be able to identify the connector that is generating the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MDC. See above.
} catch (ConnectException ex) { | ||
throw new ConnectException("Failed to write rows to GCS", ex); | ||
} catch (InterruptedException ex) { | ||
throw new ConnectException("Thread interrupted while batch writing to GCS.", ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mtagle you might look to see if Kafka connect now includes connector info in the log context (MDC). If so, SREs just need to include a new pre-defined variable.
Yup. https://issues.apache.org/jira/browse/KAFKA-3816 .. I recommend just using this once we get to 2.3, since it looks like this is when it was added.
logger.warn("Exceptions occurred for table {}, attempting retry", tableId.getTable()); | ||
} | ||
retryCount++; | ||
} while (exceptionsOccurred && (retryCount < retries)); | ||
} while (!success && (retryCount < retries)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possible to move this to a normal while() rather than do/while as a best-practices thing?
if (success) { | ||
logger.info("Batch loaded {} rows", rows.size()); | ||
} | ||
else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Not sure , but :
}
else {
Seems not to style.
logger.info("Batch loaded {} rows", rows.size()); | ||
} | ||
else { | ||
throw new ConnectException(String.format("Failed to load %d rows into GCS within %d attempts.", rows.size(), retries)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MDC. See above.
…etries are actually retries and not total tries. Address other comments.
LGTM! |
wrote a bunch of tests for GCSToBQWriter
fixed retry logic for GCSToBQWriter:
it's possible that this will fix the issues we've been seeing with dropped rows when going through GCS loading.