-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
142 lines (114 loc) · 3.9 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"context"
"github.com/rfizzle/collector-helpers/outputs"
"github.com/rfizzle/collector-helpers/state"
"github.com/rfizzle/gsuite-collector/client"
"github.com/spf13/viper"
adminreports "google.golang.org/api/admin/reports/v1"
"google.golang.org/api/option"
"log"
"os"
"time"
)
// main wires the collector together: it parses configuration, creates the
// temporary log writer, starts the polling goroutine, and then drains the
// message channel on the main goroutine, writing each message to the temp file.
func main() {
	// Capacity of the buffered channel between the poller and the writer loop.
	const channelCapacity = int64(5000)

	// Load parameters from CLI flags or the environment.
	if err := setupCliFlags(); err != nil {
		log.Fatalf("initialization failed: %v", err.Error())
	}

	// Temp-file writer that receives every collected message.
	writer, err := outputs.NewTmpWriter()
	if err != nil {
		log.Fatalf("%v\n", err.Error())
	}

	// Buffered channel carrying messages from the poller to this goroutine.
	messages := make(chan string, channelCapacity)

	// Poll in the background at the configured interval ("schedule").
	go pollEvery(viper.GetInt("schedule"), messages, writer)

	// Consume messages for as long as the channel stays open; nothing in this
	// file closes it, so this loop keeps the process alive.
	for {
		msg, open := <-messages
		if !open {
			return
		}
		handleMessage(msg, writer)
	}
}
// pollEvery runs the collection loop and never returns.
//
// On start it restores the saved state from the configured "state-path" when
// one exists, otherwise it starts from a fresh state. Each iteration then:
//  1. calls getEvents with the last poll timestamp,
//  2. if any events were reported, waits for resultsChannel to drain, rotates
//     the temp file, ships the rotated file to the outputs and deletes it,
//  3. records the new poll timestamp in the state and saves it,
//  4. sleeps for the given number of seconds.
//
// Parameters:
//   - seconds: delay between two polls, in seconds.
//   - resultsChannel: channel passed to getEvents to receive the collected messages.
//   - tmpWriter: temp-file writer whose rotated file is shipped to the outputs.
//
// Failures while restoring state, rotating, writing to outputs, or removing the
// temp file terminate the process via log.Fatalf.
func pollEvery(seconds int, resultsChannel chan<- string, tmpWriter *outputs.TmpWriter) {
	var currentState *state.State
	var err error

	// Setup state: resume from disk when available.
	if state.Exists(viper.GetString("state-path")) {
		currentState, err = state.Restore(viper.GetString("state-path"))
		if err != nil {
			log.Fatalf("Error getting state: %v\n", err.Error())
		}
	} else {
		currentState = state.New()
	}

	for {
		log.Println("Getting data...")

		// Get events
		eventCount, lastPollTime := getEvents(currentState.LastPollTimestamp, resultsChannel)

		// Copy tmp file to the configured outputs
		if eventCount > 0 {
			// Wait until the results channel has no more buffered messages.
			// NOTE(review): len == 0 only shows the buffer is empty; the consumer
			// loop in main may still be writing the last message it received.
			for len(resultsChannel) != 0 {
				time.Sleep(time.Second)
			}

			// Close and rotate the file. The rotation result must be checked:
			// the steps below ship and delete tmpWriter.LastFilePath and then
			// advance the saved timestamp, so continuing after a failed rotation
			// could drop this window's events.
			if err := tmpWriter.Rotate(); err != nil {
				log.Fatalf("Unable to rotate tmp file: %v", err)
			}

			if err := outputs.WriteToOutputs(tmpWriter.LastFilePath, lastPollTime.Format(time.RFC3339)); err != nil {
				log.Fatalf("Unable to write to output: %v", err)
			}

			// Remove the temp file now that it has been shipped.
			if err := os.Remove(tmpWriter.LastFilePath); err != nil {
				log.Fatalf("Unable to remove tmp file: %v", err)
			}
		}

		// Report how many events were processed in this poll.
		log.Printf("%v events processed...\n", eventCount)

		// Update state so the next poll starts where this one ended.
		// NOTE(review): any result returned by state.Save is not inspected here.
		currentState.LastPollTimestamp = lastPollTime.Format(time.RFC3339)
		state.Save(currentState, viper.GetString("state-path"))

		// Wait for the configured number of seconds until the next poll.
		time.Sleep(time.Duration(seconds) * time.Second)
	}
}
// getEvents requests activities for every known event type, covering the
// window from timestamp up to the moment this function was called.
//
// timestamp is passed to client.ActivitiesList as the start of the window, and
// resultChannel is handed to the same call (presumably the sink for the fetched
// activities — confirm in the client package).
//
// It returns the sum of the sizes reported by client.ActivitiesList and the
// time captured at the start of the call, which is the end of the window.
// Any failure terminates the process via log.Fatalf.
func getEvents(timestamp string, resultChannel chan<- string) (int, time.Time) {
	// The upper bound of the query window is fixed once, up front.
	windowEnd := time.Now()
	endStamp := windowEnd.Format(time.RFC3339)

	// HTTP client built from the configured credentials and impersonated user.
	httpClient, err := client.BuildClient(viper.GetString("gsuite-credentials"), viper.GetString("impersonated-user"))
	if err != nil {
		log.Fatalf("Unable to build client: %v", err)
	}

	// Reports service bound to that HTTP client.
	reportsService, err := adminreports.NewService(context.Background(), option.WithHTTPClient(httpClient))
	if err != nil {
		log.Fatalf("Unable to retrieve reports client %v", err)
	}

	// Static list of event types that are queried on every poll.
	applications := []string{
		"admin", "calendar", "drive", "login", "mobile", "token", "groups",
		"saml", "chat", "gplus", "rules", "jamboard", "meet", "user_accounts",
		"access_transparency", "groups_enterprise", "gcp",
	}

	// Running total of results across all event types.
	total := 0
	for i := range applications {
		application := applications[i]

		if viper.GetBool("verbose") {
			log.Printf("Getting event type %s\n", application)
		}

		fetched, listErr := client.ActivitiesList(reportsService, application, timestamp, endStamp, resultChannel)
		if listErr != nil {
			log.Fatalf("Unable to retrieve activities list for %s: %v", application, listErr)
		}
		total += fetched
	}

	return total, windowEnd
}
// handleMessage writes one message received from the channel to the temp file
// through tmpWriter. A write failure terminates the process via log.Fatalf.
func handleMessage(message string, tmpWriter *outputs.TmpWriter) {
	writeErr := tmpWriter.WriteLog(message)
	if writeErr == nil {
		return
	}
	log.Fatalf("Unable to write to temp file: %v", writeErr)
}