# Custom Installation 


In [0]:
import os
# Workspace folder used by this notebook, and the requirements file inside it.
local_path = "/Workspace/Users/scott.s@smartsoftus.com/rag/"
requirements_path = os.path.join(local_path, "requirements.txt")

# If no requirements file is present yet, write an empty one so the
# `%pip install -r $requirements_path` cell has a file to read.
# The third positional argument (True) is passed through to dbutils.fs.put
# unchanged — presumably the overwrite flag; confirm against the dbutils docs.
requirements_missing = not os.path.exists(requirements_path)
if requirements_missing:
    dbutils.fs.put("file:" + requirements_path, "", True)




In [0]:
%pip install -r $requirements_path

In [0]:
%restart_python

In [0]:
!pip freeze > requirements_freeze.txt

In [0]:

from pyspark.sql import SparkSession

# Prefer the session that is already active; build (or fetch) one only when
# no active session is returned.
spark = SparkSession.getActiveSession()
if not spark:
    spark = SparkSession.builder.getOrCreate()

# Report the Spark version and the concrete session class, one per line.
print(spark.version, type(spark), sep="\n")

# Data Prep for RAG

In [0]:
# PDF payloads can be large in memory, so cap each Arrow batch at 10 records.
spark.conf.set(
    "spark.sql.execution.arrow.maxRecordsPerBatch",
    10,
)

In [0]:
class paths:
    """Namespace for the filesystem locations this notebook reads from."""

    # Root directory of the sample dataset.
    datasets = "/Volumes/arxiv_sample_articles/v01"


class DA:
    """Namespace bundling the catalog name, schema name and dataset paths."""

    catalog_name = "arxiv_sample_articles"
    schema_name = "v01"
    # Instance of the `paths` namespace defined above.
    paths = paths()


# Directory holding the article PDFs (note the trailing slash).
articles_path = DA.paths.datasets + "/arxiv-articles/"

# Fully qualified name (catalog.schema.table) of the raw-PDF table.
table_name = ".".join((DA.catalog_name, DA.schema_name, "pdf_raw_text"))

In [0]:
# Read every file found anywhere beneath articles_path using Spark's
# binaryFile source (recursive lookup enabled).
df = spark.read.load(
    articles_path,
    format="binaryFile",
    recursiveFileLookup="true",
)

# Persist the loaded rows to the table, replacing it if it already exists,
# then preview the DataFrame.
df.write.saveAsTable(table_name, mode="overwrite")
display(df)


In [0]:
import fitz  

def extract_doc_text(pdf_bytes: bytes) -> str:
    """Return the text of every page of an in-memory PDF, in page order.

    Args:
        pdf_bytes: Raw bytes of a PDF file.

    Returns:
        The text of all pages concatenated with no separator between pages.
    """
    # The document is opened from the byte stream and closed again when the
    # `with` block exits.
    with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
        # Join once instead of growing a string page by page, which would
        # re-copy the accumulated text on every iteration.
        return "".join(page.get_text() for page in doc)

In [0]:
# Smoke-test the extractor on one article and print its full text.
# Any "dbfs:" in the path is rewritten to "/dbfs/" before opening —
# presumably so a DBFS URI can be read with the built-in open(); confirm.
sample_pdf_path = articles_path.replace("dbfs:", "/dbfs/") + "2302.06476.pdf"
with open(sample_pdf_path, "rb") as pdf:
    doc = extract_doc_text(pdf.read())
print(doc)

In [0]:
import io
import os
import pandas as pd

# from llama_index.langchain_helpers.text_splitter import SentenceSplitter
from llama_index.core.text_splitter import SentenceSplitter
# from llama_index import Document, set_global_tokenizer
from llama_index.core.schema import Document
from llama_index.core.utils import set_global_tokenizer
from transformers import AutoTokenizer
from typing import Iterator
from pyspark.sql.functions import col,udf,length,pandas_udf,explode
from unstructured.partition.auto import partition

@pandas_udf("array<string>")
def read_as_chunk(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Turn batches of raw PDF bytes into batches of text-chunk lists.

    Args:
        batch_iter: Iterator of pandas Series; each element is handed to
            `extract_doc_text` as the bytes of one PDF.

    Yields:
        One Series per input batch, whose elements are the lists of chunk
        strings produced for the corresponding PDF.
    """
    # Setup runs once per call, ahead of the batch loop: register the Llama
    # tokenizer globally, then build the sentence-based splitter.
    llama_tokenizer = AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer")
    set_global_tokenizer(llama_tokenizer)
    sentence_splitter = SentenceSplitter(chunk_size=500, chunk_overlap=50)

    def pdf_to_chunks(pdf_bytes):
        # Extract the PDF's text, wrap it in a single Document, split it into
        # nodes and keep only each node's text.
        document = Document(text=extract_doc_text(pdf_bytes))
        split_nodes = sentence_splitter.get_nodes_from_documents([document])
        return [node.text for node in split_nodes]

    for pdf_batch in batch_iter:
        yield pdf_batch.apply(pdf_to_chunks)
    

In [0]:
# Replace each file's "content" with its text chunks, one output row per
# chunk, and keep only the source path (renamed pdf_name) and the chunk text.
chunk_column = explode(read_as_chunk("content"))
df_chunks = df.withColumn("content", chunk_column).selectExpr("path as pdf_name", "content")
display(df_chunks)