# PySpark on Kubernetes
   * https://alphatango086.medium.com/pyspark-on-kubernetes-71dcbd034c60
   * https://spark.apache.org/docs/3.2.0/running-on-kubernetes.html

# Client Mode Networking
   * https://spark.apache.org/docs/3.2.0/running-on-kubernetes.html#client-mode-networking
   
Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark executors. The specific network configuration that will be required for Spark to work in client mode will vary per setup. If you run your driver inside a Kubernetes pod, **you can use a headless service to allow your driver pod to be routable from the executors by a stable hostname.** 

When deploying your headless service, ensure that the service’s label selector will only match the driver pod and no other pods; it is recommended to assign your driver pod a sufficiently unique label and to use that label in the label selector of the headless service.

Specify the driver’s hostname via spark.driver.host and the driver’s port via spark.driver.port.
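
As a rough illustration of the headless service described above, the sketch below builds a Service manifest as a plain Python dict and prints it as JSON, which `kubectl apply -f` accepts. The service name, namespace, port and cluster domain are taken from the `spark.driver.host` and `spark.driver.port` values used later in this notebook; the `app: jupyter-driver` label and the `driver-rpc` port name are assumptions made only for the example, so substitute whatever unique label your driver pod actually carries.

```python
import json


def headless_service(name, namespace, selector, port):
    """Build a Service manifest whose clusterIP is None (a headless service)."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "clusterIP": "None",   # the string "None" is what makes it headless
            "selector": selector,  # must match the driver pod and nothing else
            "ports": [{"name": "driver-rpc", "port": port, "targetPort": port}],
        },
    }


def driver_host(name, namespace, cluster_domain="cluster.local"):
    """DNS name under which the service is reachable inside the cluster."""
    return f"{name}.{namespace}.svc.{cluster_domain}"


# Hypothetical label; the name, namespace and port mirror the notebook config.
manifest = headless_service(
    "jupyter-driver", "zeppelin", {"app": "jupyter-driver"}, 29413
)
print(json.dumps(manifest, indent=2))
print(driver_host("jupyter-driver", "zeppelin", "cluster2.xpt"))
```

The last line prints `jupyter-driver.zeppelin.svc.cluster2.xpt`, the value to pass as spark.driver.host when the cluster domain is `cluster2.xpt`.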

   * https://jaceklaskowski.github.io/spark-kubernetes-book/demo/running-pyspark-application-on-minikube/#create-kubernetes-resources
   * https://www.kdnuggets.com/2020/08/containerization-pyspark-kubernetes.html
   

In [5]:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc")
sparkConf.setAppName("spark")

sparkConf.set("spark.kubernetes.container.image", "registry.minikube/spark-base-python-3.2:master")

sparkConf.set("spark.kubernetes.namespace", "zeppelin")

sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")

sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")

sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "zeppelin-server")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "zeppelin-server")

# Address and port the executors use to reach the driver (client mode)
sparkConf.set("spark.driver.host", "jupyter-driver.zeppelin.svc.cluster2.xpt")
sparkConf.set("spark.driver.port", "29413")

# S3A (MinIO) object storage and Hive metastore settings
sparkConf.set("fs.s3a.endpoint", "https://minio.minio-tenant-1.svc.cluster2.xpt")
sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkConf.set("fs.s3a.connection.ssl.enabled", "true")
sparkConf.set("fs.s3a.path.style.access", "true")
sparkConf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
sparkConf.set("spark.sql.warehouse.dir", "s3a://spark/warehouse")
sparkConf.set("spark.sql.catalogImplementation", "hive")
sparkConf.set("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore.hive.svc.cluster2.xpt:9083")

# Create the Spark session; this is the step that asks Kubernetes
# to launch the executor pods.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
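
One detail worth checking in the cell above: Spark's configuration docs describe Hadoop properties being passed in the form `spark.hadoop.*`, whereas the `fs.s3a.*` keys here are set without that prefix. Whether the bare keys take effect in this particular setup (for example through defaults baked into the image) is not something the notebook shows. The helper below is a minimal sketch of how the prefixed names could be derived; the function name `to_spark_conf_keys` is made up for this example.

```python
def to_spark_conf_keys(hadoop_options):
    """Prefix Hadoop option names with 'spark.hadoop.' unless already prefixed."""
    prefix = "spark.hadoop."
    return {
        (key if key.startswith(prefix) else prefix + key): value
        for key, value in hadoop_options.items()
    }


# Two of the S3A options from the cell above, used as sample input.
s3a_options = {
    "fs.s3a.endpoint": "https://minio.minio-tenant-1.svc.cluster2.xpt",
    "fs.s3a.path.style.access": "true",
}

for key, value in to_spark_conf_keys(s3a_options).items():
    # In the notebook this line would be: sparkConf.set(key, value)
    print(key, "=", value)
```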


In [3]:
sc

In [4]:
print("Default configurations")
spark.sparkContext.getConf().getAll()

Default configurations


[('spark.executor.instances', '3'),
 ('spark.hadoop.hive.metastore.uris',
  'thrift://hive-metastore.hive.svc.cluster2.xpt:9083'),
 ('spark.sql.warehouse.dir', 's3a://spark/warehouse'),
 ('spark.master', 'k8s://https://kubernetes.default.svc'),
 ('spark.kubernetes.container.image',
  'registry.minikube/spark-base-python-3.2:master'),
 ('spark.kubernetes.authenticate.serviceAccountName', 'zeppelin-server'),
 ('spark.kubernetes.pyspark.pythonVersion', '3'),
 ('spark.kubernetes.executor.podNamePrefix', 'spark-f565ed8cde6de8df'),
 ('spark.app.name', 'spark'),
 ('spark.jars.packages',
  'org.apache.spark:spark-avro_2.12:3.2.4,io.delta:delta-core_2.12:2.0.2,org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.3.0,org.apache.hadoop:hadoop-aws:3.2.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4,io.acryl:datahub-spark-lineage:0.8.23,com.amazon.deequ:deequ:2.0.1-spark-3.2'),
 ('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
 ('spark.files',
  'file:///root/.ivy2/jars/org.apache.spark
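
The list returned by `getAll()` is long, so it can help to narrow it to one group of settings. The sketch below filters `(key, value)` pairs by key prefix; the helper name `conf_with_prefix` is an assumption for illustration, and the sample pairs are copied from the output above so the block runs without a cluster.

```python
def conf_with_prefix(pairs, prefix):
    """Return the (key, value) pairs whose key starts with prefix, sorted by key."""
    return sorted((key, value) for key, value in pairs if key.startswith(prefix))


# A few pairs copied from the output above. In the notebook, pass
# spark.sparkContext.getConf().getAll() instead of this sample list.
sample = [
    ("spark.executor.instances", "3"),
    ("spark.master", "k8s://https://kubernetes.default.svc"),
    ("spark.kubernetes.authenticate.serviceAccountName", "zeppelin-server"),
    ("spark.kubernetes.pyspark.pythonVersion", "3"),
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
]

for key, value in conf_with_prefix(sample, "spark.kubernetes."):
    print(f"{key} = {value}")
```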

In [5]:
csv_us_cities = spark.read.csv("s3a://bronze/kaggle/us_cities_by_pop.csv")

In [6]:
csv_us_cities.createOrReplaceTempView("csv_us_cities")


In [7]:
csv_us_cities.show(10)

+----+------------+------------+-----------+-----------+--------------+------------+------------+--------------+--------------+-------------+------------+
| _c0|         _c1|         _c2|        _c3|        _c4|           _c5|         _c6|         _c7|           _c8|           _c9|         _c10|        _c11|
+----+------------+------------+-----------+-----------+--------------+------------+------------+--------------+--------------+-------------+------------+
|rank|        city|       state|2020_census|2010_census|percent_change|land_area_mi|land_area_km|pop_density_mi|pop_density_km|degrees_north|degrees_west|
|   1|    New York|    New York|    8804190|    8175133|          7.69|       300.5|       778.3|         29298|         11312|        40.66|       73.93|
|   2| Los Angeles|  California|    3898747|    3792621|           2.8|       469.5|        1216|          8304|          3206|        34.01|      118.41|
|   3|     Chicago|    Illinois|    2746388|    2695598|          1.88
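
In the output above the columns are named `_c0`, `_c1`, ... and the real header (`rank`, `city`, `state`, ...) appears as the first data row, because the file was read without the CSV `header` option. A minimal sketch of a read that promotes the first line to column names follows. The helper name `read_csv_with_header` is hypothetical, and `inferSchema` is an optional extra that makes Spark scan the data to guess column types, so it may not be worth the cost on large files.

```python
# Reader options: take column names from the first line and let Spark
# guess column types instead of reading everything as strings.
CSV_OPTIONS = {"header": "true", "inferSchema": "true"}


def read_csv_with_header(spark, path, options=CSV_OPTIONS):
    """Read a CSV file whose first line holds the column names."""
    return spark.read.options(**options).csv(path)


# Usage in the notebook, with the existing `spark` session:
#   csv_us_cities = read_csv_with_header(
#       spark, "s3a://bronze/kaggle/us_cities_by_pop.csv")
#   csv_us_cities.select("city", "state").show(10)
```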

                                                                                