Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,9 @@ var (
extendClientParam bool
grpcClientParam bool
profileFirstParam bool
machineParam string
rackParam string
siteParam string
skipMavenDepsParam bool
backupLogFilesParam bool
validPersistenceModes = []string{"on-demand", "active", "active-backup", "active-async"}
Expand Down Expand Up @@ -1450,6 +1453,9 @@ func init() {
createClusterCmd.Flags().Int32VarP(&jmxRemotePortParam, jmxPortArg, "J", 0, jmxPortMessage)
createClusterCmd.Flags().StringVarP(&jmxRemoteHostParam, jmxHostArg, "j", "", jmxHostMessage)
createClusterCmd.Flags().BoolVarP(&profileFirstParam, profileFirstArg, "F", false, profileFirstMessage)
createClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
createClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
createClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)

stopClusterCmd.Flags().BoolVarP(&automaticallyConfirm, "yes", "y", false, confirmOptionMessage)

Expand All @@ -1465,6 +1471,9 @@ func init() {
startClusterCmd.Flags().StringVarP(&jmxRemoteHostParam, jmxHostArg, "j", "", jmxHostMessage)
startClusterCmd.Flags().BoolVarP(&profileFirstParam, profileFirstArg, "F", false, profileFirstMessage)
startClusterCmd.Flags().BoolVarP(&backupLogFilesParam, backupLogFilesArg, "B", false, backupLogFilesMessage)
startClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
startClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
startClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)

startConsoleCmd.Flags().StringVarP(&heapMemoryParam, heapMemoryArg, "M", defaultHeap, heapMemoryMessage)
startConsoleCmd.Flags().Int32VarP(&logLevelParam, logLevelArg, "l", 5, logLevelMessage)
Expand Down Expand Up @@ -1493,6 +1502,9 @@ func init() {
scaleClusterCmd.Flags().StringVarP(&profileValueParam, profileArg, "P", "", profileMessage)
scaleClusterCmd.Flags().StringVarP(&serverStartClassParam, startClassArg, "S", "", startClassMessage)
scaleClusterCmd.Flags().BoolVarP(&backupLogFilesParam, backupLogFilesArg, "B", false, backupLogFilesMessage)
scaleClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
scaleClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
scaleClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)
}

// sanitizeConnectionName sanitizes a cluster connection
Expand Down
12 changes: 12 additions & 0 deletions pkg/cmd/cluster_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,18 @@ func getCacheServerArgs(connection ClusterConnection, member string, httpPort in
}
}

if machineParam != "" {
baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.machine=%s", machineParam))
}

if rackParam != "" {
baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.rack=%s", rackParam))
}

if siteParam != "" {
baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.site=%s", siteParam))
}

