Skip to content

Commit

Permalink
view:fix logging
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Aug 13, 2019
1 parent c12b6f3 commit 4bd0eff
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
2 changes: 1 addition & 1 deletion tester/queueconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (qc *queueConsumer) addToBuffer(event kafka.Event) {
qc.eventBuffer <- event

if len(qc.eventBuffer) > eventBufferQueueSize*0.9 {
logger.Printf("buffer nearly full: %d, %s. Will drop events.", len(qc.eventBuffer), qc.queue.topic)
logger.Printf("buffer nearly full: %d, %s. Will drop event.", len(qc.eventBuffer), qc.queue.topic)
<-qc.eventBuffer
}
}
Expand Down
5 changes: 2 additions & 3 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,18 @@ func (km *Tester) waitForConsumers() {
}

func (km *Tester) waitStartup() {
logger.Printf("Tester: Waiting for startup")
logger.Printf("Waiting for startup")
km.mQueues.RLock()
defer km.mQueues.RUnlock()
for _, queue := range km.topicQueues {
queue.waitConsumersInit()
}
logger.Printf("Tester: Waiting for startup done")
logger.Printf("Waiting for startup done")
}

// Consume a message using the topic's configured codec
func (km *Tester) Consume(topic string, key string, msg interface{}) {
km.waitStartup()
log.Printf("startup")

// if the user wants to send a nil for some reason,
// just let her. Goka should handle it accordingly :)
Expand Down
6 changes: 3 additions & 3 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func (v *View) Run(ctx context.Context) error {
for id, p := range v.partitions {
pid, par := int32(id), p
errg.Go(func() error {
v.opts.log.Printf("view [%s]: partition %d started", pid)
defer v.opts.log.Printf("view: partition %d stopped", pid)
v.opts.log.Printf("view [%s]: partition %d started", v.Topic(), pid)
defer v.opts.log.Printf("view [%s]: partition %d stopped", v.Topic(), pid)
if err := par.st.Open(); err != nil {
return fmt.Errorf("view [%s]: error opening storage partition %d: %v", v.Topic(), pid, err)
}
Expand All @@ -155,7 +155,7 @@ func (v *View) Run(ctx context.Context) error {

v.opts.log.Printf("view [%s]: closing consumer", v.Topic())
if err := v.consumer.Close(); err != nil {
_ = errs.Collect(fmt.Errorf("view: failed closing consumer: %v", err))
_ = errs.Collect(fmt.Errorf("view [%s]: failed closing consumer: %v", v.Topic(), err))
}

if !v.opts.restartable {
Expand Down

0 comments on commit 4bd0eff

Please sign in to comment.