Skip to content
This repository has been archived by the owner on May 17, 2022. It is now read-only.

Commit

Permalink
Update logging to properly use logrus
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgoffinet committed Aug 15, 2018
1 parent 704464f commit 117b201
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 35 deletions.
12 changes: 5 additions & 7 deletions pkg/durable/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package durable
import (
"bytes"
"encoding/gob"
"io/ioutil"
"log"
"time"

log "github.com/sirupsen/logrus"
)

type Request struct {
Expand All @@ -24,7 +24,6 @@ type Config struct {
MaxMsgSize int32
SyncEvery int64
SyncTimeout time.Duration
Logger *log.Logger
}

func defaultConfig() *Config {
Expand All @@ -36,7 +35,6 @@ func defaultConfig() *Config {
MaxMsgSize: 1000,
SyncEvery: 10,
SyncTimeout: time.Second * 10,
Logger: log.New(ioutil.Discard, "", 0),
}
}

Expand Down Expand Up @@ -69,7 +67,7 @@ func (b channel) reader() {
var item Request
dec := gob.NewDecoder(bytes.NewReader(data))
if err := dec.Decode(&item); err != nil {
b.config.Logger.Printf("Error unmarshalling object: %s\n", err.Error())
log.Errorf("Error unmarshalling object: %s\n", err.Error())
}
b.out <- item
}
Expand All @@ -82,9 +80,9 @@ func (b channel) writer() {
enc := gob.NewEncoder(&network)

if err := enc.Encode(item); err != nil {
b.config.Logger.Printf("Error marshalling object: %s\n", err.Error())
log.Errorf("Error marshalling object: %s\n", err.Error())
} else if err := b.dq.Put(network.Bytes()); err != nil {
b.config.Logger.Printf("Error putting object: %s\n", err.Error())
log.Errorf("Error putting object: %s\n", err.Error())
}
}
}
Expand Down
47 changes: 23 additions & 24 deletions pkg/durable/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"errors"
"fmt"
"io"
"log"
"math/rand"
"os"
"path"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
)

// diskQueue implements the BackendQueue interface
Expand Down Expand Up @@ -64,7 +65,6 @@ type diskQueue struct {
exitChan chan int
exitSyncChan chan int

logger *log.Logger
}

// newDiskQueue instantiates a new instance of diskQueue, retrieving metadata
Expand All @@ -85,13 +85,12 @@ func newDiskQueue(config *Config) *diskQueue {
exitSyncChan: make(chan int),
syncEvery: config.SyncEvery,
syncTimeout: config.SyncTimeout,
logger: config.Logger,
}

// no need to lock here, nothing else could possibly be touching this instance
err := d.retrieveMetaData()
if err != nil && !os.IsNotExist(err) {
d.logf("ERROR: diskqueue(%s) failed to retrieveMetaData - %s", d.name, err)
log.Errorf("ERROR: diskqueue(%s) failed to retrieveMetaData - %s", d.name, err)
}

go d.ioLoop()
Expand All @@ -100,7 +99,7 @@ func newDiskQueue(config *Config) *diskQueue {
}

// logf emits a printf-style diagnostic message through the package-level
// logrus logger on behalf of this diskQueue.
//
// NOTE(review): the rendered diff showed the removed `d.logger.Printf` line
// alongside the added logrus call; only the logrus call belongs in the final
// source (the `logger` field was deleted by this commit). Also, log.Printf
// logs at Info level, while every other message in this commit uses a
// leveled call (Debugf/Errorf/Warnf) — confirm Info is intended here, or
// have callers pick a level instead.
func (d *diskQueue) logf(f string, args ...interface{}) {
	log.Printf(f, args...)
}

