Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 81 additions & 44 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,33 @@ type Aggregator struct {
taskSubscriber event.Subscription
blsAggregationService blsagg.BlsAggregationService

// Using map here instead of slice to allow for easy lookup of tasks, when aggregator is restarting,
// its easier to get the task from the map instead of filling the slice again
tasks map[uint32][32]byte
// Mutex to protect the tasks map
tasksMutex *sync.Mutex
// BLS Signature Service returns an Index
// Since our ID is not an idx, we build this cache
// Note: In case of a reboot, this doesn't need to be loaded,
// and can start from zero
batchesRootByIdx map[uint32][32]byte
batchesRootByIdxMutex *sync.Mutex

// This is the counterpart,
// to use when we have the batch but not the index
// Note: In case of a reboot, this doesn't need to be loaded,
// and can start from zero
batchesIdxByRoot map[[32]byte]uint32
batchesIdxByRootMutex *sync.Mutex

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
nextBatchIndex uint32
nextBatchIndexMutex *sync.Mutex

OperatorTaskResponses map[[32]byte]*TaskResponsesWithStatus
// Mutex to protect the taskResponses map
taskResponsesMutex *sync.Mutex
logger logging.Logger

// FIXME(marian): This is a hacky workaround to send some sensible index to the BLS aggregation service,
// which needs a task index.
taskCounter uint32
taskCounterMutex *sync.Mutex
metricsReg *prometheus.Registry
metrics *metrics.Metrics
batchesResponseMutex *sync.Mutex
logger logging.Logger

metricsReg *prometheus.Registry
metrics *metrics.Metrics
}

func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error) {
Expand All @@ -79,7 +89,9 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
return nil, err
}

tasks := make(map[uint32][32]byte)
batchesRootByIdx := make(map[uint32][32]byte)
batchesIdxByRoot := make(map[[32]byte]uint32)

operatorTaskResponses := make(map[[32]byte]*TaskResponsesWithStatus, 0)

chainioConfig := sdkclients.BuildAllConfig{
Expand All @@ -104,27 +116,32 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader.AvsRegistryReader, operatorPubkeysService, logger)
blsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger)

// Explicitly initializing this value just in case.
taskCounter := uint32(0)

// Metrics
reg := prometheus.NewRegistry()
aggregatorMetrics := metrics.NewMetrics(aggregatorConfig.Aggregator.MetricsIpPortAddress, reg, logger)

nextBatchIndex := uint32(0)

aggregator := Aggregator{
AggregatorConfig: &aggregatorConfig,
avsReader: avsReader,
avsSubscriber: avsSubscriber,
avsWriter: avsWriter,
NewBatchChan: newBatchChan,
tasks: tasks,
tasksMutex: &sync.Mutex{},
AggregatorConfig: &aggregatorConfig,
avsReader: avsReader,
avsSubscriber: avsSubscriber,
avsWriter: avsWriter,
NewBatchChan: newBatchChan,

batchesRootByIdx: batchesRootByIdx,
batchesRootByIdxMutex: &sync.Mutex{},

batchesIdxByRoot: batchesIdxByRoot,
batchesIdxByRootMutex: &sync.Mutex{},

nextBatchIndex: nextBatchIndex,
nextBatchIndexMutex: &sync.Mutex{},

OperatorTaskResponses: operatorTaskResponses,
taskResponsesMutex: &sync.Mutex{},
batchesResponseMutex: &sync.Mutex{},
blsAggregationService: blsAggregationService,
logger: logger,
taskCounter: taskCounter,
taskCounterMutex: &sync.Mutex{},
metricsReg: reg,
metrics: aggregatorMetrics,
}
Expand Down Expand Up @@ -193,9 +210,9 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
"taskIndex", blsAggServiceResp.TaskIndex,
)

agg.tasksMutex.Lock()
batchMerkleRoot := agg.tasks[blsAggServiceResp.TaskIndex]
agg.tasksMutex.Unlock()
agg.batchesRootByIdxMutex.Lock()
batchMerkleRoot := agg.batchesRootByIdx[blsAggServiceResp.TaskIndex]
agg.batchesRootByIdxMutex.Unlock()

_, err := agg.avsWriter.SendAggregatedResponse(context.Background(), batchMerkleRoot, nonSignerStakesAndSignature)
if err != nil {
Expand All @@ -206,31 +223,51 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uint32) {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task", "Batch merkle root", batchMerkleRoot)

agg.taskCounterMutex.Lock()
agg.taskCounter++
agg.taskCounterMutex.Unlock()
agg.nextBatchIndexMutex.Lock()
batchIndex := agg.nextBatchIndex
agg.nextBatchIndexMutex.Unlock()

agg.tasksMutex.Lock()
if _, ok := agg.tasks[agg.taskCounter]; ok {
agg.logger.Warn("Task already exists", "taskIndex", agg.taskCounter)
agg.tasksMutex.Unlock()
// --- UPDATE BATCH - INDEX CACHES ---

agg.batchesRootByIdxMutex.Lock()
if _, ok := agg.batchesRootByIdx[batchIndex]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchRoot", batchMerkleRoot)
agg.batchesRootByIdxMutex.Unlock()
return
}
agg.batchesRootByIdx[batchIndex] = batchMerkleRoot
agg.batchesRootByIdxMutex.Unlock()

agg.batchesIdxByRootMutex.Lock()
// This shouldn't happen, since both maps are updated together
if _, ok := agg.batchesIdxByRoot[batchMerkleRoot]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchRoot", batchMerkleRoot)
agg.batchesRootByIdxMutex.Unlock()
return
}
agg.tasks[agg.taskCounter] = batchMerkleRoot
agg.tasksMutex.Unlock()
agg.batchesIdxByRoot[batchMerkleRoot] = batchIndex
agg.batchesIdxByRootMutex.Unlock()

