Skip to content

Commit

Permalink
Add port-forward functionalities to vmexport command in virtctl
Browse files Browse the repository at this point in the history
VirtualMachineExport volumes are only downloadable outside the cluster when proper ingress or routes are configured.

In some cases it's useful to force a way to download the volumes without this configuration, so this commit introduces an option to set port-forwarding using the vmexport service, so we can get the vmexport contents outside the cluster without additional configuration.

Signed-off-by: Alvaro Romero <alromero@redhat.com>
  • Loading branch information
alromeros committed Jul 20, 2023
1 parent 34daa9c commit 022607a
Show file tree
Hide file tree
Showing 11 changed files with 648 additions and 8 deletions.
11 changes: 11 additions & 0 deletions pkg/virtctl/memorydump/memorydump.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
createClaimArg = "create-claim"
storageClassArg = "storage-class"
accessModeArg = "access-mode"
portForwardArg = "port-forward"

configName = "config"
filesystemOverhead = cdiv1.Percent("0.055")
Expand All @@ -61,6 +62,7 @@ const (
var (
claimName string
createClaim bool
portForward string
storageClass string
accessMode string
outputFile string
Expand Down Expand Up @@ -113,6 +115,7 @@ func NewMemoryDumpCommand(clientConfig clientcmd.ClientConfig) *cobra.Command {
cmd.SetUsageTemplate(templates.UsageTemplate())
cmd.Flags().StringVar(&claimName, claimNameArg, "", "pvc name to contain the memory dump")
cmd.Flags().BoolVar(&createClaim, createClaimArg, false, "Create the pvc that will conatin the memory dump")
cmd.Flags().StringVar(&portForward, portForwardArg, "", "Configure and set port-forward in the specified port to download the memory dump")
cmd.Flags().StringVar(&storageClass, storageClassArg, "", "The storage class for the PVC.")
cmd.Flags().StringVar(&accessMode, accessModeArg, "", "The access mode for the PVC.")
cmd.Flags().StringVar(&outputFile, "output", "", "Specifies the output path of the memory dump to be downloaded.")
Expand Down Expand Up @@ -328,6 +331,14 @@ func downloadMemoryDump(namespace, vmName string, virtClient kubecli.KubevirtCli
Name: vmexportName,
ExportSource: exportSource,
}
// Complete port-forward arguments
if portForward != "" {
vmExportInfo.PortForward = true
vmExportInfo.TargetPort = portForward
if vmExportInfo.ServiceURL == "" {
vmExportInfo.ServiceURL = fmt.Sprintf("127.0.0.1:%s", portForward)
}
}
// User wants the output in a file, create
output, err := os.Create(vmExportInfo.OutputFile)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/virtctl/vmexport/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ go_library(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/portforward:go_default_library",
"//vendor/k8s.io/client-go/transport/spdy:go_default_library",
"//vendor/k8s.io/kubectl/pkg/util:go_default_library",
],
)

Expand Down
184 changes: 176 additions & 8 deletions pkg/virtctl/vmexport/vmexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (
"crypto/x509"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"time"

Expand All @@ -37,9 +40,13 @@ import (
k8sv1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
kubectlutil "k8s.io/kubectl/pkg/util"

virtv1 "kubevirt.io/api/core/v1"
exportv1 "kubevirt.io/api/export/v1alpha1"
Expand Down Expand Up @@ -70,6 +77,8 @@ const (
OUTPUT_FORMAT_FLAG = "--manifest-output-format"
SERVICE_URL_FLAG = "--service-url"
INCLUDE_SECRET_FLAG = "--include-secret"
PORT_FORWARD_FLAG = "--port-forward"
TARGET_PORT_FLAG = "--target-port"

// Possible output format for manifests
OUTPUT_FORMAT_JSON = "json"
Expand Down Expand Up @@ -119,6 +128,7 @@ var (
shouldCreate bool
includeSecret bool
exportManifest bool
portForward string
serviceUrl string
volumeName string
ttl string
Expand All @@ -141,6 +151,8 @@ type VMExportInfo struct {
KeepVme bool
IncludeSecret bool
ExportManifest bool
PortForward bool
TargetPort string
OutputFile string
OutputWriter io.Writer
VolumeName string
Expand Down Expand Up @@ -191,6 +203,9 @@ func usage() string {
# Download a volume from an already existing VirtualMachineExport (--volume is optional when only one volume is available)
{{ProgramName}} vmexport download vm1-export --volume=volume1 --output=disk.img.gz
# Download a volume as before but through local port 5410
{{ProgramName}} vmexport download vm1-export --volume=volume1 --output=disk.img.gz --port-forward=5410
# Create a VirtualMachineExport and download the requested volume from it
{{ProgramName}} vmexport download vm1-export --vm=vm1 --volume=volume1 --output=disk.img.gz
Expand Down Expand Up @@ -227,6 +242,7 @@ func NewVirtualMachineExportCommand(clientConfig clientcmd.ClientConfig) *cobra.
cmd.Flags().StringVar(&ttl, "ttl", "", "The time after the export was created that it is eligible to be automatically deleted, defaults to 2 hours by the server side if not specified")
cmd.Flags().StringVar(&manifestOutputFormat, "manifest-output-format", "", "Manifest output format, defaults to Yaml. Valid options are yaml or json")
cmd.Flags().StringVar(&serviceUrl, "service-url", "", "Specify service url to use in the returned manifest, instead of the external URL in the Virtual Machine export status. This is useful for NodePorts or if you don't have an external URL configured")
cmd.Flags().StringVar(&portForward, "port-forward", "", "Configures port-forwarding on the specified port. Useful to download without proper ingress/route configuration")
cmd.Flags().BoolVar(&includeSecret, "include-secret", false, "When used with manifest and set to true include a secret that contains proper headers for CDI to import using the manifest")
cmd.Flags().BoolVar(&exportManifest, "manifest", false, "Instead of downloading a volume, retrieve the VM manifest")
cmd.SetUsageTemplate(templates.UsageTemplate())
Expand Down Expand Up @@ -295,6 +311,14 @@ func (c *command) parseExportArguments(args []string, vmeInfo *VMExportInfo) err
vmeInfo.Name = args[1]

// We store the flags in a struct to avoid relying on global variables
if err := c.initVMExportInfo(vmeInfo); err != nil {
return err
}

return nil
}

func (c *command) initVMExportInfo(vmeInfo *VMExportInfo) error {
vmeInfo.ExportSource = getExportSource()
vmeInfo.OutputFile = outputFile
// User wants the output in a file, create
Expand All @@ -315,6 +339,15 @@ func (c *command) parseExportArguments(args []string, vmeInfo *VMExportInfo) err
vmeInfo.OutputFormat = manifestOutputFormat
vmeInfo.IncludeSecret = includeSecret
vmeInfo.ExportManifest = exportManifest
if portForward != "" {
vmeInfo.PortForward = true
vmeInfo.Insecure = true
vmeInfo.TargetPort = portForward
if vmeInfo.ServiceURL == "" {
// Defaulting to localhost
vmeInfo.ServiceURL = fmt.Sprintf("127.0.0.1:%s", portForward)
}
}
vmeInfo.TTL = metav1.Duration{}
if ttl != "" {
duration, err := time.ParseDuration(ttl)
Expand All @@ -323,7 +356,6 @@ func (c *command) parseExportArguments(args []string, vmeInfo *VMExportInfo) err
}
vmeInfo.TTL = metav1.Duration{Duration: duration}
}

return nil
}

Expand Down Expand Up @@ -408,6 +440,12 @@ func DownloadVirtualMachineExport(client kubecli.KubevirtClient, vmeInfo *VMExpo
defer DeleteVirtualMachineExport(client, vmeInfo)
}

if vmeInfo.PortForward {
if err := setupPortForward(client, vmeInfo); err != nil {
return err
}
}

// Wait for the vmexport object to be ready
if err := ExportProcessingComplete(client, vmeInfo, processingWaitInterval, processingWaitTotal); err != nil {
return err
Expand Down Expand Up @@ -596,12 +634,8 @@ func GetManifestUrlsFromVirtualMachineExport(vmexport *exportv1.VirtualMachineEx
func waitForVirtualMachineExport(client kubecli.KubevirtClient, vmeInfo *VMExportInfo, interval, timeout time.Duration) error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
vmexport, err := getVirtualMachineExport(client, vmeInfo)
if err != nil {
return true, err
}

if vmexport == nil {
return true, err
if err != nil || vmexport == nil {
return false, err
}

if vmexport.Status == nil {
Expand Down Expand Up @@ -791,6 +825,9 @@ func handleCreateFlags() error {
if keepVme {
return fmt.Errorf(ErrIncompatibleFlag, KEEP_FLAG, CREATE)
}
if portForward != "" {
return fmt.Errorf(ErrIncompatibleFlag, PORT_FORWARD_FLAG, CREATE)
}
if serviceUrl != "" {
return fmt.Errorf(ErrIncompatibleFlag, SERVICE_URL_FLAG, CREATE)
}
Expand All @@ -816,8 +853,11 @@ func handleDeleteFlags() error {
if keepVme {
return fmt.Errorf(ErrIncompatibleFlag, KEEP_FLAG, DELETE)
}
if portForward != "" {
return fmt.Errorf(ErrIncompatibleFlag, PORT_FORWARD_FLAG, DELETE)
}
if serviceUrl != "" {
return fmt.Errorf(ErrIncompatibleFlag, SERVICE_URL_FLAG, CREATE)
return fmt.Errorf(ErrIncompatibleFlag, SERVICE_URL_FLAG, DELETE)
}

return nil
Expand Down Expand Up @@ -857,3 +897,131 @@ func getExportSecretName(vmexportName string) string {
func errExportAlreadyExists(err error) bool {
return strings.Contains(err.Error(), "VirtualMachineExport") && strings.Contains(err.Error(), "already exists")
}

// Port-forward functions

// translateServicePortToTargetPort tranlates the specified port to be used with the service's pod
func translateServicePortToTargetPort(localPort string, remotePort string, svc k8sv1.Service, pod k8sv1.Pod) ([]string, error) {
ports := []string{}
portnum, err := strconv.Atoi(remotePort)
if err != nil {
return ports, err
}
containerPort, err := kubectlutil.LookupContainerPortNumberByServicePort(svc, pod, int32(portnum))
if err != nil {
// can't resolve a named port, or Service did not declare this port, return an error
return ports, err
}

// convert the resolved target port back to a string
remotePort = strconv.Itoa(int(containerPort))
if localPort != remotePort {
return append(ports, fmt.Sprintf("%s:%s", localPort, remotePort)), nil
}

return append(ports, remotePort), nil
}

// waitForExportServiceToBeReady waits until the vmexport service is ready for port-forwarding
func waitForExportServiceToBeReady(client kubecli.KubevirtClient, vmeInfo *VMExportInfo, interval, timeout time.Duration) (*k8sv1.Service, error) {
service := &k8sv1.Service{}
serviceName := fmt.Sprintf("virt-export-%s", vmeInfo.Name)
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
vmexport, err := getVirtualMachineExport(client, vmeInfo)
if err != nil || vmexport == nil {
return false, err
}

if vmexport.Status == nil || vmexport.Status.Phase != exportv1.Ready {
fmt.Printf("waiting for VM Export %s status to be ready...\n", vmeInfo.Name)
return false, nil
}

service, err = client.CoreV1().Services(vmeInfo.Namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
fmt.Printf("waiting for service %s to be ready before port-forwarding...\n", serviceName)
return false, nil
}
return false, err
}
return true, nil
})
return service, err
}

// runPortForward is the actual function that runs the port-forward. Meant to be run concurrently
func runPortForward(client kubecli.KubevirtClient, pod k8sv1.Pod, namespace string, ports []string, stopChan, readyChan chan struct{}) error {
// Create a port forwarding request
req := client.CoreV1().RESTClient().Post().
Resource("pods").
Name(pod.Name).
Namespace(namespace).
SubResource("portforward")

// Set up the port forwarding options
transport, upgrader, err := spdy.RoundTripperFor(client.Config())
if err != nil {
log.Fatalf("Failed to set up transport: %v", err)
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
defer signal.Stop(signals)
go func() {
<-signals
if stopChan != nil {
close(stopChan)
}
}()

// Start port-forwarding
fw, err := portforward.New(dialer, ports, stopChan, readyChan, os.Stdout, os.Stderr)
if err != nil {
log.Fatalf("Failed to setup port forward")
}
return fw.ForwardPorts()
}

// setupPortForward runs a port-forward after initializing all required arguments
func setupPortForward(client kubecli.KubevirtClient, vmeInfo *VMExportInfo) error {
// Wait for the vmexport object to be ready
service, err := waitForExportServiceToBeReady(client, vmeInfo, processingWaitInterval, processingWaitTotal)
if err != nil {
return err
}

// Extract the target pod selector from the service
podSelector := labels.SelectorFromSet(service.Spec.Selector)

// List the pods matching the selector
podList, err := client.CoreV1().Pods(vmeInfo.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: podSelector.String()})
if err != nil {
return fmt.Errorf("Failed to list pods: %v", err)
}

// Pick the first pod to forward the port
if len(podList.Items) == 0 {
return fmt.Errorf("No pods found for the service %s", service.Name)
}

// Set up the port forwarding ports
ports, err := translateServicePortToTargetPort(vmeInfo.TargetPort, "443", *service, podList.Items[0])
if err != nil {
return err
}

stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{})
go runPortForward(client, podList.Items[0], vmeInfo.Namespace, ports, stopChan, readyChan)

// Wait for the port forwarding to be ready
select {
case <-readyChan:
fmt.Println("Port forwarding is ready.")
case <-time.After(30 * time.Second):
return fmt.Errorf("Timeout waiting for port forwarding to be ready.")
}
return nil
}

0 comments on commit 022607a

Please sign in to comment.