go/worker/storage: Add storage worker status message #5262

Merged
merged 1 commit on May 17, 2023
6 changes: 6 additions & 0 deletions .changelog/5262.feature.md
@@ -0,0 +1,6 @@
go/worker/storage: Add storage worker status message

A status message that shows the current state of the storage worker
was added to the node's storage worker status output.
This enables the node operator to quickly check whether the storage worker
is still initializing, syncing checkpoints, or syncing rounds.
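
For illustration only (not part of this change): the snippet below mirrors the two fields added in `go/worker/storage/api/api.go` further down and decodes a hypothetical status payload, showing the kind of check a node operator could script against the status output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workerStatus is a local mirror of the Status struct added in
// go/worker/storage/api/api.go below, used only to illustrate the JSON shape.
type workerStatus struct {
	Status             string `json:"status"`
	LastFinalizedRound uint64 `json:"last_finalized_round"`
}

func main() {
	// Hypothetical payload as the storage worker status might appear in the
	// node's status output; the values are made up.
	raw := []byte(`{"status": "syncing checkpoints", "last_finalized_round": 42}`)

	var st workerStatus
	if err := json.Unmarshal(raw, &st); err != nil {
		panic(err)
	}

	// "syncing rounds" is the steady state; any other value means the worker
	// is still initializing or catching up.
	fmt.Printf("storage worker is %q at round %d\n", st.Status, st.LastFinalizedRound)
}
```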
16 changes: 16 additions & 0 deletions go/worker/storage/api/api.go
@@ -11,6 +11,19 @@ import (
// ModuleName is the storage worker module name.
const ModuleName = "worker/storage"

// StorageWorkerStatus is the status of the storage worker.
type StorageWorkerStatus string

const (
	StatusInitializing        StorageWorkerStatus = "initializing"
	StatusStarting            StorageWorkerStatus = "starting"
	StatusStopping            StorageWorkerStatus = "stopping"
	StatusInitializingGenesis StorageWorkerStatus = "initializing genesis"
	StatusSyncStartCheck      StorageWorkerStatus = "sync start check"
	StatusSyncingCheckpoints  StorageWorkerStatus = "syncing checkpoints"
	StatusSyncingRounds       StorageWorkerStatus = "syncing rounds"
)

var (
	// ErrRuntimeNotFound is the error returned when the call references an unknown runtime.
	ErrRuntimeNotFound = errors.New(ModuleName, 1, "worker/storage: runtime not found")
@@ -48,6 +61,9 @@ type PauseCheckpointerRequest struct {

// Status is the storage worker status.
type Status struct {
	// Status is the current status of the storage worker.
	Status StorageWorkerStatus `json:"status"`

	// LastFinalizedRound is the last synced and finalized round.
	LastFinalizedRound uint64 `json:"last_finalized_round"`
}
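
A hedged usage sketch (not code from this PR): a caller that has obtained an `api.Status` can branch on the new field. The import path below is the package shown in this diff, and `isCatchingUp` is a hypothetical helper.

```go
package main

import (
	"fmt"

	storageApi "github.com/oasisprotocol/oasis-core/go/worker/storage/api"
)

// isCatchingUp is a hypothetical helper: it reports whether the storage
// worker has not yet reached the steady "syncing rounds" state.
func isCatchingUp(s storageApi.StorageWorkerStatus) bool {
	return s != storageApi.StatusSyncingRounds
}

func main() {
	// Example status value; in practice this would come from the worker's
	// status endpoint.
	st := storageApi.Status{
		Status:             storageApi.StatusSyncingCheckpoints,
		LastFinalizedRound: 10,
	}
	fmt.Println("catching up:", isCatchingUp(st.Status))
}
```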
33 changes: 33 additions & 0 deletions go/worker/storage/committee/node.go
@@ -134,6 +134,9 @@ type Node struct { // nolint: maligned
	syncedLock  sync.RWMutex
	syncedState blockSummary

	statusLock sync.RWMutex
	status     api.StorageWorkerStatus

	blockCh    *channels.InfiniteChannel
	diffCh     chan *fetchedDiff
	finalizeCh chan finalizeResult
@@ -175,6 +178,8 @@ func NewNode(

		checkpointSyncCfg: checkpointSyncCfg,

		status: api.StatusInitializing,

		blockCh:    channels.NewInfiniteChannel(),
		diffCh:     make(chan *fetchedDiff),
		finalizeCh: make(chan finalizeResult),
@@ -280,6 +285,10 @@ func (n *Node) Start() error {

// Stop causes the worker to stop watching and shut down.
func (n *Node) Stop() {
	n.statusLock.Lock()
	n.status = api.StatusStopping
	n.statusLock.Unlock()

	n.ctxCancel()
}

@@ -303,8 +312,12 @@ func (n *Node) GetStatus(ctx context.Context) (*api.Status, error) {
	n.syncedLock.RLock()
	defer n.syncedLock.RUnlock()

	n.statusLock.RLock()
	defer n.statusLock.RUnlock()

	return &api.Status{
		LastFinalizedRound: n.syncedState.Round,
		Status:             n.status,
	}, nil
}
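
Illustrative only, not part of the PR: since `GetStatus` above only takes read locks, a monitoring goroutine could poll it until the worker reports the round-syncing state. The minimal interface below merely mirrors the method's signature; the committee Node in this file satisfies it.

```go
package storagepoll

import (
	"context"
	"time"

	storageApi "github.com/oasisprotocol/oasis-core/go/worker/storage/api"
)

// statusProvider mirrors the GetStatus method shown above; the committee
// Node satisfies this interface.
type statusProvider interface {
	GetStatus(ctx context.Context) (*storageApi.Status, error)
}

// waitForRoundSync polls the worker until it reports that it is syncing
// rounds (i.e. it has finished checkpoint sync) or the context is cancelled.
func waitForRoundSync(ctx context.Context, sp statusProvider) (uint64, error) {
	for {
		st, err := sp.GetStatus(ctx)
		if err != nil {
			return 0, err
		}
		if st.Status == storageApi.StatusSyncingRounds {
			return st.LastFinalizedRound, nil
		}

		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-time.After(time.Second):
		}
	}
}
```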

@@ -726,6 +739,10 @@ func (n *Node) worker() { // nolint: gocyclo

	n.logger.Info("starting committee node")

	n.statusLock.Lock()
	n.status = api.StatusStarting
	n.statusLock.Unlock()

	// Determine genesis block.
	genesisBlock, err := n.commonNode.Consensus.RootHash().GetGenesisBlock(n.ctx, &roothashApi.RuntimeRequest{
		RuntimeID: n.commonNode.Runtime.ID(),
@@ -771,6 +788,10 @@ func (n *Node) worker() { // nolint: gocyclo
	// Initialize genesis from the runtime descriptor.
	isInitialStartup := (cachedLastRound == n.undefinedRound)
	if isInitialStartup {
		n.statusLock.Lock()
		n.status = api.StatusInitializingGenesis
		n.statusLock.Unlock()

		var rt *registryApi.Runtime
		rt, err = n.commonNode.Runtime.ActiveDescriptor(n.ctx)
		if err != nil {
@@ -797,6 +818,10 @@ func (n *Node) worker() { // nolint: gocyclo
	// syncing. In case we cannot (likely because we synced the consensus layer via state sync), we
	// must wait for a later checkpoint to become available.
	if !n.checkpointSyncForced {
		n.statusLock.Lock()
		n.status = api.StatusSyncStartCheck
		n.statusLock.Unlock()

		// Determine what is the first round that we would need to sync.
		iterativeSyncStart := cachedLastRound
		if iterativeSyncStart == n.undefinedRound {
@@ -925,6 +950,10 @@ func (n *Node) worker() { // nolint: gocyclo
	// disabled via config.
	//
	if (isInitialStartup && !n.checkpointSyncCfg.Disabled) || n.checkpointSyncForced {
		n.statusLock.Lock()
		n.status = api.StatusSyncingCheckpoints
		n.statusLock.Unlock()

		var (
			summary *blockSummary
			attempt int
@@ -1048,6 +1077,10 @@ func (n *Node) worker() { // nolint: gocyclo
		}
	}

	n.statusLock.Lock()
	n.status = api.StatusSyncingRounds
	n.statusLock.Unlock()

	// Main processing loop. When a new block comes in, its state and io roots are inspected and their
	// writelogs fetched from remote storage nodes in case we don't have them locally yet. Fetches are
	// asynchronous and, once complete, trigger local Apply operations. These are serialized
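
The same Lock/assign/Unlock sequence recurs at every status transition above. As an editorial sketch (not part of this PR), the pattern could be captured in a small tracker type; the type below simply mirrors the `statusLock`/`status` pair added to the Node.

```go
package statussketch

import (
	"sync"

	storageApi "github.com/oasisprotocol/oasis-core/go/worker/storage/api"
)

// statusTracker mirrors the statusLock/status pair added to the committee
// Node, factoring the repeated lock/assign/unlock transitions into helpers.
type statusTracker struct {
	mu     sync.RWMutex
	status storageApi.StorageWorkerStatus
}

// set records a new status, as the worker does at each transition.
func (t *statusTracker) set(s storageApi.StorageWorkerStatus) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.status = s
}

// get returns the current status, as GetStatus does under a read lock.
func (t *statusTracker) get() storageApi.StorageWorkerStatus {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.status
}
```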