### Imports.

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, DecimalType, LongType, ArrayType
from pyspark.sql.functions import to_date, from_json, regexp_replace, col, when, lower, initcap, explode_outer, datediff, desc, split, lit, round, avg, count, date_format

### Execute the shared notebook to utilize the common utilities.

In [None]:
# Execute the shared notebook inside this kernel so that its definitions become
# available here. The configuration cell below calls load_configs(), which is
# presumably defined by that notebook (TODO confirm).
%run .//shared//load_yaml_config.ipynb

### Configurations.

In [None]:
# Load the configuration once; everything describing the source data lives
# under its "data" section.
config = load_configs()
data_settings = config["data"]

# Location, file names and format of the four raw collections.
source_dir = data_settings["source_dir"]
customer_file = data_settings["customers_file"]
libraries_file = data_settings["libraries_file"]
checkouts_file = data_settings["checkouts_file"]
books_file = data_settings["books_file"]
source_data_format = data_settings["source_data_format"]

# Options forwarded to the Spark reader, and the threshold (in days) that the
# late-return filter compares days_kept against.
spark_source_data_read_options = config["spark_source_data_read_options"]
return_limit_in_days = data_settings["return_limit_in_days"]

# Absolute/relative paths of the collections, built from the source directory.
customers_path, libraries_path, checkouts_path, books_path = (
    os.path.join(source_dir, file_name)
    for file_name in (customer_file, libraries_file, checkouts_file, books_file)
)

# Echo the resolved settings so a misconfiguration is visible up front.
for resolved_setting in (
    customers_path,
    libraries_path,
    checkouts_path,
    books_path,
    spark_source_data_read_options,
    return_limit_in_days,
    source_data_format,
):
    print(resolved_setting)

### Build spark session.

In [None]:
# Create (or reuse an existing) Spark session named after this analysis,
# using the "local" master.
session_builder = SparkSession.builder.appName("library_book_late_analysis")
spark = session_builder.master("local").getOrCreate()

### Read raw data.

In [None]:
def _read_non_empty_collection(path, collection_name):
    """
    Input arguments:
        1. path (str) - Location of the collection to load.
        2. collection_name (str) - Capitalized collection name used in the
        error message (e.g. "Customers").

    Output:
        1. raw_df (DataFrame) - The loaded, non-empty dataframe.

    Description:
        Reads one raw collection with the configured source format and reader
        options, and stops the run with SystemExit when the collection has no
        rows, since every later step depends on all four collections.
    """
    raw_df = (
        spark.read.format(source_data_format)
        .options(**spark_source_data_read_options)
        .load(path)
    )
    if raw_df.isEmpty():
        raise SystemExit(f"{collection_name} collection is empty!")
    return raw_df


# Same read order as before: the first empty collection aborts the run.
customers_raw_df = _read_non_empty_collection(customers_path, "Customers")
books_raw_df = _read_non_empty_collection(books_path, "Books")
checkouts_raw_df = _read_non_empty_collection(checkouts_path, "Checkouts")
libraries_raw_df = _read_non_empty_collection(libraries_path, "Libraries")

### Function definitions.

In [None]:
"""
Input arguments:
    1. total_amount (int) - Represents the total value, equivalent to 100%.
    2. part_amount (int) - The value for which the percentage will be calculated relative to total_amount.

Output:
    1. (int) - Returns percentage calculated from part_amount and total_amount.

Description:
    This function calculates percentage of part_amount relative to total_amount and
    rounds the result to the specified number of decimal places.
"""


def calculate_percentage(total_amount: int, part_amount: int, decimals: int = 3):
    """
    Input arguments:
        1. total_amount (int) - Represents the total value, equivalent to 100%.
        2. part_amount (int) - The value for which the percentage will be
        calculated relative to total_amount.
        3. decimals (int, optional) - Number of decimal places the result is
        rounded to. Defaults to 3, the value that used to be hard-coded.

    Output:
        1. The rounded percentage of part_amount relative to total_amount, or
        None when the division raises ZeroDivisionError.

    Description:
        Computes (part_amount / total_amount) * 100 and rounds it with whatever
        `round` is in scope. This notebook imports `round` from
        pyspark.sql.functions, and the analysis cells pass a Column as
        part_amount, in which case the result is a Column expression.
    """
    try:
        return round((part_amount / total_amount) * 100, decimals)
    except ZeroDivisionError:
        # Deliberately best-effort: report the problem and hand back None
        # instead of aborting the caller.
        print("Total amount can't be zero!")
        return None

