SocketServer is a NIO socket server for a KafkaServer.
SocketServer is created and then started up exclusively when KafkaServer is requested to start up.
SocketServer uses queued.max.requests configuration property for…FIXME
Name | Description |
---|---|
SocketServer uses an empty metric prefix for…FIXME
SocketServer uses ControlPlane metric prefix for…FIXME
Control-Plane Request Handler for Controller-Brokers Communication — controlPlaneRequestChannelOpt
Internal Value
SocketServer creates a dedicated RequestChannel for communication between a controller and brokers when the control.plane.listener.name configuration property is defined.
The RequestChannel is created for KafkaServer (when requested to start up). That is when KafkaServer creates the dedicated control-plane request processor and control-plane request handler pool.
The RequestChannel is created with the queue size of 20 and the ControlPlane metric name prefix.
SocketServer manages the RequestChannel (when requested to stopProcessingRequests and shutdown).
The RequestChannel is assumed to be available when SocketServer is requested to createControlPlaneAcceptorAndProcessor (when requested to start up) based on the control.plane.listener.name configuration property.
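The optional nature of the control-plane RequestChannel can be illustrated with a minimal sketch. It assumes simplified stand-in types: RequestChannelSketch and ControlPlaneChannelSketch are hypothetical names used only for illustration and are not Kafka's actual classes. Only the queue size of 20, the ControlPlane prefix and the dependency on control.plane.listener.name come from the description above.

```scala
// Hypothetical stand-in for illustration only; this is NOT Kafka's RequestChannel.
final case class RequestChannelSketch(queueSize: Int, metricNamePrefix: String)

object ControlPlaneChannelSketch {
  // Values taken from the description above.
  val ControlPlaneQueueSize = 20
  val ControlPlaneMetricPrefix = "ControlPlane"

  // controlPlaneListenerName models the optional control.plane.listener.name property.
  // A channel is created only when the property is defined.
  def controlPlaneRequestChannelOpt(
      controlPlaneListenerName: Option[String]): Option[RequestChannelSketch] =
    controlPlaneListenerName.map { _ =>
      RequestChannelSketch(ControlPlaneQueueSize, ControlPlaneMetricPrefix)
    }
}
```

Modelling the channel as an Option keeps the "no control-plane listener configured" case explicit, so callers have to handle the absence of the channel rather than assume it exists.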
newProcessor(
id: Int,
connectionQuotas: ConnectionQuotas,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
memoryPool: MemoryPool): Processor
newProcessor simply creates a new Processor with the RequestChannel and the following configuration properties:
Note: newProcessor is used exclusively when SocketServer is requested to createControlPlaneAcceptorAndProcessor and addDataPlaneProcessors.
startup(
startupProcessors: Boolean = true): Unit
Internally, startup creates the ConnectionQuotas (with maxConnectionsPerIp and maxConnectionsPerIpOverrides).
For every endpoint (in the endpoints registry), startup does the following:
- Creates up to numProcessorThreads Processors (for ConnectionQuotas and MemoryPool)
- Creates an Acceptor for the endpoint and processors
- Records the Acceptor in the acceptors internal registry
- Starts a non-daemon thread for the Acceptor with the name kafka-socket-acceptor-[listenerName]-[securityProtocol]-[port] (e.g. kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092) and waits until it has fully started
startup then registers metrics.
In the end, startup prints out the following INFO message to the logs:
Started [dataPlaneAcceptors] acceptor threads for data-plane
Note: startup is used exclusively when KafkaServer is requested to start up.
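The per-endpoint steps of startup can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual SocketServer code: EndPointSketch, AcceptorSketch and StartupSketch are hypothetical stand-ins, processors are reduced to plain IDs, exactly numProcessorThreads of them are created per endpoint, and ConnectionQuotas, MemoryPool and metrics are left out. The thread name format follows the example in the text.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable

// Hypothetical, simplified stand-ins; not Kafka's real EndPoint or Acceptor.
final case class EndPointSketch(listenerName: String, securityProtocol: String, port: Int)

final class AcceptorSketch(val endPoint: EndPointSketch, val processorIds: Seq[Int])
    extends Runnable {
  private val started = new CountDownLatch(1)
  // A real acceptor would loop accepting connections; this one only signals startup.
  override def run(): Unit = started.countDown()
  def awaitStartup(): Unit = started.await()
}

object StartupSketch {
  // Mirrors the name format quoted in the text (ListenerName(...) imitates its toString).
  def acceptorThreadName(e: EndPointSketch): String =
    s"kafka-socket-acceptor-ListenerName(${e.listenerName})-${e.securityProtocol}-${e.port}"

  def startup(
      endpoints: Seq[EndPointSketch],
      numProcessorThreads: Int): Map[EndPointSketch, AcceptorSketch] = {
    val acceptors = mutable.LinkedHashMap.empty[EndPointSketch, AcceptorSketch]
    var nextProcessorId = 0
    endpoints.foreach { endpoint =>
      // 1. Reserve processor IDs for this endpoint
      val ids = nextProcessorId until (nextProcessorId + numProcessorThreads)
      nextProcessorId += numProcessorThreads
      // 2. Create the acceptor for the endpoint and its processors
      val acceptor = new AcceptorSketch(endpoint, ids)
      // 3. Record it in the acceptors registry
      acceptors.put(endpoint, acceptor)
      // 4. Start a non-daemon thread and wait until the acceptor has started
      val thread = new Thread(acceptor, acceptorThreadName(endpoint))
      thread.setDaemon(false)
      thread.start()
      acceptor.awaitStartup()
    }
    acceptors.toMap
  }
}
```

Waiting on a latch before moving to the next endpoint is one way to express "waits until it has fully started"; the mechanism used here is an assumption of the sketch.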
addProcessors(
acceptor: Acceptor,
endpoint: EndPoint,
newProcessorsPerListener: Int): Unit
addProcessors…FIXME
Note: addProcessors is used when SocketServer is requested to createAcceptorAndProcessors and resizeThreadPool.
createAcceptorAndProcessors(
processorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit
createAcceptorAndProcessors…FIXME
Note: createAcceptorAndProcessors is used when SocketServer is requested to startup and addListeners.
resizeThreadPool(
oldNumNetworkThreads: Int,
newNumNetworkThreads: Int): Unit
resizeThreadPool…FIXME
Note: resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (the number of network threads).
addListeners(listenersAdded: Seq[EndPoint]): Unit
addListeners…FIXME
Note: addListeners is used exclusively when DynamicListenerConfig is requested to reconfigure.
stopProcessingRequests(): Unit
stopProcessingRequests…FIXME
updateMaxConnectionsPerIpOverride(
maxConnectionsPerIpOverrides: Map[String, Int]): Unit
updateMaxConnectionsPerIpOverride…FIXME
Note: updateMaxConnectionsPerIpOverride is used when…FIXME
updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit
updateMaxConnectionsPerIp…FIXME
Note: updateMaxConnectionsPerIp is used when…FIXME
removeListeners(listenersRemoved: Seq[EndPoint]): Unit
removeListeners…FIXME
Note: removeListeners is used when…FIXME
addDataPlaneProcessors(
acceptor: Acceptor,
endpoint: EndPoint,
newProcessorsPerListener: Int): Unit
addDataPlaneProcessors…FIXME
Note: addDataPlaneProcessors is used when SocketServer is requested to createDataPlaneAcceptorsAndProcessors and resizeThreadPool.
createDataPlaneAcceptorsAndProcessors(
dataProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit
createDataPlaneAcceptorsAndProcessors…FIXME
Note: createDataPlaneAcceptorsAndProcessors is used when SocketServer is requested to start up and addListeners.
createControlPlaneAcceptorAndProcessor(
endpointOpt: Option[EndPoint]): Unit
createControlPlaneAcceptorAndProcessor…FIXME
Note: createControlPlaneAcceptorAndProcessor is used when SocketServer is requested to start up.
startDataPlaneProcessors(
authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit
startDataPlaneProcessors…FIXME
Note: startDataPlaneProcessors is used when…FIXME
createAcceptor(
endPoint: EndPoint,
metricPrefix: String) : Acceptor
createAcceptor…FIXME
Note: createAcceptor is used when…FIXME
Name | Description |
---|---|
| | RequestChannel (with the queue size of maxQueuedRequests and the DataPlaneMetricPrefix metric name prefix) Initialized when Used to create the dataPlaneRequestProcessor and dataPlaneRequestHandlerPool for |
| | The number of processors per endpoint (as configured using num.network.threads Kafka property) |
| | Network processor threads per ID (initially totalProcessorThreads). New processor threads are added in addProcessors. Used in stopProcessingRequests (to shut down the network processor threads) |
| | A RequestChannel (with queued.max.requests queue size) Used when: |
| | Total number of processors, i.e. numProcessorThreads for every endpoint |
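The relationship between the per-endpoint and total processor counts described in the table can be written as a one-line calculation. The sketch below is illustrative only: ProcessorCountSketch and its parameter names are hypothetical, and it assumes that every endpoint gets the same number of processors.

```scala
object ProcessorCountSketch {
  // numProcessorThreads: processors per endpoint (num.network.threads)
  // endpointCount: number of configured endpoints (illustrative parameter)
  def totalProcessorThreads(numProcessorThreads: Int, endpointCount: Int): Int =
    numProcessorThreads * endpointCount
}
```

For example, under these assumptions three network threads per endpoint and two endpoints give six processors in total.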