Skip to content
This repository was archived by the owner on Mar 14, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions core/services/ocr2/plugins/ccip/tokendata/http/http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package http

import (
"context"
"io"
"net/http"
"time"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
)

type IHttpClient interface {
// Get issue a GET request to the given url and return the response body and status code.
Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error)
}

type HttpClient struct {
}

func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error) {
// Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress.
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(timeoutCtx, "GET", url, nil)
if err != nil {
return nil, http.StatusBadRequest, err
}
req.Header.Add("accept", "application/json")
res, err := http.DefaultClient.Do(req)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, http.StatusRequestTimeout, tokendata.ErrTimeout
}
return nil, res.StatusCode, err
}
defer res.Body.Close()

// Explicitly signal if the API is being rate limited
if res.StatusCode == http.StatusTooManyRequests {
return nil, res.StatusCode, tokendata.ErrRateLimit
}

body, err := io.ReadAll(res.Body)
return body, res.StatusCode, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package http

import (
"context"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
usdcLatencyBuckets = []float64{
float64(10 * time.Millisecond),
float64(25 * time.Millisecond),
float64(50 * time.Millisecond),
float64(75 * time.Millisecond),
float64(100 * time.Millisecond),
float64(250 * time.Millisecond),
float64(500 * time.Millisecond),
float64(750 * time.Millisecond),
float64(1 * time.Second),
float64(2 * time.Second),
float64(3 * time.Second),
float64(4 * time.Second),
float64(5 * time.Second),
}
usdcClientHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ccip_usdc_client_request_total",
Help: "Latency of calls to the USDC client",
Buckets: usdcLatencyBuckets,
}, []string{"status", "success"})
)

type ObservedIHttpClient struct {
IHttpClient
histogram *prometheus.HistogramVec
}

// NewObservedIHttpClient Create a new ObservedIHttpClient with the USDC client metric.
func NewObservedIHttpClient(origin IHttpClient) *ObservedIHttpClient {
return NewObservedIHttpClientWithMetric(origin, usdcClientHistogram)
}

func NewObservedIHttpClientWithMetric(origin IHttpClient, histogram *prometheus.HistogramVec) *ObservedIHttpClient {
return &ObservedIHttpClient{
IHttpClient: origin,
histogram: histogram,
}
}

func (o *ObservedIHttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, error) {
return withObservedHttpClient(o.histogram, func() ([]byte, int, error) {
return o.IHttpClient.Get(ctx, url, timeout)
})
}

func withObservedHttpClient[T any](histogram *prometheus.HistogramVec, contract func() (T, int, error)) (T, int, error) {
contractExecutionStarted := time.Now()
value, status, err := contract()
histogram.
WithLabelValues(
strconv.FormatInt(int64(status), 10),
strconv.FormatBool(err == nil),
).
Observe(float64(time.Since(contractExecutionStarted)))
return value, status, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package observability

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
http2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc"
)

type expected struct {
status string
result string
count int
}

func TestUSDCClientMonitoring(t *testing.T) {

tests := []struct {
name string
server *httptest.Server
requests int
expected []expected
}{
{
name: "success",
server: newSuccessServer(t),
requests: 5,
expected: []expected{
{"200", "true", 5},
{"429", "false", 0},
},
},
{
name: "rate_limited",
server: newRateLimitedServer(),
requests: 26,
expected: []expected{
{"200", "true", 0},
{"429", "false", 26},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testMonitoring(t, test.name, test.server, test.requests, test.expected, logger.TestLogger(t))
})
}

}

func testMonitoring(t *testing.T, name string, server *httptest.Server, requests int, expected []expected, log logger.Logger) {
server.Start()
defer server.Close()
attestationURI, err := url.ParseRequestURI(server.URL)
require.NoError(t, err)

// Define test histogram (avoid side effects from other tests if using the real usdcHistogram).
histogram := promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "test_client_histogram_" + name,
Help: "Latency of calls to the USDC mock client",
Buckets: []float64{float64(250 * time.Millisecond), float64(1 * time.Second), float64(5 * time.Second)},
}, []string{"status", "success"})

// Mock USDC reader.
usdcReader := mocks.NewUSDCReader(t)
msgBody := []byte{0xb0, 0xd1}
usdcReader.On("GetLastUSDCMessagePriorToLogIndexInTx", mock.Anything, mock.Anything, mock.Anything).Return(msgBody, nil)

