In [None]:
# Print the Spark, Python and Java versions available in this notebook environment.
# The notes on executor images in the session-setup cell record failures caused by
# Python/jar version mismatches, so these are the values to compare an image against.
# spark-core jar present in the local Spark installation
! ls -al /usr/local/spark/jars | grep spark-core
# Python interpreter version
! python3 -V
# Java runtime version
! java --version
# PySpark / Spark version banner
! pyspark --version

In [None]:
import os
from pyspark.sql import SparkSession

# Kubernetes namespace passed to Spark as spark.kubernetes.namespace.
NAMESPACE = os.environ.get("NAMESPACE", "default")
# Name reported to Spark as the driver pod; derived from USER and NAMESPACE when
# HOSTNAME is not set.
POD_NAME = os.environ.get("HOSTNAME", f"jupyter-{os.environ.get('USER', 'default')}-{NAMESPACE}")

# S3 credentials are read from the environment so real secrets never have to be
# committed with the notebook. The fallbacks are the values this notebook used
# previously, so it keeps running unchanged when the variables are not set.
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "admin")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "adminadmin")

# --- Executor image candidates and the result observed for each ---------------
#
# Works with the python-3.11 notebook image:
#EXECUTOR_IMAGE = "oci.stackable.tech/sdp/spark-k8s:3.5.0-stackable24.3.0"
#
# Jars differ in size (Java 17.0.12 vs. 17.0.13, Python 3.11.10 vs. 3.11.9);
# fails with a SerializableBuffer conflict:
#EXECUTOR_IMAGE = "oci.stackable.tech/sdp/spark-k8s:3.5.2-stackable24.11.1"
#
# Java and jars match, but the worker has Python 3.10 while the driver has 3.11;
# PySpark cannot run with different minor versions:
#EXECUTOR_IMAGE = "apache/spark:3.5.2-java17-python3"
#
# Java and jars match, but the worker has Python 3.10 while the driver has 3.11;
# PySpark cannot run with different minor versions:
#EXECUTOR_IMAGE = "spark:3.5.2-scala2.12-java17-python3-ubuntu"
#
# Worker has Python 3.12 while the driver has 3.11; PySpark cannot run with
# different minor versions:
#EXECUTOR_IMAGE = "bitnami/spark:3.5.2"
#
# Custom image with Python 3.11 - works!
# Based off: spark:3.5.2-scala2.12-java17-ubuntu
# TODO: link the build instructions for this image here.
EXECUTOR_IMAGE = "spark:3.5.2-python311"

# Maven coordinates handed to spark.jars.packages for S3A access.
# Joined with "," because Spark expects a single comma-separated string.
S3A_PACKAGES = ",".join(
    [
        "org.apache.hadoop:hadoop-client-api:3.3.4",
        "org.apache.hadoop:hadoop-client-runtime:3.3.4",
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "org.apache.hadoop:hadoop-common:3.3.4",
        "com.amazonaws:aws-java-sdk-bundle:1.12.162",
    ]
)

# The notebook process is the Spark driver (client deploy mode); executors are
# started as pods through the Kubernetes API server addressed by the master URL.
spark = (
    SparkSession.builder
    .master(f'k8s://https://{os.environ["KUBERNETES_SERVICE_HOST"]}:{os.environ["KUBERNETES_SERVICE_PORT"]}')
    .appName("process-s3-data")
    # Executor pods
    .config("spark.kubernetes.container.image", EXECUTOR_IMAGE)
    .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
    .config("spark.kubernetes.namespace", NAMESPACE)
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark")
    # Fixed driver ports instead of randomly chosen ones.
    .config("spark.driver.port", "2222")
    .config("spark.driver.blockManager.port", "7777")
    # Minimal executor footprint
    .config("spark.executor.instances", "1")
    .config("spark.executor.memory", "1g")
    .config("spark.executor.cores", "1")
    # bitnami. See https://github.com/bitnami/containers/issues/52698#issuecomment-2275913474
    #.config("spark.executorEnv.LD_PRELOAD", "/opt/bitnami/common/lib/libnss_wrapper.so")
    # S3A access
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000/")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.jars.packages", S3A_PACKAGES)
    .config("spark.submit.deployMode", "client")
    .config("spark.kubernetes.driver.pod.name", POD_NAME)
    .getOrCreate()
)

In [None]:
# Smoke test: build a two-row DataFrame from local data and display it.
sample_rows = [("a", 1), ("b", 2)]
df = spark.createDataFrame(sample_rows, schema=["col1", "col2"])
df.show()

In [None]:
# Manual check via pyarrow.fs: list the raw input objects directly, without Spark.
import os

import pyarrow.fs as fs

# Credentials are taken from the environment when set, so real secrets never have
# to be written into the notebook; the fallbacks are the values used previously.
s3 = fs.S3FileSystem(
    endpoint_override="http://minio:9000/",
    access_key=os.environ.get("S3_ACCESS_KEY", "admin"),
    secret_key=os.environ.get("S3_SECRET_KEY", "adminadmin"),
    scheme="http",
)

# Walk everything below the raw prefix and print each entry's path.
files = s3.get_file_info(fs.FileSelector("demo/gas-sensor/raw/", recursive=True))
for f in files:
    print("Found file:", f.path)

In [None]:
# Read the raw CSV files. header=True takes the column names from the first line
# of each file; without it Spark names the columns _c0, _c1, ... and treats the
# header line as a data row, which breaks the later cells that refer to columns
# such as timesecs, humidity, temperature and flowrate by name.
# Schema inference is not enabled here, so values are read as strings.
df = spark.read.csv("s3a://demo/gas-sensor/raw/", header=True)
df.show()

In [None]:
# Number of rows read from the raw CSV input (displayed as the cell output).
df.count()

In [None]:
# Write the data back to S3 twice, as CSV and as Parquet, replacing any previous output.
df.write.csv("s3a://demo/gas-sensor/rewritten/", mode="overwrite")
df.write.parquet("s3a://demo/gas-sensor/parquet/", mode="overwrite")

# Read the Parquet copy back. Parquet files carry their own schema, so no
# "header" option is passed: that option belongs to the CSV reader only.
df2 = spark.read.parquet("s3a://demo/gas-sensor/parquet/")
df2.count()

In [None]:
from pyspark.sql import functions

# Assign every row to a 1-based time bucket: floor(timesecs / 60) + 1.
# NOTE(review): if timesecs is in seconds, dividing by 60 yields one-minute
# buckets even though the column is called "hour" - confirm the intended unit.
hour_bucket = functions.floor(functions.col("timesecs") / 60) + 1
df2 = df2.withColumn("hour", hour_bucket)

# Average each measurement per bucket, rounded to two decimals and keeping the
# original column name.
measurements = ("humidity", "temperature", "flowrate")
rounded_means = [
    functions.round(functions.avg(name), 2).alias(name)
    for name in measurements
]

dfs = (
    df2.select("hour", *measurements)
    .groupBy("hour")
    .agg(*rounded_means)
    .orderBy("hour")
)

dfs.show()

In [None]:
# Persist the aggregated result as Parquet, replacing any previous output.
dfs.write.mode("overwrite").parquet("s3a://demo/gas-sensor/agg/")