diff --git a/.scalafmt.conf b/.scalafmt.conf
index 9b0a86aa5..dbdbb1ecf 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,6 +1,6 @@
 style = defaultWithAlign
 maxColumn = 120
-version = 3.7.16
+version = 3.7.17
 assumeStandardLibraryStripMargin = true
 align.stripMargin = true
 runner.dialect = scala213
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 0c3a2c380..5f19423eb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -70,7 +70,7 @@ lazy val root = (project in file("."))
       ExclusionRule("org.nd4j", "guava"),
       ExclusionRule("org.nd4j", "protobuf")
     ),
-    "org.rocksdb" % "rocksdbjni" % "8.6.7",
+    "org.rocksdb" % "rocksdbjni" % "8.8.1",
     "org.mapdb" % "mapdb" % "3.0.10" exclude ("net.jpountz.lz4", "lz4"),
     "com.github.jelmerk" % "hnswlib-core" % "1.1.0",
     "org.slf4j" % "jcl-over-slf4j" % "2.0.9", // librec uses commons-logging, which is JCL
diff --git a/doc/configuration/persistence.md b/doc/configuration/persistence.md
index 67f8798b0..c9a94c3aa 100644
--- a/doc/configuration/persistence.md
+++ b/doc/configuration/persistence.md
@@ -84,6 +84,63 @@ in the same datacenter/AZ)
 * `pipeline.flushPeriod` controls the level of "eventualness" in the overall eventual consistency. With values larger
 than `10` seconds, a second Metarank instance may not see write buffered in a first instance.
 
+## Disk persistence
+
+Metarank also has an experimental option of using disk persistence instead of Redis. The main drawback of this
+approach is that the deployment becomes stateful, and you need to maintain the persistent disk yourself.
+
+Metarank supports two disk backends for file-based persistence:
+
+* MapDB: uses mmap-based storage for data, works well for smaller datasets.
+* RocksDB: uses LSM-tree storage, better suited for large datasets.
+
+File persistence is configured in the following way:
+
+```yaml
+state:
+  type: file
+  path: /path/to/dir # required
+  format: binary # optional, default=binary, possible values: json, binary
+  backend: # optional, default mapdb
+    type: rocksdb # required, values: rocksdb, mapdb
+
+```
+
+### RocksDB options
+
+RocksDB can be configured by defining the following values in the config file:
+
+```yaml
+state:
+  type: file
+  path: /path/to/dir # required
+  backend: # optional, default mapdb
+    type: rocksdb
+    lruCacheSize: 1024000000 # LRU cache size in bytes, optional, default 1Gb
+    blockSize: 8192 # Block size in bytes, optional, default 8kb
+
+```
+
+A rule of thumb for setting these parameters:
+
+* a higher LRU cache size leads to better read throughput at the cost of extra memory usage. If unsure, set it to 50% of your RAM.
+* blockSize defines the size of the page RocksDB reads from disk. In a perfect world it should match your actual disk block size:
+for cloud-attached disks like AWS EBS it should be 16kb, for local drives 1-2kb.
+
+### MapDB options
+
+MapDB can be configured in the following way:
+
+```yaml
+state:
+  type: file
+  path: /path/to/dir # required
+  backend: # optional, default mapdb
+    type: mapdb
+    mmap: true # should MapDB use mmap or raw disk reads for data access? Optional, default true.
+    maxNodeSize: 16 # node size of the internal db index. Optional, default 16.
+``` + ### TLS Support Metarank supports connecting to Redis using TLS for transport encryption, but there is no way to autodetect diff --git a/src/main/scala/ai/metarank/config/StateStoreConfig.scala b/src/main/scala/ai/metarank/config/StateStoreConfig.scala index df2ca0828..44f815f77 100644 --- a/src/main/scala/ai/metarank/config/StateStoreConfig.scala +++ b/src/main/scala/ai/metarank/config/StateStoreConfig.scala @@ -110,25 +110,53 @@ object StateStoreConfig extends Logging { case class FileStateConfig( path: String, format: StoreFormat = BinaryStoreFormat, - backend: FileBackend = MapDBBackend + backend: FileBackend = MapDBBackend() ) extends StateStoreConfig object FileStateConfig { sealed trait FileBackend - case object RocksDBBackend extends FileBackend - case object MapDBBackend extends FileBackend + case class RocksDBBackend(lruCacheSize: Long = 1024 * 1024 * 1024L, blockSize: Int = 8 * 1024) extends FileBackend + case class MapDBBackend(mmap: Boolean = true, maxNodeSize: Int = 16) extends FileBackend + + implicit val fileBackendDecoder: Decoder[FileBackend] = Decoder.instance(c => + c.as[String] match { + case Right("mapdb") => Right(MapDBBackend()) + case Right("rocksdb") => Right(RocksDBBackend()) + case Right(other) => Left(DecodingFailure(s"backend type $other not supported", c.history)) + case Left(_) => + c.downField("type").as[String] match { + case Right("mapdb") => mapDBBackendDecoder.tryDecode(c) + case Right("rocksdb") => rocksDbBackendDecoder.tryDecode(c) + case Right(other) => Left(DecodingFailure(s"backend type $other not supported", c.history)) + case Left(err) => Left(err) + } + } + ) + + implicit val rocksDbBackendDecoder: Decoder[RocksDBBackend] = Decoder.instance(c => + for { + lruCacheSize <- c.downField("lruCacheSize").as[Option[Long]].map(_.getOrElse(1024 * 1024 * 1024L)) + blockSize <- c.downField("blockSize").as[Option[Int]].map(_.getOrElse(8 * 1024)) + } yield { + RocksDBBackend(lruCacheSize, blockSize) + } + ) + + implicit val mapDBBackendDecoder: Decoder[MapDBBackend] = Decoder.instance(c => + for { + mmap <- c.downField("mmap").as[Option[Boolean]].map(_.getOrElse(true)) + maxNodeSize <- c.downField("maxNodeSize").as[Option[Int]].map(_.getOrElse(16)) + } yield { + MapDBBackend(mmap, maxNodeSize) + } + ) - implicit val fileBackendDecoder: Decoder[FileBackend] = Decoder.decodeString.emapTry { - case "rocksdb" => Success(RocksDBBackend) - case "mapdb" => Success(MapDBBackend) - case other => Failure(new Exception(s"file backend $other not supported")) - } implicit val fileStateDecoder: Decoder[FileStateConfig] = Decoder.instance(c => for { path <- c.downField("path").as[String] - format <- c.downField("format").as[Option[StoreFormat]] - back <- c.downField("backend").as[Option[FileBackend]] + format <- c.downField("format").as[Option[StoreFormat]].map(_.getOrElse(BinaryStoreFormat)) + back <- c.downField("backend").as[Option[FileBackend]].map(_.getOrElse(RocksDBBackend())) } yield { - FileStateConfig(path, format.getOrElse(BinaryStoreFormat), back.getOrElse(RocksDBBackend)) + FileStateConfig(path, format, back) } ) } diff --git a/src/main/scala/ai/metarank/fstore/file/FileKVStore.scala b/src/main/scala/ai/metarank/fstore/file/FileKVStore.scala index a8afb77a1..a475ff954 100644 --- a/src/main/scala/ai/metarank/fstore/file/FileKVStore.scala +++ b/src/main/scala/ai/metarank/fstore/file/FileKVStore.scala @@ -8,7 +8,7 @@ import ai.metarank.model.{FeatureValue, Key} import cats.effect.IO import fs2.Stream -case class FileKVStore(db: HashDB, format: StoreFormat) 
extends KVStore[Key, FeatureValue] { +case class FileKVStore(db: HashDB[Array[Byte]], format: StoreFormat) extends KVStore[Key, FeatureValue] { override def put(values: Map[Key, FeatureValue]): IO[Unit] = IO { val size = values.size val keys = new Array[String](size) diff --git a/src/main/scala/ai/metarank/fstore/file/FileModelStore.scala b/src/main/scala/ai/metarank/fstore/file/FileModelStore.scala index dccd5dd27..0f9f89c0b 100644 --- a/src/main/scala/ai/metarank/fstore/file/FileModelStore.scala +++ b/src/main/scala/ai/metarank/fstore/file/FileModelStore.scala @@ -7,7 +7,7 @@ import ai.metarank.fstore.file.client.{HashDB, SortedDB} import ai.metarank.ml.{Context, Model, Predictor} import cats.effect.IO -case class FileModelStore(db: HashDB) extends ModelStore { +case class FileModelStore(db: HashDB[Array[Byte]]) extends ModelStore { override def put(value: Model[_]): IO[Unit] = IO { value.save() match { case None => {} diff --git a/src/main/scala/ai/metarank/fstore/file/FilePersistence.scala b/src/main/scala/ai/metarank/fstore/file/FilePersistence.scala index aed54b536..424946e62 100644 --- a/src/main/scala/ai/metarank/fstore/file/FilePersistence.scala +++ b/src/main/scala/ai/metarank/fstore/file/FilePersistence.scala @@ -8,7 +8,7 @@ import ai.metarank.fstore.cache.{CachedKVStore, NegCachedKVStore} import ai.metarank.fstore.codec.StoreFormat import ai.metarank.fstore.file.FilePersistence.FeatureSize import ai.metarank.fstore.file.client.FileClient.PrefixSize -import ai.metarank.fstore.file.client.{FileClient, MapDBClient} +import ai.metarank.fstore.file.client.{FileClient, MapDBClient, RocksDBClient} import ai.metarank.fstore.memory.{MemKVStore, MemModelStore, MemPeriodicCounter} import ai.metarank.model.Key.FeatureName import ai.metarank.model.{Feature, FeatureKey, FeatureValue, Key, Schema} @@ -70,10 +70,10 @@ object FilePersistence { case class FeatureSize(name: FeatureName, size: PrefixSize) def create(conf: FileStateConfig, schema: Schema, imp: ImportCacheConfig): Resource[IO, FilePersistence] = conf.backend match { - case FileStateConfig.RocksDBBackend => - Resource.raiseError[IO, FilePersistence, Throwable](new Exception("not yet implemented")) - case FileStateConfig.MapDBBackend => - MapDBClient.create(Path.of(conf.path)).map(c => FilePersistence(schema, c, conf.format, imp)) + case opts: FileStateConfig.RocksDBBackend => + RocksDBClient.create(Path.of(conf.path), opts).map(c => FilePersistence(schema, c, conf.format, imp)) + case opts: FileStateConfig.MapDBBackend => + MapDBClient.create(Path.of(conf.path), opts).map(c => FilePersistence(schema, c, conf.format, imp)) } } diff --git a/src/main/scala/ai/metarank/fstore/file/FileScalarFeature.scala b/src/main/scala/ai/metarank/fstore/file/FileScalarFeature.scala index e7a3c7591..d74afe0a5 100644 --- a/src/main/scala/ai/metarank/fstore/file/FileScalarFeature.scala +++ b/src/main/scala/ai/metarank/fstore/file/FileScalarFeature.scala @@ -10,7 +10,7 @@ import ai.metarank.model.State.ScalarState import ai.metarank.model.{FeatureValue, Key, State, Timestamp, Write} import cats.effect.IO -case class FileScalarFeature(config: ScalarConfig, db: HashDB, format: StoreFormat) extends ScalarFeature { +case class FileScalarFeature(config: ScalarConfig, db: HashDB[Array[Byte]], format: StoreFormat) extends ScalarFeature { override def put(action: Write.Put): IO[Unit] = IO { db.put(format.key.encodeNoPrefix(action.key), format.scalar.encode(action.value)) } diff --git a/src/main/scala/ai/metarank/fstore/file/client/FileClient.scala 
b/src/main/scala/ai/metarank/fstore/file/client/FileClient.scala index 766621be3..6319e7ec3 100644 --- a/src/main/scala/ai/metarank/fstore/file/client/FileClient.scala +++ b/src/main/scala/ai/metarank/fstore/file/client/FileClient.scala @@ -5,7 +5,7 @@ trait FileClient { def sortedStringDB(name: String): SortedDB[String] def sortedFloatDB(name: String): SortedDB[Float] def sortedIntDB(name: String): SortedDB[Int] - def hashDB(name: String): HashDB + def hashDB(name: String): HashDB[Array[Byte]] def close(): Unit } diff --git a/src/main/scala/ai/metarank/fstore/file/client/HashDB.scala b/src/main/scala/ai/metarank/fstore/file/client/HashDB.scala index d21b4ae57..aa33e0d93 100644 --- a/src/main/scala/ai/metarank/fstore/file/client/HashDB.scala +++ b/src/main/scala/ai/metarank/fstore/file/client/HashDB.scala @@ -1,6 +1,6 @@ package ai.metarank.fstore.file.client -trait HashDB extends DB[Array[Byte]] { - def put(keys: Array[String], values: Array[Array[Byte]]): Unit - def get(keys: Array[String]): Array[Array[Byte]] +trait HashDB[T] extends DB[T] { + def put(keys: Array[String], values: Array[T]): Unit + def get(keys: Array[String]): Array[T] } diff --git a/src/main/scala/ai/metarank/fstore/file/client/MapDBClient.scala b/src/main/scala/ai/metarank/fstore/file/client/MapDBClient.scala index 6636f7448..8a9da641e 100644 --- a/src/main/scala/ai/metarank/fstore/file/client/MapDBClient.scala +++ b/src/main/scala/ai/metarank/fstore/file/client/MapDBClient.scala @@ -1,5 +1,6 @@ package ai.metarank.fstore.file.client +import ai.metarank.config.StateStoreConfig.FileStateConfig.MapDBBackend import ai.metarank.fstore.file.client.mapdb.{MapdbHashDB, MapdbSortedDB, ScalaFloatSerializer, ScalaIntSerializer} import cats.effect.{IO, Resource} import org.mapdb.{BTreeMap, DB, DBMaker, HTreeMap, Serializer} @@ -7,7 +8,7 @@ import org.mapdb.{BTreeMap, DB, DBMaker, HTreeMap, Serializer} import java.nio.file.{Files, Path, Paths} class MapDBClient(db: DB) extends FileClient { - override def hashDB(name: String): HashDB = { + override def hashDB(name: String): HashDB[Array[Byte]] = { val hash = db.hashMap(name, Serializer.STRING, Serializer.BYTE_ARRAY).createOrOpen() MapdbHashDB(hash) } @@ -47,7 +48,8 @@ class MapDBClient(db: DB) extends FileClient { } object MapDBClient { - def create(path: Path): Resource[IO, MapDBClient] = Resource.make(IO(createUnsafe(path)))(m => IO(m.close())) + def create(path: Path, opts: MapDBBackend): Resource[IO, MapDBClient] = + Resource.make(IO(createUnsafe(path)))(m => IO(m.close())) def createUnsafe(path: Path) = { val pathFile = path.toFile diff --git a/src/main/scala/ai/metarank/fstore/file/client/RocksDBClient.scala b/src/main/scala/ai/metarank/fstore/file/client/RocksDBClient.scala index 144c00c8b..53b29caa6 100644 --- a/src/main/scala/ai/metarank/fstore/file/client/RocksDBClient.scala +++ b/src/main/scala/ai/metarank/fstore/file/client/RocksDBClient.scala @@ -1,121 +1,76 @@ package ai.metarank.fstore.file.client +import ai.metarank.config.StateStoreConfig.FileStateConfig.RocksDBBackend +import ai.metarank.fstore.file.client.rocksdb.RocksDB.Codec +import ai.metarank.fstore.file.client.rocksdb.{RocksHashDB, RocksSortedDB} +import ai.metarank.util.Logging import cats.effect.IO import cats.effect.kernel.Resource +import fs2.io.file.{Files, Path} import org.rocksdb.{BlockBasedTableConfig, CompressionType, Filter, LRUCache, Options, ReadOptions, RocksDB} -import java.nio.file.Path import java.util import scala.collection.mutable.ArrayBuffer -/* +import org.rocksdb.{RocksDB => RDB} -case 
class RocksDBClient(db: RocksDB) extends FileClient { - override def put(key: Array[Byte], value: Array[Byte]): Unit = { - db.put(key, value) - } +import java.io.File - override def put(keys: Array[Array[Byte]], values: Array[Array[Byte]]): Unit = { - var i = 0 - while (i < keys.length) { - db.put(keys(i), values(i)) - i += 1 - } - } - override def get(key: Array[Byte]): Option[Array[Byte]] = { - Option(db.get(key)) +case class RocksDBClient(dir: String) extends FileClient { + val options = { + val o = new Options() + o.setCreateIfMissing(true) + val table = new BlockBasedTableConfig() + table.setBlockCache(new LRUCache(2024 * 1024 * 1024)) + table.setCacheIndexAndFilterBlocks(true) + table.setBlockSize(8 * 1024) + o.setTableFormatConfig(table) + o } - override def firstN(prefix: Array[Byte], n: Int): CloseableIterator[FileClient.KeyVal] = { - new CloseableIterator[KeyVal] { - lazy val it = { - val xit = db.newIterator() - xit.seek(prefix) - xit - } - var cnt = 0 - var closed = false - override def nested: Iterator[KeyVal] = new Iterator[KeyVal] { - override def hasNext: Boolean = { - !closed && (cnt < n) && it.isValid && KeyVal.hasPrefix(it.key(), prefix) - } - override def next(): KeyVal = { - cnt += 1 - val result = KeyVal(it.key(), it.value()) - it.next() - result - } - } + val dbs = ArrayBuffer[RDB]() - override def close(): Unit = { - closed = true - it.close() - } - } + override def hashDB(name: String): HashDB[Array[Byte]] = { + val db = RDB.open(options, List(dir, name).mkString(File.separator)) + dbs.addOne(db) + RocksHashDB(db) } - override def lastN(prefix: Array[Byte], n: Int): CloseableIterator[FileClient.KeyVal] = { - val it = db.newIterator() - it.seekForPrev(nextKey(prefix)) - new CloseableIterator[KeyVal] { - var cnt = 0 - - override def nested: Iterator[KeyVal] = new Iterator[KeyVal] { - override def hasNext: Boolean = { - val over = cnt < n - val valid = it.isValid - val pref = KeyVal.hasPrefix(it.key(), prefix) - over && valid && pref - } - - override def next(): KeyVal = { - cnt += 1 - val result = KeyVal(it.key(), it.value()) - it.prev() - result - } - } - - override def close(): Unit = it.close() - } - + override def sortedDB(name: String): SortedDB[Array[Byte]] = { + val db = RDB.open(options, List(dir, name).mkString(File.separator)) + dbs.addOne(db) + RocksSortedDB(db, Codec.BYTES) } - override def get(keys: Array[Array[Byte]]): Array[Array[Byte]] = { - val response = db.multiGetAsList(util.Arrays.asList(keys: _*)) - val buf = new Array[Array[Byte]](keys.length) - response.toArray(buf) - buf + override def sortedFloatDB(name: String): SortedDB[Float] = { + val db = RDB.open(options, List(dir, name).mkString(File.separator)) + dbs.addOne(db) + RocksSortedDB(db, Codec.FLOAT) } - override def del(key: Array[Byte]): Unit = { - db.delete(key) + override def sortedIntDB(name: String): SortedDB[Int] = { + val db = RDB.open(options, List(dir, name).mkString(File.separator)) + dbs.addOne(db) + RocksSortedDB(db, Codec.INT) } - override def close(): Unit = { - db.close() + override def sortedStringDB(name: String): SortedDB[String] = { + val db = RDB.open(options, List(dir, name).mkString(File.separator)) + dbs.addOne(db) + RocksSortedDB(db, Codec.STRING) } - override def sync(): Unit = { - db.syncWal() + override def close(): Unit = { + dbs.foreach(_.close()) } - } -object RocksDBClient { +object RocksDBClient extends Logging { + def create(path: java.nio.file.Path, opts: RocksDBBackend) = Resource.make(for { + exists <- Files[IO].exists(fs2.io.file.Path(path.toString)) + 
_ <- IO.whenA(!exists)( + Files[IO].createDirectory(fs2.io.file.Path(path.toString)) *> info(s"created rocksdb dir $path") + ) + c <- IO(RocksDBClient(path.toString)) + } yield c)(x => IO(x.close())) - def create(path: Path) = Resource.make(IO(createUnsafe(path)))(x => IO(x.close())) - def createUnsafe(path: Path) = { - RocksDB.loadLibrary() - val table = new BlockBasedTableConfig() - table.setBlockCache(new LRUCache(128 * 1024 * 1024)) - table.setCacheIndexAndFilterBlocks(true) - table.setBlockSize(1024) - val opts = new Options() - opts.setCreateIfMissing(true) - opts.setOptimizeFiltersForHits(true) - opts.setTableFormatConfig(table) - val db = RocksDB.open(opts, path.toString) - RocksDBClient(db) - } } - */ diff --git a/src/main/scala/ai/metarank/fstore/file/client/mapdb/MapdbHashDB.scala b/src/main/scala/ai/metarank/fstore/file/client/mapdb/MapdbHashDB.scala index 53ce62f7e..400538bef 100644 --- a/src/main/scala/ai/metarank/fstore/file/client/mapdb/MapdbHashDB.scala +++ b/src/main/scala/ai/metarank/fstore/file/client/mapdb/MapdbHashDB.scala @@ -4,7 +4,7 @@ import ai.metarank.fstore.file.client.HashDB import org.mapdb.HTreeMap import scala.jdk.CollectionConverters._ -case class MapdbHashDB(map: HTreeMap[String, Array[Byte]]) extends HashDB { +case class MapdbHashDB(map: HTreeMap[String, Array[Byte]]) extends HashDB[Array[Byte]] { def put(key: String, value: Array[Byte]): Unit = { map.put(key, value) } @@ -25,6 +25,6 @@ case class MapdbHashDB(map: HTreeMap[String, Array[Byte]]) extends HashDB { def all() = map.entrySet().iterator().asScala.map(e => e.getKey -> e.getValue) - override def sizeof(value: Array[Byte]): Int = value.length + override def sizeof(value: Array[Byte]): Int = value.length + 4 } diff --git a/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksDB.scala b/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksDB.scala index d17804423..c9797577b 100644 --- a/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksDB.scala +++ b/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksDB.scala @@ -1,3 +1,90 @@ package ai.metarank.fstore.file.client.rocksdb +import ai.metarank.fstore.file.client.{CloseableIterator, DB, HashDB} +import ai.metarank.fstore.file.client.rocksdb.RocksDB.Codec +import org.rocksdb.{ReadOptions, RocksDB => RDB} -case class RocksDB() +import java.nio.ByteBuffer + +trait RocksDB[V] extends DB[V] { + def db: RDB + def codec: Codec[V] + override def put(key: String, value: V): Unit = { + db.put(key.getBytes(), codec.encode(value)) + } + + override def get(key: String): Option[V] = { + Option(db.get(key.getBytes())).map(codec.decode) + } + + override def del(key: String): Unit = { + db.delete(key.getBytes()) + } + + override def close(): Unit = { + db.close() + } + + override def all(): Iterator[(String, V)] = { + val opts = new ReadOptions() + val rit = db.newIterator(opts) + rit.seekToFirst() + new CloseableIterator[(String, V)] { + var closed = false + override def nested: Iterator[(String, V)] = new Iterator[(String, V)] { + override def hasNext: Boolean = { + val br = 1 + !closed && rit.isValid + } + + override def next(): (String, V) = { + val br = 1 + val k = new String(rit.key()) + val v = codec.decode(rit.value()) + rit.next() + (k, v) + } + } + + override def close(): Unit = { + val b = 1 + rit.close() + closed = true + } + } + } + + override def sync(): Unit = { + db.syncWal() + } + + override def sizeof(value: V): Int = 4 + codec.encode(value).length + +} + +object RocksDB { + sealed trait Codec[T] { + def encode(value: T): 
Array[Byte] + def decode(value: Array[Byte]): T + } + + object Codec { + val INT = new Codec[Int] { + override def decode(value: Array[Byte]): Int = ByteBuffer.wrap(value).getInt + override def encode(value: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(value).array() + } + val FLOAT = new Codec[Float] { + override def decode(value: Array[Byte]): Float = ByteBuffer.wrap(value).getFloat + override def encode(value: Float): Array[Byte] = ByteBuffer.allocate(4).putFloat(value).array() + } + + val BYTES = new Codec[Array[Byte]] { + override def decode(value: Array[Byte]): Array[Byte] = value + override def encode(value: Array[Byte]): Array[Byte] = value + } + + val STRING = new Codec[String] { + override def decode(value: Array[Byte]): String = new String(value) + override def encode(value: String): Array[Byte] = value.getBytes() + } + } +} diff --git a/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksHashDB.scala b/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksHashDB.scala new file mode 100644 index 000000000..37ac66c3f --- /dev/null +++ b/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksHashDB.scala @@ -0,0 +1,24 @@ +package ai.metarank.fstore.file.client.rocksdb + +import ai.metarank.fstore.file.client.HashDB +import ai.metarank.fstore.file.client.rocksdb.RocksDB.Codec +import com.github.blemale.scaffeine.{Cache, Scaffeine} +import org.rocksdb.{RocksDB => RDB} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +case class RocksHashDB(db: RDB) extends RocksDB[Array[Byte]] with HashDB[Array[Byte]] { + lazy val codec = Codec.BYTES + override def get(keys: Array[String]): Array[Array[Byte]] = { + db.multiGetAsList(keys.toList.map(_.getBytes()).asJava).asScala.toArray + } + + override def put(keys: Array[String], values: Array[Array[Byte]]): Unit = { + var i = 0 + while (i < keys.length) { + put(keys(i), values(i)) + i += 1 + } + } +} diff --git a/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksSortedDB.scala b/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksSortedDB.scala new file mode 100644 index 000000000..4de93bdd4 --- /dev/null +++ b/src/main/scala/ai/metarank/fstore/file/client/rocksdb/RocksSortedDB.scala @@ -0,0 +1,85 @@ +package ai.metarank.fstore.file.client.rocksdb + +import ai.metarank.fstore.file.client.rocksdb.RocksDB.Codec +import ai.metarank.fstore.file.client.{CloseableIterator, DB, SortedDB} +import org.rocksdb.{ReadOptions, RocksDB => RDB} + +case class RocksSortedDB[T](db: RDB, codec: Codec[T]) extends RocksDB[T] with SortedDB[T] { + override def firstN(prefix: String, n: Int): Iterator[(String, T)] = { + val prefixBytes = prefix.getBytes + new CloseableIterator[(String, T)] { + lazy val it = { + val xit = db.newIterator() + xit.seek(prefix.getBytes()) + xit + } + var cnt = 0 + var closed = false + + override def nested: Iterator[(String, T)] = new Iterator[(String, T)] { + + override def hasNext: Boolean = { + !closed && (cnt < n) && it.isValid && hasPrefix(it.key(), prefixBytes) + } + + override def next(): (String, T) = { + cnt += 1 + val result = (new String(it.key()), codec.decode(it.value())) + it.next() + result + } + } + + override def close(): Unit = { + closed = true + it.close() + } + } + + } + + override def lastN(prefix: String, n: Int): Iterator[(String, T)] = { + val prefixBytes = prefix.getBytes + val it = db.newIterator() + it.seekForPrev(SortedDB.nextKey(prefix).getBytes()) + new CloseableIterator[(String, T)] { + var cnt = 0 + var closed = false + + override 
def nested: Iterator[(String, T)] = new Iterator[(String, T)] { + override def hasNext: Boolean = { + val over = cnt < n + if (!closed) { + val valid = it.isValid + val pref = hasPrefix(it.key(), prefixBytes) + over && valid && pref + } else { + false + } + } + + override def next(): (String, T) = { + cnt += 1 + val result = (new String(it.key()), codec.decode(it.value())) + it.prev() + result + } + } + + override def close(): Unit = { + closed = true + it.close() + } + } + } + + def hasPrefix(key: Array[Byte], prefix: Array[Byte]): Boolean = { + var i = 0 + var matches = prefix.length < key.length + while (matches && (i < prefix.length)) { + matches = prefix(i) == key(i) + i += 1 + } + matches + } +} diff --git a/src/main/scala/ai/metarank/main/command/Import.scala b/src/main/scala/ai/metarank/main/command/Import.scala index 19e079445..d85e12a86 100644 --- a/src/main/scala/ai/metarank/main/command/Import.scala +++ b/src/main/scala/ai/metarank/main/command/Import.scala @@ -89,7 +89,7 @@ object Import extends Logging { dir <- IO(Files.createTempDirectory("metarank-rocksdb-temp")) _ <- info(s"using local disk cache for redis persistence: $dir") result <- FilePersistence - .create(FileStateConfig(dir.toString, backend = MapDBBackend), mapping.schema, conf.core.`import`.cache) + .create(FileStateConfig(dir.toString, backend = MapDBBackend()), mapping.schema, conf.core.`import`.cache) .use(cache => for { buf2 <- IO(buffer.copy(values = cache.values)) diff --git a/src/main/scala/ai/metarank/model/ItemValue.scala b/src/main/scala/ai/metarank/model/ItemValue.scala index 853a5f439..34a857a48 100644 --- a/src/main/scala/ai/metarank/model/ItemValue.scala +++ b/src/main/scala/ai/metarank/model/ItemValue.scala @@ -56,7 +56,9 @@ object ItemValue { throw new IllegalStateException(s"for ${feature.schema} dim mismatch: ${feature.dim} != ${value.dim}") } if (values.length != ranking.items.size) { - throw new IllegalStateException(s"for ${feature.schema} dim mismatch: there should be ${ranking.items.size} per-document values, but got ${values.length}") + throw new IllegalStateException( + s"for ${feature.schema} dim mismatch: there should be ${ranking.items.size} per-document values, but got ${values.length}" + ) } values }) diff --git a/src/test/scala/ai/metarank/config/StateStoreConfigTest.scala b/src/test/scala/ai/metarank/config/StateStoreConfigTest.scala index 3fda135ad..856423703 100644 --- a/src/test/scala/ai/metarank/config/StateStoreConfigTest.scala +++ b/src/test/scala/ai/metarank/config/StateStoreConfigTest.scala @@ -1,7 +1,8 @@ package ai.metarank.config +import ai.metarank.config.StateStoreConfig.FileStateConfig.RocksDBBackend import ai.metarank.config.StateStoreConfig.RedisStateConfig.{CacheConfig, DBConfig, PipelineConfig} -import ai.metarank.config.StateStoreConfig.{MemoryStateConfig, RedisStateConfig} +import ai.metarank.config.StateStoreConfig.{FileStateConfig, MemoryStateConfig, RedisStateConfig} import io.circe.yaml.parser.parse import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -66,4 +67,29 @@ class StateStoreConfigTest extends AnyFlatSpec with Matchers { conf shouldBe Right(PipelineConfig(flushPeriod = 77.hour)) } + it should "decode file with simple backend" in { + val yaml = + """ + |type: file + |path: /tmp + |format: binary + |backend: rocksdb""".stripMargin + val conf = parse(yaml).flatMap(_.as[StateStoreConfig]) + conf shouldBe Right(FileStateConfig("/tmp", backend = RocksDBBackend())) + } + + it should "decode file with backend and options" 
in { + val yaml = + """ + |type: file + |path: /tmp + |format: binary + |backend: + | type: rocksdb + | lruCacheSize: 1 + | blockSize: 1""".stripMargin + val conf = parse(yaml).flatMap(_.as[StateStoreConfig]) + conf shouldBe Right(FileStateConfig("/tmp", backend = RocksDBBackend(1,1))) + } + } diff --git a/src/test/scala/ai/metarank/fstore/file/client/FileTestSuite.scala b/src/test/scala/ai/metarank/fstore/file/client/FileTestSuite.scala index 8997188be..d41908934 100644 --- a/src/test/scala/ai/metarank/fstore/file/client/FileTestSuite.scala +++ b/src/test/scala/ai/metarank/fstore/file/client/FileTestSuite.scala @@ -8,7 +8,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers trait FileTestSuite extends AnyFlatSpec with Matchers { this: Suite => - def makeHash(): HashDB + def makeHash(): HashDB[Array[Byte]] def makeTree(): SortedDB[String] "hash" should "read-write" in { @@ -32,7 +32,7 @@ trait FileTestSuite extends AnyFlatSpec with Matchers { this: Suite => db.put("foo2", "bar".getBytes()) db.put("foo3", "bar".getBytes()) val result = fs2.Stream.fromBlockingIterator[IO](db.all(), 128).compile.toList.unsafeRunSync() - result.map(kv => new String(kv._1)) shouldBe List("foo3", "foo1", "foo2") + result.map(kv => new String(kv._1)) should contain theSameElementsAs List("foo3", "foo1", "foo2") } "tree" should "read-write" in { @@ -141,7 +141,9 @@ trait FileTestSuite extends AnyFlatSpec with Matchers { this: Suite => db.put("foo2", "bar") db.put("foo3", "bar") val size = db.size() - size shouldBe PrefixSize(12, 9, 3) + size.keyBytes shouldBe 12 + size.valueBytes should be >= 8L + size.count shouldBe 3 } } diff --git a/src/test/scala/ai/metarank/fstore/file/client/MapDBClientTest.scala b/src/test/scala/ai/metarank/fstore/file/client/MapDBClientTest.scala index dc4be1d2f..6e3ebda70 100644 --- a/src/test/scala/ai/metarank/fstore/file/client/MapDBClientTest.scala +++ b/src/test/scala/ai/metarank/fstore/file/client/MapDBClientTest.scala @@ -5,7 +5,7 @@ import scala.util.Random class MapDBClientTest extends FileTestSuite { lazy val mapdb = MapDBClient.createUnsafe(Files.createTempDirectory("tmp")) - override def makeHash(): HashDB = mapdb.hashDB("yep" + Random.nextInt()) + override def makeHash(): HashDB[Array[Byte]] = mapdb.hashDB("yep" + Random.nextInt()) override def makeTree(): SortedDB[String] = mapdb.sortedStringDB("baa" + Random.nextInt()) diff --git a/src/test/scala/ai/metarank/fstore/file/client/RocksDBClientTest.scala b/src/test/scala/ai/metarank/fstore/file/client/RocksDBClientTest.scala index 59758e63a..d0fd83f9c 100644 --- a/src/test/scala/ai/metarank/fstore/file/client/RocksDBClientTest.scala +++ b/src/test/scala/ai/metarank/fstore/file/client/RocksDBClientTest.scala @@ -1,7 +1,25 @@ package ai.metarank.fstore.file.client import java.nio.file.Files +import scala.util.Random -//class RocksDBClientTest extends FileTestSuite { -// override def make(): FileClient = RocksDBClient.createUnsafe(Files.createTempDirectory("rocks-tmp")) -//} +class RocksDBClientTest extends FileTestSuite { + lazy val rdb = RocksDBClient(Files.createTempDirectory("tmp").toString) + + override def makeHash(): HashDB[Array[Byte]] = rdb.hashDB("yep" + Random.nextInt()) + + override def makeTree(): SortedDB[String] = rdb.sortedStringDB("baa" + Random.nextInt()) + + it should "reopen the same state file" in { + val path = Files.createTempDirectory("tmp") + val db1 = MapDBClient.createUnsafe(path) + val kv1 = db1.hashDB("test") + kv1.put("foo", "bar".getBytes()) + db1.close() + + 
val db2 = MapDBClient.createUnsafe(path) + val kv2 = db2.hashDB("test") + kv2.get("foo").map(b => new String(b)) shouldBe Some("bar") + } + +}
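For reference, a minimal sketch of how the two accepted `backend` shapes decode from YAML, following the same circe-yaml pattern as `StateStoreConfigTest`; the object name and the concrete option values below are illustrative only:

```scala
import ai.metarank.config.StateStoreConfig
import io.circe.yaml.parser.parse

object FileBackendDecodeSketch {
  def main(args: Array[String]): Unit = {
    // Short form: `backend` is a plain string, backend options fall back to their defaults.
    val shortForm =
      """type: file
        |path: /tmp/state
        |backend: mapdb""".stripMargin

    // Long form: `backend` is an object with a `type` field plus backend-specific options.
    val longForm =
      """type: file
        |path: /tmp/state
        |backend:
        |  type: rocksdb
        |  lruCacheSize: 536870912 # 512mb
        |  blockSize: 16384        # 16kb""".stripMargin

    // Both decode into FileStateConfig, with MapDBBackend() and RocksDBBackend(536870912, 16384) respectively.
    println(parse(shortForm).flatMap(_.as[StateStoreConfig]))
    println(parse(longForm).flatMap(_.as[StateStoreConfig]))
  }
}
```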
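The documented `lruCacheSize` and `blockSize` settings correspond to RocksDB's `LRUCache` and `BlockBasedTableConfig` knobs used by `RocksDBClient` (which currently constructs its `Options` with fixed values). A standalone sketch of that mapping; the helper name and parameters are illustrative, not part of the change:

```scala
import org.rocksdb.{BlockBasedTableConfig, LRUCache, Options, RocksDB}

object RocksOptionsSketch {
  // Open a RocksDB instance with an explicit block cache size and block size, both in bytes.
  def openWith(dir: String, lruCacheSize: Long, blockSize: Long): RocksDB = {
    RocksDB.loadLibrary()
    val table = new BlockBasedTableConfig()
    table.setBlockCache(new LRUCache(lruCacheSize)) // shared LRU block cache, sized in bytes
    table.setBlockSize(blockSize)                   // on-disk block size, e.g. 16kb for EBS-like network disks
    table.setCacheIndexAndFilterBlocks(true)
    val opts = new Options()
    opts.setCreateIfMissing(true)
    opts.setTableFormatConfig(table)
    RocksDB.open(opts, dir)
  }
}
```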