Merged
6 changes: 5 additions & 1 deletion README.md
@@ -8,4 +8,8 @@ This is a curated list of demos that showcase Apache Pulsar® messaging and even

# Clients

- [PubSub Client Examples](clients/README.md)
- [PubSub Client Examples](clients/README.md)

# Flink

- [Pulsar Flink Connector](pulsar-flink/README.md)
1 change: 1 addition & 0 deletions pulsar-flink/.gitignore
@@ -0,0 +1 @@
dependency-reduced-pom.xml
86 changes: 86 additions & 0 deletions pulsar-flink/README.md
@@ -0,0 +1,86 @@
# Pulsar Flink Connector Examples

This is a curated list of examples that demonstrate how to process event streams in Apache Pulsar using Apache Flink.

## Prerequisites

- Java 1.8 or higher to run the demo application
- Maven to compile the demo application
- Pulsar 2.5.2 or higher
- Flink 1.10.1

## Pulsar Streaming Word Count

This example demonstrates a Flink streaming job that reads events from Pulsar, counts the words in them, and writes the word count results back to Pulsar.

```bash
export INPUT_TOPIC=wordcount_input
export OUTPUT_TOPIC=wordcount_output
```
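
As a rough illustration of the counting step described above, here is a minimal plain-Java sketch. It is not the `PulsarStreamingWordCount` job itself and uses no Flink or Pulsar APIs. The class name `WordCountSketch`, the lower-casing, and the whitespace tokenization are assumptions made for illustration. How the real job keys or windows the stream is not shown here.

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Splits a sentence on whitespace and counts each lower-cased word.
    // Assumption: words are separated by whitespace only.
    static Map<String, Long> countWords(String sentence) {
        Map<String, Long> counts = new TreeMap<>();
        for (String word : sentence.trim().toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints one "word count" pair per line, sorted by word.
        Map<String, Long> counts = countWords("test flink streaming word count");
        counts.forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```

For the sentence `test flink streaming word count`, each of the five words gets a count of 1. A streaming job would keep adding to these counts as more sentences arrive.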

### Steps

1. Download Pulsar 2.5.2 or higher and start Pulsar standalone. `PULSAR_HOME` is assumed to be the root directory of the Pulsar distribution.

```bash
${PULSAR_HOME}/bin/pulsar standalone
```

2. Download Flink 1.10.1 and start a local Flink cluster. `FLINK_HOME` is assumed to be the root directory of the Flink distribution.

```bash
${FLINK_HOME}/bin/start-cluster.sh
```

3. Clone the examples repo and build the Flink examples. `EXAMPLES_HOME` is assumed to be the root directory of the cloned `streamnative/pulsar-examples` repo.

```bash
git clone https://github.com/streamnative/pulsar-examples.git
```

```bash
cd pulsar-examples/pulsar-flink
```

```bash
mvn clean install
```

4. Open a terminal to subscribe to the output topic `${OUTPUT_TOPIC}` to receive word count results from it.

```bash
${PULSAR_HOME}/bin/pulsar-client consume -s sub -n 0 ${OUTPUT_TOPIC}
```

5. Open a terminal to submit the PulsarStreamingWordCount job to Flink.

```bash
${FLINK_HOME}/bin/flink run ${EXAMPLES_HOME}/pulsar-flink/target/pulsar-flink-examples-0.0.0-SNAPSHOT.jar \
--broker-service-url pulsar://localhost:6650 \
--admin-service-url http://localhost:8080 \
--input-topic ${INPUT_TOPIC} --output-topic ${OUTPUT_TOPIC}
```

6. Open a terminal to produce a stream of sentences to the input topic `${INPUT_TOPIC}`.

```bash
${PULSAR_HOME}/bin/pulsar-client produce -m "test flink streaming word count" -n 100 ${INPUT_TOPIC}
```

7. In the terminal from step 4, you should see a stream of word count results similar to the following. The results are stored in Avro format in the output topic, so the count field is binary and may appear as unprintable characters.

```text
----- got message -----
test�
----- got message -----

count�
----- got message -----
word�
----- got message -----
streaming�
----- got message -----

flink�
```
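
The odd characters and blank lines in the output are consistent with Avro's binary encoding being printed as text. The sketch below hand-encodes a record using the rules from the Avro specification: a `long` is written as a zig-zag variable-length integer, and a string is written as its UTF-8 length (a `long`) followed by the bytes. The record layout (a string field followed by a long field) and the class name `AvroEncodingSketch` are assumptions; the actual schema used by the job is not shown in this README. This is plain Java written for illustration, not the Avro library.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroEncodingSketch {

    // Avro writes int/long values as a zig-zag encoded variable-length integer.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // low 7 bits plus continuation bit
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    // Avro writes a string as its UTF-8 byte length (a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record layout: a string field followed by a long field.
    static byte[] encode(String word, long count) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, word);
        writeLong(out, count);
        return out.toByteArray();
    }

    // Formats bytes as space-separated lower-case hex pairs.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x ", b & 0xFF));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // Prints: 0a 63 6f 75 6e 74 c8 01
        System.out.println(toHex(encode("count", 100)));
    }
}
```

Under these assumptions, a five-letter word such as `count` or `flink` is prefixed with the length byte `0x0a`, which a terminal renders as a line feed. That would explain the blank line before those two words. A count of 64 or more encodes to a first byte of `0x80` or above, which is not valid UTF-8 on its own and is typically displayed as `�`.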
278 changes: 278 additions & 0 deletions pulsar-flink/pom.xml
@@ -0,0 +1,278 @@
<!--

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.streamnative.examples</groupId>
<version>0.0.0-SNAPSHOT</version>
<artifactId>pulsar-flink-examples</artifactId>
<packaging>jar</packaging>
<name>Pulsar Flink Examples</name>
<url>http://github.com/streamnative/examples</url>
<inceptionYear>2020</inceptionYear>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<connection>scm:git:https://github.com/streamnative/examples.git</connection>
<developerConnection>scm:git:https://github.com/streamnative/examples.git</developerConnection>
<url>https://github.com/streamnative/examples</url>
<tag>branch-0.0.1</tag>
</scm>
<issueManagement>
<system>GitHub</system>
<url>https://github.com/streamnative/examples/issues</url>
</issueManagement>
<developers>
<developer>
<name>The StreamNative Team</name>
<url>http://github.com/streamnative</url>
</developer>
</developers>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<javac.target>1.8</javac.target>
<pulsar.version>2.5.2</pulsar.version>
<scala.version>2.11.12</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<flink.version>1.10.1</flink.version>
<pulsar-flink-connector.version>2.4.17</pulsar-flink-connector.version>

<!-- plugin dependencies -->
<license-maven-plugin.version>3.0.rc1</license-maven-plugin.version>
<maven-checkstyle-plugin.version>3.0.0</maven-checkstyle-plugin.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
<maven-source-plugin.version>2.2.1</maven-source-plugin.version>
<maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
<os-maven-plugin.version>1.4.1.Final</os-maven-plugin.version>
<puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
<spotbugs-maven-plugin.version>3.1.8</spotbugs-maven-plugin.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector_${scala.compat.version}</artifactId>
<version>${pulsar-flink-connector.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os-maven-plugin.version}</version>
</extension>
</extensions>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>${maven-checkstyle-plugin.version}</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>${puppycrawl.checkstyle.version}</version>
</dependency>
</dependencies>
<configuration>
<configLocation>../clients/buildtools/src/main/resources/streamnative/checkstyle.xml</configLocation>
<suppressionsLocation>../clients/buildtools/src/main/resources/streamnative/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeResources>false</includeResources>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<id>checkstyle</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<version>${spotbugs-maven-plugin.version}</version>
<configuration>
<excludeFilterFile>${session.executionRootDirectory}/../buildtools/src/main/resources/streamnative/findbugsExclude.xml</excludeFilterFile>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${javac.target}</source>
<target>${javac.target}</target>
<compilerArgs>
<compilerArg>-Werror</compilerArg>
<compilerArg>-Xlint:deprecation</compilerArg>
<compilerArg>-Xlint:unchecked</compilerArg>
<!-- https://issues.apache.org/jira/browse/MCOMPILER-205 -->
<compilerArg>-Xpkginfo:always</compilerArg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<argLine>-Xmx2G -Djava.net.preferIPv4Stack=true -Dio.netty.leakDetection.level=paranoid</argLine>
<redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
<reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
<rerunFailingTestsCount>${testRetryCount}</rerunFailingTestsCount>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven-source-plugin.version}</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>${license-maven-plugin.version}</version>
<configuration>
<header>src/resources/license.template</header>

<excludes>
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
</excludes>
<mapping>
<proto>JAVADOC_STYLE</proto>
<go>DOUBLESLASH_STYLE</go>
<conf>SCRIPT_STYLE</conf>
<ini>SCRIPT_STYLE</ini>
<yaml>SCRIPT_STYLE</yaml>
<tf>SCRIPT_STYLE</tf>
<cfg>SCRIPT_STYLE</cfg>
<Makefile>SCRIPT_STYLE</Makefile>
<service>SCRIPT_STYLE</service>
<cc>JAVADOC_STYLE</cc>
<md>XML_STYLE</md>
<txt>SCRIPT_STYLE</txt>
<scss>JAVADOC_STYLE</scss>
<Doxyfile>SCRIPT_STYLE</Doxyfile>
<tfvars>SCRIPT_STYLE</tfvars>
</mapping>
</configuration>
</plugin>
<plugin>
<!-- Shade all the dependencies to avoid conflicts -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<minimizeJar>false</minimizeJar>
<artifactSet>
<includes>
<include>io.streamnative.connectors:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>io.streamnative.examples.flink.PulsarStreamingWordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
</profiles>
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
</project>