Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
More logs for OpenAppendStream, OpenReadStream failures
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed Nov 20, 2017
1 parent 27bdb2b commit c1fe33d
Showing 1 changed file with 35 additions and 28 deletions.
63 changes: 35 additions & 28 deletions services/storehost/storehost.go
Expand Up @@ -361,7 +361,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque

req, err := common.GetOpenAppendStreamRequestHTTP(r.Header)
if err != nil {
t.logger.WithField(`error`, err).Error("unable to parse all needed headers")
t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to parse all needed headers")
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand All @@ -370,7 +370,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque
// setup websocket
wsStream, err := t.GetWSConnector().AcceptAppendStream(w, r)
if err != nil {
t.logger.WithField(`error`, err).Error("unable to upgrade websocket connection")
t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to upgrade websocket connection")
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
return
}
Expand All @@ -382,7 +382,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque

// create thrift stream call wrapper and deligate to streaming call
if err = t.OpenAppendStream(ctx, wsStream); err != nil {
t.logger.WithField(`error`, err).Error("unable to open append stream")
t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to open append stream")
return
}
}
Expand All @@ -392,25 +392,13 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore

t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
call.Done()
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
return newInternalServiceError("StoreHost not started")
}

// If the disk available space is low, we should fail any request to write extent
if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly {
call.Done()
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
return newInternalServiceError("StoreHost in read-only mode")
}

// read in args passed in via the Thrift context headers
args, err := getInConnArgs(ctx)

if err != nil {
call.Done()
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
t.logger.Error("OpenAppendStream: error parsing args")
return err
}

Expand All @@ -419,8 +407,20 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore
common.TagDst: common.FmtDst(args.destID.String()),
})

log.WithField("args", fmt.Sprintf("destType=%v mode=%v", args.destType, args.mode)).
Info("OpenAppendStream: starting inConn")
if atomic.LoadInt32(&t.started) == 0 {
call.Done()
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
log.Error("OpenAppendStream: storehost not started")
return newInternalServiceError("StoreHost not started")
}

// If the disk available space is low, we should fail any request to write extent
if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly {
call.Done()
t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures)
log.Error("OpenAppendStream: storehost currently readonly")
return newInternalServiceError("StoreHost in read-only mode")
}

in := newInConn(args, call, t.xMgr, t.m3Client, log)

Expand All @@ -431,27 +431,32 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore

numInConn := atomic.AddInt64(&t.numInConn, 1)
t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn)
log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream opened")

log.WithFields(bark.Fields{
`destType`: args.destType,
`mode`: args.mode,
`numInConn`: numInConn,
}).Info("OpenAppendStream: inConn started")

select {
// wait for inConn to be done
case err = <-in.Done():

// .. or wait for shutdown to be triggered
case <-t.shutdownC:
log.Info("OpenAppendStream: shutdown, stopping inConn")
err = in.Stop() // attempt to stop connection

// listen to extreme situations
case <-t.disableWriteC:
log.Info("Stop write due to available disk space is extremely low")
log.Error("OpenAppendStream: writes disabled, stopping inConn")
err = in.Stop()
}

numInConn = atomic.AddInt64(&t.numInConn, -1)
t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn)
log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream closed")
log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: inConn done")

log.Info("OpenAppendStream done")
return err // FIXME: tchannel does *not* currently propagate this to the remote caller
}

Expand Down Expand Up @@ -534,12 +539,6 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp

t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageRequests)

if atomic.LoadInt32(&t.started) == 0 {
call.Done()
t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures)
return newInternalServiceError("StoreHost not started")
}

// read in args passed in via the Thrift context headers
args, e := getOutConnArgs(ctx)

Expand All @@ -566,6 +565,13 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp
common.TagCnsm: common.FmtCnsm(args.consGroupID.String()),
})

if atomic.LoadInt32(&t.started) == 0 {
call.Done()
t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures)
log.Error("OpenReadStream: storehost not started")
return newInternalServiceError("StoreHost not started")
}

out := newOutConn(args, call, t.xMgr, t.m3Client, log)

t.shutdownWG.Add(1)
Expand All @@ -590,6 +596,7 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp

// .. or wait for shutdown to be triggered
case <-t.shutdownC:
log.Info("OpenReadStream: shutdown, stopping inConn")
out.Stop() // attempt to stop connection
}

Expand Down

0 comments on commit c1fe33d

Please sign in to comment.