In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[1]").appName("word_count_app").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/21 19:53:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Create RDD from text file
rdd = spark.sparkContext.textFile("test.txt")

In [4]:
# Break down file into list of words
rdd2 = rdd.flatMap(lambda x: x.split(" "))

In [5]:
# Add a column with value 1 for each word encountered
# The result is a PairRDD - which contains key-value pairs
# Key type String - with the word - and value type int with value 1
rdd3 = rdd2.map(lambda x: (x,1))

In [6]:
# reduceByKey() merges the values for each key with the function specified.
# so in our case, we specify it to add the values.
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)
print(rdd4.collect())

[Stage 0:>                                                          (0 + 1) / 1]

[('Project', 9), ('Gutenberg’s', 9), ('Alice’s', 18), ('Adventures', 18), ('in', 18), ('Wonderland', 18), ('by', 18), ('Lewis', 18), ('Carroll', 18), ('This', 27), ('eBook', 27), ('is', 27), ('for', 27), ('the', 27), ('use', 27), ('of', 27), ('anyone', 27), ('anywhere', 27), ('at', 27), ('no', 27), ('cost', 27), ('and', 27), ('with', 27), ('', 1)]


                                                                                

In [7]:
# We first swap the key and value - 
# so that the key is now the word count and the value is the word

rdd5 = rdd4.map(lambda x: (x[1], x[0])).sortByKey()
print(rdd5.collect())

[(1, ''), (9, 'Project'), (9, 'Gutenberg’s'), (18, 'Alice’s'), (18, 'Adventures'), (18, 'in'), (18, 'Wonderland'), (18, 'by'), (18, 'Lewis'), (18, 'Carroll'), (27, 'This'), (27, 'eBook'), (27, 'is'), (27, 'for'), (27, 'the'), (27, 'use'), (27, 'of'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'no'), (27, 'cost'), (27, 'and'), (27, 'with')]


In [8]:
# Filter words that start with 'an'

rdd6 = rdd5.filter(lambda x: 'an' in x[1])

In [9]:
print(rdd6.collect())

[(18, 'Wonderland'), (27, 'anyone'), (27, 'anywhere'), (27, 'and')]