In [None]:
"""
Input arguments:
    1. df (DataFrame) - The dataframe on which operation is performed.

Output:
    1. df (DataFrame) - Returns the modified dataframe.

Description:
    This function removes all leading and trailing whitespace from a string
    and replaces multiple consecutive spaces between words with a single space.
"""


def trim_white_spaces_from_df(df):
    """
    Input arguments:
        1. df (DataFrame) - The dataframe on which operation is performed.

    Output:
        1. df (DataFrame) - Returns the modified dataframe.

    Description:
        For every string column, replaces each run of whitespace with a single
        space and then removes the leading and trailing space. Columns of any
        other type are left untouched.
    """
    for column_name, column_type in df.dtypes:
        if column_type != "string":
            continue

        # Step 1: collapse every whitespace run (spaces, tabs, newlines) into
        # one space.
        collapsed = regexp_replace(column_name, r"\s+", " ")
        # Step 2: after collapsing, at most one space can remain at either
        # end of the value; strip it so the value is really trimmed.
        df = df.withColumn(column_name, regexp_replace(collapsed, r"^ | $", ""))
    return df

In [None]:
"""
Input arguments:
    1. df (DataFrame) - The dataframe on which the operation is performed.
    2. prefix (str) - A string that will be prefixed to the beginning of
    every column name, except for the id column.

Output:
    1. df (DataFrame) - Returns modified dataframe.

Description:
    This function is employed to rename columns prior to merging data from the 
    Libraries, Checkouts and Customers tables. This step is necessary to facilitate
    data manipulation, as Libraries and Customers share identical column names in
    some instances. The only exception to this renaming process is the id column, as
    the Checkouts table already includes a prefix for its foreign key (i.e., library_id).
"""


def append_prefix_to_column_name(df, prefix: str):
    """
    Input arguments:
        1. df (DataFrame) - The dataframe whose columns are renamed.
        2. prefix (str) - Text placed, followed by an underscore, in front of
        every column name except "id".

    Output:
        1. df (DataFrame) - The dataframe with the renamed columns.

    Description:
        Renames each column to "<prefix>_<column>" and leaves the "id" column
        as it is.
    """
    columns_to_rename = [name for name in df.columns if name != "id"]
    for original_name in columns_to_rename:
        df = df.withColumnRenamed(original_name, f"{prefix}_{original_name}")
    return df

### Checkouts collection data cleaning.

In [None]:
# Normalize whitespace in the string columns of the raw checkouts data.
checkouts_df = trim_white_spaces_from_df(checkouts_raw_df)

# Parse both date columns from "yyyy-MM-dd" strings into date values.
for date_column in ("date_checkout", "date_returned"):
    checkouts_df = checkouts_df.withColumn(
        date_column, to_date(date_column, "yyyy-MM-dd")
    )

# Row count followed by a preview of the cleaned data.
print(checkouts_df.count())
checkouts_df.show()

### Books collection data cleaning.

In [None]:
books_df = trim_white_spaces_from_df(books_raw_df)

# Retain only numeric characters and dots in the price column
# Retain only numeric characters in the pages column
books_df = (
    books_df.withColumn("price", regexp_replace("price", r"[^\d\.]", ""))
    .withColumn("pages", regexp_replace("pages", r"[^\d]", ""))
)

# Turn every "&" or "," separator (together with the whitespace around it) into
# the quoted delimiter "','" so that one entry becomes several list items.
# This is done in a SINGLE pass on purpose: replacing "&" first and "," second
# would rewrite the commas inserted by the first pass and produce text that
# from_json cannot parse.
# NOTE(review): assumes categories arrive as a single-quoted, bracketed list
# string such as "['Business & Economics']" - confirm against the source data.
books_df = books_df.withColumn(
    "categories", regexp_replace("categories", r"\s*[&,]\s*", "','")
)

