Skip to content

Commit

Permalink
Merge pull request #228 from lichess-org/ingestor
Browse files Browse the repository at this point in the history
Implement ingestor module that watches mongo and ingest data to elastic search
  • Loading branch information
lenguyenthanh committed Jun 16, 2024
2 parents d0b8417 + a2e6768 commit 53d254d
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ HTTP_SHUTDOWN_TIMEOUT=1

# enable open api docs
HTTP_ENABLE_DOCS=true

MONGO_URI=mongodb://localhost:27017
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ target
.env

.smithy.lsp.log
/modules/ingestor/store.json
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ app/run

Run tests:
```sh
sbt test
test
```

Run code format and auto code refactor with scalafmt & scalafix:
```sh
sbt prepare
prepare
```

Start ingestor service:
```sh
ingestor/run
```
33 changes: 32 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,40 @@ lazy val api = (project in file("modules/api"))
)
)

lazy val ingestor = (project in file("modules/ingestor"))
.settings(
name := "ingestor",
publish := {},
publish / skip := true,
libraryDependencies ++= Seq(
elastic4sCatsEffect,
catsCore,
fs2,
fs2IO,
catsEffect,
ducktape,
cirisCore,
smithy4sJson,
jsoniterCore,
jsoniterMacro,
mongo4catsCore,
log4Cats,
logback,
weaver,
testContainers
),
Compile / run / fork := true
)
.enablePlugins(JavaAppPackaging)
.dependsOn(core, api)

lazy val client = (project in file("modules/client"))
.settings(
name := "client",
libraryDependencies ++= Seq(
smithy4sJson,
jsoniterCore,
jsoniterMacro,
playWS
)
)
Expand All @@ -56,6 +85,8 @@ lazy val app = (project in file("modules/app"))
publish / skip := true,
libraryDependencies ++= Seq(
smithy4sHttp4s,
jsoniterCore,
jsoniterMacro,
smithy4sHttp4sSwagger,
elastic4sCatsEffect,
catsCore,
Expand Down Expand Up @@ -86,7 +117,7 @@ val e2e = (project in file("modules/e2e"))
lazy val root = project
.in(file("."))
.settings(publish := {}, publish / skip := true)
.aggregate(core, api, app, client, e2e)
.aggregate(core, api, app, client, e2e, ingestor)

addCommandAlias("prepare", "scalafixAll; scalafmtAll")
addCommandAlias("check", "; scalafixAll --check ; scalafmtCheckAll")
27 changes: 27 additions & 0 deletions modules/ingestor/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<configuration>

<variable name="APP_LOG_LEVEL" value="${APP_LOG_LEVEL:-INFO}" />
<variable name="ELASTIC4S_LOG_LEVEL" value="${ELASTIC4S_LOG_LEVEL:-INFO}" />
<variable name="MONGO_DRIVER_LOG_LEVEL" value="${MONGO_DRIVER_LOG_LEVEL:-INFO}" />

<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>logs/application.log</file>
<encoder>
<pattern>%date [%thread] %-5level %logger{20} - %msg%n%xException</pattern>
</encoder>
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date [%thread] %-5level %logger{20} - %msg%n%xException</pattern>
</encoder>
</appender>

<root level="${APP_LOG_LEVEL}">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</root>

<logger name="com.sksamuel.elastic4s" level="${ELASTIC4S_LOG_LEVEL}"/>
<logger name="org.mongodb.driver" level="${MONGO_DRIVER_LOG_LEVEL}"/>
</configuration>
54 changes: 54 additions & 0 deletions modules/ingestor/src/main/scala/app.config.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package lila.search
package ingestor

import cats.effect.IO
import cats.syntax.all.*
import ciris.*

object AppConfig:

def load: IO[AppConfig] = appConfig.load[IO]

def appConfig = (
MongoConfig.config,
ElasticConfig.config,
IngestorConfig.config
).parMapN(AppConfig.apply)

case class AppConfig(
mongo: MongoConfig,
elastic: ElasticConfig,
ingestor: IngestorConfig
)
case class MongoConfig(uri: String, name: String)

object MongoConfig:

private def uri = env("MONGO_URI").or(prop("mongo.uri")).as[String]
private def name = env("MONGO_DATABASE").or(prop("mongo.database")).as[String]

def config = (uri, name).parMapN(MongoConfig.apply)

case class ElasticConfig(uri: String)

object ElasticConfig:
private def uri = env("ELASTIC_URI").or(prop("elastic.uri")).as[String].default("http://127.0.0.1:9200")
def config = uri.map(ElasticConfig.apply)

case class IngestorConfig(forum: IngestorConfig.Config)

object IngestorConfig:
case class Config(batchSize: Int, maxBodyLength: Int, timeWindows: Int, startAt: Option[Long])

object forum:
private def batchSize =
env("INGESTOR_FORUM_BATCH_SIZE").or(prop("ingestor.forum.batch.size")).as[Int].default(100)
private def maxBodyLength =
env("INGESTOR_FORUM_MAX_BODY_LENGTH").or(prop("ingestor.forum.max.body.length")).as[Int].default(10000)
private def timeWindows =
env("INGESTOR_FORUM_TIME_WINDOWS").or(prop("ingestor.forum.time.windows")).as[Int].default(10)
private def startAt =
env("INGESTOR_FORUM_START_AT").or(prop("ingestor.forum.start.at")).as[Long].option
def config = (batchSize, maxBodyLength, timeWindows, startAt).parMapN(Config.apply)

def config = forum.config.map(IngestorConfig.apply)
29 changes: 29 additions & 0 deletions modules/ingestor/src/main/scala/app.resources.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package lila.search
package ingestor

import cats.effect.{ IO, Resource }
import cats.syntax.all.*
import com.sksamuel.elastic4s.cats.effect.instances.*
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ ElasticClient, ElasticProperties }
import mongo4cats.client.MongoClient
import mongo4cats.database.MongoDatabase
import org.typelevel.log4cats.Logger

