(spark-operator)=
# Spark Operator runtime

- [Spark Operator for running Spark jobs over Kubernetes](#spark-operator-for-running-spark-jobs-over-kubernetes)
- [Spark Operator for running Spark jobs on Databricks cluster](#spark-operator-for-running-spark-jobs-on-databricks-cluster)

## Spark Operator for running Spark jobs over Kubernetes

The `spark-on-k8s-operator` lets you define Spark applications declaratively. It supports one-time Spark
applications with `SparkApplication` and cron-scheduled applications with `ScheduledSparkApplication`.

When you send a request with MLRun to the Spark operator, the request contains your full application configuration:
the code and dependencies to run (packaged as a Docker image or specified via URIs), the infrastructure parameters
(for example, the memory, CPU, and storage volume specs to allocate to each Spark executor), and the Spark configuration.
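To make the shape of such a request concrete, the sketch below builds a `SparkApplication`-style manifest as a plain Python dictionary. It is illustrative only: MLRun generates the real manifest for you. The helper name `build_spark_application`, the image name, the file path, the Spark version, and the resource values are hypothetical placeholders. The field names are assumed to follow the operator's `sparkoperator.k8s.io/v1beta2` resource definition, so check them against the operator version installed in your cluster.

```python
import json


def build_spark_application(name, image, main_file, executors=2):
    """Return a SparkApplication-style manifest as a dict (illustrative only)."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name},
        "spec": {
            "type": "Python",
            "mode": "cluster",
            "sparkVersion": "3.2.1",  # hypothetical version
            # Code and dependencies: a container image plus the main file URI
            "image": image,
            "mainApplicationFile": main_file,
            # Infrastructure parameters for the driver and for each executor
            "driver": {"cores": 1, "memory": "512m"},
            "executor": {"cores": 1, "memory": "512m", "instances": executors},
            # Spark configuration (string-to-string map)
            "sparkConf": {"spark.eventLog.enabled": "true"},
        },
    }


manifest = build_spark_application(
    name="sparkreadcsv",
    image="example.registry/spark-py:latest",  # hypothetical image
    main_file="local:///app/spark_read_csv.py",  # hypothetical path
)
print(json.dumps(manifest, indent=2))
```

The three commented groups map onto the three parts of the request described above: code and dependencies, infrastructure parameters, and Spark configuration.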

Kubernetes takes this request and starts the Spark driver in a Kubernetes pod (a k8s abstraction; in this case, just a
Docker container). The Spark driver then communicates directly with the Kubernetes master to request executor pods and,
if dynamic allocation is enabled, scales them up and down at runtime according to the load. Kubernetes takes care of
bin-packing the pods onto Kubernetes nodes (the physical VMs), and dynamically scales the various node pools to meet the requirements.
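Dynamic allocation is controlled through standard Spark configuration properties. The sketch below collects the usual ones into a dictionary. The helper name `dynamic_allocation_conf` and the executor bounds are hypothetical, and the sketch assumes Spark 3.x, where shuffle tracking stands in for the external shuffle service that is normally unavailable on Kubernetes.

```python
def dynamic_allocation_conf(min_executors, max_executors):
    """Build Spark properties that enable dynamic executor allocation."""
    if min_executors > max_executors:
        raise ValueError("min_executors must not exceed max_executors")
    return {
        "spark.dynamicAllocation.enabled": "true",
        # Track shuffle files on executors instead of using an external shuffle service
        "spark.dynamicAllocation.shuffleTracking.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }


# Merge the properties into an existing configuration mapping
spark_conf = {"spark.eventLog.enabled": "true"}
spark_conf.update(dynamic_allocation_conf(min_executors=1, max_executors=4))
print(spark_conf)
```

In an MLRun Spark function, these keys could be merged into `sj.spec.spark_conf` in the same way that `spark.eventLog.enabled` is set in the example in this section.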

When you use the Spark operator, resources are allocated per task, so the job scales down to zero when the task is done.


```python
import mlrun
import os

# Set up a new Spark function with the Spark operator.
# The command points to the Spark code, which must be located on the file system.
# The name param can contain only lowercase letters (k8s convention).
read_csv_filepath = os.path.join(os.path.abspath("."), "spark_read_csv.py")
sj = mlrun.new_function(kind="spark", command=read_csv_filepath, name="sparkreadcsv")

# Set the Spark driver config (gpu_type & gpus=<number_of_gpus> are supported too)
sj.with_driver_limits(cpu="1300m")
sj.with_driver_requests(cpu=1, mem="512m")

# Set the Spark executor config (gpu_type & gpus=<number_of_gpus> are supported too)
sj.with_executor_limits(cpu="1400m")
sj.with_executor_requests(cpu=1, mem="512m")

# Add support for fuse, daemon, and Iguazio's jars
sj.with_igz_spark()

# Alternatively, move volume_mounts to driver and executor-specific fields and leave
# v3io mounts out of executor mounts if mount_v3io_to_executor=False
# sj.with_igz_spark(mount_v3io_to_executor=False)

# Set the Spark driver volume mount
# sj.function.with_driver_host_path_volume("/host/path", "/mount/path")

# Set the Spark executor volume mount
# sj.function.with_executor_host_path_volume("/host/path", "/mount/path")

# Spark confs are also supported
sj.spec.spark_conf["spark.eventLog.enabled"] = True

# Add a Python module
sj.with_requirements(["matplotlib"])

# Number of executors
sj.spec.replicas = 2
```

```python
# Rebuild the image with MLRun; this is needed to support logging artifacts, etc.
sj.deploy()
```

```python
# Run the task, setting the artifact path where the run artifacts (if any) are saved
sj.run(artifact_path="/User")
```

### Spark Code (spark_read_csv.py)

```python
from pyspark.sql import SparkSession
from mlrun import get_or_create_ctx

context = get_or_create_ctx("spark-function")

# build spark session
spark = SparkSession.builder.appName("Spark job").getOrCreate()

# read csv
df = spark.read.load('iris.csv', format="csv",
                     sep=",", header="true")

# sample for logging
df_to_log = df.describe().toPandas()

# log final report
context.log_dataset("df_sample",
                     df=df_to_log,
                     format="csv")
spark.stop()
```

## Spark Operator for running Spark jobs on Databricks cluster

When you use the Spark operator, resources are allocated per task, so the job scales down to zero when the task is done.

When using the Spark Operator, you can:
- Send keyword arguments (kwargs) to the job.
- Send your local file/code as a string to the job.
- Use a handler as an endpoint for user code.

Certain job keyword arguments (kwargs) configure the job itself:
- `mlrun_internal_timeout_minutes`: Specify a time limit for the job's execution.
- `mlrun_internal_token_key`: Use this argument if you want to use a token key other than the default.
- `mlrun_internal_number_of_workers`: Specify the number of workers to be utilized by this job.

Do not send variables named `mlrun_internal_code` or `context` since these are utilized by the internal processes of the runtime.
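The naming rules above can be checked before a job is submitted. The following is a minimal sketch, not the runtime's actual implementation: the helper `split_params` and the constants are hypothetical. It assumes that every parameter starting with `mlrun_internal_` is treated as job configuration and that all other parameters reach the handler as kwargs.

```python
RESERVED_NAMES = {"mlrun_internal_code", "context"}  # used internally by the runtime
CONTROL_PREFIX = "mlrun_internal_"  # assumed prefix of job-configuration kwargs


def split_params(params):
    """Separate job-configuration kwargs from the kwargs meant for the handler."""
    clashes = RESERVED_NAMES.intersection(params)
    if clashes:
        raise ValueError(f"reserved parameter names: {sorted(clashes)}")
    control = {k: v for k, v in params.items() if k.startswith(CONTROL_PREFIX)}
    user = {k: v for k, v in params.items() if not k.startswith(CONTROL_PREFIX)}
    return control, user


control, user = split_params(
    {"mlrun_internal_timeout_minutes": 15, "param1": "value1", "param2": "value2"}
)
print(control)  # {'mlrun_internal_timeout_minutes': 15}
print(user)  # {'param1': 'value1', 'param2': 'value2'}
```

Passing `context` or `mlrun_internal_code` to this helper raises a `ValueError`, which surfaces a naming clash locally instead of inside the remote job.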
```{Admonition} Important
To ensure the proper execution of this code, set the following environment variables (as shown below):
   - `DATABRICKS_HOST`
   - `DATABRICKS_TOKEN`
   - `DATABRICKS_CLUSTER_ID`
```
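A quick pre-flight check can catch a missing variable before the job is submitted. This is a small sketch using only the standard library; the helper name `missing_databricks_vars` and the example host value are hypothetical.

```python
import os

REQUIRED_VARS = ("DATABRICKS_HOST", "DATABRICKS_TOKEN", "DATABRICKS_CLUSTER_ID")


def missing_databricks_vars(env=None):
    """Return the required variable names that are unset or empty in env."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Check an explicit mapping; call with no argument to check os.environ instead
missing = missing_databricks_vars({"DATABRICKS_HOST": "https://example.cloud.databricks.com"})
print(missing)  # ['DATABRICKS_TOKEN', 'DATABRICKS_CLUSTER_ID']
```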
The following example runs a Databricks job on an existing cluster, identified by `DATABRICKS_CLUSTER_ID`.

```python
# If needed:
# !pip uninstall mlrun --yes
# !pip install git+https://github.com/mlrun/mlrun.git@development
```

```python
import os
import mlrun
from mlrun.runtimes.function_reference import FunctionReference
```

```python
# Set your credentials (replace the placeholder values with your own):
os.environ["DATABRICKS_HOST"] = "DATABRICKS_HOST-NAME"
os.environ["DATABRICKS_TOKEN"] = "DATABRICKS_TOKEN"
os.environ["DATABRICKS_CLUSTER_ID"] = "DATABRICKS_CLUSTER_ID"
```

```python
# The project name must match the one passed to function.run() below,
# so that the run can find the project secrets
project = mlrun.get_or_create_project("project-name", context="./", user_project=False)

# Non-sensitive settings are passed to the job as environment variables
job_env = {
    "DATABRICKS_HOST": os.environ["DATABRICKS_HOST"],
    "DATABRICKS_CLUSTER_ID": os.environ.get("DATABRICKS_CLUSTER_ID"),
}
# The token is stored as a project secret rather than a plain environment variable
secrets = {"DATABRICKS_TOKEN": os.environ["DATABRICKS_TOKEN"]}

project.set_secrets(secrets)

# User code, sent to the job as a string
code = """
def print_kwargs(**kwargs):
    print(f"kwargs: {kwargs}")
"""

function_ref = FunctionReference(
    kind="databricks",
    code=code,
    image="mlrun/mlrun",
    name="project-name",
)

function = function_ref.to_function()

for name, val in job_env.items():
    function.spec.env.append({"name": name, "value": val})

# mlrun_internal_* params configure the job; the rest reach the handler as kwargs
run = function.run(
    handler="print_kwargs",
    project="project-name",
    params={
        "mlrun_internal_timeout_minutes": 15,
        "param1": "value1",
        "param2": "value2",
    },
)
assert (
    run.status.results["databricks_runtime_task"]["logs"]
    == "kwargs: {'param1': 'value1', 'param2': 'value2'}\n"
)
```