Skip to content

Commit

Permalink
Merge pull request #1469 from memphisdev/RND-123-dls-indication-on-th…
Browse files Browse the repository at this point in the history
…e-station-overview-endpoint

RND-123-dls-indication-on-the-station-overview-endpoint
  • Loading branch information
shohamroditimemphis committed Dec 3, 2023
2 parents 5482b84 + ff53821 commit 4b4626b
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 0 deletions.
31 changes: 31 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,37 @@ func GetStationByName(name string, tenantName string) (bool, models.Station, err
return true, stations[0], nil
}

func GetStationsByDlsStationName(name string, tenantName string) ([]models.Station, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.Station{}, err
}
defer conn.Release()
query := `SELECT * FROM stations WHERE dls_station = $1 AND (is_deleted = false) AND tenant_name = $2`
stmt, err := conn.Conn().Prepare(ctx, "get_stations_act_as_dls_station_by_name", query)
if err != nil {
return []models.Station{}, err
}
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := conn.Conn().Query(ctx, stmt.Name, name, tenantName)
if err != nil {
return []models.Station{}, err
}
defer rows.Close()
stations, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.Station])
if err != nil {
return []models.Station{}, err
}
if len(stations) == 0 {
return []models.Station{}, nil
}
return stations, nil
}

func GetStationById(stationId int, tenantName string) (bool, models.Station, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
Expand Down
16 changes: 16 additions & 0 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,19 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
return
}

stationsActAsDlsStation, err := db.GetStationsByDlsStationName(stationName.Ext(), user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetStationsByDlsStationName: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}

usedAsDlsStations := make([]string, 0)
if len(stationsActAsDlsStation) > 0 {
for _, dlsStation := range stationsActAsDlsStation {
usedAsDlsStations = append(usedAsDlsStations, dlsStation.Name)
}
}
var functionsEnabled bool
if station.Version >= 2 {
functionsEnabled = true
Expand Down Expand Up @@ -736,6 +749,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
} else {
var emptyResponse struct{}
Expand Down Expand Up @@ -768,6 +782,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
} else {
response = gin.H{
Expand Down Expand Up @@ -797,6 +812,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions server/memphis_handlers_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,18 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string,
return map[string]any{}, err
}

stationsActAsDlsStation, err := db.GetStationsByDlsStationName(sn.Ext(), station.TenantName)
if err != nil {
return map[string]any{}, err
}

usedAsDlsStations := make([]string, 0)
if len(stationsActAsDlsStation) > 0 {
for _, dlsStation := range stationsActAsDlsStation {
usedAsDlsStations = append(usedAsDlsStations, dlsStation.Name)
}
}

if err == ErrNoSchema { // non native stations will always reach this point
if !station.IsNative {
cp, dp, cc, dc := getFakeProdsAndConsForPreview()
Expand Down Expand Up @@ -416,6 +428,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
} else {
response = map[string]any{
Expand Down Expand Up @@ -445,6 +458,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
}

Expand Down Expand Up @@ -490,6 +504,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"act_as_dls_station_in_stations": usedAsDlsStations,
}

return response, nil
Expand Down

0 comments on commit 4b4626b

Please sign in to comment.