Skip to content

Commit

Permalink
Merge branch 'release/r82-tawny-eagle'
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderdean committed Aug 8, 2016
2 parents 0cd6417 + 9e73c7c commit 871b126
Show file tree
Hide file tree
Showing 32 changed files with 689 additions and 216 deletions.
25 changes: 20 additions & 5 deletions 4-storage/kinesis-elasticsearch-sink/examples/config.hocon.sample
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,26 @@ sink {
}

elasticsearch {
cluster-name: "{{sinkElasticsearchClusterName}}"
endpoint: "{{sinkElasticsearchEndpoint}}"
max-timeout: "{{sinkElasticsearchMaxTimeout}}"
index: "{{sinkElasticsearchIndex}}" # Elasticsearch index name
type: "{{sinkElasticsearchType}}" # Elasticsearch type name

# Events are indexed using an Elasticsearch Client
# - type: http or transport (will default to transport)
# - endpoint: the cluster endpoint
# - port: the port the cluster can be accessed on
# - for http this is usually 9200
# - for transport this is usually 9300
# - max-timeout: the maximum attempt time before a client restart
client {
type: "{{sinkElasticsearchClient}}"
endpoint: "{{sinkElasticsearchEndpoint}}"
port: {{sinkElasticsearchTransportPort}}
max-timeout: "{{sinkElasticsearchMaxTimeout}}"
}

cluster {
name: "{{sinkElasticsearchClusterName}}"
index: "{{sinkElasticsearchIndex}}"
type: "{{sinkElasticsearchType}}"
}
}

