Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNS-291: Implemented fixation entry lib #306

Merged
merged 18 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 119 additions & 151 deletions common/fixation_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,148 +10,137 @@ import (
"github.com/lavanet/lava/utils"
)

/*
This library provides a standard set of API for managing entries that are meant to be fixated.
This API set should enable users to easily set, modify, and delete fixated entries.
A fixated entry is one whose value is retained on-chain with the block in which it was created when it is changed.
By contrast, when a non-fixated entry is altered, only its most recent value is stored.

Fixated entry structure:
Entry {
string index; // a unique entry name.
uint64 block; // the block the entry was created in.
uint64 references; // Number of references to this entry (number of entities using it).
bytes data; // The data saved in the entry. Can be any type.
}

How does it work?
To help with the explanation, we'll use a reference example. Assume you created a custom module
named "packages". This module keeps package objects as fixated entries (i.e., the module keeps track
of every package AND its older versions). Each package has an "index" field which acts as a unique name
for the package.

In general, each object that uses this library will have a versioned store field of type VersionedStore.
The versioned store (from now on: "vs") holds a unique store key, a codec and a map of unique indices.
Using our reference example, the store key of the module "packages" will be "packages". If another module,
say module "projects", also uses fixated entries, its store key will be "projects". This way we get a clear
separation between the fixated entries of "packages" and of "projects". In other words, the store key acts as
a namespace. The codec is used to determine how the packages should be marshaled, and the unique indices map
helps to track which packages do we have (not including older versions packages).

In each store (or namespace), marshaled objects of type Entry are saved. Note that a module can have different
kind of fixated entries. They are separate between them, we use a fixationKey (a unique key for each type of
fixated entry).

Going back to the reference example, let's say I have a package named "myGreatPackage" that was created in block
101 (it also has additional fields). The package's entry holds the marshaled package object (in the data field)
and has 33 references. Also, let's say the packages module can hold fixated price objects. An example for a
price object can be with price of 2ulava which was created on block 203. The price's entry holds the marshaled
price object and has 12 references.
The store key to access the package object will be "Entry_<packageFixationKey>". The package entry's key will
be "Entry_<packageFixationKey>_myGreatPackage_101"

The store key to access the price object will be "Entry_<priceFixationKey>". The package entry's key will
be "Entry_<priceFixationKey>_2ulava_203"

Moreover, to track the indices of fixated objects more easily (without regarding the indices of older versions
of entries), we keep an EntryIndex list. See fixation_entry_index.go for more details.

*/
// VerionedStore manages lists of entries with versions in the store.
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
//
// Its primary use it to implemented "fixated entries": entries that may change over
// time, and whose versions must be retained on-chain as long as they are referenced.
// For examples, an older version of a package is needed as long as the subscription
// that uses it lives.
//
// Once instantiated, VersionedStore offers 4 methods:
// - SetEntry(index, block, *entry): add a new "block" version of an entry "index".
// - GetEntry(index, block, *entry): get a copy (and reference) an version of an entry.
// - PutEntry(index, block): drop a reference to a version of an entry.
// - RemoveEntry(index): mark an entry as unavailable for new GetEntry() calls.
// - GetAllEntryIndex(index): get all the entries indices (without versions).
//
// How does it work? The explanation below illustrates how the data is stored, assuming the
// user is the module "packages":
//
// 1. When instantiated, VersionedStore gets a `prefix string` - used as a namespace
// to separate between instances of VersionedStore. For instance, module "packages"
// would use its module name for prefix.
//
// 2. Each entry is wrapped by a RawEntry, that holds the index, block, ref-count,
// and the (marshalled) data of the entry.
//
// 3. VersionedStore keeps the entry indices with a special prefix; and it keeps the
// and the RawEntres with a prefix that includes their index (using the block as key).
// For instance, modules "packages" may have a package named "YourPackage" created at
// block 110, and it was updated at block 220, and package named "MyPakcage" created
// at block 150. The store with prefix "packages" will hold the following objects:
//
// prefix: packages_Entry_Index_ key: MyPackage data: MyPackage
// prefix: packages_Entry_Index_ key: YourPackage data: YourPackage
// prefix: packages_Entry_Raw_MyPackage key: 150 data: RawEntry
// prefix: packages_Entry_Raw_YourPackage key: 220 data: RawEntry
// prefix: packages_Entry_Raw_YourPackage key: 110 data: RawEntry
//
// Thus, iterating on the prefix "packages_Entry_Index_" would yield all the package
// indices. Reverse iterating on the prefix "packages_Entry_Raw_<INDEX>" would yield
// all the versioned of the entry named <INDEX> in descending order.
//
// 4. VersionedStore keeps a reference count of versioned of entries, and when the
// count reaches 0 it marks them for deletion. The actual deletion takes place after
// a fixed number of epochs has passed.

