
QUESTION: how to get sample app WordCount.jar to run with version 1.15.3 and 2 taskmanager replicas #618

Closed
pzim opened this issue Jan 31, 2023 · 3 comments



pzim commented Jan 31, 2023

Hi all,

I am attempting to deploy the sample WordCount.jar app using 2 taskmanager replicas; however, only 1 taskmanager is able to register successfully with the ResourceManager at any one time. Deploying the same app with a single taskmanager replica works fine.

Flink UI showing the running job in the single-taskmanager-replica setup: [screenshot]

However, when deploying the 2-replica taskmanager configuration, one of the taskmanagers gets an Akka gated exception.

Logs from the failing taskmanager pod in the 2-replica setup:

2023-01-31 21:20:42,582 WARN  akka.remote.Remoting                                         [] - Tried to associate with unreachable remote address [akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123]. Address is now gated for 50 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
2023-01-31 21:20:42,583 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 10000 ms.
org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/user/rpc/resourcemanager_*.
	at org.apache.flink.runtime.rpc.akka.AkkaRpcService.lambda$resolveActorAddress$11(AkkaRpcService.java:602) ~[?:?]
	at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:59) ~[?:?]
	at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:53) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123/), Path(/user/rpc/resourcemanager_*)]

Flink UI showing the job with 1 taskmanager registered and running healthy, although 2 taskmanager pods are running: [screenshot]

The same Flink UI a few minutes later, showing the job failed: [screenshot]

I created a custom Flink 1.15.3 image using the Dockerfile below:

FROM flink:1.15.3-scala_2.12-java11

# Stage the job jar in both the web-upload and usrlib directories
RUN mkdir -p ${FLINK_HOME}/flink-web-upload ${FLINK_HOME}/usrlib
COPY target/wc.jar ${FLINK_HOME}/flink-web-upload/app.jar

RUN ln -s ${FLINK_HOME}/flink-web-upload/app.jar ${FLINK_HOME}/usrlib/app.jar
# Hand ownership to the non-root flink user (uid 9999 in the official image)
RUN chown -R 9999:9999 ${FLINK_HOME}/flink-web-upload ${FLINK_HOME}/usrlib

USER 9999

Where wc.jar was downloaded from here:
https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.3/flink-examples-streaming_2.12-1.15.3-WordCount.jar

Below is the FlinkCluster CR I'm using to deploy this setup:

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  flinkVersion: "1.15"
  image:
    name: <redacted_path_to_ECR>/streaming-analytics-app:flink-1-15-test
  jobManager:
    accessScope: Cluster
    ports:
      ui: 8081
    resources:
      limits:
        memory: "2048Mi"
        cpu: "500m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2048Mi"
        cpu: "500m"
  job:
    jarFile: ./examples/streaming/WordCount.jar
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    args: ["--input", "./README.txt"]
    parallelism: 2
    restartPolicy: Never
  flinkProperties:
    taskmanager.numberOfTaskSlots: "1"

  logConfig:
    log4j-console.properties: |
      # This affects logging for both user code and Flink
      rootLogger.level = DEBUG
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender

      # The following changes the log level for Flink's own classes only
      logger.flink.name = org.apache.flink
      logger.flink.level = DEBUG

      # The following lines keep the log level of common libraries/connectors on
      # log level DEBUG. The root logger does not override this. You have to manually
      # change the log levels here.
      logger.akka.name = akka
      logger.akka.level = DEBUG
      logger.kafka.name= org.apache.kafka
      logger.kafka.level = DEBUG
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = DEBUG

      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = DEBUG

      logger.kafkacoordinator.name = org.apache.kafka.clients.consumer.internals.AbstractCoordinator
      logger.kafkacoordinator.level = DEBUG

      # Log all info to the console
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

      # Log all info in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size = 100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10
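
To spell out the slot math behind this config: with taskmanager.numberOfTaskSlots: "1", the job needs

    required taskmanagers = ceil(parallelism / slots per taskmanager) = ceil(2 / 1) = 2

registered taskmanagers, so it can only run healthy if both replicas register with the ResourceManager.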


I would welcome any input on getting both taskmanagers registered so the job can run healthy.
jto (Contributor) commented Feb 1, 2023

Hey! This is rather odd. It looks like there may be a network issue somewhere. Could you look into your namespace events and see if anything pops up?

pzim (Author) commented Feb 1, 2023

@jto - I appreciate you responding so quickly. I believe the networking is fine. I deployed an Ubuntu container with networking tools in the same namespace and am able to reach each of the jobmanager/taskmanager pods and ports. The only interesting events relate to the taskmanager experiencing the gating issue: its readiness probes are failing with a 500 response.
Logs in this failing taskmanager still show:

Tried to associate with unreachable remote address [akka.tcp://flink@flinkjobcluster-sample-jobmanager:6123]. Address is now gated for 50 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

Another data point: if I delete the failing taskmanager pod, it becomes the healthy one when it comes back up, and the previously healthy taskmanager pod becomes unhealthy with the same gated message.

pzim (Author) commented Feb 1, 2023

After taking some pcaps, the network traffic between the taskmanagers and the jobmanager looks pretty healthy/normal.

Seeing heartbeat timeouts like the one below led me to suspect that our Envoy proxies were blocking the heartbeat communication.

org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException: The heartbeat of ResourceManager with id cf8116891cd5aec6f77ac3f897359ff6 timed out.
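
(Side note while debugging, not the eventual fix: the taskmanager heartbeat cadence is governed by two Flink options, shown below with their 1.15 defaults; bumping them via flinkProperties can help rule out a slow proxy.)

  # Flink 1.15 defaults, in milliseconds
  flinkProperties:
    heartbeat.interval: "10000"
    heartbeat.timeout: "50000"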

After adding an istio annotation to exclude traffic for port 6123, both taskmanagers were able to register and stay up.
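For anyone who hits the same thing, the annotations in question are Istio's standard traffic-exclusion ones. A minimal sketch of the CR change, assuming your operator version exposes podAnnotations on the jobManager/taskManager specs (whether you need the inbound exclusion, the outbound one, or both may depend on your mesh setup):

  # Bypass the Envoy sidecar for Flink's RPC port (6123) so the akka.tcp
  # registration and heartbeat traffic flows pod-to-pod directly.
  spec:
    jobManager:
      podAnnotations:
        traffic.sidecar.istio.io/excludeInboundPorts: "6123"
        traffic.sidecar.istio.io/excludeOutboundPorts: "6123"
    taskManager:
      podAnnotations:
        traffic.sidecar.istio.io/excludeInboundPorts: "6123"
        traffic.sidecar.istio.io/excludeOutboundPorts: "6123"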
Closing this issue now, as this resolved the problem.

pzim closed this as completed Feb 1, 2023