Skip to content

Commit

Permalink
bigtable: implement managed backup feature (googleapis#2524)
Browse files Browse the repository at this point in the history
  • Loading branch information
liubonan authored and tritone committed Aug 25, 2020
1 parent 28c48dc commit 7fa6e80
Show file tree
Hide file tree
Showing 2 changed files with 357 additions and 0 deletions.
225 changes: 225 additions & 0 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,3 +1424,228 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient

return results, nil
}

// RestoreTable creates a table from a backup. The table will be created in the same cluster as the backup.
func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
backupPath := prefix + "/clusters/" + cluster + "/backups/" + backup

req := &btapb.RestoreTableRequest{
Parent: prefix,
TableId: table,
Source: &btapb.RestoreTableRequest_Backup{backupPath},
}
op, err := ac.tClient.RestoreTable(ctx, req)
if err != nil {
return err
}
resp := btapb.Table{}
return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// CreateBackup creates a new backup in the specified cluster from the
// specified source table with the user-provided expire time.
func (ac *AdminClient) CreateBackup(ctx context.Context, table, cluster, backup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()

parsedExpireTime, err := ptypes.TimestampProto(expireTime)
if err != nil {
return err
}

req := &btapb.CreateBackupRequest{
Parent: prefix + "/clusters/" + cluster,
BackupId: backup,
Backup: &btapb.Backup{
ExpireTime: parsedExpireTime,
SourceTable: prefix + "/tables/" + table,
},
}

op, err := ac.tClient.CreateBackup(ctx, req)
if err != nil {
return err
}
resp := btapb.Backup{}
return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// Backups returns a BackupIterator for iterating over the backups in a cluster.
// To list backups across all of the clusters in the instance specify "-" as the cluster.
func (ac *AdminClient) Backups(ctx context.Context, cluster string) *BackupIterator {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster

it := &BackupIterator{}
req := &btapb.ListBackupsRequest{
Parent: clusterPath,
}

fetch := func(pageSize int, pageToken string) (string, error) {
req.PageToken = pageToken
if pageSize > math.MaxInt32 {
req.PageSize = math.MaxInt32
} else {
req.PageSize = int32(pageSize)
}

var resp *btapb.ListBackupsResponse
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
resp, err = ac.tClient.ListBackups(ctx, req)
return err
}, retryOptions...)
if err != nil {
return "", err
}
for _, s := range resp.Backups {
backupInfo, err := newBackupInfo(s)
if err != nil {
return "", fmt.Errorf("failed to parse backup proto %v", err)
}
it.items = append(it.items, backupInfo)
}
return resp.NextPageToken, nil
}
bufLen := func() int { return len(it.items) }
takeBuf := func() interface{} { b := it.items; it.items = nil; return b }

it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)

return it
}

// newBackupInfo creates a BackupInfo struct from a btapb.Backup protocol buffer.
func newBackupInfo(backup *btapb.Backup) (*BackupInfo, error) {
nameParts := strings.Split(backup.Name, "/")
name := nameParts[len(nameParts)-1]
tablePathParts := strings.Split(backup.SourceTable, "/")
tableID := tablePathParts[len(tablePathParts)-1]

startTime, err := ptypes.Timestamp(backup.StartTime)
if err != nil {
return nil, fmt.Errorf("invalid startTime: %v", err)
}

endTime, err := ptypes.Timestamp(backup.EndTime)
if err != nil {
return nil, fmt.Errorf("invalid endTime: %v", err)
}

expireTime, err := ptypes.Timestamp(backup.ExpireTime)
if err != nil {
return nil, fmt.Errorf("invalid expireTime: %v", err)
}

return &BackupInfo{
Name: name,
SourceTable: tableID,
SizeBytes: backup.SizeBytes,
StartTime: startTime,
EndTime: endTime,
ExpireTime: expireTime,
State: backup.State.String(),
}, nil
}

// BackupIterator is an EntryIterator that iterates over log entries.
type BackupIterator struct {
items []*BackupInfo
pageInfo *iterator.PageInfo
nextFunc func() error
}

// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
func (it *BackupIterator) PageInfo() *iterator.PageInfo {
return it.pageInfo
}

// Next returns the next result. Its second return value is iterator.Done
// (https://godoc.org/google.golang.org/api/iterator) if there are no more
// results. Once Next returns Done, all subsequent calls will return Done.
func (it *BackupIterator) Next() (*BackupInfo, error) {
if err := it.nextFunc(); err != nil {
return nil, err
}
item := it.items[0]
it.items = it.items[1:]
return item, nil
}

// BackupInfo contains backup metadata. This struct is read-only.
type BackupInfo struct {
Name string
SourceTable string
SizeBytes int64
StartTime time.Time
EndTime time.Time
ExpireTime time.Time
State string
}

// BackupInfo gets backup metadata.
func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup

req := &btapb.GetBackupRequest{
Name: backupPath,
}

var resp *btapb.Backup
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
resp, err = ac.tClient.GetBackup(ctx, req)
return err
}, retryOptions...)
if err != nil {
return nil, err
}

return newBackupInfo(resp)
}

// DeleteBackup deletes a backup in a cluster.
func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup

req := &btapb.DeleteBackupRequest{
Name: backupPath,
}
_, err := ac.tClient.DeleteBackup(ctx, req)
return err
}

// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time.
func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup

expireTimestamp, err := ptypes.TimestampProto(expireTime)
if err != nil {
return err
}

updateMask := &field_mask.FieldMask{}
updateMask.Paths = append(updateMask.Paths, "expire_time")

req := &btapb.UpdateBackupRequest{
Backup: &btapb.Backup{
Name: backupPath,
ExpireTime: expireTimestamp,
},
UpdateMask: updateMask,
}
_, err = ac.tClient.UpdateBackup(ctx, req)
return err
}
132 changes: 132 additions & 0 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,138 @@ func TestIntegration_InstanceUpdate(t *testing.T) {
}
}

func TestIntegration_AdminBackup(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()

if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support backups")
}

timeout := 5 * time.Minute
ctx, _ := context.WithTimeout(context.Background(), timeout)

adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()

table := testEnv.Config().Table
cluster := testEnv.Config().Cluster

// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer adminClient.DeleteTable(ctx, table)

list := func(cluster string) ([]*BackupInfo, error) {
infos := []*BackupInfo(nil)

it := adminClient.Backups(ctx, cluster)
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
infos = append(infos, s)
}
return infos, err
}

if err := adminClient.CreateTable(ctx, table); err != nil {
t.Fatalf("Creating table: %v", err)
}

// Precondition: no backups
backups, err := list(cluster)
if err != nil {
t.Fatalf("Initial backup list: %v", err)
}
if got, want := len(backups), 0; got != want {
t.Fatalf("Initial backup list len: %d, want: %d", got, want)
}

// Create backup
defer adminClient.DeleteBackup(ctx, cluster, "mybackup")

if err = adminClient.CreateBackup(ctx, table, cluster, "mybackup", time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}

// List backup
backups, err = list(cluster)
if err != nil {
t.Fatalf("Listing backups: %v", err)
}
if got, want := len(backups), 1; got != want {
t.Fatalf("Listing backup count: %d, want: %d", got, want)
}
if got, want := backups[0].Name, "mybackup"; got != want {
t.Fatalf("Backup name: %s, want: %s", got, want)
}
if got, want := backups[0].SourceTable, table; got != want {
t.Fatalf("Backup SourceTable: %s, want: %s", got, want)
}
if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
t.Fatalf("Backup ExpireTime: %s, want: %s", got, want)
}

// Get backup
backup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
if err != nil {
t.Fatalf("BackupInfo: %v", backup)
}
if got, want := *backup, *backups[0]; got != want {
t.Fatalf("BackupInfo: %v, want: %v", got, want)
}

// Update backup
newExpireTime := time.Now().Add(10 * time.Hour)
err = adminClient.UpdateBackup(ctx, cluster, "mybackup", newExpireTime)
if err != nil {
t.Fatalf("UpdateBackup failed: %v", err)
}

// Check that updated backup has the correct expire time
updatedBackup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
if err != nil {
t.Fatalf("BackupInfo: %v", err)
}
backup.ExpireTime = newExpireTime
// Server clock and local clock may not be perfectly sync'ed.
if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
t.Fatalf("BackupInfo: %v, want: %v", got, want)
}

// Restore backup
restoredTable := table + "-restored"
defer adminClient.DeleteTable(ctx, restoredTable)
if err = adminClient.RestoreTable(ctx, restoredTable, cluster, "mybackup"); err != nil {
t.Fatalf("RestoreTable: %v", err)
}
if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
t.Fatalf("Restored TableInfo: %v", err)
}

// Delete backup
if err = adminClient.DeleteBackup(ctx, cluster, "mybackup"); err != nil {
t.Fatalf("DeleteBackup: %v", err)
}
backups, err = list(cluster)
if err != nil {
t.Fatalf("List after Delete: %v", err)
}
if got, want := len(backups), 0; got != want {
t.Fatalf("List after delete len: %d, want: %d", got, want)
}
}

func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down

0 comments on commit 7fa6e80

Please sign in to comment.