## Preprocess a Reddit dataset

In [1]:
import polars as pl
import numpy as np
import ollama
import llama_index
from llama_index.embeddings.ollama import OllamaEmbedding

# Shared embedding client for the "llama3.1" model; process_batch (defined later)
# reads this global and calls its get_text_embedding_batch method.
# NOTE(review): presumably needs a reachable Ollama server with this model pulled — confirm.
ollama_embedding = OllamaEmbedding(model_name="llama3.1") 

In [2]:
# df = pl.read_csv('./data/reddit_answers_long.csv', separator=';')
# df

## Spark Version

Use PySpark to compute the text embeddings.

In [3]:
# df.write_parquet('./data/reddit_answers_long.parquet')

In [4]:
import findspark
# Called before the pyspark import further down.
# NOTE(review): presumably this locates the Spark installation and makes
# `pyspark` importable — confirm against the findspark documentation.
findspark.init()
import os
import sys

# Make PySpark use the same Python interpreter as this kernel (sys.executable)
# for both the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON settings.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession

In [None]:
# Session for the embedding job, pinned to the "local[1]" master.
# Alternative master kept for reference:
# spark = SparkSession.builder.appName("get_embedding").master("local[*]").getOrCreate()
spark = (
    SparkSession.builder
    .appName("get_embedding")
    .master("local[1]")
    .getOrCreate()
)

In [6]:
# Load the Reddit answers from the Parquet file into a Spark DataFrame,
# then print a preview table of its rows.
df_spark = spark.read.parquet('./data/reddit_answers_long.parquet')
df_spark.show()

+---+------+--------------------+-------+
|   |  q_id|                text|  votes|
+---+------+--------------------+-------+
|  0|hvbvpz|Two pet ducks. Yo...| 2359.0|
|  1|hvbvpz| Nice try Jeff Bezos|  764.0|
|  2|hvbvpz|A curved shower r...| 1525.0|
|  3|hvbvpz|Another monitor. ...| 1227.0|
|  4|hvbvpz|A nasal irrigatio...|  659.0|
|  5|hvbvpz|House plants. The...| 4078.0|
|  6|hvbvpz|AeroPress coffee ...|  539.0|
|  7|hvbvpz|New pair of socks...| 1727.0|
|  8|hvbvpz|I bought a $1 bac...| 4088.0|
|  9|hvbvpz|Rice cooker. That...|  924.0|
| 10|hvbvpz|A decent light wa...| 1145.0|
| 11|hvbvpz|If you have troub...|  656.0|
| 12|hvbvpz|Vertical mouse. $...| 1786.0|
| 13|hvbvpz|A king sized blan...|63308.0|
| 15|hvbvpz|Sharp knives for ...|19673.0|
| 16|hvbvpz|This post cost me...|12411.0|
| 17|hvbvpz|A glass breaker o...| 5733.0|
| 18|hvbvpz|Motion sensor nig...|24840.0|
| 19|hvbvpz|In my opinion def...|18972.0|
| 20|hvbvpz|Carbon monoxide d...|38616.0|
+---+------+--------------------+-------+

In [7]:
# Row count of the full dataset.
df_spark.count()

5777453

In [8]:
# Work on a tiny random sample so the embedding step stays cheap while prototyping.
# `fraction` is a per-row sampling probability, so the number of sampled rows is
# only approximately fraction * total rows. The fixed seed keeps the sample
# repeatable across kernel restarts (for the same input data and partitioning).
SAMPLE_FRACTION = 0.000001
SAMPLE_SEED = 42
df_spark_sampled = df_spark.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
print(df_spark_sampled.count())
df_spark_sampled.show()

5
+-------+------+--------------------+-----+
|       |  q_id|                text|votes|
+-------+------+--------------------+-----+
| 384360|d11h5w|Talk fo listen, n...| 20.0|
| 700354|ubwl7/|Give her all the ...|  3.0|
| 736913|qzsxb/|A true open world...|  4.0|
|1340492|patvg/|I used to work at...| 41.0|
|5720176|fwxg91|Takis and Tostito...|  3.0|
+-------+------+--------------------+-----+



In [9]:
# Embed the text of each partition (batch) of rows with the Ollama embedding model


def process_batch(iterator, chunk_size=512, embedding_dims=5):
    """Embed every text chunk of one partition of rows.

    Written for use with ``RDD.mapPartitions``: it consumes the rows of a
    partition, splits each row's ``text`` into fixed-size character chunks,
    embeds all chunks with a single batch call on the module-level
    ``ollama_embedding`` client, and yields one pair per chunk.

    Args:
        iterator: Iterable of row objects exposing a ``text`` attribute.
        chunk_size: Maximum number of characters per chunk (default 512).
        embedding_dims: How many leading components of each embedding to keep
            (default 5). Pass ``None`` to keep the full vector.

    Yields:
        ``(chunk, embedding)`` tuples, in row order and then chunk order, where
        ``embedding`` is the (possibly truncated) vector returned for ``chunk``.

    Raises:
        ValueError: If ``chunk_size`` is not a positive number.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    def split_text_into_chunks(text):
        # Plain character windows: no overlap, no word-boundary awareness.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    all_chunks = []
    for row in iterator:
        text = row.text
        # Null or empty texts produce no chunks; skipping them also avoids a
        # TypeError from len(None) on rows with a missing text value.
        if not text:
            continue
        all_chunks.extend(split_text_into_chunks(text))

    # Empty partitions are common when sampling a tiny fraction of the data;
    # do not call the embedding backend when there is nothing to embed.
    if not all_chunks:
        return

    # One batch request for the whole partition.
    embeddings = ollama_embedding.get_text_embedding_batch(all_chunks, show_progress=False)
    for chunk, em in zip(all_chunks, embeddings):
        yield chunk, em[:embedding_dims]

# Apply the batch processing to the Spark DataFrame: process_batch is handed an
# iterator over the rows of each partition and yields (chunk, embedding) tuples.
embeddings_rdd = df_spark_sampled.rdd.mapPartitions(process_batch)

In [10]:
from pyspark.sql.types import (
    ArrayType,
    FloatType,
    StringType,
    StructType,
    StructField,
)

In [11]:
# Convert the RDD back to a DataFrame with embeddings.
# Field order matches the (chunk, embedding) tuples yielded by process_batch.
df_schema = StructType(
    [
        StructField("chunk", StringType(), False),
        StructField("embedding", ArrayType(FloatType()), True),
    ]
)
# cache(): without it, every action on embeddings_df (count, show, ...) would
# recompute the RDD lineage, i.e. re-run process_batch and request every
# embedding from the model again.
embeddings_df = spark.createDataFrame(embeddings_rdd, df_schema).cache()




In [None]:

# Number of (chunk, embedding) rows produced; evaluating it runs the embedding pipeline.
embeddings_df.count()

5

In [13]:
# Print the rows one field per line (vertical=True) without truncating long values (truncate=False).
embeddings_df.show(truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 chunk     | Talk fo listen, not to reply.  People remember how you made them feel, not what you talked about.                                                                                                                                                                                                                                                                                         
 embedding | [-2.3535442, -1.3912575, 1.4241426, 0.13649482, -1.9253745]                                                                                                                                                