Skip to content

Commit

Permalink
Merge pull request #625 from tranchitella/men-6183
Browse files Browse the repository at this point in the history
feat: CLI command to reindex to the reporting service all the devices
  • Loading branch information
tranchitella committed Jan 30, 2023
2 parents 4aaf0e5 + 6882574 commit 507b24d
Show file tree
Hide file tree
Showing 6 changed files with 546 additions and 45 deletions.
53 changes: 52 additions & 1 deletion client/orchestrator/client_orchestrator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Northern.tech AS
// Copyright 2023 Northern.tech AS
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@ const (
HealthURI = "/api/v1/health"
DeviceLimitWarningURI = "/api/v1/workflow/device_limit_email"
ReindexReportingURI = "/api/v1/workflow/reindex_reporting"
ReindexReportingBatchURI = "/api/v1/workflow/reindex_reporting/batch"
// default request timeout, 10s?
defaultReqTimeout = time.Duration(10) * time.Second
)
Expand All @@ -53,6 +54,7 @@ type Config struct {
}

// ClientRunner is an interface of orchestrator client
//
//go:generate ../../utils/mockgen.sh
type ClientRunner interface {
CheckHealth(ctx context.Context) error
Expand All @@ -62,6 +64,7 @@ type ClientRunner interface {
SubmitDeviceLimitWarning(ctx context.Context, devWarn DeviceLimitWarning) error
SubmitUpdateDeviceInventoryJob(ctx context.Context, req UpdateDeviceInventoryReq) error
SubmitReindexReporting(c context.Context, device string) error
SubmitReindexReportingBatch(c context.Context, devices []string) error
}

// Client is an opaque implementation of orchestrator client. Implements
Expand Down Expand Up @@ -405,3 +408,51 @@ func (co *Client) SubmitReindexReporting(ctx context.Context, device string) err
rsp.Status,
)
}

func (co *Client) SubmitReindexReportingBatch(ctx context.Context, devices []string) error {
ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
defer cancel()

tenantID := ""
if id := identity.FromContext(ctx); id != nil {
tenantID = id.Tenant
}
reqID := requestid.FromContext(ctx)
wflows := make([]ReindexReportingWorkflow, len(devices))
for i, device := range devices {
wflows[i] = ReindexReportingWorkflow{
RequestID: reqID,
TenantID: tenantID,
DeviceID: device,
Service: ServiceDeviceauth,
}
}
payload, _ := json.Marshal(wflows)
req, err := http.NewRequestWithContext(ctx,
"POST",
utils.JoinURL(co.conf.OrchestratorAddr, ReindexReportingBatchURI),
bytes.NewReader(payload),
)
if err != nil {
return errors.Wrap(err, "workflows: error preparing HTTP request")
}

req.Header.Set("Content-Type", "application/json")

rsp, err := co.http.Do(req)
if err != nil {
return errors.Wrap(err, "workflows: failed to submit reindex job")
}
defer rsp.Body.Close()

if rsp.StatusCode < 300 {
return nil
} else if rsp.StatusCode == http.StatusNotFound {
return errors.New(`workflows: workflow "reindex_reporting" not defined`)
}

return errors.Errorf(
"workflows: unexpected HTTP status from workflows service: %s",
rsp.Status,
)
}
135 changes: 125 additions & 10 deletions client/orchestrator/client_orchestrator_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Copyright 2021 Northern.tech AS
// Copyright 2023 Northern.tech AS
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package orchestrator

import (
Expand Down Expand Up @@ -496,3 +496,118 @@ func TestReindexReporting(t *testing.T) {
})
}
}

func mockServerReindexBatch(t *testing.T, tenant string, devices []string, reqid string, code int) (*httptest.Server, error) {
h := func(w http.ResponseWriter, r *http.Request) {
if code != http.StatusOK {
w.WriteHeader(code)
return
}
defer r.Body.Close()

request := make([]ReindexReportingWorkflow, 100)

decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&request)
assert.NoError(t, err)

assert.Len(t, request, len(devices))
for i, device := range devices {
assert.Equal(t, reqid, request[i].RequestID)
assert.Equal(t, tenant, request[i].TenantID)
assert.Equal(t, device, request[i].DeviceID)
assert.Equal(t, ServiceDeviceauth, request[i].Service)
}

assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
w.WriteHeader(http.StatusOK)
}

srv := httptest.NewServer(http.HandlerFunc(h))
return srv, nil
}

