# MOUNTING AZURE BLOB

In [None]:
from dotenv import load_dotenv
import os

In [None]:
# Load key=value pairs from a local .env file (if one exists) into the process
# environment. Without this call the `load_dotenv` import is unused and the
# lookups below only see variables that were already exported.
load_dotenv()

# Azure Blob Storage mount settings. Each value is None when the variable is unset.
SOURCE = os.getenv('SOURCE')
MOUNT_POINT = os.getenv('MOUNT_POINT')
BLOB_KEY = os.getenv('BLOB_KEY')

# Cosmos DB connection settings. Each value is None when the variable is unset.
ENDPOINT = os.getenv('ENDPOINT')
ACCOUNT_KEY = os.getenv('ACCOUNT_KEY')
DATABASE = os.getenv('DATABASE')
CONTAINER = os.getenv('CONTAINER')

In [0]:
# Remount from scratch: drop any existing mount at MOUNT_POINT, then mount
# SOURCE there using the storage account key.
mounted_points = {existing.mountPoint for existing in dbutils.fs.mounts()}
if MOUNT_POINT in mounted_points:
    dbutils.fs.unmount(MOUNT_POINT)

account_key_config = {
    "fs.azure.account.key.gutenbergbooks.blob.core.windows.net": BLOB_KEY
}
dbutils.fs.mount(
    source=SOURCE,
    mount_point=MOUNT_POINT,
    extra_configs=account_key_config,
)

/mnt/raw_data has been unmounted.


True

# FUNCTIONS

In [0]:
%pip install sparknlp
%pip install textstat
from datetime import datetime

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import Tokenizer, StopWordsCleaner, Stemmer

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import lit, desc
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType, StructField

from pyspark.sql.functions import lit, col, regexp_extract
from pyspark.sql.types import DoubleType
from textstat import textstat

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting sparknlp
  Downloading sparknlp-1.0.0-py3-none-any.whl (1.4 kB)
