-
Notifications
You must be signed in to change notification settings - Fork 6
/
Resources.scala
54 lines (47 loc) · 1.91 KB
/
Resources.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.psisoyev.train.station
import cats.effect.concurrent.Ref
import cats.effect.{ Concurrent, ContextShift, Resource }
import cats.implicits._
import cats.{ Inject, Parallel }
import com.psisoyev.train.station.arrival.ExpectedTrains.ExpectedTrain
import cr.pulsar.{ Consumer, Producer, Pulsar, Subscription, Topic, Config => PulsarConfig }
import io.circe.Encoder
/**
 * Bundle of the long-lived dependencies assembled by [[Resources.make]].
 *
 * @param config    application configuration
 * @param producer  Pulsar producer of `E` events
 * @param consumers Pulsar consumers of `E` events
 * @param trainRef  reference holding the expected trains keyed by train id
 * @tparam F effect type
 * @tparam E event type carried by the producer and the consumers
 */
final case class Resources[F[_], E](
  config: Config,
  producer: Producer[F, E],
  consumers: List[Consumer[F, E]],
  trainRef: Ref[F, Map[TrainId, ExpectedTrain]]
)
object Resources {

  /**
   * Builds every member of [[Resources]] inside a single `Resource`.
   *
   * Steps, in order:
   *  1. load the [[Config]] via `Config.load`;
   *  1. create a Pulsar client for `config.pulsar.serviceUrl`;
   *  1. create a producer on the topic of `config.city`;
   *  1. create one consumer per city listed in `config.connectedTo`;
   *  1. allocate the expected-trains `Ref`, initially an empty map.
   *
   * @tparam F effect type
   * @tparam E event type; must be injectable into `Array[Byte]` and have a circe `Encoder`
   * @return a `Resource` yielding the fully assembled [[Resources]]
   */
  def make[
    F[_]: Concurrent: ContextShift: Parallel: Logger,
    E: Inject[*, Array[Byte]]: Encoder
  ]: Resource[F, Resources[F, E]] = {

    // Persistent topic named after the lower-cased city name.
    // Locale.ROOT makes the lower-casing independent of the JVM default locale, so every
    // station derives the same topic name for a given city (a Turkish default locale, for
    // example, would otherwise map "I" to a dotless "ı").
    def topic(config: PulsarConfig, city: City) =
      Topic(
        Topic.Name(city.value.toLowerCase(java.util.Locale.ROOT)),
        config
      ).withType(Topic.Type.Persistent)

    // Consumer on the topic of `city`, using a failover subscription named
    // "<city>-<config.city>", with auto-ack enabled and incoming events logged.
    def consumer(client: Pulsar.T, config: Config, city: City): Resource[F, Consumer[F, E]] = {
      val name = s"${city.value}-${config.city.value}"
      val subscription =
        Subscription(Subscription.Name(name))
          .withType(Subscription.Type.Failover)
      val options =
        Consumer
          .Options[F, E]()
          .withAutoAck
          .withLogger(EventLogger.incomingEvents)

      Consumer.withOptions[F, E](client, topic(config.pulsar, city), subscription, options)
    }

    // Producer on the topic of the configured city (`config.city`), with outgoing events logged.
    def producer(client: Pulsar.T, config: Config): Resource[F, Producer[F, E]] =
      Producer.withLogger[F, E](client, topic(config.pulsar, config.city), EventLogger.outgoingEvents)

    for {
      config    <- Resource.liftF(Config.load[F])
      client    <- Pulsar.create[F](config.pulsar.serviceUrl)
      producer  <- producer(client, config)
      consumers <- config.connectedTo.traverse(consumer(client, config, _))
      trainRef  <- Resource.liftF(Ref.of[F, Map[TrainId, ExpectedTrain]](Map.empty))
    } yield Resources(config, producer, consumers, trainRef)
  }
}