Skip to content

Commit

Permalink
fix log broadcaster bug
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Sep 27, 2021
1 parent 6c086a8 commit 99fd8c3
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 27 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/projecteru2/core v0.0.0-20210618045145-314d3a292929
github.com/projecteru2/libyavirt v0.0.0-20210915071951-6b784e72977b
github.com/projecteru2/libyavirt v0.0.0-20210924065830-1db08f427e87
github.com/prometheus/client_golang v1.11.0
github.com/shirou/gopsutil v3.20.11+incompatible
github.com/sirupsen/logrus v1.7.0
Expand All @@ -25,5 +25,3 @@ require (
gopkg.in/yaml.v2 v2.4.0
gotest.tools/v3 v3.0.3 // indirect
)

replace github.com/projecteru2/libyavirt v0.0.0-20210915071951-6b784e72977b => github.com/DuodenumL/libyavirt v0.0.0-20210920103912-eb23e2ebc0c2
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ github.com/CMGS/statsd v0.0.0-20160223095033-48c421b3c1ab h1:/Nl282MSyyUKtYA9gAU
github.com/CMGS/statsd v0.0.0-20160223095033-48c421b3c1ab/go.mod h1:GJO3SGuPXm9A2hpQVV7/wlPr8oP9xxQluI8y99gQu60=
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno=
github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo=
github.com/DuodenumL/libyavirt v0.0.0-20210916094922-4654b515f27e h1:JAAgD05awQUezAbPweS0Pqxru0K4ShXmmzHx3/DONp4=
github.com/DuodenumL/libyavirt v0.0.0-20210916094922-4654b515f27e/go.mod h1:9/SNmdphwl12ubwihkRa9YtOozM6liYLDxsricra1mY=
github.com/DuodenumL/libyavirt v0.0.0-20210916103342-42ef27937ebd h1:FLhHBlEH1ia6NMU/sW1m7KYLgpc9/4SF0hcPlEIbEQY=
github.com/DuodenumL/libyavirt v0.0.0-20210916103342-42ef27937ebd/go.mod h1:9/SNmdphwl12ubwihkRa9YtOozM6liYLDxsricra1mY=
github.com/DuodenumL/libyavirt v0.0.0-20210920103912-eb23e2ebc0c2 h1:O0RF0MccQgarcyTnkudd7+QK4uEaq6iwZEjn9s+dGgY=
github.com/DuodenumL/libyavirt v0.0.0-20210920103912-eb23e2ebc0c2/go.mod h1:FOc+hWBMLsMrmx5p3/moizKeSomedZPNwB6LhS+kEnE=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Microsoft/go-winio v0.4.16-0.20201130162521-d1ffc52c7331 h1:3YnB7Hpmh1lPecPE8doMOtYCrMdrpedZOvxfuNES/Vk=
github.com/Microsoft/go-winio v0.4.16-0.20201130162521-d1ffc52c7331/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0=
Expand Down Expand Up @@ -433,6 +427,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/projecteru2/core v0.0.0-20210618045145-314d3a292929 h1:utetQxBGS03TtC0vgkEHTYh5zXUoTRpWr0I3K8ObexI=
github.com/projecteru2/core v0.0.0-20210618045145-314d3a292929/go.mod h1:Cmom68fvQs8uAA9AEjSXu6sTegObAtgPKJMEWQUoQ+Y=
github.com/projecteru2/libyavirt v0.0.0-20210514093713-959b2e232319/go.mod h1:9/SNmdphwl12ubwihkRa9YtOozM6liYLDxsricra1mY=
github.com/projecteru2/libyavirt v0.0.0-20210924065830-1db08f427e87 h1:iascQhtPjHmEpAJ2Vb+/EoD0dZ+G08G1PdYXeyez1+M=
github.com/projecteru2/libyavirt v0.0.0-20210924065830-1db08f427e87/go.mod h1:FOc+hWBMLsMrmx5p3/moizKeSomedZPNwB6LhS+kEnE=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
Expand Down Expand Up @@ -849,7 +845,6 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
Expand All @@ -863,7 +858,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
Expand Down
10 changes: 6 additions & 4 deletions logs/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ func (w *Writer) keepalive(ctx context.Context) {
w.reconnect()
timer.Reset(KeepaliveInterval)
case <-ctx.Done():
// leave some time for the pending writing
time.Sleep(CloseWaitInterval)
if err := w.close(); err != nil {
log.Errorf("[keepalive] failed to close writer %s, err: %s", w.addr, err)
}
return
}
}
Expand Down Expand Up @@ -215,10 +220,7 @@ func (w *Writer) Write(logline *types.Log) error {
return err
}

