Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNS-291: Implemented fixation entry lib #306

Merged
merged 18 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 89 additions & 83 deletions common/fixation_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// For examples, an older version of a package is needed as long as the subscription
// that uses it lives.
//
// Once instantiated, VersionedStore offers 4 methods:
// Once instantiated, FixationStore offers 4 methods:
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
// - SetEntry(index, block, *entry): add a new "block" version of an entry "index".
// - GetEntry(index, block, *entry): get a copy (and reference) an version of an entry.
// - PutEntry(index, block): drop a reference to a version of an entry.
Expand All @@ -27,14 +27,14 @@ import (
// How does it work? The explanation below illustrates how the data is stored, assuming the
// user is the module "packages":
//
// 1. When instantiated, VersionedStore gets a `prefix string` - used as a namespace
// to separate between instances of VersionedStore. For instance, module "packages"
// 1. When instantiated, FixationStore gets a `prefix string` - used as a namespace
// to separate between instances of FixationStore. For instance, module "packages"
// would use its module name for prefix.
//
// 2. Each entry is wrapped by a RawEntry, that holds the index, block, ref-count,
// and the (marshalled) data of the entry.
//
// 3. VersionedStore keeps the entry indices with a special prefix; and it keeps the
// 3. FixationStore keeps the entry indices with a special prefix; and it keeps the
// and the RawEntres with a prefix that includes their index (using the block as key).
// For instance, modules "packages" may have a package named "YourPackage" created at
// block 110, and it was updated at block 220, and package named "MyPakcage" created
Expand All @@ -48,68 +48,70 @@ import (
//
// Thus, iterating on the prefix "packages_Entry_Index_" would yield all the package
// indices. Reverse iterating on the prefix "packages_Entry_Raw_<INDEX>" would yield
// all the versioned of the entry named <INDEX> in descending order.
// all the Fixation of the entry named <INDEX> in descending order.
//
// 4. VersionedStore keeps a reference count of versioned of entries, and when the
// 4. FixationStore keeps a reference count of Fixation of entries, and when the
// count reaches 0 it marks them for deletion. The actual deletion takes place after
// a fixed number of epochs has passed.

type VersionedStore struct {
type FixationStore struct {
storeKey sdk.StoreKey
cdc codec.BinaryCodec
prefix string
}

// AppendEntry adds a new entry to the store
func (vs VersionedStore) AppendEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) error {
func (fs *FixationStore) AppendEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) error {
// get the latest entry for this index
latestEntry := vs.getUnmarshaledEntryForBlock(ctx, index, block, types.DO_NOTHING)
latestEntry, err := fs.getUnmarshaledEntryForBlock(ctx, index, block, types.DO_NOTHING)
Yaroms marked this conversation as resolved.
Show resolved Hide resolved

// if latest entry is not found, this is a first version entry
if latestEntry == nil {
vs.SetEntryIndex(ctx, index)
if latestEntry == nil && types.ErrEntryNotFound.Is(err) {
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
fs.SetEntryIndex(ctx, index)
} else if err != nil {
return utils.LavaError(ctx, ctx.Logger(), "AppendEntry_get_entry_err", map[string]string{"err": err.Error(), "index": index, "block": strconv.FormatUint(block, 10)}, "error getting entry")
} else {
// make sure the new entry's block is not smaller than the latest entry's block
if block < latestEntry.GetBlock() {
return utils.LavaError(ctx, ctx.Logger(), "AppendEntry_block_too_early", map[string]string{"latestEntryBlock": strconv.FormatUint(latestEntry.GetBlock(), 10), "block": strconv.FormatUint(block, 10), "index": index, "vs.prefix": vs.prefix}, "can't append entry, earlier than the latest entry")
return utils.LavaError(ctx, ctx.Logger(), "AppendEntry_block_too_early", map[string]string{"latestEntryBlock": strconv.FormatUint(latestEntry.GetBlock(), 10), "block": strconv.FormatUint(block, 10), "index": index, "fs.prefix": fs.prefix}, "can't append entry, earlier than the latest entry")
}

// if the new entry's block is equal to the latest entry, overwrite the latest entry
if block == latestEntry.GetBlock() {
return vs.SetEntry(ctx, index, block, entryData)
return fs.ModifyEntry(ctx, index, block, entryData)
}
}

// marshal the new entry's data
b := vs.cdc.MustMarshal(entryData)
marshaledEntryData := fs.cdc.MustMarshal(entryData)
// create a new entry and marshal it
entry := types.Entry{Index: index, Block: block, Data: b, Refcount: 0}
bz := vs.cdc.MustMarshal(&entry)
entry := types.Entry{Index: index, Block: block, Data: marshaledEntryData, Refcount: 0}
marshaledEntry := fs.cdc.MustMarshal(&entry)

// get the relevant store
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(vs.createStoreKey(index)))
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.createStoreKey(index)))
byteKey := types.KeyPrefix(createEntryKey(block))

// set the new entry to the store
store.Set(byteKey, bz)
store.Set(byteKey, marshaledEntry)

// delete old entries
vs.deleteStaleEntries(ctx, index)
fs.deleteStaleEntries(ctx, index)

return nil
}

