In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, 
    StructField, 
    IntegerType, 
    StringType, 
    ArrayType, 
    MapType, 
    FloatType
)
import os
from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())

In [2]:
CATALOG_URI = "http://nessie:19120/api/v1"                  # Nessie Server URI
WAREHOUSE = "s3://warehouse/"                               # Minio Address to Write to
STORAGE_URI = f"http://{os.getenv('IP_ADDRESS')}:9000"      # Minio IP address from docker inspect

In [3]:
conf = (
    pyspark.SparkConf()
        .setAppName('sales_data_app')
        # Include necessary packages
        .set('spark.jars.packages', 'org.postgresql:postgresql:42.7.3,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,software.amazon.awssdk:bundle:2.24.8,software.amazon.awssdk:url-connection-client:2.24.8')
        # Enable Iceberg and Nessie extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        # Configure Nessie catalog
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', CATALOG_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        # Set Minio as the S3 endpoint for Iceberg storage
        .set('spark.sql.catalog.nessie.s3.endpoint', STORAGE_URI)
        .set('spark.sql.catalog.nessie.warehouse', WAREHOUSE)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Session Started")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8444db70-9cd2-47af-badc-6d22c5602197;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in central
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.77.1 in central
	found software.amazon.awssdk#bundle;2.24.8 in central
	found software.amazon.awssdk#url-connection-client;2.24.8 in central
	found software.amazon.awssdk#utils;2.24

Spark Session Started


In [5]:
import os
from llama_index.core import SimpleDirectoryReader

# Notice that the folder path is the folder path within the Docker container.
documents = SimpleDirectoryReader("/workspace/data").load_data(show_progress=True)

Loading files:   0%|          | 0/1 [00:00<?, ?it/s]

In [6]:
from llama_index.core.node_parser import SentenceSplitter

parser = SentenceSplitter.from_defaults()
nodes = parser.get_nodes_from_documents(documents)

In [7]:
serialized = [node.model_dump() for node in nodes]

In [8]:
import pandas as pd

df = pd.DataFrame(serialized)
df.head()

Unnamed: 0,id_,embedding,metadata,excluded_embed_metadata_keys,excluded_llm_metadata_keys,relationships,metadata_template,metadata_separator,text,mimetype,start_char_idx,end_char_idx,metadata_seperator,text_template,class_name
0,4dc3364e-100a-4788-aeaa-d463f5cd5d40,,{'file_path': '/workspace/data/paul_graham_ess...,"[file_name, file_type, file_size, creation_dat...","[file_name, file_type, file_size, creation_dat...",{'1': {'node_id': '381ae328-6be5-4cc6-b621-d14...,{key}: {value},\n,What I Worked On\n\nFebruary 2021\n\nBefore co...,text/plain,2,4320,\n,{metadata_str}\n\n{content},TextNode
1,1711119f-2d93-4dff-bfe9-35c35111ca82,,{'file_path': '/workspace/data/paul_graham_ess...,"[file_name, file_type, file_size, creation_dat...","[file_name, file_type, file_size, creation_dat...",{'1': {'node_id': '381ae328-6be5-4cc6-b621-d14...,{key}: {value},\n,I couldn't have put this into words when I was...,text/plain,3570,7959,\n,{metadata_str}\n\n{content},TextNode
2,dff95c4f-1053-4b7b-9de9-9255937e53f8,,{'file_path': '/workspace/data/paul_graham_ess...,"[file_name, file_type, file_size, creation_dat...","[file_name, file_type, file_size, creation_dat...",{'1': {'node_id': '381ae328-6be5-4cc6-b621-d14...,{key}: {value},\n,So I looked around to see what I could salvage...,text/plain,7166,11549,\n,{metadata_str}\n\n{content},TextNode
3,916cd881-ff51-4777-ba4b-f40e638f097f,,{'file_path': '/workspace/data/paul_graham_ess...,"[file_name, file_type, file_size, creation_dat...","[file_name, file_type, file_size, creation_dat...",{'1': {'node_id': '381ae328-6be5-4cc6-b621-d14...,{key}: {value},\n,"I didn't want to drop out of grad school, but ...",text/plain,10764,15165,\n,{metadata_str}\n\n{content},TextNode
4,0488cd0f-bc3e-4d4b-8736-5be4e35a105d,,{'file_path': '/workspace/data/paul_graham_ess...,"[file_name, file_type, file_size, creation_dat...","[file_name, file_type, file_size, creation_dat...",{'1': {'node_id': '381ae328-6be5-4cc6-b621-d14...,{key}: {value},\n,"We actually had one of those little stoves, fe...",text/plain,14282,18597,\n,{metadata_str}\n\n{content},TextNode


In [9]:
schema = StructType([
    StructField("id_", StringType(), True),
    StructField("embedding", ArrayType(FloatType(), False), True),
    StructField("excluded_embed_metadata_keys", ArrayType(StringType(), True), True),
    StructField("excluded_llm_metadata_keys", ArrayType(StringType(), True), True),
    StructField("relationships", MapType(StringType(), StringType()), True),
    StructField("metadata_template", StringType(), True),
    StructField("metadata_separator", StringType(), True),
    StructField("text", StringType(), False),
    StructField("minetype", StringType(), True),
    StructField("start_char_idx", IntegerType(), False),
    StructField("end_char_idx", IntegerType(), False),
    StructField("metadata_seperator", StringType(), True),
    StructField("text_template", StringType(), True),
    StructField("class_name", StringType(), True),
])

In [10]:
spark_df = spark.createDataFrame(serialized, schema)

In [11]:
spark_df.show()

                                                                                

+--------------------+---------+----------------------------+--------------------------+--------------------+-----------------+------------------+--------------------+--------+--------------+------------+------------------+--------------------+----------+
|                 id_|embedding|excluded_embed_metadata_keys|excluded_llm_metadata_keys|       relationships|metadata_template|metadata_separator|                text|minetype|start_char_idx|end_char_idx|metadata_seperator|       text_template|class_name|
+--------------------+---------+----------------------------+--------------------------+--------------------+-----------------+------------------+--------------------+--------+--------------+------------+------------------+--------------------+----------+
|4dc3364e-100a-478...|     NULL|        [file_name, file_...|      [file_name, file_...|{1 -> {metadata={...|   {key}: {value}|                \n|What I Worked On\...|    NULL|             2|        4320|                \n|{metadata

In [12]:
spark.stop()