From 6dfa3c9f07831f6d42f6629d93273537a9353055 Mon Sep 17 00:00:00 2001 From: Sanket Jadhav Date: Mon, 6 Apr 2026 12:19:05 +0530 Subject: [PATCH 1/3] Add ClusterName field to clusterContext and update related log messages --- .../pkg/object/command/sparkeks/sparkeks.go | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index a9fdb45..4ae3097 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -114,6 +114,7 @@ type clusterContext struct { Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` Image *string `yaml:"image,omitempty" json:"image,omitempty"` Region *string `yaml:"region,omitempty" json:"region,omitempty"` + ClusterName *string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` SparkApplicationFile string `yaml:"spark_application_file,omitempty" json:"spark_application_file,omitempty"` RequiredSparkSQLExtensions string `yaml:"required_spark_sql_extensions,omitempty" json:"required_spark_sql_extensions,omitempty"` } @@ -513,7 +514,7 @@ func getAndUploadPodContainerLogs(ctx context.Context, execCtx *executionContext } // getSparkApplicationPodLogs fetches logs from pods and uploads them to S3. -func getSparkApplicationPodLogs(ctx context.Context, execCtx *executionContext, pods []corev1.Pod, writeToStderr bool) error { +func getSparkApplicationPodLogs(ctx context.Context, execCtx *executionContext, pods []corev1.Pod, writeToStderr bool) { for _, pod := range pods { if !isPodInValidPhase(pod) { continue @@ -525,7 +526,6 @@ func getSparkApplicationPodLogs(ctx context.Context, execCtx *executionContext, getAndUploadPodContainerLogs(ctx, execCtx, pod, container, true, stderrLogSuffix, false) } } - return nil } // createSparkClients creates Kubernetes and Spark clients for the EKS cluster. @@ -563,7 +563,11 @@ func createSparkClients(ctx context.Context, execCtx *executionContext) error { execCtx.kubeClient = kubeClient if execCtx.runtime != nil && execCtx.runtime.Stdout != nil { - execCtx.runtime.Stdout.WriteString(fmt.Sprintf("Successfully created Spark Operator and Kubernetes clients for cluster: %s\n", execCtx.cluster.Name)) + clusterName := "" + if execCtx.clusterContext.ClusterName != nil { + clusterName = *execCtx.clusterContext.ClusterName + } + execCtx.runtime.Stdout.WriteString(fmt.Sprintf("Successfully created Spark Operator and Kubernetes clients for cluster: %s\n", clusterName)) } return nil } @@ -590,10 +594,15 @@ func updateKubeConfig(ctx context.Context, execCtx *executionContext) (string, e kubeconfigPath := tmpfile.Name() tmpfile.Close() // Close the file so `aws` can write to it + clusterName := "" + if execCtx.clusterContext.ClusterName != nil { + clusterName = *execCtx.clusterContext.ClusterName + } + args := []string{ "eks", "update-kubeconfig", "--region", region, - "--name", execCtx.cluster.Name, + "--name", clusterName, "--kubeconfig", kubeconfigPath, } if roleArn != "" { @@ -671,7 +680,7 @@ func applySparkOperatorConfig(execCtx *executionContext) { // Add default spark properties sparkApp.Spec.SparkConf[sparkAppNameProperty] = execCtx.appName - // Set spark event log directory for spark history server + // Set spark event log directory for spark history server if execCtx.commandContext.EventLogURI != "" { eventLogURI := updateS3ToS3aURI(execCtx.commandContext.EventLogURI) sparkApp.Spec.SparkConf[sparkEventLogDirProperty] = eventLogURI @@ -902,9 +911,7 @@ func collectSparkApplicationLogs(ctx context.Context, execCtx *executionContext, return } - if err := getSparkApplicationPodLogs(ctx, execCtx, pods, writeToStderr); err != nil { - execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Warning: failed to collect pod logs: %v\n", err)) - } + getSparkApplicationPodLogs(ctx, execCtx, pods, writeToStderr) } // isTerminalState checks if the given state is a terminal state. From 9639b557c81611d65639c5c7fab5ffd463b96173 Mon Sep 17 00:00:00 2001 From: Sanket Jadhav Date: Mon, 6 Apr 2026 13:06:18 +0530 Subject: [PATCH 2/3] remove test changes --- internal/pkg/object/command/sparkeks/sparkeks.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index 4ae3097..ed06022 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -514,7 +514,7 @@ func getAndUploadPodContainerLogs(ctx context.Context, execCtx *executionContext } // getSparkApplicationPodLogs fetches logs from pods and uploads them to S3. -func getSparkApplicationPodLogs(ctx context.Context, execCtx *executionContext, pods []corev1.Pod, writeToStderr bool) { +func getSparkApplicationPodLogs(ctx context.Context, execCtx *executionContext, pods []corev1.Pod, writeToStderr bool) error { for _, pod := range pods { if !isPodInValidPhase(pod) { continue @@ -526,6 +526,7 @@ func getSparkApplicationPodLogs(ctx context.Context, execCtx *executionContext, getAndUploadPodContainerLogs(ctx, execCtx, pod, container, true, stderrLogSuffix, false) } } + return nil } // createSparkClients creates Kubernetes and Spark clients for the EKS cluster. @@ -911,7 +912,9 @@ func collectSparkApplicationLogs(ctx context.Context, execCtx *executionContext, return } - getSparkApplicationPodLogs(ctx, execCtx, pods, writeToStderr) + if err := getSparkApplicationPodLogs(ctx, execCtx, pods, writeToStderr); err != nil { + execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Warning: failed to collect pod logs: %v\n", err)) + } } // isTerminalState checks if the given state is a terminal state. From 96379689e74e196315b2c4516b4712bf2a51e70e Mon Sep 17 00:00:00 2001 From: Sanket Jadhav Date: Mon, 6 Apr 2026 15:52:52 +0530 Subject: [PATCH 3/3] Rename ClusterName to KubernetesClusterName in clusterContext for clarity --- internal/pkg/object/command/sparkeks/sparkeks.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index ed06022..658f0cb 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -114,7 +114,7 @@ type clusterContext struct { Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` Image *string `yaml:"image,omitempty" json:"image,omitempty"` Region *string `yaml:"region,omitempty" json:"region,omitempty"` - ClusterName *string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` + KubernetesClusterName *string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` SparkApplicationFile string `yaml:"spark_application_file,omitempty" json:"spark_application_file,omitempty"` RequiredSparkSQLExtensions string `yaml:"required_spark_sql_extensions,omitempty" json:"required_spark_sql_extensions,omitempty"` } @@ -565,8 +565,8 @@ func createSparkClients(ctx context.Context, execCtx *executionContext) error { if execCtx.runtime != nil && execCtx.runtime.Stdout != nil { clusterName := "" - if execCtx.clusterContext.ClusterName != nil { - clusterName = *execCtx.clusterContext.ClusterName + if execCtx.clusterContext.KubernetesClusterName != nil { + clusterName = *execCtx.clusterContext.KubernetesClusterName } execCtx.runtime.Stdout.WriteString(fmt.Sprintf("Successfully created Spark Operator and Kubernetes clients for cluster: %s\n", clusterName)) } @@ -596,8 +596,8 @@ func updateKubeConfig(ctx context.Context, execCtx *executionContext) (string, e tmpfile.Close() // Close the file so `aws` can write to it clusterName := "" - if execCtx.clusterContext.ClusterName != nil { - clusterName = *execCtx.clusterContext.ClusterName + if execCtx.clusterContext.KubernetesClusterName != nil { + clusterName = *execCtx.clusterContext.KubernetesClusterName } args := []string{