In [None]:
def get_spark_ui_url(spark_context):
    """Build the JupyterHub-proxied URL of the Spark UI "jobs" page.

    The URL is assembled from three pieces:

    * the host of the first OpenShift ``Route`` found in this pod's namespace
      (presumably the JupyterHub route -- TODO confirm the namespace never
      holds more than one Route),
    * the notebook user, taken as everything after the last ``-nb-`` in this
      pod's hostname (assumes a ``...-nb-<user>`` pod naming scheme),
    * the port of the Spark UI, parsed from ``spark_context.uiWebUrl``.

    :param spark_context: a running ``pyspark.SparkContext``.
    :returns: ``https://<route-host>/user/<user>/proxy/<ui-port>/jobs/``
    :raises RuntimeError: if the context reports no UI URL/port, or if no
        Route exists in the namespace.
    """
    import socket
    from urllib.parse import urlsplit

    from kubernetes import config
    from openshift.dynamic import DynamicClient

    # Strip surrounding whitespace/newline so the value is a clean namespace name.
    with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace', encoding='utf-8') as file:
        namespace = file.read().strip()

    # Validate the UI URL before doing any API calls; depending on the PySpark
    # version it may be missing when the UI is disabled.
    ui_url = spark_context.uiWebUrl
    ui_port = urlsplit(ui_url).port if ui_url else None
    if ui_port is None:
        raise RuntimeError(f"Spark UI URL has no port: {ui_url!r}")

    k8s_client = config.new_client_from_config()
    dyn_client = DynamicClient(k8s_client)
    routes = dyn_client.resources.get(api_version='route.openshift.io/v1', kind='Route')
    route_items = routes.get(namespace=namespace)['items']
    if not route_items:
        raise RuntimeError(f"No OpenShift Route found in namespace {namespace!r}")
    # NOTE(review): the first Route is taken as the JupyterHub route.
    jupyter_hub_host = route_items[0]['spec']['host']

    # Text after the last '-nb-' (or the whole hostname if absent).
    user = socket.gethostname().rpartition('-nb-')[2]

    return f"https://{jupyter_hub_host}/user/{user}/proxy/{ui_port}/jobs/"

def get_driver_dns(*, namespace_file='/var/run/secrets/kubernetes.io/serviceaccount/namespace'):
    """Return the in-cluster DNS name ``<HOSTNAME>-headless-service.<namespace>.svc.cluster.local``.

    The name refers to a per-pod headless Service; that Service is assumed to
    be created elsewhere (e.g. by the notebook spawner) -- TODO confirm.

    :param namespace_file: file holding the namespace name; defaults to the
        Kubernetes service-account namespace file.
    :returns: the fully qualified service DNS name as a string.
    :raises KeyError: if the ``HOSTNAME`` environment variable is not set.
    """
    import os

    # Strip the trailing newline/whitespace so it cannot end up inside the DNS name.
    with open(namespace_file, encoding='utf-8') as file:
        namespace = file.read().strip()

    return f"{os.environ['HOSTNAME']}-headless-service.{namespace}.svc.cluster.local"

def initialize_pyspark_context(context_purpose: str, user_name: str, *,
                               namespace: str = 'jupyterhub',
                               image: str = 'ghcr.io/momoadc/jupyter-notebooks/spark-py:v3.2.0',
                               executor_instances: int = 2):
    """Create a SparkContext that uses this cluster's Kubernetes API server as master.

    The driver runs in the current process with ``spark.driver.host`` set to
    this pod's headless-service DNS name. Executors are requested in
    ``namespace`` using ``image``. Once the context is up, the Spark UI URL is
    printed.

    :param context_purpose: Spark application name.
    :param user_name: value of the ``username`` label set on executor pods.
    :param namespace: Kubernetes namespace in which executors are created.
    :param image: container image used for the executor pods.
    :param executor_instances: number of executors to request.
    :returns: the new ``pyspark.SparkContext``.
    :raises KeyError: if a required environment variable (e.g.
        ``KUBERNETES_SERVICE_HOST``, ``KUBERNETES_SERVICE_PORT``,
        ``OPENSHIFT_TOKEN``) is missing.
    """
    import pyspark
    import os

    service_account_dir = '/var/run/secrets/kubernetes.io/serviceaccount'

    conf = pyspark.SparkConf()
    conf.set('spark.master',
             f"k8s://https://{os.environ['KUBERNETES_SERVICE_HOST']}:{os.environ['KUBERNETES_SERVICE_PORT']}")
    conf.setAppName(context_purpose)
    conf.set('spark.driver.host', get_driver_dns())
    # Registry host is ghcr.io (GitHub Container Registry); it was misspelled "gchr.io".
    conf.set('spark.kubernetes.container.image', image)
    conf.set('spark.ui.port', '4040')
    # NOTE(review): executors go to `namespace`, which need not equal this pod's
    # own namespace (used for spark.driver.host); confirm the token may create
    # pods there.
    conf.set('spark.kubernetes.namespace', namespace)

    conf.set('spark.kubernetes.executor.label.username', user_name)

    # Authentication
    conf.set('spark.kubernetes.authenticate.caCertFile', f'{service_account_dir}/ca.crt')
    # NOTE(review): service-ca.crt looks like a CA bundle, not a client
    # certificate; Spark documents clientCertFile as a client cert (paired with
    # clientKeyFile). Kept unchanged -- confirm it is needed alongside the token.
    conf.set('spark.kubernetes.authenticate.clientCertFile', f'{service_account_dir}/service-ca.crt')
    conf.set('spark.kubernetes.authenticate.oauthToken', os.environ['OPENSHIFT_TOKEN'])

    # Resources
    conf.set('spark.executor.instances', str(executor_instances))
    conf.set('spark.kubernetes.executor.request.cores', '500m')
    conf.set('spark.kubernetes.executor.limit.cores', '3')

    spark_context = pyspark.SparkContext(conf=conf)

    print("Spark UI: " + get_spark_ui_url(spark_context))

    return spark_context


In [None]:
# Start the notebook's SparkContext; executors get the label username=my_user.
sc = initialize_pyspark_context(context_purpose='testy_test', user_name='my_user')

In [None]:
# Smoke-test the session: build an RDD from the integers 0..9.
t = sc.parallelize(range(10))

# sumApprox(timeout, confidence=0.95) returns an estimate of the sum,
# possibly partial, once the timeout expires. The timeout is in milliseconds
# per the PySpark docs.
r = t.sumApprox(timeout=3)
print(f"Approximate sum: {r!s}")