In [0]:
%pip install langchain
dbutils.library.restartPython()

In [0]:
import os
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Directory path containing multiple CSV files
directory_path = "/Volumes/databricks_hackathon/llm/rag/csv"

# List all CSV files in the directory
file_paths = [file.path for file in dbutils.fs.ls(directory_path) if file.path.endswith('.csv')]

# Function to process a single file and split its text into chunks
def process_file(file_path):
    # Read the text file
    df = spark.read.text(file_path)
    
    # Collect all the text into a single string
    text_column = " ".join([row.value for row in df.collect()])
    
    # Initialize the text splitter
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    
    # Split the text into chunks
    chunks = splitter.split_text(text_column)
    
    return chunks

# Loop through all the CSV files and process them
for file_path in file_paths:
    chunks = process_file(file_path)
    
    # Perform actions with the chunks (e.g., passing them to the next notebook)
    # This could involve saving them to a temporary location, using them in memory, etc.
    for i, chunk in enumerate(chunks):
        print(f"Processing {file_path} - Chunk {i + 1}:\n{chunk}\n")

# Now the chunks are processed and can be used for the next steps in the Databricks workflow

In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

@pandas_udf("array<string>")
def get_chunks(dummy):
    return pd.Series([chunks])

# Register the UDF
spark.udf.register("get_chunks_udf", get_chunks)

<pyspark.sql.udf.UserDefinedFunction at 0x7fdb9f6a0450>

In [0]:
%sql
insert into databricks_hackathon.llm.docs_text (text)
select explode(get_chunks_udf('dummy')) as text;

In [0]:
# Assuming you have a CSV file you want to read into a DataFrame
df = spark.read.csv("/path/to/your/data.csv", header=True, inferSchema=True)

# Now you can create a temporary view from the DataFrame
df.createOrReplaceTempView("temp_table")

# Your SQL operation
spark.sql("""
    INSERT INTO databricks_hackathon.llm.docs_track
    SELECT * FROM temp_table
    WHERE NOT EXISTS (
        SELECT 1 FROM databricks_hackathon.llm.docs_track
        WHERE temp_table.file_name = databricks_hackathon.llm.docs_track.file_name
    )
""")