Skip to content

Commit

Permalink
Merge 6c7b35d into e8f87b6
Browse files Browse the repository at this point in the history
  • Loading branch information
talal committed May 4, 2020
2 parents e8f87b6 + 6c7b35d commit c1e6b2b
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 17 deletions.
10 changes: 10 additions & 0 deletions internal/core/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ type AssetTypeInfo struct {
ReportsAbsoluteUsage bool
}

//AssetNotFoundErr is returned by AssetManager.GetAssetStatus() if the
//concerning asset can not be found in the respective backend.
type AssetNotFoundErr struct {
InnerError error
}

func (e AssetNotFoundErr) Error() string {
return e.InnerError.Error()
}

//AssetManager is the main modularization interface in Castellum. It
//provides a separation boundary between the plugins that implement the
//concrete behavior for specific asset types, and the core logic of Castellum.
Expand Down
32 changes: 21 additions & 11 deletions internal/plugins/nfs-shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math"
"os"
"regexp"
"strings"
"time"

"github.com/gophercloud/gophercloud"
Expand Down Expand Up @@ -197,20 +198,27 @@ func (m *assetManagerNFS) resize(assetUUID string, oldSize, newSize uint64, useR
//GetAssetStatus implements the core.AssetManager interface.
func (m *assetManagerNFS) GetAssetStatus(res db.Resource, assetUUID string, previousStatus *core.AssetStatus) (core.AssetStatus, error) {
//check status in Prometheus
var bytesReservedBySnapshots, bytesUsed, bytesUsedBySnapshots float64
bytesTotal, err := m.getMetricForShare("netapp_volume_total_bytes", res.ScopeUUID, assetUUID)
if err != nil {
return core.AssetStatus{}, err
}
bytesReservedBySnapshots, err := m.getMetricForShare("netapp_volume_snapshot_reserved_bytes", res.ScopeUUID, assetUUID)
if err != nil {
return core.AssetStatus{}, err
}
bytesUsed, err := m.getMetricForShare("netapp_volume_used_bytes", res.ScopeUUID, assetUUID)
if err != nil {
return core.AssetStatus{}, err
if err == nil {
bytesReservedBySnapshots, err = m.getMetricForShare("netapp_volume_snapshot_reserved_bytes", res.ScopeUUID, assetUUID)
if err == nil {
bytesUsed, err = m.getMetricForShare("netapp_volume_used_bytes", res.ScopeUUID, assetUUID)
if err == nil {
bytesUsedBySnapshots, err = m.getMetricForShare("netapp_volume_snapshot_used_bytes", res.ScopeUUID, assetUUID)
}
}
}
bytesUsedBySnapshots, err := m.getMetricForShare("netapp_volume_snapshot_used_bytes", res.ScopeUUID, assetUUID)
if err != nil {
if strings.Contains(err.Error(), "Prometheus query returned empty result") {
//check if the share still exists in the backend
_, getErr := shares.Get(m.Manila, assetUUID).Extract()
if getErr != nil {
if _, ok := getErr.(gophercloud.ErrDefault404); ok {
return core.AssetStatus{}, core.AssetNotFoundErr{InnerError: fmt.Errorf("share not found in Manila: %s", getErr.Error())}
}
}
}
return core.AssetStatus{}, err
}

Expand Down Expand Up @@ -276,6 +284,8 @@ func prometheusGetSingleValue(api prom_v1.API, queryStr string) (float64, error)

switch resultVector.Len() {
case 0:
//Note: this error message is used by assetManagerNFS.GetAssetStatus()
//to check if the share still exists in the backend.
return 0, fmt.Errorf("Prometheus query returned empty result: %s", queryStr)
default:
//suppress the log message when all values are the same (this can happen
Expand Down
2 changes: 1 addition & 1 deletion internal/plugins/project-quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (m *assetManagerProjectQuota) getQuotaStatus(assetType db.AssetType, projec
return nil, err
}
if project == nil {
return nil, fmt.Errorf("project not found in Keystone: %s", projectID)
return nil, core.AssetNotFoundErr{InnerError: fmt.Errorf("project not found in Keystone: %s", projectID)}
}

opts := projects.GetOpts{Service: info.ServiceType, Resource: info.ResourceName}
Expand Down
7 changes: 7 additions & 0 deletions internal/plugins/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type StaticAsset struct {

//When true, return a bogus error from GetAssetStatus().
CannotGetAssetStatus bool

//When true, return a core.AssetNotFoundErr from GetAssetStatus().
CannotFindAsset bool
}

//AssetManagerStatic is a core.AssetManager for testing purposes. It just
Expand Down Expand Up @@ -79,6 +82,7 @@ var (
errTooSmall = errors.New("cannot set size smaller than current usage")
errSimulatedRejection = errors.New("CheckResourceAllowed failing as requested")
errSimulatedGetFailure = errors.New("GetAssetStatus failing as requested")
errSimulatedNotFound = errors.New("GetAssetStatus asset not found in backend")
errSimulatedSetFailure = errors.New("SetAssetSize failing as requested")
)

Expand Down Expand Up @@ -116,6 +120,9 @@ func (m AssetManagerStatic) GetAssetStatus(res db.Resource, assetUUID string, pr
if asset.CannotGetAssetStatus {
return core.AssetStatus{}, errSimulatedGetFailure
}
if asset.CannotFindAsset {
return core.AssetStatus{}, core.AssetNotFoundErr{InnerError: errSimulatedNotFound}
}

if asset.NewSize != 0 {
asset.RemainingDelay--
Expand Down
13 changes: 12 additions & 1 deletion internal/tasks/asset_scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ func (c Context) ScrapeNextAsset(assetType db.AssetType, maxCheckedAt time.Time)
}
status, err := manager.GetAssetStatus(res, asset.UUID, oldStatus)
if err != nil {
errMsg := fmt.Errorf("cannot query status of %s %s: %s", string(assetType), asset.UUID, err.Error())
if _, ok := err.(core.AssetNotFoundErr); ok {
//GetAssetStatus may return a core.AssetNotFoundErr, if the
//concerning asset could not be found in the backend; in that case,
//delete the asset from db.
logg.Error(errMsg.Error())
logg.Info("removing deleted %s asset from DB: UUID = %s, scope UUID = %s", assetType, asset.UUID, res.ScopeUUID)
_, dbErr := c.DB.Delete(&asset)
return dbErr
}

//GetAssetStatus may fail for single assets, e.g. for Manila shares in
//transitional states like Creating/Deleting; in that case, update
//checked_at so that the next call continues with the next asset, but leave
Expand All @@ -122,7 +133,7 @@ func (c Context) ScrapeNextAsset(assetType db.AssetType, maxCheckedAt time.Time)
if dbErr != nil {
return dbErr
}
return fmt.Errorf("cannot query status of %s %s: %s", string(assetType), asset.UUID, err.Error())
return errMsg
}

if logScrapes {
Expand Down
18 changes: 14 additions & 4 deletions internal/tasks/asset_scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,12 @@ func TestAssetScrapeObservingNewSizeWhileWaitingForResize(baseT *testing.T) {
}

func TestAssetScrapeWithGetAssetStatusError(baseT *testing.T) {
//This tests the behavior when GetAssetStatus returns an error. The error is
//passed through to the caller of ScrapeNextAsset, but the asset's checked_at
//timestamp is still updated to ensure that the main loop progresses to the
//next asset.
//This tests the behavior when GetAssetStatus returns an error:
//1. If core.AssetNotFoundErr is returned then the asset is deleted from
//the db.
//2. All other errors are passed through to the caller of ScrapeNextAsset,
//but the asset's checked_at timestamp is still updated to ensure that the
//main loop progresses to the next asset.
t := test.T{T: baseT}
forAllSteppingStrategies(t, func(c *Context, res db.Resource, setAsset func(plugins.StaticAsset), clock *test.FakeClock) {

Expand Down Expand Up @@ -659,6 +661,14 @@ func TestAssetScrapeWithGetAssetStatusError(baseT *testing.T) {
ScrapeErrorMessage: "",
})

//Note: this test should be at the end, see below.
//Run GetAssetStatus on the same asset again except this time the
//ScrapeNextAsset should delete the asset from the db.
setAsset(plugins.StaticAsset{Size: 1000, Usage: 600, CannotFindAsset: true})
clock.StepBy(5 * time.Minute)
t.Must(c.ScrapeNextAsset("foo", c.TimeNow()))
t.ExpectAssets(c.DB)

})
}

Expand Down

0 comments on commit c1e6b2b

Please sign in to comment.