In [None]:
!pip install pyspark==2.4.7 numpy pandas pyspark3d

!sudo apt-get install -y apt-transport-https ca-certificates gnupg
!echo "deb https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list

!curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
!curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -

!sudo apt update
!sudo apt install -y openjdk-8-jdk google-cloud-sdk kubectl

In [None]:
# Sanity check: a valid kubeconfig must be available for Spark on Kubernetes to
# work. This command prints the cluster endpoints when kubectl can reach it.
!kubectl cluster-info

In [5]:
import os
# Select the Python executable PySpark should use.
# NOTE(review): presumably this has to be set before the SparkContext is
# created (done in a later cell) — confirm.
os.environ['PYSPARK_PYTHON'] = 'python3'

import pyspark
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark3d.repartitioning import prePartition
from pyspark3d.repartitioning import repartitionByCol

In [6]:
k8s_host = os.environ['K8S_MASTER_IP']
pod_ip = !hostname -i
pod_ip = pod_ip[0]
driver_port = 23840
# Image that you should build from compiled spark project
image_name = 'sarex/pyspark-k8s:latest'


In [7]:
# Single value shared by all Kubernetes client timeout settings below.
# NOTE(review): presumably milliseconds — confirm against the Spark-on-Kubernetes docs.
K8S_CLIENT_TIMEOUT = '100000'

conf = pyspark.SparkConf()
conf.setMaster(f'k8s://{k8s_host}')
# setAppName already writes `spark.app.name`, so it is not set a second time.
conf.setAppName('helloworld')

conf.setAll([
    # Kubernetes placement, images and credentials.
    ('spark.kubernetes.namespace', 'spark'),
    ('spark.kubernetes.container.image', image_name),
    ('spark.kubernetes.executor.container.image', image_name),
    ('spark.kubernetes.container.image.pullSecrets', 'dockerhub'),
    ('spark.executor.instances', '2'),
    ('spark.kubernetes.authenticate.serviceAccountName', 'spark'),
    # The driver runs in this notebook's pod; advertise its address and port.
    ('spark.driver.host', pod_ip),
    ('spark.driver.port', driver_port),
    # Kubernetes client timeouts.
    ('spark.kubernetes.submission.connectionTimeout', K8S_CLIENT_TIMEOUT),
    ('spark.kubernetes.submission.requestTimeout', K8S_CLIENT_TIMEOUT),
    ('spark.kubernetes.driver.connectionTimeout', K8S_CLIENT_TIMEOUT),
    ('spark.kubernetes.driver.requestTimeout', K8S_CLIENT_TIMEOUT),
    # spark3D / kNN packages plus the local jars shipped next to the notebook.
    ('spark.jars.packages', 'com.github.astrolabsoftware:spark3d_2.11:0.3.1,saurfang:spark-knn:0.3.0'),
    ('spark.jars', './jhealpix.jar,./spark3D-assembly-0.3.1.jar'),
])

# getOrCreate reuses an already running context, so re-running this cell no
# longer fails with "Cannot run multiple SparkContexts at once". The master URL
# is taken from `conf` (setMaster above). If a context already exists, `conf`
# is NOT re-applied: restart the kernel to change the configuration.
context = pyspark.SparkContext.getOrCreate(conf)


In [8]:
# Wrap the SparkContext created above in a SparkSession; later cells use it to
# build Spark DataFrames.
spark = SparkSession(context)


In [9]:
%%time

pdf = pd.DataFrame(np.random.rand(1000000, 3))
df = spark.createDataFrame(pdf)


In [10]:
%%time

result_pdf = df.select("*").toPandas()
result_pdf.head()

Unnamed: 0,0,1,2
0,0.748567,0.746342,0.243716
1,0.927885,0.062615,0.301911
2,0.769165,0.656587,0.186006
3,0.287022,0.795529,0.51453
4,0.44936,0.371976,0.865534


In [None]:
# Number of partitions requested from the spatial partitioner.
n_parts = 50
# pyspark3d options: treat columns "0", "1", "2" as cartesian point coordinates
# and partition them on an octree grid.
options = dict(
    geometry="points",
    colnames="0,1,2",
    coordSys="cartesian",
    gridtype="octree",
)
# NOTE(review): presumably this adds the `partition_id` column that the
# repartitioning step reads — confirm against the pyspark3d docs.
points_prepart = prePartition(df, options, numPartitions=n_parts)


In [None]:
# Repartition on the existing "partition_id" column (preLabeled=True), using
# the same partition count as the pre-partitioning step.
df_repart = repartitionByCol(
    points_prepart,
    "partition_id",
    preLabeled=True,
    numPartitions=n_parts,
)

# Print a preview of the repartitioned DataFrame.
df_repart.show()
