Skip to content

Commit

Permalink
Implement deal and provider coverage calculation
Browse files Browse the repository at this point in the history
Implement metrics that calculate deals and provider coverage by the indexer.

Deal coverage is calculated as: sum of deals discoverable by the indexer
divided by total discoverable deals. The term "discoverable" here
implies that that the provider of the deal has a non-nil address
discovered via FileCoin API.

Provider coverage is calculated as: number of providers known by the
indexer with at least one deal divided by total number of discoverable
providers with at least one deal.

This commit also removes WIP test files pushed by accident in earlier
PRs.

Fixes #5
  • Loading branch information
masih committed Nov 25, 2022
1 parent 86916bc commit 7c37e09
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 125 deletions.
3 changes: 2 additions & 1 deletion announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (hf *heyFil) announce(ctx context.Context, ai *peer.AddrInfo, head cid.Cid)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPut, hf.httpAnnounceEndpoint, bytes.NewBuffer(anncb))
url := hf.httpIndexerEndpoint + `/ingest/announce`
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(anncb))
if err != nil {
return err
}
Expand Down
49 changes: 0 additions & 49 deletions announce_test.go

This file was deleted.

132 changes: 127 additions & 5 deletions fil_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand All @@ -17,16 +19,35 @@ import (

type (
StateMinerInfoResp struct {
PeerId string
Multiaddrs []string
PeerId string `json:"PeerId"`
Multiaddrs []string `json:"Multiaddrs"`
}
ChainHead struct {
Height int64 `json:"Height"`
}
StateMarketDealResult struct {
Key string
Deal *StateMarketDeal
Err error
}
StateMarketDeal struct {
Proposal struct {
Provider string `json:"Provider"`
EndEpoch int64 `json:"EndEpoch"`
} `json:"Proposal"`
State struct {
SectorStartEpoch int64 `json:"SectorStartEpoch"`
SlashEpoch int64 `json:"SlashEpoch"`
} `json:"State"`
}
)

const (
methodFilStateMarketParticipants = "Filecoin.StateMarketParticipants"
methodFilStateMinerInfo = "Filecoin.StateMinerInfo"
methodFilStateMarketParticipants = `Filecoin.StateMarketParticipants`
methodFilStateMinerInfo = `Filecoin.StateMinerInfo`
methodFilChainHead = `Filecoin.ChainHead`

// noopURL is a placeholder for mandatory but noop URL reqired by HTTP client during go-stream
// noopURL is a placeholder for mandatory but noop URL required by HTTP client during go-stream
// connection.
noopURL = "http://publisher.invalid/head"
)
Expand Down Expand Up @@ -90,6 +111,107 @@ func (hf *heyFil) stateMinerInfo(ctx context.Context, mid string) (*peer.AddrInf
}
}

func (hf *heyFil) chainHead(ctx context.Context) (*ChainHead, error) {
resp, err := hf.c.Call(ctx, methodFilChainHead)
switch {
case err != nil:
return nil, err
case resp.Error != nil:
return nil, resp.Error
default:
var c ChainHead
if err := resp.GetObject(&c); err != nil {
return nil, err
}
return &c, nil
}
}

func (hf *heyFil) stateMarketDeals(ctx context.Context) (chan StateMarketDealResult, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, hf.marketDealsAlt, nil)
if err != nil {
return nil, err
}
resp, err := hf.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("faild to get market deals: %d %s", resp.StatusCode, resp.Status)
}
results := make(chan StateMarketDealResult, 1)
go func() {

defer func() {
close(results)
_ = resp.Body.Close()
}()

// Process deal information in streaming fashion for a smaller more predictable memory
// footprint. Because, it is provided as a giant (currently ~5 GiB) JSON file off S3 usually
// and we don't know how big it might grow.
decoder := json.NewDecoder(resp.Body)
dealIDParser := func() (string, error) {
token, err := decoder.Token()
if err != nil {
return "", err
}
switch tt := token.(type) {
case string:
return tt, nil
case json.Delim:
if tt.String() != `}` {
return "", fmt.Errorf("expected delimier close object but got: %s", tt)
}
return "", nil
default:
return "", fmt.Errorf("expected string but got: %T", token)
}
}

dealParser := func() (*StateMarketDeal, error) {
var deal StateMarketDeal
err := decoder.Decode(&deal)
if err != nil {
return nil, err
}
return &deal, nil
}

token, err := decoder.Token()
if err != nil {
results <- StateMarketDealResult{Err: err}
return
}
if _, ok := token.(json.Delim); !ok {
results <- StateMarketDealResult{Err: fmt.Errorf("unexpected JSON token: expected delimiter but got: %v", token)}
return
}
for {
var next StateMarketDealResult
next.Key, next.Err = dealIDParser()
if next.Err != nil {
results <- next
return
}
if next.Key == "" {
// We should be at the end; assert so.
if t, err := decoder.Token(); err != io.EOF {
results <- StateMarketDealResult{Err: fmt.Errorf("expected no more tokens but got: %v", t)}
}
return
}
next.Deal, next.Err = dealParser()
if next.Err != nil {
results <- next
return
}
results <- next
}
}()
return results, nil
}

func (hf *heyFil) supportsHeadProtocol(ctx context.Context, ai *peer.AddrInfo) (bool, string, protocol.ID, error) {
if err := hf.h.Connect(ctx, *ai); err != nil {
return false, "", "", err
Expand Down
75 changes: 75 additions & 0 deletions fil_deal_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"sync"
)

type dealStats struct {
hf *heyFil
refreshLock sync.RWMutex
dealCountByParticipant map[string]int64
latestRefreshErr error
}

func (ds *dealStats) start(ctx context.Context) {
go func() {
for {
ds.latestRefreshErr = ds.refresh(ctx)
select {
case <-ctx.Done():
return
case <-ds.hf.dealStatsRefreshInterval.C:
ds.latestRefreshErr = ds.refresh(ctx)
}
}
}()
}

func (ds *dealStats) refresh(ctx context.Context) error {
ch, err := ds.hf.chainHead(ctx)
if err != nil {
return err
}
epoch := ch.Height

deals, err := ds.hf.stateMarketDeals(ctx)
if err != nil {
return err
}

dealCountByParticipant := make(map[string]int64)
var totalDealCount int64
for {
select {
case <-ctx.Done():
return ctx.Err()
case dr, ok := <-deals:
if !ok {
ds.refreshLock.Lock()
ds.dealCountByParticipant = dealCountByParticipant
ds.refreshLock.Unlock()
ds.hf.metrics.notifyDealCount(totalDealCount)
logger.Infow("fetched state market deals", "count", totalDealCount)
return nil
}
switch {
case dr.Err != nil:
return dr.Err
case dr.Deal.State.SectorStartEpoch == -1:
case dr.Deal.State.SlashEpoch != -1:
case dr.Deal.Proposal.EndEpoch < epoch:
default:
totalDealCount++
dealProvider := dr.Deal.Proposal.Provider
dealCountByParticipant[dealProvider] = dealCountByParticipant[dealProvider] + 1
}
}
}
}

func (ds *dealStats) getDealCount(id string) int64 {
ds.refreshLock.RLock()
defer ds.refreshLock.RUnlock()
return ds.dealCountByParticipant[id]
}
20 changes: 12 additions & 8 deletions heyfil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type (
toCheck chan *Target
checked chan *Target

metrics metrics
server http.Server
metrics metrics
dealStats *dealStats
server http.Server
}
)

Expand All @@ -30,13 +31,15 @@ func newHeyFil(o ...Option) (*heyFil, error) {
if err != nil {
return nil, err
}
return &heyFil{
hf := &heyFil{
options: opts,
c: jsonrpc.NewClient(opts.api),
targets: make(map[string]*Target),
toCheck: make(chan *Target, 100),
checked: make(chan *Target, 100),
}, nil
}
hf.dealStats = &dealStats{hf: hf}
return hf, nil
}

func (hf *heyFil) Start(ctx context.Context) error {
Expand All @@ -46,9 +49,10 @@ func (hf *heyFil) Start(ctx context.Context) error {
if err := hf.startServer(); err != nil {
return err
}
hf.dealStats.start(ctx)

// start checkers.
for i := 0; i < hf.maxConcCheck; i++ {
for i := 0; i < hf.maxConcurrentParticipantCheck; i++ {
go hf.checker(ctx)
}

Expand All @@ -61,7 +65,7 @@ func (hf *heyFil) Start(ctx context.Context) error {
logger.Errorw("failed to get state market participants", "err", err)
return
}
hf.metrics.notifyTargetCount(len(mids))
hf.metrics.notifyParticipantCount(int64(len(mids)))
logger.Infow("fetched state market participants", "count", len(mids))
for _, mid := range mids {
select {
Expand All @@ -76,7 +80,7 @@ func (hf *heyFil) Start(ctx context.Context) error {
select {
case <-ctx.Done():
return
case t := <-hf.checkInterval.C:
case t := <-hf.participantsCheckInterval.C:
dispatch(ctx, t)
}
}
Expand All @@ -86,7 +90,7 @@ func (hf *heyFil) Start(ctx context.Context) error {
go func() {
snapshot := func(ctx context.Context, t time.Time) {
logger := logger.With("t", t)
hf.metrics.snapshotCountByStatus(hf.targets)
hf.metrics.snapshot(hf.targets)
logger.Debugw("reported check results", "miner-count", len(hf.targets))
}
snapshot(ctx, time.Now())
Expand Down
31 changes: 31 additions & 0 deletions indexer_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"context"
"fmt"
"io"
"net/http"

"github.com/libp2p/go-libp2p/core/peer"
)

func (hf *heyFil) isKnownByIndexer(ctx context.Context, pid peer.ID) (bool, error) {
url := hf.httpIndexerEndpoint + `/providers/` + pid.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return false, err
}
response, err := hf.httpClient.Do(req)
if err != nil {
return false, err
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return false, err
}
if response.StatusCode != http.StatusOK {
return false, fmt.Errorf("unsuccessful response %d : %s", response.StatusCode, string(body))
}
return true, nil
}
Loading

0 comments on commit 7c37e09

Please sign in to comment.