Open
Description
Hi.
I use MySQL for JobRepository. It serialize ExecutionContext as String by JacksonObjectMapper.
It seems to forcing to Map
's key type must be String
. (Map<String, Object>
)
You can see this.
For Example, SHORT_CONTEXT
in BATCH_STEP_EXECUTION_CONTEXT
:
{"batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet","topic.partition.offsets":["java.util.HashMap",{"test-topic":["java.lang.Long",42]}],"batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"}
However, KafkaItemReader
uses TopicPartition
as key. So It has problem in deserializing ExecutionContext
. You can see this.
@Override
public void open(ExecutionContext executionContext) {
...
if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
}
}
...
}
2020-11-02 14:30:50 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step testStep in job testJob
java.lang.ClassCastException: java.lang.String incompatible with org.apache.kafka.common.TopicPartition
at org.springframework.batch.item.kafka.KafkaItemReader$$Lambda$911/00000000EF270020.accept(Unknown Source)
at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.springframework.batch.item.kafka.KafkaItemReader.open(KafkaItemReader.java:174)
at org.springframework.batch.item.kafka.KafkaItemReader$$FastClassBySpringCGLIB$$9111feb4.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.batch.item.kafka.KafkaItemReader$$EnhancerBySpringCGLIB$$314cf4f9.open(<generated>)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:104)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy92.execute(Unknown Source)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy129.run(Unknown Source)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
...
I think It should be deserialize by String first.
And then, convert String
to TopicPartition
in KafkaItemReader
.
Like this,
Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
for (Map.Entry<String, Long> entry : offsets.entrySet()) {
TopicPartition topicPartition = getTopicPartitionFromString(entry.getKey());
this.partitionOffsets.put(topicPartition, entry.getValue() == 0 ? 0 : entry.getValue() + 1);
}
Activity
langzime commentedon Aug 13, 2021
same question
noojung commentedon Dec 23, 2024
same question too
noojung commentedon Jun 2, 2025
Jackson2ExecutionContextStringSerializer always forces all map keys to be String.
So we can't use Map<TopicPartition, Long> directly.
Instead, I think we can store only the partition number (as a String) in update(),nd then reconstruct the full TopicPartition in open() by using the topic name provided to the constructor.
For example:
fix: Handle TopicPartition serialization in KafkaItemReader
Fix KafkaItemReader TopicPartition serialization issue with JobReposi…
noojung commentedon Jun 4, 2025
Could I work on this issue?