## Initialize Spark

Before initializing Spark, the environment has to define where to find Spark and Java, and which Python interpreter PySpark should use on the remote executors (`PYSPARK_PYTHON`) and on the local driver (`PYSPARK_DRIVER_PYTHON`).

In [1]:
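import findspark
import os
# point findspark at the Spark installation so that pyspark can be imported
findspark.init(spark_home='/opt/spark')
# Java installation used to launch the driver JVM
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-arm64/'
# Python interpreter used by the executors (this path must exist on the worker nodes)
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.11'
# Python interpreter used by the driver (here, the local virtualenv)
os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/datasaku/repo/datasaku/.venv/bin/python3'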
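These paths are machine-specific, and a typo in one of them typically only surfaces later, when the SparkContext starts the JVM or launches Python workers. As a minimal sketch, assuming you prefer to fail fast, the hypothetical helper `set_spark_env` below checks the two driver-side paths before exporting them. `PYSPARK_PYTHON` is exported unchecked because it has to exist on the worker nodes, not on the driver.

```python
import os
from pathlib import Path


def set_spark_env(java_home, driver_python, worker_python):
    """Export the PySpark environment variables, checking driver-side paths first.

    Hypothetical helper: java_home and driver_python are checked on this
    machine; worker_python is only exported, since it must exist on the
    executors rather than on the driver.
    """
    checks = {"JAVA_HOME": java_home, "PYSPARK_DRIVER_PYTHON": driver_python}
    missing = [name for name, path in checks.items() if not Path(path).exists()]
    if missing:
        raise FileNotFoundError(f"paths not found for: {', '.join(missing)}")
    os.environ["JAVA_HOME"] = str(java_home)
    os.environ["PYSPARK_DRIVER_PYTHON"] = str(driver_python)
    os.environ["PYSPARK_PYTHON"] = str(worker_python)
    # return what was exported, which is handy to print in the notebook
    return {k: os.environ[k] for k in ("JAVA_HOME", "PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON")}
```

With the values from the cell above, the call would be `set_spark_env('/usr/lib/jvm/java-11-openjdk-arm64/', '/home/datasaku/repo/datasaku/.venv/bin/python3', '/usr/local/bin/python3.11')`, placed after `findspark.init(...)`.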

The endpoints and URIs used below are Kubernetes Service DNS names, as described in this [link](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#a-aaaa-records). They take the form `my-svc.my-namespace.svc.cluster-domain.example` (e.g. `minio-service.minio-dev.svc.cluster.local`).
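Every endpoint in the configuration below is built from the same three parts: service name, namespace and port. A small sketch with a hypothetical helper `service_url` that assembles them, assuming the cluster domain is `cluster.local` (the Kubernetes default and the one used in the example above); clusters configured with a different domain need to pass it explicitly.

```python
def service_url(service, namespace, port, scheme="http", path="", cluster_domain="cluster.local"):
    """Build a URL whose host is the Service DNS name my-svc.my-namespace.svc.<cluster-domain>."""
    host = f"{service}.{namespace}.svc.{cluster_domain}"
    return f"{scheme}://{host}:{port}{path}"


# the three endpoints used in the SparkConf below
print(service_url("spark-master", "spark-dev", 7077, scheme="spark"))
# spark://spark-master.spark-dev.svc.cluster.local:7077
print(service_url("minio-service", "minio-dev", 6544))
# http://minio-service.minio-dev.svc.cluster.local:6544
print(service_url("nessie-service", "nessie-dev", 6788, path="/api/v1"))
# http://nessie-service.nessie-dev.svc.cluster.local:6788/api/v1
```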

In [2]:
import pyspark
from pyspark.sql import SparkSession
import os
import socket

conf = pyspark.SparkConf().setAll([
    ('spark.driver.host', socket.gethostbyname(socket.gethostname()))
    , ('spark.app.name', 'test')
    , ('spark.master', "spark://spark-master.spark-dev.svc.cluster.local:7077")
    , ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    , ('spark.hadoop.fs.s3a.endpoint', "http://minio-service.minio-dev.svc.cluster.local:6544")
    # S3A credentials and settings for MinIO
    , ('spark.hadoop.fs.s3a.access.key','minio')
    , ('spark.hadoop.fs.s3a.secret.key', 'minio123')
    , ('spark.hadoop.fs.s3a.path.style.access', 'true')
    , ("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # Iceberg catalog managed by Nessie; table data is written to MinIO through s3a
    , ('spark.sql.catalog.nessie_catalog', 'org.apache.iceberg.spark.SparkCatalog')
    , ("spark.sql.catalog.nessie_catalog.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    , ('spark.sql.defaultCatalog', 'nessie_catalog')
    , ('spark.sql.catalog.nessie_catalog.warehouse', 's3a://iceberg/')
    # Nessie REST API endpoint (Kubernetes service DNS name)
    , ('spark.sql.catalog.nessie_catalog.uri', 'http://nessie-service.nessie-dev.svc.cluster.local:6788/api/v1')
    , ('spark.sql.catalog.nessie_catalog.ref', 'main')
    , ("spark.sql.catalog.nessie_catalog.authentication.type", 'NONE')
    ])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/04 15:23:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
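The catalog entries above follow a fixed pattern: Spark registers a catalog for each `spark.sql.catalog.<name>` key, and that catalog's options hang off the same prefix. A minimal sketch with a hypothetical helper `nessie_catalog_conf` that generates these pairs from a catalog name, so that adding a second catalog (for example one pinned to another Nessie branch via `ref`) is one more call. The property and class names are copied from the configuration above; the helper itself is not part of Iceberg, Nessie or PySpark.

```python
def nessie_catalog_conf(name, uri, warehouse, ref="main", auth_type="NONE"):
    """Return (key, value) pairs for an Iceberg catalog backed by Nessie.

    Every key is scoped under spark.sql.catalog.<name>, so changing `name`
    renames the catalog; the last pair makes it the default catalog.
    """
    prefix = f"spark.sql.catalog.{name}"
    return [
        (prefix, "org.apache.iceberg.spark.SparkCatalog"),
        (f"{prefix}.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog"),
        (f"{prefix}.warehouse", warehouse),
        (f"{prefix}.uri", uri),
        (f"{prefix}.ref", ref),
        (f"{prefix}.authentication.type", auth_type),
        ("spark.sql.defaultCatalog", name),
    ]


pairs = nessie_catalog_conf(
    "nessie_catalog",
    "http://nessie-service.nessie-dev.svc.cluster.local:6788/api/v1",
    "s3a://iceberg/",
)
for key, value in pairs:
    print(key, "=", value)
```

The resulting list has the same shape as the one passed to `pyspark.SparkConf().setAll(...)` above, so it can be concatenated with the MinIO and master settings before building the session.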


## Sample Operation

In [3]:
# get the branch reference for the nessie catalog
spark.sql("LIST REFERENCES IN nessie_catalog").show()

+-------+----+--------------------+
|refType|name|                hash|
+-------+----+--------------------+
| Branch|main|85da420689e1ce586...|
+-------+----+--------------------+



In [4]:
# create namespace to be used under the nessie_catalog (no-op if it already exists)
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie_catalog.datasaku")

Without `IF NOT EXISTS`, re-running this cell once the namespace exists fails with:

AnalysisException: [SCHEMA_ALREADY_EXISTS] Cannot create schema `datasaku` because it already exists.
Choose a different name, drop the existing schema, or add the IF NOT EXISTS clause to tolerate pre-existing schema.
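Notebooks get re-run, so DDL statements are easier to live with when they are idempotent. When catalog or namespace names come from variables, a minimal sketch like the hypothetical helpers below builds the statement with `IF NOT EXISTS` and backtick-quotes each identifier part (Spark SQL escapes a backtick inside a quoted identifier by doubling it). This is an illustration of string building only; the statement still has to be passed to `spark.sql(...)`.

```python
def quote_ident(part):
    """Quote one Spark SQL identifier part with backticks, doubling any embedded backtick."""
    return "`" + part.replace("`", "``") + "`"


def create_namespace_sql(catalog, namespace, if_not_exists=True):
    """Build a CREATE NAMESPACE statement for catalog.namespace, idempotent by default."""
    guard = "IF NOT EXISTS " if if_not_exists else ""
    return f"CREATE NAMESPACE {guard}{quote_ident(catalog)}.{quote_ident(namespace)}"


print(create_namespace_sql("nessie_catalog", "datasaku"))
# CREATE NAMESPACE IF NOT EXISTS `nessie_catalog`.`datasaku`
```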

In [5]:
# create table manually
spark.sql(
    """CREATE TABLE IF NOT EXISTS nessie_catalog.datasaku.salaries
            (Season STRING, Team STRING, Salary STRING, Player STRING)"""
)

24/12/15 21:34:08 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


DataFrame[]

In [6]:
# insert the data
spark.sql("""INSERT INTO nessie_catalog.datasaku.salaries
            VALUES ('1', 'bulls', '50', 'kurdo'), ('2', 'bulls', '51', 'kurdo')""")

                                                                                

DataFrame[]

In [7]:
# take a look at the dataset
spark.sql("select * from nessie_catalog.datasaku.salaries;").show()

+------+-----+------+------+
|Season| Team|Salary|Player|
+------+-----+------+------+
|     1|bulls|    50| kurdo|
|     1|bulls|    50| kurdo|
|     1|bulls|    50| kurdo|
|     2|bulls|    51| kurdo|
|     2|bulls|    51| kurdo|
|     2|bulls|    51| kurdo|
+------+-----+------+------+



                                                                                

In [9]:
# spark operation
sdf = spark.table("nessie_catalog.datasaku.salaries")
sdf_agg = sdf.groupBy('Team').count()
sdf_agg.createOrReplaceTempView("salaries_agg")
spark.sql("select * from salaries_agg").show()

+-----+-----+
| Team|count|
+-----+-----+
|bulls|    4|
+-----+-----+
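`groupBy('Team').count()` returns one row per distinct `Team` value with the number of rows in that group, so the result depends on how many rows the table holds when the cell runs (`INSERT INTO` appends, so every run of the insert cell adds two more rows). A plain-Python sketch of the same semantics, using the two rows from a single insert as hypothetical input; it only shows what the aggregation computes, not how Spark executes it.

```python
from collections import Counter


def count_by(rows, key):
    """Count rows per distinct value of `key`, like sdf.groupBy(key).count().

    The result is sorted only to make it deterministic; Spark does not
    guarantee the row order of an aggregation.
    """
    counts = Counter(row[key] for row in rows)
    return sorted(counts.items())


# hypothetical input: the two rows written by one run of the INSERT cell
rows = [
    {"Season": "1", "Team": "bulls", "Salary": "50", "Player": "kurdo"},
    {"Season": "2", "Team": "bulls", "Salary": "51", "Player": "kurdo"},
]
print(count_by(rows, "Team"))
# [('bulls', 2)]
```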



                                                                                

In [10]:
# save spark dataframe to iceberg table in nessie catalog
spark.sql("drop table if exists nessie_catalog.datasaku.salaries_agg")
sdf_agg.writeTo("nessie_catalog.datasaku.salaries_agg").create()

In [11]:
# check the catalog
spark.sql("SHOW TABLES IN nessie_catalog.datasaku").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
| datasaku|    salaries|      false|
| datasaku|salaries_agg|      false|
+---------+------------+-----------+



In [12]:
# take a look at the newly created table
spark.sql("select * from nessie_catalog.datasaku.salaries_agg").show()

+-----+-----+
| Team|count|
+-----+-----+
|bulls|    4|
+-----+-----+



# end #