# Define parameters

In [0]:
# Catalog and schema used to build the fully qualified name of each silver
# table written by this notebook.
catalog_name, schema_name = "dev_silver", "books"

# Import of libraries

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql.functions import col, count, when, isnan

# Books silver table

## Data analysis

In [0]:
# Profile the bronze books table: shape, column names, schema, NULL counts
# per column and the number of fully duplicated rows.
print("=== BASIC DATASET INFORMATION ===")

books = spark.table("dev_bronze.books.books_bronze")

# Shape as (rows, columns); the row count is reused for the duplicate check.
col_count = len(books.columns)
row_count = books.count()
print(f"Dataset shape: ({row_count}, {col_count})")

# Column names
print(f"\n=== Column names ===\n{books.columns}")

# Data types as reported by Spark
print("\n=== Data types ===")
books.printSchema()

# One aggregate per column, each counting the rows where that column is NULL.
print("\n=== Missing values ===")
missing_counts = books.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in books.columns)
)
missing_counts.show()

# Duplicates = total rows minus the rows left after dropping exact duplicates.
print("\n=== Duplicate rows ===")
distinct_row_count = books.dropDuplicates().count()
duplicates_count = row_count - distinct_row_count
print(f"Duplicate rows: {duplicates_count}")


## Creation of books silver table

In [0]:
# Build the cleaned books table from bronze and overwrite the silver table.
table_name = "books_silver"

# Intended layout of the silver table. NOTE: this schema is declared for
# reference only; nothing in this cell passes it to Spark, so the written
# schema is whatever the query below produces.
schema = StructType([
    StructField("ISBN", StringType(), True),
    StructField("Book_Title", StringType(), True),
    StructField("Book_Author", StringType(), True),
    StructField("Year_Of_Publication", StringType(), True),
    StructField("Publisher", StringType(), True),
    StructField("Image_URL_S", StringType(), True),
    StructField("Image_URL_M", StringType(), True),
    StructField("Image_URL_L", StringType(), True),
    StructField("Current_Timestamp", DateType(), True),
])

# Cleaning rules applied by the query:
#   * rows without an ISBN are dropped and exact duplicate rows are removed;
#   * text columns are trimmed and NULLs replaced with 'Unknown';
#   * author names get a single space after each dot and are title-cased;
#   * a year that is not an integer, lies in the future or is below 1000
#     becomes 'Unknown';
#   * Current_Timestamp holds the load date (the timestamp cast to a date).
#
# The query is a raw (non-f) string: it has no placeholders, and the regex
# backslashes must reach Spark unchanged instead of relying on Python passing
# the invalid escapes `\s` and `\.` through (a SyntaxWarning on Python 3.12+).
# String literals use single quotes throughout so they stay literals even if
# ANSI double-quoted identifiers are enabled.
df = spark.sql(r"""
    select distinct
        ISBN,
        coalesce(trim(`Book-Title`), 'Unknown') as Book_Title,
        coalesce(trim(INITCAP(REGEXP_REPLACE(`Book-Author`, r'\s*\.\s*', '. '))), 'Unknown') as Book_Author,
        CASE
            WHEN try_cast(`Year-Of-Publication` AS INT) IS NULL THEN 'Unknown'
            WHEN try_cast(`Year-Of-Publication` AS INT) > year(current_date()) THEN 'Unknown'
            WHEN try_cast(`Year-Of-Publication` AS INT) < 1000 THEN 'Unknown'
            ELSE cast(`Year-Of-Publication` AS STRING)
        END AS Year_Of_Publication,
        coalesce(trim(`Publisher`), 'Unknown') as Publisher,
        coalesce(trim(`Image-URL-S`), 'Unknown') as Image_URL_S,
        coalesce(trim(`Image-URL-M`), 'Unknown') as Image_URL_M,
        coalesce(trim(`Image-URL-L`), 'Unknown') as Image_URL_L,
        cast(current_timestamp() as date) as Current_Timestamp
    from dev_bronze.books.books_bronze
    where ISBN is not null
""")
display(df)

# Full refresh: replace both the data and the schema of the target table.
(df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
)
print(f"Table {table_name} updated")

# Ratings silver table

## Data analysis

In [0]:
# Profile the bronze ratings table: shape, column names, schema, NULL counts
# per column and the number of fully duplicated rows.
print("=== BASIC DATASET INFORMATION ===")

ratings = spark.table("dev_bronze.books.ratings_bronze")

# Shape as (rows, columns); the row count is reused for the duplicate check.
col_count = len(ratings.columns)
row_count = ratings.count()
print(f"Dataset shape: ({row_count}, {col_count})")

# Column names
print(f"\n=== Column names ===\n{ratings.columns}")

# Data types as reported by Spark
print("\n=== Data types ===")
ratings.printSchema()

# One aggregate per column, each counting the rows where that column is NULL.
print("\n=== Missing values ===")
missing_counts = ratings.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in ratings.columns)
)
missing_counts.show()

