
Flink Spector throws "please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation)" error #52

Closed
rohanbolar opened this issue Aug 7, 2017 · 5 comments


@rohanbolar

Hi there. I am having an issue with running a test case:

  1. I have a file containing an Avro JSON message (a hypothetical parsing sketch follows below).
  2. I would like to have a DataStream created with Tuple2<key, AvroObject>.
  3. Run a window.reduce operation and validate the aggregate.
  4. I looked into the source code of Flinkspector; there is no hook available to specify the TypeInformation.

Any help would be greatly appreciated.
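For reference, here is a sketch of what an Avro-JSON parsing helper like TestDataReader.parseAvro might look like. It assumes Report is an Avro-generated SpecificRecord exposing a static getClassSchema(), and that each line of the file holds one JSON-encoded record; neither assumption is confirmed by this issue.

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

// Hypothetical helper: decode one Avro JSON-encoded line into a Report.
// Assumes Report is an Avro-generated SpecificRecord.
def parseLine(json: String): Report = {
  val schema  = Report.getClassSchema
  val reader  = new SpecificDatumReader[Report](schema)
  val decoder = DecoderFactory.get().jsonDecoder(schema, json)
  reader.read(null, decoder)
}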

package com.xyz.ips

import java.io.{InputStream, InputStreamReader}
import java.util.concurrent.TimeUnit

import com.xyz.ips.mediation.Report
import com.xyz.ips.test.utii.TestDataReader
import com.xyz.ips.transformation.{Counter, EMMFlatMap}
import com.typesafe.config.ConfigException.Null
import abc.efg.EReport
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, SingleOutputStreamOperator}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.windowing.time.Time
import org.flinkspector.core.quantify.{MatchTuples, OutputMatcher}
import org.flinkspector.datastream.{DataStreamTestBase, DataStreamTestEnvironment}
import org.flinkspector.datastream.input.time.InWindow
import org.hamcrest.CoreMatchers._
import org.junit.Test
import org.scalatest.junit.AssertionsForJUnit

import scala.io.Source
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericRecord}
import org.apache.avro.io.{DatumReader, Decoder, DecoderFactory, JsonDecoder}
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecord}
import org.apache.commons.io.IOUtils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.flinkspector.datastream.input.EventTimeSourceBuilder

import scala.collection.mutable.ArrayBuffer

class ReportConsumerTest extends DataStreamTestBase with AssertionsForJUnit {

  /** Schema to deserialize records against. */
  private var schema: Schema = null

  /** Reader that deserializes a byte array into a record. */
  private var datumReader: DatumReader[GenericRecord] = null

  /** Input stream to read the message from. */
  private var inputStream: InputStream = null

  /** Avro decoder that decodes binary data. */
  private var decoder: Decoder = null

  /** Record to deserialize the byte array to. */
  private var record: GenericRecord = null

  val serDataType: TypeInformation[Tuple2[String, Report]] =
    createTypeInformation[Tuple2[String, Report]]

  @Test def testCounter() = {

    val testPayload = this.getClass.getClassLoader.getResourceAsStream("samples/simplecounter-2.txt")
    val reportList: java.util.List[Tuple2[String, Report]] = TestDataReader.parseAvro(testPayload)

    setParallelism(2)

    var testStream: EventTimeSourceBuilder[Tuple2[String, Report]] = null

    // The first element is (null, null); see the error below.
    testStream = createTimedTestStreamWith(Tuple2.of(null, null))

    // Created but never used: there is no hook to pass it to the builder.
    val serDataType: TypeInformation[Tuple2[String, Report]] =
      createTypeInformation[Tuple2[String, Report]]

    val it = reportList.iterator()
    var report: Tuple2[String, Report] = null

    while (it.hasNext) {
      report = it.next()
      testStream = testStream.emit(Tuple2.of(report.f0, report.f1))
    }

    val testStream1 = testStream.close()

    /*
    System.out.println("test****************")

    val matcher: OutputMatcher[Tuple2[String, EReport]] =
      // name the values in your tuple with keys:
      new MatchTuples[Tuple2[String, EReport]]("value", "name")
        // add an assertion using a value and hamcrest matchers:
        .assertThat("name", either(is("fritz")).or(is("peter")))
        // express how many matchers must return true for your test to pass:
        .anyOfThem()
        // define how many records need to fulfill the matchers:
        .onEachRecord()

    assertStream(window(testStream1), matcher)
    */

  }

  def window(stream: DataStream[Tuple2[String, Report]]): DataStream[Tuple2[String, EReport]] = {
    val hourlyAggregate: DataStream[Tuple2[String, EReport]] = stream
      .flatMap(new EMMFlatMap(null))
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .reduce(new Counter())
    hourlyAggregate
  }

}

I am getting the following issue: the line val testStream1 = testStream.close(); throws this error:
[error] Test com.xyz.ips.ReportConsumerTest.testCounter failed: java.lang.RuntimeException: Could not startWith TypeInformation for type class org.apache.flink.streaming.runtime.streamrecord.StreamRecord; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation), took 1.131 sec
[error] at org.flinkspector.datastream.DataStreamTestEnvironment.fromCollectionWithTimestamp(DataStreamTestEnvironment.java:188)
[error] at org.flinkspector.datastream.DataStreamTestEnvironment.fromInput(DataStreamTestEnvironment.java:142)
[error] at org.flinkspector.datastream.input.EventTimeSourceBuilder.close(EventTimeSourceBuilder.java:62)
[error] at com.xyz.ips.ReportConsumerTest.testCounter(ReportConsumerTest.scala:92)

@lofifnc
Contributor

lofifnc commented Aug 9, 2017

Hi,

You are using Scala, which is not yet officially supported, but that shouldn't pose any problems here as far as I can see, since you're using the Java API of Flink. You're indeed right that I should provide a way to pass TypeInformation explicitly. I will try to include this in the next release.

But I think your problem is right here:
createTimedTestStreamWith(Tuple2.of(null, null))

I use the first element later to extract the TypeInformation, so I think your test would work fine if the first element of your test stream/input contained actual data.
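To illustrate, here is a rough sketch (not Flinkspector's actual code) of how Flink's TypeExtractor behaves when handed a sample element:

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.java.typeutils.TypeExtractor

// Works: both fields carry runtime types that can be inspected.
val info = TypeExtractor.getForObject(Tuple2.of("key", Integer.valueOf(1)))

// Fails: null fields leave nothing to inspect, which is why a first
// element of Tuple2.of(null, null) cannot yield a usable type:
// TypeExtractor.getForObject(Tuple2.of(null, null)) // throws InvalidTypesException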

You could change the logic to provide the input like this:

var testStream: EventTimeSourceBuilder[Tuple2[String, Report]] = null

val it = ReportList.iterator()
var report: Tuple2[String, Report] = null
report = it.next()

testStream = createTimedTestStreamWith(Tuple2.of(report.f0, report.f1))

while (it.hasNext) {
  report = it.next()
  testStream = testStream.emit(Tuple2.of(report.f0, report.f1))
}

This way the first element contains actual data.

And since you're not providing any timestamps but are doing windowing, use a .closeAndFlush() instead of a simple close(); this will instruct Flinkspector to emit a max watermark at the end of the test stream:
val testStream1 = testStream.closeAndFlush();

I have not tested any of this. Please provide some feedback on whether this resolves your problem.

Cheers,
Alex

@cslotterback

cslotterback commented Aug 10, 2017

Hello,

I have found somewhat of a solution that works:

var builder: EventTimeSourceBuilder[Tuple2[String, Report]] = null
val reports: List[Tuple2[String, Report]] = ...

if (reports.size < 1) {
  assert(false)
}

builder = createTimedTestStreamWith(reports.head)
for (report <- reports.drop(1)) {
  builder.emit(report, InWindow.to( ... ))
}
val inputStream = builder.close()

The only issue I see is that the first record isn't inserted into a window, so I am not sure where the first record in the list ends up. Maybe #createTimedTestStreamWith could be updated so you can pass a time for the first record, or, as previously suggested, just pass the type to the stream constructor and emit all the records as normal.

@rohanbolar
Author

rohanbolar commented Aug 11, 2017

We got flink-spector working with the Scala driver by adding the following to the sbt build:

"junit" % "junit" % "4.11" % Test,
crossPaths := false,
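For context, a minimal build.sbt sketch showing where those two lines sit; the project name and versions are illustrative assumptions, not taken from this issue:

// Minimal build.sbt sketch; name and versions are illustrative.
name := "flinkspector-scala-test"

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "junit" % "junit" % "4.11" % Test
)

// Do not append the Scala version to artifact names and paths
// (needed here to get the JUnit tests running, per the comment above).
crossPaths := false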

The implementation is as described above by Chris.

class Consumer extends DataStreamTestBase with AssertionsForJUnit {

  @Test def testStream() = {
    var builder: EventTimeSourceBuilder[Tuple2[String, Report]] = null
    val reports: List[Tuple2[String, Report]] = ...

    if (reports.size < 1) {
      assert(false)
    }

    builder = createTimedTestStreamWith(reports.head)
    for (report <- reports.drop(1)) {
      builder.emit(report, InWindow.to( ... ))
    }
    val inputStream = builder.close()
  }

}

You can run the tests with "sbt test".

@lofifnc
Contributor

lofifnc commented Aug 14, 2017

@cslotterback You can already do this by using the underlying EventTimeInputBuilder:

    public static <T> EventTimeInputBuilder<T> startWith(T record, long timeStamp) 

    public static <T> EventTimeInputBuilder<T> startWith(T record, Moment moment) 

But I'm going to expose this in the high-level API in the next release.
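For example, a rough, untested usage sketch of those factories; the records and timestamps are placeholders:

import java.util.concurrent.TimeUnit

import org.apache.flink.api.java.tuple.Tuple2
import org.flinkspector.datastream.input.EventTimeInputBuilder
import org.flinkspector.datastream.input.time.InWindow

// Give the first record an explicit timestamp, then emit further
// records into event-time windows (payloads are placeholders):
val input = EventTimeInputBuilder
  .startWith(Tuple2.of("key", "first-record"), 0L)
  .emit(Tuple2.of("key", "second-record"), InWindow.to(10, TimeUnit.SECONDS))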

@rohanbolar If I'm not mistaken, judging by the imports, you're still using the Java API of Flink?
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, SingleOutputStreamOperator}

@lofifnc
Contributor

lofifnc commented Aug 16, 2017

This issue has been bundled into #57.

@lofifnc lofifnc closed this as completed Aug 16, 2017