Skip to content

Commit

Permalink
Merge pull request #560 from opensds/development
Browse files Browse the repository at this point in the history
Merge from Development to Master to prepare for Bali RC release
  • Loading branch information
leonwanghui committed Nov 29, 2018
2 parents a655b30 + 1acc720 commit a7e9da1
Show file tree
Hide file tree
Showing 713 changed files with 918 additions and 63,945 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -7,6 +7,7 @@ go_import_path: github.com/opensds/opensds

go:
- 1.9.x
- 1.11.x
- tip

env:
Expand Down
14 changes: 13 additions & 1 deletion client/receiver.go
Expand Up @@ -18,8 +18,10 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"

"github.com/astaxie/beego/httplib"
"github.com/gophercloud/gophercloud"
Expand Down Expand Up @@ -67,10 +69,19 @@ func NewReceiver() Receiver {

func request(url string, method string, headers HeaderOption, input interface{}, output interface{}) error {
req := httplib.NewBeegoRequest(url, strings.ToUpper(method))
// Set the request timeout a little bit longer upload snapshot to cloud temporarily.
req.SetTimeout(time.Minute*6, time.Minute*6)
// init body
log.Printf("%s %s\n", strings.ToUpper(method), url)
if input != nil {
req.JSONBody(input)
body, err := json.MarshalIndent(input, "", " ")
if err != nil {
return err
}
log.Printf("Request body:\n%s\n", string(body))
req.Body(body)
}

//init header
if headers != nil {
for k, v := range headers {
Expand All @@ -87,6 +98,7 @@ func request(url string, method string, headers HeaderOption, input interface{},
return err
}

log.Printf("\nStatusCode: %s\nResponse Body:\n%s\n", resp.Status, string(rbody))
if 400 <= resp.StatusCode && resp.StatusCode <= 599 {
return NewHttpError(resp.StatusCode, string(rbody))
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/osdsdock/osdsdock.go
Expand Up @@ -30,20 +30,21 @@ import (

func init() {
def := GetDefaultConfig()
flag := CONF.Flag
flag := &CONF.Flag
flag.StringVar(&CONF.OsdsDock.ApiEndpoint, "api-endpoint", def.OsdsDock.ApiEndpoint, "Listen endpoint of dock service")
flag.StringVar(&CONF.OsdsDock.DockType, "dock-type", def.OsdsDock.DockType, "Type of dock service")
flag.StringVar(&CONF.Database.Endpoint, "db-endpoint", def.Database.Endpoint, "Connection endpoint of database service")
flag.StringVar(&CONF.Database.Driver, "db-driver", def.Database.Driver, "Driver name of database service")
// flag.StringVar(&CONF.Database.Credential, "db-credential", def.Database.Credential, "Connection credential of database service")
flag.DurationVar(&CONF.OsdsDock.LogFlushFrequency, "log-flush-frequency", def.OsdsLet.LogFlushFrequency, "Maximum number of seconds between log flushes")
daemon.SetDaemonFlag(&CONF.OsdsDock.Daemon, def.OsdsDock.Daemon)
CONF.Load("/etc/opensds/opensds.conf")
daemon.CheckAndRunDaemon(CONF.OsdsDock.Daemon)
}

func main() {
// Open OpenSDS dock service log file.
logs.InitLogs()
logs.InitLogs(CONF.OsdsDock.LogFlushFrequency)
defer logs.FlushLogs()

// Set up database session.
Expand Down
5 changes: 3 additions & 2 deletions cmd/osdslet/osdslet.go
Expand Up @@ -30,19 +30,20 @@ import (

func init() {
def := GetDefaultConfig()
flag := CONF.Flag
flag := &CONF.Flag
flag.StringVar(&CONF.OsdsLet.ApiEndpoint, "api-endpoint", def.OsdsLet.ApiEndpoint, "Listen endpoint of controller service")
flag.StringVar(&CONF.Database.Endpoint, "db-endpoint", def.Database.Endpoint, "Connection endpoint of database service")
flag.StringVar(&CONF.Database.Driver, "db-driver", def.Database.Driver, "Driver name of database service")
flag.StringVar(&CONF.Database.Credential, "db-credential", def.Database.Credential, "Connection credential of database service")
flag.DurationVar(&CONF.OsdsLet.LogFlushFrequency, "log-flush-frequency", def.OsdsLet.LogFlushFrequency, "Maximum number of seconds between log flushes")
daemon.SetDaemonFlag(&CONF.OsdsLet.Daemon, def.OsdsLet.Daemon)
CONF.Load("/etc/opensds/opensds.conf")
daemon.CheckAndRunDaemon(CONF.OsdsLet.Daemon)
}

func main() {
// Open OpenSDS orchestrator service log file.
logs.InitLogs()
logs.InitLogs(CONF.OsdsLet.LogFlushFrequency)
defer logs.FlushLogs()

// Set up database session.
Expand Down
2 changes: 1 addition & 1 deletion contrib/backup/driver.go
Expand Up @@ -28,7 +28,7 @@ type BackupSpec struct {
type BackupDriver interface {
SetUp() error
Backup(backup *BackupSpec, volumeFile *os.File) error
Restore(backup *BackupSpec, volId string, volFile *os.File) error
Restore(backup *BackupSpec, backupId string, volFile *os.File) error
Delete(backup *BackupSpec) error
CleanUp() error
}
Expand Down
35 changes: 32 additions & 3 deletions contrib/backup/multicloud/client.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/astaxie/beego/httplib"
log "github.com/golang/glog"
"io/ioutil"
)

const (
Expand Down Expand Up @@ -101,12 +102,13 @@ func (c *Client) doRequest(method, u string, in interface{}, cb ReqSettingCB) ([
return nil, nil, err
}

log.Errorf("%s: %s OK\n", method, u)
b, err := req.Bytes()
log.V(5).Infof("%s: %s OK\n", method, u)
rbody, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorf("Get byte[] from response failed, method: %s\n url: %s\n error: %v", method, u, err)
return nil, nil, err
}
return b, resp.Header, nil
return rbody, resp.Header, nil
}

func (c *Client) request(method, p string, in, out interface{}, cb ReqSettingCB) error {
Expand Down Expand Up @@ -251,3 +253,30 @@ func (c *Client) AbortMultipartUpload(bucketName, objectKey string) error {
//}
return nil
}

func (c *Client) DownloadPart(bucketName, objectKey string, offset, size int64) ([]byte, error) {
p := path.Join("s3", bucketName, objectKey)

reqSettingCB := func(req *httplib.BeegoHTTPRequest) error {
rangeStr := fmt.Sprintf("bytes:%d-%d", offset, offset+size-1)
req.Header("Range", rangeStr)
req.SetTimeout(c.uploadTimeout, c.uploadTimeout)
return nil
}

u, err := url.Parse(p)
if err != nil {
return nil, err
}
base, err := url.Parse(c.baseURL)
if err != nil {
return nil, err
}

fullUrl := base.ResolveReference(u)
body, _, err := c.doRequest("GET", fullUrl.String(), nil, reqSettingCB)
if err != nil {
return nil, err
}
return body, nil
}
45 changes: 39 additions & 6 deletions contrib/backup/multicloud/driver.go
Expand Up @@ -15,6 +15,7 @@
package multicloud

import (
"errors"
"io"
"io/ioutil"
"os"
Expand All @@ -25,8 +26,8 @@ import (
)

const (
ConfFile = "/etc/opensds/driver/multi-cloud.yaml"
UploadChunkSize = 1024 * 1024 * 50
ConfFile = "/etc/opensds/driver/multi-cloud.yaml"
ChunkSize = 1024 * 1024 * 50
)

func init() {
Expand Down Expand Up @@ -89,10 +90,13 @@ func (m *MultiCloud) CleanUp() error {
}

func (m *MultiCloud) Backup(backup *backup.BackupSpec, volFile *os.File) error {
buf := make([]byte, UploadChunkSize)
buf := make([]byte, ChunkSize)
input := &CompleteMultipartUpload{}

bucket := backup.Metadata["bucket"]
bucket, ok := backup.Metadata["bucket"]
if !ok {
return errors.New("can't find bucket in metadata")
}
key := backup.Id
initResp, err := m.client.InitMultiPartUpload(bucket, key)
if err != nil {
Expand Down Expand Up @@ -132,10 +136,39 @@ func (m *MultiCloud) Backup(backup *backup.BackupSpec, volFile *os.File) error {
return nil
}

func (m *MultiCloud) Restore(backup *backup.BackupSpec, volId string, volFile *os.File) error {
func (m *MultiCloud) Restore(backup *backup.BackupSpec, backupId string, volFile *os.File) error {
bucket, ok := backup.Metadata["bucket"]
if !ok {
return errors.New("can't find bucket in metadata")
}
var downloadSize = ChunkSize
// if the size of data of smaller than require download size
// downloading is completed.
for offset := int64(0); downloadSize == ChunkSize; offset += ChunkSize {
data, err := m.client.DownloadPart(bucket, backupId, offset, ChunkSize)
if err != nil {
glog.Errorf("download part failed: %v", err)
return err
}
downloadSize = len(data)
glog.V(5).Infof("download size: %d\n", downloadSize)
volFile.Seek(offset, 0)
size, err := volFile.Write(data)
if err != nil {
glog.Errorf("write part failed: %v", err)
return err
}
if size != downloadSize {
return errors.New("size not equal to download size")
}
glog.V(5).Infof("write buf size len:%d", size)
}
glog.Infof("restore success ...")
return nil
}

func (m *MultiCloud) Delete(backup *backup.BackupSpec) error {
return nil
bucket := backup.Metadata["bucket"]
key := backup.Id
return m.client.RemoveObject(bucket, key)
}
3 changes: 2 additions & 1 deletion contrib/cindercompatibleapi/main.go
Expand Up @@ -23,14 +23,15 @@ import (
"flag"
"fmt"
"os"
"time"

"github.com/opensds/opensds/contrib/cindercompatibleapi/api"
"github.com/opensds/opensds/pkg/utils/logs"
)

func main() {
flag.Parse()
logs.InitLogs()
logs.InitLogs(5 * time.Second)
defer logs.FlushLogs()

cinderEndpoint, ok := os.LookupEnv("CINDER_ENDPOINT")
Expand Down

0 comments on commit a7e9da1

Please sign in to comment.