In [32]:
%pip install sentence-transformers

Note: you may need to restart the kernel to use updated packages.


In [33]:
# Standard Library
import os
import re
import shutil
import enum
import timeit

# Third-Party Libraries
import pymupdf  # for PDF parsing  #type: ignore[import]
from docx import Document #type: ignore[import]
from pptx import Presentation #type: ignore[import]
from nltk import download #type: ignore[import]
from nltk.corpus import stopwords #type: ignore[import]
from sentence_transformers import SentenceTransformer #type: ignore[import]

# PySpark
from pyspark.sql import SparkSession #type: ignore[import]
from pyspark.ml.feature import Word2Vec #type: ignore[import]
from pyspark.sql.types import StructType, StructField #type: ignore[import]
import pyspark.sql.types as T #type: ignore[import]

# Task 1 : Preprocessing, Tokenization, and Embeddings

In [34]:
# We need to set the following environment variable, so that Spark knows where YARN runs
os.environ['HADOOP_CONF_DIR']="/etc/hadoop/conf"

# Since we are accessing spark through it's python API, we need to make sure that all executor
# instances run the same version of python. 
# (and we want Anaconda to be used, so we have access to numpy, pandas, and so forth)
# You will likely need to adjust this path if your run on a different cluster
os.environ['PYSPARK_PYTHON']="/home/guest/anaconda3/bin/python"
os.environ['PYSPARK_DRIVER_PYTHON']="/home/guest/anaconda3/bin/python"


#The following lines are just there to allow this cell to be re-executed multiple times:
#if a spark session was already started, we stop it before starting a new one
#(there can be only one spark context per jupyter notebook)
if 'spark' in locals():
    spark.stop() #type: ignore[no-untyped-call]

spark = SparkSession \
    .builder \
    .appName("tokenizer") \
    .getOrCreate()

#When dealing with RDDs, we work the sparkContext object. See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
sc=spark.sparkContext

In [35]:
# check that we have a working spark context, print its configuration
sc._conf.getAll()
class EmbeddingStrategy(enum.Enum):
    WORD2VEC = "word2vec"
    SENTENCE_TRANSFORMERS = "sentence-transformers"

## Data preprocessing

### Converting documents to raw text 

