In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col, udf, split
from pyspark.sql.types import StringType
import re
import logging
import time

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s:%(message)s')
logger = logging.getLogger(__name__)

logger.info("Initializing Spark Session.")
spark = SparkSession.builder \
    .appName("Connect to HDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://10.0.2.15:9000") \
    .getOrCreate()

# Test HDFS access by listing files
hdfs_path = "hdfs:10.0.2.15:9000/user/Group04/Books.csv"


logger.info("Loading data...")

def preprocess_text(text):
    if text:
        # Remove special characters and digits
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        # Convert to lowercase
        text = text.lower()
        # Remove extra spaces
        text = re.sub(r'\s+', ' ', text).strip()
    else:
        text = ''
    return text
    
# Load Data
df = spark.read.csv(hdfs_path, header=True, inferSchema=True)
df.show(5)

logger.info("Removing duplicates...")
df = df.dropDuplicates(['Id'])
df = df.withColumn('cleaned_title', preprocess_udf(col('Title')))
df = df.dropDuplicates(['cleaned_title'])
logger.info(f"Total records after removing duplicates: {df.count()}")

logger.info("Sampling data...")
sample_fraction = 1.0  # Adjust as needed
df_sample = df.sample(withReplacement=False, fraction=sample_fraction, seed=42)
logger.info(f"Total records after sampling: {df_sample.count()}")

df_sample = df_sample.fillna('')

logger.info("Combining text fields...")
df_sample = df_sample.withColumn('combined_text', concat_ws(' ',
    col('Title'),
    col('description'),
    col('authors'),
    col('categories')
))

logger.info("Preprocessing text...")

preprocess_udf = udf(preprocess_text, StringType())
df_sample = df_sample.withColumn('processed_text', preprocess_udf(col('combined_text')))

logger.info("Tokenizing and removing stop words...")

from pyspark.ml.feature import Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol='processed_text', outputCol='words')
df_sample = tokenizer.transform(df_sample)

remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
df_sample = remover.transform(df_sample)

logger.info("Converting filtered_words to binary vectors using CountVectorizer...")

from pyspark.ml.feature import CountVectorizer

count_vectorizer = CountVectorizer(
    inputCol='filtered_words',
    outputCol='cv_features',
    binary=True,
    vocabSize=5000,
    minDF=2
)

start_time = time.time()
cv_model = count_vectorizer.fit(df_sample)
df_sample = cv_model.transform(df_sample)
end_time = time.time()
logger.info(f"CountVectorizer completed in {end_time - start_time:.2f} seconds.")

logger.info("Applying MinHashLSH for approximate similarity join...")

from pyspark.ml.feature import MinHashLSH

mh = MinHashLSH(
    inputCol='cv_features',
    outputCol='hashes',
    numHashTables=3
)
start_time = time.time()
mh_model = mh.fit(df_sample)
df_sample = mh_model.transform(df_sample)
end_time = time.time()
logger.info(f"MinHashLSH completed in {end_time - start_time:.2f} seconds.")

logger.info("Performing approximate similarity join...")

threshold = 0.8  # Adjust as needed
similarity_df = mh_model.approxSimilarityJoin(
    df_sample.select('Id', 'Title', 'cv_features'),
    df_sample.select('Id', 'Title', 'cv_features'),
    threshold=threshold,
    distCol='JaccardDistance'
)

logger.info("Filtering out self-matches and selecting top N recommendations...")

similarity_df = similarity_df.filter(col('datasetA.Id') != col('datasetB.Id'))

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window_spec = Window.partitionBy('datasetA.Id').orderBy('JaccardDistance')
similarity_df = similarity_df.withColumn('rank', F.row_number().over(window_spec))

N = 10
top_n_df = similarity_df.filter(col('rank') <= N)

recommendations_df = top_n_df.groupBy('datasetA.Id', 'datasetA.Title').agg(
    F.collect_list('datasetB.Title').alias('Recommended_Titles')
).withColumnRenamed('datasetA.Id', 'Id').withColumnRenamed('datasetA.Title', 'Title')

