Skip to content

Commit

Permalink
[Feat] flink 1.15 support. (#1003)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed May 5, 2022
1 parent 8386d00 commit ea00e39
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 33 deletions.
14 changes: 0 additions & 14 deletions scalastyle-config.xml
Expand Up @@ -208,20 +208,6 @@ This file is divided into 3 sections:
</customMessage>
</check>

<check customId="awaitready" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters>
<parameter name="regex">Await\.ready</parameter>
</parameters>
<customMessage><![CDATA[
Are you sure that you want to use Await.ready? In most cases, you should use ThreadUtils.awaitReady instead.
If you must use Await.ready, wrap the code block with
// scalastyle:off awaitready
Await.ready(...)
// scalastyle:on awaitready
]]>
</customMessage>
</check>

<check customId="commons-lang2" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
<parameters>
<parameter name="regex">org\.apache\.commons\.lang\.</parameter>
Expand Down
26 changes: 14 additions & 12 deletions streamx-console/streamx-console-service/pom.xml
Expand Up @@ -479,7 +479,6 @@
</activation>
<build>
<plugins>

<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
Expand Down Expand Up @@ -548,9 +547,12 @@
</profile>

<profile>
<id>console-flink-shims_default</id>
<id>console-flink-shims_2.11</id>
<activation>
<activeByDefault>true</activeByDefault>
<property>
<name>scala.binary.version</name>
<value>2.11</value>
</property>
</activation>
<build>
<plugins>
Expand Down Expand Up @@ -591,13 +593,6 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink 1.15 support-->
<dependency>
<groupId>com.streamxhub.streamx</groupId>
<artifactId>streamx-flink-shims_flink-1.15_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink-submit-core -->
<dependency>
<groupId>com.streamxhub.streamx</groupId>
Expand Down Expand Up @@ -633,11 +628,11 @@
</profile>

<profile>
<id>console-flink-shims_2.11</id>
<id>console-flink-shims_2.12</id>
<activation>
<property>
<name>scala.binary.version</name>
<value>2.11</value>
<value>2.12</value>
</property>
</activation>
<build>
Expand Down Expand Up @@ -679,6 +674,13 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink 1.15 support-->
<dependency>
<groupId>com.streamxhub.streamx</groupId>
<artifactId>streamx-flink-shims_flink-1.15_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink-submit-core -->
<dependency>
<groupId>com.streamxhub.streamx</groupId>
Expand Down
Expand Up @@ -150,11 +150,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConf = JobStatusWatcherConf.de
})
// blocking until all future are completed or timeout is reached
val allFutureHold = Future.sequence(tracksFuture)
Try(
// scalastyle:off awaitready
Await.ready(allFutureHold, conf.sglTrkTaskTimeoutSec seconds)
// scalastyle:on awaitready
).failed.map(_ =>
Try(Await.ready(allFutureHold, conf.sglTrkTaskTimeoutSec seconds)).failed.map(_ =>
logInfo(s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes mode timeout," +
s" limitSeconds=${conf.sglTrkTaskTimeoutSec}," +
s" trackingClusterKeys=${trkClusterKeys.mkString(",")}"))
Expand Down
Expand Up @@ -119,10 +119,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConf = MetricWatcherConf.defaultConf
})
// blocking until all future are completed or timeout is reached
Try {
// scalastyle:off awaitready
val futureHold = Future.sequence(trkFutures)
Await.ready(futureHold, conf.sglTrkTaskTimeoutSec seconds)
// scalastyle:on awaitready
}.failed.map { _ =>
logError(s"[FlinkMetricWatcher] tracking flink metrics on kubernetes mode timeout," +
s" limitSeconds=${conf.sglTrkTaskTimeoutSec}," +
Expand Down

0 comments on commit ea00e39

Please sign in to comment.