class AppResources(val mongo: MongoDatabase[IO], val elastic: ESClient[IO], val store: KVStore)

object AppResources:

def instance(conf: AppConfig)(using Logger[IO]): Resource[IO, AppResources] =
(makeMongoClient(conf.mongo), makeElasticClient(conf.elastic), KVStore.apply().toResource)
.parMapN(AppResources.apply)

def makeElasticClient(conf: ElasticConfig) =
Resource
.make(IO(ElasticClient(JavaClient(ElasticProperties(conf.uri)))))(client => IO(client.close()))
.map(ESClient.apply[IO])

def makeMongoClient(conf: MongoConfig) =
MongoClient
.fromConnectionString[IO](conf.uri)
.evalMap(_.getDatabase(conf.name))
27 changes: 27 additions & 0 deletions modules/ingestor/src/main/scala/app.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package lila.search
package ingestor

import cats.effect.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object App extends IOApp.Simple:

given Logger[IO] = Slf4jLogger.getLogger[IO]

override def run: IO[Unit] = app.useForever

def app: Resource[IO, Unit] =
for
config <- AppConfig.load.toResource
_ <- Logger[IO].info(s"Starting lila-search ingestor with config: $config").toResource
res <- AppResources.instance(config)
_ <- IngestorApp(res, config).run()
yield ()

class IngestorApp(res: AppResources, config: AppConfig)(using Logger[IO]):
def run(): Resource[IO, Unit] =
Ingestor(res.mongo, res.elastic, res.store, config.ingestor)
.flatMap(_.run())
.toResource
.evalTap(_ => Logger[IO].info("Ingestor started"))
128 changes: 128 additions & 0 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package lila.search
package ingestor

import cats.effect.IO
import cats.syntax.all.*
import com.mongodb.client.model.changestream.OperationType
import lila.search.spec.ForumSource
import mongo4cats.bson.Document
import mongo4cats.database.MongoDatabase
import mongo4cats.models.collection.ChangeStreamDocument
import mongo4cats.operations.{ Aggregate, Filter, Projection }
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.syntax.*