In [36]:
# Configuration
PARALLELISM = 8  # number of partitions
CHUNK_SIZE = 300  # must be between 200 and 500
EMBEDDING_STRATEGY = EmbeddingStrategy.SENTENCE_TRANSFORMERS  # or SENTENCE_TRANSFORMERS
INPUT_DIR = "../dataset"
TXT_DIR = "../dataset_txt"
OUTPUT_DIR = "../dataset/parquet"
os.makedirs(TXT_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
download('stopwords')
stop_words = set(stopwords.words('english'))


# Cleaning
def clean_text(text):
    text = re.sub(r'^[\s]*[•*·\-–—]+[\s]*', '', text, flags=re.MULTILINE)  # bullet chars at line starts
    text = re.sub(r'http\S+|www\S+|@\w+|#\w+', '', text)     # URLs, hashtags, mentions
    text = re.sub(r'[^\w\s]', ' ', text, flags=re.UNICODE)  # punctuation
    text = text.lower()                                     # lowercase
    tokens = text.split()                                   # tokenize
    tokens = [word for word in tokens if word not in stop_words and len(word) > 1]
    return ' '.join(tokens)

# Loop through files in the dataset directory
for filename in os.listdir(INPUT_DIR):
    filepath = os.path.join(INPUT_DIR, filename)
    text = ""

    if filename.endswith(".pdf"):
        try:
            with pymupdf.open(filepath) as pdf:
                for page in pdf:
                    text += page.get_text()
        except Exception as e:
            print(f"Error while reading PDF : {e}")
            continue

    elif filename.endswith(".docx"):
        try:
            doc = Document(filepath)
            text = "\n".join(p.text for p in doc.paragraphs)
        except Exception as e:
            print(f"Error while reading DOCX : {e}")
            continue

    elif filename.endswith(".pptx"):
        try:
            prs = Presentation(filepath)
            parts = []
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        parts.append(shape.text)
            text = "\n".join(parts)
        except Exception as e:
            print(f"Error while reading PPTX : {e}")
            continue
        
    # Clean and save the text
    cleaned_text = clean_text(text)
    output_path = os.path.join(TXT_DIR, filename.rsplit(".", 1)[0] + ".txt")
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(cleaned_text)
        
print("Files saved in /dataset_txt")

[nltk_data] Downloading package stopwords to /home/guest/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Files saved in /dataset_txt


## Tokenization

In [37]:
# use spark to read all text files in the directory ../dataset_txt
RDD = sc.textFile(f"{TXT_DIR}/*.txt")
RDD = RDD.repartition(PARALLELISM)
print(RDD.getNumPartitions())

8


In [38]:
# Mapping Step
words_RDD = RDD.flatMap(lambda line: line.split()) # Each word becomes an element in the RDD

words_RDD = words_RDD.map(lambda w: w.lower()) # map to lower case

indexed_words = words_RDD.zipWithIndex()  # (word, id) pairs

if CHUNK_SIZE < 200 or CHUNK_SIZE > 500:
    raise ValueError("chunk_size must be between 200 and 500")

indexed_words.take(10)  # show the first 10 elements

# reduce the elements into chunks of size chunk_size
pairs_RDD = indexed_words.map(lambda x: (x[1] // CHUNK_SIZE, x[0]))  # (chunk_id, word)
pairs_RDD = pairs_RDD.groupByKey().map(lambda x: (x[0], list(x[1])))  # (chunk_id, [word1, word2, ...])
pairs_RDD.take(2)  # show the first 10 elements

                                                                                

[(0,
  ['info',
   'h515',
   'big',
   'data',
   'distributed',
   'management',
   'lecture',
   'distributed',
   'processing',
   'spark',
   'dimitris',
   'sacharidis',
   'outline',
   'introduction',
   'resilient',
   'distributed',
   'datasets',
   'rdds',
   'operations',
   'rdds',
   'creation',
   'transformations',
   'actions',
   'special',
   'global',
   'variables',
   'pair',
   'rdds',
   'spark',
   'program',
   'execution',
   'lazy',
   'evaluation',
   'caching',
   'persistence',
   'master',
   'slave',
   'architecture',
   'running',
   'spark',
   'cluster',
   'apache',
   'spark',
   'eco',
   'system',
   'reading',
   '96',
   'outline',
   'introduction',
   'resilient',
   'distributed',
   'datasets',
   'rdds',
   'operations',
   'rdds',
   'creation',
   'transformations',
   'actions',
   'special',
   'global',
   'variables',
   'pair',
   'rdds',
   'spark',
   'program',
   'execution',
   'lazy',
   'evaluation',
   'caching',
   'persi

# Part 2 Embeddings

## Word2Vec & SBERT

In [39]:
if not os.path.exists("../dataset/models/"):
    os.makedirs("../dataset/models/")
else:
    shutil.rmtree("../dataset/models/")
    os.makedirs("../dataset/models/")

schema = StructType([
            StructField("id", T.StringType(), False),
            StructField("text_chunks", T.ArrayType(T.StringType()), False),
            StructField("embedding", T.ArrayType(T.FloatType()), False)
        ])

df = pairs_RDD.toDF(["id", "text_chunks"])


In [None]:

begin = timeit.default_timer()
match EMBEDDING_STRATEGY:
    case EmbeddingStrategy.WORD2VEC:
        # Use Word2Vec
        word2vec = Word2Vec(inputCol="text_chunks", outputCol="embedding")
        model = word2vec.fit(df)
        result_word2vec = model.transform(df)
        #
        # Export the file to Parquet
        result_word2vec.write.save(f"{OUTPUT_DIR}/word2vec",mode="overwrite" ,format="parquet")
        model.save("../dataset/models/word2vec")

    case EmbeddingStrategy.SENTENCE_TRANSFORMERS:
        SBERT = SentenceTransformer('all-MiniLM-L6-v2')
        def encode_partition(iterator):
            model = SBERT
            batch = list(iterator)
            texts = [" ".join(row[1]) for row in batch]  # join word tokens into sentences
            embeddings = model.encode(texts)
            return ((row[0], row[1], emb.tolist()) for row, emb in zip(batch, embeddings))   

        #print("number of partition", df.rdd.getNumPartitions())
        embeddings_rdd = df.rdd.mapPartitions(encode_partition)
        # Convert the RDD to a DataFrame using the schema
        result_sbert = spark.createDataFrame(embeddings_rdd, schema)
        stop = timeit.default_timer()
        result_sbert.write.save(f"{OUTPUT_DIR}/SBERT",mode="overwrite" ,format="parquet")
    case _:
        raise ValueError("Invalid embedding strategy. Choose either WORD2VEC or SENTENCE_TRANSFORMERS.")


Time taken: 1.7099452310067136 seconds
