In [43]:
import csv

import findspark
from pyspark import SparkContext, SparkConf

findspark.init()

# Location of the books CSV analysed by this script.
BOOKS_CSV_PATH = "C:/books.csv"

# Zero-based column positions, matching the order of the file's header row
# (bookID, title, authors, average_rating, isbn, isbn13, language_code, ...).
TITLE_COL = 1
AUTHORS_COL = 2
LANGUAGE_COL = 6

# Values that appear in the language_code column but are not language codes.
# NOTE(review): they look like isbn13 values, presumably from rows whose
# columns are shifted by an extra comma -- confirm against the data file.
INVALID_LANGUAGE_CODES = {"9.78067E+12", "9.78085E+12", "9.78159E+12"}


def split_fields(line):
    """Split one CSV line into its fields, honouring quoted values.

    A plain ``line.split(",")`` breaks on commas that sit inside a quoted
    field and leaves the surrounding/doubled quote characters in the value;
    ``csv.reader`` handles both cases.
    """
    return next(csv.reader([line]))


# Create a SparkConf object
conf = SparkConf().setAppName("DataScience").setMaster("local[*]")

# Create (or reuse) the SparkContext. getOrCreate() returns the context that
# is already running when the cell is re-run, and creates one on the first
# run, so no manual sc.stop() -- which fails while `sc` is still undefined --
# is needed.
sc = SparkContext.getOrCreate(conf=conf)

# Load the data as an RDD of raw text lines
rdd = sc.textFile(BOOKS_CSV_PATH)

# Apply filter() to remove the header from the RDD. The result feeds every
# action below, so cache it instead of re-reading the file each time.
header = rdd.first()
rdd_no_header = rdd.filter(lambda line: line != header).cache()

# Apply map() transformation to extract the book titles
titles_rdd = rdd_no_header.map(lambda line: split_fields(line)[TITLE_COL])

# Apply flatMap() transformation to split the "/"-separated authors' names
authors_rdd = rdd_no_header.flatMap(
    lambda line: split_fields(line)[AUTHORS_COL].split("/")
)

# Apply union() transformation to combine the two RDDs
combined_rdd = titles_rdd.union(authors_rdd)

# Filter out rows with invalid language codes
filtered_rdd = rdd_no_header.filter(
    lambda line: split_fields(line)[LANGUAGE_COL] not in INVALID_LANGUAGE_CODES
)

# Convert the RDD to a key-value pair RDD with language_code as the key and
# the raw line as the value
key_value_rdd = filtered_rdd.map(
    lambda line: (split_fields(line)[LANGUAGE_COL], line)
)

# Sort the key-value pairs by key (language_code)
sorted_rdd = key_value_rdd.sortByKey()

# Group the rows by language_code, then sort the groups by key. The sort has
# to come after groupByKey(): grouping re-partitions the data by hash, which
# would discard an ordering established beforehand.
grouped_rdd = key_value_rdd.groupByKey().sortByKey()

# Print the first 100 rows including header
print("Pre RDD Transformation:")
for row in rdd.take(100):
    print(row)

print("\nPost RDD Transformation:")
# Print the results
print("\nTitles:")
for title in titles_rdd.collect():
    print(title)

# count(): Count the number of elements in the RDD
num_titles = titles_rdd.count()
print("\nNumber of titles:", num_titles)

print("\nAuthors:")
for author in authors_rdd.collect():
    print(author)

# count(): Count the number of elements in the RDD
num_authors = authors_rdd.count()
print("\nNumber of authors:", num_authors)

print("\nCombined RDD:")
for item in combined_rdd.collect():
    print(item)

# count(): Count the number of elements in the RDD
num_data = combined_rdd.count()
print("\nNumber of combined data:", num_data)

# Print each language code followed by the rows that carry it
for language_code, data in grouped_rdd.collect():
    print(f"\nLanguage Code: {language_code}")
    for row in data:
        print(row)

Pre RDD Transformation:
bookID,title,authors,average_rating,isbn,isbn13,language_code,  num_pages,ratings_count,text_reviews_count,publication_date,publisher,
1,Harry Potter and the Half-Blood Prince (Harry Potter  #6),J.K. Rowling/Mary GrandPré,4.57,439785960,9.78044E+12,eng,652,2095690,27591,9/16/2006,Scholastic Inc.,
2,Harry Potter and the Order of the Phoenix (Harry Potter  #5),J.K. Rowling/Mary GrandPré,4.49,439358078,9.78044E+12,eng,870,2153167,29221,09/01/2004,Scholastic Inc.,
4,Harry Potter and the Chamber of Secrets (Harry Potter  #2),J.K. Rowling,4.42,439554896,9.78044E+12,eng,352,6333,244,11/01/2003,Scholastic,
5,Harry Potter and the Prisoner of Azkaban (Harry Potter  #3),J.K. Rowling/Mary GrandPré,4.56,043965548X,9.78044E+12,eng,435,2339585,36325,05/01/2004,Scholastic Inc.,
8,Harry Potter Boxed Set  Books 1-5 (Harry Potter  #1-5),J.K. Rowling/Mary GrandPré,4.78,439682584,9.78044E+12,eng,2690,41428,164,9/13/2004,Scholastic,
9,"Unauthorized Harry Potter Book Seven News: ""Hal