rocksdb state store impl (#1180)
* rocksdb state store impl

* fix RocksDBClientTest on win64

* add docs for mapdb/rocksdb

* scalafmt

* bump rocksdb jni

* bump scalafmt to 3.7.17
shuttie committed Nov 28, 2023
1 parent b4bcc59 commit 5220575
Showing 22 changed files with 419 additions and 133 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,6 +1,6 @@
style = defaultWithAlign
maxColumn = 120
-version = 3.7.16
+version = 3.7.17
assumeStandardLibraryStripMargin = true
align.stripMargin = true
runner.dialect = scala213
2 changes: 1 addition & 1 deletion build.sbt
@@ -70,7 +70,7 @@ lazy val root = (project in file("."))
ExclusionRule("org.nd4j", "guava"),
ExclusionRule("org.nd4j", "protobuf")
),
"org.rocksdb" % "rocksdbjni" % "8.6.7",
"org.rocksdb" % "rocksdbjni" % "8.8.1",
"org.mapdb" % "mapdb" % "3.0.10" exclude ("net.jpountz.lz4", "lz4"),
"com.github.jelmerk" % "hnswlib-core" % "1.1.0",
"org.slf4j" % "jcl-over-slf4j" % "2.0.9", // librec uses commons-logging, which is JCL
57 changes: 57 additions & 0 deletions doc/configuration/persistence.md
@@ -84,6 +84,63 @@ in the same datacenter/AZ)
* `pipeline.flushPeriod` controls the level of "eventualness" in the overall eventual consistency. With values
larger than `10` seconds, a second Metarank instance may not see writes buffered in the first instance.

## Disk persistence

Metarank also has an experimental option of using disk persistence instead of Redis. The main drawback of this
approach is that the deployment becomes stateful: you need to provision and maintain the underlying disk.

Metarank supports two disk backends for file-based persistence:

* MapDB: uses mmap-based storage, works well for smaller datasets.
* RocksDB: uses LSM-tree storage, suits large datasets.

File persistence is configured in the following way:

```yaml
state:
  type: file
  path: /path/to/dir # required
  format: binary # optional, default=binary, possible values: json, binary
  backend: # optional, default mapdb
    type: rocksdb # required, values: rocksdb, mapdb
```

### RocksDB options

RocksDB can be configured by defining the following values in the config file:

```yaml
state:
  type: file
  path: /path/to/dir # required
  backend: # optional, default mapdb
    type: rocksdb
    lruCacheSize: 1024000000 # LRU cache size in bytes, optional, default 1GB
    blockSize: 8192 # block size in bytes, optional, default 8KB
```

A rule of thumb for setting these parameters (see the API sketch after this list):

* A higher LRU cache size leads to better read throughput at the cost of extra memory usage. If unsure, set it to 50% of your RAM.
* `blockSize` defines the size of the page RocksDB reads from disk. Ideally it should match your actual disk block size: for cloud-attached disks like AWS EBS it should be 16KB, for local drives 1-2KB.
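
For reference, here is a minimal sketch of how these two settings map onto the RocksDB JNI API. This is an
illustration only, not Metarank's actual `RocksDBClient` wiring, and the path is a placeholder:

```scala
import org.rocksdb.{BlockBasedTableConfig, LRUCache, Options, RocksDB}

RocksDB.loadLibrary()

// lruCacheSize becomes the capacity of the shared block cache, in bytes
val cache = new LRUCache(1024L * 1024L * 1024L)

// blockSize becomes the block-based table block size, in bytes
val table = new BlockBasedTableConfig()
  .setBlockSize(8 * 1024)
  .setBlockCache(cache)

val options = new Options()
  .setCreateIfMissing(true)
  .setTableFormatConfig(table)

val db = RocksDB.open(options, "/path/to/dir")
```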

### MapDB options

MapDB can be configured in the following way:

```yaml
state:
  type: file
  path: /path/to/dir # required
  backend: # optional, default mapdb
    type: mapdb
    mmap: true # should MapDB use mmap or raw disk reads for data access? Optional, default true.
    maxNodeSize: 16 # node size of the internal db index. Optional, default 16.
```
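
A similar sketch for the MapDB options above, using the MapDB 3.x builder API. Again an illustration, not the
actual `MapDBClient` code; the file and map names are hypothetical:

```scala
import org.mapdb.{DBMaker, Serializer}

val mmap = true // the `mmap` option from the config above

// mmap toggles memory-mapped file access vs plain file reads
val maker = DBMaker.fileDB("/path/to/dir/state.db")
val db    = (if (mmap) maker.fileMmapEnable() else maker).make()

// maxNodeSize bounds the fan-out of the BTree index nodes
val tree = db
  .treeMap("values", Serializer.STRING, Serializer.BYTE_ARRAY)
  .maxNodeSize(16)
  .createOrOpen()
```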

### TLS Support

Metarank supports connecting to Redis using TLS for transport encryption, but there is no way to autodetect
50 changes: 39 additions & 11 deletions src/main/scala/ai/metarank/config/StateStoreConfig.scala
@@ -110,25 +110,53 @@ object StateStoreConfig extends Logging {
case class FileStateConfig(
path: String,
format: StoreFormat = BinaryStoreFormat,
-    backend: FileBackend = MapDBBackend
+    backend: FileBackend = MapDBBackend()
) extends StateStoreConfig
object FileStateConfig {
sealed trait FileBackend
-  case object RocksDBBackend extends FileBackend
-  case object MapDBBackend extends FileBackend
+  case class RocksDBBackend(lruCacheSize: Long = 1024 * 1024 * 1024L, blockSize: Int = 8 * 1024) extends FileBackend
+  case class MapDBBackend(mmap: Boolean = true, maxNodeSize: Int = 16) extends FileBackend

+  implicit val fileBackendDecoder: Decoder[FileBackend] = Decoder.instance(c =>
+    c.as[String] match {
+      case Right("mapdb")   => Right(MapDBBackend())
+      case Right("rocksdb") => Right(RocksDBBackend())
+      case Right(other)     => Left(DecodingFailure(s"backend type $other not supported", c.history))
+      case Left(_) =>
+        c.downField("type").as[String] match {
+          case Right("mapdb")   => mapDBBackendDecoder.tryDecode(c)
+          case Right("rocksdb") => rocksDbBackendDecoder.tryDecode(c)
+          case Right(other)     => Left(DecodingFailure(s"backend type $other not supported", c.history))
+          case Left(err)        => Left(err)
+        }
+    }
+  )

+  implicit val rocksDbBackendDecoder: Decoder[RocksDBBackend] = Decoder.instance(c =>
+    for {
+      lruCacheSize <- c.downField("lruCacheSize").as[Option[Long]].map(_.getOrElse(1024 * 1024 * 1024L))
+      blockSize    <- c.downField("blockSize").as[Option[Int]].map(_.getOrElse(8 * 1024))
+    } yield {
+      RocksDBBackend(lruCacheSize, blockSize)
+    }
+  )

+  implicit val mapDBBackendDecoder: Decoder[MapDBBackend] = Decoder.instance(c =>
+    for {
+      mmap        <- c.downField("mmap").as[Option[Boolean]].map(_.getOrElse(true))
+      maxNodeSize <- c.downField("maxNodeSize").as[Option[Int]].map(_.getOrElse(16))
+    } yield {
+      MapDBBackend(mmap, maxNodeSize)
+    }
+  )

-  implicit val fileBackendDecoder: Decoder[FileBackend] = Decoder.decodeString.emapTry {
-    case "rocksdb" => Success(RocksDBBackend)
-    case "mapdb"   => Success(MapDBBackend)
-    case other     => Failure(new Exception(s"file backend $other not supported"))
-  }
implicit val fileStateDecoder: Decoder[FileStateConfig] = Decoder.instance(c =>
for {
path <- c.downField("path").as[String]
-      format <- c.downField("format").as[Option[StoreFormat]]
-      back <- c.downField("backend").as[Option[FileBackend]]
+      format <- c.downField("format").as[Option[StoreFormat]].map(_.getOrElse(BinaryStoreFormat))
+      back <- c.downField("backend").as[Option[FileBackend]].map(_.getOrElse(RocksDBBackend()))
} yield {
-      FileStateConfig(path, format.getOrElse(BinaryStoreFormat), back.getOrElse(RocksDBBackend))
+      FileStateConfig(path, format, back)
}
)
}
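
The two-branch decoder above accepts both the short string form and the object form of the `backend` field. A
quick illustration with plain circe, assuming the implicits above are in scope (a sketch, not a test from this
commit):

```scala
import ai.metarank.config.StateStoreConfig.FileStateConfig._
import io.circe.parser.decode

// short form: a bare string selects a backend with all defaults
decode[FileBackend]("\"rocksdb\"")
// => Right(RocksDBBackend(1073741824, 8192))

// object form: per-backend options are read, missing ones fall back to defaults
decode[FileBackend]("""{"type": "rocksdb", "blockSize": 16384}""")
// => Right(RocksDBBackend(1073741824, 16384))
```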
2 changes: 1 addition & 1 deletion src/main/scala/ai/metarank/fstore/file/FileKVStore.scala
@@ -8,7 +8,7 @@ import ai.metarank.model.{FeatureValue, Key}
import cats.effect.IO
import fs2.Stream

-case class FileKVStore(db: HashDB, format: StoreFormat) extends KVStore[Key, FeatureValue] {
+case class FileKVStore(db: HashDB[Array[Byte]], format: StoreFormat) extends KVStore[Key, FeatureValue] {
override def put(values: Map[Key, FeatureValue]): IO[Unit] = IO {
val size = values.size
val keys = new Array[String](size)
src/main/scala/ai/metarank/fstore/file/FileModelStore.scala
@@ -7,7 +7,7 @@ import ai.metarank.fstore.file.client.{HashDB, SortedDB}
import ai.metarank.ml.{Context, Model, Predictor}
import cats.effect.IO

-case class FileModelStore(db: HashDB) extends ModelStore {
+case class FileModelStore(db: HashDB[Array[Byte]]) extends ModelStore {
override def put(value: Model[_]): IO[Unit] = IO {
value.save() match {
case None => {}
10 changes: 5 additions & 5 deletions src/main/scala/ai/metarank/fstore/file/FilePersistence.scala
@@ -8,7 +8,7 @@ import ai.metarank.fstore.cache.{CachedKVStore, NegCachedKVStore}
import ai.metarank.fstore.codec.StoreFormat
import ai.metarank.fstore.file.FilePersistence.FeatureSize
import ai.metarank.fstore.file.client.FileClient.PrefixSize
-import ai.metarank.fstore.file.client.{FileClient, MapDBClient}
+import ai.metarank.fstore.file.client.{FileClient, MapDBClient, RocksDBClient}
import ai.metarank.fstore.memory.{MemKVStore, MemModelStore, MemPeriodicCounter}
import ai.metarank.model.Key.FeatureName
import ai.metarank.model.{Feature, FeatureKey, FeatureValue, Key, Schema}
@@ -70,10 +70,10 @@ object FilePersistence {
case class FeatureSize(name: FeatureName, size: PrefixSize)
def create(conf: FileStateConfig, schema: Schema, imp: ImportCacheConfig): Resource[IO, FilePersistence] =
conf.backend match {
-      case FileStateConfig.RocksDBBackend =>
-        Resource.raiseError[IO, FilePersistence, Throwable](new Exception("not yet implemented"))
-      case FileStateConfig.MapDBBackend =>
-        MapDBClient.create(Path.of(conf.path)).map(c => FilePersistence(schema, c, conf.format, imp))
+      case opts: FileStateConfig.RocksDBBackend =>
+        RocksDBClient.create(Path.of(conf.path), opts).map(c => FilePersistence(schema, c, conf.format, imp))
+      case opts: FileStateConfig.MapDBBackend =>
+        MapDBClient.create(Path.of(conf.path), opts).map(c => FilePersistence(schema, c, conf.format, imp))
}

}
src/main/scala/ai/metarank/fstore/file/FileScalarFeature.scala
@@ -10,7 +10,7 @@ import ai.metarank.model.State.ScalarState
import ai.metarank.model.{FeatureValue, Key, State, Timestamp, Write}
import cats.effect.IO

-case class FileScalarFeature(config: ScalarConfig, db: HashDB, format: StoreFormat) extends ScalarFeature {
+case class FileScalarFeature(config: ScalarConfig, db: HashDB[Array[Byte]], format: StoreFormat) extends ScalarFeature {
override def put(action: Write.Put): IO[Unit] = IO {
db.put(format.key.encodeNoPrefix(action.key), format.scalar.encode(action.value))
}
src/main/scala/ai/metarank/fstore/file/client/FileClient.scala
@@ -5,7 +5,7 @@ trait FileClient {
def sortedStringDB(name: String): SortedDB[String]
def sortedFloatDB(name: String): SortedDB[Float]
def sortedIntDB(name: String): SortedDB[Int]
-  def hashDB(name: String): HashDB
+  def hashDB(name: String): HashDB[Array[Byte]]
def close(): Unit
}

6 changes: 3 additions & 3 deletions src/main/scala/ai/metarank/fstore/file/client/HashDB.scala
@@ -1,6 +1,6 @@
package ai.metarank.fstore.file.client

-trait HashDB extends DB[Array[Byte]] {
-  def put(keys: Array[String], values: Array[Array[Byte]]): Unit
-  def get(keys: Array[String]): Array[Array[Byte]]
+trait HashDB[T] extends DB[T] {
+  def put(keys: Array[String], values: Array[T]): Unit
+  def get(keys: Array[String]): Array[T]
}
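
Since `HashDB` is now generic in its value type, helpers can be written once for any payload. A tiny hypothetical
sketch using only the two methods shown above:

```scala
// copy the given keys from one store to another, for any value type T
def copyKeys[T](from: HashDB[T], to: HashDB[T], keys: Array[String]): Unit =
  to.put(keys, from.get(keys))
```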
src/main/scala/ai/metarank/fstore/file/client/MapDBClient.scala
@@ -1,13 +1,14 @@
package ai.metarank.fstore.file.client

+import ai.metarank.config.StateStoreConfig.FileStateConfig.MapDBBackend
import ai.metarank.fstore.file.client.mapdb.{MapdbHashDB, MapdbSortedDB, ScalaFloatSerializer, ScalaIntSerializer}
import cats.effect.{IO, Resource}
import org.mapdb.{BTreeMap, DB, DBMaker, HTreeMap, Serializer}

import java.nio.file.{Files, Path, Paths}

class MapDBClient(db: DB) extends FileClient {
-  override def hashDB(name: String): HashDB = {
+  override def hashDB(name: String): HashDB[Array[Byte]] = {
val hash = db.hashMap(name, Serializer.STRING, Serializer.BYTE_ARRAY).createOrOpen()
MapdbHashDB(hash)
}
@@ -47,7 +48,8 @@ class MapDBClient(db: DB) extends FileClient {
}

object MapDBClient {
-  def create(path: Path): Resource[IO, MapDBClient] = Resource.make(IO(createUnsafe(path)))(m => IO(m.close()))
+  def create(path: Path, opts: MapDBBackend): Resource[IO, MapDBClient] =
+    Resource.make(IO(createUnsafe(path)))(m => IO(m.close()))

def createUnsafe(path: Path) = {
val pathFile = path.toFile