Skip to content

Commit

Permalink
Merge pull request #11499 from vegaprotocol/release/v0.77.3
Browse files Browse the repository at this point in the history
Release/v0.77.3
  • Loading branch information
jeremyletang committed Jul 25, 2024
2 parents 6e72490 + e57a6da commit c89ec09
Show file tree
Hide file tree
Showing 28 changed files with 3,144 additions and 86 deletions.
32 changes: 32 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,38 @@
- [](https://github.com/vegaprotocol/vega/issues/xxx)


## 0.77.3

### 🚨 Breaking changes

- [](https://github.com/vegaprotocol/vega/issues/xxx)

### 🗑️ Deprecation

- [](https://github.com/vegaprotocol/vega/issues/xxx)

### 🛠 Improvements

- [1333](https://github.com/vegaprotocol/core-test-coverage/issues/1333) - Added coverage for `0093-TRTO-001`.
- [1334](https://github.com/vegaprotocol/core-test-coverage/issues/1334) - Added coverage for `0093-TRTO-002`.
- [1335](https://github.com/vegaprotocol/core-test-coverage/issues/1335) - Added coverage for `0093-TRTO-003`.
- [1336](https://github.com/vegaprotocol/core-test-coverage/issues/1336) - Added coverage for `0093-TRTO-004`.
- [1337](https://github.com/vegaprotocol/core-test-coverage/issues/1337) - Added coverage for `0093-TRTO-005`.
- [1338](https://github.com/vegaprotocol/core-test-coverage/issues/1338) - Added coverage for `0093-TRTO-006`.
- [1339](https://github.com/vegaprotocol/core-test-coverage/issues/1339) - Added coverage for `0093-TRTO-007`.
- [1340](https://github.com/vegaprotocol/core-test-coverage/issues/1340) - Added coverage for `0093-TRTO-008`.
- [1341](https://github.com/vegaprotocol/core-test-coverage/issues/1341) - Added coverage for `0093-TRTO-009`.
- [1342](https://github.com/vegaprotocol/core-test-coverage/issues/1342) - Added coverage for `0093-TRTO-010`.
- [1343](https://github.com/vegaprotocol/core-test-coverage/issues/1343) - Added coverage for `0093-TRTO-011`.
- [1344](https://github.com/vegaprotocol/core-test-coverage/issues/1344) - Added coverage for `0093-TRTO-012`.
- [1382](https://github.com/vegaprotocol/core-test-coverage/issues/1382) - Added coverage for `0093-TRTO-013`.
- [11497](https://github.com/vegaprotocol/vega/issues/11497) - Improve performance with the `ackedEvents` store.

### 🐛 Fixes

- [11496](https://github.com/vegaprotocol/vega/pull/11496) - Fix panic with long block auctions.


## 0.77.2

### 🐛 Fixes
Expand Down
48 changes: 26 additions & 22 deletions core/datasource/external/ethverifier/acked_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"github.com/emirpasic/gods/utils"
)

const oneHour = 3600 // seconds

type ackedEvtBucket struct {
ts int64
hashes []string
endTs int64
hashes map[string]struct{}
}

func ackedEvtBucketComparator(a, b interface{}) int {
Expand All @@ -39,30 +42,36 @@ type ackedEvents struct {
func (a *ackedEvents) AddAt(ts int64, hashes ...string) {
_, value := a.events.Find(func(i int, value interface{}) bool {
bucket := value.(*ackedEvtBucket)
return bucket.ts == ts
return bucket.ts <= ts && bucket.endTs >= ts
})

if value != nil {
bucket := value.(*ackedEvtBucket)
for _, newHash := range hashes {
found := false
for _, v := range bucket.hashes {
// hash already exists
if v == newHash {
found = true
break
}
}

if !found {
bucket.hashes = append(bucket.hashes, newHash)
}
bucket.hashes[newHash] = struct{}{}
}

return
}

a.events.Add(&ackedEvtBucket{ts: ts, hashes: append([]string{}, hashes...)})
hashesM := map[string]struct{}{}
for _, v := range hashes {
hashesM[v] = struct{}{}
}

a.events.Add(&ackedEvtBucket{ts: ts, endTs: ts + oneHour, hashes: hashesM})
}

// RestoreExactAt - is to be used when loading a snapshot only
// this prevent restoring in different buckets, which could happen
// when events are received out of sync (e.g: timestamps 100 before 90) which could make gap between buckets.
func (a *ackedEvents) RestoreExactAt(ts int64, hashes ...string) {
hashesM := map[string]struct{}{}
for _, v := range hashes {
hashesM[v] = struct{}{}
}

a.events.Add(&ackedEvtBucket{ts: ts, endTs: ts + oneHour, hashes: hashesM})
}

func (a *ackedEvents) Add(hash string) {
Expand All @@ -72,13 +81,8 @@ func (a *ackedEvents) Add(hash string) {
func (a *ackedEvents) Contains(hash string) bool {
_, value := a.events.Find(func(index int, value interface{}) bool {
bucket := value.(*ackedEvtBucket)
for _, v := range bucket.hashes {
if hash == v {
return true
}
}

return false
_, ok := bucket.hashes[hash]
return ok
})

return value != nil
Expand Down
7 changes: 5 additions & 2 deletions core/datasource/external/ethverifier/l2_verifier_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ethverifier

import (
"context"
"slices"
"sort"

"code.vegaprotocol.io/vega/core/datasource/external/ethcall"
Expand Down Expand Up @@ -63,9 +64,11 @@ func (s *L2Verifiers) GetState(k string) ([]byte, []types.StateProvider, error)
iter := v.ackedEvts.events.Iterator()
for iter.Next() {
v := (iter.Value().(*ackedEvtBucket))
hashes := maps.Keys(v.hashes)
slices.Sort(hashes)
slice = append(slice, &snapshotpb.EthVerifierBucket{
Ts: v.ts,
Hashes: v.hashes,
Hashes: hashes,
})
}

Expand Down Expand Up @@ -153,7 +156,7 @@ func (s *L2Verifiers) restoreState(ctx context.Context, l2EthOracles *snapshotpb
}

verifier.restorePatchBlock(ctx, patchBlock)
verifier.restoreSeen(v.Misc.Buckets)
verifier.restoreSeen(ctx, v.Misc.Buckets)
}
pending := []*ethcall.ContractCallEvent{}

Expand Down
26 changes: 21 additions & 5 deletions core/datasource/external/ethverifier/verifier_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ethverifier
import (
"context"
"fmt"
"slices"

"code.vegaprotocol.io/vega/core/datasource/external/ethcall"
"code.vegaprotocol.io/vega/core/metrics"
Expand All @@ -26,6 +27,8 @@ import (
"code.vegaprotocol.io/vega/libs/proto"
"code.vegaprotocol.io/vega/logging"
snapshotpb "code.vegaprotocol.io/vega/protos/vega/snapshot/v1"

"golang.org/x/exp/maps"
)

var (
Expand Down Expand Up @@ -89,9 +92,11 @@ func (s *Verifier) serialiseMisc() ([]byte, error) {
iter := s.ackedEvts.events.Iterator()
for iter.Next() {
v := (iter.Value().(*ackedEvtBucket))
hashes := maps.Keys(v.hashes)
slices.Sort(hashes)
slice = append(slice, &snapshotpb.EthVerifierBucket{
Ts: v.ts,
Hashes: v.hashes,
Hashes: hashes,
})
}

Expand Down Expand Up @@ -181,9 +186,20 @@ func (s *Verifier) OnStateLoaded(ctx context.Context) error {
return nil
}

func (s *Verifier) restoreSeen(buckets []*snapshotpb.EthVerifierBucket) {
func (s *Verifier) restoreSeen(ctx context.Context, buckets []*snapshotpb.EthVerifierBucket) {
// if we are executing a protocol upgrade,
// let's force bucketing things. This will reduce
// increase performance at startup, and everyone is starting
// from the same snapshot, so that will keep state consistent
if vgcontext.InProgressUpgrade(ctx) {
for _, v := range buckets {
s.ackedEvts.AddAt(v.Ts, v.Hashes...)
}
return
}

for _, v := range buckets {
s.ackedEvts.AddAt(v.Ts, v.Hashes...)
s.ackedEvts.RestoreExactAt(v.Ts, v.Hashes...)
}
}

Expand All @@ -200,14 +216,14 @@ func (s *Verifier) restorePatchBlock(_ context.Context, patchBlock *types.EthBlo
s.patchBlock = patchBlock
}

func (s *Verifier) restoreMisc(_ context.Context, pl *snapshotpb.EthOracleVerifierMisc) {
func (s *Verifier) restoreMisc(ctx context.Context, pl *snapshotpb.EthOracleVerifierMisc) {
if pl.PatchBlock != nil {
s.patchBlock = &types.EthBlock{
Height: pl.PatchBlock.BlockHeight,
Time: pl.PatchBlock.BlockTime,
}
}
s.restoreSeen(pl.Buckets)
s.restoreSeen(ctx, pl.Buckets)
}

func (s *Verifier) restorePendingCallEvents(ctx context.Context,
Expand Down
48 changes: 26 additions & 22 deletions core/evtforward/acked_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"github.com/emirpasic/gods/utils"
)

const oneHour = 3600 // seconds

type ackedEvtBucket struct {
ts int64
hashes []string
endTs int64
hashes map[string]struct{}
}

func ackedEvtBucketComparator(a, b interface{}) int {
Expand All @@ -39,30 +42,36 @@ type ackedEvents struct {
func (a *ackedEvents) AddAt(ts int64, hashes ...string) {
_, value := a.events.Find(func(i int, value interface{}) bool {
bucket := value.(*ackedEvtBucket)
return bucket.ts == ts
return bucket.ts <= ts && bucket.endTs >= ts
})

if value != nil {
bucket := value.(*ackedEvtBucket)
for _, newHash := range hashes {
found := false
for _, v := range bucket.hashes {
// hash already exists
if v == newHash {
found = true
break
}
}

if !found {
bucket.hashes = append(bucket.hashes, newHash)
}
bucket.hashes[newHash] = struct{}{}
}

return
}

a.events.Add(&ackedEvtBucket{ts: ts, hashes: append([]string{}, hashes...)})
hashesM := map[string]struct{}{}
for _, v := range hashes {
hashesM[v] = struct{}{}
}

a.events.Add(&ackedEvtBucket{ts: ts, endTs: ts + oneHour, hashes: hashesM})
}

// RestoreExactAt - is to be used when loading a snapshot only
// this prevent restoring in different buckets, which could happen
// when events are received out of sync (e.g: timestamps 100 before 90) which could make gap between buckets.
func (a *ackedEvents) RestoreExactAt(ts int64, hashes ...string) {
hashesM := map[string]struct{}{}
for _, v := range hashes {
hashesM[v] = struct{}{}
}

a.events.Add(&ackedEvtBucket{ts: ts, endTs: ts + oneHour, hashes: hashesM})
}

func (a *ackedEvents) Add(hash string) {
Expand All @@ -72,13 +81,8 @@ func (a *ackedEvents) Add(hash string) {
func (a *ackedEvents) Contains(hash string) bool {
_, value := a.events.Find(func(index int, value interface{}) bool {
bucket := value.(*ackedEvtBucket)
for _, v := range bucket.hashes {
if hash == v {
return true
}
}

return false
_, ok := bucket.hashes[hash]
return ok
})

return value != nil
Expand Down
22 changes: 19 additions & 3 deletions core/evtforward/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ package evtforward

import (
"context"
"slices"

"code.vegaprotocol.io/vega/core/types"
vgcontext "code.vegaprotocol.io/vega/libs/context"
"code.vegaprotocol.io/vega/libs/proto"
snapshotpb "code.vegaprotocol.io/vega/protos/vega/snapshot/v1"

"github.com/emirpasic/gods/sets/treeset"
"golang.org/x/exp/maps"
)

var (
Expand Down Expand Up @@ -50,9 +53,11 @@ func (f *Forwarder) serialise() ([]byte, error) {
iter := f.ackedEvts.events.Iterator()
for iter.Next() {
v := iter.Value().(*ackedEvtBucket)
hashes := maps.Keys(v.hashes)
slices.Sort(hashes)
slice = append(slice, &snapshotpb.EventForwarderBucket{
Ts: v.ts,
Hashes: v.hashes,
Hashes: hashes,
})
}

Expand Down Expand Up @@ -91,13 +96,24 @@ func (f *Forwarder) LoadState(ctx context.Context, p *types.Payload) ([]types.St
return nil, types.ErrUnknownSnapshotType
}

func (f *Forwarder) restore(_ context.Context, p *types.PayloadEventForwarder) {
func (f *Forwarder) restore(ctx context.Context, p *types.PayloadEventForwarder) {
f.ackedEvts = &ackedEvents{
timeService: f.timeService,
events: treeset.NewWith(ackedEvtBucketComparator),
}

// if we are executing a protocol upgrade,
// let's force bucketing things. This will reduce
// increase performance at startup, and everyone is starting
// from the same snapshot, so that will keep state consistent
if vgcontext.InProgressUpgrade(ctx) {
for _, v := range p.Buckets {
f.ackedEvts.AddAt(v.Ts, v.Hashes...)
}
return
}

for _, v := range p.Buckets {
f.ackedEvts.AddAt(v.Ts, v.Hashes...)
f.ackedEvts.RestoreExactAt(v.Ts, v.Hashes...)
}
}
10 changes: 6 additions & 4 deletions core/execution/amm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (e *Engine) BestPricesAndVolumes() (*num.Uint, uint64, *num.Uint, uint64) {
fp := pool.BestPrice(nil)

// get the volume on the buy side by simulating an incoming sell order
bid := num.UintZero().Sub(fp, pool.oneTick)
bid := num.Max(pool.lower.low, num.UintZero().Sub(fp, pool.oneTick))
volume := pool.TradableVolumeInRange(types.SideSell, fp.Clone(), bid)

if volume != 0 {
Expand All @@ -341,7 +341,7 @@ func (e *Engine) BestPricesAndVolumes() (*num.Uint, uint64, *num.Uint, uint64) {
}

// get the volume on the sell side by simulating an incoming buy order
ask := num.UintZero().Add(fp, pool.oneTick)
ask := num.Min(pool.upper.high, num.UintZero().Add(fp, pool.oneTick))
volume = pool.TradableVolumeInRange(types.SideBuy, fp.Clone(), ask)
if volume != 0 {
if bestAsk == nil || ask.LT(bestAsk) {
Expand Down Expand Up @@ -474,7 +474,9 @@ func (e *Engine) submit(active []*Pool, agg *types.Order, inner, outer *num.Uint
}

// partition takes the given price range and returns which pools have volume in that region, and
// divides that range into sub-levels where AMM boundaries end.
// divides that range into sub-levels where AMM boundaries end. Note that `outer` can be nil for the case
// where the incoming order is a market order (so we have no bound on the price), and we've already consumed
// all volume on the orderbook.
func (e *Engine) partition(agg *types.Order, inner, outer *num.Uint) ([]*Pool, []*num.Uint) {
active := []*Pool{}
bounds := map[string]*num.Uint{}
Expand Down Expand Up @@ -503,7 +505,7 @@ func (e *Engine) partition(agg *types.Order, inner, outer *num.Uint) ([]*Pool, [
// This is because to get the BUY volume an AMM has at price P, we need to calculate the difference
// in its position between prices P -> P + 1. For SELL volume its the other way around and we
// need the difference in position from P - 1 -> P.
if inner.EQ(outer) {
if inner != nil && outer != nil && inner.EQ(outer) {
if agg.Side == types.SideSell {
outer = num.UintZero().Add(outer, e.oneTick)
} else {
Expand Down
Loading

0 comments on commit c89ec09

Please sign in to comment.