In [None]:
%%writefile data.txt
Apache Spark is an open-source processing engine that you can use to process Hadoop data. The following diagram shows the components involved in running Spark jobs. See Spark Cluster Mode Overview for additional component details.

HPE Ezmeral Data Fabric supports the following types of cluster managers:
Spark's standalone cluster manager
YARN
The configuration and operational steps for Spark differ based on the Spark mode you choose to install. The steps to integrate Spark with other components are the same in either standalone or YARN cluster mode, except where otherwise noted.
This section provides documentation about configuring and using Spark with HPE Ezmeral Data Fabric, but it does not duplicate the Apache Spark documentation.

You can also refer to additional documentation available on the Apache Spark Product Page.

Getting Started with Spark Interactive Shell
After you have a basic understanding of Apache Spark and have it installed and running on your cluster, you can use it to load datasets, apply schemas, and query data from the Spark interactive shell.

In [None]:
# -p: do not fail if the directory already exists; -f: overwrite a previous upload
!hadoop fs -mkdir -p dtap://TenantStorage/tmp
!hadoop fs -copyFromLocal -f data.txt dtap://TenantStorage/tmp

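To know what the job should print, you can compute the expected counts locally from `data.txt`. This is a minimal sketch that assumes `JavaWordCount` splits each line on single spaces with Java's `String.split(" ")`, as the Apache Spark example does. It approximates Java's handling of empty tokens and uses Python's `splitlines()`, which may differ slightly from Hadoop's line reader on unusual line terminators. `java_style_split` and `word_count` are illustrative names, not part of any library.

```python
from collections import Counter
from typing import List


def java_style_split(line: str) -> List[str]:
    """Approximate Java's line.split(" ") with the default limit of 0.

    Java keeps empty tokens between consecutive spaces, drops trailing
    empty tokens, and returns the whole line when it contains no space.
    """
    if " " not in line:
        return [line]
    tokens = line.split(" ")
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


def word_count(text: str) -> Counter:
    """Count space-separated tokens line by line (assumed JavaWordCount behavior)."""
    counts = Counter()
    for line in text.splitlines():
        counts.update(java_style_split(line))
    return counts


# Usage in the notebook:
# with open("data.txt") as f:
#     expected = word_count(f.read())
# print(expected.most_common(5))
```

Note that blank lines in `data.txt` contribute an empty-string token, which the Spark job should count as well.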
In [None]:
YAML="""
apiVersion: "sparkoperator.hpe.com/v1beta2"
kind: SparkApplication
metadata:
  name: spark-wordcount-secure-dtap
spec:
  sparkConf:
    # Note: If you are executing the application as a K8S user that MapR can verify,
    #       you do not need to specify a spark.mapr.user.secret
    #spark.mapr.user.secret: spark-user-secret
    # Note: You do not need to specify a spark.eventLog.dir
    #       it will be auto-generated with the pattern "maprfs:///apps/spark/<namespace>"
    #spark.eventLog.dir: "maprfs:///apps/spark/sampletenant"

    # DTAP configuration
    spark.hadoop.fs.dtap.impl: "com.bluedata.hadoop.bdfs.Bdfs"
    spark.hadoop.fs.AbstractFileSystem.dtap.impl: "com.bluedata.hadoop.bdfs.BdAbstractFS"
    spark.hadoop.fs.dtap.impl.disable.cache: "false"
    spark.driver.extraClassPath: "/opt/bdfs/bluedata-dtap.jar"
    spark.executor.extraClassPath: "/opt/bdfs/bluedata-dtap.jar"
  type: Java
  sparkVersion: 3.1.2
  mode: cluster
  # adding dtap connector to spark classpath
  deps:
    jars:
      - local:///opt/bdfs/bluedata-dtap.jar
  image: gcr.io/mapr-252711/spark-3.1.2:202202161825P150
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.JavaWordCount
  mainApplicationFile: "local:///opt/mapr/spark/spark-3.1.2/examples/jars/spark-examples_2.12-3.1.2.3-eep-800.jar"
  restartPolicy:
    type: Never
  arguments:
    - dtap://TenantStorage/tmp/data.txt
  imagePullSecrets:
    - imagepull
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 3.1.2
      hpecp.hpe.com/dtap: hadoop2 # enabling dtap side-car container for driver pod
    # Note: You do not need to specify a serviceAccount
    #       it will be auto-generated referencing the pre-existing "hpe-<namespace>"
    serviceAccount: hpe-testing
  executor:
    cores: 1
    coreLimit: "1000m"
    instances: 2
    memory: "512m"
    labels:
      version: 3.1.2
      hpecp.hpe.com/dtap: hadoop2 # enabling dtap side-car container for executor pods
"""

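If you need to vary the application name, input path, or service account, you can generate the same manifest from Python instead of editing the YAML string. This is a minimal sketch: `build_wordcount_manifest` and `write_manifest` are hypothetical helpers, every field value is copied from the YAML above, and the approach relies on JSON being accepted by YAML parsers. That holds for YAML 1.2 and, in practice, for common loaders, but it assumes `submit` reads the file with a standard YAML loader.

```python
import json


def build_wordcount_manifest(name: str, input_uri: str, service_account: str) -> dict:
    """Return a SparkApplication spec mirroring the YAML above (hypothetical helper)."""
    dtap_jar = "/opt/bdfs/bluedata-dtap.jar"
    # The hpecp.hpe.com/dtap label enables the DTAP sidecar container in each pod.
    pod_labels = {"version": "3.1.2", "hpecp.hpe.com/dtap": "hadoop2"}
    return {
        "apiVersion": "sparkoperator.hpe.com/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name},
        "spec": {
            "sparkConf": {
                # DTAP configuration, as in the YAML manifest
                "spark.hadoop.fs.dtap.impl": "com.bluedata.hadoop.bdfs.Bdfs",
                "spark.hadoop.fs.AbstractFileSystem.dtap.impl": "com.bluedata.hadoop.bdfs.BdAbstractFS",
                "spark.hadoop.fs.dtap.impl.disable.cache": "false",
                "spark.driver.extraClassPath": dtap_jar,
                "spark.executor.extraClassPath": dtap_jar,
            },
            "type": "Java",
            "sparkVersion": "3.1.2",
            "mode": "cluster",
            "deps": {"jars": [f"local://{dtap_jar}"]},
            "image": "gcr.io/mapr-252711/spark-3.1.2:202202161825P150",
            "imagePullPolicy": "Always",
            "mainClass": "org.apache.spark.examples.JavaWordCount",
            "mainApplicationFile": "local:///opt/mapr/spark/spark-3.1.2/examples/jars/spark-examples_2.12-3.1.2.3-eep-800.jar",
            "restartPolicy": {"type": "Never"},
            "arguments": [input_uri],
            "imagePullSecrets": ["imagepull"],
            "driver": {
                "cores": 1,
                "coreLimit": "1000m",
                "memory": "512m",
                "labels": dict(pod_labels),
                "serviceAccount": service_account,
            },
            "executor": {
                "cores": 1,
                "coreLimit": "1000m",
                "instances": 2,
                "memory": "512m",
                "labels": dict(pod_labels),
            },
        },
    }


def write_manifest(manifest: dict, path: str) -> None:
    """Write the manifest as indented JSON, which YAML loaders generally accept."""
    with open(path, "w") as f:
        json.dump(manifest, f, indent=2)


# Usage in the notebook:
# write_manifest(
#     build_wordcount_manifest("spark-wordcount-secure-dtap",
#                              "dtap://TenantStorage/tmp/data.txt", "hpe-testing"),
#     "dtap-wordcount.yaml")
```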
In [None]:
# Save the manifest so it can be submitted and later deleted by path
with open("dtap-wordcount.yaml", "w") as f:
    f.write(YAML)

In [None]:
from ezmllib.spark import submit, delete, logs

In [None]:
submit(yaml_path="dtap-wordcount.yaml")

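The bash cell below polls `kubectl` until the driver pod finishes. If you prefer to stay in Python, the same logic can be sketched with `subprocess`. `driver_pod_status` and `wait_for_driver` are illustrative names, the set of failure states is an assumption, and the sleep and clock functions are injectable so the loop can be exercised without a cluster.

```python
import subprocess
import time
from typing import Callable, Optional

# Pod STATUS values treated as terminal failures (an assumption; extend as needed).
FAILED_STATES = {"Error", "ErrImagePull", "ImagePullBackOff"}


def driver_pod_status(name_fragment: str = "spark-wordcount-secure-dtap-driver") -> Optional[str]:
    """Return the STATUS column of the first pod whose name contains name_fragment."""
    out = subprocess.run(
        ["kubectl", "get", "pods", "--no-headers"],
        capture_output=True, text=True, check=True,
    ).stdout
    for line in out.splitlines():
        fields = line.split()
        # Columns are NAME READY STATUS RESTARTS AGE
        if len(fields) >= 3 and name_fragment in fields[0]:
            return fields[2]
    return None  # the driver pod has not been created yet


def wait_for_driver(get_status: Callable[[], Optional[str]], timeout_s: float = 3600,
                    interval_s: float = 20, sleep=time.sleep, clock=time.monotonic) -> bool:
    """Poll get_status until success, failure, or timeout; True means Completed."""
    deadline = clock() + timeout_s
    while clock() < deadline:
        status = get_status()
        if status == "Completed":
            return True
        if status in FAILED_STATES:
            return False
        sleep(interval_s)  # also waits while the pod does not exist yet
    return False


# Usage in the notebook:
# print("Test Passed." if wait_for_driver(driver_pod_status) else "Test Failed.")
```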
In [None]:
%%bash
# Poll the driver pod for up to 60 minutes and report whether the job succeeded.
deadline=$(date +%s --date="60 minutes")
while true ; do
    POD=$(kubectl get pods --no-headers -o custom-columns=":metadata.name" | grep spark-wordcount-secure-dtap-driver | head -n 1)
    if [ -n "$POD" ]; then
        # The third column of `kubectl get pod` output is STATUS
        STATUS=$(kubectl get pod "$POD" --no-headers | awk '{ print $3 }')
        case "$STATUS" in
            Completed)
                echo "Test Passed."
                break
                ;;
            Error|ErrImagePull|ImagePullBackOff)
                echo "Test Failed."
                break
                ;;
        esac
    fi

    if [ "$(date +%s)" -gt "$deadline" ]; then
        echo "Test Failed."
        break
    fi
    # Sleep on every iteration, including while the driver pod does not exist yet
    sleep 20s
done

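Before deleting the application (which removes the driver pod and its logs), you may want to inspect the word counts. `JavaWordCount` prints each result as `word: count` on the driver's stdout. The parser below is a heuristic sketch that keeps only lines ending in `: <integer>`, so a Spark log line with that exact shape would also be picked up. `parse_wordcount_output` is an illustrative name, and the pod name in the usage comment assumes the `<application-name>-driver` pattern that the polling cell's `grep` relies on.

```python
import re
from typing import Dict

# One "word: count" line per distinct token; the greedy word group lets
# tokens that themselves contain ": " survive, since only the last ": " splits.
_COUNT_LINE = re.compile(r"^(?P<word>.*): (?P<count>\d+)$")


def parse_wordcount_output(log_text: str) -> Dict[str, int]:
    """Extract word counts from driver stdout, skipping non-matching lines."""
    counts = {}
    for line in log_text.splitlines():
        m = _COUNT_LINE.match(line)
        if m:
            counts[m.group("word")] = int(m.group("count"))
    return counts


# Usage in the notebook (before running the delete cell):
# import subprocess
# log_text = subprocess.run(["kubectl", "logs", "spark-wordcount-secure-dtap-driver"],
#                           capture_output=True, text=True, check=True).stdout
# print(parse_wordcount_output(log_text))
```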
In [None]:
!kubectl delete -f dtap-wordcount.yaml