forked from cloudfoundry/cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logs.go
129 lines (105 loc) · 3.21 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package api
import (
"crypto/tls"
"errors"
. "github.com/cloudfoundry/cli/cf/i18n"
"time"
"github.com/cloudfoundry/cli/cf/api/authentication"
"github.com/cloudfoundry/cli/cf/configuration"
consumer "github.com/cloudfoundry/loggregator_consumer"
"github.com/cloudfoundry/loggregatorlib/logmessage"
)
// LogsRepository is the log-access interface returned by
// NewLoggregatorLogsRepository; *LoggregatorLogsRepository in this file
// provides the RecentLogsFor, TailLogsFor and Close methods it requires.
type LogsRepository interface {
// RecentLogsFor returns log messages for the application identified by
// appGuid, or an error.
RecentLogsFor(appGuid string) ([]*logmessage.LogMessage, error)
// TailLogsFor delivers log messages for the application identified by
// appGuid to onMessage. In the Loggregator implementation in this file,
// onConnect is registered as the consumer's on-connect callback and the
// call does not return until the log channel is closed or an error occurs.
TailLogsFor(appGuid string, onConnect func(), onMessage func(*logmessage.LogMessage)) error
// Close shuts the repository down. The Loggregator implementation closes
// its consumer and flushes any messages still queued.
Close()
}
// LoggregatorLogsRepository fetches application logs through a
// loggregator consumer and buffers tailed messages in a sorted queue
// before handing them to the caller's callback.
type LoggregatorLogsRepository struct {
// consumer performs the Recent/Tail/Close calls against loggregator.
consumer consumer.LoggregatorConsumer
// config supplies the access token and the loggregator endpoint.
config configuration.Reader
// TrustedCerts is not referenced by the code in this file.
// NOTE(review): presumably consumed elsewhere in the package — confirm.
TrustedCerts []tls.Certificate
// tokenRefresher is asked for a new auth token when the consumer reports
// an UnauthorizedError.
tokenRefresher authentication.TokenRefresher
// messageQueue holds tailed messages until they are delivered.
messageQueue *SortedMessageQueue
// onMessage is the callback registered by TailLogsFor; it is reset to nil
// once flushMessageQueue has drained the queue.
onMessage func(*logmessage.LogMessage)
}
// BufferTime is the duration passed to NewSortedMessageQueue when a
// LoggregatorLogsRepository is constructed.
// NOTE(review): presumably how long tailed messages are held for sorting
// before delivery — confirm against SortedMessageQueue.
var BufferTime = 5 * time.Second
// NewLoggregatorLogsRepository builds a LogsRepository backed by the given
// loggregator consumer.
//
// config supplies the access token and loggregator endpoint, consumer
// performs the actual log requests, and refresher is used to obtain a new
// auth token when a request is rejected as unauthorized. The repository's
// message queue is created from the package-level BufferTime and time.Now.
func NewLoggregatorLogsRepository(config configuration.Reader, consumer consumer.LoggregatorConsumer, refresher authentication.TokenRefresher) LogsRepository {
	repo := new(LoggregatorLogsRepository)
	repo.config = config
	repo.consumer = consumer
	repo.tokenRefresher = refresher
	repo.messageQueue = NewSortedMessageQueue(BufferTime, time.Now)
	return repo
}
// Close closes the underlying loggregator consumer and then delivers any
// messages still held in the message queue to the callback registered by
// TailLogsFor. The flush is a no-op when no callback is registered.
func (repo *LoggregatorLogsRepository) Close() {
// Close the consumer first, then drain what is left in the queue.
repo.consumer.Close()
repo.flushMessageQueue()
}
// RecentLogsFor asks the consumer for the recent log messages of the
// application identified by appGuid, using the access token from config.
//
// If the consumer reports an *consumer.UnauthorizedError, the auth token is
// refreshed and the request is retried once. Any other error is returned
// immediately together with whatever the consumer returned. Otherwise the
// messages are passed through consumer.SortRecent before being returned
// along with the error from the last request.
func (repo *LoggregatorLogsRepository) RecentLogsFor(appGuid string) ([]*logmessage.LogMessage, error) {
	messages, err := repo.consumer.Recent(appGuid, repo.config.AccessToken())

	if err != nil {
		if _, unauthorized := err.(*consumer.UnauthorizedError); !unauthorized {
			return messages, err
		}

		// The token was rejected: refresh it and try exactly once more.
		repo.tokenRefresher.RefreshAuthToken()
		messages, err = repo.consumer.Recent(appGuid, repo.config.AccessToken())
	}

	consumer.SortRecent(messages)
	return messages, err
}
// TailLogsFor streams log messages for the application identified by
// appGuid to onMessage.
//
// onMessage is remembered on the repository so that Close can flush queued
// messages to it. An error is returned if the config has no loggregator
// endpoint. onConnect is registered as the consumer's on-connect callback
// before tailing starts. If the consumer reports an
// *consumer.UnauthorizedError, the auth token is refreshed and the tail
// request is retried once; any remaining error is returned. On success the
// call runs bufferMessages until the log channel is closed, then returns nil.
func (repo *LoggregatorLogsRepository) TailLogsFor(appGuid string, onConnect func(), onMessage func(*logmessage.LogMessage)) error {
	repo.onMessage = onMessage

	if repo.config.LoggregatorEndpoint() == "" {
		return errors.New(T("Loggregator endpoint missing from config file"))
	}

	repo.consumer.SetOnConnectCallback(onConnect)

	logChan, err := repo.consumer.Tail(appGuid, repo.config.AccessToken())
	if _, unauthorized := err.(*consumer.UnauthorizedError); unauthorized {
		// The token was rejected: refresh it and try exactly once more.
		repo.tokenRefresher.RefreshAuthToken()
		logChan, err = repo.consumer.Tail(appGuid, repo.config.AccessToken())
	}
	if err != nil {
		return err
	}

	repo.bufferMessages(logChan, onMessage)
	return nil
}
// bufferMessages moves messages from logChan into the repository's message
// queue and, on every pass, lets sendMessages deliver whatever is due to
// onMessage. It returns once logChan has been closed; messages still in the
// queue at that point are left there.
func (repo *LoggregatorLogsRepository) bufferMessages(logChan <-chan *logmessage.LogMessage, onMessage func(*logmessage.LogMessage)) {
	for {
		sendMessages(repo.messageQueue, onMessage)

		select {
		case incoming, open := <-logChan:
			if !open {
				return
			}
			repo.messageQueue.PushMessage(incoming)
			// A message was available: loop again right away without pausing.
			continue
		default:
		}

		// Nothing was ready on the channel; pause briefly before polling again.
		time.Sleep(time.Millisecond)
	}
}
// flushMessageQueue pops every remaining message off the queue and passes
// it to the registered onMessage callback, stopping at the first nil result
// from PopMessage. The callback is cleared afterwards. When no callback is
// registered the queue is left untouched.
func (repo *LoggregatorLogsRepository) flushMessageQueue() {
	if repo.onMessage == nil {
		return
	}

	for pending := repo.messageQueue.PopMessage(); pending != nil; pending = repo.messageQueue.PopMessage() {
		repo.onMessage(pending)
	}

	repo.onMessage = nil
}
// sendMessages delivers queued messages to onMessage for as long as the
// queue's next timestamp is earlier than the current time.
//
// A nil result from PopMessage is treated as "queue empty" and ends
// delivery, the same way flushMessageQueue treats it, so onMessage is never
// invoked with a nil message.
func sendMessages(queue *SortedMessageQueue, onMessage func(*logmessage.LogMessage)) {
	for queue.NextTimestamp() < time.Now().UnixNano() {
		msg := queue.PopMessage()
		if msg == nil {
			// Nothing left to pop even though a timestamp was reported; stop
			// rather than hand a nil pointer to the callback.
			return
		}
		onMessage(msg)
	}
}