Skip to content

Commit

Permalink
Refactor User Operator to follow the controller model - Closes strimz…
Browse files Browse the repository at this point in the history
…i#5691

Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj committed Nov 15, 2022
1 parent 705f75c commit ce5e3df
Show file tree
Hide file tree
Showing 65 changed files with 7,204 additions and 2,505 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 0.33.0

* Support for automatically restarting failed Connect or Mirror Maker 2 connectors
* Redesign of Strimzi User Operator to improve its scalability

## 0.32.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ rootLogger.level = INFO
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.console.ref = STDOUT
rootLogger.additivity = false

# Keeps separate level for Jetty
logger.jetty.name = org.eclipse.jetty.server
logger.jetty.level = INFO
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,29 @@ spec:
value: "strimzi.io/cluster=my-cluster"
- name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS <6>
value: "120000"
- name: STRIMZI_LOG_LEVEL <7>
- name: STRIMZI_WORK_QUEUE_SIZE <7>
value: "10000"
- name: STRIMZI_CONTROLLER_THREAD_POOL_SIZE <8>
value: "10"
- name: STRIMZI_LOG_LEVEL <9>
value: INFO
- name: STRIMZI_GC_LOG_ENABLED <8>
- name: STRIMZI_GC_LOG_ENABLED <10>
value: "true"
- name: STRIMZI_CA_VALIDITY <9>
- name: STRIMZI_CA_VALIDITY <11>
value: "365"
- name: STRIMZI_CA_RENEWAL <10>
- name: STRIMZI_CA_RENEWAL <12>
value: "30"
- name: STRIMZI_JAVA_OPTS <11>
- name: STRIMZI_JAVA_OPTS <13>
value: "-Xmx512M -Xms256M"
- name: STRIMZI_JAVA_SYSTEM_PROPERTIES <12>
- name: STRIMZI_JAVA_SYSTEM_PROPERTIES <14>
value: "-Djavax.net.debug=verbose -DpropertyName=value"
- name: STRIMZI_SECRET_PREFIX <13>
- name: STRIMZI_SECRET_PREFIX <15>
value: "kafka-"
- name: STRIMZI_ACLS_ADMIN_API_SUPPORTED <14>
- name: STRIMZI_ACLS_ADMIN_API_SUPPORTED <16>
value: "true"
- name: STRIMZI_MAINTENANCE_TIME_WINDOWS <15>
- name: STRIMZI_MAINTENANCE_TIME_WINDOWS <17>
value: '* * 8-10 * * ?;* * 14-15 * * ?'
- name: STRIMZI_KAFKA_ADMIN_CLIENT_CONFIGURATION <16>
- name: STRIMZI_KAFKA_ADMIN_CLIENT_CONFIGURATION <18>
value: |
default.api.timeout.ms=120000
request.timeout.ms=60000
Expand All @@ -101,23 +105,29 @@ If you deploy more than one User Operator, the labels must be unique for each.
That is, the operators cannot manage the same resources.
<6> The interval between periodic reconciliations, in milliseconds.
The default is `120000` (2 minutes).
<7> The level for printing logging messages.
<7> The size of the controller event queue.
The size of the queue should be at least as large as the maximum number of users you expect the User Operator to manage.
The default is `1024`.
<8> The size of the worker pool for reconciling the users.
A bigger pool might require more resources, but it will also handle more `KafkaUser` resources.
The default is `50`.
<9> The level for printing logging messages.
You can set the level to `ERROR`, `WARNING`, `INFO`, `DEBUG`, or `TRACE`.
<8> Enables garbage collection (GC) logging.
<10> Enables garbage collection (GC) logging.
The default is `true`.
<9> The validity period for the Certificate Authority.
<11> The validity period for the Certificate Authority.
The default is `365` days.
<10> The renewal period for the Certificate Authority. The renewal period is measured backwards from the expiry date of the current certificate.
<12> The renewal period for the Certificate Authority. The renewal period is measured backwards from the expiry date of the current certificate.
The default is `30` days to initiate certificate renewal before the old certificates expire.
<11> (Optional) The Java options used by the JVM running the User Operator
<12> (Optional) The debugging (`-D`) options set for the User Operator
<13> (Optional) Prefix for the names of Kubernetes secrets created by the User Operator.
<14> (Optional) Indicates whether the Kafka cluster supports management of authorization ACL rules using the Kafka Admin API.
<13> (Optional) The Java options used by the JVM running the User Operator.
<14> (Optional) The Java system properties (`-D` options) set for the User Operator.
<15> (Optional) Prefix for the names of Kubernetes secrets created by the User Operator.
<16> (Optional) Indicates whether the Kafka cluster supports management of authorization ACL rules using the Kafka Admin API.
When set to `false`, the User Operator will reject all resources with `simple` authorization ACL rules.
This helps to avoid unnecessary exceptions in the Kafka cluster logs.
The default is `true`.
<15> (Optional) Semi-colon separated list of Cron Expressions defining the maintenance time windows during which the expiring user certificates will be renewed.
<16> (Optional) Configuration options for configuring the Kafka Admin client used by the User Operator in the properties format.
<17> (Optional) Semi-colon separated list of Cron Expressions defining the maintenance time windows during which the expiring user certificates will be renewed.
<18> (Optional) Configuration options for configuring the Kafka Admin client used by the User Operator in the properties format.

