Skip to content

Commit

Permalink
Fix system service panic from early hangup in events
Browse files Browse the repository at this point in the history
We weren't actually halting the goroutine that sent events, so it
would continue sending even when the channel closed (the most
notable cause being early hangup - e.g. Control-c on a curl
session). Use a context to cancel the events goroutine and stop
sending events.

Fixes containers#6805

Signed-off-by: Matthew Heon <matthew.heon@pm.me>
  • Loading branch information
mheon authored and skorhone committed Jul 7, 2020
1 parent 975c375 commit 37cc944
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 17 deletions.
13 changes: 7 additions & 6 deletions libpod/events.go
@@ -1,6 +1,7 @@
package libpod

import (
"context"
"fmt"

"github.com/containers/libpod/libpod/events"
Expand Down Expand Up @@ -75,16 +76,16 @@ func (v *Volume) newVolumeEvent(status events.Status) {

// Events is a wrapper function for everyone to begin tailing the events log
// with options
func (r *Runtime) Events(options events.ReadOptions) error {
func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error {
eventer, err := r.newEventer()
if err != nil {
return err
}
return eventer.Read(options)
return eventer.Read(ctx, options)
}

// GetEvents reads the event log and returns events based on input filters
func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) {
func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) {
var readErr error
eventChannel := make(chan *events.Event)
options := events.ReadOptions{
Expand All @@ -98,7 +99,7 @@ func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) {
return nil, err
}
go func() {
readErr = eventer.Read(options)
readErr = eventer.Read(ctx, options)
}()
if readErr != nil {
return nil, readErr
Expand All @@ -112,7 +113,7 @@ func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) {

// GetLastContainerEvent takes a container name or ID and an event status and returns
// the last occurrence of the container event
func (r *Runtime) GetLastContainerEvent(nameOrID string, containerEvent events.Status) (*events.Event, error) {
func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) {
// check to make sure the event.Status is valid
if _, err := events.StringToStatus(containerEvent.String()); err != nil {
return nil, err
Expand All @@ -122,7 +123,7 @@ func (r *Runtime) GetLastContainerEvent(nameOrID string, containerEvent events.S
fmt.Sprintf("event=%s", containerEvent),
"type=container",
}
containerEvents, err := r.GetEvents(filters)
containerEvents, err := r.GetEvents(ctx, filters)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion libpod/events/config.go
@@ -1,6 +1,7 @@
package events

import (
"context"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -52,7 +53,7 @@ type Eventer interface {
// Write an event to a backend
Write(event Event) error
// Read an event from the backend
Read(options ReadOptions) error
Read(ctx context.Context, options ReadOptions) error
// String returns the type of event logger
String() string
}
Expand Down
3 changes: 2 additions & 1 deletion libpod/events/journal_linux.go
Expand Up @@ -3,6 +3,7 @@
package events

import (
"context"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (e EventJournalD) Write(ee Event) error {
}

// Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(options ReadOptions) error {
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until)
if err != nil {
Expand Down
17 changes: 15 additions & 2 deletions libpod/events/logfile.go
@@ -1,6 +1,7 @@
package events

import (
"context"
"fmt"
"os"

Expand Down Expand Up @@ -40,7 +41,7 @@ func (e EventLogFile) Write(ee Event) error {
}

// Reads from the log file
func (e EventLogFile) Read(options ReadOptions) error {
func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until)
if err != nil {
Expand All @@ -50,6 +51,17 @@ func (e EventLogFile) Read(options ReadOptions) error {
if err != nil {
return err
}
funcDone := make(chan bool)
copy := true
go func() {
select {
case <-funcDone:
// Do nothing
case <-ctx.Done():
copy = false
t.Kill(errors.New("hangup by client"))
}
}()
for line := range t.Lines {
event, err := newEventFromJSONString(line.Text)
if err != nil {
Expand All @@ -65,10 +77,11 @@ func (e EventLogFile) Read(options ReadOptions) error {
for _, filter := range eventOptions {
include = include && filter(event)
}
if include {
if include && copy {
options.EventChannel <- event
}
}
funcDone <- true
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion libpod/events/nullout.go
@@ -1,5 +1,9 @@
package events

import (
"context"
)

// EventToNull is an eventer type that only performs write operations
// and only writes to /dev/null. It is meant for unittests only
type EventToNull struct{}
Expand All @@ -10,7 +14,7 @@ func (e EventToNull) Write(ee Event) error {
}

// Read does nothing. Do not use it.
func (e EventToNull) Read(options ReadOptions) error {
func (e EventToNull) Read(ctx context.Context, options ReadOptions) error {
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/api/handlers/compat/events.go
@@ -1,6 +1,7 @@
package compat

import (
"context"
"fmt"
"net/http"

Expand Down Expand Up @@ -45,20 +46,23 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
fromStart = true
}

eventCtx, eventCancel := context.WithCancel(r.Context())
eventChannel := make(chan *events.Event)
go func() {
readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
eventsError = runtime.Events(readOpts)
eventsError = runtime.Events(eventCtx, readOpts)
}()
if eventsError != nil {
utils.InternalServerError(w, eventsError)
eventCancel()
close(eventChannel)
return
}

// If client disappears we need to stop listening for events
go func(done <-chan struct{}) {
<-done
eventCancel()
if _, ok := <-eventChannel; ok {
close(eventChannel)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/domain/infra/abi/containers.go
Expand Up @@ -741,7 +741,7 @@ func (ic *ContainerEngine) ContainerStart(ctx context.Context, namesOrIds []stri
if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events
event, err := ic.Libpod.GetLastContainerEvent(ctr.ID(), events.Exited)
event, err := ic.Libpod.GetLastContainerEvent(ctx, ctr.ID(), events.Exited)
if err != nil {
logrus.Errorf("Cannot get exit code: %v", err)
exitCode = define.ExecErrorCodeNotFound
Expand Down Expand Up @@ -871,7 +871,7 @@ func (ic *ContainerEngine) ContainerRun(ctx context.Context, opts entities.Conta
if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events
event, err := ic.Libpod.GetLastContainerEvent(ctr.ID(), events.Exited)
event, err := ic.Libpod.GetLastContainerEvent(ctx, ctr.ID(), events.Exited)
if err != nil {
logrus.Errorf("Cannot get exit code: %v", err)
report.ExitCode = define.ExecErrorCodeNotFound
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/infra/abi/events.go
Expand Up @@ -9,5 +9,5 @@ import (

func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error {
readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until}
return ic.Libpod.Events(readOpts)
return ic.Libpod.Events(ctx, readOpts)
}
3 changes: 2 additions & 1 deletion pkg/varlinkapi/attach.go
Expand Up @@ -4,6 +4,7 @@ package varlinkapi

import (
"bufio"
"context"
"io"

"github.com/containers/libpod/libpod"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (i *VarlinkAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys s
if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events
event, err := i.Runtime.GetLastContainerEvent(ctr.ID(), events.Exited)
event, err := i.Runtime.GetLastContainerEvent(context.Background(), ctr.ID(), events.Exited)
if err != nil {
logrus.Errorf("Cannot get exit code: %v", err)
exitCode = define.ExecErrorCodeNotFound
Expand Down
3 changes: 2 additions & 1 deletion pkg/varlinkapi/events.go
Expand Up @@ -3,6 +3,7 @@
package varlinkapi

import (
"context"
"time"

"github.com/containers/libpod/libpod/events"
Expand All @@ -27,7 +28,7 @@ func (i *VarlinkAPI) GetEvents(call iopodman.VarlinkCall, filter []string, since
eventChannel := make(chan *events.Event)
go func() {
readOpts := events.ReadOptions{FromStart: fromStart, Stream: stream, Filters: filter, EventChannel: eventChannel}
eventsError = i.Runtime.Events(readOpts)
eventsError = i.Runtime.Events(context.Background(), readOpts)
}()
if eventsError != nil {
return call.ReplyErrorOccurred(eventsError.Error())
Expand Down

0 comments on commit 37cc944

Please sign in to comment.