-
Notifications
You must be signed in to change notification settings - Fork 5
/
Main.scala
57 lines (48 loc) · 2.57 KB
/
Main.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/*
* Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.postgres.loader
import cats.effect.{ExitCode, IO, IOApp}
import org.log4s.getLogger
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.postgres.api.DB
import com.snowplowanalytics.snowplow.postgres.config.Cli
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo
import com.snowplowanalytics.snowplow.postgres.resources
import com.snowplowanalytics.snowplow.postgres.storage.utils
import com.snowplowanalytics.snowplow.postgres.streaming.{UnorderedPipe, sink, source}
/**
 * Command-line entry point of the loader.
 *
 * Parses the arguments with `Cli.parse`, acquires the resources produced by
 * `resources.initialize`, and runs the source stream to completion through
 * the bad and good sinks.
 */
object Main extends IOApp {

  lazy val logger = getLogger

  /** Name and version of this build; passed to the good sink. */
  val processor = Processor(BuildInfo.name, BuildInfo.version)

  /**
   * Runs the loader.
   *
   * @param args raw command-line arguments, forwarded to `Cli.parse`
   * @return `ExitCode.Success` once the stream has been drained, or
   *         `ExitCode.Error` (after logging the error) when the
   *         configuration could not be parsed
   */
  def run(args: List[String]): IO[ExitCode] =
    Cli.parse[IO](args).value.flatMap {
      case Left(error) =>
        // Configuration could not be loaded: report it and exit with a failure code.
        IO.delay(logger.error(s"Configuration initialization failure\n$error")).as(ExitCode.Error)

      case Right(Cli(config, iglu)) =>
        resources.initialize[IO](config.output, iglu).use {
          case (blocker, transactor, state) =>
            val events =
              source.getSource[IO](blocker, config.purpose, config.input, config.monitoring.metrics)

            // Kept implicit, as in the original layout, so it is in scope where the sinks are built.
            implicit val db: DB[IO] =
              DB.interpreter[IO](iglu.resolver, transactor, config.output.schema)

            // Only the Enriched purpose runs the preparation step; SelfDescribing is a no-op here.
            val prepareStorage = config.purpose match {
              case Purpose.Enriched       => utils.prepare[IO](config.output.schema, transactor)
              case Purpose.SelfDescribing => IO.unit
            }

            prepareStorage
              .flatMap { _ =>
                val bad = sink.badSink[IO]
                // Output of the good sink is itself piped through the bad sink.
                val good = sink
                  .goodSink[IO](UnorderedPipe.forTransactor(transactor), state, iglu, processor)
                  .andThen(_.through(bad))

                // `bad` is the left-hand pipe and `good` the right-hand pipe of observeEither.
                events.observeEither(bad, good).compile.drain
              }
              .map(_ => ExitCode.Success)
        }
    }
}