. If you are using mTLS to connect to the Kafka cluster, specify the secrets used to authenticate connection.
Otherwise, go to the next step.
Expand Down
11 changes: 10 additions & 1 deletion mockkube/src/main/java/io/strimzi/test/mockkube2/MockKube2.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ public MockKube2Builder withKafkaTopicCrd() {
return this;
}

/**
 * Registers the KafkaUser CRD
 *
 * @return MockKube builder instance
 */
public MockKube2Builder withKafkaUserCrd() {
    // The resource class passed here must correspond to the "KafkaUser" kind and the KafkaUser CRD file.
    // The fully-qualified name is used so that this method does not depend on an import being present.
    // NOTE(review): assumes KafkaUser lives in io.strimzi.api.kafka.model next to the other CR classes - confirm.
    mock.registerCrd("kafka.strimzi.io/v1beta2", "KafkaUser", io.strimzi.api.kafka.model.KafkaUser.class, TestUtils.CRD_KAFKA_USER);
    return this;
}

/**
* Registers the KafkaConnect CRD
*
Expand Down Expand Up @@ -255,7 +265,6 @@ public MockKube2Builder withInitialKafkaMirrorMaker2s(KafkaMirrorMaker2... resou
return this;
}


/**
* Set the mock web server's logging level as needed, defaults to INFO
* @param level logging level for mock web server
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.common.controller;

import io.micrometer.core.instrument.Timer;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.metrics.ControllerMetricsHolder;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Abstract controller loop provides the shared functionality for reconciling resources in Strimzi controllers. It
 * runs in its own thread, takes events one at a time from the work queue passed in by the controller and reconciles
 * them while holding the lock for the given resource.
 */
public abstract class AbstractControllerLoop {
    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(AbstractControllerLoop.class);

    // Initial delay and period (in milliseconds) of the "in progress" message logged for a running reconciliation
    private static final long PROGRESS_WARNING_MS = 60_000L;

    private final String name;
    private final Thread controllerThread;
    private final ControllerQueue workQueue;
    private final ReconciliationLockManager lockManager;
    private final ScheduledExecutorService scheduledExecutor;

    // Both flags are written by one thread and read by another, hence volatile
    private volatile boolean stop = false;
    private volatile boolean running = false;

    /**
     * Creates the controller loop. The thread which runs the loop is created here, but it is started only when the
     * start() method is called.
     *
     * @param name              The name of this controller loop. The name should help to identify what kind
     *                          of loop this is and what it controls / reconciles. It is also used as the name
     *                          of the loop thread.
     * @param workQueue         Queue from which events should be consumed
     * @param lockManager       Lock manager for making sure no parallel reconciliations for a given resource can happen
     * @param scheduledExecutor Scheduled executor service used to run the progress warnings
     */
    public AbstractControllerLoop(String name, ControllerQueue workQueue, ReconciliationLockManager lockManager, ScheduledExecutorService scheduledExecutor) {
        this.name = name;
        this.workQueue = workQueue;
        this.lockManager = lockManager;
        this.scheduledExecutor = scheduledExecutor;
        this.controllerThread = new Thread(new Runner(), name);
    }