// Close .
func (w *Writer) Close() error {
// leave some time for the pending writing
time.Sleep(CloseWaitInterval)
func (w *Writer) close() error {
var err error
w.withLock(func() {
if w.enc != nil {
Expand Down
3 changes: 0 additions & 3 deletions manager/workload/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func (m *Manager) attach(ctx context.Context, ID string) {
log.Errorf("[attach] Create log forward %s failed %s", transfer, err)
return
}
defer func() {
go writer.Close()
}()

// get app info
workloadName, err := m.runtimeClient.GetWorkloadName(ctx, ID)
Expand Down
41 changes: 34 additions & 7 deletions manager/workload/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@ import (
)

type subscriber struct {
ctx context.Context
cancel context.CancelFunc
buf *bufio.ReadWriter
errChan chan error
}

func (s *subscriber) isDone() bool {
select {
case <-s.ctx.Done():
return true
default:
return false
}
}

// logBroadcaster receives log and broadcasts to subscribers
type logBroadcaster struct {
sync.RWMutex
Expand All @@ -42,30 +53,38 @@ func (l *logBroadcaster) deleteSubscribers(app string) {
}

// subscribe subscribes logs of the specific app.
func (l *logBroadcaster) subscribe(app string, buf *bufio.ReadWriter) (string, <-chan error) {
func (l *logBroadcaster) subscribe(ctx context.Context, app string, buf *bufio.ReadWriter) (string, chan error, func()) {
l.Lock()
defer l.Unlock()

subscribers := l.getSubscribers(app)
errChan := make(chan error)
ID := coreutils.RandomString(8)
ctx, cancel := context.WithCancel(ctx)
errChan := make(chan error)

subscribers[ID] = &subscriber{
ctx: ctx,
cancel: cancel,
buf: buf,
errChan: errChan,
}

logrus.Infof("%s %s log subscribed", app, ID)
return ID, errChan
return ID, errChan, func() {
cancel()
go l.unsubscribe(app, ID)
}
}

func (l *logBroadcaster) unsubscribe(app string, ID string) {
l.Lock()
defer l.Unlock()

subscribers := l.getSubscribers(app)

subscriber, ok := subscribers[ID]
if ok {
defer close(subscriber.errChan)
close(subscriber.errChan)
}

delete(subscribers, ID)
Expand Down Expand Up @@ -96,14 +115,22 @@ func (l *logBroadcaster) broadcast(log *types.Log) {
// use wait group to make sure the logs are ordered
wg := &sync.WaitGroup{}
wg.Add(len(subscribers))
for _, sub := range subscribers {
go func(sub *subscriber) {
for ID, sub := range subscribers {
go func(ID string, sub *subscriber) {
defer wg.Done()
if sub.isDone() {
return
}

if _, err := sub.buf.Write([]byte(line)); err != nil {
logrus.Debugf("[broadcast] failed to write into %v, err: %v", ID, err)
sub.cancel()
sub.errChan <- err
return
}

sub.buf.Flush()
}(sub)
}(ID, sub)
}
wg.Wait()
}
Expand Down
5 changes: 3 additions & 2 deletions manager/workload/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ func (m *Manager) Run(ctx context.Context) error {

// PullLog pull logs for specific app
func (m *Manager) PullLog(ctx context.Context, app string, buf *bufio.ReadWriter) {
ID, errChan := m.logBroadcaster.subscribe(app, buf)
defer m.logBroadcaster.unsubscribe(app, ID)
ID, errChan, unsubscribe := m.logBroadcaster.subscribe(ctx, app, buf)
defer unsubscribe()

for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 99fd8c3

Please sign in to comment.