In [118]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os, shutil
spark = (
    SparkSession.builder
    .appName("BookRecommenderTransform") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()
)

In [119]:
book_tags_schema= StructType([
    StructField('goodreads_book_id', StringType(), True),
    StructField('tag_id', StringType(), True),
    StructField('count', IntegerType(), True),   
])

books_schema = StructType([
    StructField("index", StringType(), True),
    StructField("book_id", StringType(), True),
    StructField("best_book_id", StringType(), True),
    StructField("work_id", StringType(), True),
    StructField("books_count", IntegerType(), True),
    StructField("isbn", StringType(), True),
    StructField("isbn13", StringType(), True),
    StructField("authors", StringType(), True),
    StructField("original_publication_year", DoubleType(), True),
    StructField("original_title", StringType(), True),
    StructField("title", StringType(), True),
    StructField("language_code", StringType(), True),
    StructField("average_rating", DoubleType(), True),
    StructField("ratings_count", IntegerType(), True),
    StructField("work_ratings_count", IntegerType(), True),
    StructField("work_text_reviews_count", IntegerType(), True),
    StructField("ratings_1", IntegerType(), True),
    StructField("ratings_2", IntegerType(), True),
    StructField("ratings_3", IntegerType(), True),
    StructField("ratings_4", IntegerType(), True),
    StructField("ratings_5", IntegerType(), True),
    StructField("image_url", StringType(), True),
    StructField("small_image_url", StringType(), True)
])

ratings_schema= StructType([
    StructField('book_id', StringType(), True),
    StructField('user_id', StringType(), True),
    StructField('rating', IntegerType(), True),   
])

tags_schema= StructType([
    StructField('tag_id', IntegerType(), True),
    StructField('tag_name', StringType(), True), 
])

to_read_schema= StructType([
    StructField('book_id', StringType(), True),
    StructField('user_id', StringType(), True),
    StructField('rating', IntegerType(), True),   
])

In [120]:
df_book_tags = spark.read.csv("archive/book_tags.csv", header=True, schema= book_tags_schema)
df_books     = spark.read.csv("archive/books.csv", header=True, schema= books_schema)
df_ratings   = spark.read.csv("archive/ratings.csv", header=True, schema= ratings_schema)
df_tags      = spark.read.csv("archive/tags.csv", header=True, schema= tags_schema)
df_to_read   = spark.read.csv("archive/to_read.csv", header=True, schema= to_read_schema)


In [121]:
df_final = (
    df_books
    .join(df_ratings, "book_id", how="inner")
    .join(df_book_tags, df_book_tags.goodreads_book_id == df_books.book_id, how="inner")
    .join(df_tags, 'tag_id', how="inner")
)

df_final.dropDuplicates()

DataFrame[tag_id: string, book_id: string, index: string, best_book_id: string, work_id: string, books_count: int, isbn: string, isbn13: string, authors: string, original_publication_year: double, original_title: string, title: string, language_code: string, average_rating: double, ratings_count: int, work_ratings_count: int, work_text_reviews_count: int, ratings_1: int, ratings_2: int, ratings_3: int, ratings_4: int, ratings_5: int, image_url: string, small_image_url: string, user_id: string, rating: int, goodreads_book_id: string, count: int, tag_name: string]

In [122]:
df_final = df_final.select(
    col('user_id'),
    col('book_id'),
    col('best_book_id'),
    col('goodreads_book_id'),
    col('work_id'),
    col('books_count'),
    col('isbn'),
    col('authors'),
    col('original_publication_year').alias('year'),
    col('title'),
    col('language_code'),
    col('average_rating'),
    col('rating'),
    col('tag_name'),
)

In [123]:
df_final.show()
df_final.count()

+-------+-------+------------+-----------------+-------+-----------+---------+--------------------+------+--------------------+-------------+--------------+------+-----------------+
|user_id|book_id|best_book_id|goodreads_book_id|work_id|books_count|     isbn|             authors|  year|               title|language_code|average_rating|rating|         tag_name|
+-------+-------+------------+-----------------+-------+-----------+---------+--------------------+------+--------------------+-------------+--------------+------+-----------------+
|    153|   1032|        1032|             1032|1224415|         26|345479173|Donald J. Trump, ...|1987.0|Trump: The Art of...|        en-US|          3.66|     3|          to-read|
|    153|   1032|        1032|             1032|1224415|         26|345479173|Donald J. Trump, ...|1987.0|Trump: The Art of...|        en-US|          3.66|     3|currently-reading|
|    153|   1032|        1032|             1032|1224415|         26|345479173|Donald J. Tr

7970100