    /**
     * The main reconciliation logic which handles the reconciliations.
     *
     * @param reconciliation    Reconciliation identifier used for logging
     */
    protected abstract void reconcile(Reconciliation reconciliation);

    /**
     * Returns the Controller Metrics Holder instance, which is used to hold the various controller metrics
     *
     * @return  Controller metrics holder instance
     */
    protected abstract ControllerMetricsHolder metrics();

    /**
     * Starts the controller loop: this method starts the thread in which the loop runs
     */
    public void start() {
        LOGGER.debugOp("{}: Starting the controller loop", name);
        controllerThread.start();
    }

    /**
     * Stops the controller loop: this method sets the stop flag, interrupts the run loop and waits for the loop
     * thread to finish.
     *
     * @throws InterruptedException InterruptedException is thrown when interrupted while joining the thread
     */
    public void stop() throws InterruptedException {
        LOGGER.infoOp("{}: Requesting the controller loop to stop", name);
        // The flag has to be set before the interrupt so that the interrupted loop sees it when it re-checks it
        this.stop = true;
        controllerThread.interrupt();
        controllerThread.join();
    }

    /**
     * Indicates whether the controller is inside the run loop.
     *
     * @return  True when the controller is in the run loop, false otherwise
     */
    public boolean isRunning() {
        return running;
    }

    /**
     * Indicates whether the controller loop thread is alive or not
     *
     * @return  True when the controller loop thread is alive, false otherwise
     */
    public boolean isAlive() {
        return controllerThread.isAlive();
    }

    /**
     * Wrapper method to handle obtaining the lock for the resource or re-queueing the reconciliation if the lock is in
     * use. When it gets the lock, it calls the reconcileWrapper method.
     *
     * @param reconciliation    Reconciliation marker
     */
    private void reconcileWithLock(SimplifiedReconciliation reconciliation) {
        String lockName = reconciliation.lockName();
        boolean requeue = false;

        try {
            // Wait up to one second for the lock before giving up and re-queueing
            boolean locked = lockManager.tryLock(lockName, 1_000, TimeUnit.MILLISECONDS);

            if (locked) {
                try {
                    reconcileWrapper(reconciliation.toReconciliation());
                } finally {
                    // We have to unlock the resource in any situation
                    lockManager.unlock(lockName);
                }
            } else {
                // Failed to get the lock => other reconciliation is in progress
                LOGGER.warnOp("{}: Failed to acquire lock {}. The resource will be re-queued for later.", name, lockName);
                metrics().lockedReconciliationsCounter(reconciliation.namespace).increment();
                requeue = true;
            }

        } catch (InterruptedException e) {
            // The interrupt is deliberately not re-asserted: the run loop decides whether to continue based on the
            // stop flag, which stop() sets before it interrupts this thread.
            LOGGER.warnOp("{}: Interrupted while trying to acquire lock {}. The resource will be re-queued for later.", name, lockName);
            metrics().lockedReconciliationsCounter(reconciliation.namespace).increment();
            requeue = true;
        }

        // Failed to get the lock. We will requeue the resource for next time
        if (requeue) {
            workQueue.enqueue(reconciliation);
        }
    }

    /**
     * Wrapper method to handle reconciliation. It is used to handle common tasks such as:
     *   - Progress warnings
     *   - Reconciliation metrics
     *
     * The periodic progress message is cancelled in every case, including when updating the metrics or the
     * reconciliation itself throws, so that no scheduled task is left behind for a finished reconciliation.
     *
     * @param reconciliation    Reconciliation marker
     */
    private void reconcileWrapper(Reconciliation reconciliation) {
        // Tasks before reconciliation
        ScheduledFuture<?> progressWarning = scheduledExecutor
                .scheduleAtFixedRate(() -> LOGGER.infoCr(reconciliation, "Reconciliation is in progress"), PROGRESS_WARNING_MS, PROGRESS_WARNING_MS, TimeUnit.MILLISECONDS);

        try {
            metrics().reconciliationsCounter(reconciliation.namespace()).increment(); // Increase the reconciliation counter
            Timer.Sample reconciliationTimerSample = Timer.start(metrics().metricsProvider().meterRegistry()); // Start the reconciliation timer

            // Reconciliation
            try {
                reconcile(reconciliation);
            } finally {
                // Tasks after reconciliation
                reconciliationTimerSample.stop(metrics().reconciliationsTimer(reconciliation.namespace())); // Stop the reconciliation timer
            }
        } finally {
            progressWarning.cancel(true); // Stop the progress warning
        }
    }

