Add embedded Druid cluster tests #18147


Merged (36 commits) on Jun 28, 2025

Conversation

@kfaraz (Contributor) commented Jun 16, 2025

Summary

  • ⏳ Run integration tests without needing to run a mvn build, create a Docker image, or even run startup scripts
  • πŸš€ Just write a JUnit @Test and run it from your IDE! πŸŽ‰ πŸŽ‰
  • πŸ”‘ Run tests against a fully functioning Druid cluster running in a single JVM
  • πŸ”§ Complete control over all aspects of the Druid services being tested
  • 🎩 Load any combination of Druid extensions
  • 🐞 Debug the test and the services using debug points and the web-console!
  • πŸ“¦ Get embedded ZooKeeper + Derby metadata store by default (non embedded versions are supported too)

Explore

πŸ’₯ Run the EmbeddedKafkaClusterMetricsTest to see a simulation in action:

  • Start a Druid cluster which uses the KafkaEmitter to publish cluster metrics to a topic
  • Post a KafkaSupervisor to ingest the metrics back into the same Druid cluster! 🐢 πŸ–
  • Put debug points in the test to keep the web-console running and explore the metrics!

Usage

  1. ✏️ Write a JUnit 5 test class that extends EmbeddedClusterTestBase
  2. πŸ”© Define your cluster in the method createCluster()
  3. πŸ§ͺ Write a @Test method
  4. πŸš€ Run it!
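
Putting the four steps together, a minimal test skeleton might look like the following. The framework classes are those introduced in this PR, but the test name and body are purely illustrative:

```java
// Illustrative skeleton only: assumes JUnit 5 and the embedded-test framework
// from this PR on the classpath.
public class MyFirstEmbedTest extends EmbeddedClusterTestBase
{
  @Override
  public EmbeddedDruidCluster createCluster()
  {
    final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
    cluster.add(new EmbeddedOverlord());
    cluster.add(new EmbeddedIndexer());
    return cluster;
  }

  @Test
  public void test_cluster()
  {
    // Interact with the cluster here, e.g. submit a task and wait for it to succeed.
  }
}
```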

Define a cluster

EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
cluster.add(new EmbeddedOverlord());
cluster.add(new EmbeddedIndexer());

Add an extension module

cluster.addExtension(KafkaIndexTaskModule.class);

Requirements:

  • Module must extend DruidModule
  • Module (and corresponding META-INF/services file) must be on the classpath of the test
    • EITHER add a dependency on the maven module that contains it (e.g. the maven module for KafkaIndexTaskModule is druid-kafka-indexing-service)
    • OR add your test in the same maven module

Example test:

public EmbeddedDruidCluster createCluster()
{
  final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
  kafkaServer = new EmbeddedKafkaServer(cluster.getZookeeper(), cluster.getTestFolder(), Map.of());
  cluster.addExtension(KafkaIndexTaskModule.class)
         .addResource(kafkaServer)
         .useLatchableEmitter()
         .addServer(new EmbeddedCoordinator())
         .addServer(overlord)
         .addServer(indexer)
         .addServer(broker);
  return cluster;
}

Add common properties

cluster.addCommonProperty("druid.monitoring.emissionPeriod", "PT1S");

These properties correspond to the file common.runtime.properties and are applied to all the Druid services.

Add service-specific properties

EmbeddedOverlord overlord = new EmbeddedOverlord();
overlord.addProperty("druid.manager.segments.useIncrementalCache", "always");

These properties override the common runtime properties and correspond to the service-specific runtime.properties files.
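
The override semantics described above can be sketched with plain java.util.Properties, where a Properties object constructed with defaults falls back to the common values. This is only an illustration of the layering behaviour, not the PR's actual implementation:

```java
import java.util.Properties;

public class PropertyLayering
{
  // Resolve effective service properties: service-specific values override
  // the common runtime properties, which act as defaults.
  public static Properties layered(Properties common, Properties serviceSpecific)
  {
    final Properties effective = new Properties(common); // common acts as fallback
    serviceSpecific.forEach((k, v) -> effective.setProperty((String) k, (String) v));
    return effective;
  }

  public static void main(String[] args)
  {
    final Properties common = new Properties();
    common.setProperty("druid.monitoring.emissionPeriod", "PT1S");
    common.setProperty("druid.manager.segments.useIncrementalCache", "never");

    final Properties overlord = new Properties();
    overlord.setProperty("druid.manager.segments.useIncrementalCache", "always");

    final Properties effective = layered(common, overlord);
    // The service-specific value wins; untouched common values remain visible.
    System.out.println(effective.getProperty("druid.manager.segments.useIncrementalCache")); // always
    System.out.println(effective.getProperty("druid.monitoring.emissionPeriod")); // PT1S
  }
}
```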

Add a cluster-managed resource

cluster.addResource(new EmbeddedZookeeper());

Resources are

  • anything outside Druid services used by the cluster during its lifecycle.
  • started and stopped with the cluster.
  • implementations of the interface EmbeddedResource.

e.g. EmbeddedKafkaServer, EmbeddedZookeeper, InMemoryDerbyResource
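
The resource contract above can be illustrated with a minimal pure-Java sketch. The Resource interface and MiniCluster class below are simplified stand-ins, not the PR's actual EmbeddedResource or EmbeddedDruidCluster signatures, and stopping in reverse registration order is an assumed convention here:

```java
import java.util.ArrayList;
import java.util.List;

public class ResourceLifecycleSketch
{
  // Simplified stand-in for EmbeddedResource: anything with a start/stop lifecycle.
  interface Resource
  {
    void start();
    void stop();
  }

  // Minimal cluster that starts resources in registration order
  // and stops them in reverse order.
  static class MiniCluster
  {
    private final List<Resource> resources = new ArrayList<>();

    MiniCluster addResource(Resource resource)
    {
      resources.add(resource);
      return this;
    }

    void start()
    {
      for (Resource r : resources) {
        r.start();
      }
    }

    void stop()
    {
      for (int i = resources.size() - 1; i >= 0; i--) {
        resources.get(i).stop();
      }
    }
  }

  public static void main(String[] args)
  {
    final List<String> events = new ArrayList<>();
    MiniCluster cluster = new MiniCluster()
        .addResource(new Resource()
        {
          @Override public void start() { events.add("zk-start"); }
          @Override public void stop() { events.add("zk-stop"); }
        })
        .addResource(new Resource()
        {
          @Override public void start() { events.add("kafka-start"); }
          @Override public void stop() { events.add("kafka-stop"); }
        });
    cluster.start();
    cluster.stop();
    System.out.println(events); // [zk-start, kafka-start, kafka-stop, zk-stop]
  }
}
```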

Motivation

Druid ITs can be cumbersome to write, run and debug. They are very resource-intensive, require multiple steps to verify even a simple change, and are difficult to get right.

Owing to this barrier in writing new tests, Druid IT coverage of certain areas has been low.

This patch aims to address this issue by introducing a new framework to run lightweight integration tests using a fully-functioning Druid cluster running in a single process.

Goals

Simulation tests aim to:

  • 🦸🏻 Increase developer productivity
  • πŸ—οΈ Fill the test coverage gap between class-level unit tests and docker-based integration tests.
  • βœ… Simplify testing out new features on a full Druid cluster.
  • 🍰 Easily define different cluster setups in the test itself without needing to rely on a bunch of config files.
  • πŸ”§ Provide complete control over all aspects of the service(s) being tested.
  • 🧡 Test scalability and concurrency handling in critical pieces like TaskQueue, TaskLockbox, TaskRunner, etc.
  • 🐞 Debug failures quickly (a typical cluster takes < 1s to start and can then be used for any number of tests).
  • 🐎 Discover race conditions due to the fast nature of the test (already discovered some while putting this up πŸ™‚ ).
  • 😈 Use real Druid services and no mocked dependencies.

Non-goals

Embedded cluster tests are not meant to:

  • Replace docker-based ITs (standard or revised)
  • Test running a multi-process Druid cluster
  • Test Druid docker image

Design

  • Run required Druid services in an embedded mode, that internally launch the corresponding Cli* runnable
  • Each Druid service is sandboxed and has its own separate Guice injector
  • No override of default Guice dependencies provided by the Cli* runnables
  • No mocks
  • Write out task logs and segments to temp directory
  • Choose extensions to load via config druid.extensions.modulesForEmbeddedTest
  • Add custom resources to the Druid cluster (e.g. EmbeddedKafkaServer, EmbeddedZookeeper)

Changes to review in core Druid

  • Wire up JettyServerInitializer properly so that multiple Jetty servers may be bound correctly within the same JVM
  • Add new test-only config druid.extensions.modulesForEmbeddedTest which takes in a list of module class names that
    should be loaded in a simulation test
  • Minor fix in SeekableStreamTaskRunner to allow taskDuration to be less than 1 minute
  • Add method OverlordClient.postSupervisor to hit API POST /druid/indexer/v1/supervisor
  • Update methods in test classes TestDerbyConnector and CuratorTestBase to have public access

New classes to review

  • EmbeddedDruidCluster: defines a Druid cluster used in an embedded cluster test
  • EmbeddedResource: defines any resource with a lifecycle that can be used in an EmbeddedDruidCluster
  • EmbeddedDruidServer: abstract base class for embedded Druid services; provides a DruidServerResource with a lifecycle
  • EmbeddedZookeeper: uses the existing CuratorTestBase to run an embedded zk server
  • EmbeddedKafkaServer: an embedded Kafka server mostly copying code from TestBroker utility class. We can merge these two in a follow up PR.
  • JUnit 5 base test class EmbeddedClusterTestBase
  • LatchableEmitter to wait for metric events using countdown latches rather than repeatedly polling Druid APIs
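
The idea behind LatchableEmitter can be sketched with a plain CountDownLatch: the test registers a latch for a metric and blocks on it, and the emitter releases the latch when a matching event arrives, avoiding repeated polling. MiniEmitter below is a simplified stand-in, not the PR's actual class:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchEmitterSketch
{
  // Simplified stand-in for LatchableEmitter: tests register a latch for a
  // metric name and emit() releases any waiter for that metric.
  static class MiniEmitter
  {
    private final ConcurrentMap<String, CountDownLatch> latches = new ConcurrentHashMap<>();

    CountDownLatch latchFor(String metric)
    {
      return latches.computeIfAbsent(metric, m -> new CountDownLatch(1));
    }

    void emit(String metric)
    {
      final CountDownLatch latch = latches.get(metric);
      if (latch != null) {
        latch.countDown();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final MiniEmitter emitter = new MiniEmitter();
    final CountDownLatch latch = emitter.latchFor("segment/poll/time");

    // Simulate a Druid service emitting the metric from another thread.
    new Thread(() -> emitter.emit("segment/poll/time")).start();

    // The test blocks on the latch instead of polling an API in a loop.
    boolean seen = latch.await(10, TimeUnit.SECONDS);
    System.out.println(seen ? "metric observed" : "timed out");
  }
}
```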

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz changed the title [WIP] Add Druid cluster simulation tests [WIP] Add Druid cluster simulations (docker-less mini integration tests) Jun 17, 2025
@kfaraz kfaraz requested a review from gianm June 17, 2025 05:24
@abhishekagarwal87 (Contributor) commented:

Great work @kfaraz. How do extensions get enabled/disabled in these embedded tests? Are those supported yet?

@kfaraz (Author) commented Jun 17, 2025

Thanks, @abhishekagarwal87 !

Edit:
Using extensions is already supported.

  • We can load any combination of extension modules as long as they are on the class path. Since these are plain UTs, that simply means adding a maven dependency to the respective extension module.
  • Required modules should be specified in the cluster definition. These are then populated in druid.extensions.modulesForSimulation (as opposed to production which uses druid.extensions.loadList).
  • We can also add custom resources to an embedded cluster such as an EmbeddedKafkaServer

A test in this PR is already using the Kafka extension as a proof-of-concept.

The syntax would be something like:

EmbeddedKafkaServer kafkaServer = new EmbeddedKafkaServer();
EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withExtensions(List.of(KafkaIndexTaskModule.class))
                                                   .addServer(new EmbeddedCoordinator())
                                                   .addServer(new EmbeddedIndexer())
                                                   .addServer(new EmbeddedOverlord())
                                                   .addResource(kafkaServer);

To nicely wire up the dependencies and the SDK, I am currently evaluating two alternatives described below.

I am leaning towards B. What are your thoughts?

Option A: Keep the test framework in a separate module simulation-tests

  • Currently implemented in this PR
  • All simulation tests would live in this module itself
  • When we want to write a simulation test for an extension, say Kinesis indexing service,
    simulation-tests module would need to depend on that extension.

Pros:

  • The framework, embedded resources for extensions and tests all live in one module.
  • Embedded clusters can use any mix of extensions (e.g. say MySQL metadata store + Kafka ingestion)

Option B: Keep the test framework in services/test package

  • This is the highest-level Druid core module, containing the Cli* runnables
  • Write simulations for core Druid in services/test itself
  • Extensions can then depend on services/test and use the SDK to define their own cluster resources
    e.g. Kafka server (embedded or otherwise) and run tests using these resources

Pros:

  • More maintainable in the long run
  • Keep the simulation tests in the same package as the code being tested
  • Simulations will be added organically as a part of feature development
  • Simulations count towards Jacoco coverage for extensions
  • Clean maven dependencies

Cons:

  • Embedded cluster would be able to use only one extension at a time since extensions don't seem
    to be able to depend on each other
  • Possible remedy: Add a module that depends on multiple extensions. Simulation tests that use multiple extensions would have to live in this module. A module which may already serve this purpose is integration-tests.

Comment on lines 256 to 257
@Inject
CoordinatorClient coordinator;
Member:

I don't really understand why this injection will happen - an instance of it is only present as a private field of this class - could you give a hint? :)

@kfaraz (Author) Jun 17, 2025:

Guice takes care of it since we have bound it using .toInstance() in the method getInitModules().

binder.bind(ServiceClientHolder.class).toInstance(clientHolder)

Member:

Interesting; I wasn't expecting that a toInstance-bound instance would get injected; somehow I was thinking Guice has to be the one creating the original instance... makes sense, but I still find it a little bit odd :)
I won't forget this trick :D

Author:

πŸ˜ƒ

@kfaraz (Author) commented Jun 17, 2025

@kgyrtkirk , thanks for the suggestions! Will try to address them.
Also, please let me know if you have any thoughts on #18147 (comment).

@kgyrtkirk (Member) commented Jun 18, 2025

regarding comment

I would really like the separate module - but placing these tests where they belong will put them more in front of someone working on that part... so I think it's better to put them inside an existing module. It might be possible to create a separate source set for it, but that would only be a logical isolation (probably with no real benefits).

we could have a module which loads multiple (or all) extensions so that more complicated scenarios could be tested....actually that was the reason I created quidem-ut - maybe we could rename that module so that the multi-extension simulation tests could live there along with the qtest stuff?

@kfaraz (Author) commented Jun 18, 2025

I would really like the separate module - but placing these tests where they belong will put them more in front of someone working on that part... so I think it's better to put them inside an existing module. It might be possible to create a separate source set for it, but that would only be a logical isolation (probably with no real benefits).

Thanks for the feedback, @kgyrtkirk ! I feel the same.

we could have a module which loads multiple (or all) extensions so that more complicated scenarios could be tested....actually that was the reason I created quidem-ut - maybe we could rename that module so that the multi-extension simulation tests could live there along with the qtest stuff?

Yeah, that would be nice! I originally thought we could put it into integration-tests module but the quidem UT module is more closely related. We could just call that module simulation-tests since quidem UTs are also essentially a type of simulation.

@github-actions github-actions bot added Area - Batch Ingestion Area - Querying Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Jun 19, 2025

// Get the task ID
List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf(
getResult(cluster.leaderOverlord().taskStatuses(null, dataSource, 1))
Contributor:

Question - for this test case, there are 2 partitions, so the number of tasks should be 2 as well, right?

Or we are using a single task for demonstration purpose?

Author:

The requirement is Number of tasks <= Number of partitions.
A single task can read from any number of partitions.
We launch more tasks to increase ingestion throughput.
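
The constraint above (number of tasks <= number of partitions, with a single task able to read several partitions) can be illustrated with a simple round-robin assignment. This is a sketch of the general idea, not the actual Kafka supervisor assignment logic:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch
{
  // Round-robin partition-to-task assignment: valid for any taskCount <= partitionCount.
  // Each task may read multiple partitions, but no partition is split across tasks.
  public static List<List<Integer>> assign(int partitionCount, int taskCount)
  {
    final List<List<Integer>> tasks = new ArrayList<>();
    for (int i = 0; i < taskCount; i++) {
      tasks.add(new ArrayList<>());
    }
    for (int p = 0; p < partitionCount; p++) {
      tasks.get(p % taskCount).add(p);
    }
    return tasks;
  }

  public static void main(String[] args)
  {
    // 2 partitions, 1 task: the single task reads both partitions.
    System.out.println(assign(2, 1)); // [[0, 1]]
    // 4 partitions, 2 tasks: higher throughput, partitions still disjoint.
    System.out.println(assign(4, 2)); // [[0, 2], [1, 3]]
  }
}
```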

);
}

private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic(
Contributor:

Suggestion: There should be a provision to produce records for each partition

* This class and most of its methods are kept package protected as they are used
* only by the specific server implementations in the same package.
*/
public abstract class EmbeddedDruidServer implements ServerReferencesProvider
Member:

since almost every class is called EmbeddedX ; I feel like it would probably be an option to call these "embedded tests" ?

@kfaraz (Author) Jun 25, 2025:

Yeah, that could work too πŸ‘πŸ»

I had originally set out to write a simulation framework (that only emulated the behaviour of a cluster and didn't need to be an actual Druid cluster) but it ended up being a single-process embedded cluster instead.

As for the name, the word "simulation" stuck and it also made for a small and easy catchy suffix for test names like XyzSimTest, so I kept it anyway. But I am not opposed to calling it something else either.

Contributor:

my 2Β’: to me the word "simulation" is misleading given the final design- I don't believe anything is really being simulated. IMO using "embedded" across the board is better. I suggest searching for "simulate" or "simulation" and adjusting them so we can keep the wording consistent. As for XyzSimTest, could be XyzEmbedTest.

Author:

Sounds good, will make the change.

@gianm (Contributor) left a comment:

Great work. I've always wished for a testing system like this.

/**
* Simulation tests for batch {@link IndexTask} using inline datasources.
*/
public class IndexTaskSimTest extends IndexingSimulationTestBase
@gianm Jun 26, 2025:

quidem-ut is now not really an appropriate name for this module, as these tests aren't using quidem. I don't think you need to change it in this PR necessarily, as there's already a lot of other changes and I don't want to get into a debate over the name in this PR πŸ˜„. A follow up to change the name would be useful.

Author:

Sounds good. πŸ‘πŸ»

I could just put these tests in a separate module, if that seems better?

Contributor:

Might as well be a separate module named embedded-tests. The main reason I say that is I am struggling to think of a good name for the combined quidem + embedded test module.

Author:

That works!

I guess quidem tests are also essentially embedded tests. It's just that they don't use a full cluster but only a Broker-like setup.

I am not sure how much value it would add, but we could try piping quidem queries through an embedded cluster itself in the future. If we ever do that, we could have just the one module, embedded-tests.

import java.util.Map;

/**
* Base class for simulation tests related to ingestion and indexing.
Contributor:

IMO would be better to have this be a collection of static utility methods. The reason is that if we ever add another kind of "test base" with different kinds of convenience functions, a single test class won't be able to use both sets of convenience functions, due to the impossibility of multiple inheritance. The presence of runSql in this class is a hint that something is off (that isn't an indexing API).

Author:

I was wondering if these methods shouldn't just go into the EmbeddedDruidCluster class itself, given that they are always used to interact with a specific cluster.

Contributor:

That would also work. It would make the calling code look nicer.

The main downside I can think of is that at some point, there may be a lot of functions and the EmbeddedDruidCluster class could get pretty long. But if that happens, there are ways to address it. For example we could split the functions out into default methods in interfaces that extend DruidClusterApi and have EmbeddedDruidCluster implement DruidClusterApi and multiple of those other interfaces. Please don't do that now though. I'm just thinking through how it might evolve in the future. For now I think adding the methods directly to EmbeddedDruidCluster is good.

@kfaraz (Author) Jun 27, 2025:

Thanks for the suggestion! Ended up doing something similar.

  • Renamed the class IndexingSimulationTestBase to EmbeddedClusterApis
  • Kept some methods like createTaskFromPayload() and createTestDatasourceName() as static
  • Created an instance of EmbeddedClusterApis for each EmbeddedDruidCluster
  • Tests can now invoke methods as follows:
// general usage pattern
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));

cluster.callApi().waitForTaskToSucceed(taskId);

// shorthand for SQL
cluster.runSql(...);

);
return new DutySchedule(
killConfig.getDutyPeriod().toStandardDuration().getMillis(),
Duration.standardSeconds(10).getMillis()
Contributor:

Is it intentional that this was changed from 1 minute to 10 seconds?

Author:

Yes, it helps with the tests finishing faster and doesn't really affect a prod setup.
I could also just pass this value as a config too for consistency and let the default be 1 minute.

Contributor:

If it doesn't really affect a prod setup then it's imo ok to leave it like this.

Author:

Ended up changing this slightly since I realized that it is possible that an Overlord has just come up and has been elected leader but the segment metadata cache has still not warmed up (if operating in mode ifSynced).

The unused segment killer performs better with the cache fully synced.

So using the following values which feel good enough for all cases. We can revisit them later if needed.

duty period = 1 hour,
initial delay = duty period / 4 = 15 mins

@@ -679,10 +679,10 @@ private void pollSegments()

stopwatch.stop();
emitMetric("segment/poll/time", stopwatch.millisElapsed());
log.info(
/*log.info(
Contributor:

This seems unintentional; please restore the original code if so.

@@ -30,7 +31,12 @@ public class SupervisorReport<T>
private final DateTime generationTime;
private final T payload;

public SupervisorReport(String id, DateTime generationTime, T payload)
@JsonCreator
Contributor:

I haven't yet found the place where this deserializer is used or tested- where is that? I wanted to check something about its usage: when there's a totally-generic thing like T as a property, Jackson doesn't know what T to use unless a TypeReference is provided. So it's important that the deserialization be done in that specific way.

Author:

Ah, thanks for catching this. I had intended to use this but it is not being used anymore. Will revert this.

public EmbeddedMiddleManager()
{
// Don't sync lookups as cluster might not have a Coordinator
// addProperty("druid.lookup.enableLookupSyncOnStartup", "false");
Contributor:

Please don't include commented-out code.



/**
* {@link EmbeddedResource} for an embedded zookeeper cluster that can be used
* as a JUnit Rule in simulation tests.
Contributor:

Is this comment about "JUnit Rule" correct?

Author:

No, will clean it up.

@kfaraz (Author) commented Jun 26, 2025

Thanks for the feedback, @gianm and for catching the leftover superfluous changes!
I will address your comments and take another pass to ensure that no extra changes have made their way into this PR.

@kfaraz kfaraz changed the title Add Druid cluster simulations (docker-less mini integration tests) Add embedded Druid cluster tests Jun 26, 2025
Comment on lines +168 to +170
final int numSegments = Integer.parseInt(
cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource = '%s'", dataSource)
);

Check notice (Code scanning / CodeQL): Missing catch of NumberFormatException. Potential uncaught 'java.lang.NumberFormatException'.
Comment on lines +173 to +175
final int numRows = Integer.parseInt(
cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)
);

Check notice (Code scanning / CodeQL): Missing catch of NumberFormatException. Potential uncaught 'java.lang.NumberFormatException'.
Comment on lines +231 to +235
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withMaxRowsPerSegment(compactedMaxRowsPerSegment)

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation. Invoking Builder.withMaxRowsPerSegment should be avoided because it has been deprecated.
Comment on lines +315 to +320
final int expectedValueForSegmentsAssigned = (int) Double.parseDouble(
cluster.runSql(
"SELECT COUNT(*) FROM %s WHERE metric = '%s' AND host = '%s' AND service = '%s'",
dataSource, metricName, server.selfNode().getHostAndPort(), server.selfNode().getServiceName()
)
);

Check notice (Code scanning / CodeQL): Missing catch of NumberFormatException. Potential uncaught 'java.lang.NumberFormatException'.
@gianm (Contributor) left a comment:

LGTM

@kfaraz kfaraz merged commit b3cdf6b into apache:master Jun 28, 2025
206 of 209 checks passed
@kfaraz (Author) commented Jun 28, 2025

Thanks a lot for the review, @gianm !

@capistrant capistrant added this to the 34.0.0 milestone Jul 22, 2025
6 participants