func (vs VersionedStore) deleteStaleEntries(ctx sdk.Context, index string) {
func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, index string) {
// get the relevant store and init an iterator
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(vs.createStoreKey(index)))
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.createStoreKey(index)))
iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

// iterate over entries
for iterator.Valid() {
// umarshal the old entry version
var oldEntry types.Entry
vs.cdc.MustUnmarshal(iterator.Value(), &oldEntry)
fs.cdc.MustUnmarshal(iterator.Value(), &oldEntry)

iterator.Next()

Expand All @@ -120,43 +122,43 @@ func (vs VersionedStore) deleteStaleEntries(ctx sdk.Context, index string) {

// if the entry's refs is equal to 0 and it has been longer than STALE_ENTRY_TIME from its creation, delete it
if oldEntry.GetRefcount() == 0 && int64(oldEntry.GetBlock())+types.STALE_ENTRY_TIME < ctx.BlockHeight() {
vs.removeEntry(ctx, oldEntry.GetIndex(), oldEntry.GetBlock())
fs.removeEntry(ctx, oldEntry.GetIndex(), oldEntry.GetBlock())
} else {
// else, break (avoiding removal of entries in the middle of the list)
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
}

// SetEntry sets a specific entry in the store
func (vs VersionedStore) SetEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) error {
// ModifyEntry modifies an exisiting entry in the store
func (fs *FixationStore) ModifyEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) error {
// get the relevant store
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(vs.createStoreKey(index)))
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.createStoreKey(index)))
byteKey := types.KeyPrefix(createEntryKey(block))

// marshal the new entry data
b := vs.cdc.MustMarshal(entryData)
marshaledEntryData := fs.cdc.MustMarshal(entryData)

// get the entry from the store
entry := vs.getUnmarshaledEntryForBlock(ctx, index, block, types.DO_NOTHING)
if entry == nil {
return utils.LavaError(ctx, ctx.Logger(), "SetEntry_cant_find_entry", map[string]string{"vs.prefix": vs.prefix, "index": index, "block": strconv.FormatUint(block, 10)}, "can't set non-existent entry")
entry, err := fs.getUnmarshaledEntryForBlock(ctx, index, block, types.DO_NOTHING)
if err != nil {
return utils.LavaError(ctx, ctx.Logger(), "SetEntry_cant_find_entry", map[string]string{"fs.prefix": fs.prefix, "index": index, "block": strconv.FormatUint(block, 10)}, "can't set non-existent entry")
}

// update the entry's data
entry.Data = b
entry.Data = marshaledEntryData

// marshal the entry
bz := vs.cdc.MustMarshal(entry)
marshaledEntry := fs.cdc.MustMarshal(entry)

// set the entry
store.Set(byteKey, bz)
store.Set(byteKey, marshaledEntry)

return nil
}

// handleRefAction handles ref actions: increase refs, decrease refs or do nothing
func handleRefAction(ctx sdk.Context, entry *types.Entry, refAction types.ReferenceAction) error {
func (fs *FixationStore) handleRefAction(ctx sdk.Context, entry *types.Entry, refAction types.ReferenceAction) error {
// check if entry is nil
if entry == nil {
return utils.LavaError(ctx, ctx.Logger(), "handleRefAction_nil_entry", map[string]string{}, "can't handle reference action, entry is nil")
Expand All @@ -169,38 +171,50 @@ func handleRefAction(ctx sdk.Context, entry *types.Entry, refAction types.Refere
case types.SUB_REFERENCE:
if entry.GetRefcount() > 0 {
entry.Refcount -= 1
} else {
return utils.LavaError(ctx, ctx.Logger(), "handleRefAction_sub_ref_from_non_positive_count", map[string]string{"refCount": strconv.FormatUint(entry.GetRefcount(), 10)}, "refCount is not larger than zero. Can't subtract refcount")
}
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
case types.DO_NOTHING:
}

// get the relevant store
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.createStoreKey(entry.GetIndex())))
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
byteKey := types.KeyPrefix(createEntryKey(entry.GetBlock()))

// marshal the entry
marshaledEntry := fs.cdc.MustMarshal(entry)

// set the entry
store.Set(byteKey, marshaledEntry)

return nil
}