type VersionedStore struct {
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
storeKey sdk.StoreKey
cdc codec.BinaryCodec
prefix string
}

// AppendEntry adds a new entry to the store
func (vs VersionedStore) AppendEntry(ctx sdk.Context, fixationKey string, index string, block uint64, entryData codec.ProtoMarshaler) error {
// delete old entries
vs.deleteStaleEntries(ctx, fixationKey)

func (vs VersionedStore) AppendEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) error {
// get the latest entry for this index
latestEntry := vs.getUnmarshaledEntryForBlock(ctx, fixationKey, index, block, types.DO_NOTHING)
latestEntry := vs.getUnmarshaledEntryForBlock(ctx, index, block, types.DO_NOTHING)

// if latest entry is not found, this is a first version entry
firstVersion := false
if latestEntry == nil {
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
firstVersion = true
}

// make sure the new entry's block is not smaller than the latest entry's block
if !firstVersion && block < latestEntry.GetBlock() {
return utils.LavaError(ctx, ctx.Logger(), "AppendEntry_block_too_early", map[string]string{"latestEntryBlock": strconv.FormatUint(latestEntry.GetBlock(), 10), "block": strconv.FormatUint(block, 10), "index": index, "fixationKey": fixationKey}, "can't append entry, earlier than the latest entry")
}
vs.SetEntryIndex(ctx, index)
} else {
// make sure the new entry's block is not smaller than the latest entry's block
if block < latestEntry.GetBlock() {
return utils.LavaError(ctx, ctx.Logger(), "AppendEntry_block_too_early", map[string]string{"latestEntryBlock": strconv.FormatUint(latestEntry.GetBlock(), 10), "block": strconv.FormatUint(block, 10), "index": index, "vs.prefix": vs.prefix}, "can't append entry, earlier than the latest entry")
}

// if the new entry's block is equal to the latest entry, overwrite the latest entry
if !firstVersion && block == latestEntry.GetBlock() {
return vs.SetEntry(ctx, fixationKey, index, block, entryData)
// if the new entry's block is equal to the latest entry, overwrite the latest entry
if block == latestEntry.GetBlock() {
return vs.SetEntry(ctx, index, block, entryData)
}
}

// marshal the new entry's data
b := vs.cdc.MustMarshal(entryData)
// create a new entry and marshal it
entry := types.Entry{Index: index, Block: block, Data: b, References: 0}
entry := types.Entry{Index: index, Block: block, Data: b, Refcount: 0}
bz := vs.cdc.MustMarshal(&entry)

// get the relevant store
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+fixationKey+index))
byteKey := types.KeyPrefix(createEntryKey(index, fixationKey, block))
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+vs.prefix+index))
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
byteKey := types.KeyPrefix(vs.createEntryKey(block))

// set the new entry to the store
store.Set(byteKey, bz)

// if it's a first version entry, add a new key in the uniqueIndices map
if firstVersion {
vs.AppendEntryIndex(ctx, fixationKey, index)
}
// delete old entries
vs.deleteStaleEntries(ctx, index)

return nil
}

func (vs VersionedStore) deleteStaleEntries(ctx sdk.Context, fixationKey string) {
entries := vs.getAllEntries(ctx, fixationKey)
for _, entry := range entries {
// get the relevant store and init an iterator
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+fixationKey+entry.GetIndex()))
iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

// iterate over entries
for ; iterator.Valid(); iterator.Next() {
// umarshal the old entry version
var oldEntry types.Entry
vs.cdc.MustUnmarshal(iterator.Value(), &oldEntry)

// if the entry's refs is equal to 0 and it has been longer than STALE_ENTRY_TIME from its creation, delete it
if oldEntry.GetReferences() == 0 && int64(oldEntry.GetBlock())+types.STALE_ENTRY_TIME < ctx.BlockHeight() {
vs.removeEntry(ctx, fixationKey, oldEntry.GetIndex(), oldEntry.GetBlock())
} else {
// else, break (avoiding removal of entries in the middle of the list)
break
}
func (vs VersionedStore) deleteStaleEntries(ctx sdk.Context, index string) {
// get the relevant store and init an iterator
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+vs.prefix+index))
iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