# Duplicates = total rows minus the rows left after dropping exact duplicates.
print("\n=== Duplicate rows ===")
distinct_row_count = ratings.dropDuplicates().count()
duplicates_count = row_count - distinct_row_count
print(f"Duplicate rows: {duplicates_count}")

## Creation of ratings silver table

In [0]:
# Build the cleaned ratings table from bronze and overwrite the silver table.
table_name = "ratings_silver"

# Intended layout of the silver table, listed in the order the query selects
# the columns. NOTE: this schema is declared for reference only; nothing in
# this cell passes it to Spark. ISBN is passed through uncast by the query
# and is declared as a string to agree with books_silver -- confirm against
# the bronze column type.
schema = StructType([
    StructField("ISBN", StringType(), True),
    StructField("User_ID", IntegerType(), True),
    StructField("Book_Rating", IntegerType(), True),
    StructField("Current_Timestamp", DateType(), True),
])

# Cleaning rules applied by the query:
#   * rows without an ISBN or without a rating are dropped;
#   * only ratings between 0 and 10 (inclusive) are kept;
#   * a missing User-ID is replaced by the sentinel 999999999;
#   * the GROUP BY collapses repeated (ISBN, User-ID, Book-Rating) rows;
#   * Current_Timestamp holds the load date (the timestamp cast to a date).
# Plain string: the query has no placeholders, so no f-prefix is needed.
df = spark.sql("""
    select
        ISBN,
        cast(coalesce(`User-ID`, 999999999) as int) as User_ID,
        cast(`Book-Rating` as int) as Book_Rating,
        cast(current_timestamp() as date) as Current_Timestamp
    from dev_bronze.books.ratings_bronze
    where ISBN is not null
        and `Book-Rating` is not null
        and `Book-Rating` between 0 and 10
    group by
        ISBN,
        `User-ID`,
        `Book-Rating`
""")
display(df)

# Full refresh: replace both the data and the schema of the target table.
(df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
)
print(f"Table {table_name} updated")

# Users silver table

## Data analysis

In [0]:
# Profile the bronze users table: shape, column names, schema, NULL counts
# per column and the number of fully duplicated rows.
print("=== BASIC DATASET INFORMATION ===")

users = spark.table("dev_bronze.books.users_bronze")

# Shape as (rows, columns); the row count is reused for the duplicate check.
col_count = len(users.columns)
row_count = users.count()
print(f"Dataset shape: ({row_count}, {col_count})")

# Column names
print(f"\n=== Column names ===\n{users.columns}")

# Data types as reported by Spark
print("\n=== Data types ===")
users.printSchema()

# One aggregate per column, each counting the rows where that column is NULL.
print("\n=== Missing values ===")
missing_counts = users.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in users.columns)
)
missing_counts.show()

# Duplicates = total rows minus the rows left after dropping exact duplicates.
print("\n=== Duplicate rows ===")
distinct_row_count = users.dropDuplicates().count()
duplicates_count = row_count - distinct_row_count
print(f"Duplicate rows: {duplicates_count}")

## Creation of users silver table
The column `Location` is not yet cleaned or split. Cleaning and splitting will be done in a later stage of the project.

In [0]:
# Build the cleaned users table from bronze and overwrite the silver table.
table_name = "users_silver"

# Intended layout of the silver table. NOTE: this schema is declared for
# reference only; nothing in this cell passes it to Spark. Age is a string
# because the query emits either the original value cast to STRING or the
# literal 'Unknown'.
schema = StructType([
    StructField("User_ID", IntegerType(), True),
    StructField("Location", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Current_Timestamp", DateType(), True),
])

# Cleaning rules applied by the query:
#   * rows without a User-ID are dropped;
#   * Location is passed through unchanged;
#   * an age that is not numeric, is below 5 or is above 120 becomes
#     'Unknown'; any other age is kept as its original text;
#   * the GROUP BY collapses repeated (User-ID, Location, Age) rows;
#   * Current_Timestamp holds the load date (the timestamp cast to a date).
# Plain string: the query has no placeholders, so no f-prefix is needed.
df = spark.sql("""
    select
        cast(`User-ID` as int) as User_ID,
        Location,
        CASE
            WHEN try_cast(`Age` AS decimal) IS NULL THEN 'Unknown'
            WHEN try_cast(`Age` AS decimal) < 5 THEN 'Unknown'
            WHEN try_cast(`Age` AS decimal) > 120 THEN 'Unknown'
            ELSE cast(`Age` AS STRING)
        end as Age,
        cast(current_timestamp() as date) as Current_Timestamp
    from dev_bronze.books.users_bronze
    where `User-ID` is not null
    group by
        `User-ID`,
        `Location`,
        `Age`
""")
display(df)

# Full refresh: replace both the data and the schema of the target table.
(df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
)
print(f"Table {table_name} updated")