Implement in-memory schema retriever #101
Conversation
This sounds great, but I'm a little confused at a high level. Can you explain how the memory retriever will return the most recent schema, and how we can be sure the cache isn't stale?
For every incoming message, we insert the schema for the message's table and topic into the cache. If we see invalid errors when trying to write rows to the database, we attempt a schema update. In BigQuery, any fields that are added to a schema must be nullable or repeated, which means we should only try to update the schema when writing a message that has new fields.
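The caching behavior described above can be sketched as follows. This is a hypothetical minimal version, not the actual MemorySchemaRetriever: String stands in for the real Kafka Connect Schema type, and the key format is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an in-memory schema cache. Every incoming message
// overwrites the cached entry for its table/topic pair, so a lookup always
// returns the most recently seen schema.
class MemorySchemaCache {
  private final Map<String, String> cache = new ConcurrentHashMap<>();

  private static String key(String table, String topic) {
    return table + ":" + topic;
  }

  // Called for every incoming message.
  void setLastSeenSchema(String table, String topic, String schema) {
    cache.put(key(table, topic), schema);
  }

  // Returns the latest cached schema, or null on a cache miss.
  String retrieveSchema(String table, String topic) {
    return cache.get(key(table, topic));
  }
}
```

Because every message overwrites the entry, the cache is never older than the last message processed for that table/topic pair; staleness only matters between the last write and a schema change, which is exactly when the invalid-error path kicks in.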
  return schema;
}

return SchemaBuilder.struct().build();
So if we don't have a record, we just return an empty schema? Is that safe?
Yeah, by returning an empty schema the calling code will create a table without a schema. When we receive our first message and try to add it, we'll hit the invalid-schema case from the original code and update the schema with the schema from the message.
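A hedged sketch of that fallback, again with String standing in for the Kafka Connect Schema type and "EMPTY_STRUCT" standing in for SchemaBuilder.struct().build(); class and method names here are illustrative, not the actual connector API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the cache-miss fallback discussed above. Returning an empty
// schema lets the caller create a schemaless table; the first message then
// triggers the invalid-schema path and a real schema update.
class FallbackSchemaRetriever {
  private static final String EMPTY_STRUCT = "EMPTY_STRUCT";
  private final Map<String, String> cache = new ConcurrentHashMap<>();

  void cacheSchema(String topic, String schema) {
    cache.put(topic, schema);
  }

  // Never returns null: a miss yields the empty struct schema instead.
  String retrieveSchema(String topic) {
    return cache.getOrDefault(topic, EMPTY_STRUCT);
  }
}
```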
Could you leave a comment to this effect, either on or in the function?
@@ -150,6 +153,10 @@ public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
  if (record.value() != null) {
    PartitionedTableId table = getRecordTable(record);
    if (schemaRetriever != null) {
will the schema retriever ever be null, or is this just for ease of testing?
The schemaRetriever is optional configuration and is only required when you want the connector to create the table or update the schema.
@@ -62,6 +62,10 @@ public AdaptiveBigQueryWriter(BigQuery bigQuery,
  this.schemaManager = schemaManager;
}

private boolean isTableMissingSchema(BigQueryException e) {
  return e.getReason().equalsIgnoreCase("invalid");
Let's have a comment linking to https://cloud.google.com/bigquery/troubleshooting-errors and a short explanation, so it's clear to anyone reading this code what "invalid" means to BigQuery.
Also, does this really need to use equalsIgnoreCase?
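The commented version the reviewer is asking for could look like this sketch. The reason string is taken here as a plain String for illustration rather than from a real BigQueryException, and whether equalsIgnoreCase is actually needed is exactly the open question above.

```java
class SchemaErrorCheck {
  // BigQuery reports writes against a table with a missing/empty schema
  // with an error whose reason is "invalid"; see
  // https://cloud.google.com/bigquery/troubleshooting-errors for the full
  // list of error reasons BigQuery can return.
  static boolean isTableMissingSchema(String reason) {
    return reason != null && reason.equalsIgnoreCase("invalid");
  }
}
```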
}
} catch (BigQueryException e) {
  if (isTableMissingSchema(e)) {
    attemptSchemaUpdate(tableId, topic);
Are there really two different types of responses we could get that imply we require a schema update? We have two paths here leading to the same result (attemptSchemaUpdate).
If a table has no schema, it will raise a BigQueryException and the writeResponse will never be set. If the table has an incorrect schema, bigQuery.insertAll will return a writeResponse containing the errors encountered.
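The two failure paths can be sketched with stand-in types (these are not the real BigQuery client classes, just a model of the control flow described above):

```java
// Stand-ins modelling the two failure modes: a schemaless table throws
// before any response exists, while an incorrect schema returns a response
// that carries per-row errors.
class WriteFlow {
  static class InsertException extends RuntimeException {
    final String reason;
    InsertException(String reason) { this.reason = reason; }
  }

  // insert() returns true when the (simulated) response has row errors.
  interface Inserter { boolean insert(); }

  // Returns which recovery path runs for a given insert outcome.
  static String classify(Inserter inserter) {
    try {
      // Path 1: a response exists; schema errors show up inside it.
      return inserter.insert() ? "update-schema-from-response" : "ok";
    } catch (InsertException e) {
      // Path 2: no response at all; the table had no schema.
      if ("invalid".equalsIgnoreCase(e.reason)) {
        return "update-schema-then-retry";
      }
      throw e; // unrelated failures still percolate up
    }
  }
}
```

So both branches converge on a schema update, but they genuinely arrive there through different channels: one via an exception, one via error details in the response.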
}

// If the table was missing its schema, we never received a writeResponse
writeResponse = bigQuery.insertAll(request);
This seems like a duplicate from the try block.
If a table has no schema, it will raise a BigQueryException and the writeResponse will never be set in the try block, so we need to retry inserting after adding a schema to the table.
@@ -106,6 +120,18 @@ public void performWriteRequest(PartitionedTableId tableId,
  logger.debug("table insertion completed successfully");
}

private boolean hasBigQueryResponseErrors(InsertAllResponse writeResponse, InsertAllRequest request) {
  return request != null && writeResponse != null && writeResponse.hasErrors();
When is request/writeResponse ever going to be null? I also feel like this method is hiding some functionality for not much gain (rather than just having this line in the one place where this method seems to be used).
You're right, I'll get rid of this function. writeResponse should not be null.
    attemptSchemaUpdate(tableId, topic);
  }
} catch (BigQueryException e) {
  if (isTableMissingSchema(e)) {
If I understand correctly, there are two possibilities where we need to update the table schema:
- the table exists and has a schema, but is missing some new columns (and thus needs to be updated)
- the table exists but has no/empty schema; this is new because it's only possible with the MemorySchemaRetriever.
Is my understanding accurate?
Yes, that's correct
}

// If the table was missing its schema, we never received a writeResponse
writeResponse = bigQuery.insertAll(request);
It still seems to me like we could be doing this twice. If this is only needed when the table was missing its schema, shouldn't this be in the catch block?
It is in the catch block; I can move it under the if (isTableMissingSchema(e)) check to make it more readable, though.
attemptSchemaUpdate(tableId, topic);

// If the table was missing its schema, we never received a writeResponse
writeResponse = bigQuery.insertAll(request);
Actually, I wonder if this should be reformatted a bit.
If we have a write response with invalid schema errors, then we retry the insert with an attempt count (lines 106-121) because the schema update might be delayed. I assume this is also a possibility if there is no schema at all.
So I feel like this insertAll should be covered in that block, and not here.
If we have no schema, the writeResponse will be null, so we'll run into issues during the while loop's writeResponse.hasErrors check. Just to clarify, are you thinking we should update the while loop to handle the case where the writeResponse is null? That seems reasonable to me, but I want to verify before implementing.
Yeah, I think the retry-after-schema-related-failure while loop should be updated to cover this type of schema-related failure.
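A sketch of that suggestion, with an Iterator<Boolean> standing in for successive insertAll outcomes (the real loop calls the BigQuery client; this just models the null-handling): a null response, which is what the exception path leaves behind, is treated the same as a response with schema errors, so both cases keep retrying until the schema update has propagated or the attempt budget runs out.

```java
import java.util.Iterator;

// Sketch of the suggested retry loop. Each call to outcomes.next()
// simulates one insertAll: TRUE = response with schema errors,
// FALSE = clean response, null = no response at all (exception path).
class RetryLoop {
  static int attemptsUntilClean(Iterator<Boolean> outcomes, int maxAttempts) {
    Boolean hasErrors = outcomes.next(); // initial insert
    int attempts = 1;
    // A null response is retried just like an error-bearing one.
    while ((hasErrors == null || hasErrors) && attempts < maxAttempts) {
      hasErrors = outcomes.next(); // re-attempt after the schema update
      attempts++;
    }
    return attempts;
  }
}
```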
One last thing!
  logger.trace("insertion failed");
- if (onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
+ if (writeResponse == null || onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
    // If the table was missing its schema, we never received a writeResponse
    logger.debug("re-attempting insertion");
    writeResponse = bigQuery.insertAll(request);
We should have a try/catch around this, right? Because we could get an exception if we have a null/empty schema. It's fine to just catch it and do nothing, but we should make sure the error doesn't percolate up if we intend to just retry.
Good catch, added!
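The fix could look like this sketch (a hypothetical helper, not the actual commit): the re-attempted insert is wrapped so an exception just leaves the response null for the next loop iteration instead of escaping the retry loop.

```java
import java.util.function.Supplier;

class SafeRetry {
  // Wraps one re-attempted insert. If the table still has a null/empty
  // schema the insert may throw again; swallow the exception and return
  // null so the surrounding retry loop simply tries once more instead of
  // letting the error percolate up.
  static Boolean safeInsert(Supplier<Boolean> insert) {
    try {
      return insert.get();
    } catch (RuntimeException e) {
      return null; // treated as "no response yet" by the retry loop
    }
  }
}
```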
Looks good!
This change implements an in-memory schema retriever which caches the last seen schema for a given topic (inspiration drawn from how the JDBC sink connector handles schema updates).
Our use case for this schema retriever is to be able to support creating and updating table schemas for serialization formats that do not use the schema registry.