-
Notifications
You must be signed in to change notification settings - Fork 236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the new change.stream.full.document.before.change
config property
#121
Conversation
…nt.before.change` config property KAFKA-308
} | ||
|
||
@Override | ||
@SuppressWarnings("try") | ||
public synchronized void stop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed synchronized
as stop
cannot be called concurrently with itself, only with poll
/commit
, but poll
/commit
were neither synchronized
nor used synchronized (this)
blocks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
private final Supplier<SourceTaskContext> sourceTaskContextAccessor; | ||
private final Time time; | ||
private volatile boolean isRunning; | ||
private boolean isCopying; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isCopying
is now a plain field because there is no point in setting it to false
in stop
. And without writes from stop
, this field is always accessed sequentially.
@@ -280,7 +280,7 @@ void testBsonSchemaAndValueProducer() { | |||
"Assert schema and value matches", | |||
() -> assertEquals(Schema.BYTES_SCHEMA.schema(), actual.schema()), | |||
// Ensure the data length is truncated. | |||
() -> assertEquals(1071, ((byte[]) actual.value()).length), | |||
() -> assertEquals(1712, ((byte[]) actual.value()).length), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rozza I have no idea where the previous 1071
value came from, or what this assertion is. I put 1712
as this is the new value we have in this test as a result of changes. Should we just delete the assertion as it is useless, given that a reader cannot understand what this is, and doing what I did appears to be the only way to make schema changes, unless the reader has access to arcane knowledge?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you expect this value to change as a result of your changes? At the very least, it tests against an unexpected change to this value, it seems better to fix the comment (I am not sure how it tests that the length is truncated) than to delete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is expected that the serialized value is longer, as it has more data now. But I know neither where the asserted value is supposed to come from, nor why the serialized value is expected to be truncated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not delete the assertion, because it guards against unexpected changes to the value, even though we do not know exactly how to compute the value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iirc it was added to ensure that the size is as expected - it really is testing documentToByteArray and its handling of RawBsonDocuments. So I agree it can be removed from here and is better suited to its own test and / or is already tested in BsonValueToSchemaAndValueTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the assertions.
StartedMongoSourceTask( | ||
final Supplier<SourceTaskContext> sourceTaskContextAccessor, | ||
final MongoSourceConfig sourceConfig, | ||
final MongoClient mongoClient, | ||
@Nullable final MongoCopyDataManager copyDataManager, | ||
final StatisticsManager statisticsManager) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StartedMongoSourceTask
was extracted from MongoSourceTask
, it gets all its dependencies via a constructor, which makes it unit-testable.
final class StartedMongoSourceTaskTest { | ||
static final class FullDocumentBeforeChangeTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am proposing the idea of grouping unit tests in different static nested classes by functionality, as this prevents cluttering the test state, set up and tear down methods. If you think this is something that should not be done, or that it may be done, but only when we have unit-tests for different parts of functionality, or something else, please express your thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anything that validates the connector and makes the code easier to read / come back to / consume sounds great to me. Do you want to add a separate ticket for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to go and update existing tests, but rather I was curious whether it is fine to start using the approach when writing new tests (and the answer appears to be "yes"), if that seems helpful. As for the existing tests, we may regroup them when we touch them if, again, that seems beneficial in a specific case.
private static final String FULL_DOCUMENT_BEFORE_CHANGE_DOC = | ||
"Specifies the pre-image configuration when creating a Change Stream.\n" | ||
+ "The pre-image is not available in source records published while copying existing data as a result of" | ||
+ " enabling `copy.existing`, and the pre-image configuration has no effect on copying.\n" | ||
+ "See https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/ for more details and possible values."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I linked the server docs here instead of listing and explaining all possible values, because, as always, duplicating the docs eventually leads to the duplicated docs becoming inconsistent and/or obsolete, which results in misleading users, which in turn, makes users to distrust the docs.
In the past, I gave examples of problems our approach introduced in the driver API (where the decision has been and stays "the convenience of reading docs in place is more important than having consistent docs users may trust"), and we have similar situations in Kafka. For example, the documentation for change.stream.full.document
config property states that possible values are only ""
, "default"
, "updateLookup"
. The new values "whenAvailable"
and "required"
are omitted, as they were added later - what should users think about this? Does the connector not support these values? Does it support them, but the authors forgot to specify them? If users actually trust the connector docs (they should not), they will not learn about these two new values, or any values that will be supported by the server in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
KAFKA-308
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good
@@ -280,7 +280,7 @@ void testBsonSchemaAndValueProducer() { | |||
"Assert schema and value matches", | |||
() -> assertEquals(Schema.BYTES_SCHEMA.schema(), actual.schema()), | |||
// Ensure the data length is truncated. | |||
() -> assertEquals(1071, ((byte[]) actual.value()).length), | |||
() -> assertEquals(1712, ((byte[]) actual.value()).length), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you expect this value to change as a result of your changes? At the very least, it tests against an unexpected change to this value, it seems better to fix the comment (I am not sure how it tests that the length is truncated) than to delete.
...ntegrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationJunkTest.java
Outdated
Show resolved
Hide resolved
src/main/java/com/mongodb/kafka/connect/source/statistics/JmxStatisticsManager.java
Show resolved
Hide resolved
…ationTest2` KAFKA-308
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One naming suggestion but LGTM :)
@ExtendWith(MockitoExtension.class) | ||
class MongoSourceTaskTest { | ||
class MongoSourceTaskIntegrationTest2 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion name: MongoSourceTaskVerificationTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it mean for a test to be verification test? It appears to me that all tests verify something. I'd prefer to leave the ...IntegrationTest2
name. It won't survive past KAFKA-234.
private static final String FULL_DOCUMENT_BEFORE_CHANGE_DOC = | ||
"Specifies the pre-image configuration when creating a Change Stream.\n" | ||
+ "The pre-image is not available in source records published while copying existing data as a result of" | ||
+ " enabling `copy.existing`, and the pre-image configuration has no effect on copying.\n" | ||
+ "See https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/ for more details and possible values."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
} | ||
|
||
@Override | ||
@SuppressWarnings("try") | ||
public synchronized void stop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
final class StartedMongoSourceTaskTest { | ||
static final class FullDocumentBeforeChangeTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anything that validates the connector and makes the code easier to read / come back to / consume sounds great to me. Do you want to add a separate ticket for this?
@@ -280,7 +280,7 @@ void testBsonSchemaAndValueProducer() { | |||
"Assert schema and value matches", | |||
() -> assertEquals(Schema.BYTES_SCHEMA.schema(), actual.schema()), | |||
// Ensure the data length is truncated. | |||
() -> assertEquals(1071, ((byte[]) actual.value()).length), | |||
() -> assertEquals(1712, ((byte[]) actual.value()).length), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iirc it was added to ensure that the size is as expected - it really is testing documentToByteArray and its handling of RawBsonDocuments. So I agree it can be removed from here and is better suited to its own test and / or is already tested in BsonValueToSchemaAndValueTest
KAFKA-308
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This new property is equivalent to
ChangeStreamIterable.fullDocumentBeforeChange
in the driver API.Problems:
4 tests inMongoSourceTaskTest
accidentally became integration tests (they need a running MongoDB). These tests need to be refactored.MongoSourceTaskTest
toMongoSourceTaskIntegrationJunkTest
and move it to integration tests. These tests are irredeemable and need to be rewritten to become unit tests.KAFKA-308