In [15]:
from pyspark import SparkContext

In [16]:
# Initialize SparkContext
sc = SparkContext(appName="BooksRatingsJoin")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=BooksRatingsJoin, master=local[*]) created by __init__ at /tmp/ipykernel_49729/930124118.py:2 

In [17]:
# Load CSV files into RDDs
ratings = sc.textFile('/home/maryam/content/BooksRating-CSV/Book-Ratings.csv')
books = sc.textFile('/home/maryam/content/BooksRating-CSV/Books.csv')
users = sc.textFile('/home/maryam/content/BooksRating-CSV/Users.csv')

In [18]:
# Extract headers
ratings_header = ratings.first()
books_header = books.first()
users_header = users.first()

In [19]:
# Parsing functions
def parse_ratings(line):
    try:
        if line == ratings_header:
            return None
        fields = line.replace('"', '').split(';')
        userid = fields[0]  # userid is the first field
        isbn = fields[1]  # isbn is the second field
        rate = float(fields[2])  # rate is the third field
        return (isbn, (userid, rate))
    except Exception as e:
        print(f"Error parsing ratings line: {line}, Error: {e}")
        return None

def parse_books(line):
    try:
        if line == books_header:
            return None
        fields = line.replace('"', '').split(';')
        isbn = fields[0]  # isbn is the first field
        book_title = fields[1]  # book title is the second field
        return (isbn, book_title)
    except Exception as e:
        print(f"Error parsing books line: {line}, Error: {e}")
        return None

def parse_users(line):
    try:
        if line == users_header:
            return None
        fields = line.replace('"', '').split(';')
        userid = fields[0]  # userid is the first field
        user_name = fields[1]  # user name is the second field
        return (userid, user_name)
    except Exception as e:
        print(f"Error parsing users line: {line}, Error: {e}")
        return None

In [20]:
# Parse the CSV lines and filter out invalid rows
parsed_ratings = ratings.map(parse_ratings).filter(lambda x: x is not None)
parsed_books = books.map(parse_books).filter(lambda x: x is not None)
parsed_users = users.map(parse_users).filter(lambda x: x is not None)

In [21]:
# Calculate the sum and count of ratings for each isbn
isbn_ratings = parsed_ratings.map(lambda x: (x[0], x[1][1]))
isbn_rating_totals = isbn_ratings.combineByKey(
    lambda value: (value, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
isbn_average_ratings = isbn_rating_totals.mapValues(lambda totals: totals[0] / totals[1])

In [22]:
# Join ratings with books to get book titles
ratings_books_join = parsed_ratings.map(lambda x: (x[0], x[1])).join(parsed_books)

In [23]:
# Join the above result with users to get user names
ratings_books_users_join = ratings_books_join.map(lambda x: (x[1][0][0], (x[0], x[1][0][1], x[1][1]))).join(parsed_users)

In [24]:
# Join with average ratings
final_join = ratings_books_users_join.map(lambda x: (x[1][0][0], (x[1][0][2], x[1][0][1], x[1][1]))).join(isbn_average_ratings)

In [25]:
# Format the final output
final_output = final_join.map(lambda x: (x[1][0][2], x[1][0][0], x[1][0][1], x[1][1]))

In [26]:
# Add the header 
header = sc.parallelize(['username;book title;username’s rate;Book Avg’ rate']) 
final_output_with_header = header.union(final_output.map(lambda x: f'"{x[0]}";"{x[1]}";"{x[2]}";"{x[3]:.2f}"'))

In [27]:
# Coalesce to a single partition 
final_output_with_header = final_output_with_header.coalesce(1)

In [28]:
# Save the results to a new CSV file 
output_file = '/home/maryam/content/BooksRating_csv_with_header3.csv' 
final_output_with_header.saveAsTextFile(output_file) 
print(f"Results saved to {output_file}")

[Stage 10:>                                                         (0 + 1) / 1]

Results saved to /home/maryam/content/BooksRating_csv_with_header3.csv


                                                                                

In [None]:
# Stop the SparkContext 
sc.stop()