# Convert columns to their appropriate data types
# Explode the data on the authors and categories fields; because both columns
# are exploded, a book yields one row per (author, category) combination
# Rename the column to follow snake_case conventions
books_df = (
    books_df.withColumn("authors", from_json(
        "authors", ArrayType(StringType())))
    .withColumn("categories", from_json("categories", ArrayType(StringType())))
    .withColumn("pages", col("pages").cast(LongType()))
    .withColumn("price", col("price").cast(DecimalType(precision=10, scale=2)))
    .withColumn("authors", explode_outer(col("authors")))
    .withColumn("categories", explode_outer(col("categories")))
    .withColumnRenamed("publishedDate", "published_date")
)

# Clean the string columns again now that authors and categories are plain
# strings rather than serialized lists.
books_df = trim_white_spaces_from_df(books_df)

# Row count (after exploding) followed by a preview of the cleaned data.
print(books_df.count())
books_df.show()

### Customers collection data cleaning.

In [None]:
# Normalize whitespace in the string columns of the raw customers data.
customers_df = trim_white_spaces_from_df(customers_raw_df)

# Keep only digits and dots in the zipcode, then cast it to a long.
zipcode_digits = regexp_replace("zipcode", r"[^\d\.]", "")
customers_df = customers_df.withColumn("zipcode", zipcode_digits.cast(LongType()))

# Apply initcap to capitalize the first letter of each word in these columns.
for title_cased_column in ("city", "name", "education", "state", "occupation"):
    customers_df = customers_df.withColumn(
        title_cased_column, initcap(title_cased_column)
    )

# Gender is stored in lower case.
customers_df = customers_df.withColumn("gender", lower(col("gender")))

# Row count followed by a preview of the cleaned data.
print(customers_df.count())
customers_df.show()

### Libraries collection data cleaning.

In [None]:
# Normalize whitespace in the string columns of the raw libraries data.
libraries_df = trim_white_spaces_from_df(libraries_raw_df)

# Region is stored in lower case.
libraries_df = libraries_df.withColumn("region", lower(col("region")))

# Keep only digits and dots in the postal_code, then cast it to a long.
postal_code_digits = regexp_replace("postal_code", r"[^\d\.]", "")
libraries_df = libraries_df.withColumn(
    "postal_code", postal_code_digits.cast(LongType())
)

# Apply initcap to capitalize the first letter of each word of the city.
libraries_df = libraries_df.withColumn("city", initcap("city"))

# Row count followed by a preview of the cleaned data.
print(libraries_df.count())
libraries_df.show()

### Merge data into single dataframe.

In [None]:
# Give every table distinct column names before joining: the checkout key
# becomes checkout_id, and all non-id library/customer columns get a prefix.
# NOTE: the dataframes are reassigned in place, so re-running this cell
# prefixes the already-prefixed columns again.
checkouts_df = checkouts_df.withColumnRenamed("id", "checkout_id")
libraries_df = append_prefix_to_column_name(df=libraries_df, prefix="library")
customers_df = append_prefix_to_column_name(df=customers_df, prefix="customer")

# Attach the library to each checkout; the library's own id column is dropped
# afterwards because checkouts already carry library_id.
library_match = checkouts_df.library_id == libraries_df.id
library_checkouts = checkouts_df.join(
    libraries_df, on=library_match, how="inner"
).drop("id")

# Attach the customer (patron) the same way.
customer_match = library_checkouts.patron_id == customers_df.id
final_df = library_checkouts.join(
    customers_df, on=customer_match, how="inner"
).drop("id")

# Number of days between checkout and return.
days_between_checkout_and_return = datediff(col("date_returned"), col("date_checkout"))
final_df = final_df.withColumn("days_kept", days_between_checkout_and_return)

final_df.show()

### Filter data where books were returned late.

