Skip to content

Commit

Permalink
fix(blobstorage): blockId determination by timestamp (#16614)
Browse files Browse the repository at this point in the history
Co-authored-by: jeff <113397187+cyberhorsey@users.noreply.github.com>
Co-authored-by: David <david@taiko.xyz>
  • Loading branch information
3 people committed Apr 3, 2024
1 parent 5f75dd8 commit eba19c7
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 33 deletions.
2 changes: 1 addition & 1 deletion packages/blobstorage/.default.indexer.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
RPC_URL=wss://l1ws.internal.taiko.xyz
BEACON_URL=https://l1beacon.internal.taiko.xyz/eth/v1/beacon/blob_sidecars/
BEACON_URL=https://l1beacon.internal.taiko.xyz
TAIKO_L1_CONTRACT_ADDRESS=0xC069c3d2a9f2479F559AD34485698ad5199C555f
DATABASE_HOST=localhost
DATABASE_PORT=3306
Expand Down
135 changes: 135 additions & 0 deletions packages/blobstorage/indexer/beaconclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package indexer

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"
)

var (
blobURL = "eth/v1/beacon/blob_sidecars"
genesisURL = "eth/v1/beacon/genesis"
configURL = "eth/v1/config/spec"
)

type GetSpecResponse struct {
Data map[string]string `json:"data"`
}

type GenesisResponse struct {
Data struct {
GenesisTime string `json:"genesis_time"`
} `json:"data"`
}

type BeaconClient struct {
*http.Client
beaconURL string
genesisTime uint64
secondsPerSlot uint64
}

type BlobsResponse struct {
Data []struct {
Index string `json:"index"`
Blob string `json:"blob"`
KzgCommitment string `json:"kzg_commitment"`
KzgCommitmentHex []byte `json:"-"`
} `json:"data"`
}

func NewBeaconClient(cfg *Config, timeout time.Duration) (*BeaconClient, error) {
httpClient := &http.Client{Timeout: timeout}

// Get the genesis time.
url := fmt.Sprintf("%s/%s", cfg.BeaconURL, genesisURL)
genesisTime, err := getGenesisTime(url, httpClient)
if err != nil {
return nil, fmt.Errorf("failed to get genesis time: %v", err)
}

url = fmt.Sprintf("%s/%s", cfg.BeaconURL, configURL)
// Get the seconds per slot.
secondsPerSlot, err := getConfigValue(url, "SECONDS_PER_SLOT", httpClient)
if err != nil {
return nil, fmt.Errorf("failed to get SECONDS_PER_SLOT: %v", err)
}

secondsPerSlotUint64, err := strconv.ParseUint(secondsPerSlot, 10, 64)
if err != nil {
return nil, err
}

return &BeaconClient{
beaconURL: cfg.BeaconURL,
genesisTime: genesisTime,
secondsPerSlot: secondsPerSlotUint64,
}, nil
}

func getGenesisTime(endpoint string, client *http.Client) (uint64, error) {
res, err := client.Get(endpoint)
if err != nil {
return 0, err
}
defer res.Body.Close()

var genesisDetail GenesisResponse
if err := json.NewDecoder(res.Body).Decode(&genesisDetail); err != nil {
return 0, err
}

return strconv.ParseUint(genesisDetail.Data.GenesisTime, 10, 64)
}

func getConfigValue(endpoint, key string, client *http.Client) (string, error) {
res, err := client.Get(endpoint)
if err != nil {
return "", err
}
defer res.Body.Close()

var spec GetSpecResponse
if err := json.NewDecoder(res.Body).Decode(&spec); err != nil {
return "", err
}

value, ok := spec.Data[key]
if !ok {
return "", fmt.Errorf("key %s not found in config spec", key)
}

return value, nil
}

func (c *BeaconClient) getBlobs(ctx context.Context, blockID uint64) (*BlobsResponse, error) {
url := fmt.Sprintf("%s/%s/%v", c.beaconURL, blobURL, blockID)
response, err := http.Get(url)
if err != nil {
return nil, err
}
defer response.Body.Close()

body, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}

