diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index e4f0f0ae..fb79c01e 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -361,7 +361,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque req, err := common.GetOpenAppendStreamRequestHTTP(r.Header) if err != nil { - t.logger.WithField(`error`, err).Error("unable to parse all needed headers") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to parse all needed headers") t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) http.Error(w, err.Error(), http.StatusBadRequest) return @@ -370,7 +370,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque // setup websocket wsStream, err := t.GetWSConnector().AcceptAppendStream(w, r) if err != nil { - t.logger.WithField(`error`, err).Error("unable to upgrade websocket connection") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to upgrade websocket connection") t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) return } @@ -382,7 +382,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque // create thrift stream call wrapper and deligate to streaming call if err = t.OpenAppendStream(ctx, wsStream); err != nil { - t.logger.WithField(`error`, err).Error("unable to open append stream") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to open append stream") return } } @@ -392,25 +392,13 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageRequests) - if atomic.LoadInt32(&t.started) == 0 { - call.Done() - t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost not started") - } - - // If the disk available space is low, we should fail any request to write extent - if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly { - call.Done() - t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost in read-only mode") - } - // read in args passed in via the Thrift context headers args, err := getInConnArgs(ctx) if err != nil { call.Done() t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + t.logger.Error("OpenAppendStream: error parsing args") return err } @@ -419,8 +407,20 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore common.TagDst: common.FmtDst(args.destID.String()), }) - log.WithField("args", fmt.Sprintf("destType=%v mode=%v", args.destType, args.mode)). - Info("OpenAppendStream: starting inConn") + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + log.Error("OpenAppendStream: storehost not started") + return newInternalServiceError("StoreHost not started") + } + + // If the disk available space is low, we should fail any request to write extent + if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly { + call.Done() + t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + log.Error("OpenAppendStream: storehost currently readonly") + return newInternalServiceError("StoreHost in read-only mode") + } in := newInConn(args, call, t.xMgr, t.m3Client, log) @@ -431,7 +431,12 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore numInConn := atomic.AddInt64(&t.numInConn, 1) t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn) - log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream opened") + + log.WithFields(bark.Fields{ + `destType`: args.destType, + `mode`: args.mode, + `numInConn`: numInConn, + }).Info("OpenAppendStream: inConn started") select { // wait for inConn to be done @@ -439,19 +444,19 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore // .. or wait for shutdown to be triggered case <-t.shutdownC: + log.Info("OpenAppendStream: shutdown, stopping inConn") err = in.Stop() // attempt to stop connection // listen to extreme situations case <-t.disableWriteC: - log.Info("Stop write due to available disk space is extremely low") + log.Error("OpenAppendStream: writes disabled, stopping inConn") err = in.Stop() } numInConn = atomic.AddInt64(&t.numInConn, -1) t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn) - log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream closed") + log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: inConn done") - log.Info("OpenAppendStream done") return err // FIXME: tchannel does *not* currently propagate this to the remote caller } @@ -534,12 +539,6 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageRequests) - if atomic.LoadInt32(&t.started) == 0 { - call.Done() - t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost not started") - } - // read in args passed in via the Thrift context headers args, e := getOutConnArgs(ctx) @@ -566,6 +565,13 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp common.TagCnsm: common.FmtCnsm(args.consGroupID.String()), }) + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) + log.Error("OpenReadStream: storehost not started") + return newInternalServiceError("StoreHost not started") + } + out := newOutConn(args, call, t.xMgr, t.m3Client, log) t.shutdownWG.Add(1) @@ -590,6 +596,7 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp // .. or wait for shutdown to be triggered case <-t.shutdownC: + log.Info("OpenReadStream: shutdown, stopping inConn") out.Stop() // attempt to stop connection }