Skip to content

Commit

Permalink
Forwarder: publish interactions to AppSync
Browse files Browse the repository at this point in the history
  • Loading branch information
asiaziola committed May 15, 2024
1 parent 3d1f124 commit b0568ab
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 3 deletions.
20 changes: 18 additions & 2 deletions src/forward/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ func NewController(config *config.Config) (self *Controller, err error) {
WithInputChannel(fetcher.Output).
WithInputChannel(interactionStreamer.Output)

duplicator := task.NewDuplicator[*Payload](config, "interaction-duplicator").
WithOutputChannels(2, 0).
WithInputChannel(joiner.Output)

// Publish to all redis instances
redisMapper := redisMapper(config).
WithInputChannel(joiner.Output)
WithInputChannel(duplicator.NextChannel())

appSyncMapper := appSyncMapper(config).
WithInputChannel(duplicator.NextChannel())

watched := func() *task.Task {
redisDuplicator := task.NewDuplicator[*model.InteractionNotification](config, "redis-duplicator").
Expand All @@ -71,9 +78,18 @@ func NewController(config *config.Config) (self *Controller, err error) {
redisPublishers = append(redisPublishers, redisPublisher.Task)
}

// Publish to AppSync
appSyncPublisher := publisher.NewAppSyncPublisher[*model.InteractionNotification](config, "interaction-appsync-publisher").
WithChannelName(config.Forwarder.PublisherAppSyncChannelName).
WithMonitor(monitor).
WithInputChannel(appSyncMapper.Output)

return task.NewTask(config, "watched").
WithSubtask(duplicator.Task).
WithSubtask(redisDuplicator.Task).
WithSubtaskSlice(redisPublishers)
WithSubtaskSlice(redisPublishers).
WithSubtask(appSyncPublisher.Task).
WithSubtask(appSyncMapper.Task)
}

watchdog := task.NewWatchdog(config).
Expand Down
32 changes: 32 additions & 0 deletions src/forward/redis-mapper.go → src/forward/mappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,35 @@ func redisMapper(config *config.Config) (self *task.Mapper[*Payload, *model.Inte
return nil
})
}

func appSyncMapper(config *config.Config) (self *task.Mapper[*Payload, *model.InteractionNotification]) {
return task.NewMapper[*Payload, *model.InteractionNotification](config, "map-appsync-notification").
WithWorkerPool(1, config.Contract.StoreBatchSize).
WithProcessFunc(func(data *Payload, out chan *model.InteractionNotification) (err error) {
// Neglect empty messages
if data.Interaction == nil {
return nil
}

self.Log.WithField("contract_id", data.Interaction.ContractId).Trace("Publishing interaction to AppSync")

interactionStr, err := data.Interaction.Interaction.MarshalJSON()
if err != nil {
self.Log.WithField("contract_id", data.Interaction.ContractId).Warn("Failed to marshal interaction")
return err
}

select {
case <-self.Ctx.Done():
case out <- &model.InteractionNotification{
ContractTxId: data.Interaction.ContractId,
Test: false,
Source: "warp-gw",
Interaction: string(interactionStr),
SrcTxId: data.SrcTxId,
}:
}

return nil
})
}
6 changes: 5 additions & 1 deletion src/utils/config/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Forwarder struct {
// Interactions are saved to this Redis channel
PublisherRedisChannelName string

// Interactions are saved to this AppSync channel
PublisherAppSyncChannelName string

// How long to wait before after receiving a new block height before sending L1 interactions
// This delay ensures sequencer finishes handling requests in time
HeightDelay time.Duration
Expand All @@ -40,7 +43,8 @@ func setForwarderDefaults() {
viper.SetDefault("Forwarder.FetcherLastSortKeySettingBlockHeight", "0")
viper.SetDefault("Forwarder.FetcherLastSortKeySettingEnabled", "true")
viper.SetDefault("Forwarder.FetcherBatchSize", "10")
viper.SetDefault("Forwarder.PublisherRedisChannelName", "contracts")
viper.SetDefault("Forwarder.PublisherRedisChannelName", "interactions")
viper.SetDefault("Forwarder.PublisherAppSyncChannelName", "interactions")
viper.SetDefault("Forwarder.HeightDelay", "1s")
viper.SetDefault("Forwarder.ArweaveFetcherQueueSize", "3000")
viper.SetDefault("Forwarder.ArweaveFetcherBlockSendTimeout", "300s")
Expand Down

0 comments on commit b0568ab

Please sign in to comment.