var responseData BlobsResponse
if err := json.Unmarshal(body, &responseData); err != nil {
return nil, err
}

return &responseData, nil
}

func (c *BeaconClient) timeToSlot(timestamp uint64) (uint64, error) {
if timestamp < c.genesisTime {
return 0, fmt.Errorf("provided timestamp (%v) precedes genesis time (%v)", timestamp, c.genesisTime)
}
return (timestamp - c.genesisTime) / c.secondsPerSlot, nil
}
47 changes: 16 additions & 31 deletions packages/blobstorage/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@ package indexer
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"sync"
"time"

Expand All @@ -27,18 +23,8 @@ import (
"github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/utils"
)

type Response struct {
Data []struct {
Index string `json:"index"`
Blob string `json:"blob"`
KzgCommitment string `json:"kzg_commitment"`
KzgCommitmentHex []byte `json:"-"`
} `json:"data"`
}

// Indexer struct holds the configuration and state for the Ethereum chain listener.
type Indexer struct {
beaconURL string
ethClient *ethclient.Client
startHeight *uint64
taikoL1 *taikol1.TaikoL1
Expand All @@ -48,6 +34,7 @@ type Indexer struct {
wg *sync.WaitGroup
ctx context.Context
latestIndexedBlockNumber uint64
beaconClient *BeaconClient
}

func (i *Indexer) InitFromCli(ctx context.Context, c *cli.Context) error {
Expand Down Expand Up @@ -81,16 +68,22 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {
return err
}

l1BeaconClient, err := NewBeaconClient(cfg, utils.DefaultTimeout)
if err != nil {
return err
}

i.blobHashRepo = blobHashRepo
i.ethClient = client
i.beaconURL = cfg.BeaconURL
i.taikoL1 = taikoL1
i.startHeight = cfg.StartingBlockID
i.db = db
i.wg = &sync.WaitGroup{}
i.ctx = ctx
i.cfg = cfg

i.beaconClient = l1BeaconClient

return nil
}

Expand Down Expand Up @@ -291,31 +284,23 @@ func (i *Indexer) checkReorg(ctx context.Context, event *taikol1.TaikoL1BlockPro
}

func (i *Indexer) storeBlob(ctx context.Context, event *taikol1.TaikoL1BlockProposed) error {
slog.Info("blockProposed event found", "blockID", event.Meta.L1Height+1, "emittedIn", event.Raw.BlockNumber, "blobUsed", event.Meta.BlobUsed)

if !event.Meta.BlobUsed {
return nil
}

blockID := event.Meta.L1Height + 1
url := fmt.Sprintf("%s/%v", i.beaconURL, blockID)
response, err := http.Get(url)
blockID, err := i.beaconClient.timeToSlot(event.Meta.Timestamp)
if err != nil {
return err
}
defer response.Body.Close()

body, err := io.ReadAll(response.Body)
if err != nil {
return err
slog.Info("blockProposed event found", "blockID", blockID, "emittedIn", event.Raw.BlockNumber, "blobUsed", event.Meta.BlobUsed)

if !event.Meta.BlobUsed {
return nil
}

var responseData Response
if err := json.Unmarshal(body, &responseData); err != nil {
blobsResponse, err := i.beaconClient.getBlobs(ctx, blockID)
if err != nil {
return err
}

for _, data := range responseData.Data {
for _, data := range blobsResponse.Data {
data.KzgCommitmentHex = common.FromHex(data.KzgCommitment)

metaBlobHash := common.BytesToHash(event.Meta.BlobHash[:])
Expand Down
10 changes: 9 additions & 1 deletion packages/blobstorage/pkg/utils/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package utils

import "golang.org/x/exp/constraints"
import (
"time"

"golang.org/x/exp/constraints"
)

const (
DefaultTimeout = 1 * time.Minute
)

// Min return the minimum value of two integers.
func Min[T constraints.Integer](a, b T) T {
Expand Down

0 comments on commit eba19c7

Please sign in to comment.