In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, Tokenizer, HashingTF
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from datetime import datetime




# Initialize Spark session
spark = SparkSession.builder \
    .appName("Book Impact Prediction") \
    .getOrCreate()

# Load data
df = spark.read.option("header", "true").option("delimiter", ",").option("quote", "\"").option("escape", "\"").csv("../csv/books_task.csv")


df = df.withColumn("Impact", col("Impact").cast("float"))
# Data Cleaning and Feature Engineering
# Convert publishedDate to year and month
df = df.withColumn("publishedYear", df["publishedDate"].substr(1, 4).cast(IntegerType()))
df = df.withColumn("publishedMonth", df["publishedDate"].substr(6, 2).cast(IntegerType()))

df = df.fillna(0)
# Handle missing values
df = df.fillna({'description': ''})
df = df.fillna({'authors': ''})
df = df.fillna({'publisher': ''})

# Feature Engineering for 'categories'
# One-hot encode 'categories'
indexer = StringIndexer(inputCol="categories", outputCol="categoryIndex")
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
pipeline = Pipeline(stages=[indexer, encoder])
model = pipeline.fit(df)
df = model.transform(df)

# Feature Engineering for 'authors', 'publisher', and 'description'
# Tokenize 'authors', 'publisher', and 'description'
tokenizer = Tokenizer(inputCol="authors", outputCol="authorWords")
df = tokenizer.transform(df)

tokenizer = Tokenizer(inputCol="publisher", outputCol="publisherWords")
df = tokenizer.transform(df)

tokenizer = Tokenizer(inputCol="description", outputCol="descWords")
df = tokenizer.transform(df)

# Convert 'description' to TF-IDF features
hashingTF = HashingTF(inputCol="descWords", outputCol="descFeatures", numFeatures=100)
df = hashingTF.transform(df)

# Convert 'authors' and 'publisher' to TF-IDF features
hashingTF = HashingTF(inputCol="authorWords", outputCol="authorFeatures", numFeatures=100)
df = hashingTF.transform(df)

hashingTF = HashingTF(inputCol="publisherWords", outputCol="publisherFeatures", numFeatures=100)
df = hashingTF.transform(df)


df = df.fillna(0)

# Regression
# Prepare feature vector
assembler = VectorAssembler(inputCols=["publishedYear", "publishedMonth", "descFeatures", "authorFeatures", "publisherFeatures", "categoryVec"], outputCol="features")
df = assembler.transform(df)

df = df.fillna(0)

In [6]:
# Split data into train and test sets
train, test = df.randomSplit([0.8, 0.2], seed=42)

In [7]:
# Make predictions
# Train linear regression model
lr = LinearRegression(featuresCol="features", labelCol="Impact")
model = lr.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate model
evaluator = RegressionEvaluator(labelCol="Impact", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error:", mse)

24/04/25 17:49:52 WARN Instrumentation: [c4cab3ab] regParam is zero, which might cause numerical instability and overfitting.

Mean Squared Error: 3942.9142309505382


                                                                                

In [8]:
# Define RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="Impact")

# Define parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Define evaluator
evaluator = RegressionEvaluator(labelCol="Impact", predictionCol="prediction", metricName="mape")

# Cross-validation with 3 folds
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Time start
start_time = datetime.now()

# Fit model
cvModel = crossval.fit(train)

# Time end
end_time = datetime.now()

# Total training time
total_time = end_time - start_time

# Make predictions
predictions = cvModel.transform(test)

# Calculate MAPE
mape = evaluator.evaluate(predictions)

print("Mean Absolute Percentage Error (MAPE):", mape)
print("Total Training Time:", total_time)


                                                                                

IllegalArgumentException: RegressionEvaluator_5950b43eb6be parameter metricName given invalid value mape.

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/tanmay/Documents/highlevel/venv/lib/python3.10/site-packages/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tanmay/Documents/highlevel/venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/tanmay/Documents/highlevel/venv/lib/python3.10/site-packages/py4j/clientserver.py", line 503, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