// iterate over entries
for iterator.Valid() {
// umarshal the old entry version
var oldEntry types.Entry
vs.cdc.MustUnmarshal(iterator.Value(), &oldEntry)

iterator.Next()

// skipping removal of latest version
if !iterator.Valid() {
break
}

// try getting this entry's latest version. If it doesn't exist, remove its index for the entry index list
latestVersionEntry := vs.getUnmarshaledEntryForBlock(ctx, fixationKey, entry.GetIndex(), uint64(ctx.BlockHeight()), types.DO_NOTHING)
if latestVersionEntry == nil {
vs.RemoveEntryIndex(ctx, fixationKey, entry.GetIndex())
// if the entry's refs is equal to 0 and it has been longer than STALE_ENTRY_TIME from its creation, delete it
if oldEntry.GetRefcount() == 0 && int64(oldEntry.GetBlock())+types.STALE_ENTRY_TIME < ctx.BlockHeight() {
vs.removeEntry(ctx, oldEntry.GetIndex(), oldEntry.GetBlock())
} else {
// else, break (avoiding removal of entries in the middle of the list)
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
}

// SetEntry sets a specific entry in the store
func (vs VersionedStore) SetEntry(ctx sdk.Context, fixationKey string, index string, block uint64, entryData codec.ProtoMarshaler) error {
func (vs VersionedStore) SetEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) error {
// get the relevant store
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+fixationKey+index))
byteKey := types.KeyPrefix(createEntryKey(index, fixationKey, block))
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+vs.prefix+index))
byteKey := types.KeyPrefix(vs.createEntryKey(block))

// marshal the new entry data
b := vs.cdc.MustMarshal(entryData)

// get the entry from the store
entry := vs.getUnmarshaledEntryForBlock(ctx, fixationKey, index, block, types.DO_NOTHING)
entry := vs.getUnmarshaledEntryForBlock(ctx, index, block, types.DO_NOTHING)
if entry == nil {
return utils.LavaError(ctx, ctx.Logger(), "SetEntry_cant_find_entry", map[string]string{"fixationKey": fixationKey, "index": index, "block": strconv.FormatUint(block, 10)}, "can't set non-existent entry")
return utils.LavaError(ctx, ctx.Logger(), "SetEntry_cant_find_entry", map[string]string{"vs.prefix": vs.prefix, "index": index, "block": strconv.FormatUint(block, 10)}, "can't set non-existent entry")
}

// update the entry's data
Expand All @@ -176,49 +165,17 @@ func handleRefAction(ctx sdk.Context, entry *types.Entry, refAction types.Refere
// handle the ref action
switch refAction {
case types.ADD_REFERENCE:
entry.References += 1
entry.Refcount += 1
case types.SUB_REFERENCE:
if entry.GetReferences() > 0 {
entry.References -= 1
if entry.GetRefcount() > 0 {
entry.Refcount -= 1
}
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
case types.DO_NOTHING:
}

return nil
}

// GetEntry gets the exact entry with index and block (block has to be precise). The user should pass an empty pointer of the desired type (e.g. package, subsciption, etc.).
func (vs VersionedStore) GetEntry(ctx sdk.Context, fixationKey string, index string, block uint64, entryData codec.ProtoMarshaler, refAction types.ReferenceAction) bool {
// create entry key
entryKey := createEntryKey(index, fixationKey, block)

// get the relevant store
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+fixationKey+index))

// get the marshled entry
bz := store.Get(types.KeyPrefix(entryKey))

// couldn't find entry
if bz == nil {
return false
}

// unmarshal the entry
var entry types.Entry
vs.cdc.MustUnmarshal(bz, &entry)

// handle ref action
err := handleRefAction(ctx, &entry, refAction)
if err != nil {
return false
}

// unmarshal the entry's data
vs.cdc.MustUnmarshal(entry.GetData(), entryData)

return true
}

// GetStoreKey returns the versioned store's store key
func (vs VersionedStore) GetStoreKey() sdk.StoreKey {
Yaroms marked this conversation as resolved.
Show resolved Hide resolved
return vs.storeKey
Expand All @@ -229,10 +186,21 @@ func (vs VersionedStore) GetCdc() codec.BinaryCodec {
return vs.cdc
}

// Getprefix returns the versioned store's fixation key
func (vs VersionedStore) GetPrefix() string {
return vs.prefix
}

// Setprefix sets the versioned store's fixation key
func (vs VersionedStore) SetPrefix(prefix string) VersionedStore {
vs.prefix = prefix
return vs
}

// getUnmarshaledEntryForBlock gets an entry by block. Block doesn't have to be precise, it gets the closest entry version
func (vs VersionedStore) getUnmarshaledEntryForBlock(ctx sdk.Context, fixationKey string, index string, block uint64, refAction types.ReferenceAction) *types.Entry {
func (vs VersionedStore) getUnmarshaledEntryForBlock(ctx sdk.Context, index string, block uint64, refAction types.ReferenceAction) *types.Entry {
// get the relevant store using index
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+fixationKey+index))
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+vs.prefix+index))

