Skip to content

Commit

Permalink
Merge pull request #330 from xxwjj/replication
Browse files Browse the repository at this point in the history
[WIP] add replication feature
  • Loading branch information
xing-yang committed Mar 30, 2018
2 parents 5786c51 + 4ed0320 commit ac71e26
Show file tree
Hide file tree
Showing 32 changed files with 3,412 additions and 597 deletions.
17 changes: 16 additions & 1 deletion contrib/drivers/drbd/replication.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -31,3 +30,19 @@ func (r *ReplicationDriver) Unset() error { return nil }
func (r *ReplicationDriver) CreateReplication(opt *pb.CreateReplicationOpts) (*model.ReplicationSpec, error) {
return nil, nil
}

func (r *ReplicationDriver) DeleteReplication(opt *pb.DeleteReplicationOpts) error {
return nil
}

func (r *ReplicationDriver) EnableReplication(opt *pb.EnableReplicationOpts) error {
return nil
}

func (r *ReplicationDriver) DisableReplication(opt *pb.DisableReplicationOpts) error {
return nil
}

func (r *ReplicationDriver) FailoverReplication(opt *pb.FailoverReplicationOpts) error {
return nil
}
4 changes: 4 additions & 0 deletions contrib/drivers/replication_drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type ReplicationDriver interface {
Unset() error

CreateReplication(opt *pb.CreateReplicationOpts) (*model.ReplicationSpec, error)
DeleteReplication(opt *pb.DeleteReplicationOpts) error
EnableReplication(opt *pb.EnableReplicationOpts) error
DisableReplication(opt *pb.DisableReplicationOpts) error
FailoverReplication(opt *pb.FailoverReplicationOpts) error
}

// Init
Expand Down
11 changes: 10 additions & 1 deletion examples/policy.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,14 @@
"dock:list": "rule:admin_api",
"dock:get": "rule:admin_api",
"pool:list": "rule:admin_api",
"pool:get": "rule:admin_api"
"pool:get": "rule:admin_api",
"replication:create": "rule:admin_or_owner",
"replication:list": "rule:admin_or_owner",
"replication:list_detail": "rule:admin_or_owner",
"replication:get": "rule:admin_or_owner",
"replication:update": "rule:admin_or_owner",
"replication:delete": "rule:admin_or_owner",
"replication:action:enable": "rule:admin_or_owner",
"replication:action:disable": "rule:admin_or_owner",
"replication:action:failover": "rule:admin_or_owner"
}
64 changes: 64 additions & 0 deletions pkg/api/base.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@
package api

import (
"encoding/json"
"net/http"
"net/url"
"reflect"
"strings"

"github.com/astaxie/beego"
// log "github.com/golang/glog"
"github.com/opensds/opensds/pkg/model"
)

type BasePortal struct {
beego.Controller
Self interface{}
}

func (this *BasePortal) GetParameters() (map[string][]string, error) {
Expand All @@ -36,3 +43,60 @@ func (this *BasePortal) GetParameters() (map[string][]string, error) {
}
return m, nil
}

type ActionSpec map[string]interface{}

func (this *BasePortal) Action() {
action := ActionSpec{}
// IO reader only can be used once, but in action situation request body will read twice
// one for parent action , and the other for child action which is the real action.
this.Ctx.Input.CopyBody(beego.BConfig.MaxMemory)

// Unmarshal the request body
if err := json.Unmarshal(this.Ctx.Input.RequestBody, &action); err != nil {
model.HttpError(this.Ctx, http.StatusBadRequest, "decode request body failed, %v", err)
return
}

v := reflect.ValueOf(this.Self)
for m, _ := range action {
m = strings.Title(m) // convert the first letter to upper
if val := v.MethodByName(m); val.IsValid() {
val.Call([]reflect.Value{reflect.ValueOf(this.Ctx)})
if !this.Ctx.Output.IsSuccessful() {
return
}
} else {
model.HttpError(this.Ctx, http.StatusNotFound,
"specified action(%s) does not find in controller", m)
return
}
}
}

// Filter some items in spec that no need to transfer to users.
func (this *BasePortal) outputFilter(resp interface{}, whiteList []string) interface{} {
v := reflect.ValueOf(resp)
if v.Kind() == reflect.Slice {
var s []map[string]interface{}
for i := 0; i < v.Len(); i++ {
m := this.doFilter(v.Index(i).Interface(), whiteList)
s = append(s, m)
}
return s
} else {
return this.doFilter(resp, whiteList)
}
}

func (this *BasePortal) doFilter(resp interface{}, whiteList []string) map[string]interface{} {
v := reflect.ValueOf(resp).Elem()
m := map[string]interface{}{}
for _, name := range whiteList {
field := v.FieldByName(name)
if field.IsValid() {
m[name] = field.Interface()
}
}
return m
}
28 changes: 21 additions & 7 deletions pkg/api/dock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"

"github.com/astaxie/beego"
"github.com/astaxie/beego/context"
c "github.com/opensds/opensds/pkg/context"
"github.com/opensds/opensds/pkg/db"
"github.com/opensds/opensds/pkg/model"
dbtest "github.com/opensds/opensds/testutils/db/testing"
Expand Down Expand Up @@ -57,12 +59,15 @@ func TestListDocks(t *testing.T) {
"sortDir": []string{"asc"},
"sortKey": []string{"name"},
}
mockClient.On("ListDocksWithFilter", m).Return(fakeDocks, nil)
mockClient.On("ListDocksWithFilter", c.NewAdminContext(), m).Return(fakeDocks, nil)

db.C = mockClient

r, _ := http.NewRequest("GET", "/v1beta/docks?offset=0&limit=1&sortDir=asc&sortKey=name", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

var output []model.DockSpec
Expand All @@ -79,8 +84,8 @@ func TestListDocks(t *testing.T) {
"driverName": "cinder",
"endpoint": "localhost:50050",
"createdAt": "2017-10-11T11:28:58",
"updatedAt": ""
}
"updatedAt": ""
}
]`

var expected []model.DockSpec
Expand All @@ -104,11 +109,14 @@ func TestListDocksWithBadRequest(t *testing.T) {
"sortDir": []string{"asc"},
"sortKey": []string{"name"},
}
mockClient.On("ListDocksWithFilter", m).Return(nil, errors.New("db error"))
mockClient.On("ListDocksWithFilter", c.NewAdminContext(), m).Return(nil, errors.New("db error"))
db.C = mockClient

r, _ := http.NewRequest("GET", "/v1beta/docks?offset=0&limit=1&sortDir=asc&sortKey=name", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

if w.Code != 400 {
Expand All @@ -132,12 +140,15 @@ func TestGetDock(t *testing.T) {
}

mockClient := new(dbtest.MockClient)
mockClient.On("GetDock", "b7602e18-771e-11e7-8f38-dbd6d291f4e0").Return(fakeDock, nil)
mockClient.On("GetDock", c.NewAdminContext(), "b7602e18-771e-11e7-8f38-dbd6d291f4e0").Return(fakeDock, nil)
db.C = mockClient

r, _ := http.NewRequest("GET",
"/v1beta/docks/b7602e18-771e-11e7-8f38-dbd6d291f4e0", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

var output model.DockSpec
Expand All @@ -153,7 +164,7 @@ func TestGetDock(t *testing.T) {
"driverName": "cinder",
"endpoint": "localhost:50050",
"createdAt": "2017-10-11T11:28:58",
"updatedAt": ""
"updatedAt": ""
}`

var expected model.DockSpec
Expand All @@ -170,13 +181,16 @@ func TestGetDock(t *testing.T) {

func TestGetDockWithBadRequestError(t *testing.T) {
mockClient := new(dbtest.MockClient)
mockClient.On("GetDock", "b7602e18-771e-11e7-8f38-dbd6d291f4e0").Return(
mockClient.On("GetDock", c.NewAdminContext(), "b7602e18-771e-11e7-8f38-dbd6d291f4e0").Return(
nil, errors.New("db error"))
db.C = mockClient

r, _ := http.NewRequest("GET",
"/v1beta/docks/b7602e18-771e-11e7-8f38-dbd6d291f4e0", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

if w.Code != 400 {
Expand Down
1 change: 1 addition & 0 deletions pkg/api/filter/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type NoAuth struct {
}

func (auth *NoAuth) Filter(httpCtx *context.Context) {
panic("")
ctx := c.GetContext(httpCtx)
ctx.IsAdmin = true
ctx.TenantId = httpCtx.Input.Param(":tenantId")
Expand Down
22 changes: 18 additions & 4 deletions pkg/api/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"

"github.com/astaxie/beego"
"github.com/astaxie/beego/context"
c "github.com/opensds/opensds/pkg/context"
"github.com/opensds/opensds/pkg/db"
"github.com/opensds/opensds/pkg/model"
dbtest "github.com/opensds/opensds/testutils/db/testing"
Expand Down Expand Up @@ -68,11 +70,14 @@ func TestListPools(t *testing.T) {
"sortDir": []string{"asc"},
"sortKey": []string{"name"},
}
mockClient.On("ListPoolsWithFilter", m).Return(fakePools, nil)
mockClient.On("ListPoolsWithFilter", c.NewAdminContext(), m).Return(fakePools, nil)
db.C = mockClient

r, _ := http.NewRequest("GET", "/v1beta/pools?offset=0&limit=1&sortDir=asc&sortKey=name", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

var output []model.StoragePoolSpec
Expand Down Expand Up @@ -122,11 +127,14 @@ func TestListPoolsWithBadRequest(t *testing.T) {
"sortDir": []string{"asc"},
"sortKey": []string{"name"},
}
mockClient.On("ListPoolsWithFilter", m).Return(nil, errors.New("db error"))
mockClient.On("ListPoolsWithFilter", c.NewAdminContext(), m).Return(nil, errors.New("db error"))
db.C = mockClient

r, _ := http.NewRequest("GET", "/v1beta/pools?offset=0&limit=1&sortDir=asc&sortKey=name", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

if w.Code != 400 {
Expand All @@ -137,11 +145,14 @@ func TestListPoolsWithBadRequest(t *testing.T) {
func TestGetPool(t *testing.T) {

mockClient := new(dbtest.MockClient)
mockClient.On("GetPool", "f4486139-78d5-462d-a7b9-fdaf6c797e1b").Return(fakePool, nil)
mockClient.On("GetPool", c.NewAdminContext(), "f4486139-78d5-462d-a7b9-fdaf6c797e1b").Return(fakePool, nil)
db.C = mockClient

r, _ := http.NewRequest("GET", "/v1beta/pools/f4486139-78d5-462d-a7b9-fdaf6c797e1b", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

var output model.StoragePoolSpec
Expand Down Expand Up @@ -184,13 +195,16 @@ func TestGetPool(t *testing.T) {
func TestGetPoolWithBadRequest(t *testing.T) {

mockClient := new(dbtest.MockClient)
mockClient.On("GetPool", "f4486139-78d5-462d-a7b9-fdaf6c797e1b").Return(
mockClient.On("GetPool", c.NewAdminContext(), "f4486139-78d5-462d-a7b9-fdaf6c797e1b").Return(
nil, errors.New("db error"))
db.C = mockClient

r, _ := http.NewRequest("GET",
"/v1beta/pools/f4486139-78d5-462d-a7b9-fdaf6c797e1b", nil)
w := httptest.NewRecorder()
beego.InsertFilter("*", beego.BeforeExec, func(httpCtx *context.Context) {
httpCtx.Input.SetData("context", c.NewAdminContext())
})
beego.BeeApp.Handlers.ServeHTTP(w, r)

if w.Code != 400 {
Expand Down

0 comments on commit ac71e26

Please sign in to comment.