
Create MongoSinkTask unit tests for DLQ-related behavior #91

Merged: stIncMale merged 2 commits into master from KAFKA-268 on Dec 14, 2021

Conversation

@stIncMale (Member) commented on Dec 2, 2021

This is a subtask of KAFKA-257 created to produce smaller PRs. It is based on the changes done in #88.

KAFKA-268

@@ -291,4 +226,91 @@ private String generateWriteErrors(
}
return "[" + String.join(", ", errorString) + "]";
}

static final class StartedMongoSinkTask {
@stIncMale (Member, Author) commented:

The refactoring in MongoSinkTask does not change any logic; it merely extracts the instance logic that is not tied to the Kafka Connect-specific lifecycle of MongoSinkTask into a new class, StartedMongoSinkTask, which can now be easily unit tested.
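For illustration, here is a minimal sketch of the resulting shape, assuming a constructor that takes the sink config, the MongoDB client, and the error reporter (the dependencies discussed further down in this thread). The "connection.uri" lookup, the MongoSinkConfig constructor call, and all method bodies are simplified placeholders, not the connector's actual code.

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.kafka.connect.sink.MongoSinkConfig;

// Kafka Connect-facing shell: only the lifecycle glue stays here.
public class MongoSinkTask extends SinkTask {
  private StartedMongoSinkTask startedTask;

  @Override
  public String version() {
    return "sketch";
  }

  @Override
  public void start(final Map<String, String> props) {
    MongoSinkConfig config = new MongoSinkConfig(props);
    // Creating the MongoClient here, outside StartedMongoSinkTask, is what makes it injectable.
    MongoClient mongoClient = MongoClients.create(props.get("connection.uri"));
    ErrantRecordReporter reporter = context.errantRecordReporter(); // null when no DLQ is configured
    startedTask = new StartedMongoSinkTask(config, mongoClient, reporter);
  }

  @Override
  public void put(final Collection<SinkRecord> records) {
    startedTask.put(records); // all record-processing logic lives in the extracted class
  }

  @Override
  public void stop() {
    startedTask.stop();
  }
}

// The extracted class: plain constructor injection, no Kafka Connect lifecycle of its own.
final class StartedMongoSinkTask {
  private final MongoSinkConfig config;
  private final MongoClient mongoClient;
  private final ErrantRecordReporter reporter; // nullable

  StartedMongoSinkTask(
      final MongoSinkConfig config,
      final MongoClient mongoClient,
      final ErrantRecordReporter reporter) {
    this.config = config;
    this.mongoClient = mongoClient;
    this.reporter = reporter;
  }

  void put(final Collection<SinkRecord> records) {
    // Build and execute bulk writes per namespace; report failed records via `reporter` (omitted here).
  }

  void stop() {
    mongoClient.close();
  }
}
```

With this split, MongoSinkTask keeps only what Kafka Connect forces on it, and everything worth unit testing sits behind an ordinary constructor.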

@rozza (Member) commented:

I'm not sure this class is needed; the stages are all still linked to the Kafka lifecycle, e.g. stop and put.

So this is just a mechanism for setting the mongo client? Is there a cleaner way to achieve that for testing? Do you think it would be more readable if StartedMongoSinkTask was moved into its own class/file, e.g. MongoSinkTaskImpl.java?

@stIncMale (Member, Author) replied:

I did not make it a top-level class initially, because I was trying to minimize the changes in MongoSinkTask. But I am happy to have such a change. Done.

So this is just a mechanism for setting the mongo client?

Yes, it's just manual dependency injection (DI):

  • It results in the dependencies of StartedMongoSinkTask (sink config, MongoDB client, error reporter) being expressed clearly, which improves readability;
  • It allows the code that creates such tasks to control the implementations of those dependencies, which decouples StartedMongoSinkTask from them. This is crucial for unit tests, because they can now easily supply mocked implementations.

the stages are all still linked to the Kafka lifecycle, e.g. stop and put

The important part is that the two points above are still true. I expressed the idea poorly in the initial comment, but hopefully it is clearer now.

Is there a cleaner way to achieve that for testing?

I don't think there is. DI is applied here exactly as intended, and of all the styles of DI, constructor injection is the cleanest.
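To illustrate the payoff, here is roughly what a unit test can look like with constructor injection. This is a sketch, not one of the tests in this PR: the configuration keys, the StartedMongoSinkTask constructor signature, and the assumption that put() with an empty batch performs no writes are all illustrative.

```java
import static java.util.Collections.emptyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.junit.jupiter.api.Test;

import com.mongodb.client.MongoClient;
import com.mongodb.kafka.connect.sink.MongoSinkConfig;

final class StartedMongoSinkTaskTest {
  @Test
  void emptyBatchDoesNotTouchMongoDbOrTheDlq() {
    // Constructor injection lets the test pass mocks instead of real dependencies.
    Map<String, String> props = new HashMap<>();
    props.put("connection.uri", "mongodb://localhost:27017"); // never contacted: the client below is a mock
    props.put("database", "test");
    props.put("topics", "test-topic");
    MongoSinkConfig config = new MongoSinkConfig(props);

    MongoClient mongoClient = mock(MongoClient.class);
    ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
    StartedMongoSinkTask task = new StartedMongoSinkTask(config, mongoClient, reporter);

    task.put(emptyList());

    // Assumed behavior for the sake of the example: an empty batch causes no writes
    // and reports nothing to the dead letter queue.
    verifyNoInteractions(mongoClient, reporter);
  }
}
```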

// batch3
Records.simpleValid(TEST_TOPIC, 4)),
asList(0, 1, 2, 3, 4),
emptyList());
@stIncMale (Member, Author) commented:

KAFKA-257 will change this empty expectation to asList(2, 3):

  • 2 will be reported because it has a corresponding write error;
  • 3 will be reported because the bulk write is ordered, which means the write of 3 is never attempted (see the sketch below).
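The ordered-bulk-write behavior that the second bullet relies on can be reproduced directly with the MongoDB Java driver. The following standalone sketch is not part of the test suite and assumes a local mongod; it shows that once the write for _id 2 fails, _id 3 is never attempted.

```java
import static java.util.Arrays.asList;

import org.bson.Document;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;

// Ordered bulk writes stop at the first error: if the write for record 2 fails
// (here a duplicate _id), record 3 is never attempted, so both end up in the DLQ.
public final class OrderedBulkWriteSketch {
  public static void main(final String[] args) {
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
      MongoCollection<Document> collection = client.getDatabase("test").getCollection("sink");
      collection.drop();
      collection.insertOne(new Document("_id", 2)); // pre-existing document that makes record 2 fail

      try {
        collection.bulkWrite(
            asList(
                new InsertOneModel<>(new Document("_id", 0)),
                new InsertOneModel<>(new Document("_id", 1)),
                new InsertOneModel<>(new Document("_id", 2)), // duplicate key -> write error
                new InsertOneModel<>(new Document("_id", 3))), // skipped because the bulk is ordered
            new BulkWriteOptions().ordered(true));
      } catch (MongoBulkWriteException e) {
        System.out.println("inserted: " + e.getWriteResult().getInsertedCount()); // 2 (records 0 and 1)
        System.out.println("write errors: " + e.getWriteErrors().size()); // 1 (record 2)
      }
    }
  }
}
```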

@rozza (Member) replied:

Ack

@rozza (Member) left a comment:

Looks good - some stylistic comments for consideration but not necessarily requiring action.


@stIncMale changed the base branch from KAFKA-267 to master on December 14, 2021, 07:04
@stIncMale requested a review from rozza on December 14, 2021, 07:33
@rozza (Member) left a comment:

LGTM!


@stIncMale merged commit 9950d8c into mongodb:master on Dec 14, 2021
@stIncMale deleted the KAFKA-268 branch on December 14, 2021, 16:30
ypmahajan added a commit to confluentinc/mongo-kafka that referenced this pull request Jun 1, 2022
* Removed unused travis.yml

KAFKA-250

* Add debug logging to MongoSourceTask (mongodb#83)

KAFKA-261

* Evergreen use https for git clone

DRIVERS-1970

* Commit changes done by auto-formatting

Running `./gradlew check` results in some files being reformatted.
I don't know why these files had not been reformatted before,
but committing the changes now avoids interference with
future changes.

KAFKA-267

* Remove the retry mechanism in MongoSinkTask

KAFKA-267

* Clarify that the user does not need to do anything beyond removing obsolete configuration properties

KAFKA-267

* Clarify where to look for RAM/CPU restrictions when using `docker/run.sh`

KAFKA-267

* Improve the warning message

KAFKA-267

* Spotless code formatting updates

* Create MongoSinkTask unit tests for DLQ-related behavior (mongodb#91)

KAFKA-268

* KAFKA-271 (mongodb#93)

* Update gradlew to 7.3.1

Added gradle.properties to support Spotless / google-java-format via compiler exports

Updated the following to support Java 17:

  * Spotless plugin
  * Spotbugs plugin
  * Buildconfig plugin
  * Junit 5 dependency
  * Mockito dependency
  * Hamcrest dependency

KAFKA-271

* Ensure Union types are validated correctly. (mongodb#94)

KAFKA-270

* Report MongoDB errors to the dead letter queue (DLQ) (mongodb#95)

KAFKA-257

* Add support for the `bulk.write.ordered` sink connector property (mongodb#96)

KAFKA-253

* Mention `bulk.write.ordered` in `CHANGELOG.md` (mongodb#98)

KAFKA-253

* Added `copy.existing.allow.disk.use` configuration

Allows the copy-existing aggregation to use temporary
disk storage if required. Defaults to true, but can be
disabled if the user doesn't have permission
for disk access.

KAFKA-265
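As an illustration of what the new setting controls, the driver-level knob it maps onto is `AggregateIterable.allowDiskUse`. The sketch below is not the connector's actual wiring, and the pipeline is a placeholder.

```java
import java.util.List;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;

final class CopyExistingSketch {
  // allowDiskUse comes from the `copy.existing.allow.disk.use` property (default: true).
  static AggregateIterable<Document> copyExisting(
      final MongoCollection<Document> collection,
      final List<Bson> copyExistingPipeline,
      final boolean allowDiskUse) {
    return collection.aggregate(copyExistingPipeline)
        // Lets the server spill to temporary disk storage for large copy-existing aggregations;
        // users without disk-access permission can turn the property off.
        .allowDiskUse(allowDiskUse);
  }

  static List<Bson> placeholderPipeline() {
    return List.of(Aggregates.match(new Document())); // stand-in for the connector's real pipeline
  }
}
```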

* Use the default value from the configuration in MongoCopyDataManagerTest

KAFKA-265

* Test: Remove deprecated @RunWith(JUnitPlatform.class) annotations

* Mongo sink connector must tolerate the `ErrantRecordReporter` not being available (mongodb#100)

KAFKA-286

* Fixed source setup in the docker example (mongodb#99)

Added JSON key and value converters to the MongoDB Kafka
Source Connector: the default Avro converter does not
handle MongoDB JSON documents properly, adding binary values in front
of each key and value. The behaviour can be verified
by running ./run.sh from the docker folder.

KAFKA-285

Co-authored-by: Gleb Abroskin <gabroskin@fun.co>
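For reference, the kind of converter override this fix adds looks roughly like the following, expressed here as a Java map. The docker example's actual connector JSON, and whether `schemas.enable` is set, may differ; only the converter class names are standard Kafka Connect settings.

```java
import java.util.Map;

final class SourceConnectorConverterSketch {
  static Map<String, String> converterOverrides() {
    return Map.of(
        // The default Avro converter prefixes each key/value with the Schema Registry
        // wire format (a magic byte plus the schema ID), which is the "binary values"
        // mentioned above; the JSON converter avoids that.
        "key.converter", "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable", "false",
        "value.converter", "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable", "false");
  }
}
```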

* Log incompatible configuration properties (mongodb#101)

Specifically, when the CDC handler is configured
(https://docs.mongodb.com/kafka-connector/current/sink-connector/configuration-properties/cdc/),
the configuration properties for post processors
(https://docs.mongodb.com/kafka-connector/current/sink-connector/configuration-properties/post-processors/)
and the configuration properties for the ID strategy
are ignored.

KAFKA-277
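As a hedged illustration, a sink configuration along these lines is the kind that now triggers the warning; the property keys are the sink connector's, while the specific class values are only examples.

```java
import java.util.Map;

final class IncompatibleSinkConfigSketch {
  static Map<String, String> cdcConfigWithIgnoredProperties() {
    return Map.of(
        "change.data.capture.handler",
            "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
        // With a CDC handler configured, the two properties below are ignored,
        // and the connector now logs that fact instead of staying silent.
        "post.processor.chain",
            "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
        "document.id.strategy",
            "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy");
  }
}
```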

* [KAFKA-262] Support user defined topic separator (mongodb#86)

KAFKA-262

Co-authored-by: Valentin Kovalenko <valentin.kovalenko@mongodb.com>

* Update the driver version to 4.5.0 (mongodb#102)

KAFKA-279

* Evergreen: Added publish release tag task

KAFKA-291

* Make the code formatting checker happy

* Stop converting unsupported `fixed` Avro types (mongodb#104)

* Correctly process namespaces when converting `org.apache.avro.Schema` to `org.apache.kafka.connect.data.Schema` (mongodb#103)

`org.apache.kafka.connect.data.Schema` does not separate a namespace from a short name,
and always uses full names, while `org.apache.avro.Schema` separates them.
The "always uses full names" part about `org.apache.kafka.connect.data.Schema`
is not documented anywhere, but based on the following example
https://docs.confluent.io/platform/current/tutorials/examples/connect-streams-pipeline/docs/index.html#example-3-jdbc-source-connector-with-specificavro-key-string-null-and-value-specificavro
and the `SetSchemaMetadata`
(https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java),
it is clear that the name in `org.apache.kafka.connect.data.Schema`
is supposed to be the full name.

Avro supports namespaces only for schemas of records, enums, and fixed types,
of which MongoDB Kafka Connector supports only records.
Therefore, we need to care about using `org.apache.avro.Schema.getFullName`
as `org.apache.kafka.connect.data.Schema.name()` only for record schemas.

It is worth pointing out that as a result of this change, MongoDB Kafka Connector
will start storing slightly different Avro schemas in Confluent Schema Registry
via `AvroConverter`
(https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java).
This is not supposed to cause any compatibility issues for connectors, judging from
https://docs.confluent.io/platform/current/schema-registry/avro.html#schema-evolution-and-compatibility,
but it will be visible to users who explicitly fetch schemas from Schema Registry.
It is unclear why and how users would do that, but apparently that is what the user
who reported KAFKA-246 is doing. Here are some thoughts on how a user may fetch relevant schemas from Schema Registry:
- `AvroConverter.fromConnectData` encodes the ID of a registered schema as part of the data
it produces, which is then stored as key/value of a Kafka record. A user may decode this ID from the raw data
and use it to fetch the schema by ID (`SchemaRegistryClient.getSchemaById`).
- `SchemaRegistryClient.getLatestSchemaMetadata("subject").getSchema()`, where subject is
determined by the subject name strategy
(https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#sr-schemas-subject-name-strategy).

Other useful links:
- https://docs.confluent.io/platform/current/schema-registry/
- https://avro.apache.org/docs/current/spec.html#names

KAFKA-246
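The essence of the fix described above can be sketched as follows; this is illustrative, not the connector's actual converter code. For record schemas, the Connect schema name should be the Avro full name rather than the short name.

```java
import org.apache.kafka.connect.data.SchemaBuilder;

final class AvroRecordNameSketch {
  static SchemaBuilder connectStructBuilderFor(final org.apache.avro.Schema avroRecordSchema) {
    if (avroRecordSchema.getType() != org.apache.avro.Schema.Type.RECORD) {
      throw new IllegalArgumentException("Only record schemas carry a namespace that must be preserved");
    }
    // getFullName() is "namespace.shortName"; using getName() alone would drop the namespace
    // and therefore register a different schema in Confluent Schema Registry.
    return SchemaBuilder.struct().name(avroRecordSchema.getFullName());
  }
}
```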

* Version: bump 1.7.0

* Version: bump 1.8.0-SNAPSHOT

* Improve logging while copying data.

KAFKA-309

* Add `release.sh`

Co-authored-by: Ross Lawley <ross.lawley@gmail.com>
Co-authored-by: Jeff Yemin <jeff.yemin@mongodb.com>
Co-authored-by: Valentin Kovalenko <valentin.kovalenko@mongodb.com>
Co-authored-by: Valentin Kovalenko <valentin.male.kovalenko@gmail.com>
Co-authored-by: Gleb Abroskin <abroskingleb@gmail.com>
Co-authored-by: Gleb Abroskin <gabroskin@fun.co>
Co-authored-by: wangxianghu <wangxianghu@apache.org>