In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType
from pyspark.sql import Window as W
import pyspark.sql.functions as F

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Library Management Analysis") \
    .getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/08 16:25:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Schema for 'branch' table
branch_schema = StructType([
    StructField("branch_id", StringType(), True),
    StructField("manager_id", StringType(), True),
    StructField("branch_address", StringType(), True),
    StructField("contact_no", StringType(), True)
])

# Schema for 'employees' table
employees_schema = StructType([
    StructField("emp_id", StringType(), True),
    StructField("emp_name", StringType(), True),
    StructField("position", StringType(), True),
    StructField("salary", FloatType(), True),
    StructField("branch_id", StringType(), True)
])

# Schema for 'books' table
books_schema = StructType([
    StructField("isbn", StringType(), True),
    StructField("book_title", StringType(), True),
    StructField("category", StringType(), True),
    StructField("rental_price", FloatType(), True),
    StructField("status", StringType(), True),
    StructField("author", StringType(), True),
    StructField("publisher", StringType(), True)
])

# Schema for 'issue_status' table
issue_status_schema = StructType([
    StructField("issued_id", StringType(), True),
    StructField("issued_member_id", StringType(), True),
    StructField("issued_book_name", StringType(), True),
    StructField("issued_date", DateType(), True),
    StructField("issued_book_isbn", StringType(), True),
    StructField("issued_emp_id", StringType(), True)
])

# Schema for 'return_status' table
return_status_schema = StructType([
    StructField("return_id", StringType(), True),
    StructField("issued_id", StringType(), True),
    StructField("return_book_name", StringType(), True),
    StructField("return_date", DateType(), True),
    StructField("return_book_isbn", StringType(), True)
])

# Schema for 'members' table
members_schema = StructType([
    StructField("member_id", StringType(), True),
    StructField("member_name", StringType(), True),
    StructField("member_address", StringType(), True),
    StructField("reg_date", DateType(), True)
])

# Load the CSV files into DataFrames
branch_df = spark.read.csv("/data/Library-System-Management/branch.csv", header=True, schema=branch_schema)
employees_df = spark.read.csv("/data/Library-System-Management/employees.csv", header=True, schema=employees_schema)
books_df = spark.read.csv("/data/Library-System-Management/books.csv", header=True, schema=books_schema)
issue_status_df = spark.read.csv("/data/Library-System-Management/issued_status.csv", header=True, schema=issue_status_schema)
return_status_df = spark.read.csv("/data/Library-System-Management/return_status.csv", header=True, schema=return_status_schema)
members_df = spark.read.csv("/data/Library-System-Management/members.csv", header=True, schema=members_schema)

# Register DataFrames as temporary views
branch_df.createOrReplaceTempView("branch")
employees_df.createOrReplaceTempView("employees")
books_df.createOrReplaceTempView("books")
issue_status_df.createOrReplaceTempView("issue_status")
return_status_df.createOrReplaceTempView("return_status")
members_df.createOrReplaceTempView("members")

views = ["branch","employees","books","issue_status","return_status","members"]
for view in views: 
    print(f"{view}")
    spark.sql('select * from '+view).show(20,False)
    print()

branch


                                                                                

+---------+----------+--------------+-------------+
|branch_id|manager_id|branch_address|contact_no   |
+---------+----------+--------------+-------------+
|B001     |E109      |123 Main St   |+919099988676|
|B002     |E109      |456 Elm St    |+919099988677|
|B003     |E109      |789 Oak St    |+919099988678|
|B004     |E110      |567 Pine St   |+919099988679|
|B005     |E110      |890 Maple St  |+919099988680|
+---------+----------+--------------+-------------+


employees


                                                                                

+------+----------------+---------+-------+---------+
|emp_id|emp_name        |position |salary |branch_id|
+------+----------------+---------+-------+---------+
|E101  |John Doe        |Clerk    |60000.0|B001     |
|E102  |Jane Smith      |Clerk    |45000.0|B002     |
|E103  |Mike Johnson    |Librarian|55000.0|B001     |
|E104  |Emily Davis     |Assistant|40000.0|B001     |
|E105  |Sarah Brown     |Assistant|42000.0|B001     |
|E106  |Michelle Ramirez|Assistant|43000.0|B001     |
|E107  |Michael Thompson|Clerk    |62000.0|B005     |
|E108  |Jessica Taylor  |Clerk    |46000.0|B004     |
|E109  |Daniel Anderson |Manager  |57000.0|B003     |
|E110  |Laura Martinez  |Manager  |41000.0|B005     |
|E111  |Christopher Lee |Assistant|65000.0|B005     |
+------+----------------+---------+-------+---------+


