In [1]:
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, explode, split, length, col, lit, log, array_contains, regexp_replace
from wordcloud import WordCloud

In [2]:
STOP_WORDS_PATH = "szekspir/stop_words_english.txt"
TEXTE_PATHS = [
    "szekspir/Hamlet.txt",
    "szekspir/KingLear.txt",
    "szekspir/Othello.txt",
    "szekspir/RomeoJuliet.txt",
]
HAMLET_TEXT_PATH = TEXTE_PATHS[0]
# Explicit encoding: open()'s default is platform-dependent (e.g. cp1252 on Windows).
TEXT_ENCODING = "utf-8"

spark = SparkSession.builder.appName("ShakespeareTextProcessing").getOrCreate()

# One stop word per line, normalised to lower case; blank lines are skipped so
# that "" never ends up in the list.
with open(STOP_WORDS_PATH, 'r', encoding=TEXT_ENCODING) as f:
    stop_words = [word for word in (line.strip().lower() for line in f) if word]

# Raw play texts keyed by their file path.
textes = {}
for path in TEXTE_PATHS:
    with open(path, 'r', encoding=TEXT_ENCODING) as f:
        textes[path] = f.read()

# Single-row DataFrame (column "text") holding the full text of Hamlet.
hamlet_df = spark.createDataFrame([(textes[HAMLET_TEXT_PATH],)], ["text"])

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/18 14:39:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def count_words(df, stop_words, min_word_length=3):
    """Tokenise each document into one row per kept word occurrence.

    Despite the name, nothing is counted here: the result has one row per word
    occurrence and is meant to be fed to ``groupBy(...).count()``.

    Args:
        df: Spark DataFrame with columns ``doc_id`` and ``text``.
        stop_words: iterable (list, set, tuple, ...) of lower-case words to drop.
        min_word_length: shortest word that is kept. The default of 3 keeps
            only words longer than two characters.

    Returns:
        DataFrame with columns ``doc_id``, ``cleaned_doc`` (the whole cleaned,
        lower-cased document, repeated on every row) and ``word``.
    """
    # Everything except ASCII letters, digits and whitespace becomes a space,
    # so punctuation splits tokens (e.g. "'tis" -> "tis").
    cleaned_doc = lower(regexp_replace(col("text"), "[^a-zA-Z0-9\\s]", " "))
    # Column.isin works on every PySpark version and with any iterable once it
    # is materialised as a list; lit(list) needs PySpark >= 3.4 and rejects sets.
    stop_word_list = list(stop_words)
    return (
        df
        .select(col("doc_id"), cleaned_doc.alias("cleaned_doc"))
        .withColumn("word", explode(split(col("cleaned_doc"), "\\s+")))
        .filter(
            # split() yields "" for leading whitespace; drop it even when
            # min_word_length is small.
            (col("word") != "")
            & ~col("word").isin(stop_word_list)
            & (length(col("word")) >= min_word_length)
        )
    )

def get_top_n_words(word_counts, n=20):
    """Return the ``n`` most frequent words as a DataFrame with columns ``word`` and ``count``.

    Args:
        word_counts: DataFrame with a ``word`` column and one row per occurrence
            (e.g. the output of ``count_words``).
        n: number of rows to keep.

    Ties in ``count`` are broken alphabetically by ``word``. This makes the row
    order, and which words make the cut at position ``n``, deterministic across runs.
    """
    return (
        word_counts
        .groupBy("word")
        .count()
        .orderBy(col("count").desc(), col("word").asc())
        .limit(n)
    )

In [4]:
# hamlet_df (the raw Hamlet text) was already built in the loading cell; reuse it
# instead of reading the file a second time.
hamlet_word_counts = count_words(hamlet_df.withColumn("doc_id", lit(1)), stop_words)

print("Word counts in Hamlet:")
# DataFrame.show() prints the table itself and returns None, so it is not wrapped in print().
get_top_n_words(hamlet_word_counts, n=20).show()

hamlet_frequencies = {
    row['word']: row['count'] for row in get_top_n_words(hamlet_word_counts, n=100).collect()
}
wordCloud = WordCloud(width=800, height=400, background_color='white')
wordCloud.generate_from_frequencies(hamlet_frequencies)
# Trailing ';' suppresses the WordCloud repr as the cell's output.
wordCloud.to_file("wordcloud_Hamlet.png");

Word counts in Hamlet:


                                                                                

+-----------+-----+
|       word|count|
+-----------+-----+
|     hamlet|  480|
|       lord|  226|
|       king|  207|
|    horatio|  161|
|      queen|  123|
|   polonius|  119|
|    laertes|  111|
|       good|  108|
|    ophelia|   89|
|        thy|   87|
|        sir|   77|
|rosencrantz|   75|
|      enter|   73|
|        tis|   73|
|     father|   71|
|       love|   69|
|       hath|   65|
|      speak|   64|
|        man|   60|
|       thee|   59|
+-----------+-----+

None


<wordcloud.wordcloud.WordCloud at 0x7f3039cdd010>