    /**
     * Runner class which is used to run the controller loop. This is implemented as a private inner class to not expose
     * it as a public method.
     */
    private class Runner implements Runnable {
        /**
         * The run loop of the controller loop thread. It picks reconciliations from the work queue and executes them
         * until the stop flag is set. Exceptions from a single reconciliation are logged and do not end the loop.
         */
        @Override
        public void run() {
            LOGGER.debugOp("{}: Starting", name);
            running = true; // We indicate that we are entering the controller loop

            try {
                while (!stop) {
                    try {
                        LOGGER.debugOp("{}: Waiting for next event from work queue", name);
                        SimplifiedReconciliation reconciliation = workQueue.take();
                        reconcileWithLock(reconciliation);
                    } catch (InterruptedException e) {
                        // Nothing else to do here: the stop flag is re-checked by the loop condition
                        LOGGER.debugOp("{}: was interrupted", name, e);
                    } catch (Exception e) {
                        LOGGER.warnOp("{}: reconciliation failed", name, e);
                    }
                }

                LOGGER.infoOp("{}: Stopping", name);
            } finally {
                // Reset the flag even when the loop is left through an unexpected Throwable, so that isRunning()
                // never reports a loop which is no longer running
                running = false; // We indicate that we are exiting the controller loop
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.common.controller;

import io.strimzi.operator.common.metrics.ControllerMetricsHolder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Bounded work queue used by the controllers. It wraps a blocking queue and exposes only the two operations the
 * controllers need: taking the next event and enqueueing a new event while skipping events which are already queued.
 */
public class ControllerQueue {
    private final static Logger LOGGER = LogManager.getLogger(ControllerQueue.class);

    /*test*/ final BlockingQueue<SimplifiedReconciliation> queue;
    private final ControllerMetricsHolder metrics;

    /**
     * Creates the controller queue backed by an array blocking queue with a fixed capacity.
     *
     * @param queueSize The capacity of the work queue
     * @param metrics   Holder for the controller metrics
     */
    public ControllerQueue(int queueSize, ControllerMetricsHolder metrics) {
        this.metrics = metrics;
        this.queue = new ArrayBlockingQueue<>(queueSize);
    }

    /**
     * Takes the next reconciliation from the queue, waiting until one becomes available if the queue is empty.
     *
     * @return  The next reconciliation from the head of the queue
     *
     * @throws InterruptedException InterruptedException is thrown when interrupted while waiting for the next event
     */
    public SimplifiedReconciliation take() throws InterruptedException {
        SimplifiedReconciliation next = queue.take();
        return next;
    }

    /**
     * Enqueues the next reconciliation unless an equal reconciliation is already waiting in the queue. Duplicates
     * are counted in the metrics and ignored. When the queue is full, the event is dropped and a warning is logged.
     *
     * The duplicate check and the insert are two separate queue operations.
     *
     * @param reconciliation    Reconciliation identifier
     */
    public void enqueue(SimplifiedReconciliation reconciliation) {
        boolean alreadyQueued = queue.contains(reconciliation);

        if (alreadyQueued) {
            metrics.alreadyEnqueuedReconciliationsCounter(reconciliation.namespace).increment(); // Increase the metrics counter
            LOGGER.debug("{} {} in namespace {} is already enqueued => ignoring", reconciliation.kind, reconciliation.name, reconciliation.namespace);
            return;
        }

        LOGGER.debug("Enqueueing {} {} in namespace {}", reconciliation.kind, reconciliation.name, reconciliation.namespace);

        // offer() does not block: it returns false when the queue has no free capacity
        boolean accepted = queue.offer(reconciliation);
        if (!accepted) {
            LOGGER.warn("Failed to enqueue an event because the controller queue is full");
        }
    }

}

0 comments on commit ce5e3df

Please sign in to comment.