In [1]:
import pyspark
import sparknlp
from pyspark.sql import SparkSession


In [2]:
import os

jar_dir = "/home/vaibhavi/jars"
all_jars = [os.path.join(jar_dir, f) for f in os.listdir(jar_dir) if f.endswith(".jar")]
all_jars_str = ",".join(all_jars)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP with GloVe") \
    .config("spark.jars", all_jars_str) \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


25/08/03 16:09:06 WARN Utils: Your hostname, vaibhavi-HP-Laptop-15-fd0xxx resolves to a loopback address: 127.0.1.1; using 192.168.0.128 instead (on interface wlo1)
25/08/03 16:09:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/08/03 16:09:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
print(sparknlp.__version__)

6.1.0


In [4]:
df = spark.read.parquet("/home/vaibhavi/spark-ml-venv/ml_project/hybrid_book_recommender/data/final_books_data")

                                                                                

In [5]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- users_count: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- description: string (nullable = true)
 |-- book_category_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- weighted_rating_value: double (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws, col

# Define mapping
category_map = {
    "1": "Book",
    "2": "Novella",
    "3": "Short Story",
    "4": "Graphic Novel",
    "5": "Fan Fiction",
    "6": "Research Paper",
    "7": "Poetry",
    "8": "Collection",
    "9": "Web Novel",
    "10": "Light Novel"
}

# UDF to map category_id to text
def map_category(cat_id):
    return category_map.get(str(cat_id), "Unknown")

map_category_udf = udf(map_category, StringType())

# Apply to DataFrame
df = df.withColumn("category_label", map_category_udf(col("book_category_id")))


In [7]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- users_count: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- description: string (nullable = true)
 |-- book_category_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- weighted_rating_value: double (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- category_label: string (nullable = true)



In [8]:
from pyspark.sql.functions import concat_ws, col

# Replace these column names with your actual ones if they differ
combined_df = df.withColumn(
    "text_for_embedding",
    concat_ws(" ", 
        col("title"), 
        col("author_name"), 
        col("genres"), 
        col("category_label"), 
        col("description")
    )
)


In [9]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- users_count: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- description: string (nullable = true)
 |-- book_category_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- weighted_rating_value: double (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- category_label: string (nullable = true)



In [10]:
combined_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- users_count: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- description: string (nullable = true)
 |-- book_category_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- weighted_rating_value: double (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- category_label: string (nullable = true)
 |-- text_for_embedding: string (nullable = false)



In [11]:
combined_df = combined_df.drop("id","release_date","release_year","reviews_count","slug","users_count","weighted_rating_value","pages","description","book_category_id","genres","category_label")

In [12]:
combined_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- text_for_embedding: string (nullable = false)



In [13]:
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline


In [15]:
document_assembler = DocumentAssembler() \
    .setInputCol("text_for_embedding") \
    .setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

glove_embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en") \
    .setInputCols(["document", "token"]) \
    .setOutputCol("embeddings") \
    .setCaseSensitive(False)

embeddings_finisher = EmbeddingsFinisher() \
    .setInputCols(["embeddings"]) \
    .setOutputCols(["finished_embeddings"]) \
    .setOutputAsVector(True) \
    .setCleanAnnotations(False)


glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[ | ]

25/08/03 16:13:26 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
25/08/03 16:13:27 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[ / ]Download done! Loading the resource.
[OK!]


In [16]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    glove_embeddings,
    embeddings_finisher
])

model = pipeline.fit(combined_df)
result_df = model.transform(combined_df)


In [17]:
result_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- text_for_embedding: string (nullable = false)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)

In [18]:
result_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- text_for_embedding: string (nullable = false)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)

In [19]:
result_df = result_df.select("title","finished_embeddings")

In [20]:
result_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- finished_embeddings: array (nullable = true)
 |    |-- element: vector (containsNull = true)



In [21]:
final_df = result_df.withColumnRenamed("finished_embeddings", "sentence_vector")

# (Optional) Save
final_df.write.mode("overwrite").parquet("data/vectorized_books_glove.parquet")

ERROR:root:KeyboardInterrupt while sending command.               (0 + 12) / 13]
Traceback (most recent call last):
  File "/home/vaibhavi/ml-project-env/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/vaibhavi/ml-project-env/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

25/08/03 16:37:18 ERROR Executor: Exception in task 12.0 in stage 2.0 (TID 14)3]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/vaibhavi/ml-project-env/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1094, in main
    split_index = read_int(infile)
  File "/home/vaibhavi/ml-project-env/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 594, in read_int
    length = stream.read(4)
KeyboardInterrupt

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIte