A step-by-step guide on how to use the Pulsar Spark Connector

The Pulsar Spark Connector was open sourced on July 9, 2019. The source code and user guide are available in the StreamNative pulsar-spark repository.

Environment

The following example uses the Homebrew package manager to download and install software on macOS; you can use another package manager depending on your operating system and requirements.

  1. Install Homebrew.
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
  2. Install Java 8 or a later version.

    This example uses Homebrew to install JDK8.

brew tap adoptopenjdk/openjdk
brew cask install adoptopenjdk8
  3. Install Apache Spark 2.4.0 or higher.

    Download Spark 2.4.3 from the official website and decompress it.

tar xvfz spark-2.4.3-bin-hadoop2.7.tgz
  4. Download Apache Pulsar 2.4.0.

    Download Pulsar 2.4.0 from the official website.

wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
tar xvfz apache-pulsar-2.4.0-bin.tar.gz
  5. Install Apache Maven.
brew install maven
  6. Set up the development environment.

    This example creates a Maven project called connector-test.

  (1) Generate a Scala project skeleton using the archetype provided by the Scala Maven Plugin.

mvn archetype:generate

In the list that appears, select the latest version of net.alchim31.maven:scala-archetype-simple (currently 1.7), then specify the groupId, artifactId, and version for the new project. This example uses:

groupId: com.example
artifactId: connector-test
version: 1.0-SNAPSHOT

After the above steps, a Maven Scala project framework is basically set up.
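
At this point the layout of connector-test looks roughly like the listing below (the generated sample sources and tests vary slightly with the archetype version); the StreamRead and BatchWrite classes written later in this guide go next to App.scala under src/main/scala/com/example/.

connector-test
    pom.xml
    src/main/scala/com/example/App.scala    (sample class generated by the archetype)
    src/test/scala/...                      (sample tests generated by the archetype)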

  (2) Add the Spark and Pulsar Spark Connector dependencies to pom.xml in the project root directory, and use maven-shade-plugin for packaging.

    a. Define version properties for the dependencies.

  <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.4.3</spark.version>
        <pulsar-spark-connector.version>2.4.0</pulsar-spark-connector.version>
        <spec2.version>4.2.0</spec2.version>
        <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
  </properties>

    b. Add the Spark and Pulsar Spark Connector dependencies.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-spark-connector_${scala.compat.version}</artifactId>
        <version>${pulsar-spark-connector.version}</version>
    </dependency>

    c. Add a Maven repository that contains pulsar-spark-connector.

    <repositories>
      <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
      </repository>
      <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
      </repository>
    </repositories>

    d. Package the sample classes together with pulsar-spark-connector using maven-shade-plugin.

    <plugin>
          <!-- Shade all the dependencies to avoid conflicts -->
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>${maven-shade-plugin.version}</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
                <minimizeJar>false</minimizeJar>

                <artifactSet>
                  <includes>
                    <include>io.streamnative.connectors:*</include>
                  </includes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>META-INF/*.SF</exclude>
                      <exclude>META-INF/*.DSA</exclude>
                      <exclude>META-INF/*.RSA</exclude>
                    </excludes>
                  </filter>
                </filters>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>

Read from and write to Pulsar in Spark programs

The example project includes the following programs:

  1. Read data from Pulsar (the app is named StreamRead).
  2. Write data to Pulsar (the app is named BatchWrite).

Build a stream processing job to read data from Pulsar

  1. In StreamRead, create a SparkSession.
val spark = SparkSession
    .builder()
    .appName("data-read")
    .config("spark.cores.max", 2)
    .getOrCreate()
  2. To connect to Pulsar, specify service.url and admin.url when building the DataFrame, and specify the topic to read from.
val ds = spark.readStream
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8088")
    .option("topic", "topic-test")
    .load()
ds.printSchema()  // print schema information of `topic-test`, as a validation step.
  3. Output ds to the console to start the job; a complete sketch of StreamRead is shown after the snippet below.
val query = ds.writeStream
    .outputMode("append")
    .format("console")
    .start()
query.awaitTermination()
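
Putting the three snippets above together, a complete StreamRead might look roughly like the sketch below. The package name com.example matches the groupId chosen earlier; wrapping the code in an object with a main method is an assumption, since the original snippets omit it.

package com.example

import org.apache.spark.sql.SparkSession

object StreamRead {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession.
    val spark = SparkSession
      .builder()
      .appName("data-read")
      .config("spark.cores.max", 2)
      .getOrCreate()

    // 2. Build a streaming DataFrame over the Pulsar topic.
    val ds = spark.readStream
      .format("pulsar")
      .option("service.url", "pulsar://localhost:6650")
      .option("admin.url", "http://localhost:8088")
      .option("topic", "topic-test")
      .load()
    ds.printSchema()  // print schema information of `topic-test`, as a validation step

    // 3. Stream every micro-batch to the console and block until the query terminates.
    val query = ds.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}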

Write data to Pulsar

  1. Similarly, in BatchWrite, first create a SparkSession.
val spark = SparkSession
    .builder()
    .appName("data-sink")
    .config("spark.cores.max", 2)
    .getOrCreate()
  2. Create a list of the numbers 1 to 10, convert it to a Spark Dataset, and write it to Pulsar; a complete sketch of BatchWrite is shown after the snippet below.
import spark.implicits._
spark.createDataset(1 to 10)
    .write
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8088")
    .option("topic", "topic-test")
    .save()
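
Correspondingly, a complete BatchWrite might look like the sketch below; the object/main wrapper and the final spark.stop() call are additions for completeness rather than part of the original snippets.

package com.example

import org.apache.spark.sql.SparkSession

object BatchWrite {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession.
    val spark = SparkSession
      .builder()
      .appName("data-sink")
      .config("spark.cores.max", 2)
      .getOrCreate()

    // 2. Turn the numbers 1 to 10 into a Dataset[Int] and write it to the Pulsar topic in one batch.
    import spark.implicits._
    spark.createDataset(1 to 10)
      .write
      .format("pulsar")
      .option("service.url", "pulsar://localhost:6650")
      .option("admin.url", "http://localhost:8088")
      .option("topic", "topic-test")
      .save()

    spark.stop()
  }
}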

Running the program

First configure and start single-node Spark and Pulsar clusters, then package the sample project, submit the two jobs with spark-submit, and finally observe the results.

  1. Modify the log level of Spark (optional).
  cd ${spark.dir}/conf
  cp log4j.properties.template log4j.properties

  In a text editor, change the log level to WARN.

  log4j.rootCategory=WARN, console
  2. Start the Spark cluster.
cd ${spark.dir}
sbin/start-all.sh
  3. Change the Pulsar web service port to 8088 (edit ${pulsar.dir}/conf/standalone.conf) to avoid a conflict with the Spark master web UI, which also listens on port 8080 by default.
webServicePort=8088
  4. Start the Pulsar standalone cluster.
cd ${pulsar.dir}
bin/pulsar standalone
  5. Package the sample project.
cd ${connector_test.dir}
mvn package
  6. Start StreamRead to monitor data changes in topic-test.
${spark.dir}/bin/spark-submit --class com.example.StreamRead --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  7. In another terminal window, start BatchWrite to write the numbers 1 to 10 to topic-test in a single batch.
${spark.dir}/bin/spark-submit --class com.example.BatchWrite --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  8. At this point, you should see output similar to the following in the terminal where StreamRead is running.
root
 |-- value: integer (nullable = false)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)

Batch: 0
+-----+-----+-------+-----------+-------------+-----------+
|value|__key|__topic|__messageId|__publishTime|__eventTime|
+-----+-----+-------+-----------+-------------+-----------+
+-----+-----+-------+-----------+-------------+-----------+

Batch: 1
+-----+-----+--------------------+--------------------+--------------------+-----------+
|value|__key|             __topic|         __messageId|       __publishTime|__eventTime|
+-----+-----+--------------------+--------------------+--------------------+-----------+
|    6| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    7| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    8| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    9| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|   10| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    1| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    2| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    3| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    4| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    5| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
+-----+-----+--------------------+--------------------+--------------------+-----------+

So far, we've started Pulsar and Spark, built the skeleton of the sample project, and used the Pulsar Spark Connector to write data to Pulsar and read it back in Spark, observing the final result in the Spark console.