Skip to content

Commit

Permalink
populate channels map in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
nzpr committed Jul 8, 2021
1 parent 70ab88e commit a4ef34b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 20 deletions.
Expand Up @@ -19,7 +19,9 @@ final case class ContinuationHash(hash: Blake2b256Hash) extends ChannelHash

trait ChannelStore[F[_], C] {
def putChannelHash(channel: C): F[Unit]
def putChannelHashes(channels: Seq[C]): F[Unit] = ???
def putContinuationHash(channels: Seq[C]): F[Unit]
def putContinuationHashes(conts: Seq[Seq[C]]): F[Unit] = ???
def getChannelHash(hash: Blake2b256Hash): F[Option[ChannelHash]]

def continuationKey(channels: Seq[Blake2b256Hash]): Blake2b256Hash =
Expand Down
Expand Up @@ -53,6 +53,39 @@ object ChannelStoreImpl {
} yield ()

override def getChannelHash(hash: Blake2b256Hash): F[Option[ChannelHash]] = store.get(hash)

override def putChannelHashes(channels: Seq[C]): F[Unit] = {
def convert(channel: C): F[(Blake2b256Hash, DataJoinHash)] =
for {
// C => hash(C)
eventKey <- Sync[F].delay(StableHashProvider.hash(channel)(sc))

// C => hashPrefix(C)
dataHash = hashDataChannel(channel, sc)
joinHash = hashJoinsChannel(channel, sc)
} yield (eventKey, DataJoinHash(dataHash, joinHash))

channels.toList.traverse(convert).flatMap(store.put)
}

override def putContinuationHashes(conts: Seq[Seq[C]]): F[Unit] = {
def convert(channels: Seq[C]): F[(Blake2b256Hash, ContinuationHash)] =
for {
// Hash each channel in a list
channelsHashes <- Sync[F].delay(
toOrderedByteVectors(channels)(sc).map(Blake2b256Hash.create)
)
// Concatenate channel hashes and hash result
// Seq[C] => Seq[hash(C)] => Seq[bytes(C)] => bytes(C) => hash(C)
eventKey = continuationKey(channelsHashes)

// Concatenate channels and hash result
// Seq[C] => Seq[bytes(C)] => bytes(C) => hashPrefix(C)
continuationHash = hashContinuationsChannels(channels, sc)
} yield (eventKey, ContinuationHash(continuationHash))

conts.toList.traverse(convert).flatMap(store.put)
}
}

def codecChannelHash: Codec[ChannelHash] =
Expand All @@ -71,8 +104,10 @@ object ChannelStoreImpl {
* Useful in places where we want to have disabled or dummy storage.
*/
final case class NoOpChannelStore[F[_]: Applicative, C]() extends ChannelStore[F, C] {
override def putChannelHash(channel: C): F[Unit] = ().pure[F]
override def putContinuationHash(channels: Seq[C]): F[Unit] = ().pure[F]
override def putChannelHash(channel: C): F[Unit] = ().pure[F]
override def putContinuationHash(channels: Seq[C]): F[Unit] = ().pure[F]
override def putChannelHashes(channel: Seq[C]): F[Unit] = ().pure[F]
override def putContinuationHashes(channels: Seq[Seq[C]]): F[Unit] = ().pure[F]
override def getChannelHash(hash: Blake2b256Hash): F[Option[ChannelHash]] =
none[ChannelHash].pure[F]
}
Expand Down
Expand Up @@ -86,21 +86,22 @@ final case class HistoryRepositoryImpl[F[_]: Concurrent: Parallel: Log: Span, C,
s"${key.toHex};delete-join;0"
}.toList

private def storeChannelHash(action: HotStoreAction) =
action match {
case i: InsertData[C, A] =>
channelHashesStore.putChannelHash(i.channel)
case i: InsertContinuations[C, P, K] =>
channelHashesStore.putContinuationHash(i.channels)
case i: InsertJoins[C] =>
channelHashesStore.putChannelHash(i.channel)
case d: DeleteData[C] =>
channelHashesStore.putChannelHash(d.channel)
case d: DeleteContinuations[C] =>
channelHashesStore.putContinuationHash(d.channels)
case d: DeleteJoins[C] =>
channelHashesStore.putChannelHash(d.channel)
private def storeChannelHash(action: List[HotStoreAction]) = {
val insertChans = action.collect {
case i: InsertData[C, A] => i.channel
case i: InsertJoins[C] => i.channel
case d: DeleteData[C] => d.channel
case d: DeleteJoins[C] => d.channel
}
val insertConts = action.collect {
case i: InsertContinuations[C, P, K] => i.channels
case d: DeleteContinuations[C] => d.channels
}
for {
_ <- channelHashesStore.putContinuationHashes(insertConts)
_ <- channelHashesStore.putChannelHashes(insertChans)
} yield ()
}

private def calculateStorageActions(action: HotStoreTrieAction): Result =
action match {
Expand Down Expand Up @@ -222,12 +223,10 @@ final case class HistoryRepositoryImpl[F[_]: Concurrent: Parallel: Log: Span, C,
override def checkpoint(actions: List[HotStoreAction]): F[HistoryRepository[F, C, P, A, K]] = {
val trieActions = actions.par.map(transform).toList
// store channels mapping
val storeChannels = Stream
.emits(actions.map(a => Stream.eval(storeChannelHash(a).map(_.asLeft[History[F]]))))
.parJoinProcBounded
val storeChannels = storeChannelHash(actions).map(_.asLeft[History[F]])
for {
r <- doCheckpoint(trieActions)
_ <- storeChannels.compile.drain
_ <- storeChannels
_ <- measure(actions)
} yield r
}
Expand Down

0 comments on commit a4ef34b

Please sign in to comment.