forked from cloudfoundry/cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logging.go
117 lines (93 loc) · 2.58 KB
/
logging.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package v3action
import (
"time"
noaaErrors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/sonde-go/events"
)
// StagingLog is the source type value that LogMessage.Staging compares
// against to decide whether a log message is a staging log.
const StagingLog = "STG"
// NOAATimeoutError is an error value whose message reports a timeout while
// trying to connect to NOAA. It carries no additional data.
type NOAATimeoutError struct{}

// Error implements the error interface and returns a fixed description of
// the timeout.
func (NOAATimeoutError) Error() string {
	return "Timeout trying to connect to NOAA"
}
// LogMessage is a single log line together with the metadata that came with
// it. All fields are unexported; callers read them through the accessor
// methods on LogMessage.
type LogMessage struct {
	// message is the text of the log line.
	message string
	// messageType is the stream the line was written to; Type renders it as
	// "OUT" or "ERR".
	messageType events.LogMessage_MessageType
	// timestamp is the time associated with the log line.
	timestamp time.Time
	// sourceType is the reported source of the line; Staging compares it
	// against StagingLog.
	sourceType string
	// sourceInstance is the source instance reported with the line.
	sourceInstance string
}
// Message returns the text of the log line.
func (lm LogMessage) Message() string {
	return lm.message
}
// Type returns "OUT" when the message type is events.LogMessage_OUT and
// "ERR" for every other message type.
func (lm LogMessage) Type() string {
	switch lm.messageType {
	case events.LogMessage_OUT:
		return "OUT"
	default:
		return "ERR"
	}
}
// Staging reports whether the message's source type equals StagingLog.
func (lm LogMessage) Staging() bool {
	return StagingLog == lm.sourceType
}
// Timestamp returns the time stored with the log line.
func (lm LogMessage) Timestamp() time.Time {
	return lm.timestamp
}
// SourceType returns the source type stored with the log line.
func (lm LogMessage) SourceType() string {
	return lm.sourceType
}
// SourceInstance returns the source instance stored with the log line.
func (lm LogMessage) SourceInstance() string {
	return lm.sourceInstance
}
// NewLogMessage builds a LogMessage from its individual parts and returns a
// pointer to it. messageType is converted to events.LogMessage_MessageType
// as-is; no validation is performed on any argument.
func NewLogMessage(message string, messageType int, timestamp time.Time, sourceType string, sourceInstance string) *LogMessage {
	logMessage := new(LogMessage)
	logMessage.message = message
	logMessage.messageType = events.LogMessage_MessageType(messageType)
	logMessage.timestamp = timestamp
	logMessage.sourceType = sourceType
	logMessage.sourceInstance = sourceInstance
	return logMessage
}
// GetStreamingLogs starts tailing logs for the application identified by
// appGUID through client and returns two receive-only channels: one carrying
// the log lines converted to *LogMessage and one carrying errors.
//
// A background goroutine forwards from the client's streams until either of
// them is closed, at which point both returned channels are closed. Both
// returned channels are unbuffered, so each forwarded value blocks the
// goroutine until it is received. Errors of type noaaErrors.RetryError and
// nil errors are dropped rather than forwarded.
func (Actor) GetStreamingLogs(appGUID string, client NOAAClient) (<-chan *LogMessage, <-chan error) {
	// The token argument is left empty because the client should have a
	// TokenRefresher set.
	incomingEvents, incomingErrs := client.TailingLogs(appGUID, "")

	errs := make(chan error)
	messages := make(chan *LogMessage)

	go func() {
		// Deferred calls run last-in-first-out: errs is closed first, then
		// messages.
		defer close(messages)
		defer close(errs)

	forwarding:
		for {
			select {
			case envelope, open := <-incomingEvents:
				if !open {
					break forwarding
				}
				converted := &LogMessage{
					message:     string(envelope.GetMessage()),
					messageType: envelope.GetMessageType(),
					// The event timestamp is interpreted as nanoseconds
					// since the Unix epoch.
					timestamp:      time.Unix(0, envelope.GetTimestamp()),
					sourceInstance: envelope.GetSourceInstance(),
					sourceType:     envelope.GetSourceType(),
				}
				messages <- converted

			case streamErr, open := <-incomingErrs:
				if !open {
					break forwarding
				}
				switch streamErr.(type) {
				case nil, noaaErrors.RetryError:
					// Nothing to report: keep waiting for the next event.
				default:
					errs <- streamErr
				}
			}
		}
	}()

	return messages, errs
}
// GetStreamingLogsForApplicationByNameAndSpace looks up the application
// called appName in the space spaceGUID and then streams its logs through
// client via GetStreamingLogs.
//
// When the lookup fails, both channels are nil and the lookup's warnings and
// error are returned. Otherwise the message and error channels from
// GetStreamingLogs are returned along with the lookup's warnings.
func (actor Actor) GetStreamingLogsForApplicationByNameAndSpace(appName string, spaceGUID string, client NOAAClient) (<-chan *LogMessage, <-chan error, Warnings, error) {
	application, warnings, err := actor.GetApplicationByNameAndSpace(appName, spaceGUID)
	if err != nil {
		return nil, nil, warnings, err
	}

	logMessages, logErrors := actor.GetStreamingLogs(application.GUID, client)
	// err did not trip the guard above, so it is returned unchanged.
	return logMessages, logErrors, warnings, err
}