
Spark Streaming Integration with Flume


Official documentation: http://spark.apache.org/docs/1.3.0/streaming-flume-integration.html

1. There are two ways to integrate Spark Streaming with Flume:

  • Approach 1: Flume-style Push-based Approach
  • Approach 2: Pull-based Approach using a Custom Sink

This page uses the first approach, the Flume-style Push-based Approach; a sketch of the pull-based alternative is included below for comparison.
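For reference, a minimal sketch of the Spark side of Approach 2 (the pull-based approach) might look like the following. This is not part of the original walkthrough; it assumes the Flume agent is configured with the custom org.apache.spark.streaming.flume.sink.SparkSink described in the official documentation, listening on the same hostname and port used in the push example below.

import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

// Pull-based sketch: Spark polls the custom SparkSink instead of receiving pushed Avro events
val ssc = new StreamingContext(sc, Seconds(5))
val pollingStream = FlumeUtils.createPollingStream(ssc, "hadoop-senior.shinelon.com", 9999)
pollingStream.count().map(cnt => "Received " + cnt + " flume events.").print()
ssc.start()
ssc.awaitTermination()

The rest of this page follows Approach 1.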

First, edit the spark-flume.conf file in the conf directory under the Flume installation directory, with the following content:


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case the agent is called 'a2'.
a2.sources = r2
a2.channels = c2
a2.sinks = k2
#define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /opt/datas/spark-flume/wc.input
#define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
#a2.channels.c2.keep-alive=100
#a2.channels.c2.byteCapacityBufferPercentage=20
#define sink
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop-senior.shinelon.com
a2.sinks.k2.port = 9999
#bind sources and sink to channel 
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

Spark Streaming must be started first. When launching it, include the relevant jar packages with the following command:

./bin/spark-shell --master local[2] --jars /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-flume_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-avro-source-1.5.0-cdh5.3.6.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar

Then run the following program:


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.storage.StorageLevel

// Create a StreamingContext with a 5-second batch interval
val ssc = new StreamingContext(sc, Seconds(5))

// Create a push-based Flume receiver stream on the given host and port
val stream = FlumeUtils.createStream(ssc, "hadoop-senior.shinelon.com", 9999, StorageLevel.MEMORY_ONLY_SER_2)

// Count the events in each batch and print the result
stream.count().map(cnt => "Received " + cnt + " flume events.").print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
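Since the tailed file is named wc.input, a natural extension is a word count over the event bodies. The following is a minimal sketch, not part of the original example; it assumes the event bodies are UTF-8 text, and these transformations would need to be declared before ssc.start() is called.

// Sketch: decode each Flume event body as UTF-8 text and run a per-batch word count
val lines = stream.map(e => java.nio.charset.StandardCharsets.UTF_8.decode(e.event.getBody).toString)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()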

Start Flume:

bin/flume-ng agent -c conf -n a2 -f conf/spark-flume.conf -Dflume.root.logger=DEBUG,console

The setup above monitors a single file, and Spark Streaming counts the number of events. Therefore, after both Spark Streaming and Flume have been started, append data to the file using the following command:

echo "hadoop spark flume" >> wc.input

After entering the above command a few times, the output can be seen in the Spark Streaming client, as shown in the figure below:

(Figure: run results)
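Illustrative only (actual timestamps and counts depend on what is appended to wc.input), each batch prints output along the lines of:

-------------------------------------------
Time: 1532246405000 ms
-------------------------------------------
Received 1 flume events.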