In [None]:
import os

# Root URLs of the two filesystems used in this notebook.
# The HDFS URL is hard-coded; the V3IO URL is read from the V3IO_HOME_URL
# environment variable and is None when that variable is not set.
hdfs_fs = 'hdfs://hadoop-master.hadoop-domain.default-tenant.svc.cluster.local:9000'
v3io_fs = os.environ.get('V3IO_HOME_URL')

# Echo both values so the resolved endpoints are visible in the cell output.
print(f"HDFS: {hdfs_fs}", f"V3IO: {v3io_fs}", sep="\n")

In [None]:
# Kerberos configuration file; also referenced by the JVM option below.
krb5_config_file = '/User/spark/krb5.conf'
# Keytab file and credential-cache location.
krb5_keytab_file = '/User/spark/krb5.keytab'
krb5_cc_name = 'FILE:/User/spark/krb5kdc_ccache'
# Directory holding the Hadoop client configuration.
hadoop_conf_dir = '/User/spark/hadoop/'

# JVM flags: disable zip memory mapping and point the JVM at krb5.conf.
jvm_config_option = f"-Dsun.zip.disableMemoryMapping=true -Djava.security.krb5.conf={krb5_config_file}"

# Use this to enable extra debug around Kerberos
# jvm_config_option = jvm_config_option + " -Dsun.security.krb5.debug=true"

# Show the effective settings, one per line.
print(
    f"KRB5CCNAME: {krb5_cc_name}",
    f"HADOOP_CONF_DIR: {hadoop_conf_dir}",
    f"KRB5_CONFIG: {krb5_config_file}",
    f"JVM config: {jvm_config_option}",
    sep="\n",
)

In [None]:
# Export the Kerberos/Hadoop settings into the environment of the notebook
# process, in the same order as before.
os.environ.update({
    'KRB5CCNAME': krb5_cc_name,
    'HADOOP_CONF_DIR': hadoop_conf_dir,
    'KRB5_CONFIG': krb5_config_file,
})

In [None]:
!kinit -k -t /User/spark/krb5.keytab hdfs/hadoop-master.hadoop-domain.default-tenant.svc.cluster.local@EXAMPLE.COM
!klist

In [None]:
from pyspark.sql import SparkSession
import socket

# This machine's hostname is passed to Spark as the driver pod name.
hostname = socket.gethostname()

# Build (or reuse) the Spark session. The settings are applied in the same
# order as before; only the line-continuation style differs.
spark = (
    SparkSession.builder
    .appName("Example")
    .master('k8s://https://kubernetes.default.svc:443')
    # V3IO filesystem implementation classes
    .config('fs.v3io.impl', 'io.iguaz.v3io.hcfs.V3IOFileSystem')
    .config('fs.AbstractFileSystem.v3io.impl', 'io.iguaz.v3io.hcfs.V3IOAbstractFileSystem')
    # Kubernetes settings
    .config('spark.kubernetes.container.image', 'spark-exec/spark-py:latest')
    .config('spark.kubernetes.driver.pod.name', hostname)
    .config('spark.kubernetes.namespace', 'default-tenant')
    .config('spark.pyspark.python', 'python3.7')
    .config('spark.kubernetes.executor.podTemplateFile', '/User/spark/worker_pod.yaml')
    # Executor JVM options and Hadoop configuration directory
    .config('spark.executor.extraJavaOptions', jvm_config_option)
    .config('spark.executorEnv.HADOOP_CONF_DIR', hadoop_conf_dir)
    # Kerberos keytab, principal and krb5.conf
    .config('spark.kerberos.keytab', krb5_keytab_file)
    .config('spark.kerberos.principal', 'hdfs/hadoop-master.hadoop-domain.default-tenant.svc.cluster.local@EXAMPLE.COM')
    .config('spark.kubernetes.kerberos.krb5.path', krb5_config_file)
    .getOrCreate()
)

In [None]:
# Fail early with a clear message when the V3IO home URL is missing.
# v3io_fs is read from the V3IO_HOME_URL environment variable; if it is None,
# os.path.join raises an opaque TypeError, and if it is empty the input path
# silently becomes the relative path 'examples/demo.csv'.
if not v3io_fs:
    raise RuntimeError("V3IO_HOME_URL is not set; cannot locate examples/demo.csv on V3IO")

# Location of the demo CSV under the V3IO home URL.
v3io_path = os.path.join(v3io_fs, 'examples', 'demo.csv')
print(v3io_path)

# Load the CSV through Spark and display its first rows.
v3io_df = spark.read.csv(v3io_path)
v3io_df.show()

In [None]:
# Destination on HDFS for the Parquet copy of the V3IO data.
output_path = os.path.join(hdfs_fs, 'output.parquet')
print(output_path)

# Write the DataFrame as Parquet, replacing any existing output at that path.
v3io_df.write.mode('overwrite').parquet(output_path)
# Alternative: write CSV instead of Parquet.
# v3io_df.write.csv(f'{hdfs_fs}/output.csv', mode='overwrite')

In [None]:
# Read the Parquet output back from output_path and display it.
hdfs_reader = spark.read
hdfs_df = hdfs_reader.parquet(output_path)
hdfs_df.show()

In [None]:
# Stop the Spark session now that the read/write steps are done.
spark.stop()

### Various optional configurations

1. This will enable using the Hadoop native libs (rather than the libs packaged with Spark). It will not work in k8s mode since the Pods created do not have the native Hadoop libraries installed (which can be changed if using a different Docker image for them).

    `.config('spark.executorEnv.LD_LIBRARY_PATH', '/hadoop/lib/native') \`

2. Run in local mode (not using remote executors)

    `.master('local') \`

3. These env variables are not needed in k8s mode.

    `.config('spark.executorEnv.KRB5_CONFIG', krb5_config_file) \`

    `.config('spark.executorEnv.KRB5CCNAME', krb5_cc_name) \`

4. These seem to only work in Spark >=3.0

    `.config('spark.kerberos.access.hadoopFileSystems', hdfs_fs) \`

    `.config('spark.kerberos.renewal.credentials','ccache') \`

5. Not needed in k8s mode. The driver seems to ignore the krb5.conf setting anyway, for some reason:

    `.config('spark.driver.extraJavaOptions', jvm_config_option) \`


In [None]:
# Cleanup env variables, if needed for some reason.

import os

# Remove the Kerberos/Hadoop variables that this notebook exported.
# pop(name, None) does nothing when the variable is absent, and it also removes
# a variable whose value is an empty string, which the previous truthiness
# check (`if os.environ.get(name)`) would have left in place.
for env_name in ('HADOOP_CONF_DIR', 'KRB5CCNAME', 'KRB5_CONFIG'):
    os.environ.pop(env_name, None)