
runFlink fails on fresh checkout repo #191

Closed
yz-chime opened this issue Aug 25, 2022 · 6 comments · Fixed by #193
Labels
question Further information is requested

Comments


yz-chime commented Aug 25, 2022

Description
The failure occurred when I checked out the repo and ran the basic command from the guide. It seems com.tests.TestMessage is somehow not registered in Stencil. I'm on macOS 12.5.1 with Java 8 and Kafka installed. Stacktrace:

yunfanzhong | dagger $ ./gradlew dagger-core:runFlink

> Task :dagger-core:runFlink FAILED
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.7/58f588119ffd1702c77ccab6acb54bfb41bed8bd/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-simple/1.7.25/8dacf9514f0c707cbbcdd6fd699e8940d42fb54e/slf4j-simple-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.10/b3eeae7d1765f988a1f45ea81517191315c69c9e/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
09:23:11,421 INFO  io.odpf.dagger.core.config.ConfigurationProviderFactory       - Arguments are:
com.tests.TestMessage
io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.createFieldDescriptor(ProtoType.java:59)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getProtoFieldDescriptor(ProtoType.java:50)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getRowType(ProtoType.java:44)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer.<init>(ProtoDeserializer.java:46)
        at io.odpf.dagger.core.deserializer.ProtoDeserializerProvider.getDaggerDeserializer(ProtoDeserializerProvider.java:40)
        at io.odpf.dagger.core.deserializer.DaggerDeserializerFactory.create(DaggerDeserializerFactory.java:28)
        at io.odpf.dagger.core.source.Stream$Builder.build(Stream.java:46)
        at io.odpf.dagger.core.source.StreamsFactory.getStreams(StreamsFactory.java:18)
        at io.odpf.dagger.core.StreamManager.getStreams(StreamManager.java:237)
        at io.odpf.dagger.core.StreamManager.registerSourceWithPreProcessors(StreamManager.java:103)
        at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37)
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
        at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:43)
Caused by: io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.createFieldDescriptor(ProtoType.java:59)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getProtoFieldDescriptor(ProtoType.java:50)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getRowType(ProtoType.java:44)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer.<init>(ProtoDeserializer.java:46)
        at io.odpf.dagger.core.deserializer.ProtoDeserializerProvider.getDaggerDeserializer(ProtoDeserializerProvider.java:40)
        at io.odpf.dagger.core.deserializer.DaggerDeserializerFactory.create(DaggerDeserializerFactory.java:28)
        at io.odpf.dagger.core.source.Stream$Builder.build(Stream.java:46)
        at io.odpf.dagger.core.source.StreamsFactory.getStreams(StreamsFactory.java:18)
        at io.odpf.dagger.core.StreamManager.getStreams(StreamManager.java:237)
        at io.odpf.dagger.core.StreamManager.registerSourceWithPreProcessors(StreamManager.java:103)
        at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37)

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':dagger-core:runFlink'.
> Process 'command '/usr/local/Cellar/openjdk@8/1.8.0+345/libexec/openjdk.jdk/Contents/Home/bin/java'' finished with non-zero exit value 1

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.6.1/userguide/command_line_interface.html#sec:command_line_warnings

However, all tests pass when I run ./gradlew clean test. I noticed that com.tests.TestMessage is referenced in many tests. I'm not sure why the behavior is different in the dagger-core:runFlink target.

To Reproduce
Steps to reproduce the behavior:
Check out the repo and run ./gradlew dagger-core:runFlink

Expected behavior
Not sure what the expected behavior is; maybe the Dagger process keeps running until shutdown?


lavkesh commented Aug 26, 2022

@yz-chime Thanks for trying it out. For Stencil to work, you need to run a Stencil server; please check https://github.com/odpf/stencil for more info. You then have to put the proto descriptor on the Stencil server. Alternatively, if you want to try it out locally, you can include the proto classes in the jar and disable Stencil. I hope this helps.
TestMessage will not help here because it's part of the test package; it's not bundled in the main jar.
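
A minimal sketch of what disabling Stencil for a local run might look like in the Dagger properties (the key name matches the config mentioned later in this thread; the exact file and location depend on your setup):

```properties
# Sketch only: resolve proto descriptors from classes bundled in the jar
# instead of fetching them from a Stencil server.
SCHEMA_REGISTRY_STENCIL_ENABLE=false
```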

@yz-chime (Author)

Thanks. A few follow-up questions:

  1. How do I disable Stencil?
  2. I assume that if deployed to a Flink 1.9 cluster, we can also disable Stencil when the proto classes are built into the jar?
  3. Is there a guide on common practices for building a container image and deploying to a Kubernetes cluster?


lavkesh commented Aug 27, 2022

@yz-chime Please go through the documentation here and here


ravisuhag commented Aug 27, 2022

@yz-chime In addition to the references @lavkesh provided, inlining the answers here:

  1. Use this config: SCHEMA_REGISTRY_STENCIL_ENABLE: false
  2. Yes
  3. Yes (https://github.com/odpf/dagger/blob/main/docs/docs/guides/deployment.md)

@ravisuhag ravisuhag added the question Further information is requested label Aug 27, 2022

yz-chime commented Aug 31, 2022

I copied the generated classes for TestMessage.proto into src/main/java of dagger-common. The runFlink target could then start Flink and listen to the Kafka topic. I chose io.odpf.dagger.consumer.TestNestedRepeatedMessage as the schema for the test Kafka topic, and set INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX to 6 according to the proto definition.
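
For context, the stream configuration for such a local run might look roughly like the sketch below. The event timestamp field index of 6 is from my setup; the other key names and values are illustrative assumptions, so consult the Dagger configuration docs for the exact names:

```properties
# Sketch only: illustrative stream config for consuming the test topic.
# Key names other than INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX are assumptions.
STREAMS=[{
  "INPUT_SCHEMA_PROTO_CLASS": "io.odpf.dagger.consumer.TestNestedRepeatedMessage",
  "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "6",
  "SOURCE_KAFKA_TOPIC_NAMES": "test-topic",
  "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092"
}]
```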

I generated the protobuf bindings in Python and wrote a script to send a serialized sample event to the topic. The code is straightforward:

from pb.TestMessage_pb2 import TestNestedRepeatedMessage
from kafka import KafkaProducer

# Populate the nested single_message field.
tnrm = TestNestedRepeatedMessage()
tm = tnrm.single_message
tm.order_number = "123"
tm.order_url = "https://www.example.com/page"
tm.order_details = "something"

tnrm.number_field = 123
# Set event_timestamp to the current wall-clock time.
tnrm.event_timestamp.GetCurrentTime()
# print(tnrm.ListFields())

data = tnrm.SerializeToString()

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Block until the send is acknowledged so failures surface immediately.
producer.send("test-topic", data).get()
print(f"sent {len(data)} bytes to test-topic")

However, the Dagger job crashes instantly when it receives the event. The relevant log lines are below:

11:31:33,168 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test-topic', partition=0}]
11:31:33,170 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Timestamps/Watermarks -> Filter -> SourceConversion(table=[default_catalog.default_database.data_stream], fields=[single_message, repeated_message, number_field, repeated_number_field, metadata, event_timestamp, repeated_long_field, __internal_validation_field__, rowtime]) -> SinkConversionToTuple2 -> Filter -> Map (1/1)#0 (02e81ee53e06b8a604da2703d593457d) switched from INITIALIZING to RUNNING.
11:31:33,171 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Timestamps/Watermarks -> Filter -> SourceConversion(table=[default_catalog.default_database.data_stream], fields=[single_message, repeated_message, number_field, repeated_number_field, metadata, event_timestamp, repeated_long_field, __internal_validation_field__, rowtime]) -> SinkConversionToTuple2 -> Filter -> Map (1/1) (02e81ee53e06b8a604da2703d593457d) switched from INITIALIZING to RUNNING.
11:31:33,174 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773}.
11:31:33,186 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values: ... (omitted)
11:31:33,192 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration 'auto.commit.enable' was supplied but isn't a known config.
11:31:33,192 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.4.1
11:31:33,193 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: c57222ae8cd7866b
11:31:33,193 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1661970693192
11:31:33,196 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Subscribed to partition(s): test-topic-0
11:31:33,213 INFO  org.apache.kafka.clients.Metadata                             - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Cluster ID: 6Mx7Agf4SNmUXno6XX-7_w
11:31:33,215 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Discovered group coordinator 10.0.0.3:9092 (id: 2147483647 rack: null)
11:31:33,224 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.0.0.3:9092 (id: 0 rack: null), epoch=0}}
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000000010eedbd1d, pid=8572, tid=0x0000000000014f03
#
# JRE version: OpenJDK Runtime Environment (8.0_345) (build 1.8.0_345-bre_2022_08_04_23_35-b00)
# Java VM: OpenJDK 64-Bit Server VM (25.345-b00 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x539d1d]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/yunfanzhong/code/dagger/dagger-core/hs_err_pid8572.log
Compiled method (nm)   16202 1513     n 0       sun.misc.Unsafe::getInt (native)
 total in heap  [0x0000000114e1a010,0x0000000114e1a360] = 848
 relocation     [0x0000000114e1a138,0x0000000114e1a178] = 64
 main code      [0x0000000114e1a180,0x0000000114e1a360] = 480
Compiled method (nm)   16203 1513     n 0       sun.misc.Unsafe::getInt (native)
 total in heap  [0x0000000114e1a010,0x0000000114e1a360] = 848
 relocation     [0x0000000114e1a138,0x0000000114e1a178] = 64
 main code      [0x0000000114e1a180,0x0000000114e1a360] = 480
#
# If you would like to submit a bug report, please visit:
#   https://github.com/Homebrew/homebrew-core/issues
#

> Task :dagger-core:runFlink FAILED

I'm not sure what caused this. Is it because the TestNestedRepeatedMessage object is too complex, and I should use a plain message with simple fields instead?

@yz-chime (Author)

I tried a simpler message definition and the job still crashes:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

message TestSimpleEvent {
    string user_id = 1;
    string event_name = 2;
    google.protobuf.Timestamp event_timestamp = 3;
}

@Meghajit Meghajit linked a pull request Sep 20, 2022 that will close this issue