In [5]:
def calculate_tf_idf(dfs, stop_words):
    """Compute per-document TF-IDF scores for every word kept by ``count_words``.

    Args:
        dfs: Spark DataFrame with one row per document and columns ``doc_id`` and ``text``.
        stop_words: words to exclude; passed through to ``count_words``.

    Returns:
        DataFrame with columns ``doc_id``, ``word``, ``tf_idf`` where
        ``tf = term_count / doc_length`` and ``idf = ln(N / doc_frequency)``.
        ``doc_length`` counts only the words that survive ``count_words``
        filtering, and ``N`` is the number of distinct ``doc_id`` values in
        ``dfs``. Words that occur in every document get ``tf_idf == 0``.
        An empty ``dfs`` yields an empty DataFrame with the same columns.
    """
    from pyspark.sql.functions import sum as spark_sum

    # N for the IDF term. With zero documents, every intermediate frame below is
    # empty, so the division is never evaluated and an empty result with the
    # normal schema comes back. Building an empty frame by hand with
    # createDataFrame([], [names]) cannot infer a schema and raises.
    n_docs = dfs.select("doc_id").distinct().count()

    # One row per (doc_id, word) with the number of occurrences.
    word_frequencies = (
        count_words(dfs, stop_words)
        .groupBy("doc_id", "word")
        .count()
        .withColumnRenamed("count", "term_count")
    )
    # Summing term counts equals counting word rows per document, and it avoids
    # re-tokenising the raw text.
    doc_lengths = word_frequencies.groupBy("doc_id").agg(
        spark_sum("term_count").alias("doc_length")
    )

    tf = (
        word_frequencies.join(doc_lengths, "doc_id")
        .withColumn("tf", col("term_count") / col("doc_length"))
        .select("doc_id", "word", "tf")
    )

    # word_frequencies already has exactly one row per (doc_id, word), so the
    # number of rows per word is its document frequency; no distinct() needed.
    doc_frequency = (
        word_frequencies.groupBy("word")
        .count()
        .withColumnRenamed("count", "doc_frequency")
    )
    idf = (
        doc_frequency
        .withColumn("idf", log(lit(n_docs) / col("doc_frequency")))
        .select("word", "idf")
    )

    return (
        tf.join(idf, "word")
        .withColumn("tf_idf", col("tf") * col("idf"))
        .select("doc_id", "word", "tf_idf")
    )

In [6]:
# The document id is the file name without directory or extension
# ("Hamlet", "KingLear", ...).
df_data = [(Path(doc_path).stem, text_content) for doc_path, text_content in textes.items()]
dfs = spark.createDataFrame(df_data, ["doc_id", "text"])
# Cached because the scores are shown here and collected again to draw the word clouds.
tf_idf_scores = calculate_tf_idf(dfs, stop_words).cache()

# show() prints the table and returns None, so it is not wrapped in print().
# Secondary keys make the order of tied scores deterministic.
tf_idf_scores.orderBy(col("tf_idf").desc(), col("doc_id"), col("word")).show(20)

                                                                                

+-----------+---------+--------------------+
|     doc_id|     word|              tf_idf|
+-----------+---------+--------------------+
|     Hamlet|   hamlet|0.053353214667859805|
|    Othello|     iago|0.046850052833203565|
|    Othello|  othello| 0.04347581080089527|
|    Othello|   cassio|0.032963749084857906|
|    Othello|desdemona|0.029459728512845455|
|   KingLear|     lear|0.027721128230426925|
|   KingLear|     kent|0.020701615073366028|
|RomeoJuliet|      rom| 0.02064181792843173|
|RomeoJuliet|    romeo|0.019755359489787423|
|     Hamlet|  horatio|0.017895557419844645|
|    Othello|   emilia|0.017779659939470605|
|   KingLear|     glou|0.014039026314121789|
|    Othello| roderigo|0.013626746668937326|
|     Hamlet| polonius| 0.01322715113640691|
|     Hamlet|  laertes| 0.01233793089194258|
|RomeoJuliet|    friar|0.011777233541988657|
|   KingLear|      edg| 0.01165953032867742|
|     Hamlet|  ophelia|0.009892575219665672|
|RomeoJuliet|    nurse|0.009497768985474723|
|   KingLe

In [7]:
# Collect all positive TF-IDF scores in a single Spark job instead of filtering
# and collecting once per document. Zero-weight words (present in every play)
# carry no information for a word cloud.
tf_idf_by_doc = {}
for row in tf_idf_scores.filter(col("tf_idf") > 0).collect():
    tf_idf_by_doc.setdefault(row['doc_id'], {})[row['word']] = row['tf_idf']

for doc_row in dfs.select("doc_id").distinct().collect():
    doc_name = doc_row['doc_id']
    frequencies = tf_idf_by_doc.get(doc_name)
    if not frequencies:
        # WordCloud.generate_from_frequencies raises ValueError on an empty dict.
        print(f"Skipping {doc_name}: no words with a positive TF-IDF score")
        continue
    wordCloud = WordCloud(width=800, height=400, background_color='white')
    wordCloud.generate_from_frequencies(frequencies)
    wordCloud.to_file(f"wordcloud_tf_idf_{doc_name}.png")

                                                                                