books
+-----------------+-----------------------------------------------------+----------------+------------+------+----------------------+-------------------------+
|isbn             |boo

In [3]:
# Library-System-Management:
# List Members Who Have Issued More Than One Book -- Objective: Use GROUP BY to find members who have issued more than one 
spark.sql("""
          
          select issued_member_id as Counts from issue_status group by issued_member_id having count(issued_member_id) > 1
""").show(20,False)

[Stage 6:>                                                          (0 + 1) / 1]

+------+
|Counts|
+------+
|C109  |
|C110  |
|C102  |
|C106  |
|C105  |
|C107  |
|C108  |
+------+



                                                                                

In [6]:
# Find Total Rental Income by Category
spark.sql("""
          with cte as ( select * from books b join issue_status i 
          on b.isbn = i.issued_book_isbn)
        select sum(rental_price), category from cte group by category
""").show(20,False)

+-----------------+----------------+
|sum(rental_price)|category        |
+-----------------+----------------+
|13.0             |Horror          |
|7.5              |Children        |
|49.5             |History         |
|28.5             |Fantasy         |
|7.5              |Mystery         |
|59.0             |Classic         |
|14.5             |Fiction         |
|25.5             |Dystopian       |
|6.5              |Literary Fiction|
|8.5              |Science Fiction |
+-----------------+----------------+



In [8]:
# List Members Who Registered in the Last 180 Days
spark.sql("""
          select member_name, reg_date from members 
          where date_diff(current_date(), reg_date) <=180
""").show(20,False)

+-----------+----------+
|member_name|reg_date  |
+-----------+----------+
|Sam        |2024-06-01|
|John       |2024-05-01|
+-----------+----------+



In [15]:

# List Employees with Their Branch Manager's Name and their branch details
spark.sql("""
with cte as ( 
    select branch.branch_id,manager_id,branch_address,contact_no,emp_name from branch 
    join employees on branch.manager_id = employees.emp_id 
),
  cte2 as ( 
    select * from cte c join employees e on c.branch_id=e.branch_id
)        
    select * from cte2
""").show(20,False)

+---------+----------+--------------+-------------+---------------+------+----------------+---------+-------+---------+
|branch_id|manager_id|branch_address|contact_no   |emp_name       |emp_id|emp_name        |position |salary |branch_id|
+---------+----------+--------------+-------------+---------------+------+----------------+---------+-------+---------+
|B003     |E109      |789 Oak St    |+919099988678|Daniel Anderson|E109  |Daniel Anderson |Manager  |57000.0|B003     |
|B002     |E109      |456 Elm St    |+919099988677|Daniel Anderson|E102  |Jane Smith      |Clerk    |45000.0|B002     |
|B001     |E109      |123 Main St   |+919099988676|Daniel Anderson|E106  |Michelle Ramirez|Assistant|43000.0|B001     |
|B001     |E109      |123 Main St   |+919099988676|Daniel Anderson|E105  |Sarah Brown     |Assistant|42000.0|B001     |
|B001     |E109      |123 Main St   |+919099988676|Daniel Anderson|E104  |Emily Davis     |Assistant|40000.0|B001     |
|B001     |E109      |123 Main St   |+91

In [19]:

# Retrieve the List of Books Not Yet Returned
spark.sql("""
          with cte as (
             select * from issue_status i 
                LEFT join return_status r on i.issued_id = r.issued_id
          )
          select * from cte WHERE return_date is NULL
""").show(20,False)

+---------+----------------+-----------------------------------------------------+-----------+-----------------+-------------+---------+---------+----------------+-----------+----------------+
|issued_id|issued_member_id|issued_book_name                                     |issued_date|issued_book_isbn |issued_emp_id|return_id|issued_id|return_book_name|return_date|return_book_isbn|
+---------+----------------+-----------------------------------------------------+-----------+-----------------+-------------+---------+---------+----------------+-----------+----------------+
|IS121    |C102            |The Shining                                          |2024-03-25 |978-0-385-33312-0|E109         |NULL     |NULL     |NULL            |NULL       |NULL            |
|IS122    |C102            |Fahrenheit 451                                       |2024-03-26 |978-0-451-52993-5|E109         |NULL     |NULL     |NULL            |NULL       |NULL            |
|IS123    |C103            |Dune   

In [None]:
spark.stop()