Collecting spark-nlp
  Downloading spark_nlp-5.5.3-py2.py3-none-any.whl (635 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 635.7/635.7 kB 16.6 MB/s eta 0:00:00
Installing collected packages: spark-nlp, sparknlp
Successfully installed spark-nlp-5.5.3 sparknlp-1.0.0
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting textstat
  Downloading textstat-0.7.4-py3-none-any.whl (105 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 105.1/105.1 kB 3.4 MB/s eta 0:00:00
Collecting pyphen
  Downloading pyphen-0.17.2-py3-none-any.whl (2.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.1/2.1 MB 55.7 MB/s eta 0:00:00
Installing c

In [0]:
%sql
-- Create the Delta table `books_to_process` if it does not already exist.
-- Because of IF NOT EXISTS, an existing table (and its data) is left untouched.
-- Columns: a numeric book id, author, title, a list of category strings,
-- the release date kept as a raw string, and the full book text.
CREATE TABLE IF NOT EXISTS books_to_process (
    book_id LONG,    
    book_author STRING,
    book_title STRING,
    book_category ARRAY<STRING>,
    release_date STRING,
    book_text STRING
)
USING DELTA;

# GET FILE PATHS

In [0]:
def get_files(storage_path='/mnt/raw_data'):
    """Collect the paths of all JSON files one level below ``storage_path``.

    Every entry returned by ``dbutils.fs.ls(storage_path)`` is listed in turn,
    and each listed path ending in ``.json`` is kept.

    Args:
        storage_path: Root location to scan. Defaults to ``'/mnt/raw_data'``,
            the location this function originally had hard-coded.

    Returns:
        list[str]: Paths of the JSON files found, in listing order. Empty when
        nothing matches.
    """
    json_files = []
    for folder in dbutils.fs.ls(storage_path):
        json_files.extend(
            entry.path
            for entry in dbutils.fs.ls(folder.path)
            if entry.path.endswith(".json")
        )
    return json_files


# GET FILES THAT HAVE NOT BEEN PROCESSED - LEFT ANTI JOIN

In [0]:
def show_files(max_files=1000):
    """Load the next batch of not-yet-processed JSON book files.

    The paths from ``get_files()`` are left-anti-joined against the
    ``processed_files`` Delta table, so only paths absent from that table
    remain. At most ``max_files`` of them are read as multi-line JSON and then
    appended to ``processed_files``.

    The ``processed_files`` table must already exist at its warehouse path;
    the Delta load fails otherwise.

    Args:
        max_files: Upper bound on the number of files taken in one call.
            Defaults to 1000, the limit this function originally hard-coded.

    Returns:
        A Spark DataFrame with the records of the selected files, or ``None``
        when no JSON files exist or all of them were already processed.
    """
    processed_path = "dbfs:/user/hive/warehouse/processed_files"

    json_files = get_files()
    if not json_files:
        print("ERROR")
        return None

    schema = StructType([StructField("file_path", StringType(), True)])
    new_files_df = spark.createDataFrame([(path,) for path in json_files], schema)
    processed_files_df = spark.read.format("delta").load(processed_path)

    unprocessed_files_df = (
        new_files_df
        .join(processed_files_df, "file_path", "left_anti")
        .dropDuplicates()
        .limit(max_files)
    )
    unprocessed_files = [row.file_path for row in unprocessed_files_df.collect()]

    # Reading an empty list of paths fails, so stop here when nothing is new.
    if not unprocessed_files:
        print("No unprocessed files found.")
        return None

    # Read first, record afterwards: if the JSON read fails, the files are not
    # marked as processed and will be picked up again by the next run.
    records = spark.read.option("multiline", True).json(unprocessed_files)

    the_used_files = spark.createDataFrame([(path,) for path in unprocessed_files], schema)
    the_used_files.write.format("delta").mode("append").save(processed_path)
    return records




# SAVE BOOK RECORDS IN DELTA TABLE

In [0]:
from pyspark.sql.functions import length, regexp_replace, col, desc, from_unixtime,unix_timestamp
from pyspark.sql.types import DoubleType
from textstat import textstat

def process_records():
    """Stage the next batch of book records in the ``books_to_process`` table.

    Fetches the batch from ``show_files()``, removes duplicate rows and
    overwrites the Delta table at
    ``dbfs:/user/hive/warehouse/books_to_process`` with the result.

    When ``show_files()`` returns ``None`` (nothing to load), the table is left
    as it is instead of failing on the missing DataFrame.
    """
    df_books = show_files()
    if df_books is None:
        print("No new book records to stage; books_to_process left unchanged.")
        return

    deduplicated_books = df_books.dropDuplicates()
    deduplicated_books.write.format("delta").mode("overwrite").save("dbfs:/user/hive/warehouse/books_to_process")

# Run the staging step: write the deduplicated book records to the books_to_process Delta path.
process_records()


['dbfs:/mnt/raw_data/2025-02-05/100_TheCompleteWorksofWilliamShakespeare.json', 'dbfs:/mnt/raw_data/2025-02-05/101_TheHackerCrackdownLawandDisorderontheElectronicFrontier.json', 'dbfs:/mnt/raw_data/2025-02-05/102_TheTragedyofPuddnheadWilson.json', 'dbfs:/mnt/raw_data/2025-02-05/103_AroundtheWorldinEightyDays.json', 'dbfs:/mnt/raw_data/2025-02-05/104_InauguralAddressofFranklinDelanoRooseveltGiveninWashingtonDCMarch4th1933.json', 'dbfs:/mnt/raw_data/2025-02-05/105_Persuasion.json', 'dbfs:/mnt/raw_data/2025-02-05/106_JungleTalesofTarzan.json', 'dbfs:/mnt/raw_data/2025-02-05/107_FarfromtheMaddingCrowd.json', 'dbfs:/mnt/raw_data/2025-02-05/108_TheReturnofSherlockHolmes.json', 'dbfs:/mnt/raw_data/2025-02-05/109_RenascenceandOtherPoems.json', 'dbfs:/mnt/raw_data/2025-02-05/10_TheKingJamesVersionoftheBible.json', 'dbfs:/mnt/raw_data/2025-02-05/110_TessofthedUrbervillesAPureWoman.json', 'dbfs:/mnt/raw_data/2025-02-05/111_Freckles.json', 'dbfs:/mnt/raw_data/2025-02-05/112_Violists.json', 'dbfs:/

# PROCESS THE BOOKS AND SAVE THEM IN COSMOS DB

In [0]:
def get_readability_score(text):
    """Return the Flesch reading-ease score of ``text``.

    Falsy input (``None`` or an empty string) is not scored and yields
    ``None``.
    """
    return textstat.flesch_reading_ease(text) if text else None

# `udf` is not imported by name in any visible cell. Import it here so this
# cell does not depend on a star import or a notebook-provided global.
from pyspark.sql.functions import udf

# Spark UDF that applies get_readability_score to a column and returns a double.
readability_udf = udf(get_readability_score, DoubleType())

# Options for the Cosmos DB Spark connector, filled from the environment-backed
# settings defined earlier in the notebook.
cosmosConfig = {
        "spark.cosmos.accountEndpoint": ENDPOINT,
        "spark.cosmos.accountKey": ACCOUNT_KEY,
        "spark.cosmos.database": DATABASE,
        "spark.cosmos.container": CONTAINER,
        "spark.cosmos.write.strategy": "ItemAppend"
    }

def process_books():
    """Enrich the staged books and append them to the Cosmos DB container.

    Reads ``hive_metastore.default.books_to_process`` and then:

    * adds ``word_count``, the number of whitespace-separated words in
      ``book_text``;
    * reformats ``release_date`` from ``"MMM d, yyyy"`` to ``"yyyy-MM-dd"``;
    * sorts by ``book_id``;
    * adds ``readability`` by applying ``readability_udf`` to ``book_text``;
    * drops ``book_text`` and adds ``id`` as the string form of ``book_id``;
    * writes the result with the ``cosmos.oltp`` format in append mode, using
      the options in ``cosmosConfig``.
    """
    # The "multiline" option applies to file-based JSON/CSV reads, not to a
    # table read, so it is not set here.
    books_df = spark.read.table('hive_metastore.default.books_to_process')

    # Count words as runs of non-whitespace characters. Each run is collapsed
    # to a single marker character, all whitespace is removed, and the length
    # of what remains is the word count. Counting only ' ' characters merged
    # words separated by newlines or tabs and over-counted repeated spaces.
    word_markers = regexp_replace(
        regexp_replace(col("book_text"), r'\S+', 'x'), r'\s+', ''
    )
    books_df = books_df.withColumn('word_count', length(word_markers))

    books_df = books_df.withColumn(
        "release_date",
        from_unixtime(unix_timestamp("release_date", "MMM d, yyyy"), "yyyy-MM-dd"),
    )
    books_df = books_df.sort('book_id')

    # Score the text before dropping it; the full text is not sent to Cosmos DB.
    books_df = books_df.withColumn("readability", readability_udf(books_df.book_text))
    books_df = books_df.drop('book_text')

    # Add `id` as the string form of book_id.
    books_df = books_df.withColumn("id", col("book_id").cast("string"))
    books_df.write.format('cosmos.oltp').options(**cosmosConfig).mode('APPEND').save()



# Run the enrichment step: read books_to_process and append the results to Cosmos DB.
process_books()




