This repository has been archived by the owner on May 25, 2023. It is now read-only.

java.lang.ClassCastException for reduce api call #57

Closed
dnrusakov opened this issue Mar 10, 2018 · 11 comments

@dnrusakov

dnrusakov commented Mar 10, 2018

The following example is a modification of the StreamToTableJoinScalaIntegrationTestImplicitSerdes test:

server.createTopic(userClicksTopic)

val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

implicit val serialized: Serialized[String, Long] = Serialized.`with`(stringSerde, longSerde)

val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"stream-table-join-scala-integration-test-implicit-serdes-${scala.util.Random.nextInt(100)}")
  p.put(StreamsConfig.CLIENT_ID_CONFIG, "join-scala-integration-test-implicit-serdes-standard-consumer")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
  p.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
  p
}

val builder = new StreamsBuilderS()

val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)

userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
  .toStream
  .through(outputTopic)
  .foreach((k, v) => println(k -> v))

val streams: KafkaStreams = new KafkaStreams(builder.build, streamsConfiguration)

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit = try {
    println(s"Stream terminated because of uncaught exception: $e .. Shutting down app")
    e.printStackTrace
    val closed = streams.close()
    println(s"Exiting application after streams close ($closed)")
  } catch {
    case x: Exception => x.printStackTrace
  } finally {
    println("Exiting application ..")
    System.exit(-1)
  }
})

streams.start()

val sender = MessageSender[String, Long](brokers, classOf[StringSerializer].getName, classOf[LongSerializer].getName)

userClicks.foreach(r => sender.writeKeyValue(userClicksTopic, r.key, r.value))

val listener = MessageListener(brokers, outputTopic, "join-scala-integration-test-standard-consumer",
  classOf[StringDeserializer].getName,
  classOf[LongDeserializer].getName,
  new RecordProcessor
)

val l = listener.waitUntilMinKeyValueRecordsReceived(3, 30000)

streams.close()

assertEquals(
  l.sortBy(_.key),
  Seq(
    new KeyValue("chao", 25L),
    new KeyValue("bob", 19L),
    new KeyValue("dave", 56L),
    new KeyValue("eve", 78L),
    new KeyValue("alice", 40L),
    new KeyValue("fang", 99L)
  ).sortBy(_.key)
)

This part of the code above:

userClicksStream
   .groupByKey
   .reduce((_: Long, v2: Long) => v2, "my-ktable-name")

consistently fails with the following exception:

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.runtime.java8.JFunction2$mcJJJ$sp.apply(JFunction2$mcJJJ$sp.java:12)
at com.lightbend.kafka.scala.streams.KGroupedStreamS.$anonfun$reduce$3(KGroupedStreamS.scala:49)
at com.lightbend.kafka.scala.streams.FunctionConversions$ReducerFromFunction$.$anonfun$asReducer$1(FunctionConversions.scala:46)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:76)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
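
The top two frames of the trace (`unboxToLong` called from the specialized `JFunction2$mcJJJ$sp.apply`) suggest that a `String` reached the `(Long, Long) => Long` reducer through generic, type-erased plumbing and was rejected only when the reducer tried to unbox it. The sketch below reproduces that mechanism in plain Scala, with no Kafka dependency. `Deser`, `StringDeser`, `UntypedSource`, and `ErasureDemo` are hypothetical names invented for illustration. The trace alone does not establish which component produced the `String`; the configuration above does set the default value serde to `String`, so a default serde being used where a `Long` serde was expected is one plausible source.

```scala
// Hypothetical stand-ins for illustration; none of these names come from Kafka Streams.
trait Deser[T] { def fromBytes(bytes: Array[Byte]): T }

object StringDeser extends Deser[String] {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Declared as producing V, but decodes with whatever deserializer it was given.
// The cast to V is unchecked because V is erased, so it never fails here.
class UntypedSource[V](deser: Deser[_]) {
  def read(bytes: Array[Byte]): V = deser.fromBytes(bytes).asInstanceOf[V]
}

object ErasureDemo {
  val reducer: (Long, Long) => Long = (_, v2) => v2

  // Generic plumbing, similar in spirit to a processor node: inside this method
  // V is erased, so the values are passed to the function as plain objects.
  def pipeline[V](source: UntypedSource[V], f: (V, V) => V,
                  first: Array[Byte], second: Array[Byte]): V =
    f(source.read(first), source.read(second))

  // Returns true if applying the Long reducer to String-decoded values
  // raises a ClassCastException.
  def castFailsAtReducer(): Boolean = {
    val source = new UntypedSource[Long](StringDeser) // wrong deserializer, compiles fine
    try {
      pipeline[Long](source, reducer, "19".getBytes("UTF-8"), "25".getBytes("UTF-8"))
      false
    } catch {
      case _: ClassCastException => true
    }
  }
}
```

Calling `ErasureDemo.castFailsAtReducer()` is expected to return `true`: the mismatch compiles without complaint and surfaces only at runtime, when the reducer unboxes its arguments.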

The same code rewritten against the plain Java API works fine.

P.S. I use the latest Release 0.1.2 of the kafka-streams-scala library. Scala 2.11.

@debasishg
Contributor

Thanks. Can you please raise a PR?

@dnrusakov
Author

@debasishg Do you mean a PR with a fix for that bug? If so, right now I have no clue how to fix it.

@deanwampler

deanwampler commented Mar 11, 2018 via email

@dnrusakov
Author

@deanwampler This ticket is already an issue, correct?

@deanwampler

deanwampler commented Mar 11, 2018 via email

@debasishg
Contributor

Which version are you using? I will recheck, but it has possibly been fixed in the develop branch.

@dnrusakov
Author

dnrusakov commented Mar 11, 2018

@debasishg
I use the latest Release 0.1.2 of the kafka-streams-scala library. Scala 2.11.

@dnrusakov
Author

dnrusakov commented Mar 11, 2018

@debasishg @deanwampler

I'm pretty sure I have found the source of the error:

KGroupedStreamS.scala, line 49, the reduce function

def reduce(reducer: (V, V) => V, storeName: String): KTableS[K, V]

At the moment:

inner
   .reduce(((v1: V, v2: V) => 
      reducer(v1, v2)).asReducer, 
      Materialized
         .as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName))

But it should be:

inner
   .reduce(((v1: V, v2: V) => 
      reducer(v1, v2)).asReducer, 
      Materialized
         .as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
         .withKeySerde(...)
         .withValueSerde(...))

@debasishg
Contributor

Thanks for investigating. We need to make the change you have suggested:

Change the following method in KGroupedStreamS to take the key and value serdes as implicit parameters:

def reduce(reducer: (V, V) => V,
  storeName: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): KTableS[K, V] = {
  inner.reduce(((v1: V, v2: V) => 
    reducer(v1, v2)).asReducer, 
    Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
      .withKeySerde(keySerde)
      .withValueSerde(valueSerde)
  )
}

Then the following code runs OK:

userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
  .toStream
  .through(outputTopic)
  .foreach((k, v) => println(k -> v))

We will make the change shortly.
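
The reason the call site above does not change is that the serdes arrive through a second, implicit parameter list. The sketch below illustrates that pattern in plain Scala, without Kafka. `MiniSerde`, `MiniGrouped`, and `ImplicitSerdeDemo` are hypothetical stand-ins, not part of kafka-streams-scala, and the in-memory map only imitates a state store that round-trips values through the serdes.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical miniature serde, a stand-in for Kafka's Serde[T].
trait MiniSerde[T] {
  def serialize(value: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}

object MiniSerde {
  // Instances in the companion object are found by implicit search automatically.
  implicit val longSerde: MiniSerde[Long] = new MiniSerde[Long] {
    def serialize(value: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(value).array()
    def deserialize(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong
  }
  implicit val stringSerde: MiniSerde[String] = new MiniSerde[String] {
    def serialize(value: String): Array[Byte] = value.getBytes("UTF-8")
    def deserialize(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
  }
}

// Hypothetical grouped stream over in-memory records.
class MiniGrouped[K, V](records: Seq[(K, V)]) {
  // Same shape as the proposed signature: explicit arguments first, serdes implicit.
  // storeName is unused here and kept only to mirror that signature.
  def reduce(reducer: (V, V) => V, storeName: String)(
      implicit keySerde: MiniSerde[K], valueSerde: MiniSerde[V]): Map[K, V] = {
    // Keys and values are stored as bytes, so every read goes through the typed serdes.
    val store = mutable.Map.empty[Seq[Byte], Array[Byte]]
    records.foreach { case (k, v) =>
      val kb = keySerde.serialize(k).toSeq
      val next = store.get(kb) match {
        case Some(bytes) => reducer(valueSerde.deserialize(bytes), v)
        case None        => v
      }
      store.update(kb, valueSerde.serialize(next))
    }
    store.map { case (kb, vb) =>
      keySerde.deserialize(kb.toArray) -> valueSerde.deserialize(vb)
    }.toMap
  }
}

object ImplicitSerdeDemo {
  // The call looks the same as before; the compiler supplies both serdes.
  def run(): Map[String, Long] =
    new MiniGrouped(Seq("alice" -> 13L, "bob" -> 4L, "alice" -> 40L))
      .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
}
```

With a keep-the-latest reducer, `ImplicitSerdeDemo.run()` yields `Map("alice" -> 40L, "bob" -> 4L)`. If no `MiniSerde[V]` is in scope for the value type, the call fails to compile instead of failing at runtime, which is the main appeal of this design.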

@dnrusakov
Author

dnrusakov commented Mar 12, 2018

@debasishg thank you so much for the fix! It would be great if you also released a new version of the library with that fix on board.

@debasishg
Contributor

@dnrusakov we will make a release very shortly. Just tying up some loose ends on the implicit serdes implementation.