// if default heap is overridden, then use this
if heapMemoryParam != defaultHeap {
heap = heapMemoryParam
Expand Down
77 changes: 75 additions & 2 deletions pkg/cmd/formatting.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
NameColumn = "NAME"
publisherColumn = "PUBLISHER"
receiverColumn = "RECEIVER"
machineColumn = "MACHINE"
rackColumn = "RACK"
siteColumn = "SITE"
avgSize = "AVG SIZE"
avgApply = "AVG APPLY"
avgBacklogDelay = "AVG BACKLOG DELAY"
Expand Down Expand Up @@ -579,6 +582,76 @@ func FormatTopicsSummary(topicDetails []config.TopicDetail) string {
return table.String()
}

// FormatPartitionOwnership returns the partition ownership in column formatted output.
func FormatPartitionOwnership(partitionDetails map[int]*config.PartitionOwnership) string {
var (
ownershipCount = len(partitionDetails)
keys = make([]int, 0)
header = []string{MemberColumn, "PRIMARIES", "BACKUPS", "PRIMARY PARTITIONS"}
)
if ownershipCount == 0 {
return ""
}

// get and sort the keys
for k := range partitionDetails {
keys = append(keys, k)
}
sort.Ints(keys)

// get the backup-count
backupCount := utils.GetBackupCount(partitionDetails)

if OutputFormat == constants.WIDE {
header = []string{MemberColumn, machineColumn, rackColumn, siteColumn, "PRIMARIES", "BACKUPS", "PRIMARY"}
}

// build the header for the backups
for i := 0; i < backupCount; i++ {
header = append(header, fmt.Sprintf("BACKUP %d", i+1))
}

table := newFormattedTable().WithAlignment(generateColumnFormats(backupCount)...).WithHeader(header...)

for j := 0; j < len(keys); j++ {
key := keys[j]
value := partitionDetails[key]

memberID := "Orphaned"
if value.MemberID != -1 {
memberID = fmt.Sprintf("%v", value.MemberID)
}

table.AddRow(memberID)

if OutputFormat == constants.WIDE {
table.AddColumnsToRow(value.Machine, value.Rack, value.Site)
}

table.AddColumnsToRow(formatSmallInteger(int32(value.PrimaryPartitions)),
formatSmallInteger(int32(value.BackupPartitions)))

// add primaries and backups
for i := 0; i <= backupCount; i++ {
table.AddColumnsToRow(utils.FormatPartitions(value.PartitionMap[i]))
}
}

return table.String()
}

func generateColumnFormats(count int) []string {
result := []string{R, R, R, L}
if OutputFormat == constants.WIDE {
result = []string{R, L, L, L, R, R, L}
}

for i := 0; i < count; i++ {
result = append(result, L)
}
return result
}

// FormatTopicsSubscribers returns the topics subscriber details in column formatted output
func FormatTopicsSubscribers(topicsSubscribers []config.TopicsSubscriberDetail) string {
var (
Expand Down Expand Up @@ -1388,7 +1461,7 @@ func FormatMembers(members []config.Member, verbose bool, storageMap map[int]boo
WithAlignment(finalAlignment...)

if OutputFormat == constants.WIDE {
table.AddHeaderColumns("MACHINE", "RACK", "SITE", publisherColumn, receiverColumn)
table.AddHeaderColumns(machineColumn, rackColumn, siteColumn, publisherColumn, receiverColumn)
table.AddFormattingFunction(9, networkStatsFormatter)
table.AddFormattingFunction(10, networkStatsFormatter)
}
Expand Down Expand Up @@ -1813,7 +1886,7 @@ func FormatMachines(machines []config.Machine) string {
return strings.Compare(machines[p].MachineName, machines[q].MachineName) < 0
})

table := newFormattedTable().WithHeader("MACHINE", "PROCESSORS", "LOAD", "TOTAL MEMORY", "FREE MEMORY",
table := newFormattedTable().WithHeader(machineColumn, "PROCESSORS", "LOAD", "TOTAL MEMORY", "FREE MEMORY",
"% FREE", "OS", "ARCH", "VERSION").WithAlignment(L, R, R, R, R, R, L, L, L)
table.AddFormattingFunction(5, machineMemoryFormatting)

Expand Down
56 changes: 56 additions & 0 deletions pkg/cmd/monitor_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var validPanels = []panelImpl{
createContentPanel(8, "services", "Services", "show services", servicesContent, servicesPanelData),
createContentPanel(8, "service-members", "Service Members (%SERVICE)", "show service members", serviceMembersContent, servicesPanelData),
createContentPanel(8, "service-distributions", "Service Distributions (%SERVICE)", "show service distributions", serviceDistributionsContent, servicesPanelData),
createContentPanel(8, "service-ownership", "Service Ownership (%SERVICE)", "show service ownership", serviceOwnershipContent, servicesPanelData),
createContentPanel(8, "service-storage", "Service Storage", "show service storage", serviceStorageContent, servicesPanelData),
createContentPanel(8, "topic-members", "Topic Members (%SERVICE/%TOPIC)", "show topic members", topicMembersContent, topicsPanelData),
createContentPanel(8, "subscribers", "Topic Subscribers (%SERVICE/%TOPIC)", "show topic subscribers", topicSubscribersContent, topicsPanelData),
Expand Down Expand Up @@ -639,6 +640,61 @@ var serviceDistributionsContent = func(dataFetcher fetcher.Fetcher, _ clusterSum
return strings.Split(distributions.ScheduledDistributions, "\n"), nil
}

var serviceOwnershipContent = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
var (
membersResult []byte
memberNodeID string
membersDetails = config.ServiceMemberDetails{}
)

if serviceName == "" {
return emptyStringArray, errSelectService
}

servicesResult, err := GetDistributedServices(dataFetcher)
if err != nil {
return emptyStringArray, err
}

if !utils.SliceContains(servicesResult, serviceName) {
return emptyStringArray, fmt.Errorf(unableToFindService, serviceName)
}

// find storage member node
membersResult, err = dataFetcher.GetServiceMembersDetailsJSON(serviceName)
if err != nil {
return emptyStringArray, err
}

if len(membersResult) != 0 {
err = json.Unmarshal(membersResult, &membersDetails)
if err != nil {
return emptyStringArray, utils.GetError("unable to unmarshall members ownership", err)
}

for _, v := range membersDetails.Services {
memberNodeID = v.NodeID
break
}

var ownershipData []byte

ownershipData, err = dataFetcher.GetServiceOwnershipJSON(serviceName, memberNodeID)
if err != nil {
return emptyStringArray, err
}

result, err := getOwnershipData(dataFetcher, ownershipData)
if err != nil {
return emptyStringArray, err
}

return strings.Split(FormatPartitionOwnership(result), "\n"), nil
}

return noContentArray, nil
}

var serviceStorageContent = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
storageSummary, err := getServiceStorageDetails(dataFetcher)

Expand Down
7 changes: 7 additions & 0 deletions pkg/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ const (
healthPortMessage = "starting port for health"
jmxPortMessage = "remote JMX port for management member"
jmxHostMessage = "remote JMX RMI host for management member"
machineMessage = "the machine name to use"
rackMessage = "the rack name to use"
siteMessage = "the site name to use"
cacheConfigMessage = "cache configuration file"
operationalConfigMessage = "override override file"
cacheConfigArg = "cache-config"
Expand All @@ -94,6 +97,9 @@ const (
healthPortArg = "health-port"
jmxPortArg = "jmx-port"
jmxHostArg = "jmx-host"
machineArg = "machine"
rackArg = "rack"
siteArg = "site"
logLevelMessage = "coherence log level"
profileMessage = "profile to add to cluster startup command line"
backupLogFilesMessage = "backup old cache server log files"
Expand Down Expand Up @@ -507,6 +513,7 @@ func Initialize(command *cobra.Command) *cobra.Command {
getCmd.AddCommand(getProxyConnectionsCmd)
getCmd.AddCommand(getUseGradleCmd)
getCmd.AddCommand(getServiceStorageCmd)
getCmd.AddCommand(getServiceOwnershipCmd)
getCmd.AddCommand(getCacheStoresCmd)
getCmd.AddCommand(getColorCmd)
getCmd.AddCommand(getNetworkStatsCmd)
Expand Down
Loading