Skip to content

Commit

Permalink
near/timestamps: timestamp simplifcation
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpsiegel committed Sep 19, 2022
1 parent cbce1e7 commit a47422c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 17 deletions.
2 changes: 1 addition & 1 deletion node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ func runNode(cmd *cobra.Command, args []string) {
}
if *nearRPC != "" {
if err := supervisor.Run(ctx, "nearwatch",
near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear]).Run); err != nil {
near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run); err != nil {
return err
}
}
Expand Down
73 changes: 57 additions & 16 deletions node/pkg/near/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -28,6 +29,8 @@ import (
type (
// Watcher is responsible for looking over Near blockchain and reporting new transactions to the wormhole contract
Watcher struct {
mainnet bool

nearRPC string
wormholeContract string

Expand All @@ -47,7 +50,6 @@ type (

pendingMessage struct {
height uint64
ts uint64
}
)

Expand All @@ -70,6 +72,7 @@ func NewWatcher(
wormholeContract string,
lockEvents chan *common.MessagePublication,
obsvReqC chan *gossipv1.ObservationRequest,
mainnet bool,
) *Watcher {
return &Watcher{
nearRPC: nearRPC,
Expand All @@ -79,6 +82,7 @@ func NewWatcher(
next_round: 0,
final_round: 0,
pending: map[pendingKey]*pendingMessage{},
mainnet: mainnet,
}
}

Expand All @@ -94,6 +98,22 @@ func (e *Watcher) getBlock(block uint64) ([]byte, error) {
return ioutil.ReadAll(resp.Body)
}

func (e *Watcher) getBlockHash(block_id string) ([]byte, error) {
s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"block_id": "%s"}}`, block_id)
resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))

if err != nil {
// TODO: We should look at the specifics of the error before we try twice
resp, err = http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
if err != nil {
return nil, err
}
}

defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}

func (e *Watcher) getFinalBlock() ([]byte, error) {
s := `{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"finality": "final"}}`
resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
Expand Down Expand Up @@ -132,7 +152,7 @@ func (e *Watcher) getTxStatus(logger *zap.Logger, tx string, src string) ([]byte
return ioutil.ReadAll(resp.Body)
}

func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint64) error {
func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string) error {
outcomes := gjson.ParseBytes(t).Get("result.receipts_outcome")

if !outcomes.Exists() {
Expand All @@ -155,6 +175,11 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint
if !l.Exists() {
continue
}
block_hash := o.Get("block_hash")
if !block_hash.Exists() {
logger.Error("block_hash key not found")
continue
}
for _, log := range l.Array() {
event := log.String()
if !strings.HasPrefix(event, "EVENT_JSON:") {
Expand Down Expand Up @@ -204,6 +229,29 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint
return err
}

block_hash_str := block_hash.String()

txBlock, err := e.getBlockHash(block_hash_str)
if err != nil {
return err
}
body := gjson.ParseBytes(txBlock)
if !body.Exists() {
return errors.New("block parse error")
}
ts_nanosec := body.Get("result.header.timestamp")
if !ts_nanosec.Exists() {
return errors.New("block parse error, missing timestamp")
}
ts := uint64(ts_nanosec.Uint()) / 1000000000

if e.mainnet {
height := body.Get("result.header.height")
if height.Exists() && height.Uint() < 74473147 {
return errors.New("test missing observe")
}
}

observation := &common.MessagePublication{
TxHash: txHash,
Timestamp: time.Unix(int64(ts), 0),
Expand Down Expand Up @@ -236,17 +284,17 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint
return nil
}

func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string, ts uint64) error {
func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string) error {
t, err := e.getTxStatus(logger, hash, receiver_id)

if err != nil {
return err
}

return e.parseStatus(logger, t, hash, ts)
return e.parseStatus(logger, t, hash)
}

func (e *Watcher) lastBlock(logger *zap.Logger, hash string, receiver_id string, ts uint64) ([]byte, uint64, error) {
func (e *Watcher) lastBlock(logger *zap.Logger, hash string, receiver_id string) ([]byte, uint64, error) {
t, err := e.getTxStatus(logger, hash, receiver_id)

if err != nil {
Expand Down Expand Up @@ -320,12 +368,6 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul
return nil
}

v := body.Get("result.header.timestamp")
if !v.Exists() {
return nil
}
ts := uint64(v.Uint()) / 1000000000

for _, name := range result.Array() {
chunk, err := e.getChunk(name.String())
if err != nil {
Expand All @@ -343,15 +385,15 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul
continue
}

t, round, err := e.lastBlock(logger, hash.String(), receiver_id.String(), ts)
t, round, err := e.lastBlock(logger, hash.String(), receiver_id.String())
if err != nil {
return err
}
if round != 0 {

if round <= e.final_round {
logger.Info("parseStatus direct", zap.Uint64("block.height", round), zap.Uint64("e.final_round", e.final_round))
err := e.parseStatus(logger, t, hash.String(), ts)
err := e.parseStatus(logger, t, hash.String())
if err != nil {
return err
}
Expand All @@ -367,7 +409,6 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul
e.pendingMu.Lock()
e.pending[key] = &pendingMessage{
height: round,
ts: ts,
}
e.pendingMu.Unlock()
}
Expand Down Expand Up @@ -402,7 +443,7 @@ func (e *Watcher) Run(ctx context.Context) error {

logger.Info("Received obsv request", zap.String("tx_hash", txHash))

err := e.inspectStatus(logger, txHash, e.wormholeContract, 0)
err := e.inspectStatus(logger, txHash, e.wormholeContract)
if err != nil {
logger.Error(fmt.Sprintf("near obsvReqC: %s", err.Error()))
}
Expand Down Expand Up @@ -451,7 +492,7 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.String("key.hash", key.hash),
)

err := e.inspectStatus(logger, key.hash, e.wormholeContract, bLock.ts)
err := e.inspectStatus(logger, key.hash, e.wormholeContract)
delete(e.pending, key)

if err != nil {
Expand Down

0 comments on commit a47422c

Please sign in to comment.