# Calculate PI with Spark

* Jupyter: Spark on Kubernetes client mode
* Jupyter: Spark on Kubernetes local mode
* Command line: Spark on Kubernetes cluster mode
* Command line: Spark on Kubernetes client mode
* Command line: Spark on Kubernetes local mode

## Jupyter 

### Spark on Kubernetes client mode

```python
%%time
import os
import random

import findspark
findspark.init()  # locate SPARK_HOME and put its pyspark on sys.path

import pyspark
from pyspark.sql import SparkSession

print(pyspark.__version__)
print(pyspark.__file__)
print("My Host IP is {}".format(os.environ["SPARK_DRIVER_HOST"]))

# Stop a context/session left over from an earlier run of this cell
try:
    sc.stop()
    spark.stop()
except NameError:
    print("Spark was not defined!")

# Client mode: the driver runs where this notebook runs; executors run as pods
# and connect back to the driver at spark.driver.host
spark = SparkSession.builder \
       .master("k8s://https://192.168.65.3:6443") \
       .appName("pi") \
       .config("spark.submit.deployMode", "client") \
       .config("spark.driver.host", os.environ["SPARK_DRIVER_HOST"]) \
       .config("spark.driver.cores", "1") \
       .config("spark.kubernetes.container.image", "sshakeri/ubuntu:worker_k8s") \
       .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
       .config("spark.executor.cores", "1") \
       .config("spark.executor.instances", "1") \
       .config("spark.driver.allowMultipleContexts", "true") \
       .config("spark.executor.memory", "2g") \
       .getOrCreate()
sc = spark.sparkContext

# Monte Carlo estimate: a uniform point in the unit square lands inside the
# quarter circle x^2 + y^2 < 1 with probability pi/4
NUM_SAMPLES = 1000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
```

```
2.4.3
/opt/spark/python/pyspark/__init__.py
My Host IP is 10.1.3.112
Pi is roughly 3.132400
CPU times: user 50 ms, sys: 10 ms, total: 60 ms
Wall time: 8.62 s
```
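Why the two runs in this notebook differ in accuracy: each sample lands inside the quarter circle with probability p = π/4, so `count` is binomially distributed and the estimate 4·count/N has standard error 4·√(p(1 - p)/N), roughly 0.052 for N = 1000 and 0.0052 for N = 100000. Both printed values (3.132400 and 3.140720) are within one standard error of π. The plain-Python sketch below computes both quantities without Spark; `estimate_pi` and `pi_standard_error` are illustrative names, and the optional fixed seed is an addition for reproducibility that the notebook does not use.

```python
import math
import random


def estimate_pi(num_samples, seed=None):
    """Plain-Python version of the Spark job: draw points uniformly from the
    unit square and count those inside the quarter circle x^2 + y^2 < 1."""
    rng = random.Random(seed)  # private generator so a seeded run is repeatable
    count = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            count += 1
    return 4.0 * count / num_samples


def pi_standard_error(num_samples):
    """Standard error of 4 * count / N when count ~ Binomial(N, pi/4)."""
    p = math.pi / 4
    return 4.0 * math.sqrt(p * (1 - p) / num_samples)


if __name__ == "__main__":
    for n in (1000, 100000):
        print(n, estimate_pi(n, seed=42), round(pi_standard_error(n), 4))
```

Quadrupling the accuracy therefore costs sixteen times as many samples, which is why the error shrinks only tenfold between the two runs despite the hundredfold increase in `NUM_SAMPLES`.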


### Spark on Kubernetes local mode

```python
%%time
import random

import findspark
findspark.init()  # locate SPARK_HOME and put its pyspark on sys.path

import pyspark
from pyspark.sql import SparkSession

print(pyspark.__version__)
print(pyspark.__file__)

# Stop a context/session left over from an earlier run of this cell
try:
    sc.stop()
    spark.stop()
except NameError:
    print("Spark was not defined!")

# local[*]: driver and executor share this single JVM, with one worker thread
# per core; spark.executor.instances has no effect in this mode
spark = SparkSession.builder \
       .master("local[*]") \
       .appName("pi") \
       .config("spark.executor.instances", "1") \
       .config("spark.driver.allowMultipleContexts", "true") \
       .config("spark.executor.memory", "1g") \
       .getOrCreate()
sc = spark.sparkContext

NUM_SAMPLES = 100000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
```

```
2.4.3
/opt/spark/python/pyspark/__init__.py
Pi is roughly 3.140720
CPU times: user 20 ms, sys: 20 ms, total: 40 ms
Wall time: 1.13 s
```
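Conceptually, `parallelize` cuts the range into partitions, one task per partition applies `inside` to its elements, and `count` adds up the per-partition results on the driver. The sketch below imitates that flow in plain Python. It is an approximation, not Spark's code: the contiguous slicing only resembles how PySpark splits a `range`, the function names are hypothetical, and the per-partition seed (`base_seed + index`) is an illustrative choice for reproducibility, whereas the notebook relies on each Python worker's own `random` state.

```python
import random


def split_into_partitions(data, num_partitions):
    """Cut a sequence into num_partitions contiguous, near-equal slices."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def count_inside(partition, seed):
    """What one task does: draw one random point per element and count hits."""
    rng = random.Random(seed)
    hits = 0
    for _ in partition:
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return hits


def estimate_pi_partitioned(num_samples, num_partitions, base_seed=0):
    partitions = split_into_partitions(range(num_samples), num_partitions)
    # The equivalent of count(): sum the per-partition results on the "driver"
    total = sum(count_inside(part, base_seed + i)
                for i, part in enumerate(partitions))
    return 4.0 * total / num_samples
```

In PySpark the same per-partition pattern could be written with `sc.parallelize(range(N), numSlices)` followed by `rdd.mapPartitionsWithIndex(f)`, where `f(index, iterator)` returns an iterable holding that partition's hit count.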


## Command line

### Spark on Kubernetes cluster mode

In cluster mode the driver itself runs in a pod, so the examples jar is referenced with the `local://` scheme, i.e. as a path that already exists inside the container image.

```
!/opt/spark/bin/spark-submit \
--master k8s://https://192.168.65.3:6443 \
--deploy-mode cluster \
--name cluster-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=sshakeri/ubuntu:worker_k8s \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 1000
```
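Unlike `NUM_SAMPLES` in the notebook cells, the trailing `1000` passed to `SparkPi` is not a sample count. In the Spark 2.4 `SparkPi` example source (worth confirming against the jar you actually run), the argument is the number of partitions (`slices`, default 2); the job computes `n = min(100000 * slices, Int.MaxValue)`, parallelizes `1 until n` (so `n - 1` points drawn from [-1, 1]²) and prints `4.0 * count / (n - 1)`. The hypothetical helper below only reproduces that arithmetic.

```python
INT_MAX = 2**31 - 1  # JVM Int.MaxValue


def sparkpi_workload(slices=2):
    """Return (partitions, points) SparkPi would use for a given argument,
    following the Spark 2.4 example: n = min(100000 * slices, Int.MaxValue)
    and the RDD is `1 until n`, i.e. n - 1 points."""
    n = min(100000 * slices, INT_MAX)
    return slices, n - 1


if __name__ == "__main__":
    parts, points = sparkpi_workload(1000)
    print(f"{parts} partitions, {points:,} points")
```

So each of the three `spark-submit` runs here draws about 10^8 points split over 1000 tasks, far more work than either notebook cell.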

### Spark on Kubernetes client mode

In client mode the driver runs where `spark-submit` is invoked, so `spark.driver.host` must be an address the executor pods can reach.

```
!/opt/spark/bin/spark-submit \
--master k8s://https://192.168.65.3:6443 \
--deploy-mode client \
--name client-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=sshakeri/ubuntu:worker_k8s \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.driver.host=$SPARK_DRIVER_HOST \
/opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 1000
```

```
19/08/31 03:55:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/08/31 03:55:36 INFO SparkContext: Running Spark version 2.4.3
19/08/31 03:55:36 INFO SparkContext: Submitted application: Spark Pi
19/08/31 03:55:36 INFO SecurityManager: Changing view acls to: root
19/08/31 03:55:36 INFO SecurityManager: Changing modify acls to: root
19/08/31 03:55:36 INFO SecurityManager: Changing view acls groups to: 
19/08/31 03:55:36 INFO SecurityManager: Changing modify acls groups to: 
19/08/31 03:55:36 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
19/08/31 03:55:37 INFO Utils: Successfully started service 'sparkDriver' on port 36709.
19/08/31 03:55
```
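Both client-mode runs read the driver address from `SPARK_DRIVER_HOST`, and this notebook does not show how that variable is set. One possibility, an assumption here, is the Kubernetes downward API (`fieldRef: status.podIP`) in the notebook pod's spec; the Spark 2.4 Kubernetes documentation also suggests a headless service pointing at the driver pod for client mode. As a fallback sketch, the hypothetical helper `resolve_driver_host` uses the variable when present and otherwise resolves the machine's own hostname, which inside a pod usually maps to the pod IP through the kubelet-managed `/etc/hosts`.

```python
import os
import socket


def resolve_driver_host(env_var="SPARK_DRIVER_HOST"):
    """Return an address executor pods can use to reach this driver.

    Prefers the environment variable the notebook already relies on; otherwise
    resolves this machine's hostname (assumed to map to the pod IP in Kubernetes).
    """
    host = os.environ.get(env_var)
    if host:
        return host
    # May raise socket.gaierror if the hostname is not resolvable
    return socket.gethostbyname(socket.gethostname())
```

The result could then be passed to the builder as `.config("spark.driver.host", resolve_driver_host())`.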

### Spark on Kubernetes local mode

Here the driver and executors run as threads in a single local JVM (`local[*]`) inside the current container, so no Kubernetes settings are needed.

```
!/opt/spark/bin/spark-submit \
--master "local[*]" \
--deploy-mode client \
--name local-pi \
--class org.apache.spark.examples.SparkPi \
/opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 1000
```
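Since the three `spark-submit` variants differ only in a few flags, one option is to assemble the command as an argument list in Python instead of a backslash-continued `!` line; in a plain shell, a missing space before a trailing `\` glues two flags together (`cluster\` followed by `--name` becomes `cluster--name`), and an argument list cannot suffer from that. A sketch, where `build_spark_submit`, `K8S_CONF` and the constants are hypothetical names, while the paths, image, and conf keys are copied from the cells above.

```python
import shlex

SPARK_SUBMIT = "/opt/spark/bin/spark-submit"
EXAMPLES_JAR = "/opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"

# Settings shared by the Kubernetes client- and cluster-mode runs
K8S_CONF = {
    "spark.executor.instances": 5,
    "spark.kubernetes.container.image": "sshakeri/ubuntu:worker_k8s",
    "spark.kubernetes.authenticate.driver.serviceAccountName": "spark",
}


def build_spark_submit(master, deploy_mode, name, conf=None, app_args=(),
                       main_class="org.apache.spark.examples.SparkPi",
                       app_resource=EXAMPLES_JAR):
    """Assemble a spark-submit argument list (no shell parsing involved)."""
    args = [SPARK_SUBMIT,
            "--master", master,
            "--deploy-mode", deploy_mode,
            "--name", name,
            "--class", main_class]
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    args.append(app_resource)
    args.extend(str(a) for a in app_args)
    return args


if __name__ == "__main__":
    local_cmd = build_spark_submit("local[*]", "client", "local-pi", app_args=[1000])
    # Print a shell-quoted version to inspect; pass local_cmd itself to
    # subprocess.run(local_cmd, check=True) to actually submit the job
    print(" ".join(shlex.quote(a) for a in local_cmd))
```

Keeping the shared Kubernetes settings in one dict also means the client- and cluster-mode commands cannot drift apart.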