-
Notifications
You must be signed in to change notification settings - Fork 0
/
log_repository.go
131 lines (106 loc) · 3.36 KB
/
log_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package api
import (
"errors"
"time"
. "github.com/cloudfoundry/cli/cf/i18n"
"github.com/cloudfoundry/cli/cf/api/authentication"
"github.com/cloudfoundry/cli/cf/configuration/core_config"
consumer "github.com/cloudfoundry/loggregator_consumer"
"github.com/cloudfoundry/loggregatorlib/logmessage"
noaa_errors "github.com/cloudfoundry/noaa/errors"
)
//go:generate counterfeiter -o fakes/fake_logs_repository.go . LogsRepository
// LogsRepository gives access to the log messages of an application.
type LogsRepository interface {
// RecentLogsFor returns the recent log messages of the app identified by
// appGuid, or an error if they could not be fetched.
RecentLogsFor(appGuid string) ([]*logmessage.LogMessage, error)
// TailLogsFor tails the logs of the app identified by appGuid, handing
// messages to onMessage. In the Loggregator implementation in this file,
// onConnect is registered as the consumer's on-connect callback and the
// call does not return until the log channel is closed.
TailLogsFor(appGuid string, onConnect func(), onMessage func(*logmessage.LogMessage)) error
// Close releases the repository. The Loggregator implementation in this
// file closes its consumer and flushes any still-queued messages.
Close()
}
// LoggregatorLogsRepository is the LogsRepository implementation returned by
// NewLoggregatorLogsRepository.
//
// Fields:
//   - consumer issues the Recent and Tail requests and is closed by Close.
//   - config supplies the access token and the loggregator endpoint.
//   - tokenRefresher is invoked when a request fails with an UnauthorizedError.
//   - messageQueue holds tailed messages until they are delivered by
//     sendMessages or flushMessageQueue.
//   - onMessage is the callback given to the most recent TailLogsFor call; it
//     is reset to nil once flushMessageQueue has drained the queue.
type LoggregatorLogsRepository struct {
consumer consumer.LoggregatorConsumer
config core_config.Reader
// TrustedCerts []tls.Certificate
tokenRefresher authentication.TokenRefresher
messageQueue *Loggregator_SortedMessageQueue
onMessage func(*logmessage.LogMessage)
}
// BufferTime is the duration handed to NewLoggregator_SortedMessageQueue when
// a repository is constructed. It is a variable rather than a constant so it
// can be overridden (presumably by tests — confirm against callers).
var BufferTime = 5 * time.Second
// NewLoggregatorLogsRepository builds a LogsRepository that reads logs through
// the given consumer, takes credentials and endpoints from config, and uses
// refresher to renew the access token. The message queue is created with the
// current value of BufferTime and time.Now as its clock.
func NewLoggregatorLogsRepository(config core_config.Reader, consumer consumer.LoggregatorConsumer, refresher authentication.TokenRefresher) LogsRepository {
	repo := new(LoggregatorLogsRepository)
	repo.config = config
	repo.consumer = consumer
	repo.tokenRefresher = refresher
	repo.messageQueue = NewLoggregator_SortedMessageQueue(BufferTime, time.Now)
	return repo
}
// Close closes the underlying consumer and then hands every message still
// sitting in the message queue to the callback registered by TailLogsFor.
// The flush is a no-op when no callback is registered.
func (repo *LoggregatorLogsRepository) Close() {
repo.consumer.Close()
// Flush only after the consumer has been closed; see flushMessageQueue.
repo.flushMessageQueue()
}
// RecentLogsFor fetches the recent log messages of the app identified by
// appGuid and returns them ordered by consumer.SortRecent.
//
// If the first request fails with an UnauthorizedError, the access token is
// refreshed and the request is retried exactly once with the token re-read
// from config. A failed refresh is returned to the caller instead of being
// ignored and retried with the stale token.
//
// On any error the messages obtained from the consumer are returned as-is
// (unsorted) together with the error; callers should not rely on them.
func (repo *LoggregatorLogsRepository) RecentLogsFor(appGuid string) ([]*logmessage.LogMessage, error) {
	messages, err := repo.consumer.Recent(appGuid, repo.config.AccessToken())

	if _, unauthorized := err.(*noaa_errors.UnauthorizedError); unauthorized {
		if _, refreshErr := repo.tokenRefresher.RefreshAuthToken(); refreshErr != nil {
			return messages, refreshErr
		}
		messages, err = repo.consumer.Recent(appGuid, repo.config.AccessToken())
	}

	if err != nil {
		return messages, err
	}

	consumer.SortRecent(messages)
	return messages, nil
}
// TailLogsFor tails the logs of the app identified by appGuid.
//
// onConnect is registered as the consumer's on-connect callback. onMessage
// receives the tailed messages once they leave the repository's message
// queue. The call does not return until the log channel is closed.
//
// It returns an error when the loggregator endpoint is missing from the
// configuration, when the tail request fails, or when the access token
// cannot be refreshed after an UnauthorizedError. After a successful refresh
// the tail request is retried exactly once with the token re-read from
// config.
func (repo *LoggregatorLogsRepository) TailLogsFor(appGuid string, onConnect func(), onMessage func(*logmessage.LogMessage)) error {
	// Remember the callback first so that a later Close can flush queued
	// messages to it, whatever happens below.
	repo.onMessage = onMessage

	if repo.config.LoggregatorEndpoint() == "" {
		return errors.New(T("Loggregator endpoint missing from config file"))
	}

	repo.consumer.SetOnConnectCallback(onConnect)

	logChan, err := repo.consumer.Tail(appGuid, repo.config.AccessToken())
	if _, unauthorized := err.(*noaa_errors.UnauthorizedError); unauthorized {
		// Surface a failed refresh instead of retrying with the stale token.
		if _, refreshErr := repo.tokenRefresher.RefreshAuthToken(); refreshErr != nil {
			return refreshErr
		}
		logChan, err = repo.consumer.Tail(appGuid, repo.config.AccessToken())
	}
	if err != nil {
		return err
	}

	repo.bufferMessages(logChan, onMessage)
	return nil
}
// bufferMessages moves messages from logChan into the repository's message
// queue until logChan is closed. Every pass first lets sendMessages deliver
// whatever the queue considers due, then performs a non-blocking receive;
// when nothing is waiting it sleeps for a millisecond before the next pass.
// Messages still queued when logChan closes are not delivered here.
func (repo *LoggregatorLogsRepository) bufferMessages(logChan <-chan *logmessage.LogMessage, onMessage func(*logmessage.LogMessage)) {
	for {
		sendMessages(repo.messageQueue, onMessage)

		select {
		case incoming, open := <-logChan:
			if open {
				repo.messageQueue.PushMessage(incoming)
				continue
			}
			// Channel closed: stop buffering.
			return
		default:
			// Nothing to receive right now; back off briefly.
			time.Sleep(1 * time.Millisecond)
		}
	}
}
// flushMessageQueue drains the message queue into the registered onMessage
// callback, popping until the queue yields nil, and then clears the callback.
// It does nothing when no callback is registered.
func (repo *LoggregatorLogsRepository) flushMessageQueue() {
	if repo.onMessage == nil {
		return
	}

	for pending := repo.messageQueue.PopMessage(); pending != nil; pending = repo.messageQueue.PopMessage() {
		repo.onMessage(pending)
	}

	repo.onMessage = nil
}
// sendMessages pops messages off queue and passes them to onMessage for as
// long as the queue's next timestamp lies before the current time, both
// measured in nanoseconds.
func sendMessages(queue *Loggregator_SortedMessageQueue, onMessage func(*logmessage.LogMessage)) {
	for {
		if queue.NextTimestamp() >= time.Now().UnixNano() {
			return
		}
		onMessage(queue.PopMessage())
	}
}