// Service with monitored http client.
observedHttpClient := http2.NewObservedIHttpClientWithMetric(&http2.HttpClient{}, histogram)
tokenDataReaderDefault := usdc.NewUSDCTokenDataReader(log, usdcReader, attestationURI, 0)
tokenDataReader := usdc.NewUSDCTokenDataReaderWithHttpClient(*tokenDataReaderDefault, observedHttpClient)
require.NotNil(t, tokenDataReader)

for i := 0; i < requests; i++ {
_, _ = tokenDataReader.ReadTokenData(context.Background(), internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{})
}

// Check that the metrics are updated as expected.
for _, e := range expected {
assert.Equal(t, e.count, counterFromHistogramByLabels(t, histogram, e.status, e.result))
}
}

func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int {
observer, err := histogramVec.GetMetricWithLabelValues(labels...)
require.NoError(t, err)

metricCh := make(chan prometheus.Metric, 1)
observer.(prometheus.Histogram).Collect(metricCh)
close(metricCh)

metric := <-metricCh
pb := &io_prometheus_client.Metric{}
err = metric.Write(pb)
require.NoError(t, err)

return int(pb.GetHistogram().GetSampleCount())
}

func newSuccessServer(t *testing.T) *httptest.Server {
return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
response := struct {
Status string `json:"status"`
Attestation string `json:"attestation"`
}{
Status: "complete",
Attestation: "720502893578a89a8a87982982ef781c18b193",
}
responseBytes, err := json.Marshal(response)
require.NoError(t, err)
_, err = w.Write(responseBytes)
require.NoError(t, err)
}))
}

func newRateLimitedServer() *httptest.Server {
return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTooManyRequests)
}))
}
45 changes: 14 additions & 31 deletions core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
Expand All @@ -19,6 +17,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -67,6 +66,7 @@ func (m messageAndAttestation) Validate() error {
type TokenDataReader struct {
lggr logger.Logger
usdcReader ccipdata.USDCReader
httpClient http.IHttpClient
attestationApi *url.URL
attestationApiTimeout time.Duration
}
Expand All @@ -83,15 +83,25 @@ func NewUSDCTokenDataReader(lggr logger.Logger, usdcReader ccipdata.USDCReader,
if usdcAttestationApiTimeoutSeconds == 0 {
timeout = defaultAttestationTimeout
}

return &TokenDataReader{
lggr: lggr,
usdcReader: usdcReader,
httpClient: http.NewObservedIHttpClient(&http.HttpClient{}),
attestationApi: usdcAttestationApi,
attestationApiTimeout: timeout,
}
}

func NewUSDCTokenDataReaderWithHttpClient(origin TokenDataReader, httpClient http.IHttpClient) *TokenDataReader {
return &TokenDataReader{
lggr: origin.lggr,
usdcReader: origin.usdcReader,
httpClient: httpClient,
attestationApi: origin.attestationApi,
attestationApiTimeout: origin.attestationApiTimeout,
}
}

func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (messageAndAttestation []byte, err error) {
messageBody, err := s.getUSDCMessageBody(ctx, msg)
if err != nil {
Expand Down Expand Up @@ -144,45 +154,18 @@ func (s *TokenDataReader) getUSDCMessageBody(ctx context.Context, msg internal.E

func (s *TokenDataReader) callAttestationApi(ctx context.Context, usdcMessageHash [32]byte) (attestationResponse, error) {
fullAttestationUrl := fmt.Sprintf("%s/%s/%s/0x%x", s.attestationApi, apiVersion, attestationPath, usdcMessageHash)

// Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress.
timeoutCtx, cancel := context.WithTimeout(ctx, s.attestationApiTimeout)
defer cancel()
req, err := http.NewRequestWithContext(timeoutCtx, "GET", fullAttestationUrl, nil)

if err != nil {
return attestationResponse{}, err
}
req.Header.Add("accept", "application/json")
res, err := http.DefaultClient.Do(req)
body, _, err := s.httpClient.Get(ctx, fullAttestationUrl, s.attestationApiTimeout)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return attestationResponse{}, tokendata.ErrTimeout
}
return attestationResponse{}, err
}
defer res.Body.Close()

// Explicitly signal if the API is being rate limited
if res.StatusCode == http.StatusTooManyRequests {
return attestationResponse{}, tokendata.ErrRateLimit
}

body, err := io.ReadAll(res.Body)
if err != nil {
return attestationResponse{}, err
}

var response attestationResponse
err = json.Unmarshal(body, &response)
if err != nil {
return attestationResponse{}, err
}

if response.Status == "" {
return attestationResponse{}, fmt.Errorf("invalid attestation response: %s", string(body))
}

return response, nil
}

Expand Down