In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, when, countDistinct, desc, rank; from pyspark.sql.window import Window

In [0]:
# Obtain (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("GoldLayerTransformations")
    .getOrCreate()
)

In [0]:
# Load the silver-layer source tables, then preview each one in turn.
df_books = spark.read.table("`lms-catalog`.silver.books")
df_books_copies = spark.read.table("`lms-catalog`.silver.books_copies")
df_students = spark.read.table("`lms-catalog`.silver.students")

# NOTE(review): df_students may hold personal data; consider limiting this preview before sharing outputs.
for _silver_df in (df_books, df_books_copies, df_students):
    _silver_df.display()
del _silver_df


In [0]:
# Join tables on book_id (inner join: only book_ids present in both tables survive).
# Keep the DataFrame and display it separately: DataFrame.display() returns None,
# so assigning the result of `.join(...).display()` would leave df_joined as None.
df_joined = df_books.join(df_books_copies, "book_id", "inner")
df_joined.display()

In [0]:
# Total Books & Copies
# total_books: number of distinct book_id values in df_books.
# total_copies: number of non-null copy_id values in df_books_copies (not de-duplicated).
total_books = df_books.select(countDistinct("book_id").alias("total_books")).first()["total_books"]
total_copies = df_books_copies.select(count("copy_id").alias("total_copies")).first()["total_copies"]

# Render both totals together as a single labelled row. Passing several bare
# scalars to one display() call is not guaranteed to render all of them
# (NOTE(review): Databricks' display() is built around a single input).
display(spark.createDataFrame([(total_books, total_copies)], ["total_books", "total_copies"]))

In [0]:
# Books by Department: number of df_books rows for each department value.
df_books_by_department = (
    df_books
    .groupBy("department")
    .agg(count("*").alias("book_count"))
)
display(df_books_by_department)

In [0]:
# Books by Availability: number of df_books_copies rows for each status value.
df_books_by_status = (
    df_books_copies
    .groupBy("status")
    .agg(count("*").alias("status_count"))
)
display(df_books_by_status)

In [0]:
# Top Authors & Publishers
# The ten most frequent authors / publishers in df_books. Ties on count are
# broken by name so the top-10 cut is the same on every run.
df_top_authors = df_books.groupBy("author").count().orderBy(desc("count"), "author").limit(10)
df_top_publishers = df_books.groupBy("publisher").count().orderBy(desc("count"), "publisher").limit(10)

# One display() call per DataFrame so both tables are rendered.
display(df_top_authors)
display(df_top_publishers)

In [0]:
# Most Popular Books (Based on Copies)
# Rank books by their number of copy rows; books with equal copy_count share a
# rank (rank() semantics). The un-partitioned window pulls all per-book rows
# into one partition, which is fine for this already-aggregated data.
window_spec = Window.orderBy(desc("copy_count"))
df_book_popularity = (
    df_books_copies
    .groupBy("book_id").count()
    .withColumnRenamed("count", "copy_count")
    .withColumn("rank", rank().over(window_spec))
    # Explicit tie-break on book_id so the top-10 cut is deterministic.
    .orderBy("rank", "book_id")
    .limit(10)
)
# A join does not preserve row order, so re-sort by rank after attaching titles.
df_book_popularity = (
    df_book_popularity
    .join(df_books.select("book_id", "book_title"), "book_id")
    .orderBy("rank", "book_id")
)
display(df_book_popularity)


In [0]:
# Book Distribution by Location
# Number of df_books_copies rows at each (location, rack, shelf) position,
# sorted by position so the table is easy to scan.
df_location_distribution = (
    df_books_copies
    .groupBy("location", "rack", "shelf")
    .count()
    .withColumnRenamed("count", "books_count")
    .orderBy("location", "rack", "shelf")
)
# display() rather than show(): consistent with the other cells and not capped
# at show()'s default of 20 rows.
display(df_location_distribution)

In [0]:
# Results Summary: echo the scalar totals computed in the "Total Books & Copies" cell.
print("Total Books: {}, Total Copies: {}".format(total_books, total_copies))

In [0]:
# Load the 2020 Q2, Q3 and Q4 transaction tables from the silver layer.
df_transactions_q2 = spark.read.table("`lms-catalog`.silver.transactions_2020_q2")
df_transactions_q3 = spark.read.table("`lms-catalog`.silver.transactions_2020_q3")
df_transactions_q4 = spark.read.table("`lms-catalog`.silver.transactions_2020_q4")

# One display() call per DataFrame so every quarter is rendered.
display(df_transactions_q2)
display(df_transactions_q3)
display(df_transactions_q4)

In [0]:

# Step 1: Attach copy-level details to each book -> books_final_table.
# The INNER join keeps only book_ids present in both df_books and df_books_copies.
books_final_table = df_books.join(
    other=df_books_copies,
    on="book_id",
    how="inner",
)


In [0]:
# Step 2: Join books_final_table and transaction_table (Using LEFT JOIN) -> books_transaction_table
# Every book copy is kept; transaction columns are null for copies with no
# matching (book_id, copy_id) transaction. Only the Q2 table is used here. To
# analyse another quarter, substitute df_transactions_q3 or df_transactions_q4
# (both loaded in the transactions cell) rather than adding further assignments
# that would silently overwrite this variable.
books_transaction_table = books_final_table.join(df_transactions_q2, on=["book_id", "copy_id"], how="left")
display(books_transaction_table)


In [0]:
# Step 3: Combine students with Q2 transactions -> student_transaction_table.
# RIGHT join: every Q2 transaction row is kept; student columns are null when
# no df_students row matches the transaction's student_id.
student_transaction_table = df_students.join(
    df_transactions_q2,
    on="student_id",
    how="right",
)
display(student_transaction_table)

In [0]:
# Step 4: Transformations on books_transaction_table
# Count total transactions per book. Group by book_id as well as book_title so
# that distinct books sharing a title are not merged into one row.
# count() skips nulls, so copies with no transaction (null transaction_id from
# the left join) contribute 0.
book_transaction_counts = (
    books_transaction_table
    .groupBy("book_id", "book_title")
    .agg(count("transaction_id").alias("total_transactions"))
)
display(book_transaction_counts)


In [0]:
# Example: Calculate average fine per book.
# Group by book_id as well as book_title so that distinct books sharing a title
# are not averaged together. avg() ignores nulls, so rows without a
# fine_amount (e.g. copies with no transaction) do not affect the average.
avg_fine_per_book = (
    books_transaction_table
    .groupBy("book_id", "book_title")
    .agg(avg("fine_amount").alias("average_fine"))
)
display(avg_fine_per_book)

In [0]:
# Example: Average fine per student.
# Mean of fine_amount for each student_id; avg() ignores null fine_amount values.
avg_fine_per_student = (
    student_transaction_table
    .groupBy("student_id")
    .agg(avg("fine_amount").alias("average_fine"))
)
display(avg_fine_per_student)