// GetStoreKey returns the versioned store's store key
func (vs VersionedStore) GetStoreKey() sdk.StoreKey {
return vs.storeKey
// GetStoreKey returns the Fixation store's store key
func (fs *FixationStore) GetStoreKey() sdk.StoreKey {
return fs.storeKey
}

// GetCdc returns the versioned store's codec
func (vs VersionedStore) GetCdc() codec.BinaryCodec {
return vs.cdc
// GetCdc returns the Fixation store's codec
func (fs *FixationStore) GetCdc() codec.BinaryCodec {
return fs.cdc
}

// Getprefix returns the versioned store's fixation key
func (vs VersionedStore) GetPrefix() string {
return vs.prefix
// Getprefix returns the Fixation store's fixation key
func (fs *FixationStore) GetPrefix() string {
return fs.prefix
}

// Setprefix sets the versioned store's fixation key
func (vs VersionedStore) SetPrefix(prefix string) VersionedStore {
vs.prefix = prefix
return vs
// Setprefix sets the Fixation store's fixation key
func (fs *FixationStore) WithPrefix(prefix string) *FixationStore {
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
editedFs := FixationStore{storeKey: fs.storeKey, cdc: fs.cdc, prefix: prefix}
return &editedFs
}

// getUnmarshaledEntryForBlock gets an entry by block. Block doesn't have to be precise, it gets the closest entry version
func (vs VersionedStore) getUnmarshaledEntryForBlock(ctx sdk.Context, index string, block uint64, refAction types.ReferenceAction) *types.Entry {
func (fs *FixationStore) getUnmarshaledEntryForBlock(ctx sdk.Context, index string, block uint64, refAction types.ReferenceAction) (*types.Entry, error) {
// get the relevant store using index
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(vs.createStoreKey(index)))
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.createStoreKey(index)))

// init a reverse iterator
iterator := sdk.KVStoreReversePrefixIterator(store, []byte{})
Expand All @@ -209,44 +223,48 @@ func (vs VersionedStore) getUnmarshaledEntryForBlock(ctx sdk.Context, index stri
// iterate over entries
for ; iterator.Valid(); iterator.Next() {
// unmarshal the entry
var val types.Entry
vs.cdc.MustUnmarshal(iterator.Value(), &val)
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)

// if the entry's block is smaller than or equal to the requested block, unmarshal the entry's data
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
if val.GetBlock() <= block {
err := handleRefAction(ctx, &val, refAction)
// since the user might not know the precise block that the entry was created on, we get the closest one
// we get from the past since this was the entry that was valid in the requested block (assume there's an
// entry in block 100 and block 200 and the user asks for the version of block 199. Since the block 200's
// didn't exist yet, we get the entry from block 100)
if entry.GetBlock() <= block {
err := fs.handleRefAction(ctx, &entry, refAction)
Yaroms marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil
return nil, err
}

return &val
return &entry, nil
}
}

return nil
return nil, types.ErrEntryNotFound
}

// GetEntryForBlock gets an entry by block. Block doesn't have to be precise, it gets the closest entry version
func (vs VersionedStore) GetEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler, refAction types.ReferenceAction) error {
// GetEntryForBlock gets an entry by block. The entry is returned by reference on the entryData argument. Block doesn't have to be precise, it gets the closest entry version
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
func (fs *FixationStore) GetEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler, refAction types.ReferenceAction) error {
// get the unmarshaled entry for block
entry := vs.getUnmarshaledEntryForBlock(ctx, index, block, refAction)
if entry == nil {
return utils.LavaError(ctx, ctx.Logger(), "GetEntry_cant_get_entry", map[string]string{}, "can't get entry")
entry, err := fs.getUnmarshaledEntryForBlock(ctx, index, block, refAction)
if err != nil {
return utils.LavaError(ctx, ctx.Logger(), "GetEntry_cant_get_entry", map[string]string{"err": err.Error()}, "can't get entry")
}

// unmarshal the entry's data
err := vs.cdc.Unmarshal(entry.GetData(), entryData)
err = fs.cdc.Unmarshal(entry.GetData(), entryData)
if err != nil {
Yaroms marked this conversation as resolved.
Show resolved Hide resolved
return utils.LavaError(ctx, ctx.Logger(), "GetEntry_cant_unmarshal", map[string]string{}, "can't unmarshal entry data")
return utils.LavaError(ctx, ctx.Logger(), "GetEntry_cant_unmarshal", map[string]string{"err": err.Error()}, "can't unmarshal entry data")
}

return nil
}

// RemoveEntry removes an entry from the store
func (vs VersionedStore) removeEntry(ctx sdk.Context, index string, block uint64) {
func (fs *FixationStore) removeEntry(ctx sdk.Context, index string, block uint64) {
// get the relevant store
store := prefix.NewStore(ctx.KVStore(vs.storeKey), types.KeyPrefix(vs.createStoreKey(index)))
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.createStoreKey(index)))

// create entry's key
entryKey := createEntryKey(block)
Expand All @@ -255,29 +273,17 @@ func (vs VersionedStore) removeEntry(ctx sdk.Context, index string, block uint64
store.Delete(types.KeyPrefix(entryKey))
}

// getAllUnmarshaledEntries gets all the unmarshaled entries from the store (without entries' old versions)
func (vs VersionedStore) getAllEntries(ctx sdk.Context) []*types.Entry {
latestVersionEntryList := []*types.Entry{}

uniqueIndices := vs.GetAllEntryIndices(ctx)
for _, uniqueIndex := range uniqueIndices {
latestVersionEntry := vs.getUnmarshaledEntryForBlock(ctx, uniqueIndex, uint64(ctx.BlockHeight()), types.DO_NOTHING)
latestVersionEntryList = append(latestVersionEntryList, latestVersionEntry)
}

return latestVersionEntryList
}

// createEntryKey creates an entry key for the KVStore
func createEntryKey(block uint64) string {
return strconv.FormatUint(block, 10)
}

func (vs VersionedStore) createStoreKey(index string) string {
return types.EntryKey + vs.prefix + index
func (fs *FixationStore) createStoreKey(index string) string {
return types.EntryKey + fs.prefix + index
}

// NewVersionedStore returns a new versionedStore object
func NewVersionedStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *VersionedStore {
return &VersionedStore{storeKey: storeKey, cdc: cdc, prefix: prefix}
// NewFixationStore returns a new FixationStore object
func NewFixationStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *FixationStore {
fs := FixationStore{storeKey: storeKey, cdc: cdc, prefix: prefix}
return &fs
}
24 changes: 16 additions & 8 deletions common/fixation_entry_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@ entry index of a package object to the store, it acceses the store using the key
*/

// SetEntryIndex appends an entry index in the store with a new id and updates the count. It returns the index in the list of the added value (for example, if the first value of the list is added, it'll return 0 (the first index in the list))
func (vs VersionedStore) SetEntryIndex(ctx sdk.Context, index string) {
func (fs FixationStore) SetEntryIndex(ctx sdk.Context, index string) {
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
// get the index store with the fixation key
store := prefix.NewStore(ctx.KVStore(vs.GetStoreKey()), types.KeyPrefix(types.EntryIndexKey+vs.prefix))
store := prefix.NewStore(ctx.KVStore(fs.GetStoreKey()), types.KeyPrefix(fs.createEntryIndexStoreKey()))

// convert the index value to a byte array
appendedValue := []byte(index)

// set the index in the index store
store.Set(types.KeyPrefix(types.EntryIndexKey+vs.prefix+index), appendedValue)
store.Set(types.KeyPrefix(types.EntryIndexKey+fs.prefix+index), appendedValue)
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
}

// RemoveEntryIndex removes an EntryIndex from the store
func (vs VersionedStore) removeEntryIndex(ctx sdk.Context, index string) {
func (fs FixationStore) removeEntryIndex(ctx sdk.Context, index string) {
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
// get the index store with the fixation key
store := prefix.NewStore(ctx.KVStore(vs.GetStoreKey()), types.KeyPrefix(types.EntryIndexKey+vs.prefix))
store := prefix.NewStore(ctx.KVStore(fs.GetStoreKey()), types.KeyPrefix(fs.createEntryIndexStoreKey()))

// remove the index from the store
store.Delete(types.KeyPrefix(types.EntryIndexKey + vs.prefix + index))
store.Delete(types.KeyPrefix(fs.createEntryIndexKey(index)))
}

// GetAllEntryIndex returns all EntryIndex
func (vs VersionedStore) GetAllEntryIndices(ctx sdk.Context) []string {
func (fs FixationStore) GetAllEntryIndices(ctx sdk.Context) []string {
oren-lava marked this conversation as resolved.
Show resolved Hide resolved
// get the index store with the fixation key and init an iterator
store := prefix.NewStore(ctx.KVStore(vs.GetStoreKey()), types.KeyPrefix(types.EntryIndexKey+vs.prefix))
store := prefix.NewStore(ctx.KVStore(fs.GetStoreKey()), types.KeyPrefix(fs.createEntryIndexStoreKey()))
iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

Expand All @@ -53,3 +53,11 @@ func (vs VersionedStore) GetAllEntryIndices(ctx sdk.Context) []string {

return indexList
}

func (fs FixationStore) createEntryIndexStoreKey() string {
return types.EntryIndexKey + fs.prefix
}

func (fs FixationStore) createEntryIndexKey(index string) string {
return types.EntryIndexKey + fs.prefix + index
}
Loading