
Small NoLog String Database

laforge49 edited this page Oct 6, 2011 · 8 revisions

Unlike SimpleDataStoreComponent, SmallDataStoreComponent gives you crash-proof persistence. It achieves this by keeping two copies of the data on disk, each protected by a checksum, and writing to them alternately. Some things are still lacking, such as logging and scalability, but these will be addressed going forward. Like SimpleDataStoreComponent, SmallDataStoreComponent is more a step along a path than something you would use in production, though there may be some embedded or mobile applications where it is useful.

SmallDataStoreComponent has only a single block, so all data is resident in memory. The database also limits the size of that block (1024 bytes by default, including a small header), and the limit can only be set when a new database is created. To change the size for an existing database, you must open both the old database and a new one (easy enough, as these databases are implemented as subsystems) and transcribe the data.

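We will again proceed top-down, beginning with the test code, which is very similar to the test code for SimpleNoLogStringDataStore. This time, however, the update and the query are separate: the first block deletes any old database file, creates a new database and stores a string in its root, and the second block reopens the same file and reads the string back.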

```scala
// Update: start from an empty database file and set the root string.
{
  val systemServices = SystemServices(new ServicesRootComponentFactory)
  val dbName = "SmallNoLog.db"
  val file = new java.io.File(dbName)
  file.delete
  val properties = new Properties
  properties.put("dbPathname", dbName)
  val db = Subsystem(
    systemServices,
    new SmallNoLogDataStoreComponentFactory,
    properties = properties,
    actorId = ActorId("db"))
  val results = new Results
  val chain = new Chain(results)
  chain.op(systemServices, Register(db))
  chain.op(db, SetRootStringRequest.process(db, "Hello world!"), "timestamp")
  Future(systemServices, chain)
  println(results)
  systemServices.close
}

// Query: reopen the same file and read the root string back.
{
  val systemServices = SystemServices(new ServicesRootComponentFactory)
  val dbName = "SmallNoLog.db"
  val properties = new Properties
  properties.put("dbPathname", dbName)
  val db = Subsystem(
    systemServices,
    new SmallNoLogDataStoreComponentFactory,
    properties = properties,
    actorId = ActorId("db"))
  val results = new Results
  val chain = new Chain(results)
  chain.op(systemServices, Register(db))
  chain.op(db, GetRootStringRequest.process(db), "string")
  Future(systemServices, chain)
  println(results)
  systemServices.close
}
```

Output:

```
{timestamp=1348087560880128}
{string=Hello world!}
```

SmallNoLogStringDataStoreTest

## SmallNoLogStringDataStoreComponentFactory

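This factory class, SmallNoLogDataStoreComponentFactory, is the same as SimpleNoLogStringDataStoreComponentFactory except for its dependencies: SmallDataStoreComponentFactory, NullTransactionLogComponentFactory and NoDbInitializationComponentFactory.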

```scala
class SmallNoLogDataStoreComponentFactory
  extends ComponentFactory {
  addDependency(classOf[SmallDataStoreComponentFactory])
  addDependency(classOf[NullTransactionLogComponentFactory])
  addDependency(classOf[NoDbInitializationComponentFactory])

  override def configure(compositeFactory: Factory) {
    val factoryRegistryComponentFactory =
      compositeFactory.componentFactory(classOf[FactoryRegistryComponentFactory]).
        asInstanceOf[FactoryRegistryComponentFactory]

    factoryRegistryComponentFactory.registerFactory(new SetRootStringRequestFactory)
    factoryRegistryComponentFactory.registerFactory(new GetRootStringRequestFactory)
  }
}
```

SmallNoLogStringDataStoreComponentFactory

## SmallDataStoreComponent

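SmallDataStoreComponent is similar to, but much simpler than, SimpleDataStoreComponent, because much of its logic has moved into RootBlockComponent. It caches the root block in memory, records when an update dirties it, writes it back on commit only if it is dirty, and discards a dirty cached copy on abort.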

```scala
class SmallDataStoreComponentFactory extends ComponentFactory {
  addDependency(classOf[TransactionProcessorComponentFactory])
  addDependency(classOf[RootBlockComponentFactory])

  override def instantiate(actor: Actor) = new SmallDataStoreComponent(actor)
}

class SmallDataStoreComponent(actor: Actor)
  extends Component(actor) {
  var dirty = false
  var block: Block = null

  bind(classOf[Commit], commit)
  bind(classOf[Abort], abort)
  bind(classOf[DirtyBlock], dirtyBlock)
  bind(classOf[DbRoot], dbRoot)

  private def commit(msg: AnyRef, rf: Any => Unit) {
    if (!dirty) rf(null)
    else systemServices(WriteRootBlock(block)) {
      rsp => {
        dirty = false
        block(Clean())(rf)
      }
    }
  }

  private def abort(msg: AnyRef, rf: Any => Unit) {
    if (dirty) {
      block = null
      dirty = false
    }
    throw msg.asInstanceOf[Abort].exception
  }

  private def dirtyBlock(msg: AnyRef, rf: Any => Unit) {
    dirty = true
    rf(null)
  }

  private def dbRoot(msg: AnyRef, rf: Any => Unit) {
    if (block != null) rf(block)
    else systemServices(ReadRootBlock()) {
      rsp => {
        block = rsp.asInstanceOf[Block]
        rf(block)
      }
    }
  }
}
```

SmallDataStoreComponent
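The caching logic above can be modeled without actors. The sketch below is plain synchronous Scala, not AsyncFP code: RootStore is a hypothetical stand-in for the ReadRootBlock/WriteRootBlock messages, and dbRoot, markDirty, commit and abort are illustrative counterparts of the DbRoot, DirtyBlock, Commit and Abort bindings. It omits the Clean() message that the real commit sends to the block.

```scala
// Hypothetical stand-in for RootBlockComponent's ReadRootBlock/WriteRootBlock.
trait RootStore[B] {
  def read(): B
  def write(block: B): Unit
}

// Synchronous sketch of SmallDataStoreComponent's bookkeeping; not AsyncFP code.
final class CachedRoot[B](store: RootStore[B]) {
  private var dirty = false
  private var block: Option[B] = None // the cached root block, if loaded

  // DbRoot: read the root block from disk once, then serve the cached copy.
  def dbRoot(): B = block match {
    case Some(b) => b
    case None =>
      val b = store.read()
      block = Some(b)
      b
  }

  // DirtyBlock: an update has modified the cached root block.
  def markDirty(): Unit = { dirty = true }

  // Commit: write the cached block back only if it was modified.
  def commit(): Unit = {
    if (dirty) {
      block.foreach(store.write)
      dirty = false
    }
  }

  // Abort: drop a modified cached copy so the next dbRoot() rereads it
  // from disk, then rethrow the failure.
  def abort(e: Exception): Nothing = {
    if (dirty) {
      block = None
      dirty = false
    }
    throw e
  }
}
```

A commit with nothing dirty costs no I/O at all, and an aborted transaction cannot leak its in-memory changes into a later commit, because the modified copy is thrown away and reread.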

## RootBlockComponent

RootBlockComponent provides the crash-proof persistence. It handles two message classes:

```scala
case class ReadRootBlock()
case class WriteRootBlock(rootBlock: Block)
```

Here's the component factory:

```scala
class RootBlockComponentFactory extends ComponentFactory {

  addDependency(classOf[RandomIOComponentFactory])

  override def instantiate(actor: Actor) = new RootBlockComponent(actor)
}
```

Now let's look at the component itself.

```scala
class RootBlockComponent(actor: Actor)
  extends Component(actor) {
  val HEADER_LENGTH = longLength + longLength + intLength + intLength
  private var maxBlockSize = 0
  private var currentRootOffset = 0

  ...
}
```

The component has only three members:

  1. HEADER_LENGTH - Each disk area has a fixed portion (the header) and a variable portion (the serialized contents of the root Block). The header consists of:
    1. A checksum of the remainder of the header and of the variable portion.
    2. A timestamp.
    3. The maximum root block size.
    4. The length of the variable portion.
  2. maxBlockSize - The number of bytes reserved for each of the two copies of the root block, including the header. This value is set by the maxRootBlockSize property and is also saved in the headers. Once the database is initialized, it cannot be changed.
  3. currentRootOffset - The location of the header of the currently active root block. The value alternates between 0 and maxBlockSize.
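Assuming longLength and intLength are 8 and 4 (the sizes of a serialized Long and Int), the header is 24 bytes, which fixes the offset of each header field and how much of each area is left for the serialized block. The arithmetic is sketched below; RootBlockLayout and its members are hypothetical names, not part of the code base.

```scala
// Layout arithmetic for one root-block area, ASSUMING longLength = 8 and
// intLength = 4. Names are illustrative only.
object RootBlockLayout {
  val LongLength = 8
  val IntLength = 4
  val HeaderLength = LongLength + LongLength + IntLength + IntLength // 24 bytes

  val ChecksumOffset = 0
  val TimestampOffset = ChecksumOffset + LongLength  // 8
  val MaxSizeOffset = TimestampOffset + LongLength   // 16
  val BlockLengthOffset = MaxSizeOffset + IntLength  // 20

  // Largest serialized root block that fits in an area of maxBlockSize bytes.
  def maxPayload(maxBlockSize: Int): Int = maxBlockSize - HeaderLength

  // currentRootOffset alternates between the two areas: 0 and maxBlockSize.
  def nextOffset(current: Int, maxBlockSize: Int): Int = maxBlockSize - current
}
```

Under this assumption, the default maxRootBlockSize of 1024 leaves 1000 bytes for the serialized root block, and since the two areas start at 0 and 1024, the file never needs more than 2048 bytes.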

There are only two message class bindings:

```scala
bind(classOf[ReadRootBlock], readRootBlock)
bindSafe(classOf[WriteRootBlock], new ChainFactory(writeRootBlock))
```

On open, maxBlockSize is initialized. A check is also made that some component handles the InitDb message; in this database, that is NoDbInitializationComponent.

```scala
override def open {
  super.open
  maxBlockSize = GetProperty.int("maxRootBlockSize", 1024)
  actor.requiredService(classOf[InitDb])
}
```

If the maxRootBlockSize property is not set, a (rather small) default of 1024 bytes is used.

The readRootBlock method processes a ReadRootBlock message. It reads both root blocks on disk and returns the latest valid one or, if the database file is empty, creates a new block.

```scala
private def readRootBlock(msg: AnyRef, rf: Any => Unit) {
  val pathname = GetProperty.required("dbPathname")
  val file = new java.io.File(pathname)
  val fileLength = file.length
  if (fileLength == 0L) {
    currentRootOffset = maxBlockSize
    val block = Block(mailbox)
    block.setSystemServices(systemServices)
    actor(InitDb(block))(rf)
  } else {
    _readRootBlock(0L) {
      rsp1 => {
        val b0 = rsp1.asInstanceOf[Block]
        _readRootBlock(maxBlockSize) {
          rsp2 => {
            val b1 = rsp2.asInstanceOf[Block]
            if (b0 == null && b1 == null)
              throw new IllegalStateException("Db corrupted")
            if (b1 == null) {
              currentRootOffset = 0
              rf(b0)
            } else if (b0 == null) {
              currentRootOffset = maxBlockSize
              rf(b1)
            } else if (b0.key.asInstanceOf[Long] > b1.key.asInstanceOf[Long]) {
              currentRootOffset = 0
              rf(b0)
            } else {
              currentRootOffset = maxBlockSize
              rf(b1)
            }
          }
        }
      }
    }
  }
}
```

The above code does the following:

  1. If the file is empty, currentRootOffset is set to maxBlockSize, so that the first block written will go to offset 0. A new block is created, passed to InitDb for initialization, and returned. Otherwise...
  2. The two blocks are read and validated, with their keys set to the time they were written. If neither block is valid, an exception is thrown.
  3. If only one block is valid, it is selected; if both are valid, the one with the later timestamp is selected (the block at offset maxBlockSize wins a tie). currentRootOffset is set to the offset of the selected block and that block is returned.
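The selection step can be isolated as a pure function. In this sketch, which uses illustrative names rather than the real Block API, None stands for a copy that _readRootBlock rejected (a null response), and the timestamp plays the role of the block key. The branches follow the same order as readRootBlock.

```scala
// Pure sketch of readRootBlock's choice between the two on-disk copies.
// Copy and select are hypothetical names; None means "missing or invalid".
object RootSelection {
  final case class Copy(timestamp: Long, payload: Array[Byte])

  // Returns the chosen copy and the offset it was read from
  // (the new currentRootOffset).
  def select(b0: Option[Copy], b1: Option[Copy], maxBlockSize: Int): (Copy, Int) =
    (b0, b1) match {
      case (None, None) => throw new IllegalStateException("Db corrupted")
      case (Some(c0), None) => (c0, 0)
      case (None, Some(c1)) => (c1, maxBlockSize)
      case (Some(c0), Some(c1)) if c0.timestamp > c1.timestamp => (c0, 0)
      case (_, Some(c1)) => (c1, maxBlockSize) // later or equal timestamp at maxBlockSize
    }
}
```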

The _readRootBlock method is used to read and validate the block whose header begins at the given offset.

```scala
private def _readRootBlock(offset: Long)(rf: Any => Unit) {
  systemServices(ReadBytesOrNull(offset, HEADER_LENGTH)) {
    rsp1 => {
      if (rsp1 == null) {
        rf(null)
      }
      else {
        val headerBytes = rsp1.asInstanceOf[Array[Byte]]
        val data = new MutableData(headerBytes, 0)
        val checksum = new IncDesLong
        checksum.load(data)
        val timestamp = new IncDesLong
        timestamp.load(data)
        val maxSize = new IncDesInt
        maxSize.load(data)
        val blockLength = new IncDesInt
        blockLength.load(data)
        blockLength(Value()) {
          rsp2 => {
            val length = rsp2.asInstanceOf[Int]
            if (length < 1 || length + HEADER_LENGTH > maxBlockSize) rf(null)
            else systemServices(ReadBytesOrNull(offset + HEADER_LENGTH, length)) {
              rsp3 => {
                if (rsp3 == null) rf(null)
                else {
                  val blockBytes = rsp3.asInstanceOf[Array[Byte]]
                  // Checksum everything after the checksum field: rest of header + block.
                  val adler32 = new Adler32
                  adler32.update(headerBytes, longLength, HEADER_LENGTH - longLength)
                  adler32.update(blockBytes)
                  val newcs = adler32.getValue
                  checksum(Value()) {
                    rsp4 => {
                      val cs = rsp4.asInstanceOf[Long]
                      if (cs != newcs) rf(null)
                      else {
                        val rootBlock = Block(mailbox)
                        rootBlock.setSystemServices(systemServices)
                        rootBlock.load(blockBytes)
                        timestamp(Value()) {
                          rsp5 => {
                            rootBlock.partness(null, rsp5, null)
                            maxSize(Value()) {
                              rsp6 => {
                                val ms = rsp6.asInstanceOf[Int]
                                if (ms != maxBlockSize) throw new IllegalArgumentException(
                                  "maxRootBlockSize property must be " + ms)
                                rf(rootBlock)
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
```

The above code does the following:

  1. The header is read from disk. If it cannot be read, null is returned.
  2. The checksum, timestamp, max size and block length are loaded from the header.
  3. If the block length is not valid, null is returned.
  4. The serialized contents of the block are read. If they cannot be read, null is returned.
  5. A checksum of the header (excluding the checksum in the header) and the serialized block contents is calculated and compared to the checksum in the header. If not equal, a null is returned.
  6. A block is created and loaded from the serialized data. Its key is set to the timestamp from the header.
  7. If the value of the maxRootBlockSize property is not equal to the max size in the header, an exception is thrown.
  8. The block is returned.
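The validation steps above can be sketched over an in-memory area instead of asynchronous ReadBytesOrNull calls. This sketch assumes a 24-byte header with big-endian fields at offsets 0 (checksum, Long), 8 (timestamp, Long), 16 (max size, Int) and 20 (block length, Int), which is java.nio.ByteBuffer's default order; the real code decodes these fields with IncDesLong and IncDesInt, whose byte layout may differ. RootAreaDecoder and Decoded are hypothetical names, and None corresponds to a null response.

```scala
import java.nio.ByteBuffer
import java.util.zip.Adler32

// Sketch of _readRootBlock's checks on one area held in memory.
// ASSUMED layout: [checksum: Long][timestamp: Long][maxSize: Int][length: Int][block bytes]
object RootAreaDecoder {
  val HeaderLength = 24

  final case class Decoded(timestamp: Long, payload: Array[Byte])

  def decode(area: Array[Byte], maxBlockSize: Int): Option[Decoded] = {
    if (area.length < HeaderLength) return None // header could not be read
    val buf = ByteBuffer.wrap(area) // big-endian by default
    val checksum = buf.getLong(0)
    val timestamp = buf.getLong(8)
    val maxSize = buf.getInt(16)
    val length = buf.getInt(20)
    // Reject impossible lengths (compared this way to avoid Int overflow).
    if (length < 1 || length > maxBlockSize - HeaderLength) return None
    if (area.length < HeaderLength + length) return None // block could not be read
    // The checksum covers everything after the 8-byte checksum field.
    val adler = new Adler32
    adler.update(area, 8, HeaderLength - 8 + length)
    if (adler.getValue() != checksum) return None
    // A valid area written with a different size is a configuration error.
    if (maxSize != maxBlockSize)
      throw new IllegalArgumentException("maxRootBlockSize property must be " + maxSize)
    Some(Decoded(timestamp, java.util.Arrays.copyOfRange(area, HeaderLength, HeaderLength + length)))
  }
}
```

Note the split between the two kinds of failure: a damaged area yields None so the other copy can be used, while a size mismatch in an intact area throws, since retrying with the other copy would not help.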

The writeRootBlock method is called in response to a WriteRootBlock message, which contains the block to be written. Because it is bound with bindSafe and a ChainFactory, the method does not do the work directly: it adds a sequence of operations to a chain, which is subsequently sent to the db subsystem actor that RootBlockComponent is part of. The effect is to write the block, together with its header, alternately to the two areas on disk reserved for it.

```scala
private def writeRootBlock(msg: AnyRef, chain: Chain) {
  currentRootOffset = maxBlockSize - currentRootOffset
  val rootBlock = msg.asInstanceOf[WriteRootBlock].rootBlock
  var bytes: Array[Byte] = null
  var length = 0
  val checksum = new IncDesLong
  val timestamp = new IncDesLong
  val maxSize = new IncDesInt
  val blockLength = new IncDesInt
  var data: MutableData = null
  chain.op(systemServices, GetTimestamp(), "timestamp")
  chain.op(maxSize, Set(null, maxBlockSize))
  chain.op(timestamp, Unit => Set(null, chain("timestamp")))
  chain.op(rootBlock, Length(), "length")
  chain.op(blockLength, Unit => {
    length = chain("length").asInstanceOf[Int]
    if (length + HEADER_LENGTH > maxBlockSize)
      // Parenthesized so the total size is reported, not the two values concatenated.
      throw new IllegalArgumentException("Root block size exceeds maxRootBlockSize property: " +
        (length + HEADER_LENGTH) + " > " + maxBlockSize)
    Set(null, length)
  })
  chain.op(timestamp, Unit => {
    bytes = new Array[Byte](HEADER_LENGTH + length)
    // Start after the checksum slot; the checksum is filled in last.
    data = new MutableData(bytes, longLength)
    Save(data)
  })
  chain.op(maxSize, Unit => Save(data))
  chain.op(blockLength, Unit => Save(data))
  chain.op(rootBlock, Unit => Save(data))
  chain.op(checksum, Unit => {
    val adler32 = new Adler32
    adler32.update(bytes, longLength, length + HEADER_LENGTH - longLength)
    val cs = adler32.getValue
    Set(null, cs)
  })
  chain.op(checksum, Unit => {
    data.rewind
    Save(data)
  })
  chain.op(actor, Unit => WriteBytes(currentRootOffset, bytes))
}
```
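Most ops in writeRootBlock receive a function rather than a ready-made message. The function is evaluated only when its op runs, which is what lets a later op use chain("timestamp") or chain("length") after an earlier op has stored that result. The toy model below is synchronous and hypothetical (ToyChain is not AsyncFP's Chain, which sends messages to actors asynchronously). It uses `() => ...` where the original writes `Unit => ...`, a one-argument function whose parameter happens to be named Unit.

```scala
import scala.collection.mutable

// Toy, synchronous model of a chain whose requests are built lazily.
// ToyChain and its methods are hypothetical, for illustration only.
final class ToyChain {
  private val ops = mutable.ArrayBuffer[() => Unit]()
  private val results = mutable.Map[String, Any]()

  // Read a result stored by an earlier op, like chain("timestamp").
  def apply(name: String): Any = results(name)

  // Queue an op. `request` is called only when the op runs, so it can use
  // results of earlier ops. A non-null resultName stores the response.
  def op[Req](target: Req => Any, request: () => Req, resultName: String = null): Unit =
    ops += { () =>
      val rsp = target(request())
      if (resultName != null) results(resultName) = rsp
    }

  // Run the queued ops in order and return the named results.
  def run(): Map[String, Any] = {
    ops.foreach(f => f())
    results.toMap
  }
}
```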

The above code does the following:

  1. The currentRootOffset is toggled to the alternate location.
  2. If the total length of the header and the serialized block contents exceeds the max root block size, an exception is thrown.
  3. An array of bytes is allocated to hold the header and the serialized block contents.
  4. Leaving space for the checksum at the start, the timestamp, max root block size, block length and serialized block contents are copied into the byte array.
  5. The checksum is calculated and written at the start of the byte array.
  6. The byte array is written to disk at currentRootOffset.
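The steps above can be sketched with plain byte arrays. This sketch assumes a 24-byte header of big-endian fields (checksum Long, timestamp Long, max size Int, block length Int, in that order, via java.nio.ByteBuffer); the real code serializes these fields with IncDesLong and IncDesInt, whose byte layout may differ. RootAreaEncoder is a hypothetical name, and it takes the serialized block and the timestamp as plain values instead of chain results.

```scala
import java.nio.ByteBuffer
import java.util.zip.Adler32

// Sketch of the bytes writeRootBlock passes to WriteBytes for one area.
// ASSUMED layout: [checksum: Long][timestamp: Long][maxSize: Int][length: Int][block bytes]
object RootAreaEncoder {
  val HeaderLength = 24

  def encode(payload: Array[Byte], timestamp: Long, maxBlockSize: Int): Array[Byte] = {
    val length = payload.length
    if (length + HeaderLength > maxBlockSize)
      throw new IllegalArgumentException(
        "Root block size exceeds maxRootBlockSize property: " +
          (length + HeaderLength) + " > " + maxBlockSize)
    val bytes = new Array[Byte](HeaderLength + length)
    val buf = ByteBuffer.wrap(bytes)
    // Skip the 8-byte checksum slot; write the rest of the header, then the block.
    buf.position(8)
    buf.putLong(timestamp).putInt(maxBlockSize).putInt(length).put(payload)
    // Checksum everything after the checksum field and store it at offset 0.
    val adler = new Adler32
    adler.update(bytes, 8, bytes.length - 8)
    buf.putLong(0, adler.getValue())
    bytes
  }
}
```

Because each write goes to the area that does not hold the current root, a crash in the middle of a write can damage only the copy being written. On restart, that copy should fail its length or checksum check, and readRootBlock falls back to the other copy, which is what makes the store crash-proof.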

RootBlockComponent

## NoDbInitializationComponent

This component handles the InitDb message that RootBlockComponent sends when a new database needs initialization, but it performs no initialization: it simply returns the root block it was given.

```scala
class NoDbInitializationComponentFactory extends ComponentFactory {
  override def instantiate(actor: Actor) = new NoDbInitializationComponent(actor)
}

class NoDbInitializationComponent(actor: Actor)
  extends Component(actor) {

  bind(classOf[InitDb], initDb)

  private def initDb(msg: AnyRef, rf: Any => Unit) {
    val rootBlock = msg.asInstanceOf[InitDb].rootBlock
    rf(rootBlock)
  }
}
```

NoDbInitializationComponent

