Skip to content

Commit

Permalink
sommelier, writing points in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
Tadeuchi committed Apr 16, 2024
1 parent e53cf49 commit 892482d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 46 deletions.
4 changes: 4 additions & 0 deletions src/utils/config/warpy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ type WarpySyncer struct {

// Timeout for HTTP requests
WriterHttpRequestTimeout time.Duration

// Writer splits interaction into chunks with max size of
WriterInteractionChunkSize int
}

func setWarpySyncerDefaults() {
Expand Down Expand Up @@ -158,4 +161,5 @@ func setWarpySyncerDefaults() {
viper.SetDefault("WarpySyncer.PollerSommelierSecondsForSelect", 3600)
viper.SetDefault("WarpySyncer.WriterBackoffInterval", "3s")
viper.SetDefault("WarpySyncer.WriterHttpRequestTimeout", "30s")
viper.SetDefault("WarpySyncer.WriterInteractionChunkSize", 50)
}
21 changes: 11 additions & 10 deletions src/warpy_sync/poller_sommelier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ type PollerSommelier struct {
db *gorm.DB
monitor monitoring.Monitor

Output chan *InteractionPayload
Output chan *[]InteractionPayload

input chan uint64
}

func NewPollerSommelier(config *config.Config) (self *PollerSommelier) {
self = new(PollerSommelier)

self.Output = make(chan *InteractionPayload, config.WarpySyncer.PollerSommelierChannelBufferLength)
self.Output = make(chan *[]InteractionPayload, config.WarpySyncer.PollerSommelierChannelBufferLength)

self.Task = task.NewTask(config, "poller_sommelier").
WithSubtaskFunc(self.handleNew).
Expand Down Expand Up @@ -85,20 +85,21 @@ func (self *PollerSommelier) handleNew() (err error) {
} else {
self.Log.Debug("No new assets sum found")
}
interactions := make([]InteractionPayload, len(AssetsSums))

for _, sum := range AssetsSums {

for i, sum := range AssetsSums {
self.monitor.GetReport().WarpySyncer.State.PollerSommelierAssetsFromSelects.Inc()

select {
case <-self.Ctx.Done():
return
case self.Output <- &InteractionPayload{
interactions[i] = InteractionPayload{
FromAddress: sum.FromAddress,
Points: int64(sum.Sum * float64(self.Config.WarpySyncer.PollerSommelierPointsBase)),
}:
}
}
select {
case <-self.Ctx.Done():
return
case self.Output <- &interactions:
}
}

return
}
8 changes: 4 additions & 4 deletions src/warpy_sync/syncer_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type SyncerDelta struct {
monitor monitoring.Monitor
input chan *BlockInfoPayload
Output chan *LastSyncedBlockPayload
OutputInteractionPayload chan *InteractionPayload
OutputInteractionPayload chan *[]InteractionPayload
}

// This task receives block info in the input channel, iterate through all of the block's transactions in order to check if any of it contains
Expand All @@ -29,7 +29,7 @@ func NewSyncerDelta(config *config.Config) (self *SyncerDelta) {
self = new(SyncerDelta)

self.Output = make(chan *LastSyncedBlockPayload)
self.OutputInteractionPayload = make(chan *InteractionPayload)
self.OutputInteractionPayload = make(chan *[]InteractionPayload)

self.Task = task.NewTask(config, "syncer").
WithSubtaskFunc(self.run).
Expand Down Expand Up @@ -117,10 +117,10 @@ func (self *SyncerDelta) checkTxAndWriteInteraction(tx *types.Transaction, block
return err
}

self.OutputInteractionPayload <- &InteractionPayload{
self.OutputInteractionPayload <- &[]InteractionPayload{{
FromAddress: sender,
Points: self.Config.WarpySyncer.SyncerDeltaInteractionPoints,
}
}}
}

return err
Expand Down
7 changes: 4 additions & 3 deletions src/warpy_sync/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ func (input Input) MarshalJSON() ([]byte, error) {
}

type Member struct {
Id string `json:"id"`
Roles []string `json:"roles"`
TxId string `json:"txId"`
Id string `json:"id"`
Roles []string `json:"roles"`
TxId string `json:"txId"`
Points int64 `json:"points"`
}

type BlockInfoPayload struct {
Expand Down
115 changes: 86 additions & 29 deletions src/warpy_sync/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Writer struct {

monitor monitoring.Monitor

input <-chan *InteractionPayload
input <-chan *[]InteractionPayload
sequencerClient *sequencer.Client
httpClient *resty.Client
}
Expand Down Expand Up @@ -47,20 +47,96 @@ func (self *Writer) WithSequencerClient(sequencerClient *sequencer.Client) *Writ
return self
}

func (self *Writer) WithInputChannel(v chan *InteractionPayload) *Writer {
func (self *Writer) WithInputChannel(v chan *[]InteractionPayload) *Writer {
self.input = v
return self
}

func (self *Writer) run() (err error) {
for interactionPayload := range self.input {
self.Log.WithField("from_address", interactionPayload.FromAddress).WithField("sum", interactionPayload.Points).Debug("Writer initialized")
err = self.writeInteraction(interactionPayload.FromAddress, interactionPayload.Points)
self.Log.Debug("Writer initialized")
err = self.writeInteraction(interactionPayload)
}
return
}

func (self *Writer) writeInteraction(fromAddress string, points int64) (err error) {
func (self *Writer) writeInteraction(payloads *[]InteractionPayload) (err error) {
chunkSize := self.Config.WarpySyncer.WriterInteractionChunkSize
if len(*payloads) == 0 {
self.Log.Debug("Interaction Payload slice is empty")
return
}

counter := 0
members := make([]Member, chunkSize)

for _, payload := range *payloads {
if payload.Points == 0 {
self.Log.WithField("from_address", payload.FromAddress).Debug("Skipping from address, points 0")
continue
}

roles, err := self.discordRoles(payload.FromAddress)
if err != nil {
self.Log.WithError(err).Error("Failed to get roles")
return err
}
if roles == nil {
self.Log.WithField("from_address", payload.FromAddress).Debug("Skipping address, not registered in warpy")
continue
}

members[counter] = Member{Id: payload.FromAddress, Roles: *roles, Points: payload.Points}
counter += 1
if counter >= chunkSize {
err = self.sendInteractionChunk(&members)
counter = 0
members = make([]Member, chunkSize)
}
if err != nil {
self.Log.WithError(err).Error("Failed to send interaction chunk")
return err
}
}
if len(members) > 0 {
err = self.sendInteractionChunk(&members)
}

return
}

func (self *Writer) sendInteractionChunk(members *[]Member) (err error) {

input := Input{
Function: "addPointsForAddress",
Points: 0,
AdminId: self.Config.WarpySyncer.SyncerInteractionAdminId,
Members: *members,
}

if len(*members) == 1 {
self.Log.WithField("from_address", (*members)[0].Id).
WithField("points", (*members)[0].Points).
Debug("Writing interaction to Warpy...")

} else {
self.Log.WithField("chunk_size", len(*members)).
WithField("points_default", 0).
Debug("Writing interaction to Warpy...")
}

interactionId, err := warpy.WriteInteractionToWarpy(
self.Ctx, self.Config.WarpySyncer.SyncerSigner, input, self.Config.WarpySyncer.SyncerContractId, self.Log, self.sequencerClient)
if err != nil {
return err
}

self.Log.WithField("interactionId", interactionId).Info("Interaction sent to Warpy")
self.monitor.GetReport().WarpySyncer.State.WriterInteractionsToWarpy.Inc()
return
}

func (self *Writer) discordRoles(fromAddress string) (roles *[]string, err error) {
err = task.NewRetry().
WithContext(self.Ctx).
// Retries infinitely until success
Expand All @@ -73,7 +149,7 @@ func (self *Writer) writeInteraction(fromAddress string, points int64) (err erro
}

self.monitor.GetReport().WarpySyncer.Errors.WriterFailures.Inc()
self.Log.WithError(err).WithField("from_address", fromAddress).WithField("points", points).
self.Log.WithError(err).WithField("from_address", fromAddress).
Warn("Could not process assets sum, retrying...")
return err
}).
Expand All @@ -93,7 +169,6 @@ func (self *Writer) writeInteraction(fromAddress string, points int64) (err erro
senderDiscordId := []model.SenderDiscordIdPayload{}
senderDiscordId = append(senderDiscordId, *senderDiscordIdPayload...)

roles := []string{}
senderRoles, err := warpy.GetSenderRoles(self.httpClient, self.Config.WarpySyncer.SyncerWarpyApiUrl, senderDiscordId[0].Key, self.Log)

if err != nil {
Expand All @@ -102,30 +177,12 @@ func (self *Writer) writeInteraction(fromAddress string, points int64) (err erro
}

if senderRoles != nil {
roles = append(roles, *senderRoles...)
}

if points == 0 {
return nil
}

input := Input{
Function: "addPointsForAddress",
Points: points,
AdminId: self.Config.WarpySyncer.SyncerInteractionAdminId,
Members: []Member{{Id: fromAddress, Roles: roles}},
}
self.Log.WithField("from_address", fromAddress).WithField("points", points).Debug("Writing interaction to Warpy...")
interactionId, err := warpy.WriteInteractionToWarpy(
self.Ctx, self.Config.WarpySyncer.SyncerSigner, input, self.Config.WarpySyncer.SyncerContractId, self.Log, self.sequencerClient)
if err != nil {
return err
roles = senderRoles
} else {
roles = &[]string{}
}
self.Log.WithField("interactionId", interactionId).Info("Interaction sent to Warpy")
self.monitor.GetReport().WarpySyncer.State.WriterInteractionsToWarpy.Inc()

return err
return nil
})

return
}

0 comments on commit 892482d

Please sign in to comment.