In [None]:
# Keep only checkouts that count as late returns and have plausible dates.
# NOTE(review): ">=" treats a book kept for exactly return_limit_in_days as
# late - confirm that this is the intended boundary.
kept_at_least_limit = col("days_kept") >= return_limit_in_days
# The checkout date must be earlier than the return date.
returned_after_checkout = col("date_checkout") < col("date_returned")
# The customer's birth date must be earlier than the checkout date.
checked_out_after_birth = col("date_checkout") > col("customer_birth_date")

late_returns_df = final_df.filter(
    kept_at_least_limit & returned_after_checkout & checked_out_after_birth
)

# Nothing to analyse without at least one late return.
if late_returns_df.isEmpty():
    raise SystemExit("No data found in unified table!")

print(late_returns_df.count())
late_returns_df.show()

### Analysis.

In [None]:
# Print basic summary statistics of the late-return records.
late_returns_summary_df = late_returns_df.describe()
late_returns_summary_df.show()

In [None]:
# Late returns per customer city, ignoring records without a city.
customer_city_df = late_returns_df.where(col("customer_city").isNotNull())
total_late_returns_per_customer_city = customer_city_df.count()

# Count per city, most late returns first.
city_counts = customer_city_df.groupBy("customer_city").count()
customer_city_df = city_counts.withColumnRenamed("count", "late_returns").orderBy(
    col("late_returns").desc()
)

# Share of each city in the total number of counted late returns.
city_share = lit(
    calculate_percentage(total_late_returns_per_customer_city, col("late_returns"))
)
customer_city_df = customer_city_df.withColumn("percentage_late_returns", city_share)

print(total_late_returns_per_customer_city)
customer_city_df.show()

In [None]:
# Split the customer name on single spaces: the first token is used as the
# first name and the second token as the last name.
# NOTE(review): names with more than two words therefore report their second
# word as last_name - confirm this is acceptable for the data at hand.
name_parts = split(col("customer_name"), " ")
full_name_df = (
    late_returns_df.select("customer_name", "days_kept")
    .withColumn("first_name", name_parts.getItem(0))
    .withColumn("last_name", name_parts.getItem(1))
    .drop("customer_name")
)
total_late_returns_per_first_name = full_name_df.count()

# Late returns and average days kept per first name, most late returns first.
first_name_df = (
    full_name_df.groupBy("first_name")
    .agg(
        count("first_name").alias("late_returns"),
        round(avg("days_kept"), 1).alias("average_days_kept"),
    )
    .orderBy(desc("late_returns"))
)

# Share of each first name in the total number of late returns.
first_name_df = first_name_df.withColumn(
    "percentage_late_returns",
    lit(calculate_percentage(total_late_returns_per_first_name, col("late_returns"))),
)

print(total_late_returns_per_first_name)
first_name_df.show()

In [None]:
# Number of late returns per last name, most frequent first.
last_name_counts = full_name_df.groupBy("last_name").count()
last_name_df = last_name_counts.orderBy(col("count").desc())
last_name_df.show()

In [None]:
# Late returns per customer occupation, ignoring records without one.
occupation_df = late_returns_df.where(col("customer_occupation").isNotNull())
total_late_returns_per_occupation = occupation_df.count()

# Count per occupation, most late returns first.
occupation_counts = occupation_df.groupBy("customer_occupation").count()
occupation_df = occupation_counts.withColumnRenamed("count", "late_returns").orderBy(
    col("late_returns").desc()
)

# Share of each occupation in the total number of counted late returns.
occupation_share = lit(
    calculate_percentage(total_late_returns_per_occupation, col("late_returns"))
)
occupation_df = occupation_df.withColumn("percentage_late_returns", occupation_share)

print(total_late_returns_per_occupation)
occupation_df.show()

In [None]:
# Late returns per customer education level, ignoring records without one.
education_df = late_returns_df.where(col("customer_education").isNotNull())
total_late_returns_per_education = education_df.count()

# Count per education level, most late returns first.
education_counts = education_df.groupBy("customer_education").count()
education_df = education_counts.withColumnRenamed("count", "late_returns").orderBy(
    col("late_returns").desc()
)

