Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Enables Pulsar as Streaming Source and Sink in Flink DataStream #4

Merged
merged 6 commits into from
Aug 10, 2019
Merged

Enables Pulsar as Streaming Source and Sink in Flink DataStream #4

merged 6 commits into from
Aug 10, 2019

Conversation

yjshen
Copy link
Contributor

@yjshen yjshen commented Aug 9, 2019

This PR focus on consuming typed message from Pulsar and produce typed message to Pulsar.

Master Issue: #1

It's consist of the following part:

  • FlinkPulsarSource: This enables exactly-once consume as well as topic/partition discovery.
  • FlinkPulsarSink: This enables at-least-once when checkpoint is enabled and setFlushOnCheckpoint set to true
  • PulsarDeserializer and PulsarSerializer: These two come from the counterpart of pulsar-spark-connector, with adaption to Flink's type system
  • PulsarMetadataReader: This comes from pulsar-spark-connector too, provides the ability to preserve and commit message, topic match and find, schema get and upload.

@yjshen yjshen requested a review from sijie as a code owner August 9, 2019 04:09
@yjshen yjshen mentioned this pull request Aug 9, 2019
1 task
private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)

private lazy val cacheExpireTimeout: Long = defaultCacheExpireTimeout
// TODO: how to get current context with settings in Flink
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this or create an issue for tracking a fix for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue to track this.

val pulsarClient: Client = org.apache.pulsar.client.api.PulsarClient
.builder()
.serviceUrl(pulsarServiceUrl)
.loadConf(clientConf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have the case insensitive issue as what we had in Spark?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a test case for this, opened an issue.

src/main/scala/org/apache/flink/pulsar/JSONOptions.scala Outdated Show resolved Hide resolved
@sijie sijie mentioned this pull request Aug 9, 2019
8 tasks
@sijie
Copy link
Member

sijie commented Aug 10, 2019

[�[1;33mWARNING�[m] [Warn] : bootstrap class path not set in conjunction with -source 1.6
[�[1;31mERROR�[m] [Error] /home/jenkins/workspace/StreamNative_pulsar-flink_PR-4/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/ClosableBlockingQueueTest.java:38: diamond operator is not supported in -source 1.6
  (use -source 7 or higher to enable diamond operator)
[�[1;31mERROR�[m] [Error] /home/jenkins/workspace/StreamNative_pulsar-flink_PR-4/src/test/scala/org/apache/flink/streaming/connectors/pulsar/testutils/TestSourceContext.java:30: diamond operator is not supported in -source 1.6
  (use -source 7 or higher to enable diamond operator)

@yjshen can you please talk a look at the CI failure?

@sijie sijie added area/connector type/task Indicates a chore or a small item of work labels Aug 10, 2019
@sijie sijie added this to the 2.5.0 milestone Aug 10, 2019
@yjshen
Copy link
Contributor Author

yjshen commented Aug 10, 2019

[�[1;33mWARNING�[m] [Warn] : bootstrap class path not set in conjunction with -source 1.6
[�[1;31mERROR�[m] [Error] /home/jenkins/workspace/StreamNative_pulsar-flink_PR-4/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/ClosableBlockingQueueTest.java:38: diamond operator is not supported in -source 1.6
  (use -source 7 or higher to enable diamond operator)
[�[1;31mERROR�[m] [Error] /home/jenkins/workspace/StreamNative_pulsar-flink_PR-4/src/test/scala/org/apache/flink/streaming/connectors/pulsar/testutils/TestSourceContext.java:30: diamond operator is not supported in -source 1.6
  (use -source 7 or higher to enable diamond operator)

@yjshen can you please talk a look at the CI failure?

Done.

@sijie sijie merged commit 9093a65 into streamnative:master Aug 10, 2019
@sijie sijie deleted the streaming_only branch August 10, 2019 02:52
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
area/connector type/task Indicates a chore or a small item of work
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants