Skip to content

Commit

Permalink
fix: lbsvc in same ns (#129)
Browse files Browse the repository at this point in the history
* test: add test for lbsvc strategy for pvcs in the same ns

Signed-off-by: Utku Ozdemir <uoz@protonmail.com>

* test: add test for lbsvc strategy for pvcs in the same ns

Signed-off-by: Utku Ozdemir <uoz@protonmail.com>

* fix: use different names for src and dest releases on lbsvc strategy

Signed-off-by: Utku Ozdemir <uoz@protonmail.com>

* fix: releaseName on mnt2

Signed-off-by: Utku Ozdemir <uoz@protonmail.com>
  • Loading branch information
utkuozdemir committed Nov 26, 2021
1 parent 21fb5bf commit 8aa71a6
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 44 deletions.
26 changes: 26 additions & 0 deletions internal/integrationtest/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,32 @@ func TestSameNS(t *testing.T) {
assert.NoError(t, err)
}

func TestSameNSLbSvc(t *testing.T) {
assert.NoError(t, clearDests())

_, err := execInPod(ns1, "dest", generateExtraDataShellCommand)
assert.NoError(t, err)

cmd := fmt.Sprintf("-l debug m -s lbsvc -i -n %s -N %s source dest", ns1, ns1)
assert.NoError(t, runCliApp(cmd))

stdout, err := execInPod(ns1, "dest", printDataUidGidContentShellCommand)
assert.NoError(t, err)

parts := strings.Split(stdout, "\n")
assert.Equal(t, len(parts), 3)
if len(parts) < 3 {
return
}

assert.Equal(t, dataFileUid, parts[0])
assert.Equal(t, dataFileGid, parts[1])
assert.Equal(t, generateDataContent, parts[2])

_, err = execInPod(ns1, "dest", checkExtraDataShellCommand)
assert.NoError(t, err)
}

func TestNoChown(t *testing.T) {
assert.NoError(t, clearDests())

Expand Down
8 changes: 4 additions & 4 deletions internal/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func (m *migrator) Run(mig *migration.Migration) error {
for _, name := range mig.Strategies {
id := util.RandomHexadecimalString(5)
e := task.Execution{
ID: id,
HelmReleaseName: "pv-migrate-" + id,
Task: t,
Logger: t.Logger.WithField("id", id),
ID: id,
HelmReleaseNamePrefix: "pv-migrate-" + id,
Task: t,
Logger: t.Logger.WithField("id", id),
}

sLogger := e.Logger.WithField("strategy", name)
Expand Down
25 changes: 15 additions & 10 deletions internal/strategy/lbsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,40 @@ func (r *LbSvc) Run(e *task.Execution) (bool, error) {
}
privateKeyMountPath := "/root/.ssh/id_" + keyAlgorithm

doneCh := registerCleanupHook(e)
defer cleanupAndReleaseHook(e, doneCh)
srcReleaseName := e.HelmReleaseNamePrefix + "-src"
destReleaseName := e.HelmReleaseNamePrefix + "-dest"
releaseNames := []string{srcReleaseName, destReleaseName}

err = installOnSource(e, publicKey)
doneCh := registerCleanupHook(e, releaseNames)
defer cleanupAndReleaseHook(e, releaseNames, doneCh)

err = installOnSource(e, srcReleaseName, publicKey)
if err != nil {
return true, err
}

sourceKubeClient := e.Task.SourceInfo.ClusterClient.KubeClient
svcName := fmt.Sprintf("pv-migrate-%s-sshd", e.ID)
svcName := srcReleaseName + "-sshd"
lbSvcAddress, err := k8s.GetServiceAddress(sourceKubeClient, sourceNs, svcName)
if err != nil {
return true, err
}

sshTargetHost := formatSSHTargetHost(lbSvcAddress)

err = installOnDest(e, privateKey, privateKeyMountPath, sshTargetHost)
err = installOnDest(e, destReleaseName, privateKey, privateKeyMountPath, sshTargetHost)
if err != nil {
return true, err
}

showProgressBar := !e.Task.Migration.Options.NoProgressBar
kubeClient := s.ClusterClient.KubeClient
jobName := e.HelmReleaseName + "-rsync"
jobName := destReleaseName + "-rsync"
err = k8s.WaitForJobCompletion(e.Logger, kubeClient, destNs, jobName, showProgressBar)
return true, err
}

func installOnSource(e *task.Execution, publicKey string) error {
func installOnSource(e *task.Execution, releaseName string, publicKey string) error {
t := e.Task
s := t.SourceInfo
ns := s.Claim.Namespace
Expand All @@ -73,10 +77,11 @@ func installOnSource(e *task.Execution, publicKey string) error {
"source.path=" + t.Migration.Source.Path,
}

return installHelmChart(e, s, helmValues)
return installHelmChart(e, s, releaseName, helmValues)
}

func installOnDest(e *task.Execution, privateKey string, privateKeyMountPath string, sshHost string) error {
func installOnDest(e *task.Execution, releaseName string, privateKey string,
privateKeyMountPath string, sshHost string) error {
t := e.Task
d := t.DestInfo
ns := d.Claim.Namespace
Expand All @@ -95,7 +100,7 @@ func installOnDest(e *task.Execution, privateKey string, privateKeyMountPath str
"dest.path=" + t.Migration.Dest.Path,
}

return installHelmChart(e, d, helmValues)
return installHelmChart(e, d, releaseName, helmValues)
}

func formatSSHTargetHost(host string) string {
Expand Down
11 changes: 7 additions & 4 deletions internal/strategy/mnt2.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,20 @@ func (r *Mnt2) Run(e *task.Execution) (bool, error) {
"dest.path=" + t.Migration.Dest.Path,
}

doneCh := registerCleanupHook(e)
defer cleanupAndReleaseHook(e, doneCh)
releaseName := e.HelmReleaseNamePrefix
releaseNames := []string{releaseName}

err := installHelmChart(e, s, helmValues)
doneCh := registerCleanupHook(e, releaseNames)
defer cleanupAndReleaseHook(e, releaseNames, doneCh)

err := installHelmChart(e, s, releaseName, helmValues)
if err != nil {
return true, err
}

showProgressBar := !opts.NoProgressBar
kubeClient := t.SourceInfo.ClusterClient.KubeClient
jobName := e.HelmReleaseName + "-rsync"
jobName := e.HelmReleaseNamePrefix + "-rsync"
err = k8s.WaitForJobCompletion(e.Logger, kubeClient, ns, jobName, showProgressBar)
return true, err
}
Expand Down
31 changes: 13 additions & 18 deletions internal/strategy/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func GetStrategiesMapForNames(names []string) (map[string]Strategy, error) {
return sts, nil
}

func registerCleanupHook(e *task.Execution) chan<- bool {
func registerCleanupHook(e *task.Execution, releaseNames []string) chan<- bool {
doneCh := make(chan bool)
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
go func() {
select {
case <-signalCh:
e.Logger.Warn(":large_orange_diamond: Received termination signal")
cleanup(e)
cleanup(e, releaseNames)
os.Exit(1)
case <-doneCh:
return
Expand All @@ -74,31 +74,26 @@ func registerCleanupHook(e *task.Execution) chan<- bool {
return doneCh
}

func cleanupAndReleaseHook(e *task.Execution, doneCh chan<- bool) {
cleanup(e)
func cleanupAndReleaseHook(e *task.Execution, releaseNames []string, doneCh chan<- bool) {
cleanup(e, releaseNames)
doneCh <- true
}

func cleanup(e *task.Execution) {
func cleanup(e *task.Execution, releaseNames []string) {
t := e.Task
logger := e.Logger
logger.Info(":broom: Cleaning up")
var result *multierror.Error
s := t.SourceInfo

err := cleanupForPVC(logger, e.HelmReleaseName, s)
if err != nil {
result = multierror.Append(result, err)
}

d := t.DestInfo
err = cleanupForPVC(logger, e.HelmReleaseName, d)
if err != nil {
result = multierror.Append(result, err)

for _, name := range releaseNames {
err := cleanupForPVC(logger, name, s)
if err != nil {
result = multierror.Append(result, err)
}
}

err = result.ErrorOrNil()
err := result.ErrorOrNil()
if err != nil {
logger.WithError(err).
Warn(":large_orange_diamond: Cleanup failed, you might want to clean up manually")
Expand Down Expand Up @@ -147,15 +142,15 @@ func getMergedHelmValues(helmValues []string, opts *migration.Options) (map[stri
return valsOptions.MergeValues(helmProviders)
}

func installHelmChart(e *task.Execution, pvcInfo *pvc.Info, values []string) error {
func installHelmChart(e *task.Execution, pvcInfo *pvc.Info, name string, values []string) error {
helmActionConfig, err := initHelmActionConfig(e.Logger, pvcInfo)
if err != nil {
return err
}

install := action.NewInstall(helmActionConfig)
install.Namespace = pvcInfo.Claim.Namespace
install.ReleaseName = e.HelmReleaseName
install.ReleaseName = name
install.Wait = true
install.Timeout = 1 * time.Minute

Expand Down
11 changes: 7 additions & 4 deletions internal/strategy/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,20 @@ func (r *Svc) Run(e *task.Execution) (bool, error) {
"dest.path=" + t.Migration.Dest.Path,
}

doneCh := registerCleanupHook(e)
defer cleanupAndReleaseHook(e, doneCh)
releaseName := e.HelmReleaseNamePrefix
releaseNames := []string{releaseName}

err = installHelmChart(e, d, helmValues)
doneCh := registerCleanupHook(e, releaseNames)
defer cleanupAndReleaseHook(e, releaseNames, doneCh)

err = installHelmChart(e, d, releaseName, helmValues)
if err != nil {
return true, err
}

showProgressBar := !opts.NoProgressBar
kubeClient := t.SourceInfo.ClusterClient.KubeClient
jobName := e.HelmReleaseName + "-rsync"
jobName := releaseName + "-rsync"
err = k8s.WaitForJobCompletion(e.Logger, kubeClient, destNs, jobName, showProgressBar)
return true, err
}
8 changes: 4 additions & 4 deletions internal/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type Task struct {
}

type Execution struct {
ID string
HelmReleaseName string
Task *Task
Logger *log.Entry
ID string
HelmReleaseNamePrefix string
Task *Task
Logger *log.Entry
}

0 comments on commit 8aa71a6

Please sign in to comment.