-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Add embedded Druid cluster tests #18147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking βSign up for GitHubβ, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Great work @kfaraz. How do extensions get enabled/disabled in these embedded tests? Are those supported yet? |
Thanks, @abhishekagarwal87 ! Edit:
A test in this PR is already using the Kafka extension as a proof-of-concept. The syntax would be something like: EmbeddedKafkaServer kafkaServer = new EmbeddedKafkaServer();
EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withExtensions(List.of(KafkaIndexTaskModule.class))
.addServer(new EmbeddedCoordinator())
.addServer(new EmbeddedIndexer())
.addServer(new EmbeddedOverlord())
.addResource(kafkaServer); To nicely wire up the dependencies and the SDK, I am currently evaluating two alternatives described below. I am leaning towards B. What are your thoughts? Option A: Keep the test framework in a separate module
|
...tests/src/test/java/org/apache/druid/testing/simulate/embedded/DruidServerJunitResource.java
Outdated
Show resolved
Hide resolved
simulation-tests/src/test/java/org/apache/druid/testing/simulate/embedded/EmbeddedOverlord.java
Outdated
Show resolved
Hide resolved
@Inject | ||
CoordinatorClient coordinator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand why this injection will happen - an instance of it is only present as a private field of this class - could you give a hint? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guice takes care of since we have bound it using .toInstance()
in the method getInitModules()
.
binder.bind(ServiceClientHolder.class).toInstance(clientHolder)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting; I wasn't expecting that during a toInstance
call that will be interpreted; somehow I was thinking guice
have to be the one creating the original instance...makes sense ; but I still find it a little bit odd :)
I won't forget this trick :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
π
simulation-tests/src/test/java/org/apache/druid/testing/simulate/embedded/EmbeddedBroker.java
Outdated
Show resolved
Hide resolved
@kgyrtkirk , thanks for the suggestions! Will try to address them. |
regarding comment I would really like the separate module - but placing these tests where they should belong will be more in front of someone working on that part...so I think its better to put it inside an existing module - it might be possible to create a separate source set for it; but that would only be a logical isolation (probably with no real benefits) we could have a module which loads multiple (or all) extensions so that more complicated scenarios could be tested....actually that was the reason I created |
Thanks for the feedback, @kgyrtkirk ! I feel the same.
Yeah, that would be nice! I originally thought we could put it into |
...n-tests/src/test/java/org/apache/druid/testing/simulate/indexing/batch/IndexTaskSimTest.java
Fixed
Show fixed
Hide fixed
...s/src/test/java/org/apache/druid/testing/simulate/indexing/kafka/KafkaSupervisorSimTest.java
Fixed
Show fixed
Hide fixed
...s/src/test/java/org/apache/druid/testing/simulate/indexing/kafka/KafkaSupervisorSimTest.java
Fixed
Show fixed
Hide fixed
...s/src/test/java/org/apache/druid/testing/simulate/indexing/kafka/KafkaSupervisorSimTest.java
Fixed
Show fixed
Hide fixed
...g-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaSupervisorSimTest.java
Fixed
Show fixed
Hide fixed
...g-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaSupervisorSimTest.java
Fixed
Show fixed
Hide fixed
...g-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaSupervisorSimTest.java
Fixed
Show fixed
Hide fixed
|
||
// Get the task ID | ||
List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf( | ||
getResult(cluster.leaderOverlord().taskStatuses(null, dataSource, 1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question - For this TC, their are 2 partitions, so number of tasks should be 2 as well right?
Or we are using a single task for demonstration purpose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The requirement is Number of tasks <= Number of partitions
.
A single task can read from any number of partitions.
We launch more tasks to increase ingestion throughput.
); | ||
} | ||
|
||
private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: There should be a provision to produce records for each partition
* This class and most of its methods are kept package protected as they are used | ||
* only by the specific server implementations in the same package. | ||
*/ | ||
public abstract class EmbeddedDruidServer implements ServerReferencesProvider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since almost every class is called EmbeddedX
; I feel like it would probably be an option to call these "embedded tests" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that could work too ππ»
I had originally set out to write a simulation framework (that only emulated the behaviour of a cluster and didn't need to be an actual Druid cluster) but it ended up being a single-process embedded cluster instead.
As for the name, the word "simulation" stuck and it also made for a small and easy catchy suffix for test names like XyzSimTest
, so I kept it anyway. But I am not opposed to calling it something else either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my 2Β’: to me the word "simulation" is misleading given the final design- I don't believe anything is really being simulated. IMO using "embedded" across the board is better. I suggest searching for "simulate" or "simulation" and adjusting them so we can keep the wording consistent. As for XyzSimTest
, could be XyzEmbedTest
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, will make the change.
quidem-ut/src/test/java/org/apache/druid/simulate/indexing/KafkaIngestSelfMetricsSimTest.java
Fixed
Show fixed
Hide fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work. I've always wished for a testing system like this.
/** | ||
* Simulation tests for batch {@link IndexTask} using inline datasources. | ||
*/ | ||
public class IndexTaskSimTest extends IndexingSimulationTestBase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
quidem-ut
is now not really an appropriate name for this module, as these tests aren't using quidem. I don't think you need to change it in this PR necessarily, as there's already a lot of other changes and I don't want to get into a debate over the name in this PR π. A follow up to change the name would be useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. ππ»
I could just put these tests in a separate module, if that seems better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might as well be a separate module named embedded-tests
. The main reason I say that is I am struggling to think of a good name for the combined quidem + embedded test module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That works!
I guess quidem tests are also essentially embedded tests. It's just that they don't use a full cluster but only a Broker-like setup.
I am not sure how much value it would add, but we could try piping quidem queries through an embedded cluster itself in the future. If we ever do that, we could have just the one module, embedded-tests
.
import java.util.Map; | ||
|
||
/** | ||
* Base class for simulation tests related to ingestion and indexing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO would be better to have this be a collection of static
utility methods. The reason is that if we ever add another kind of "test base" with different kinds of convenience functions, a single test class won't be able to use both sets of convenience functions, due to the impossibility of multiple inheritance. The presence of runSql
in this class is a hint that something is off (that isn't an indexing API).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering if these methods shouldn't just go into the EmbeddedDruidCluster
class itself, given that they are always used to interact with a specific cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would also work. It would make the calling code look nicer.
The main downside I can think of is that at some point, there may be a lot of functions and the EmbeddedDruidCluster
class could get pretty long. But if that happens, there are ways to address it. For example we could split the functions out into default methods in interfaces that extend DruidClusterApi
and have EmbeddedDruidCluster
implement DruidClusterApi
and multiple of those other interfaces. Please don't do that now though. I'm just thinking through how it might evolve in the future. For now I think adding the methods directly to EmbeddedDruidCluster
is good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion! Ended up doing something similar.
- Renamed the class
IndexingSimulationTestBase
toEmbeddedClusterApis
- Kept some methods like
createTaskFromPayload()
andcreateTestDatasourceName()
as static - Created an instance of
EmbeddedClusterApis
for eachEmbeddedDruidCluster
- Tests can now invoke methods as follows:
// general usage pattern
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
cluster.callApi().waitForTaskToSucceed(taskId);
// shorthand for SQL
cluster.runSql(...);
); | ||
return new DutySchedule( | ||
killConfig.getDutyPeriod().toStandardDuration().getMillis(), | ||
Duration.standardSeconds(10).getMillis() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intentional that this was changed this from 1 minute to 10 seconds?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it helps with the tests finishing faster and doesn't really affect a prod setup.
I could also just pass this value as a config too for consistency and let the default be 1 minute.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it doesn't really affect a prod setup then it's imo ok to leave it like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ended up changing this slightly since I realized that it is possible that an Overlord has just come up and has been elected leader but the segment metadata cache has still not warmed up (if operating in mode ifSynced
).
The unused segment killer performs better with the cache fully synced.
So using the following values which feel good enough for all cases. We can revisit them later if needed.
duty period = 1 hour,
initial delay = duty period / 4 = 15 mins
@@ -679,10 +679,10 @@ private void pollSegments() | |||
|
|||
stopwatch.stop(); | |||
emitMetric("segment/poll/time", stopwatch.millisElapsed()); | |||
log.info( | |||
/*log.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems unintentional; please restore the original code if so.
@@ -30,7 +31,12 @@ public class SupervisorReport<T> | |||
private final DateTime generationTime; | |||
private final T payload; | |||
|
|||
public SupervisorReport(String id, DateTime generationTime, T payload) | |||
@JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't yet found the place where this deserializer is used or tested- where is that? I wanted to check something about its usage: when there's a totally-generic thing like T
as a property, Jackson doesn't know what T
to use unless a TypeReference
is provided. So it's important that the deserialization be done in that specific way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, thanks for catching this. I had intended to use this but it is not being used anymore. Will revert this.
public EmbeddedMiddleManager() | ||
{ | ||
// Don't sync lookups as cluster might not have a Coordinator | ||
// addProperty("druid.lookup.enableLookupSyncOnStartup", "false"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't include commented-out code.
* This class and most of its methods are kept package protected as they are used | ||
* only by the specific server implementations in the same package. | ||
*/ | ||
public abstract class EmbeddedDruidServer implements ServerReferencesProvider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my 2Β’: to me the word "simulation" is misleading given the final design- I don't believe anything is really being simulated. IMO using "embedded" across the board is better. I suggest searching for "simulate" or "simulation" and adjusting them so we can keep the wording consistent. As for XyzSimTest
, could be XyzEmbedTest
.
|
||
/** | ||
* {@link EmbeddedResource} for an embedded zookeeper cluster that can be used | ||
* as a JUnit Rule in simulation tests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this comment about "JUnit Rule" correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, will clean it up.
Thanks for the feedback, @gianm and for catching the leftover superfluous changes! |
...test/java/org/apache/druid/testing/embedded/indexing/KafkaIngestSelfMetricsEmbeddedTest.java
Fixed
Show fixed
Hide fixed
...test/java/org/apache/druid/testing/embedded/indexing/KafkaIngestSelfMetricsEmbeddedTest.java
Fixed
Show fixed
Hide fixed
...test/java/org/apache/druid/testing/embedded/indexing/KafkaIngestSelfMetricsEmbeddedTest.java
Fixed
Show fixed
Hide fixed
...test/java/org/apache/druid/testing/embedded/indexing/KafkaIngestSelfMetricsEmbeddedTest.java
Fixed
Show fixed
Hide fixed
final int numSegments = Integer.parseInt( | ||
cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource = '%s'", dataSource) | ||
); |
Check notice
Code scanning / CodeQL
Missing catch of NumberFormatException Note test
final int numRows = Integer.parseInt( | ||
cluster.runSql("SELECT COUNT(*) FROM %s", dataSource) | ||
); |
Check notice
Code scanning / CodeQL
Missing catch of NumberFormatException Note test
InlineSchemaDataSourceCompactionConfig | ||
.builder() | ||
.forDataSource(dataSource) | ||
.withSkipOffsetFromLatest(Period.seconds(0)) | ||
.withMaxRowsPerSegment(compactedMaxRowsPerSegment) |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation Note test
Builder.withMaxRowsPerSegment
final int expectedValueForSegmentsAssigned = (int) Double.parseDouble( | ||
cluster.runSql( | ||
"SELECT COUNT(*) FROM %s WHERE metric = '%s' AND host = '%s' AND service = '%s'", | ||
dataSource, metricName, server.selfNode().getHostAndPort(), server.selfNode().getServiceName() | ||
) | ||
); |
Check notice
Code scanning / CodeQL
Missing catch of NumberFormatException Note test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks a lot for the review, @gianm ! |
Summary
mvn build
, create a Docker image or even run startup scripts@Test
and run it from your IDE! π πExplore
π₯ Run the
EmbeddedKafkaClusterMetricsTest
to see a simulation in action:KafkaEmitter
to publish cluster metrics to a topicKafkaSupervisor
to ingest the metrics back into the same Druid cluster! πΆ πUsage
EmbeddedClusterTestBase
createCluster()
@Test
methodDefine a cluster
Add an extension module
Requirements:
DruidModule
META-INF/services
file) must be on the classpath of the testKafkaIndexTaskModule
isdruid-kafka-indexing-service
)Example test:
druid/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
Lines 63 to 78 in 5a8dcd5
Add common properties
These properties correspond to the file
common.runtime.properties
that are applied on all the Druid services.Add service-specific properties
These properties override the common runtime properties and correspond to the service-specific
runtime.properties
files.Add a cluster-managed resource
Resources are
EmbeddedResource
.e.g.
EmbeddedKafkaServer
,EmbeddedZookeeper
,InMemoryDerbyResource
Motivation
Druid ITs can be cumbersome to write, run and debug. They are very resource intensive , require multiple steps even to verify a simple change and are difficult to get right.
Owing to this barrier in writing new tests, Druid IT coverage of certain areas has been low.
This patch aims to address this issue by introducing a new framework to run lightweight integration tests using a fully-functioning Druid cluster running in a single process.
Goals
Simulation tests aim to:
TaskQueue
,TaskLockbox
,TaskRunner
, etc.Non-goals
Embedded cluster tests are not meant to:
Design
Cli*
runnableCli*
runnablesdruid.extensions.modulesForEmbeddedTest
EmbeddedKafkaServer
,EmbeddedZookeeper
)Changes to review in core Druid
JettyServerInitializer
properly so that multiple Jetty servers may be bound correctly within the same JVMJettyServerInitUtils.getGuiceFilterHolder
druid.extensions.modulesForEmbeddedTest
which takes in a list of module class names thatshould be loaded in a simulation test
SeekableStreamTaskRunner
to allowtaskDuration
to be less than 1 minuteOverlordClient.postSupervisor
to hit APIPOST /druid/indexer/v1/supervisor
TestDerbyConnector
andCuratorTestBase
to have public accessNew classes to review
EmbeddedDruidCluster
: defines a Druid cluster used in an embedded cluster testEmbeddedResource
: defines any resource with a lifecycle that can be used in anEmbeddedDruidCluster
EmbeddedDruidServer
that can provide aDruidServerResource
with a lifecycleEmbeddedZookeeper
: uses the existingCuratorTestBase
to run an embedded zk serverEmbeddedKafkaServer
: an embedded Kafka server mostly copying code fromTestBroker
utility class. We can merge these two in a follow up PR.EmbeddedClusterTestBase
LatchableEmitter
to wait for metric events using countdown latches rather than repeatedly polling Druid APIsThis PR has: