Skip to content

Commit

Permalink
Exposing GpdStorage and adding RegionToZoneMap (#754)
Browse files Browse the repository at this point in the history
* exposing GpdStorage and adding RegionToZoneMap

* remove extra input parameter

* parse region url

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
bathina2 and mergify[bot] committed Sep 11, 2020
1 parent eeaeb71 commit 8439985
Showing 1 changed file with 62 additions and 24 deletions.
86 changes: 62 additions & 24 deletions pkg/blockstorage/gcepd/gcepd.go
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"net/http"
"net/url"
"path"
"path/filepath"
"strings"
"time"
Expand All @@ -38,10 +40,11 @@ import (
"github.com/kanisterio/kanister/pkg/poll"
)

var _ blockstorage.Provider = (*gpdStorage)(nil)
var _ zone.Mapper = (*gpdStorage)(nil)
var _ blockstorage.Provider = (*GpdStorage)(nil)
var _ zone.Mapper = (*GpdStorage)(nil)

type gpdStorage struct {
// GpdStorage implements blockstorage.Provider
type GpdStorage struct {
service *compute.Service
project string
}
Expand All @@ -55,7 +58,8 @@ const (
snapshotNameFmt = "snap-%s"
)

func (s *gpdStorage) Type() blockstorage.Type {
// Type is part of blockstorage.Provider
func (s *GpdStorage) Type() blockstorage.Type {
return blockstorage.TypeGPD
}

Expand All @@ -66,12 +70,13 @@ func NewProvider(config map[string]string) (blockstorage.Provider, error) {
if err != nil {
return nil, err
}
return &gpdStorage{
return &GpdStorage{
service: gCli.Service,
project: gCli.ProjectID}, nil
}

func (s *gpdStorage) VolumeGet(ctx context.Context, id string, zone string) (*blockstorage.Volume, error) {
// VolumeGet is part of blockstorage.Provider
func (s *GpdStorage) VolumeGet(ctx context.Context, id string, zone string) (*blockstorage.Volume, error) {
var err error
var disk *compute.Disk

Expand All @@ -94,7 +99,8 @@ func (s *gpdStorage) VolumeGet(ctx context.Context, id string, zone string) (*bl
return mv, nil
}

func (s *gpdStorage) VolumeCreate(ctx context.Context, volume blockstorage.Volume) (*blockstorage.Volume, error) {
// VolumeCreate is part of blockstorage.Provider
func (s *GpdStorage) VolumeCreate(ctx context.Context, volume blockstorage.Volume) (*blockstorage.Volume, error) {
var resp *compute.Operation
var err error
tags := make(map[string]string, len(volume.Tags))
Expand Down Expand Up @@ -133,7 +139,8 @@ func (s *gpdStorage) VolumeCreate(ctx context.Context, volume blockstorage.Volum
return s.VolumeGet(ctx, createDisk.Name, volume.Az)
}

func (s *gpdStorage) VolumeDelete(ctx context.Context, volume *blockstorage.Volume) error {
// VolumeDelete is part of blockstorage.Provider
func (s *GpdStorage) VolumeDelete(ctx context.Context, volume *blockstorage.Volume) error {
var op *compute.Operation
var err error
var region string
Expand Down Expand Up @@ -161,15 +168,18 @@ func (s *gpdStorage) VolumeDelete(ctx context.Context, volume *blockstorage.Volu
return s.waitOnOperation(ctx, op, volume.Az)
}

func (s *gpdStorage) SnapshotCopy(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot) (*blockstorage.Snapshot, error) {
// SnapshotCopy is part of blockstorage.Provider
func (s *GpdStorage) SnapshotCopy(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot) (*blockstorage.Snapshot, error) {
return nil, errors.Errorf("Not implemented")
}

func (s *gpdStorage) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
// SnapshotCopyWithArgs is part of blockstorage.Provider
func (s *GpdStorage) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
}

func (s *gpdStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
// SnapshotCreate is part of blockstorage.Provider
func (s *GpdStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
rb := &compute.Snapshot{
Name: fmt.Sprintf(snapshotNameFmt, uuid.NewV1().String()),
Labels: blockstorage.SanitizeTags(ktags.GetTags(tags)),
Expand Down Expand Up @@ -210,14 +220,16 @@ func (s *gpdStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Vol
return rs, nil
}

func (s *gpdStorage) SnapshotCreateWaitForCompletion(ctx context.Context, snap *blockstorage.Snapshot) error {
// SnapshotCreateWaitForCompletion is part of blockstorage.Provider
func (s *GpdStorage) SnapshotCreateWaitForCompletion(ctx context.Context, snap *blockstorage.Snapshot) error {
if err := s.waitOnSnapshotID(ctx, snap.ID); err != nil {
return errors.Wrapf(err, "Waiting on snapshot %v", snap)
}
return nil
}

func (s *gpdStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.Snapshot) error {
// SnapshotDelete is part of blockstorage.Provider
func (s *GpdStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.Snapshot) error {
op, err := s.service.Snapshots.Delete(s.project, snapshot.ID).Context(ctx).Do()
if isNotFoundError(err) {
log.Debug().Print("Cannot delete snapshot", field.M{"SnapshotID": snapshot.ID, "reason": "Snapshot not found"})
Expand All @@ -229,15 +241,16 @@ func (s *gpdStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.
return s.waitOnOperation(ctx, op, "")
}

func (s *gpdStorage) SnapshotGet(ctx context.Context, id string) (*blockstorage.Snapshot, error) {
// SnapshotGet is part of blockstorage.Provider
func (s *GpdStorage) SnapshotGet(ctx context.Context, id string) (*blockstorage.Snapshot, error) {
snap, err := s.service.Snapshots.Get(s.project, id).Context(ctx).Do()
if err != nil {
return nil, err
}
return s.snapshotParse(ctx, snap), nil
}

func (s *gpdStorage) volumeParse(ctx context.Context, volume interface{}, zone string) *blockstorage.Volume {
func (s *GpdStorage) volumeParse(ctx context.Context, volume interface{}, zone string) *blockstorage.Volume {
vol := volume.(*compute.Disk)
volCreationTime, err := time.Parse(time.RFC3339, vol.CreationTimestamp)
if err != nil {
Expand All @@ -257,7 +270,7 @@ func (s *gpdStorage) volumeParse(ctx context.Context, volume interface{}, zone s
}
}

func (s *gpdStorage) snapshotParse(ctx context.Context, snap *compute.Snapshot) *blockstorage.Snapshot {
func (s *GpdStorage) snapshotParse(ctx context.Context, snap *compute.Snapshot) *blockstorage.Snapshot {
var encrypted bool
if snap.SnapshotEncryptionKey == nil {
encrypted = false
Expand Down Expand Up @@ -285,7 +298,8 @@ func (s *gpdStorage) snapshotParse(ctx context.Context, snap *compute.Snapshot)
}
}

func (s *gpdStorage) VolumesList(ctx context.Context, tags map[string]string, zone string) ([]*blockstorage.Volume, error) {
// VolumesList is part of blockstorage.Provider
func (s *GpdStorage) VolumesList(ctx context.Context, tags map[string]string, zone string) ([]*blockstorage.Volume, error) {
var vols []*blockstorage.Volume
fltrs := blockstorage.MapToString(tags, " AND ", ":")
if isMultiZone(zone) {
Expand Down Expand Up @@ -318,7 +332,8 @@ func (s *gpdStorage) VolumesList(ctx context.Context, tags map[string]string, zo
return vols, nil
}

func (s *gpdStorage) SnapshotsList(ctx context.Context, tags map[string]string) ([]*blockstorage.Snapshot, error) {
// SnapshotsList is part of blockstorage.Provider
func (s *GpdStorage) SnapshotsList(ctx context.Context, tags map[string]string) ([]*blockstorage.Snapshot, error) {
var snaps []*blockstorage.Snapshot
fltrs := blockstorage.MapToString(tags, " AND ", ":")
req := s.service.Snapshots.List(s.project).Filter(fltrs)
Expand All @@ -334,7 +349,8 @@ func (s *gpdStorage) SnapshotsList(ctx context.Context, tags map[string]string)
return snaps, nil
}

func (s *gpdStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) {
// VolumeCreateFromSnapshot is part of blockstorage.Provider
func (s *GpdStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) {
snap, err := s.service.Snapshots.Get(s.project, snapshot.ID).Context(ctx).Do()
if err != nil {
return nil, err
Expand Down Expand Up @@ -405,7 +421,8 @@ func (s *gpdStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot bloc
return s.VolumeGet(ctx, createDisk.Name, volZone)
}

func (s *gpdStorage) SetTags(ctx context.Context, resource interface{}, tags map[string]string) error {
// SetTags is part of blockstorage.Provider
func (s *GpdStorage) SetTags(ctx context.Context, resource interface{}, tags map[string]string) error {
switch res := resource.(type) {
case *blockstorage.Snapshot:
{
Expand Down Expand Up @@ -468,7 +485,7 @@ func (s *gpdStorage) SetTags(ctx context.Context, resource interface{}, tags map
}

// waitOnOperation waits for the operation to be done
func (s *gpdStorage) waitOnOperation(ctx context.Context, op *compute.Operation, zone string) error {
func (s *GpdStorage) waitOnOperation(ctx context.Context, op *compute.Operation, zone string) error {
waitBackoff := backoff.Backoff{
Factor: 2,
Jitter: false,
Expand Down Expand Up @@ -515,7 +532,7 @@ func (s *gpdStorage) waitOnOperation(ctx context.Context, op *compute.Operation,
}

// waitOnSnapshotID waits for the snapshot to be created
func (s *gpdStorage) waitOnSnapshotID(ctx context.Context, id string) error {
func (s *GpdStorage) waitOnSnapshotID(ctx context.Context, id string) error {
snapWaitBackoff := backoff.Backoff{
Factor: 2,
Jitter: false,
Expand Down Expand Up @@ -547,10 +564,31 @@ func isNotFoundError(err error) bool {
return ok && ae.Code == http.StatusNotFound
}

func (s *gpdStorage) FromRegion(ctx context.Context, region string) ([]string, error) {
// FromRegion is part of zone.Mapper
func (s *GpdStorage) FromRegion(ctx context.Context, region string) ([]string, error) {
return staticRegionToZones(region)
}

// RegionToZoneMap returns the region to zone map fetched from the provider
func (s *GpdStorage) RegionToZoneMap(ctx context.Context) (map[string][]string, error) {
regionMap := make(map[string][]string)
req := s.service.Zones.List(s.project)
if err := req.Pages(ctx, func(page *compute.ZoneList) error {
for _, zone := range page.Items {
regionURL, err := url.Parse(zone.Region)
if err != nil {
return err
}
region := path.Base(regionURL.Path)
regionMap[region] = append(regionMap[region], zone.Name)
}
return nil
}); err != nil {
return nil, err
}
return regionMap, nil
}

func staticRegionToZones(region string) ([]string, error) {
switch region {
case "asia-east1":
Expand Down Expand Up @@ -729,7 +767,7 @@ func getRegionFromZones(az string) (string, error) {
return regions.UnsortedList()[0], nil
}

func (s *gpdStorage) getSelfLinks(ctx context.Context, zones []string) ([]string, error) {
func (s *GpdStorage) getSelfLinks(ctx context.Context, zones []string) ([]string, error) {
selfLinks := make([]string, len(zones))
for i, zone := range zones {
replicaZone, err := s.service.Zones.Get(s.project, zone).Context(ctx).Do()
Expand Down

0 comments on commit 8439985

Please sign in to comment.