Skip to content

Commit

Permalink
Refactor kopia repository and snapshot helpers (#934)
Browse files Browse the repository at this point in the history
* Remove unused code from package

* Refactor and add code into new package

* Refactor connect and open repo helpers

* More refactoring

* Bump a CI build

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
pavannd1 and mergify[bot] committed Apr 6, 2021
1 parent 7507a31 commit 37d272f
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 139 deletions.
60 changes: 32 additions & 28 deletions pkg/kopia/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ package kopia
import (
"context"
"os"
"path/filepath"
"strings"
"time"

"github.com/jpillora/backoff"
"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/poll"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/poll"
)

const (
Expand All @@ -39,25 +39,18 @@ const (
// ConnectToAPIServer connects to the Kopia API server running at the given address
func ConnectToAPIServer(
ctx context.Context,
cli kubernetes.Interface,
tlsCertSecret,
userPassphraseSecret *corev1.Secret,
tlsCert,
userPassphrase,
hostname,
serverAddress,
username string,
) error {
// Extra fingerprint from the TLS Certificate secret
fingerprint, err := ExtractFingerprintFromCertSecret(ctx, cli, tlsCertSecret.GetName(), tlsCertSecret.GetNamespace())
fingerprint, err := ExtractFingerprintFromCertificateJSON(tlsCert)
if err != nil {
return errors.Wrap(err, "Failed to extract fingerprint from Kopia API Server Certificate Secret")
}

// Extract user passphrase from the secret
passphrase, ok := userPassphraseSecret.Data[hostname]
if !ok {
return errors.Errorf("Failed to extract client passphrase from secret. Secret: %s", userPassphraseSecret.GetName())
}

serverInfo := &repo.APIServerInfo{
BaseURL: serverAddress,
TrustedServerCertificateFingerprint: fingerprint,
Expand Down Expand Up @@ -86,7 +79,7 @@ func ConnectToAPIServer(
Max: 15 * time.Second,
}, func(c context.Context) (bool, error) {
// TODO(@pavan): Modify this to use custom config file path, if required
err := repo.ConnectAPIServer(ctx, defaultConfigFilePath, serverInfo, string(passphrase), opts)
err := repo.ConnectAPIServer(ctx, defaultConfigFilePath, serverInfo, userPassphrase, opts)
switch {
case isGetRepoParametersError(err):
log.Debug().WithError(err).Print("Connecting to the Kopia API Server")
Expand All @@ -99,26 +92,37 @@ func ConnectToAPIServer(
return errors.Wrap(err, "Failed connecting to the Kopia API Server")
}

// OpenRepository opens the connected Kopia repository
func OpenRepository(ctx context.Context) (repo.Repository, error) {
if _, err := os.Stat(defaultConfigFilePath); os.IsNotExist(err) {
return nil, errors.New("Failed find Kopia configuration file")
// OpenRepository connects to the kopia repository based on the config stored in the config file
// NOTE: This assumes that `kopia repository connect` has been already run on the machine
// OR the above Connect function has been used to connect to the repository server
func OpenRepository(ctx context.Context, configFile, password, purpose string) (repo.RepositoryWriter, error) {
repoConfig := repositoryConfigFileName(configFile)
if _, err := os.Stat(repoConfig); os.IsNotExist(err) {
return nil, errors.New("Failed find kopia configuration file")
}

password, ok := repo.GetPersistedPassword(ctx, defaultConfigFilePath)
if !ok || password == "" {
return nil, errors.New("Failed to retrieve Kopia client passphrase")
}

r, err := repo.Open(ctx, defaultConfigFilePath, password, &repo.Options{})
r, err := repo.Open(ctx, repoConfig, password, &repo.Options{})
if os.IsNotExist(err) {
return nil, errors.New("Failed to find Kopia repository, not connected to any repository")
return nil, errors.New("Failed to find kopia repository, use `kopia repository create` or kopia repository connect` if already created")
}

if err != nil {
return nil, errors.Wrap(err, "Failed to open Kopia repository")
return nil, errors.Wrap(err, "Failed to open kopia repository")
}

return r, nil
rw, err := r.NewWriter(ctx, repo.WriteSessionOptions{
Purpose: purpose,
OnUpload: func(i int64) {},
})

return rw, errors.Wrap(err, "Failed to open kopia repository writer")
}

func repositoryConfigFileName(configFile string) string {
if configFile != "" {
return configFile
}
return filepath.Join(os.Getenv("HOME"), ".config", "kopia", "repository.config")
}

func isGetRepoParametersError(err error) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/register.go → pkg/kopia/register.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 The Kanister Authors.
// Copyright 2021 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package stream
package kopia

import (
// Register supported blob storage providers
Expand Down
111 changes: 111 additions & 0 deletions pkg/kopia/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2021 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kopia

import (
"context"
"fmt"
"time"

"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
)

// SnapshotSource creates and uploads a kopia snapshot to the given repository
func SnapshotSource(
ctx context.Context,
rep repo.RepositoryWriter,
u *snapshotfs.Uploader,
sourceInfo snapshot.SourceInfo,
rootDir fs.Entry,
description string,
) (string, string, error) {
fmt.Printf("Snapshotting %v ...\n", sourceInfo)

t0 := time.Now()

previous, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, nil)
if err != nil {
return "", "", errors.Wrap(err, "Failed to find previous kopia manifests")
}

policyTree, err := policy.TreeForSource(ctx, rep, sourceInfo)
if err != nil {
return "", "", errors.Wrap(err, "Failed to get kopia policy tree")
}

manifest, err := u.Upload(ctx, rootDir, policyTree, sourceInfo, previous...)
if err != nil {
return "", "", errors.Wrap(err, "Failed to upload the kopia snapshot")
}

manifest.Description = description

snapID, err := snapshot.SaveSnapshot(ctx, rep, manifest)
if err != nil {
return "", "", errors.Wrap(err, "Failed to save kopia manifest")
}

_, err = policy.ApplyRetentionPolicy(ctx, rep, sourceInfo, true)
if err != nil {
return "", "", errors.Wrap(err, "Failed to apply kopia retention policy")
}

if err = policy.SetManual(ctx, rep, sourceInfo); err != nil {
return "", "", errors.Wrap(err, "Failed to set manual field in kopia scheduling policy for source")
}

if ferr := rep.Flush(ctx); ferr != nil {
return "", "", errors.Wrap(ferr, "Failed to flush kopia repository")
}

// TODO: Add size related logs for parsing
fmt.Printf("\nCreated snapshot with root %v and ID %v in %v\n", manifest.RootObjectID(), snapID, time.Since(t0).Truncate(time.Second))

return string(snapID), string(manifest.RootObjectID()), nil
}

// findPreviousSnapshotManifest returns the list of previous snapshots for a given source,
// including last complete snapshot
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, noLaterThan *time.Time) ([]*snapshot.Manifest, error) {
man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
if err != nil {
return nil, errors.Wrap(err, "Failed to list previous kopia snapshots")
}

// find latest complete snapshot
var previousComplete *snapshot.Manifest
var result []*snapshot.Manifest

for _, p := range man {
if noLaterThan != nil && p.StartTime.After(*noLaterThan) {
continue
}

if p.IncompleteReason == "" && (previousComplete == nil || p.StartTime.After(previousComplete.StartTime)) {
previousComplete = p
}
}

if previousComplete != nil {
result = append(result, previousComplete)
}

return result, nil
}
22 changes: 22 additions & 0 deletions pkg/kopia/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,25 @@ func extractFingerprintFromSliceOfBytes(pemData []byte) (string, error) {
fingerprint := sha256.Sum256(cert.Raw)
return hex.EncodeToString(fingerprint[:]), nil
}

// ExtractFingerprintFromCertificateJSON fetch the fingerprint from a base64 encoded,
// certificate which is also type asserted into a string.
func ExtractFingerprintFromCertificateJSON(cert string) (string, error) {
var certMap map[string]string

if err := json.Unmarshal([]byte(cert), &certMap); err != nil {
return "", errors.Wrap(err, "Failed to unmarshal Kopia API Server Certificate Secret Data")
}

decodedCertData, err := base64.StdEncoding.DecodeString(certMap[tlsCertificateKey])
if err != nil {
return "", errors.Wrap(err, "Failed to base64 decode Kopia API Server Certificate Secret Data")
}

fingerprint, err := extractFingerprintFromSliceOfBytes(decodedCertData)
if err != nil {
return "", errors.Wrap(err, "Failed to extract fingerprint Kopia API Server Certificate Secret Data")
}

return fingerprint, nil
}

0 comments on commit 37d272f

Please sign in to comment.