# Share of each education level in the total number of counted late returns.
education_share = lit(
    calculate_percentage(total_late_returns_per_education, col("late_returns"))
)
education_df = education_df.withColumn("percentage_late_returns", education_share)

print(total_late_returns_per_education)
education_df.show()

In [None]:
# Late returns per (education, occupation) pair; both values must be present.
has_education = col("customer_education").isNotNull()
has_occupation = col("customer_occupation").isNotNull()
education_occupation_df = late_returns_df.where(has_education & has_occupation)
total_late_returns_per_education_occupation = education_occupation_df.count()

# Count per pair, most late returns first.
education_occupation_counts = education_occupation_df.groupBy(
    "customer_education", "customer_occupation"
).count()
education_occupation_df = education_occupation_counts.withColumnRenamed(
    "count", "late_returns"
).orderBy(col("late_returns").desc())

# Share of each pair in the total number of counted late returns.
education_occupation_share = lit(
    calculate_percentage(
        total_late_returns_per_education_occupation, col("late_returns")
    )
)
education_occupation_df = education_occupation_df.withColumn(
    "percentage_late_returns", education_occupation_share
)

print(total_late_returns_per_education_occupation)
education_occupation_df.show()

In [None]:
# Late returns per (gender, occupation, education) combination; all three
# values must be present.
has_all_three_attributes = (
    col("customer_education").isNotNull()
    & col("customer_occupation").isNotNull()
    & col("customer_gender").isNotNull()
)
gender_education_occupation_df = late_returns_df.where(has_all_three_attributes)
total_late_returns_per_gender_education_occupation = (
    gender_education_occupation_df.count()
)

# Count per combination, most late returns first.
combination_counts = gender_education_occupation_df.groupBy(
    "customer_gender", "customer_occupation", "customer_education"
).count()
gender_occupation_education_df = combination_counts.withColumnRenamed(
    "count", "late_returns"
).orderBy(col("late_returns").desc())

# Share of each combination in the total number of counted late returns.
combination_share = lit(
    calculate_percentage(
        total_late_returns_per_gender_education_occupation, col("late_returns")
    )
)
gender_occupation_education_df = gender_occupation_education_df.withColumn(
    "percentage_late_returns", combination_share
)

print(total_late_returns_per_gender_education_occupation)
gender_occupation_education_df.show()

In [None]:
# Late returns per day of the week on which the book was checked out.
checkout_weekday = date_format("date_checkout", "EEEE")
day_of_week_checkout_df = late_returns_df.withColumn(
    "day_of_the_week", checkout_weekday
)
total_late_returns_per_day_of_week = day_of_week_checkout_df.count()

# Count per weekday, most late returns first.
weekday_counts = day_of_week_checkout_df.groupBy("day_of_the_week").count()
day_of_week_checkout_df = weekday_counts.withColumnRenamed(
    "count", "late_returns"
).orderBy(col("late_returns").desc())

# Share of each weekday in the total number of late returns.
weekday_share = lit(
    calculate_percentage(total_late_returns_per_day_of_week, col("late_returns"))
)
day_of_week_checkout_df = day_of_week_checkout_df.withColumn(
    "percentage_late_returns", weekday_share
)

print(total_late_returns_per_day_of_week)
day_of_week_checkout_df.show()

In [None]:
# Late returns per month in which the book was checked out.
checkout_month = date_format("date_checkout", "MMMM")
month_checkout_df = late_returns_df.withColumn("month", checkout_month)
total_late_returns_per_month = month_checkout_df.count()

# Count per month, most late returns first.
month_counts = month_checkout_df.groupBy("month").count()
month_checkout_df = month_counts.withColumnRenamed("count", "late_returns").orderBy(
    col("late_returns").desc()
)

# Share of each month in the total number of late returns.
month_share = lit(
    calculate_percentage(total_late_returns_per_month, col("late_returns"))
)
month_checkout_df = month_checkout_df.withColumn(
    "percentage_late_returns", month_share
)

print(total_late_returns_per_month)
month_checkout_df.show()