-
Notifications
You must be signed in to change notification settings - Fork 19
/
KinesisPipeline.scala
78 lines (71 loc) · 2.82 KB
/
KinesisPipeline.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* Copyright (c) 2014-2020 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache
* License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
*
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics
package stream.loader
// AWS Kinesis Connector libs
import com.amazonaws.services.kinesis.connectors.interfaces.{
IBuffer,
IEmitter,
IKinesisConnectorPipeline,
ITransformer
}
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration
import com.amazonaws.services.kinesis.connectors.impl.{AllPassFilter, BasicMemoryBuffer}
// This project
import com.snowplowanalytics.stream.loader.sinks._
import com.snowplowanalytics.stream.loader.Config._
import com.snowplowanalytics.stream.loader.transformers.{
BadEventTransformer,
EnrichedEventJsonTransformer,
PlainJsonTransformer
}
import com.snowplowanalytics.stream.loader.clients.BulkSender
/**
 * KinesisPipeline wires together the Emitter, Buffer, Transformer and Filter
 * that the Kinesis Connector Library uses, orchestrating the whole records flow.
 *
 * @param streamType the type of stream (good, bad or plain-json); selects which
 *        transformer [[getTransformer]] builds
 * @param goodSink the configured GoodSink, if any; handed to the emitter
 * @param badSink the configured BadSink; handed to the emitter
 * @param bulkSender the client the emitter uses to send records
 * @param shardDateField optional field name forwarded to the
 *        EnrichedEventJsonTransformer (only used for the good stream)
 * @param shardDateFormat optional date format forwarded to the
 *        EnrichedEventJsonTransformer (only used for the good stream)
 * @param bufferRecordLimit record limit forwarded to the emitter
 *        (presumably the max records per emitted batch — confirm in Emitter)
 * @param bufferByteLimit byte limit forwarded to the emitter
 *        (presumably the max bytes per emitted batch — confirm in Emitter)
 */
class KinesisPipeline(
  streamType: StreamType,
  goodSink: Option[ISink],
  badSink: ISink,
  bulkSender: BulkSender[EmitterJsonInput],
  shardDateField: Option[String],
  shardDateFormat: Option[String],
  bufferRecordLimit: Long,
  bufferByteLimit: Long,
) extends IKinesisConnectorPipeline[ValidatedJsonRecord, EmitterJsonInput] {

  /**
   * Builds a new Emitter backed by this pipeline's bulk sender, sinks and limits.
   * The connector configuration is not consulted.
   */
  def getEmitter(configuration: KinesisConnectorConfiguration): IEmitter[EmitterJsonInput] =
    new Emitter(bulkSender, goodSink, badSink, bufferRecordLimit, bufferByteLimit)

  /** Builds a new in-memory buffer configured from the connector configuration. */
  def getBuffer(configuration: KinesisConnectorConfiguration): IBuffer[ValidatedJsonRecord] =
    new BasicMemoryBuffer[ValidatedJsonRecord](configuration)

  /**
   * Builds the transformer matching `streamType`:
   * enriched events (with optional shard-date settings) for the good stream,
   * raw JSON for plain-json streams and bad-row handling for the bad stream.
   */
  def getTransformer(
    c: KinesisConnectorConfiguration
  ): ITransformer[ValidatedJsonRecord, EmitterJsonInput] =
    streamType match {
      case StreamType.Good      => new EnrichedEventJsonTransformer(shardDateField, shardDateFormat)
      case StreamType.PlainJson => new PlainJsonTransformer
      case StreamType.Bad       => new BadEventTransformer
    }

  /** Builds a filter that lets every record through (KCL's AllPassFilter). */
  def getFilter(c: KinesisConnectorConfiguration): AllPassFilter[ValidatedJsonRecord] =
    new AllPassFilter[ValidatedJsonRecord]()
}