In [124]:
df_final.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- best_book_id: string (nullable = true)
 |-- goodreads_book_id: string (nullable = true)
 |-- work_id: string (nullable = true)
 |-- books_count: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- year: double (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- rating: integer (nullable = true)
 |-- tag_name: string (nullable = true)



In [125]:
temp_dir = "data/clean_books_temp"
final_dir = "data/clean_books_csv"
final_name = "clean_books.csv"
(
    df_final.coalesce(1)
    .write.option("header", "true")
    .mode("overwrite")
    .csv(temp_dir)
)

for f in os.listdir(temp_dir):
    if f.endswith(".csv"):
        shutil.move(os.path.join(temp_dir, f), os.path.join(final_dir, final_name))

shutil.rmtree(temp_dir)
print(f"✅ File đã được lưu tại: {final_dir}/{final_name}")


                                                                                

FileNotFoundError: [Errno 2] No such file or directory: 'data/clean_books_csv/clean_books.csv'

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os, shutil
spark = (
    SparkSession.builder
    .appName("BookRecommenderTransform") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()
)

book_tags_schema= StructType([
    StructField('goodreads_book_id', StringType(), True),
    StructField('tag_id', StringType(), True),
    StructField('count', IntegerType(), True),   
])

books_schema = StructType([
    StructField("index", StringType(), True),
    StructField("book_id", StringType(), True),
    StructField("best_book_id", StringType(), True),
    StructField("work_id", StringType(), True),
    StructField("books_count", IntegerType(), True),
    StructField("isbn", StringType(), True),
    StructField("isbn13", StringType(), True),
    StructField("authors", StringType(), True),
    StructField("original_publication_year", DoubleType(), True),
    StructField("original_title", StringType(), True),
    StructField("title", StringType(), True),
    StructField("language_code", StringType(), True),
    StructField("average_rating", DoubleType(), True),
    StructField("ratings_count", IntegerType(), True),
    StructField("work_ratings_count", IntegerType(), True),
    StructField("work_text_reviews_count", IntegerType(), True),
    StructField("ratings_1", IntegerType(), True),
    StructField("ratings_2", IntegerType(), True),
    StructField("ratings_3", IntegerType(), True),
    StructField("ratings_4", IntegerType(), True),
    StructField("ratings_5", IntegerType(), True),
    StructField("image_url", StringType(), True),
    StructField("small_image_url", StringType(), True)
])

ratings_schema= StructType([
    StructField('book_id', StringType(), True),
    StructField('user_id', StringType(), True),
    StructField('rating', IntegerType(), True),   
])

tags_schema= StructType([
    StructField('tag_id', IntegerType(), True),
    StructField('tag_name', StringType(), True), 
])

to_read_schema= StructType([
    StructField('book_id', StringType(), True),
    StructField('user_id', StringType(), True),
    StructField('rating', IntegerType(), True),   
])

df_book_tags = spark.read.csv("archive/book_tags.csv", header=True, schema= book_tags_schema)
df_books     = spark.read.csv("archive/books.csv", header=True, schema= books_schema)
df_ratings   = spark.read.csv("archive/ratings.csv", header=True, schema= ratings_schema)
df_tags      = spark.read.csv("archive/tags.csv", header=True, schema= tags_schema)
df_to_read   = spark.read.csv("archive/to_read.csv", header=True, schema= to_read_schema)

df_final = (
    df_books
    .join(df_ratings, 'book_id', how="left")
    .join(df_book_tags, df_book_tags.goodreads_book_id == df_books.book_id, how="inner")
    .join(df_tags, 'tag_id', how="left")
)


df_final = df_final.select(
    col('user_id'),
    col('book_id'),
    col('isbn'),
    col('authors'),
    col('original_publication_year').alias('year'),
    col('title'),
    col('language_code'),
    col('average_rating'),
    col('rating'),
    col('tag_name'),
)
df_final.dropDuplicates()


df_final.show()

temp_dir = "data/clean_books"
(
    df_final.coalesce(1)
    .write
    .option("header", "true")
    .option("quote", '"')    
    .option("escape", '"')     
    .mode("overwrite")
    .csv(temp_dir)
)

                                                                                

+-------+--------+----------+-----------------+------+--------------------+-------------+--------------+------+-----------------+
|user_id| book_id|      isbn|          authors|  year|               title|language_code|average_rating|rating|         tag_name|
+-------+--------+----------+-----------------+------+--------------------+-------------+--------------+------+-----------------+
|   NULL|10008056|8756795459|Jussi Adler-Olsen|2010.0|Journal 64 (Afdel...|          dan|          4.04|  NULL|          to-read|
|   NULL|10008056|8756795459|Jussi Adler-Olsen|2010.0|Journal 64 (Afdel...|          dan|          4.04|  NULL|          default|
|   NULL|10008056|8756795459|Jussi Adler-Olsen|2010.0|Journal 64 (Afdel...|          dan|          4.04|  NULL|currently-reading|
|   NULL|10008056|8756795459|Jussi Adler-Olsen|2010.0|Journal 64 (Afdel...|          dan|          4.04|  NULL|            krimi|
|   NULL|10008056|8756795459|Jussi Adler-Olsen|2010.0|Journal 64 (Afdel...|          dan| 

                                                                                

In [14]:
import pandas as pd
import glob

# 1. Đường dẫn đến thư mục chứa các file Spark
temp_dir = "data/clean_books"

# 2. Dùng glob để tìm tất cả các file có đuôi .csv
# Dấu * là ký tự đại diện, khớp với mọi tên file
csv_files = glob.glob(f"{temp_dir}/*.csv")

# 3. Đọc từng file và nối chúng lại
df_list = []
for file_path in csv_files:
    print(f"Đang đọc file: {file_path}")
    df_list.append(pd.read_csv(file_path))

# 4. Nối tất cả các DataFrame nhỏ thành 1 DataFrame lớn
df1 = pd.concat(df_list, ignore_index=True)

# Bây giờ df1 chứa dữ liệu của bạn
# print(df1.head())

Đang đọc file: data/clean_books/part-00000-a70fcedc-6052-44e8-855f-46a1cd725439-c000.csv


  exec(code_obj, self.user_global_ns, self.user_ns)


In [15]:
df1.nunique()

user_id           28906
book_id           10000
isbn               9308
authors            4664
year                293
title              9964
language_code        27
average_rating      184
rating                5
tag_name          34252
dtype: int64