In [1]:
import logging

import sys
from pyspark.sql import SparkSession

from data_transformations.wordcount import word_count_transformer

LOG_FILENAME = 'project.log'
APP_NAME = "WordCount"

if __name__ == '__main__':
    logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
    logging.info(sys.argv)

    # if len(sys.argv) is not 3:
    #     logging.warning("Input .txt file and output path are required")
    #     sys.exit(1)

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
    sc = spark.sparkContext
    app_name = sc.appName
    logging.info("Application Initialized: " + app_name)
    input_path = "./resources/word_count/words.txt"
    output_path = "./output"
#     word_count_transformer.run(spark, input_path, output_path)
#     logging.info("Application Done: " + spark.sparkContext.appName)
#     spark.stop()


22/04/26 16:20:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/26 16:20:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
input_df = spark.read.text(input_path)
input_df.printSchema()

root
 |-- value: string (nullable = true)



In [3]:
input_df.show(10)

+--------------------+
|               value|
+--------------------+
|The Project Guten...|
|Translated by Dav...|
|                    |
|                    |
|This eBook is for...|
|almost no restric...|
|re-use it under t...|
|with this eBook o...|
|                    |
|                    |
+--------------------+
only showing top 10 rows



In [4]:
input_df.count()

786

In [4]:
input_df.rdd.getNumPartitions()

1

In [19]:
from pyspark.ml.feature import Tokenizer


tokenizer = Tokenizer(inputCol="value", outputCol="words")
tokenized = tokenizer.transform(input_df)
tokenized.show()


+--------------------+--------------------+
|               value|               words|
+--------------------+--------------------+
|The Project Guten...|[the, project, gu...|
|Translated by Dav...|[translated, by, ...|
|                    |                  []|
|                    |                  []|
|This eBook is for...|[this, ebook, is,...|
|almost no restric...|[almost, no, rest...|
|re-use it under t...|[re-use, it, unde...|
|with this eBook o...|[with, this, eboo...|
|                    |                  []|
|                    |                  []|
|** This is a COPY...|[**, this, is, a,...|
|**     Please fol...|[**, , , , , plea...|
|                    |                  []|
|                    |                  []|
|                    |                  []|
|                    |                  []|
|Title: Metamorphosis|[title:, metamorp...|
|                    |                  []|
|                    |                  []|
| Author: Franz Kafka|[author:, 

In [30]:
from pyspark.sql.functions import count, explode
tokenized.withColumn("word", explode(col("words"))) \
    .groupBy("word") \
    .agg(count("*").alias('count')) \
    .sort('word', ascending=True) \
    .show()

+----------+-----+
|      word|count|
+----------+-----+
|          |  579|
|    "ah!",|    1|
| "alright,|    1|
|       "am|    1|
|      "and|    2|
|    "anna!|    1|
|   "aren't|    1|
|       "at|    1|
|   "before|    1|
|      "but|    2|
|      "can|    1|
|  "cheerio|    1|
|    "close|    1|
|     "come|    4|
|  "dead?",|    1|
|"defects,"|    1|
|      "did|    1|
|  "father,|    1|
|      "get|    1|
|  "getting|    1|
+----------+-----+
only showing top 20 rows



In [14]:
from pyspark.sql.functions import explode, split, col

x = input_df.withColumn('word', explode(split(col('value'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('word', ascending=True)\

x.show(50)

+------------+-----+
|        word|count|
+------------+-----+
|            |  579|
|      "Ah!",|    1|
|   "Alright,|    1|
|         "Am|    1|
|        "And|    1|
|      "Anna!|    1|
|     "Aren't|    1|
|         "At|    1|
|     "Before|    1|
|        "But|    1|
|        "Can|    1|
|    "Cheerio|    1|
|      "Close|    1|
|       "Come|    3|
|    "Dead?",|    1|
|  "Defects,"|    1|
|        "Did|    1|
|    "Father,|    1|
|    "Getting|    1|
|        "God|    1|
|       "Good|    1|
|    "Gregor!|    1|
|   "Gregor!"|    1|
|   "Gregor",|    2|
|    "Gregor,|    3|
|    "Gregor?|    1|
|     "Grete,|    1|
|         "He|    2|
|       "He's|    1|
|      "Help,|    1|
|        "How|    1|
|          "I|    4|
|        "I'd|    1|
|       "I'll|    3|
|        "I'm|    1|
|         "If|    2|
|"Information|    1|
|         "Is|    1|
|       "It's|    1|
|       "Just|    2|
|      "Leave|    1|
|        "Let|    1|
|   "Listen",|    1|
|      "Maybe|    1|
|   "Mother's

In [16]:
x.coalesce(1).show(10)



+---------+-----+
|     word|count|
+---------+-----+
|         |  579|
|   "Ah!",|    1|
|"Alright,|    1|
|      "Am|    1|
|     "And|    1|
|   "Anna!|    1|
|  "Aren't|    1|
|      "At|    1|
|  "Before|    1|
|     "But|    1|
+---------+-----+
only showing top 10 rows



                                                                                