// Depth returns the depth of the queue
Expand Down Expand Up @@ -146,9 +145,9 @@ func (d *diskQueue) exit(deleted bool) error {
d.exitFlag = 1

if deleted {
d.logf("DISKQUEUE(%s): deleting", d.name)
log.Debugf("DISKQUEUE(%s): deleting", d.name)
} else {
d.logf("DISKQUEUE(%s): closing", d.name)
log.Debugf("DISKQUEUE(%s): closing", d.name)
}

close(d.exitChan)
Expand Down Expand Up @@ -178,7 +177,7 @@ func (d *diskQueue) Empty() error {
return errors.New("exiting")
}

d.logf("DISKQUEUE(%s): emptying", d.name)
log.Debugf("DISKQUEUE(%s): emptying", d.name)

d.emptyChan <- 1
return <-d.emptyResponseChan
Expand All @@ -189,7 +188,7 @@ func (d *diskQueue) deleteAllFiles() error {

innerErr := os.Remove(d.metaDataFileName())
if innerErr != nil && !os.IsNotExist(innerErr) {
d.logf("ERROR: diskqueue(%s) failed to remove metadata file - %s", d.name, innerErr)
log.Errorf("ERROR: diskqueue(%s) failed to remove metadata file - %s", d.name, innerErr)
return innerErr
}

Expand All @@ -213,7 +212,7 @@ func (d *diskQueue) skipToNextRWFile() error {
fn := d.fileName(i)
innerErr := os.Remove(fn)
if innerErr != nil && !os.IsNotExist(innerErr) {
d.logf("ERROR: diskqueue(%s) failed to remove data file - %s", d.name, innerErr)
log.Errorf("ERROR: diskqueue(%s) failed to remove data file - %s", d.name, innerErr)
err = innerErr
}
}
Expand Down Expand Up @@ -242,7 +241,7 @@ func (d *diskQueue) readOne() ([]byte, error) {
return nil, err
}

d.logf("DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)
log.Debugf("DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

if d.readPos > 0 {
_, err = d.readFile.Seek(d.readPos, 0)
Expand Down Expand Up @@ -314,7 +313,7 @@ func (d *diskQueue) writeOne(data []byte) error {
return err
}

d.logf("DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
log.Debugf("DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

if d.writePos > 0 {
_, err = d.writeFile.Seek(d.writePos, 0)
Expand Down Expand Up @@ -362,7 +361,7 @@ func (d *diskQueue) writeOne(data []byte) error {
// sync every time we start writing to a new file
err = d.sync()
if err != nil {
d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
log.Errorf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
}

if d.writeFile != nil {
Expand Down Expand Up @@ -467,11 +466,11 @@ func (d *diskQueue) checkTailCorruption(depth int64) {
// if depth isn't 0 something went wrong
if depth != 0 {
if depth < 0 {
d.logf(
log.Errorf(
"ERROR: diskqueue(%s) negative depth at tail (%d), metadata corruption, resetting 0...",
d.name, depth)
} else if depth > 0 {
d.logf(
log.Errorf(
"ERROR: diskqueue(%s) positive depth at tail (%d), data loss, resetting 0...",
d.name, depth)
}
Expand All @@ -482,13 +481,13 @@ func (d *diskQueue) checkTailCorruption(depth int64) {

if d.readFileNum != d.writeFileNum || d.readPos != d.writePos {
if d.readFileNum > d.writeFileNum {
d.logf(
log.Errorf(
"ERROR: diskqueue(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
d.name, d.readFileNum, d.writeFileNum)
}

if d.readPos > d.writePos {
d.logf(
log.Errorf(
"ERROR: diskqueue(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
d.name, d.readPos, d.writePos)
}
Expand All @@ -512,7 +511,7 @@ func (d *diskQueue) moveForward() {
fn := d.fileName(oldReadFileNum)
err := os.Remove(fn)
if err != nil {
d.logf("ERROR: failed to Remove(%s) - %s", fn, err)
log.Errorf("ERROR: failed to Remove(%s) - %s", fn, err)
}
}

Expand All @@ -535,13 +534,13 @@ func (d *diskQueue) handleReadError() {
badFn := d.fileName(d.readFileNum)
badRenameFn := badFn + ".bad"

d.logf(
log.Warnf(
"NOTICE: diskqueue(%s) jump to next file and saving bad file as %s",
d.name, badRenameFn)

err := os.Rename(badFn, badRenameFn)
if err != nil {
d.logf(
log.Errorf(
"ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s",
d.name, badFn, badRenameFn)
}
Expand Down Expand Up @@ -580,7 +579,7 @@ func (d *diskQueue) ioLoop() {
if d.needSync {
err = d.sync()
if err != nil {
d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
log.Errorf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
}
count = 0
}
Expand All @@ -589,7 +588,7 @@ func (d *diskQueue) ioLoop() {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
log.Errorf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
d.handleReadError()
continue
Expand Down Expand Up @@ -625,7 +624,7 @@ func (d *diskQueue) ioLoop() {
}

exit:
d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name)
log.Debugf("DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}
}
6 changes: 2 additions & 4 deletions pkg/qflow/qflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
_ "net/http/pprof"
"os"
"time"

log "github.com/sirupsen/logrus"
"github.com/threecommaio/qflow/pkg/durable"
// log "github.com/sirupsen/logrus"
)

type Endpoint struct {
Expand All @@ -36,7 +35,7 @@ func ReplicateChannel(endpoint *Endpoint) {
count++

if count%1000 == 0 {
fmt.Println("Processed batch of 1000")
log.Debug("Processed batch of 1000")
}

r := bytes.NewReader(req.Body)
Expand Down Expand Up @@ -97,7 +96,6 @@ func ListenAndServe(config *Config, addr string, dataDir string) {
MaxMsgSize: 1000,
SyncEvery: 10000,
SyncTimeout: time.Second * 10,
Logger: log.New(os.Stdout, "", 0),
})

e := &Endpoint{
Expand Down

0 comments on commit 117b201

Please sign in to comment.