Skip to content

Cannot deserialize TopicPartition from JobRepository #3797

Open
@MinJunKweon

Description

@MinJunKweon
Contributor

Hi.

I use MySQL for JobRepository. It serialize ExecutionContext as String by JacksonObjectMapper.
It seems to forcing to Map's key type must be String. (Map<String, Object>)
You can see this.

For Example, SHORT_CONTEXT in BATCH_STEP_EXECUTION_CONTEXT:

{"batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet","topic.partition.offsets":["java.util.HashMap",{"test-topic":["java.lang.Long",42]}],"batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"}

However, KafkaItemReader uses TopicPartition as key. So It has problem in deserializing ExecutionContext. You can see this.

        @Override
	public void open(ExecutionContext executionContext) {
		...
		if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
			Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
			for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
				this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
			}
		}
                ...
	}
2020-11-02 14:30:50 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step testStep in job testJob
java.lang.ClassCastException: java.lang.String incompatible with org.apache.kafka.common.TopicPartition
	at org.springframework.batch.item.kafka.KafkaItemReader$$Lambda$911/00000000EF270020.accept(Unknown Source)
	at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
	at org.springframework.batch.item.kafka.KafkaItemReader.open(KafkaItemReader.java:174)
	at org.springframework.batch.item.kafka.KafkaItemReader$$FastClassBySpringCGLIB$$9111feb4.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
	at org.springframework.batch.item.kafka.KafkaItemReader$$EnhancerBySpringCGLIB$$314cf4f9.open(<generated>)
	at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:104)
	at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy92.execute(Unknown Source)
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
	at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
	at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy129.run(Unknown Source)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148)
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784)
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
        ...

I think It should be deserialize by String first.
And then, convert String to TopicPartition in KafkaItemReader.

Like this,

Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
for (Map.Entry<String, Long> entry : offsets.entrySet()) {
        TopicPartition topicPartition = getTopicPartitionFromString(entry.getKey());
	this.partitionOffsets.put(topicPartition, entry.getValue() == 0 ? 0 : entry.getValue() + 1);
}

Activity

langzime

langzime commented on Aug 13, 2021

@langzime

same question

noojung

noojung commented on Dec 23, 2024

@noojung

same question too

added this to the 5.2.2 milestone on Dec 23, 2024
modified the milestones: 5.2.2, 5.2.3 on Mar 18, 2025
noojung

noojung commented on Jun 2, 2025

@noojung

Jackson2ExecutionContextStringSerializer always forces all map keys to be String.
So we can't use Map<TopicPartition, Long> directly.

Instead, I think we can store only the partition number (as a String) in update(),nd then reconstruct the full TopicPartition in open() by using the topic name provided to the constructor.

For example:

	@Override
	public void update(ExecutionContext executionContext) {
		if (this.saveState) {
			Map<String, Long> offsets = new HashMap<>();
			for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
				offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue());
			}
			executionContext.put(TOPIC_PARTITION_OFFSETS, offsets);
		}
		this.kafkaConsumer.commitSync();
	}

	@Override
	public void open(ExecutionContext executionContext) {
		this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
		if (this.partitionOffsets == null) {
			this.partitionOffsets = new HashMap<>();
			for (TopicPartition topicPartition : this.topicPartitions) {
				this.partitionOffsets.put(topicPartition, 0L);
			}
		}
		if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
			Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
			for (Map.Entry<String, Long> entry : offsets.entrySet()) {
				String topicName = this.topicPartitions.get(0).topic();
				this.partitionOffsets.put(new TopicPartition(topicName, Integer.parseInt(entry.getKey())), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
			}
		}
		this.kafkaConsumer.assign(this.topicPartitions);
		this.partitionOffsets.forEach(this.kafkaConsumer::seek);
	}
added 2 commits that reference this issue on Jun 3, 2025
bab03c1
a5e43a0
noojung

noojung commented on Jun 4, 2025

@noojung

Could I work on this issue?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

      Participants

      @fmbenhassine@MinJunKweon@langzime@noojung

      Issue actions

        Cannot deserialize TopicPartition from JobRepository · Issue #3797 · spring-projects/spring-batch