In [0]:
# Prerequisite: create the catalog, schema, and volume through the Databricks UI.
# Download the plain-text book from Project Gutenberg into that volume.
import requests

url = "https://www.gutenberg.org/cache/epub/51355/pg51355.txt"
# requests applies no timeout by default, so an unresponsive server would hang
# this cell forever; fail after 30 seconds instead.
response = requests.get(url, timeout=30)
# Abort on HTTP 4xx/5xx rather than saving an error page as the book.
response.raise_for_status()

# Write the raw bytes unchanged so the file keeps the server's encoding.
with open(
    "/Volumes/textcontent/textschema/textvolume/iliad.txt",
    "wb"
) as f:
    f.write(response.content)

In [0]:
# Load the downloaded book as a DataFrame of text lines (string column "value")
# and preview the first ten rows.
iliad_path = "/Volumes/textcontent/textschema/textvolume/iliad.txt"
df = spark.read.text(iliad_path)
display(df.limit(10))

In [0]:
from pyspark.sql.functions import split, explode

# Split each line on single spaces, then emit one row per resulting token
# in a column named "word".
tokens_per_line = split(df["value"], " ")
words_df = df.select(explode(tokens_per_line).alias("word"))
display(words_df.limit(10))

In [0]:
# Custom stopword list; it is passed to StopWordsRemover (case-insensitive)
# in a later cell instead of that class's default list.
# NOTE(review): this looks like a general English stopword list (with both
# straight- and curly-apostrophe contraction fragments) extended with the
# archaic pronouns 'thou' and 'thy' for this text — confirm the origin.

stopwords = ['call', 'upon', 'still', 'nevertheless', 'down', 'every', 'forty', 
'‘re', 'always', 'whole', 'side', "n't", 'now', 'however', 'an', 'show', 'least', 
'give', 'below', 'did', 'sometimes', 'which', "'s", 'nowhere', 'per', 'hereupon', 
'yours', 'she', 'moreover', 'eight', 'somewhere', 'within', 'whereby', 'few', 
'has', 'so', 'have', 'for', 'noone', 'top', 'were', 'those', 'thence', 'eleven', 
'after', 'no', '’ll', 'others', 'ourselves', 'themselves', 'though', 'that', 
'nor', 'just', '’s', 'before', 'had', 'toward', 'another', 'should', 'herself', 
'and', 'these', 'such', 'elsewhere', 'further', 'next', 'indeed', 'bottom', 
'anyone', 'his', 'each', 'then', 'both', 'became', 'third', 'whom', '‘ve', 'mine', 
'take', 'many', 'anywhere', 'to', 'well', 'thereafter', 'besides', 'almost', 
'front', 'fifteen', 'towards', 'none', 'be', 'herein', 'two', 'using', 'whatever', 
'please', 'perhaps', 'full', 'ca', 'we', 'latterly', 'here', 'therefore', 'us', 
'how', 'was', 'made', 'the', 'or', 'may', '’re', 'namely', "'ve", 'anyway', 
'amongst', 'used', 'ever', 'of', 'there', 'than', 'why', 'really', 'whither', 
'in', 'only', 'wherein', 'last', 'under', 'own', 'therein', 'go', 'seems', '‘m', 
'wherever', 'either', 'someone', 'up', 'doing', 'on', 'rather', 'ours', 'again', 
'same', 'over', '‘s', 'latter', 'during', 'done', "'re", 'put', "'m", 'much', 
'neither', 'among', 'seemed', 'into', 'once', 'my', 'otherwise', 'part', 
'everywhere', 'never', 'myself', 'must', 'will', 'am', 'can', 'else', 'although', 
'as', 'beyond', 'are', 'too', 'becomes', 'does', 'a', 'everyone', 'but', 'some', 
'regarding', '‘ll', 'against', 'throughout', 'yourselves', 'him', "'d", 'it', 
'himself', 'whether', 'move', '’m', 'hereafter', 're', 'while', 'whoever', 'your', 
'first', 'amount', 'twelve', 'serious', 'other', 'any', 'off', 'seeming', 'four', 
'itself', 'nothing', 'beforehand', 'make', 'out', 'very', 'already', 'various', 
'until', 'hers', 'they', 'not', 'them', 'where', 'would', 'since', 'everything', 
'at', 'together', 'yet', 'more', 'six', 'back', 'with', 'thereupon', 'becoming', 
'around', 'due', 'keep', 'somehow', 'n‘t', 'across', 'all', 'when', 'i', 'empty', 
'nine', 'five', 'get', 'see', 'been', 'name', 'between', 'hence', 'ten', 
'several', 'from', 'whereupon', 'through', 'hereby', "'ll", 'alone', 'something', 
'formerly', 'without', 'above', 'onto', 'except', 'enough', 'become', 'behind', 
'’d', 'its', 'most', 'n’t', 'might', 'whereas', 'anything', 'if', 'her', 'via', 
'fifty', 'is', 'thereby', 'twenty', 'often', 'whereafter', 'their', 'also', 
'anyhow', 'cannot', 'our', 'could', 'because', 'who', 'beside', 'by', 'whence', 
'being', 'meanwhile', 'this', 'afterwards', 'whenever', 'mostly', 'what', 'one', 
'nobody', 'seem', 'less', 'do', '‘d', 'say', 'thus', 'unless', 'along', 
'yourself', 'former', 'thru', 'he', 'hundred', 'three', 'sixty', 'me', 'sometime', 
'whose', 'you', 'quite', '’ve', 'about', 'even', 'thou', 'thy']



