/
persistence.go
176 lines (153 loc) · 4.89 KB
/
persistence.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
* Copyright (c) 2021, 2024 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
package cmd
import (
"encoding/json"
"github.com/oracle/coherence-cli/pkg/config"
"github.com/oracle/coherence-cli/pkg/constants"
"github.com/oracle/coherence-cli/pkg/fetcher"
"github.com/oracle/coherence-cli/pkg/utils"
"github.com/spf13/cobra"
"strings"
"sync"
"time"
)
// getPersistenceCmd represents the get persistence command.
var getPersistenceCmd = &cobra.Command{
Use: "persistence",
Short: "display persistence details for a cluster",
Long: `The 'get persistence' command displays persistence information for a cluster.`,
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
var (
err error
dataFetcher fetcher.Fetcher
connection string
)
connection, dataFetcher, err = GetConnectionAndDataFetcher()
if err != nil {
return err
}
for {
var servicesSummary = config.ServicesSummaries{}
servicesResult, err := dataFetcher.GetServiceDetailsJSON()
if err != nil {
return err
}
if strings.Contains(OutputFormat, constants.JSONPATH) {
result, err := utils.GetJSONPathResults(servicesResult, OutputFormat)
if err != nil {
return err
}
cmd.Println(result)
} else if OutputFormat == constants.JSON {
cmd.Println(string(servicesResult))
} else {
err = json.Unmarshal(servicesResult, &servicesSummary)
if err != nil {
return utils.GetError("unable to unmarshall service result", err)
}
deDuplicatedServices := DeduplicatePersistenceServices(servicesSummary)
err = processPersistenceServices(deDuplicatedServices, dataFetcher)
if err != nil {
return err
}
printWatchHeader(cmd)
cmd.Println(FormatCurrentCluster(connection))
cmd.Println(FormatPersistenceServices(deDuplicatedServices, true))
}
// check to see if we should exit if we are not watching
if !isWatchEnabled() {
break
}
// we are watching so sleep and then repeat until CTRL-C
time.Sleep(time.Duration(watchDelay) * time.Second)
}
return nil
},
}
func processPersistenceServices(deDuplicatedServices []config.ServiceSummary, dataFetcher fetcher.Fetcher) error {
var (
wg sync.WaitGroup
errorSink = createErrorSink()
m = sync.RWMutex{}
)
wg.Add(len(deDuplicatedServices))
// get the persistence coordinator details for each service
for i, value := range deDuplicatedServices {
go func(service string, index int) {
defer wg.Done()
var (
data []byte
err1 error
coordinator = config.PersistenceCoordinator{}
)
data, err1 = dataFetcher.GetPersistenceCoordinator(service)
if err1 != nil {
errorSink.AppendError(err1)
return
}
if len(data) == 0 {
return
}
err1 = json.Unmarshal(data, &coordinator)
if err1 != nil {
errorSink.AppendError(utils.GetError("unable to unmarshall persistence coordinator", err1))
return
}
// protect the slice for update
m.Lock()
defer m.Unlock()
deDuplicatedServices[index].Idle = coordinator.Idle
deDuplicatedServices[index].Snapshots = coordinator.Snapshots
deDuplicatedServices[index].OperationStatus = coordinator.OperationStatus
}(value.ServiceName, i)
}
// wait for the results
wg.Wait()
errorList := errorSink.GetErrors()
if len(errorList) == 0 {
return nil
}
return utils.GetErrors(errorList)
}
// DeduplicatePersistenceServices removes duplicated persistence details.
func DeduplicatePersistenceServices(servicesSummary config.ServicesSummaries) []config.ServiceSummary {
// the current results include 1 entry for each service and member, so we need to remove duplicates
var finalServices = make([]config.ServiceSummary, 0)
for _, value := range servicesSummary.Services {
// only check distributed
if !utils.IsDistributedCache(value.ServiceType) || !value.StorageEnabled {
continue
}
// check to see if this service and member already exists in the finalServices
if len(finalServices) == 0 {
// no entries so add it anyway
finalServices = append(finalServices, value)
} else {
var foundIndex = -1
for i, v := range finalServices {
if v.ServiceName == value.ServiceName {
foundIndex = i
break
}
}
if foundIndex >= 0 {
// update the existing service
finalServices[foundIndex].PersistenceActiveSpaceUsed += value.PersistenceActiveSpaceUsed
finalServices[foundIndex].PersistenceBackupSpaceUsed += value.PersistenceBackupSpaceUsed
finalServices[foundIndex].PersistenceLatencyAverageTotal += value.PersistenceLatencyAverage
if value.PersistenceLatencyMax > finalServices[foundIndex].PersistenceLatencyMax {
finalServices[foundIndex].PersistenceLatencyMax = value.PersistenceLatencyMax
}
} else {
// new service
finalServices = append(finalServices, value)
}
}
}
return finalServices
}