Skip to content

Commit

Permalink
fix: add write permissions check for DELETE and DROP MEASUREMENT (#23219
Browse files Browse the repository at this point in the history
)

We previously allowed read tokens access to all of v1 query, including
InfluxQL queries that made state changes to the DB, specifically,
'DELETE' and 'DROP MEASUREMENT'. This allowed tokens with only read
permissions to delete points via the legacy /query endpoint.
/api/v2/query was unaffected.

This adjusts the behavior to verify that the token has write permissions
when specifying 'DELETE' and 'DROP MEASUREMENT' InfluxQL queries. We
follow the same pattern as other existing v1 failure scenarios and
instead of failing hard with 401, we use ectx.Send() to send an error to
the user (with 200 status):

{"results":[{"statement_id":0,"error":"insufficient permissions"}]}

Returning in this manner is consistent with Cloud 2, which also returns
200 with "insufficient permissions" for these two InfluxQL queries.

To facilitate authorization unit tests, we add MustNewPermission() to
testing/util.go.

Closes: #22799
  • Loading branch information
jdstrand committed Mar 24, 2022
1 parent 5e3ea7b commit e304ef9
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 0 deletions.
8 changes: 8 additions & 0 deletions testing/util.go
Expand Up @@ -120,6 +120,14 @@ func MustCreateUsers(ctx context.Context, svc influxdb.UserService, us ...*influ
}
}

func MustNewPermission(a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission {
perm, err := influxdb.NewPermission(a, rt, orgID)
if err != nil {
panic(err)
}
return perm
}

func MustNewPermissionAtID(id platform.ID, a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission {
perm, err := influxdb.NewPermissionAtID(id, a, rt, orgID)
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions v1/coordinator/statement_executor.go
Expand Up @@ -372,6 +372,14 @@ func (e *StatementExecutor) executeDeleteSeriesStatement(ctx context.Context, q
return err
}

// Require write for DELETE queries
_, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID)
if err != nil {
return ectx.Send(ctx, &query.Result{
Err: fmt.Errorf("insufficient permissions"),
})
}

// Convert "now()" to current time.
q.Condition = influxql.Reduce(q.Condition, &influxql.NowValuer{Now: time.Now().UTC()})

Expand All @@ -383,6 +391,15 @@ func (e *StatementExecutor) executeDropMeasurementStatement(ctx context.Context,
if err != nil {
return err
}

// Require write for DROP MEASUREMENT queries
_, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID)
if err != nil {
return ectx.Send(ctx, &query.Result{
Err: fmt.Errorf("insufficient permissions"),
})
}

return e.TSDBStore.DeleteMeasurement(ctx, mapping.BucketID.String(), q.Name)
}

Expand Down
158 changes: 158 additions & 0 deletions v1/coordinator/statement_executor_test.go
Expand Up @@ -386,6 +386,164 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
}
}

func testExecDeleteSeriesOrDropMeasurement(t *testing.T, qType string) {
orgID := platform.ID(0xff00)
otherOrgID := platform.ID(0xff01)
bucketID := platform.ID(0xffee)
otherBucketID := platform.ID(0xffef)

qStr := qType
if qStr == "DELETE" {
qStr = "DELETE FROM"
}
qErr := errors.New("insufficient permissions")

testCases := []struct {
name string
query string
permissions []influxdb.Permission
expectedErr error
}{
// expected FAIL
{
name: fmt.Sprintf("read-only bucket (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
},
expectedErr: qErr,
},
{
name: fmt.Sprintf("read-only all buckets (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
},
expectedErr: qErr,
},
{
name: fmt.Sprintf("write-only other bucket (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermissionAtID(otherBucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
},
expectedErr: qErr,
},
{
name: fmt.Sprintf("write-only other org (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID),
},
expectedErr: qErr,
},
{
name: fmt.Sprintf("read-write other org (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, otherOrgID),
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID),
},
expectedErr: qErr,
},
// expected PASS
{
name: fmt.Sprintf("write-only bucket (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
},
expectedErr: nil,
},
{
name: fmt.Sprintf("write-only all buckets (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
},
expectedErr: nil,
},
{
name: fmt.Sprintf("read-write bucket (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
*itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
},
expectedErr: nil,
},
{
name: fmt.Sprintf("read-write all buckets (%s)", qType),
query: qStr,
permissions: []influxdb.Permission{
*itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
},
expectedErr: nil,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

// setup a DBRP that we can use
dbrp := mocks.NewMockDBRPMappingService(ctrl)
db := "db0"

empty := ""
isDefault := true
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: nil, Default: &isDefault}
res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: empty, OrganizationID: orgID, BucketID: bucketID, Default: isDefault}}
dbrp.EXPECT().
FindMany(gomock.Any(), filt).
Return(res, 1, nil)

qe := DefaultQueryExecutor(t, WithDBRP(dbrp))

// assume storage succeeds if we get that far
qe.TSDBStore.DeleteSeriesFn = func(context.Context, string, []influxql.Source, influxql.Expr) error {
return nil
}
qe.TSDBStore.DeleteMeasurementFn = func(context.Context, string, string) error {
return nil
}

ctx := context.Background()
ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{
ID: orgID,
OrgID: orgID,
Status: influxdb.Active,
Permissions: testCase.permissions,
})

results := ReadAllResults(qe.ExecuteQuery(ctx, fmt.Sprintf("%s cpu", testCase.query), "db0", 0, orgID))

var exp []*query.Result
if testCase.expectedErr != nil {
exp = []*query.Result{
{
StatementID: 0,
Err: testCase.expectedErr,
},
}
}
if !reflect.DeepEqual(results, exp) {
t.Fatalf("unexpected results: exp %s, got %s", spew.Sdump(exp), spew.Sdump(results))
}
})
}
}

func TestQueryExecutor_ExecuteQuery_DeleteSeries(t *testing.T) {
testExecDeleteSeriesOrDropMeasurement(t, "DELETE")
}

func TestQueryExecutor_ExecuteQuery_DropMeasurement(t *testing.T) {
testExecDeleteSeriesOrDropMeasurement(t, "DROP MEASUREMENT")
}

// QueryExecutor is a test wrapper for coordinator.QueryExecutor.
type QueryExecutor struct {
*query.Executor
Expand Down

0 comments on commit e304ef9

Please sign in to comment.