In [None]:
# Install the KFP SDK together with its Kubernetes extension.
# `%pip` installs into the environment of the running kernel, and the quotes
# stop the shell from treating the extras brackets as a glob pattern.
%pip install "kfp[kubernetes]"

In [None]:
import kfp 

from kfp.dsl import component, pipeline
from kfp import kubernetes

In [None]:
# Egress proxy endpoint used for plain HTTP traffic.
HTTP_PROXY = "http://egress.ps7.internal:3128"
# HTTPS traffic is sent through the same proxy endpoint.
HTTPS_PROXY = "http://egress.ps7.internal:3128"
# Comma-separated hosts, networks and domain suffixes that bypass the proxy.
NO_PROXY = "127.0.0.1,localhost,::1,10.0.0.0/8,172.16.0.0/16,192.168.0.0/16,10.152.183.0/24,.svc,.local,.kubeflow"

def add_proxy(obj, http_proxy=HTTP_PROXY, https_proxy=HTTPS_PROXY, no_proxy=NO_PROXY):
    """Set the proxy environment variables on a PipelineTask.

    Both the lower-case and upper-case spelling of every variable is set.

    Args:
        obj: Task object exposing a chainable ``set_env_variable(name=, value=)``.
        http_proxy: Value for ``http_proxy`` / ``HTTP_PROXY``.
        https_proxy: Value for ``https_proxy`` / ``HTTPS_PROXY``.
        no_proxy: Value for ``no_proxy`` / ``NO_PROXY``.

    Returns:
        Whatever the last ``set_env_variable`` call returns.
    """
    # Order matters only in that it mirrors the sequence of calls on the task.
    proxy_settings = (
        ("http_proxy", http_proxy),
        ("https_proxy", https_proxy),
        ("HTTP_PROXY", http_proxy),
        ("HTTPS_PROXY", https_proxy),
        ("no_proxy", no_proxy),
        ("NO_PROXY", no_proxy),
    )
    configured = obj
    for env_name, env_value in proxy_settings:
        # Each call is made on the result of the previous one (call chaining).
        configured = configured.set_env_variable(name=env_name, value=env_value)
    return configured

In [None]:
# This is the same as the component below, but with some of the logic moved into the spark8t library
# @component(
#     base_image="docker.io/bikalpadhakalcanonical/charmed-spark:5",
#     packages_to_install=["pyspark==3.4.2"]
# )
# def spark_test_component() -> None:
#     import logging
#     from operator import add
#     from spark8t.session import SparkSession
         
#     def count_vowels(text: str) -> int:
#       count = 0
#       for char in text:
#         if char.lower() in "aeiou":
#           count += 1
#       return count

#     lines = """Canonical's Charmed Data Platform solution for Apache Spark runs Spark jobs on your Kubernetes cluster.
#     You can get started right away with MicroK8s - the mightiest tiny Kubernetes distro around! 
#     The spark-client snap simplifies the setup process to run Spark jobs against your Kubernetes cluster. 
#     Spark on Kubernetes is a complex environment with many moving parts.
#     Sometimes, small mistakes can take a lot of time to debug and figure out.
#     """
    
#     with SparkSession(app_name="CountVowels", namespace="admin", username="spark") as spark:
#         n = spark.sparkContext.parallelize(lines.splitlines(), 2).map(count_vowels).reduce(add)
#         logging.warning(f"The number of vowels in the string is {n}")


