This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Cannot FlinkPulsarSource specify SubscriptionType, or how??? #97

Closed
why198852 opened this issue Jun 16, 2020 · 3 comments

Comments

why198852 commented Jun 16, 2020

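import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource
import org.apache.pulsar.client.impl.conf.ClientConfigurationData

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(10000) // checkpoint interval in milliseconds

// Source properties: the topic to read and the subscription name to start from
val prop = new Properties()
prop.put("topic", "persistent://ts/ns/mpc_test")
prop.put("pulsar.reader.subscriptionName", "ts.ns-profile")

// Pulsar client settings: token authentication and the broker service URL
val clientConf = new ClientConfigurationData()
clientConf.setAuthPluginClassName("org.apache.pulsar.client.impl.auth.AuthenticationToken")
clientConf.setAuthParams("eDkaliAfeGMsevPF0km6c1B741gxAWT2hgc")
clientConf.setServiceUrl("pulsar://127.0.0.1:6650")

// The first argument is the Pulsar admin (HTTP) URL
val source1 = new FlinkPulsarSource[String]("http://127.0.0.1:8080", clientConf, new SimpleStringSchema(), prop)
source1.setStartFromSubscription(prop.getProperty("pulsar.reader.subscriptionName", "ts.ns-profile"))
val dd = env.addSource(source1)

dd.print("pulsar:").setParallelism(1)

env.execute(this.getClass.getName)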

jiazhai (Contributor) commented Jun 17, 2020

@why198852 Currently, the connector reads data in reader mode rather than through a consumer, so it does not support subscriptionType.
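
To illustrate the distinction in the comment above, here is a minimal sketch using the plain Pulsar Java client from Scala. It is not the connector's internal code. The helper names `openReader` and `openSharedConsumer` are hypothetical, and `Shared` is just one example subscription type. The point is that the consumer builder accepts a `subscriptionType`, whereas the reader builder offers no such setting and is positioned by a start message ID instead.

```scala
import org.apache.pulsar.client.api.{Consumer, MessageId, PulsarClient, Reader, Schema, SubscriptionType}

object ReaderVsConsumer {
  // Reader mode: positioned by a start message ID.
  // The reader builder has no subscriptionType setting.
  def openReader(client: PulsarClient, topic: String): Reader[String] =
    client.newReader(Schema.STRING)
      .topic(topic)
      .startMessageId(MessageId.earliest)
      .create()

  // Consumer mode: a named subscription with an explicit subscription type.
  // Shared is used here only as an example.
  def openSharedConsumer(client: PulsarClient, topic: String, subscription: String): Consumer[String] =
    client.newConsumer(Schema.STRING)
      .topic(topic)
      .subscriptionName(subscription)
      .subscriptionType(SubscriptionType.Shared)
      .subscribe()
}
```

With a client built via `PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build()`, calling `ReaderVsConsumer.openSharedConsumer(client, "persistent://ts/ns/mpc_test", "ts.ns-profile")` would attach to the topic from the original post with a shared subscription. This requires a reachable broker and, in the poster's setup, the same token authentication.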

sijie (Member) commented Sep 4, 2020

Closing this issue since the question has been answered.

ckdarby commented Nov 26, 2020

> @why198852 Currently, the connector reads data in reader mode rather than through a consumer, so it does not support subscriptionType.

@jiazhai Is this still the case, and are there plans to support reading data through a consumer with a subscription type?