# Convert the array of recommended titles to a single string
from pyspark.sql.functions import concat_ws

recommendations_df = recommendations_df.withColumn(
    'Recommended_Titles',
    concat_ws(' | ', 'Recommended_Titles')
)

# Select the columns to write
output_df = recommendations_df.select('Id', 'Title', 'Recommended_Titles')

logger.info("Saving recommendations to file...")

output_df.write.csv('book_recommendations_sample.csv', header=True, mode='overwrite')

logger.info("Book recommendation pipeline completed.")



2024-12-03 23:54:53,441 INFO:Initializing Spark Session.
2024-12-03 23:54:53,446 INFO:Loading data...
2024-12-03 23:55:06,006 INFO:Removing duplicates...                             
2024-12-03 23:55:22,175 INFO:Total records after removing duplicates: 166364    
2024-12-03 23:55:22,175 INFO:Sampling data...
2024-12-03 23:55:38,276 INFO:Total records after sampling: 166364               
2024-12-03 23:55:38,284 INFO:Combining text fields...
2024-12-03 23:55:38,305 INFO:Preprocessing text...
2024-12-03 23:55:38,323 INFO:Tokenizing and removing stop words...
2024-12-03 23:55:38,395 INFO:Converting filtered_words to binary vectors using CountVectorizer...
2024-12-03 23:56:02,178 INFO:CountVectorizer completed in 23.78 seconds.        
2024-12-03 23:56:02,179 INFO:Applying MinHashLSH for approximate similarity join...
2024-12-03 23:56:02,220 INFO:MinHashLSH completed in 0.04 seconds.
2024-12-03 23:56:02,221 INFO:Performing approximate similarity join...
2024-12-03 23:56:02,265 INFO:Filteri

In [4]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("LoadAndMergeRecommendations") \
    .master("local[*]") \
    .getOrCreate()

# Path to the output directory containing part files
output_path = "merged_recommendations.csv"

# Load all part files into a single DataFrame
recommendations_df = spark.read.csv(output_path, header=True, inferSchema=True)
recommendations_df.coalesce(1).write.option('sep', '###').csv("merged_recommendations", header=True, mode="overwrite")

In [6]:
import pandas as pd

# Load the merged CSV with tab delimiter
merged_file_path = "merged_recommendations/part-00000-bf156df3-4792-41ad-9235-d683b7fbc646-c000.csv"  # Adjust as needed
merged_df = pd.read_csv(merged_file_path, sep='###')

# Show random row
merged_df.sample(10)

  merged_df = pd.read_csv(merged_file_path, sep='###')


Unnamed: 0,Id,Title,Recommended_Titles
64296,B0007HGS0M,Goldfinger ( A Signet Book),The Trampling of the Lilies | The Sojourner | ...
33596,B000N0ZZ3E,Greek Art (Praeger World of Art),The comedy world of Stan Laurel | Heritage of ...
33320,B000K7NMEE,Warfare in the Age of Bonaparte,ART OF WARFARE IN THE AGE OF NAPOLEON
50203,B000OXLXIG,"All Tucked In... (Harlequin Blaze, 91)",I Thee Bed... (Harlequin Temptation)
49581,B000K5SI5Y,The Christmas Whale,SANTA CLAUS AND HIS ELVES
51125,0393323536,Before & After: Stories from New York,The Famous New York Fundamentalist-Modernist D...
23707,B0007DUPYQ,The apple in the dark,Dark Tyrant's Ascension | Dark Carnival | Dark...
60360,0789302705,Women's Soccer: The Game and the FIFA World Cup,Micro Fiction: An Anthology of Fifty Really Sh...
54221,1563923955,Toyota Corolla & Geo/Chevrolet Prizm 1993-2001...,1999 Timing Belts Manual | Manual of ski mount...
37720,0971246807,Healing Client Relationships: A Professional's...,The Loyal Customer: A Lesson From a Cab Driver...


In [None]:
spark.stop()