## Preliminaries
First, we print the Python, Java and PySpark versions in use, and read the MinIO credentials from the mounted secret.

In [None]:
! python3 -V
! java --version
! pyspark --version

In [None]:
import os

# get minio credentials
with open("/minio-s3-credentials/accessKey", "r") as f:
    minio_user = f.read().strip()

with open("/minio-s3-credentials/secretKey", "r") as f:
    minio_pwd = f.read().strip()
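The two reads above follow the same pattern: a Kubernetes secret mounted as a directory exposes each key as a file. A minimal sketch of a reusable helper, assuming that layout; `read_secret` is a hypothetical name, not part of any library.

```python
from pathlib import Path


# Hypothetical helper: read one key from a secret mounted as a directory,
# where each key of the secret appears as a separate file.
def read_secret(directory: str, key: str) -> str:
    """Return the contents of <directory>/<key> with surrounding whitespace removed."""
    path = Path(directory) / key
    if not path.is_file():
        raise FileNotFoundError(f"secret key {key!r} not found under {directory!r}")
    return path.read_text().strip()


# Usage, assuming the same mount point as in the cell above:
# minio_user = read_secret("/minio-s3-credentials", "accessKey")
# minio_pwd = read_secret("/minio-s3-credentials", "secretKey")
```

Raising a named error for a missing key gives a clearer message than a bare `open()` failure when the secret is not mounted.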

## Spark
Spark can run in client mode, which is the default and is recommended for JupyterHub notebooks because the code is run interactively, or in cluster mode. This notebook uses Spark in client mode, meaning that the notebook itself acts as the driver. The Spark and Python versions must match between the driver (running in the JupyterHub image)
and the executor(s) (running in a separate image, specified below with the `spark.kubernetes.container.image` setting).

The JupyterHub image `quay.io/jupyter/pyspark-notebook:spark-3.5.2` is based on Ubuntu, like the Spark images.
The Java versions match exactly. Python versions may differ at the patch level: the executor image used below, `oci.stackable.tech/sandbox/spark:3.5.2-python311`, is built from a `spark:3.5.2-scala2.12-java17-ubuntu` base image with Python 3.11 installed (the same major/minor version as the notebook).
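The rule "Python may differ only at the patch level" is easy to check programmatically. A minimal sketch, assuming the executor's Python version is known as a string (for example, read off the image tag or from running `python3 -V` in the executor image); `same_minor` and `driver_python_version` are hypothetical helper names.

```python
import platform


def driver_python_version() -> str:
    """Python version of the current (driver) process, e.g. '3.11.9'."""
    return platform.python_version()


def same_minor(driver_version: str, executor_version: str) -> bool:
    """True if two 'X.Y.Z' version strings agree on major and minor; patch may differ."""
    # Compare the first two components as strings, so '3.1' and '3.11' differ.
    return driver_version.split(".")[:2] == executor_version.split(".")[:2]


# Usage: '3.11' is assumed here from the executor image tag 'spark:3.5.2-python311'.
# print(same_minor(driver_python_version(), "3.11"))
```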

## S3
As we will be reading data from an S3 bucket, we need to add the necessary `hadoop` and `aws` libraries, using the same Hadoop version as the
notebook image (see `spark.jars.packages`), and define the endpoint settings (see `spark.hadoop.fs.s3a.*`).
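Both groups of settings can be derived from a few inputs: the Hadoop version and AWS SDK version for the packages, and the endpoint plus credentials for `spark.hadoop.fs.s3a.*`. A minimal sketch that builds the same values used in the cell below; `jars_packages` and `s3a_settings` are hypothetical helpers, and the versions (3.3.4, 1.12.162) are taken from that cell.

```python
# Hypothetical helpers reproducing the S3A-related settings of the SparkSession cell.
HADOOP_ARTIFACTS = (
    "hadoop-client-api",
    "hadoop-client-runtime",
    "hadoop-aws",
    "hadoop-common",
)


def jars_packages(hadoop_version: str, aws_sdk_version: str) -> str:
    """Comma-separated Maven coordinates for spark.jars.packages."""
    coords = [f"org.apache.hadoop:{name}:{hadoop_version}" for name in HADOOP_ARTIFACTS]
    coords.append(f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_version}")
    return ",".join(coords)


def s3a_settings(endpoint: str, access_key: str, secret_key: str) -> dict:
    """spark.hadoop.fs.s3a.* settings for a path-style, S3-compatible endpoint such as MinIO."""
    prefix = "spark.hadoop.fs.s3a."
    return {
        prefix + "endpoint": endpoint,
        prefix + "path.style.access": "true",
        prefix + "access.key": access_key,
        prefix + "secret.key": secret_key,
        prefix + "aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    }


# Usage sketch: apply each setting to a SparkSession builder.
# for key, value in s3a_settings("http://minio:9000/", minio_user, minio_pwd).items():
#     builder = builder.config(key, value)
```

Keeping the Hadoop version in one place makes it harder for the `hadoop-*` artifacts to drift apart when the notebook image is upgraded.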

In [None]:
from pyspark.sql import SparkSession

NAMESPACE = os.environ.get("NAMESPACE", "default")
POD_NAME = os.environ.get("HOSTNAME", f"jupyter-{os.environ.get('USER', 'default')}-{NAMESPACE}")

EXECUTOR_IMAGE = "oci.stackable.tech/sandbox/spark:3.5.2-python311" 

spark = (
    SparkSession.builder
    .master(f"k8s://https://{os.environ['KUBERNETES_SERVICE_HOST']}:{os.environ['KUBERNETES_SERVICE_PORT']}")
    .appName(f"process-s3-{POD_NAME}")
    .config("spark.kubernetes.container.image", EXECUTOR_IMAGE)
    .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
    .config("spark.kubernetes.namespace", NAMESPACE)
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark")
    .config("spark.driver.port", "2222")
    .config("spark.driver.blockManager.port", "7777")
    .config("spark.executor.instances", "1")
    .config("spark.executor.memory", "1g")
    .config("spark.executor.cores", "1")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000/")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", minio_user)
    .config("spark.hadoop.fs.s3a.secret.key", minio_pwd)
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-client-api:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.162")
    .config("spark.kubernetes.driver.pod.name", POD_NAME)
    .getOrCreate()
)
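The master URL and driver pod name in the cell above come from environment variables: Kubernetes injects `KUBERNETES_SERVICE_HOST` and `KUBERNETES_SERVICE_PORT` into every pod, and inside a pod `HOSTNAME` normally holds the pod name. A minimal sketch of that logic as testable functions; `k8s_master_url` and `driver_pod_name` are hypothetical names, and the clearer error for a missing variable is an addition not present in the original cell.

```python
from typing import Mapping


def k8s_master_url(env: Mapping[str, str]) -> str:
    """Build Spark's k8s:// master URL from the API-server variables injected into pods."""
    try:
        host = env["KUBERNETES_SERVICE_HOST"]
        port = env["KUBERNETES_SERVICE_PORT"]
    except KeyError as missing:
        raise RuntimeError(f"not running inside a Kubernetes pod: {missing} is unset") from missing
    return f"k8s://https://{host}:{port}"


def driver_pod_name(env: Mapping[str, str], namespace: str) -> str:
    """Mirror the POD_NAME fallback used in the SparkSession cell."""
    user = env.get("USER", "default")
    return env.get("HOSTNAME", f"jupyter-{user}-{namespace}")


# Usage inside the notebook:
# import os
# print(k8s_master_url(os.environ), driver_pod_name(os.environ, NAMESPACE))
```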

### Create an in-memory DataFrame
This checks that the libraries on the driver and the executor are compatible.

In [None]:
df = spark.createDataFrame([("a", 1), ("b", 2)], ["col1", "col2"])
df.show()

### Check S3 with pyarrow
As well as Spark, we can inspect S3 buckets with the `pyarrow` library.

In [None]:
# Manual S3 file check via pyarrow.fs
import pyarrow.fs as fs

s3 = fs.S3FileSystem(endpoint_override="http://minio:9000/", access_key=minio_user, secret_key=minio_pwd, scheme="http")
files = s3.get_file_info(fs.FileSelector("demo/gas-sensor/raw/", recursive=True))
for f in files:
    print("Found file:", f.path)

### Read/Write operations

In [None]:
df = spark.read.csv("s3a://demo/gas-sensor/raw/", header=True)
df.show()

In [None]:
df.write.csv("s3a://demo/gas-sensor/rewritten/", mode="overwrite")
df.write.parquet("s3a://demo/gas-sensor/parquet/", mode="overwrite")

# Parquet stores column names in the file schema, so no header option is needed
df2 = spark.read.parquet("s3a://demo/gas-sensor/parquet/")
df2.count()

In [None]:
from pyspark.sql import functions

# timesecs is in seconds, so divide by 3600 to get 1-based hourly buckets
df2 = df2.withColumn("hour", (functions.floor(df2.timesecs / 3600) + 1))

dfs = df2.select(
    df2.hour,
    df2.humidity,
    df2.temperature,
    df2.flowrate
).groupby("hour").agg(
    functions.round(functions.avg('humidity'), 2).alias('humidity'),
    functions.round(functions.avg('temperature'), 2).alias('temperature'),
    functions.round(functions.avg('flowrate'), 2).alias('flowrate')
).orderBy("hour")

dfs.show()
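The aggregation above assigns each row to a 1-based time bucket with `floor(timesecs / width) + 1` and then averages per bucket. A plain-Python sketch of the same logic for a single column, useful for checking a few rows by hand; the bucket width is a parameter (3600 seconds for hourly buckets, 60 for per-minute buckets), and `bucket` and `bucket_means` are hypothetical names.

```python
import math
from collections import defaultdict


def bucket(timesecs: float, width_secs: int = 3600) -> int:
    """1-based bucket index, mirroring functions.floor(timesecs / width) + 1."""
    return math.floor(timesecs / width_secs) + 1


def bucket_means(rows, width_secs: int = 3600) -> dict:
    """Mean of the second field per bucket, rounded to 2 decimals, ordered by bucket.

    Note: Python's round() rounds exact halves to even, whereas Spark's round()
    rounds half up, so results can differ on ties.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for timesecs, value in rows:
        b = bucket(timesecs, width_secs)
        sums[b] += value
        counts[b] += 1
    return {b: round(sums[b] / counts[b], 2) for b in sorted(sums)}


# Usage: (timesecs, humidity) pairs
# print(bucket_means([(0, 40.0), (1800, 50.0), (3600, 30.0), (7199, 31.0)]))
```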

In [None]:
dfs.write.parquet("s3a://demo/gas-sensor/agg/", mode="overwrite")

### Convert between Spark and Pandas DataFrames

In [None]:
df_pandas = dfs.toPandas()
df_pandas.head(10)

In [None]:
spark_df = spark.createDataFrame(df_pandas)
spark_df.show()