// --- UPDATE TASK RESPONSES ---

agg.taskResponsesMutex.Lock()
agg.batchesResponseMutex.Lock()
agg.OperatorTaskResponses[batchMerkleRoot] = &TaskResponsesWithStatus{
taskResponses: make([]types.SignedTaskResponse, 0),
submittedToEthereum: false,
}
agg.taskResponsesMutex.Unlock()
agg.batchesResponseMutex.Unlock()

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

// FIXME(marian): Hardcoded value of timeToExpiry to 100s. How should be get this value?
err := agg.blsAggregationService.InitializeNewTask(agg.taskCounter, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)

// --- INCREASE BATCH INDEX ---

agg.nextBatchIndexMutex.Lock()
agg.nextBatchIndex = agg.nextBatchIndex + 1
agg.nextBatchIndexMutex.Unlock()

// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
Expand Down
11 changes: 5 additions & 6 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *typ
return fmt.Errorf("task with batch merkle root %d does not exist", signedTaskResponse.BatchMerkleRoot)
}

// TODO: Check if the task response is valid
agg.taskResponsesMutex.Lock()
agg.batchesResponseMutex.Lock()
taskResponses := agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot]
taskResponses.taskResponses = append(
agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot].taskResponses,
*signedTaskResponse)
agg.taskResponsesMutex.Unlock()
agg.batchesResponseMutex.Unlock()

agg.taskCounterMutex.Lock()
taskIndex := agg.taskCounter
agg.taskCounterMutex.Unlock()
agg.batchesIdxByRootMutex.Lock()
taskIndex := agg.batchesIdxByRoot[signedTaskResponse.BatchMerkleRoot]
agg.batchesIdxByRootMutex.Unlock()

err := agg.blsAggregationService.ProcessNewSignature(
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
Expand Down
3 changes: 2 additions & 1 deletion batcher/client/send_burst_tasks.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/bin/bash

counter=1
burst=5
burst=8

if [ -z "$1" ]; then
echo "Using default burst value: 10"
elif ! [[ "$1" =~ ^[0-9]+$ ]]; then
Expand Down
2 changes: 1 addition & 1 deletion contracts/bindings/AlignedLayerServiceManager/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/ERC20Mock/binding.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/src/core/AlignedLayerServiceManager.sol
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: UNLICENSED
pragma solidity ^0.8.9;
pragma solidity =0.8.12;

import {Pausable} from "eigenlayer-core/contracts/permissions/Pausable.sol";
import {IPauserRegistry} from "eigenlayer-core/contracts/interfaces/IPauserRegistry.sol";
Expand Down
2 changes: 1 addition & 1 deletion contracts/src/core/ERC20Mock.sol
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: MIT
// OpenZeppelin Contracts (last updated v4.8.0) (token/ERC20/ERC20.sol)

pragma solidity ^0.8.9;
pragma solidity =0.8.12;
Comment thread
entropidelic marked this conversation as resolved.

import "@openzeppelin/contracts/interfaces/IERC20.sol";
import "@openzeppelin/contracts/utils/Context.sol";
Expand Down
7 changes: 3 additions & 4 deletions core/types/signed_task_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
)

type SignedTaskResponse struct {
BatchMerkleRoot [32]byte
TaskCreatedBlock uint32
BlsSignature bls.Signature
OperatorId eigentypes.OperatorId
BatchMerkleRoot [32]byte
BlsSignature bls.Signature
OperatorId eigentypes.OperatorId
}
12 changes: 6 additions & 6 deletions operator/pkg/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"encoding/binary"
"crypto/ecdsa"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/yetanotherco/aligned_layer/metrics"
"log"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/yetanotherco/aligned_layer/metrics"

"github.com/yetanotherco/aligned_layer/operator/sp1"
"github.com/yetanotherco/aligned_layer/operator/halo2ipa"

Expand Down Expand Up @@ -139,10 +140,9 @@ func (o *Operator) Start(ctx context.Context) error {
responseSignature := o.SignTaskResponse(newBatchLog.BatchMerkleRoot)

signedTaskResponse := types.SignedTaskResponse{
BatchMerkleRoot: newBatchLog.BatchMerkleRoot,
TaskCreatedBlock: newBatchLog.TaskCreatedBlock,
BlsSignature: *responseSignature,
OperatorId: o.OperatorId,
BatchMerkleRoot: newBatchLog.BatchMerkleRoot,
BlsSignature: *responseSignature,
OperatorId: o.OperatorId,
}

o.Logger.Infof("Signed hash: %+v", *responseSignature)
Expand Down