// init a reverse iterator
iterator := sdk.KVStoreReversePrefixIterator(store, []byte{})
Expand All @@ -259,53 +227,53 @@ func (vs VersionedStore) getUnmarshaledEntryForBlock(ctx sdk.Context, fixationKe
}

// GetEntryForBlock gets an entry by block. Block doesn't have to be precise, it gets the closest entry version
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
func (vs VersionedStore) GetEntryForBlock(ctx sdk.Context, fixationKey string, index string, block uint64, entryData codec.ProtoMarshaler, refAction types.ReferenceAction) error {
func (vs VersionedStore) GetEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler, refAction types.ReferenceAction) error {
// get the unmarshaled entry for block
entry := vs.getUnmarshaledEntryForBlock(ctx, fixationKey, index, block, refAction)
entry := vs.getUnmarshaledEntryForBlock(ctx, index, block, refAction)
if entry == nil {
return utils.LavaError(ctx, ctx.Logger(), "GetEntryForBlock_cant_get_entry", map[string]string{}, "can't get entry")
return utils.LavaError(ctx, ctx.Logger(), "GetEntry_cant_get_entry", map[string]string{}, "can't get entry")
}

// unmarshal the entry's data
err := vs.cdc.Unmarshal(entry.GetData(), entryData)
if err != nil {
Yaroms marked this conversation as resolved.
Show resolved Hide resolved
return utils.LavaError(ctx, ctx.Logger(), "GetEntryForBlock_cant_unmarshal", map[string]string{}, "can't unmarshal entry data")
return utils.LavaError(ctx, ctx.Logger(), "GetEntry_cant_unmarshal", map[string]string{}, "can't unmarshal entry data")
}

return nil
}

// RemoveEntry removes an entry from the store
func (vs VersionedStore) removeEntry(ctx sdk.Context, fixationKey string, index string, block uint64) {
func (vs VersionedStore) removeEntry(ctx sdk.Context, index string, block uint64) {
// get the relevant store
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+fixationKey+index))
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(types.EntryKey+vs.prefix+index))

// create entry's key
entryKey := createEntryKey(index, fixationKey, block)
entryKey := vs.createEntryKey(block)

// delete the entry
store.Delete(types.KeyPrefix(entryKey))
}

// getAllUnmarshaledEntries gets all the unmarshaled entries from the store (without entries' old versions)
func (vs VersionedStore) getAllEntries(ctx sdk.Context, fixationKey string) []*types.Entry {
func (vs VersionedStore) getAllEntries(ctx sdk.Context) []*types.Entry {
latestVersionEntryList := []*types.Entry{}

uniqueIndices := vs.GetAllEntryIndices(ctx, fixationKey)
uniqueIndices := vs.GetAllEntryIndices(ctx)
for _, uniqueIndex := range uniqueIndices {
latestVersionEntry := vs.getUnmarshaledEntryForBlock(ctx, fixationKey, uniqueIndex, uint64(ctx.BlockHeight()), types.DO_NOTHING)
latestVersionEntry := vs.getUnmarshaledEntryForBlock(ctx, uniqueIndex, uint64(ctx.BlockHeight()), types.DO_NOTHING)
latestVersionEntryList = append(latestVersionEntryList, latestVersionEntry)
}

return latestVersionEntryList
}

// createEntryKey creates an entry key for the KVStore
func createEntryKey(index string, fixationKey string, block uint64) string {
return types.EntryKey + fixationKey + index + "_" + strconv.FormatUint(block, 10)
func (vs VersionedStore) createEntryKey(block uint64) string {
return strconv.FormatUint(block, 10)
}

// NewVersionedStore returns a new versionedStore object
func NewVersionedStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec) *VersionedStore {
return &VersionedStore{storeKey: storeKey, cdc: cdc}
func NewVersionedStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *VersionedStore {
return &VersionedStore{storeKey: storeKey, cdc: cdc, prefix: prefix}
}
Loading