MINOR: Revert initial transactional state store semantics commit #19956

Merged (5 commits) on Jun 24, 2025
checkstyle/suppressions.xml (2 changes: 1 addition & 1 deletion)
@@ -194,7 +194,7 @@

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>

<suppress checks="MethodLength"
files="KTableImpl.java"/>
KafkaStreams.java
@@ -185,7 +185,6 @@ public class KafkaStreams implements AutoCloseable {
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;
private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
private final LogContext logContext;

GlobalStreamThread globalStreamThread;
protected StateDirectory stateDirectory = null;
@@ -643,9 +642,6 @@ private void maybeSetRunning() {
return;
}

// all (alive) threads have received their assignment, close any remaining startup tasks, they're not needed
stateDirectory.closeStartupTasks();

setState(State.RUNNING);
}

@@ -968,7 +964,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
} else {
clientId = userClientId;
}
logContext = new LogContext(String.format("stream-client [%s] ", clientId));
final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
topologyMetadata.setLog(logContext);

@@ -1422,9 +1418,6 @@ private static HostInfo parseHostInfo(final String endPoint) {
*/
public synchronized void start() throws IllegalStateException, StreamsException {
if (setState(State.REBALANCING)) {
log.debug("Initializing STANDBY tasks for existing local state");
stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);

log.debug("Starting Streams client");

if (globalStreamThread != null) {
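Note: the KafkaStreams.java hunks above drop the now-unused `logContext` field and remove the startup-task hooks from the client lifecycle: `start()` no longer pre-initializes standby tasks from existing local state, and `maybeSetRunning()` no longer closes leftover startup tasks once every alive thread has its assignment. Below is a minimal, self-contained sketch of that (now reverted) hook placement; the names and types are simplified stand-ins, not the actual KafkaStreams internals.

```java
// Sketch only: simplified stand-ins for the reverted lifecycle hooks, not the real KafkaStreams code.
enum ClientState { CREATED, REBALANCING, RUNNING }

class ClientLifecycleSketch {
    private ClientState state = ClientState.CREATED;

    // On start(), the reverted code eagerly initialized standby ("startup") tasks
    // from any state already on disk, before any stream thread had an assignment.
    void start(Runnable initializeStartupTasks) {
        state = ClientState.REBALANCING;
        initializeStartupTasks.run();      // hook removed by this revert
    }

    // Once all alive threads have received their assignment, any startup task that
    // was never claimed by a thread is no longer needed and was closed here.
    void maybeSetRunning(boolean allThreadsAssigned, Runnable closeStartupTasks) {
        if (!allThreadsAssigned) {
            return;
        }
        closeStartupTasks.run();           // hook removed by this revert
        state = ClientState.RUNNING;
    }
}
```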
StreamsThreadMetricsDelegatingReporter.java
@@ -57,7 +57,13 @@ public void metricChange(final KafkaMetric metric) {
consumer.registerMetricForSubscription(metric);
}
}

/*
The KafkaMetric object is a singleton shared by all StreamThread instances.
So we need to make sure we only pass metrics for the current StreamThread that contains this
MetricsReporter instance, which will register metrics with the embedded KafkaConsumer to pass
through the telemetry pipeline.
Otherwise, Kafka Streams would register multiple metrics for all StreamThreads.
*/
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map<String, String> tags = metric.metricName().tags();
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) ||
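The comment added above explains why the reporter filters by thread id before forwarding a metric to the embedded consumer: the KafkaMetric registry is shared across StreamThreads, so each per-thread reporter must only forward metrics tagged with its own thread. A standalone sketch of that filtering rule follows; the tag name, constructor shape, and class name are assumptions for illustration, not the exact constants or signature of this class.

```java
import java.util.Map;

// Illustration only: a per-thread reporter forwards a metric to its embedded consumer
// only when the metric's thread-id tag matches the owning StreamThread (or its state updater).
class ThreadScopedMetricFilter {
    private static final String THREAD_ID_TAG = "thread-id";   // assumed tag name
    private final String threadId;
    private final String stateUpdaterThreadId;

    ThreadScopedMetricFilter(final String threadId, final String stateUpdaterThreadId) {
        this.threadId = threadId;
        this.stateUpdaterThreadId = stateUpdaterThreadId;
    }

    boolean shouldForward(final Map<String, String> metricTags) {
        final String tag = metricTags.get(THREAD_ID_TAG);
        return tag != null && (tag.equals(threadId) || tag.equals(stateUpdaterThreadId));
    }
}
```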
ProcessorStateManager.java
@@ -222,39 +222,6 @@ public ProcessorStateManager(final TaskId taskId,
log.debug("Created state store manager for task {}", taskId);
}

/**
* Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
* they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
* completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
*/
static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
final boolean eosEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final Map<String, String> storeToChangelogTopic,
final Set<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) {
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
}

/**
* Standby tasks initialized for local state on-startup are only partially initialized, because they are not yet
* assigned to a StreamThread. Once assigned to a StreamThread, we complete their initialization here using the
* assigned StreamThread's context.
*/
void assignToStreamThread(final LogContext logContext,
final ChangelogRegister changelogReader,
final Collection<TopicPartition> sourcePartitions) {
if (this.changelogReader != null) {
throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it.");
}
this.sourcePartitions.clear();
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
this.changelogReader = changelogReader;
this.sourcePartitions.addAll(sourcePartitions);
}

void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext<?, ?> processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
@@ -347,7 +314,7 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
}

private void maybeRegisterStoreWithChangelogReader(final String storeName) {
if (isLoggingEnabled(storeName) && changelogReader != null) {
if (isLoggingEnabled(storeName)) {
changelogReader.register(getStorePartition(storeName), this);
}
}
@@ -616,7 +583,7 @@ public void flushCache() {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);

if (!stateUpdaterEnabled && changelogReader != null) {
if (!stateUpdaterEnabled) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}

@@ -664,7 +631,7 @@ else if (exception instanceof StreamsException)
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);

if (!stateUpdaterEnabled && changelogReader != null) {
if (!stateUpdaterEnabled) {
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
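The two methods removed above implemented a two-phase initialization: `createStartupTaskStateManager` built a state manager without a changelog reader before any thread owned the task, and `assignToStreamThread` completed it exactly once when a thread claimed the task, which is why the `changelogReader != null` guards are also dropped elsewhere in this file. Below is a self-contained sketch of that pattern with hypothetical types and names, not the real ProcessorStateManager API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Illustration only: two-phase initialization as in the removed startup-task code path.
class TwoPhaseStateManagerSketch {
    private Object changelogReader;                          // stays null until a thread claims the task
    private final List<String> sourcePartitions = new ArrayList<>();

    // Phase 1: created at client startup, without any thread-specific collaborators.
    static TwoPhaseStateManagerSketch createForStartup(final Collection<String> placeholderPartitions) {
        final TwoPhaseStateManagerSketch manager = new TwoPhaseStateManagerSketch();
        manager.sourcePartitions.addAll(placeholderPartitions);
        return manager;
    }

    // Phase 2: completed exactly once, when the owning thread provides its reader and partitions.
    void assignToThread(final Object changelogReader, final Collection<String> actualPartitions) {
        if (this.changelogReader != null) {
            throw new IllegalStateException("Already assigned to a thread without closing the previous reader.");
        }
        this.changelogReader = changelogReader;
        this.sourcePartitions.clear();
        this.sourcePartitions.addAll(actualPartitions);
    }

    // Until phase 2 runs, anything touching the reader has to tolerate null,
    // which is what the removed null checks guarded against.
    boolean canRegisterChangelogs() {
        return changelogReader != null;
    }
}
```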
StateDirectory.java
@@ -16,18 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,18 +43,13 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@@ -106,14 +95,11 @@ public StateDirectoryProcessFile() {
private final boolean hasPersistentStores;
private final boolean hasNamedTopologies;

private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();
private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();

private FileChannel stateDirLockChannel;
private FileLock stateDirLock;

private final StreamsConfig config;
private final ConcurrentMap<TaskId, Task> tasksForLocalState = new ConcurrentHashMap<>();

/**
* Ensures that the state base directory as well as the application's sub-directory are created.
*
@@ -132,7 +118,6 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean
this.hasPersistentStores = hasPersistentStores;
this.hasNamedTopologies = hasNamedTopologies;
this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.config = config;
final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
stateDir = new File(baseDir, appId);
@@ -197,109 +182,6 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}

public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
final StreamsMetricsImpl streamsMetrics,
final LogContext logContext) {
final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());

// discover all non-empty task directories in StateDirectory
for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
final String dirName = taskDirectory.file().getName();
final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);

// we still check if the task's sub-topology is stateful, even though we know its directory contains state,
// because it's possible that the topology has changed since that data was written, and is now stateless
// this therefore prevents us from creating unnecessary Tasks just because of some left-over state
if (subTopology.hasStateWithChangelogs()) {
final Set<TopicPartition> inputPartitions = topologyMetadata.nodeToSourceTopics(id).values().stream()
.flatMap(Collection::stream)
.map(t -> new TopicPartition(t, id.partition()))
.collect(Collectors.toSet());
final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
id,
eosEnabled,
logContext,
this,
subTopology.storeToChangelogTopic(),
inputPartitions,
stateUpdaterEnabled
);

final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
id,
config,
stateManager,
streamsMetrics,
dummyCache
);

final Task task = new StandbyTask(
id,
inputPartitions,
subTopology,
topologyMetadata.taskConfig(id),
streamsMetrics,
stateManager,
this,
dummyCache,
context
);

try {
task.initializeIfNeeded();

tasksForLocalState.put(id, task);
} catch (final TaskCorruptedException e) {
// Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
task.suspend();
task.closeDirty();
}
}
}
}
}

public boolean hasStartupTasks() {
return !tasksForLocalState.isEmpty();
}

public Task removeStartupTask(final TaskId taskId) {
final Task task = tasksForLocalState.remove(taskId);
if (task != null) {
lockedTasksToOwner.replace(taskId, Thread.currentThread());
}
return task;
}

public void closeStartupTasks() {
closeStartupTasks(t -> true);
}

private void closeStartupTasks(final Predicate<Task> predicate) {
if (!tasksForLocalState.isEmpty()) {
// "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
// only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
// to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
drainedTasks.add(entry.getValue());
}
}

// now that we have exclusive ownership of the drained tasks, close them
for (final Task task : drainedTasks) {
task.suspend();
task.closeClean();
}
}
}

public UUID initializeProcessId() {
if (!hasPersistentStores) {
final UUID processId = UUID.randomUUID();
@@ -496,17 +378,9 @@ synchronized void unlock(final TaskId taskId) {
}
}

/**
* Expose for tests.
*/
Thread lockOwner(final TaskId taskId) {
return lockedTasksToOwner.get(taskId);
}

@Override
public void close() {
if (hasPersistentStores) {
closeStartupTasks();
try {
stateDirLock.release();
stateDirLockChannel.close();
@@ -624,7 +498,6 @@ private IOException maybeCleanEmptyNamedTopologyDirs(final boolean logExceptionA
);
if (namedTopologyDirs != null) {
for (final File namedTopologyDir : namedTopologyDirs) {
closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
final File[] contents = namedTopologyDir.listFiles();
if (contents != null && contents.length == 0) {
try {
@@ -662,7 +535,6 @@ public void clearLocalStateForNamedTopology(final String topologyName) {
log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName);
}
try {
closeStartupTasks(task -> task.id().topologyName().equals(topologyName));
Utils.delete(namedTopologyDir);
} catch (final IOException e) {
log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e);
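StateDirectory.java loses the entire startup-task path above: discovery of non-empty task directories in `initializeStartupTasks()`, the claim API `removeStartupTask()`, and `closeStartupTasks()`. The removed `closeStartupTasks()` relied on `ConcurrentHashMap.remove()` as an exclusive claim so that two threads draining concurrently could never close the same task twice. Below is a standalone sketch of that claim-then-close pattern with a hypothetical `Task` type, not the real StateDirectory code.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Illustration only: drain (claim) first, then close only what was exclusively claimed.
class StartupTaskDrainSketch {
    interface Task {
        void suspend();
        void closeClean();
    }

    private final Map<String, Task> tasksForLocalState = new ConcurrentHashMap<>();

    // remove() is atomic, so at most one caller "wins" a given task.
    Task removeStartupTask(final String taskId) {
        return tasksForLocalState.remove(taskId);
    }

    void closeStartupTasks(final Predicate<Task> predicate) {
        final Set<Task> drained = new HashSet<>();
        for (final Map.Entry<String, Task> entry : tasksForLocalState.entrySet()) {
            // Only close tasks this thread exclusively claimed from the map.
            if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
                drained.add(entry.getValue());
            }
        }
        for (final Task task : drained) {
            task.suspend();
            task.closeClean();
        }
    }
}
```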