Skip to content

Commit

Permalink
Merge pull request #237 from lichess-org/forum-ingestor-improvement
Browse files Browse the repository at this point in the history
Forum ingestor improvement
  • Loading branch information
lenguyenthanh committed Jun 18, 2024
2 parents 0faedad + 52d8963 commit 427c818
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 24 deletions.
2 changes: 1 addition & 1 deletion modules/ingestor/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<configuration>

<variable name="APP_LOG_LEVEL" value="${APP_LOG_LEVEL:-INFO}" />
<variable name="ELASTIC4S_LOG_LEVEL" value="${ELASTIC4S_LOG_LEVEL:-INFO}" />
<variable name="ELASTIC4S_LOG_LEVEL" value="${ELASTIC4S_LOG_LEVEL:-WARN}" />
<variable name="MONGO_DRIVER_LOG_LEVEL" value="${MONGO_DRIVER_LOG_LEVEL:-INFO}" />

<appender name="FILE" class="ch.qos.logback.core.FileAppender">
Expand Down
25 changes: 19 additions & 6 deletions modules/ingestor/src/main/scala/app.config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,33 @@ object ElasticConfig:
private def uri = env("ELASTIC_URI").or(prop("elastic.uri")).as[String].default("http://127.0.0.1:9200")
def config = uri.map(ElasticConfig.apply)

case class IngestorConfig(forum: IngestorConfig.Config)
case class IngestorConfig(forum: IngestorConfig.Forum, team: IngestorConfig.Team)

object IngestorConfig:
case class Config(batchSize: Int, maxBodyLength: Int, timeWindows: Int, startAt: Option[Long])
case class Forum(batchSize: Int, maxBodyLength: Int, timeWindows: Int, startAt: Option[Long])
case class Team(batchSize: Int, timeWindows: Int, startAt: Option[Long])

object forum:
private object Forum:
private def batchSize =
env("INGESTOR_FORUM_BATCH_SIZE").or(prop("ingestor.forum.batch.size")).as[Int].default(100)
private def maxBodyLength =
env("INGESTOR_FORUM_MAX_BODY_LENGTH").or(prop("ingestor.forum.max.body.length")).as[Int].default(10000)
private def timeWindows =
env("INGESTOR_FORUM_TIME_WINDOWS").or(prop("ingestor.forum.time.windows")).as[Int].default(10)
private def startAt =
env("INGESTOR_FORUM_START_AT").or(prop("ingestor.forum.start.at")).as[Long].option
def config = (batchSize, maxBodyLength, timeWindows, startAt).parMapN(Config.apply)
env("INGESTOR_FORUM_START_AT")
.or(prop("ingestor.forum.start.at"))
.as[Long]
.option
def config = (batchSize, maxBodyLength, timeWindows, startAt).parMapN(Forum.apply)

def config = forum.config.map(IngestorConfig.apply)
// Loader for the team ingestor's settings. Mirrors the sibling `Forum` loader:
// each setting is read from an environment variable, falling back to a JVM
// system property, then to a hard-coded default where one exists.
// NOTE(review): `env`/`prop`/`.or`/`.as`/`.default` look like Ciris-style
// config combinators — confirm against this module's imports.
private object Team:
// Max number of team documents fetched/indexed per batch (default 100).
private def batchSize =
env("INGESTOR_TEAM_BATCH_SIZE").or(prop("ingestor.team.batch.size")).as[Int].default(100)
// Grouping window size; presumably seconds, matching the forum ingestor's
// use of `config.timeWindows.second` — TODO confirm unit at the call site.
private def timeWindows =
env("INGESTOR_TEAM_TIME_WINDOWS").or(prop("ingestor.team.time.windows")).as[Int].default(10)
// Optional epoch-seconds timestamp to resume ingestion from; absent means
// the ingestor decides its own starting point (no default is applied here).
private def startAt =
env("INGESTOR_TEAM_START_AT").or(prop("ingestor.team.start.at")).as[Long].option
// Loads the three settings in parallel and combines them into a
// `IngestorConfig.Team`, accumulating any config errors via `parMapN`.
def config = (batchSize, timeWindows, startAt).parMapN(Team.apply)

def config = (Forum.config, Team.config).parMapN(IngestorConfig.apply)
23 changes: 14 additions & 9 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ object ForumIngestor:

private val topicProjection = Projection.include(List("_id", "name"))

val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)
private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)
private val eventProjection = Projection.include(
List(
"clusterTime",
"documentKey._id",
"fullDocument.text",
"fullDocument.topicId",
Expand All @@ -39,12 +40,12 @@ object ForumIngestor:

private val index = Index("forum")

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)(
def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)(
using Logger[IO]
): IO[ForumIngestor] =
(mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(elastic, store, config))

def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)(
def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)(
topics: MongoCollection,
posts: MongoCollection
)(using Logger[IO]): ForumIngestor = new:
Expand All @@ -53,10 +54,10 @@ object ForumIngestor:
fs2.Stream
.eval(startAt.flatTap(since => info"Starting forum ingestor from $since"))
.flatMap: last =>
postStream(last)
changes(last)
.filterNot(_.isEmpty)
.evalMap: events =>
val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex)
*> deleteMany(toDelete)
Expand All @@ -78,8 +79,8 @@ object ForumIngestor:
Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}")

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.name, time.plusSeconds(1)) // +1 to avoid reindexing the same event
*> info"Stored last indexed time ${time.getEpochSecond()} for index ${index.name}"
store.put(index.name, time)
*> info"Stored last indexed time ${time.getEpochSecond} for $index"

private def startAt: IO[Option[Instant]] =
config.startAt.fold(store.get(index.name))(Instant.ofEpochSecond(_).some.pure[IO])
Expand All @@ -92,12 +93,16 @@ object ForumIngestor:
.all
.map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap)

private def postStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
val builder = posts.watch(aggregate)
// skip the first event if we're starting from a specific timestamp
// since the event at that timestamp is already indexed
val skip = since.fold(0)(_ => 1)
since
.fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp))
.batchSize(config.batchSize)
.boundedStream(config.batchSize)
.drop(skip)
.groupWithin(config.batchSize, config.timeWindows.second)
.evalTap(_.traverse_(x => debug"received $x"))
.map(_.toList)
Expand Down
11 changes: 3 additions & 8 deletions modules/ingestor/src/main/scala/kvstore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@ object KVStore:
new KVStore:

def get(key: String): IO[Option[Instant]] =
read(file)
.map: content =>
content.get(key).map(Instant.ofEpochSecond)
read(file).map(_.get(key).map(Instant.ofEpochSecond))

def put(key: String, value: Instant): IO[Unit] =
mutex.lock.surround:
read(file).flatMap: content =>
write(
file,
content.updated(key, value.getEpochSecond + 1)
) // +1 to avoid reindexing the same data
write(file, content.updated(key, value.getEpochSecond))

private def read(path: String): IO[State] =
Files[IO]
Expand All @@ -49,7 +44,7 @@ object KVStore:

private def write(path: String, content: State): IO[Unit] =
fs2.Stream
.eval(IO.blocking(writeToString(content)))
.eval(IO(writeToString(content)))
.through(fs2.text.utf8.encode[IO])
.through(Files[IO].writeAll(fs2.io.file.Path(path)))
.compile
Expand Down

0 comments on commit 427c818

Please sign in to comment.