- **CLEAN BOOKS DATASET CSV**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, size, split
from pyspark.sql.types import StringType, BooleanType, StructType, StructField, FloatType
import re

# Start (or reuse) the Spark session used by this notebook
spark = SparkSession.builder.appName("CleanCSVData").getOrCreate()

# Explicit schema: five nullable string columns followed by a nullable float rating
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("id", "Book", "Author", "Description", "Genres")
    ]
    + [StructField("Rating", FloatType(), True)]
)

# Load the raw CSV using the schema above: header row, double-quote quoting,
# backslash escaping, comma delimiter, and the multiLine option turned on
df = (
    spark.read.format("csv")
    .options(
        header="true",
        quote='"',
        escape="\\",
        delimiter=",",
        multiLine="true",
    )
    .schema(schema)
    .load("abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/books.csv")
)

# Show the DataFrame as loaded, before any cleaning
display(df)

# Define a function to check that a row has all six expected column values present
def is_valid_row(*columns):
    """Return True when exactly six column values are given and none is missing.

    A value counts as missing when it is ``None`` or an empty string.
    Numeric zero (for example a ``Rating`` of ``0.0``) is a real value and
    is accepted; a plain truthiness test would wrongly reject it.

    Args:
        *columns: The column values of one row, passed positionally.

    Returns:
        bool: ``True`` if there are six values and every one is present,
        ``False`` otherwise.
    """
    if len(columns) != 6:
        return False
    # Compare against None / "" explicitly so falsy-but-valid values such as
    # 0.0 are not treated as missing.
    return all(value is not None and value != "" for value in columns)

# Wrap the row validator as a Spark UDF that returns a Boolean
is_valid_row_udf = udf(is_valid_row, BooleanType())

# Keep only the rows the validator accepts, passing it all six columns
df_clean = df.where(
    is_valid_row_udf(
        *[
            col(column_name)
            for column_name in ("id", "Book", "Author", "Description", "Genres", "Rating")
        ]
    )
)

# Then drop every remaining row whose 'Description' contains a newline character
df_clean = df_clean.where(~(col("Description").contains("\n")))

# Show the DataFrame after both filters
display(df_clean)

# Destination for the cleaned CSV output
cleaned_file_path = "abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/booksclean"

# Collapse the cleaned data into a single partition before writing
df_clean_coalesced = df_clean.coalesce(1)

# Write as CSV with a header row, replacing any existing output at the destination
(
    df_clean_coalesced.write.format("csv")
    .mode("overwrite")
    .options(header="true")
    .save(cleaned_file_path)
)

print(f"Cleaned data saved to: {cleaned_file_path}")