@component(
    base_image="ghcr.io/canonical/charmed-spark:3.5-22.04_edge",
)
def spark_test_component() -> None:
    """Run a small Spark job that counts the vowels in a sample text.

    The Spark properties are read from the service account identified by the
    SPARK_NAMESPACE and SPARK_SERVICE_ACCOUNT environment variables, a Spark
    session is created against the Kubernetes API server with this pod as the
    driver host, and the resulting count is logged.

    Raises:
        KeyError: If SPARK_SERVICE_ACCOUNT or SPARK_NAMESPACE is not set.
        ValueError: If no host can be extracted from the API server URL.
        RuntimeError: If the service account registry returns no account.
    """
    # All imports live inside the function body, grouped at the top
    # (standard library first, then third-party).
    import logging
    import os
    import re
    import socket
    from operator import add

    import pyspark
    from lightkube import Client
    from spark8t.services import K8sServiceAccountRegistry
    from spark8t.services import LightKube as LightKubeInterface
    from spark8t.utils import environ

    def count_vowels(text: str) -> int:
        """Return the number of vowels (case-insensitive) in ``text``."""
        return sum(1 for char in text if char.lower() in "aeiou")

    lines = """Canonical's Charmed Data Platform solution for Apache Spark runs Spark jobs on your Kubernetes cluster.
    You can get started right away with MicroK8s - the mightiest tiny Kubernetes distro around! 
    The spark-client snap simplifies the setup process to run Spark jobs against your Kubernetes cluster. 
    Spark on Kubernetes is a complex environment with many moving parts.
    Sometimes, small mistakes can take a lot of time to debug and figure out.
    """

    app_name = "CountVowels"
    # Both variables are mandatory. NOTE(review): nothing in this notebook sets
    # them; presumably they are injected into the pod by cluster configuration
    # selected through the pod label added in the pipeline -- confirm.
    spark_service_account = os.environ["SPARK_SERVICE_ACCOUNT"]
    spark_namespace = os.environ["SPARK_NAMESPACE"]

    # IP of this pod, advertised to Spark as the driver host.
    pod_ip = socket.gethostbyname(socket.gethostname())
    k8s_master = Client().config.cluster.server
    interface = LightKubeInterface(None, None)
    registry = K8sServiceAccountRegistry(interface)

    # Raw string: "\/" and "\." are invalid escape sequences in a normal string
    # literal and trigger a warning at compile time. The regex itself matches
    # exactly the same inputs: optional scheme, optional "user@", optional
    # "www.", then the host up to the first ":" or "/".
    host_parser = re.compile(r"^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/\n]+)")
    host_match = host_parser.match(k8s_master)
    if host_match is None:
        raise ValueError(
            f"Cannot extract the API server host from {k8s_master!r}"
        )
    rest_api_host = host_match.group(1)

    account_id = f"{spark_namespace}:{spark_service_account}"
    # Temporarily exclude the API server host from proxying while the service
    # account configuration is looked up.
    with environ(NO_PROXY=rest_api_host, no_proxy=rest_api_host):
        service_account = registry.get(account_id)
        # Assumes the registry signals an unknown account by returning None --
        # TODO confirm against spark8t; the check is harmless otherwise.
        if service_account is None:
            raise RuntimeError(f"Spark service account {account_id} not found")
        spark_properties = service_account.configurations.props | {
            "spark.driver.host": pod_ip,
        }

    builder = (
        pyspark.sql.SparkSession.builder
        .appName(app_name)
        .master(f"k8s://{k8s_master}")
    )
    for conf, val in spark_properties.items():
        builder = builder.config(conf, val)
    session = builder.getOrCreate()

    try:
        n = (
            session.sparkContext
            .parallelize(lines.splitlines(), 2)
            .map(count_vowels)
            .reduce(add)
        )
        logging.warning(f"The number of vowels in the string is {n}")
    finally:
        # Always stop the session, even when the job fails, so the Spark
        # resources are released explicitly rather than at interpreter exit.
        session.stop()


In [None]:
@pipeline(name="spark-test-pipeline")
def spark_pipeline():
    # Single-step pipeline: run the Spark test component with the proxy
    # environment variables applied and a pod label attached.
    spark_task = spark_test_component()
    spark_task = add_proxy(spark_task)

    kubernetes.add_pod_label(
        spark_task,
        label_key='access-spark-pipeline',
        label_value='true',
    )

    # Disabled: Istio sidecar port exclusions, kept for reference.
    # kubernetes.add_pod_annotation(
    #     spark_task,
    #     annotation_key='traffic.sidecar.istio.io/excludeInboundPorts',
    #     annotation_value='37371,6060',
    # )
    # kubernetes.add_pod_annotation(
    #     spark_task,
    #     annotation_key='traffic.sidecar.istio.io/excludeOutboundPorts',
    #     annotation_value='37371,6060',
    # )

In [None]:
# Connect to the Kubeflow Pipelines API using the default client settings.
client = kfp.Client()

# Write the compiled pipeline definition to a YAML file next to the notebook.
kfp.compiler.Compiler().compile(
    pipeline_func=spark_pipeline,
    package_path="spark_test_pipeline.yaml",
)

# Submit the pipeline for a run without arguments and with caching disabled.
run = client.create_run_from_pipeline_func(
    pipeline_func=spark_pipeline,
    arguments={},
    enable_caching=False,
)