Add influent-spark-streaming
okumin committed Jun 30, 2017
1 parent 925ef16 commit 429b2a2
Showing 5 changed files with 143 additions and 0 deletions.
@@ -1,6 +1,17 @@

lazy val root = (project in file(".")).aggregate(influentJava, influentTransport)

lazy val influentSparkStreaming = (project in file("influent-spark-streaming"))
  .settings(commonSettings: _*)
  .settings(
    name := "influent-spark-streaming",
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.1.1" % "provided"
    ),
    run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
  ).dependsOn(influentJava)


lazy val influentJava = (project in file("influent-java"))
  .settings(commonSettings: _*)
  .settings(javaSettings: _*)
@@ -20,6 +31,17 @@ lazy val influentTransport = (project in file("influent-transport"))
    name := "influent-transport"
  )

lazy val influentSparkStreamingSample = (project in file("influent-spark-streaming-sample"))
  .settings(commonSettings: _*)
  .settings(
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.1.1" % "provided"
    ),
    assemblyJarName in assembly := "influent-spark-streaming-sample.jar",
    assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),
    run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
  ).dependsOn(influentSparkStreaming)

lazy val influentJavaSample = (project in file("influent-java-sample"))
  .settings(commonSettings: _*)
  .settings(javaSettings: _*)
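For context, a downstream project would pull in the new module roughly as below once it is published. This is a hedged sketch, not part of the commit: the com.okumin organization is an assumption borrowed from influent-java's published coordinates, the version is left as a placeholder, and Spark stays provided, matching the diff above.

// Hypothetical downstream build.sbt; the coordinates are assumptions.
libraryDependencies ++= Seq(
  "com.okumin" %% "influent-spark-streaming" % "x.y.z", // hypothetical version
  "org.apache.spark" %% "spark-streaming" % "2.1.1" % "provided"
)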
@@ -0,0 +1,25 @@
package sample

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.fluentd.FluentdUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object EventCounter {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt

    val sparkConf = new SparkConf().setAppName("fluentd-event-counter")
    val ssc = new StreamingContext(sparkConf, Milliseconds(5000)) // 5-second batches

    val stream = FluentdUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY)

    // Count the events received in each batch, keyed by Fluentd tag.
    stream.map { eventStream =>
      eventStream.getTag.getName -> eventStream.getEntries.size().toLong
    }.reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(300 * 1000) // stop after five minutes
  }
}
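The sample above only counts entries per tag. As a hedged sketch (not part of the commit), the same stream can be flattened into individual (tag, record) pairs; EventEntry#getRecord is an assumption about the influent-java API, mirroring the getTag/getEntries accessors the sample already uses.

// Builds on the `stream` value from EventCounter above.
// EventEntry#getRecord is assumed, not confirmed by this commit.
import scala.collection.JavaConverters._

stream.flatMap { eventStream =>
  val tag = eventStream.getTag.getName
  eventStream.getEntries.asScala.map(entry => tag -> entry.getRecord.toString)
}.print()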
@@ -0,0 +1,16 @@
package org.apache.spark.streaming.fluentd

import influent.EventStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private[fluentd] class FluentdInputDStream(ssc: StreamingContext,
                                           host: String,
                                           port: Int,
                                           storageLevel: StorageLevel)
  extends ReceiverInputDStream[EventStream](ssc) {

  override def getReceiver(): Receiver[EventStream] = new FluentdReceiver(host, port, storageLevel)
}
@@ -0,0 +1,67 @@
package org.apache.spark.streaming.fluentd

import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture

import influent.EventStream
import influent.forward.{ForwardCallback, ForwardServer}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.fluentd.FluentdReceiver.SparkStreamingCallback
import org.apache.spark.streaming.receiver.Receiver

import scala.util.control.NonFatal

private[fluentd] class FluentdReceiver(host: String,
                                       port: Int,
                                       storageLevel: StorageLevel)
  extends Receiver[EventStream](storageLevel) with Logging {
  private[this] var server: Option[ForwardServer] = None

  override def onStart(): Unit = {
    synchronized {
      server match {
        case None =>
          val forwardServer = new ForwardServer.Builder(new SparkStreamingCallback(this))
            .workerPoolSize(1)
            .localAddress(new InetSocketAddress(host, port))
            .build()
          forwardServer.start()
          server = Some(forwardServer)
          logInfo(s"Fluentd server started on $host:$port.")
        case Some(_) =>
          logWarning("Fluentd receiver asked to start more than once without being stopped.")
      }
    }
    logInfo("Fluentd receiver started.")
  }

  override def onStop(): Unit = {
    synchronized {
      server.foreach { x =>
        x.shutdown()
        server = None
      }
      logInfo("Fluentd receiver stopped.")
    }
  }

  override def preferredLocation: Option[String] = Some(host)
}

private[fluentd] object FluentdReceiver {

  private class SparkStreamingCallback(receiver: FluentdReceiver) extends ForwardCallback {
    override def consume(stream: EventStream): CompletableFuture[Void] = {
      val future = new CompletableFuture[Void]()
      try {
        // Ack the forward request only after Spark has accepted the block.
        receiver.store(stream)
        future.complete(null)
      } catch {
        case NonFatal(e) => future.completeExceptionally(e)
      }
      future
    }
  }

}
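A note on the callback contract: consume returns a CompletableFuture so that influent can ack the forward request only after receiver.store has accepted the event stream; completing the future exceptionally tells the Fluentd client the delivery failed. The standalone sketch below (not part of the commit) exercises the same ForwardServer.Builder wiring without Spark, using only calls visible in the diff; the println callback and port 24224 (Fluentd's default forward port) are illustrative.

// A smoke-test sketch for the ForwardServer wiring; not part of the commit.
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture

import influent.EventStream
import influent.forward.{ForwardCallback, ForwardServer}

object ForwardServerSmokeTest {
  def main(args: Array[String]): Unit = {
    val server = new ForwardServer.Builder(new ForwardCallback {
      override def consume(stream: EventStream): CompletableFuture[Void] = {
        // Ack immediately; SparkStreamingCallback above acks only after store succeeds.
        println(s"${stream.getTag.getName}: ${stream.getEntries.size()} entries")
        CompletableFuture.completedFuture[Void](null)
      }
    }).workerPoolSize(1)
      .localAddress(new InetSocketAddress("127.0.0.1", 24224))
      .build()
    server.start()
    Thread.sleep(60000) // accept events for a minute, then shut down
    server.shutdown()
  }
}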
@@ -0,0 +1,13 @@
package org.apache.spark.streaming.fluentd

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext

object FluentdUtils {
  def createStream(ssc: StreamingContext,
                   host: String,
                   port: Int,
                   storageLevel: StorageLevel): FluentdInputDStream = {
    new FluentdInputDStream(ssc, host, port, storageLevel)
  }
}
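createStream leaves the StorageLevel to the caller, so durability is an explicit choice. MEMORY_ONLY, as in the sample, is cheapest but loses buffered events if the receiver's executor dies; a replicated, disk-backed level trades memory and serialization cost for recoverability. A one-line sketch, reusing the `ssc` from the sample and only Spark's standard constants:

// Replicated, disk-backed blocks survive the loss of the receiver's executor.
val replicated = FluentdUtils.createStream(
  ssc, "0.0.0.0", 24224, StorageLevel.MEMORY_AND_DISK_SER_2)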