# Events are accumulated in a buffer before being sent to Elasticsearch.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -20,7 +20,7 @@ object BuildSettings {
// Basic settings for our app
lazy val basicSettings = Seq[Setting[_]](
organization := "com.snowplowanalytics",
version := "0.6.0",
version := "0.7.0",
description := "Kinesis sink for Elasticsearch",
scalaVersion := "2.10.1",
scalacOptions := Seq("-deprecation", "-encoding", "utf8",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand All @@ -10,6 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

import sbt._

object Dependencies {
Expand All @@ -31,6 +32,7 @@ object Dependencies {
val kinesisClient = "1.6.1"
val kinesisConnector = "1.1.2"
val elasticsearch = "1.4.4"
val jest = "1.0.3"
// Scala
val argot = "1.0.1"
val config = "1.0.2"
Expand All @@ -51,9 +53,11 @@ object Dependencies {
// Java
val logging = "commons-logging" % "commons-logging" % V.logging
val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
val log4jOverSlf4j = "org.slf4j" % "log4j-over-slf4j" % V.slf4j
val kinesisClient = "com.amazonaws" % "amazon-kinesis-client" % V.kinesisClient
val kinesisConnector = "com.amazonaws" % "amazon-kinesis-connectors" % V.kinesisConnector
val elasticsearch = "org.elasticsearch" % "elasticsearch" % V.elasticsearch
val jest = "io.searchbox" % "jest" % V.jest
// Scala
val argot = "org.clapper" %% "argot" % V.argot
val config = "com.typesafe" % "config" % V.config
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand All @@ -10,6 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

import sbt._
import Keys._

Expand Down Expand Up @@ -40,10 +41,12 @@ object SnowplowElasticsearchSinkBuild extends Build {
Libraries.scalazSpecs2,
Libraries.commonsLang3,
Libraries.slf4j,
Libraries.log4jOverSlf4j,
Libraries.kinesisClient,
Libraries.kinesisConnector,
Libraries.snowplowTracker,
Libraries.elasticsearch
Libraries.elasticsearch,
Libraries.jest
)
)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch

// Java
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch

// Amazon
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow
package storage.kinesis.elasticsearch

Expand Down Expand Up @@ -53,6 +54,7 @@ import scalatracker.Tracker
* @param tracker a Tracker instance
* @param maxConnectionTime the maximum amount of time
* we can attempt to send to elasticsearch
* @param elasticsearchClientType The type of ES Client to use
*/
class ElasticsearchPipeline(
streamType: StreamType,
Expand All @@ -61,10 +63,12 @@ class ElasticsearchPipeline(
goodSink: Option[ISink],
badSink: ISink,
tracker: Option[Tracker] = None,
maxConnectionTime: Long) extends IKinesisConnectorPipeline[ValidatedRecord, EmitterInput] {
maxConnectionTime: Long,
elasticsearchClientType: String
) extends IKinesisConnectorPipeline[ValidatedRecord, EmitterInput] {

override def getEmitter(configuration: KinesisConnectorConfiguration): IEmitter[EmitterInput] =
new SnowplowElasticsearchEmitter(configuration, goodSink, badSink, tracker, maxConnectionTime)
new SnowplowElasticsearchEmitter(configuration, goodSink, badSink, tracker, maxConnectionTime, elasticsearchClientType)

override def getBuffer(configuration: KinesisConnectorConfiguration) = new BasicMemoryBuffer[ValidatedRecord](configuration)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch

// Java
Expand Down Expand Up @@ -53,14 +54,14 @@ import com.snowplowanalytics.snowplow.enrich.common.outputs.BadRow

// This project
import sinks._
import clients._

// Whether the input stream contains enriched events or bad events
object StreamType extends Enumeration {
type StreamType = Value
val Good, Bad = Value
}


/**
* Main entry point for the Elasticsearch sink
*/
Expand Down Expand Up @@ -99,22 +100,28 @@ object ElasticsearchSinkApp extends App {
case "bad" => StreamType.Bad
case _ => throw new RuntimeException("\"stream-type\" must be set to \"good\" or \"bad\"")
}

val elasticsearch = configValue.getConfig("elasticsearch")
val documentIndex = elasticsearch.getString("index")
val documentType = elasticsearch.getString("type")
val esClient = elasticsearch.getConfig("client")
val esCluster = elasticsearch.getConfig("cluster")
val clientType = esClient.getString("type")
val documentIndex = esCluster.getString("index")
val documentType = esCluster.getString("type")

val tracker = if (configValue.hasPath("monitoring.snowplow")) {
SnowplowTracking.initializeTracker(configValue.getConfig("monitoring.snowplow")).some
} else {
None
}

val maxConnectionTime = configValue.getConfig("elasticsearch").getLong("max-timeout")
val maxConnectionTime = configValue.getConfig("elasticsearch.client").getLong("max-timeout")
val finalConfig = convertConfig(configValue)

// Destination for successfully transformed ("good") events:
// - "stdout": print to standard out (local testing)
// - "elasticsearch": None here, because the Kinesis connector emitter
//   writes straight to Elasticsearch instead of a local sink
// Any other value is a configuration error; fail fast with a descriptive
// message (consistent with the "stream-type" validation above) instead of
// surfacing an opaque MatchError.
val goodSink = configValue.getString("sink.good") match {
  case "stdout" => Some(new StdouterrSink)
  case "elasticsearch" => None
  case _ => throw new RuntimeException("\"sink.good\" must be set to \"stdout\" or \"elasticsearch\"")
}

val badSink = configValue.getString("sink.bad") match {
case "stderr" => new StdouterrSink
case "none" => new NullSink
Expand All @@ -133,7 +140,7 @@ object ElasticsearchSinkApp extends App {

// Read records from Kinesis
case "kinesis" => {
new ElasticsearchSinkExecutor(streamType, documentIndex, documentType, finalConfig, goodSink, badSink, tracker, maxConnectionTime).success
new ElasticsearchSinkExecutor(streamType, documentIndex, documentType, finalConfig, goodSink, badSink, tracker, maxConnectionTime, clientType).success
}

// Run locally, reading from stdin and sending events to stdout / stderr rather than Elasticsearch / Kinesis
Expand All @@ -143,7 +150,15 @@ object ElasticsearchSinkApp extends App {
case StreamType.Good => new SnowplowElasticsearchTransformer(documentIndex, documentType)
case StreamType.Bad => new BadEventTransformer(documentIndex, documentType)
}
lazy val elasticsearchSender = new ElasticsearchSender(finalConfig, None, maxConnectionTime)

// Select the Elasticsearch sender implementation from the configured
// client type: "http" uses the REST client, anything else falls back to
// the native transport client (matching the config sample's documented
// default of "transport"). `if` is already an expression in Scala, so no
// wrapping parentheses are needed. Lazy because it is only constructed on
// the stdin/stdout code path.
lazy val elasticsearchSender: ElasticsearchSender =
  if (clientType == "http") {
    new ElasticsearchSenderHTTP(finalConfig, None, maxConnectionTime)
  } else {
    new ElasticsearchSenderTransport(finalConfig, None, maxConnectionTime)
  }

def run = for (ln <- scala.io.Source.stdin.getLines) {
val emitterInput = transformer.consumeLine(ln)
emitterInput._2.bimap(
Expand All @@ -155,6 +170,7 @@ object ElasticsearchSinkApp extends App {
)
}
}.success

case _ => "Source must be set to 'stdin' or 'kinesis'".fail
}

Expand Down Expand Up @@ -185,8 +201,11 @@ object ElasticsearchSinkApp extends App {
val secretKey = aws.getString("secret-key")

val elasticsearch = connector.getConfig("elasticsearch")
val elasticsearchEndpoint = elasticsearch.getString("endpoint")
val clusterName = elasticsearch.getString("cluster-name")
val esClient = elasticsearch.getConfig("client")
val esCluster = elasticsearch.getConfig("cluster")
val elasticsearchEndpoint = esClient.getString("endpoint")
val elasticsearchPort = esClient.getString("port")
val clusterName = esCluster.getString("name")

val kinesis = connector.getConfig("kinesis")
val kinesisIn = kinesis.getConfig("in")
Expand Down Expand Up @@ -219,6 +238,7 @@ object ElasticsearchSinkApp extends App {

props.setProperty(KinesisConnectorConfiguration.PROP_ELASTICSEARCH_ENDPOINT, elasticsearchEndpoint)
props.setProperty(KinesisConnectorConfiguration.PROP_ELASTICSEARCH_CLUSTER_NAME, clusterName)
props.setProperty(KinesisConnectorConfiguration.PROP_ELASTICSEARCH_PORT, elasticsearchPort)

props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_BYTE_SIZE_LIMIT, byteLimit)
props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_RECORD_COUNT_LIMIT, recordLimit)
Expand All @@ -229,5 +249,4 @@ object ElasticsearchSinkApp extends App {

new KinesisConnectorConfiguration(props, CredentialsLookup.getCredentialsProvider(accessKey, secretKey))
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow
package storage.kinesis.elasticsearch

Expand Down Expand Up @@ -46,6 +47,7 @@ import StreamType._
* @param tracker a Tracker instance
* @param maxConnectionTimeout the maximum amount of time
* we can attempt to send to elasticsearch
* @param elasticsearchClientType The type of ES Client to use
*/
class ElasticsearchSinkExecutor(
streamType: StreamType,
Expand All @@ -55,11 +57,13 @@ class ElasticsearchSinkExecutor(
goodSink: Option[ISink],
badSink: ISink,
tracker: Option[Tracker] = None,
maxConnectionTimeout: Long = 60000) extends KinesisConnectorExecutorBase[ValidatedRecord, EmitterInput] {
maxConnectionTimeout: Long = 60000,
elasticsearchClientType: String
) extends KinesisConnectorExecutorBase[ValidatedRecord, EmitterInput] {

initialize(config)
override def getKinesisConnectorRecordProcessorFactory = {
new KinesisConnectorRecordProcessorFactory[ValidatedRecord, EmitterInput](
new ElasticsearchPipeline(streamType, documentIndex, documentType, goodSink, badSink, tracker, maxConnectionTimeout), config)
new ElasticsearchPipeline(streamType, documentIndex, documentType, goodSink, badSink, tracker, maxConnectionTimeout, elasticsearchClientType), config)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch

// Scalaz
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd.
/**
* Copyright (c) 2014-2016 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
Expand All @@ -16,6 +16,7 @@
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch

// Scalaz
Expand Down
Loading

0 comments on commit 871b126

Please sign in to comment.