In [0]:
# Preview the first ten stopwords as a one-column DataFrame.
stopword_rows = [(word,) for word in stopwords[:10]]
stopword_preview_df = spark.createDataFrame(stopword_rows, ["stopword"])
display(stopword_preview_df)

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Stage 1: turn the string column 'word' into the array column 'words',
# the input type StopWordsRemover operates on.
tokenizer = Tokenizer(inputCol="word", outputCol="words")
tokenized_df = tokenizer.transform(words_df)

# Stage 2: drop every array element found in the custom stopword list,
# ignoring case, and store the survivors in 'filtered_words'.
stopwordremover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered_words",
    stopWords=stopwords,
    caseSensitive=False,
)
filtered_df = stopwordremover.transform(tokenized_df)


In [0]:
from pyspark.sql.functions import array_join
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Join the surviving tokens of each row back into one space-separated string
# column, "cleaned_text", and preview the result.
cleaned_text_col = array_join(col("filtered_words"), " ")
final_df = filtered_df.withColumn("cleaned_text", cleaned_text_col)
display(final_df.limit(20))


In [0]:
# Direct word count using the built-in grouped count; the next cell shows the
# same computation written as explicit map and reduce steps.
grouped_by_term = final_df.groupBy("cleaned_text")
word_counts = grouped_by_term.count()
display(word_counts)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Map: keep only rows that still hold a term (non-null and non-empty
# cleaned_text) and tag each one with a count of 1. A literal 1 states the
# intent directly; the rows are already filtered to non-null, so it equals the
# previous isNotNull().cast("int") expression.
mapped_df = final_df.filter(
    col("cleaned_text").isNotNull() & (col("cleaned_text") != "")
).withColumn("count", F.lit(1))
display(mapped_df.limit(10))

# Reduce: sum the per-row counts for each distinct term. Aliasing inside agg()
# avoids depending on Spark's auto-generated "sum(count)" column name.
reduced_df = mapped_df.groupBy("cleaned_text").agg(
    F.sum("count").alias("count")
)
display(reduced_df.limit(10))

In [0]:
# Order terms from most to least frequent, then preview the top ten.
descending_count = col("count").desc()
sorted_df = reduced_df.orderBy(descending_count)
display(sorted_df.limit(10))

In [0]:
# Save the term frequencies as the table "wordcount_table" (overwriting any
# earlier run) with reader-friendly column names, then read it back to check.
wordcount_output_df = sorted_df.select(
    col("cleaned_text").alias("term"),
    col("count").alias("frequency"),
)
wordcount_output_df.write.mode("overwrite").saveAsTable("wordcount_table")

table_df = spark.read.table("wordcount_table")
display(table_df.limit(10))

In [0]:
# Show the 25 most frequent terms from the saved table.
# Use a dedicated name rather than reassigning `df`: `df` holds the raw text
# lines read earlier, and overwriting it with this table (which has no "value"
# column) would break the split/explode cell if it were re-run afterwards.
top_terms_df = spark.read.table("wordcount_table")
display(
    top_terms_df.orderBy(col("frequency").desc()).limit(25)
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Count the rows whose cleaned text is the empty string, i.e. the rows the
# map step filters out before counting.
is_blank = col("cleaned_text") == ""
empty_count = final_df.filter(is_blank).count()
print(empty_count)