func TestReindexReportingBatch(t *testing.T) {
t.Parallel()

testCases := []struct {
name string

tenant string
devices []string
reqid string

url string
code int

err error
}{
{
name: "ok",
tenant: "tenant1",
devices: []string{"device1", "device2"},
reqid: "reqid1",

code: http.StatusOK,
},
{
name: "error, connection refused",
tenant: "tenant2",
devices: []string{"device3"},
reqid: "reqid2",

url: "http://127.0.0.1:12345",
err: errors.New(`workflows: failed to submit reindex job: Post "http://127.0.0.1:12345/api/v1/workflow/reindex_reporting/batch": dial tcp 127.0.0.1:12345: connect: connection refused`),
},
{
name: "error, 404",
tenant: "tenant2",
devices: []string{"device3"},
reqid: "reqid2",

code: http.StatusNotFound,
err: errors.New(`workflows: workflow "reindex_reporting" not defined`),
},
{
name: "error, 500",
tenant: "tenant2",
devices: []string{"device3"},
reqid: "reqid2",

code: http.StatusInternalServerError,
err: errors.New(`workflows: unexpected HTTP status from workflows service: 500 Internal Server Error`),
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
srv, err := mockServerReindexBatch(t, tc.tenant, tc.devices, tc.reqid, tc.code)
assert.NoError(t, err)

defer srv.Close()

ctx := context.Background()
ctx = requestid.WithContext(ctx, tc.reqid)
ctx = identity.WithContext(ctx,
&identity.Identity{
Tenant: tc.tenant,
})

url := tc.url
if url == "" {
url = srv.URL
}
client := NewClient(Config{
OrchestratorAddr: url,
})

err = client.SubmitReindexReportingBatch(ctx, tc.devices)
if tc.err != nil {
assert.EqualError(t, err, tc.err.Error())
} else {
assert.NoError(t, err)
}
})
}
}
16 changes: 15 additions & 1 deletion client/orchestrator/mocks/ClientRunner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 102 additions & 12 deletions cmd/commands.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Copyright 2021 Northern.tech AS
// Copyright 2023 Northern.tech AS
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd

import (
Expand Down Expand Up @@ -254,6 +254,29 @@ func PropagateIdDataInventory(db store.DataStore, c cinv.Client, tenant string,
return errReturned
}

func PropagateReporting(db store.DataStore, wflows orchestrator.ClientRunner, tenant string,
dryRun bool) error {
l := log.NewEmpty()

dbs, err := selectDbs(db, tenant)
if err != nil {
return errors.Wrap(err, "aborting")
}

var errReturned error
for _, d := range dbs {
err := tryPropagateReportingForDb(db, wflows, d, dryRun)
if err != nil {
errReturned = err
l.Errorf("giving up on DB %s due to fatal error: %s", d, err.Error())
continue
}
}

l.Info("all DBs processed, exiting.")
return errReturned
}

func selectDbs(db store.DataStore, tenant string) ([]string, error) {
l := log.NewEmpty()

Expand Down Expand Up @@ -364,10 +387,9 @@ func updateDevicesIdData(
}
}

skip += devicesBatchSize
if len(devices) < devicesBatchSize {
break
} else {
skip += devicesBatchSize
}
}
return nil
Expand Down Expand Up @@ -464,6 +486,74 @@ func tryPropagateIdDataInventoryForDb(
return err
}

func tryPropagateReportingForDb(
db store.DataStore,
wflows orchestrator.ClientRunner,
dbname string,
dryRun bool,
) error {
l := log.NewEmpty()

l.Infof("propagating device data to reporting from DB: %s", dbname)

tenant := mstore.TenantFromDbName(dbname, mongo.DbName)

ctx := context.Background()
if tenant != "" {
ctx = identity.WithContext(ctx, &identity.Identity{
Tenant: tenant,
})
}

err := reindexDevicesReporting(ctx, db, wflows, tenant, dryRun)
if err != nil {
l.Infof("Done with DB %s, but there were errors: %s.", dbname, err.Error())
} else {
l.Infof("Done with DB %s", dbname)
}

return err
}

func reindexDevicesReporting(
ctx context.Context,
db store.DataStore,
wflows orchestrator.ClientRunner,
tenant string,
dryRun bool,
) error {
var skip uint

skip = 0
for {
devices, err := db.GetDevices(ctx, skip, devicesBatchSize, model.DeviceFilter{})
if err != nil {
return errors.Wrap(err, "failed to get devices")
}

if len(devices) < 1 {
break
}

if !dryRun {
deviceIDs := make([]string, len(devices))
for i, d := range devices {
deviceIDs[i] = d.Id
}
err := wflows.SubmitReindexReportingBatch(ctx, deviceIDs)
if err != nil {
return err
}
}

skip += devicesBatchSize
if len(devices) < devicesBatchSize {
break
}
}
return nil
}

const (
WorkflowsDeviceLimitText = "@/etc/workflows-enterprise/data/device_limit_email.txt"
WorkflowsDeviceLimitHTML = "@/etc/workflows-enterprise/data/device_limit_email.html"
Expand Down

0 comments on commit 507b24d

Please sign in to comment.