import java.time.Instant
import scala.concurrent.duration.*

trait ForumIngestor:
def run(): fs2.Stream[IO, Unit]

object ForumIngestor:

private val topicProjection = Projection.include(List("_id", "name"))

private val eventFilter = Filter.in("operationType", List("replace", "insert"))
private val eventProjection = Projection.include(
List(
"documentKey._id",
"fullDocument.text",
"fullDocument.topicId",
"fullDocument.troll",
"fullDocument.createdAt",
"fullDocument.userId",
"fullDocument.erasedAt"
)
)
private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection))

private val index = Index("forum")

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)(
using Logger[IO]
): IO[ForumIngestor] =
(mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(elastic, store, config))

def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)(
topics: MongoCollection,
posts: MongoCollection
)(using Logger[IO]): ForumIngestor = new:

def run(): fs2.Stream[IO, Unit] =
fs2.Stream
.eval(startAt.flatTap(since => info"Starting forum ingestor from $since"))
.flatMap: last =>
postStream(last)
.filterNot(_.isEmpty)
.evalMap: events =>
val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex)
*> deleteMany(toDelete)
*> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now()))

private def storeBulk(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
info"Received ${events.size} forum posts to index" *>
events.toSources
.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_.id).mkString(", ")}")

private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
elastic
.deleteMany(index, events.flatMap(_.id.map(Id.apply)))
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}")
*> info"Deleted ${events.size} forum posts"

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.name, time.plusSeconds(1)) // +1 to avoid reindexing the same event
*> info"Stored last indexed time ${time.getEpochSecond()} for index ${index.name}"

private def startAt: IO[Option[Instant]] =
config.startAt.fold(store.get(index.name))(Instant.ofEpochSecond(_).some.pure[IO])

// Fetches topic names by their ids
private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
topics
.find(Filter.in("_id", ids))
.projection(topicProjection)
.all
.map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap)

private def postStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
val builder = posts.watch(aggregate)
since
.fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp))
.batchSize(config.batchSize)
.boundedStream(config.batchSize)
.groupWithin(config.batchSize, config.timeWindows.second)
.evalTap(_.traverse_(x => debug"received $x"))
.map(_.toList)

extension (events: List[ChangeStreamDocument[Document]])
private def toSources: IO[List[(String, ForumSource)]] =
val topicIds = events.flatMap(_.topicId).distinct
topicByIds(topicIds).map: topicMap =>
events.flatten: event =>
(event.id, event.topicId, event.fullDocument).flatMapN: (id, topicId, doc) =>
doc.toSource(topicName = topicMap.get(topicId)).map(id -> _)

extension (doc: Document)
private def toSource(topicName: Option[String]): Option[ForumSource] =
(
doc.getString("text").map(_.take(config.maxBodyLength)),
topicName,
doc.getString("topicId"),
doc.getBoolean("troll"),
doc.getNested("createdAt").flatMap(_.asInstant).map(_.toEpochMilli()),
doc.getString("userId").some
).mapN(ForumSource.apply)

extension (event: ChangeStreamDocument[Document])
private def topicId = event.fullDocument.flatMap(_.getString("topicId"))

private def isDelete: Boolean =
event.operationType == OperationType.DELETE ||
event.fullDocument.flatMap(_.get("erasedAt")).isDefined
18 changes: 18 additions & 0 deletions modules/ingestor/src/main/scala/ingestor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package lila.search
package ingestor

import cats.effect.*
import mongo4cats.database.MongoDatabase
import org.typelevel.log4cats.Logger

trait Ingestor:
def run(): IO[Unit]

object Ingestor:

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig)(using
Logger[IO]
): IO[Ingestor] =
ForumIngestor(mongo, elastic, store, config.forum).map: f =>
new Ingestor:
def run(): IO[Unit] = f.run().compile.drain
Loading

0 comments on commit 53d254d

Please sign in to comment.