Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: file backend options are not used in init; trigger a RocksDB compaction on end #1184

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/main/scala/ai/metarank/fstore/file/FilePersistence.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ case class FilePersistence(schema: Schema, db: FileClient, format: StoreFormat,

override def healthcheck(): IO[Unit] = IO.unit

override def sync: IO[Unit] = IO.unit
override def sync: IO[Unit] = IO {
db.compact()
}

def estimateSize(): IO[List[FeatureSize]] = IO.blocking {
List.concat(
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/ai/metarank/fstore/file/client/DB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ trait DB[T] {
def all(): Iterator[(String, T)]
def sync(): Unit = {}
def sizeof(value: T): Int
def compact(): Unit = {}

def size(): FileClient.PrefixSize = {
val it = all()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ trait FileClient {
def sortedIntDB(name: String): SortedDB[Int]
def hashDB(name: String): HashDB[Array[Byte]]
def close(): Unit
def compact(): Unit
}

object FileClient {
Expand Down
23 changes: 15 additions & 8 deletions src/main/scala/ai/metarank/fstore/file/client/MapDBClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ import org.mapdb.{BTreeMap, DB, DBMaker, HTreeMap, Serializer}

import java.nio.file.{Files, Path, Paths}

class MapDBClient(db: DB) extends FileClient {
class MapDBClient(db: DB, opts: MapDBBackend) extends FileClient {
override def hashDB(name: String): HashDB[Array[Byte]] = {
val hash = db.hashMap(name, Serializer.STRING, Serializer.BYTE_ARRAY).createOrOpen()
MapdbHashDB(hash)
}

override def sortedStringDB(name: String): SortedDB[String] = {
val tree = db.treeMap(name, Serializer.STRING, Serializer.STRING).maxNodeSize(16).createOrOpen()
val tree = db.treeMap(name, Serializer.STRING, Serializer.STRING).maxNodeSize(opts.maxNodeSize).createOrOpen()
MapdbSortedDB(tree, _.length)
}

override def sortedDB(name: String): SortedDB[Array[Byte]] = {
val tree = db
.treeMap(name, Serializer.STRING, Serializer.BYTE_ARRAY)
.maxNodeSize(16)
.maxNodeSize(opts.maxNodeSize)
.valuesOutsideNodesEnable()
.createOrOpen()
MapdbSortedDB(tree, _.length)
Expand All @@ -30,7 +30,7 @@ class MapDBClient(db: DB) extends FileClient {
override def sortedFloatDB(name: String): SortedDB[Float] = {
val tree = db
.treeMap(name, Serializer.STRING, ScalaFloatSerializer)
.maxNodeSize(16)
.maxNodeSize(opts.maxNodeSize)
.valuesOutsideNodesEnable()
.createOrOpen()
MapdbSortedDB(tree, _ => 4)
Expand All @@ -42,21 +42,28 @@ class MapDBClient(db: DB) extends FileClient {
MapdbSortedDB(tree, _ => 4)
}

override def compact(): Unit = {
db.commit()
}

def close() =
db.close()

}

object MapDBClient {
def create(path: Path, opts: MapDBBackend): Resource[IO, MapDBClient] =
Resource.make(IO(createUnsafe(path)))(m => IO(m.close()))
Resource.make(IO(createUnsafe(path, opts)))(m => IO(m.close()))

def createUnsafe(path: Path) = {
def createUnsafe(path: Path, opts: MapDBBackend) = {
val pathFile = path.toFile
if (!pathFile.exists()) {
pathFile.mkdirs()
}
val db = DBMaker.fileDB(path.toString + "/state.db").fileMmapEnable().closeOnJvmShutdown().make()
new MapDBClient(db)
val db = opts.mmap match {
case true => DBMaker.fileDB(path.toString + "/state.db").fileMmapEnable().closeOnJvmShutdown().make()
case false => DBMaker.fileDB(path.toString + "/state.db").closeOnJvmShutdown().make()
}
new MapDBClient(db, opts)
}
}
18 changes: 14 additions & 4 deletions src/main/scala/ai/metarank/fstore/file/client/RocksDBClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import org.rocksdb.{RocksDB => RDB}

import java.io.File

case class RocksDBClient(dir: String) extends FileClient {
case class RocksDBClient(dir: String, opts: RocksDBBackend) extends FileClient with Logging {
val options = {
val o = new Options()
o.setCreateIfMissing(true)
val table = new BlockBasedTableConfig()
table.setBlockCache(new LRUCache(2024 * 1024 * 1024))
table.setBlockCache(new LRUCache(opts.lruCacheSize))
table.setCacheIndexAndFilterBlocks(true)
table.setBlockSize(8 * 1024)
table.setBlockSize(opts.blockSize)
o.setTableFormatConfig(table)
o
}
Expand Down Expand Up @@ -59,6 +59,13 @@ case class RocksDBClient(dir: String) extends FileClient {
RocksSortedDB(db, Codec.STRING)
}

override def compact(): Unit = {
dbs.foreach(db => {
logger.info(s"triggering compaction for ${db.getName}")
db.compactRange()
})
}

override def close(): Unit = {
dbs.foreach(_.close())
}
Expand All @@ -70,7 +77,10 @@ object RocksDBClient extends Logging {
_ <- IO.whenA(!exists)(
Files[IO].createDirectory(fs2.io.file.Path(path.toString)) *> info(s"created rocksdb dir $path")
)
c <- IO(RocksDBClient(path.toString))
c <- IO(createUnsafe(path, opts))
} yield c)(x => IO(x.close()))

def createUnsafe(path: java.nio.file.Path, opts: RocksDBBackend) = {
RocksDBClient(path.toString, opts)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ trait RocksDB[V] extends DB[V] {
db.close()
}

override def compact(): Unit = {
db.compactRange()
}

override def all(): Iterator[(String, V)] = {
val opts = new ReadOptions()
val rit = db.newIterator(opts)
Expand Down
3 changes: 2 additions & 1 deletion src/test/scala/ai/metarank/fstore/file/FileTest.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ai.metarank.fstore.file

import ai.metarank.config.StateStoreConfig.FileStateConfig.MapDBBackend
import ai.metarank.fstore.FeatureSuite
import ai.metarank.fstore.file.client.{FileClient, MapDBClient}
import ai.metarank.model.{Feature, FeatureValue, State, Write}
Expand All @@ -11,6 +12,6 @@ import java.nio.file.{Files, Path, Paths}
import cats.implicits._

trait FileTest {
lazy val db: FileClient = MapDBClient.createUnsafe(Files.createTempDirectory("boop"))
lazy val db: FileClient = MapDBClient.createUnsafe(Files.createTempDirectory("boop"), MapDBBackend())

}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package ai.metarank.fstore.file.client

import ai.metarank.config.StateStoreConfig.FileStateConfig.MapDBBackend

import java.nio.file.{Files, Paths}
import scala.util.Random

class MapDBClientTest extends FileTestSuite {
lazy val mapdb = MapDBClient.createUnsafe(Files.createTempDirectory("tmp"))
lazy val mapdb = MapDBClient.createUnsafe(Files.createTempDirectory("tmp"), MapDBBackend())
override def makeHash(): HashDB[Array[Byte]] = mapdb.hashDB("yep" + Random.nextInt())

override def makeTree(): SortedDB[String] = mapdb.sortedStringDB("baa" + Random.nextInt())

it should "reopen the same state file" in {
val path = Files.createTempDirectory("tmp")
val db1 = MapDBClient.createUnsafe(path)
val db1 = MapDBClient.createUnsafe(path, MapDBBackend())
val kv1 = db1.hashDB("test")
kv1.put("foo", "bar".getBytes())
db1.close()

val db2 = MapDBClient.createUnsafe(path)
val db2 = MapDBClient.createUnsafe(path, MapDBBackend())
val kv2 = db2.hashDB("test")
kv2.get("foo").map(b => new String(b)) shouldBe Some("bar")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
package ai.metarank.fstore.file.client

import ai.metarank.config.StateStoreConfig.FileStateConfig.RocksDBBackend

import java.nio.file.Files
import scala.util.Random

class RocksDBClientTest extends FileTestSuite {
lazy val rdb = RocksDBClient(Files.createTempDirectory("tmp").toString)
lazy val rdb = RocksDBClient(Files.createTempDirectory("tmp").toString, RocksDBBackend())

override def makeHash(): HashDB[Array[Byte]] = rdb.hashDB("yep" + Random.nextInt())

override def makeTree(): SortedDB[String] = rdb.sortedStringDB("baa" + Random.nextInt())

it should "reopen the same state file" in {
val path = Files.createTempDirectory("tmp")
val db1 = MapDBClient.createUnsafe(path)
val kv1 = db1.hashDB("test")
val db1 = RocksDBClient.createUnsafe(path, RocksDBBackend())
val kv1 = db1.hashDB("test")
kv1.put("foo", "bar".getBytes())
db1.close()

val db2 = MapDBClient.createUnsafe(path)
val db2 = RocksDBClient.createUnsafe(path, RocksDBBackend())
val kv2 = db2.hashDB("test")
kv2.get("foo").map(b => new String(b)) shouldBe Some("bar")
}
Expand Down