In [1]:
import pyspark
from pyspark import SparkContext, SparkConf

In [2]:
# Create (or reuse) the SparkContext that the rest of the notebook uses as `sc`
conf = pyspark.SparkConf().setAppName('WordCount').setMaster('local[*]')  # local[*]: run locally on all cores of this machine
# getOrCreate() returns the already-running context when this cell is executed
# again in the same kernel; calling the SparkContext constructor a second time
# raises "Cannot run multiple SparkContexts at once". When an existing context
# is returned, `conf` is not applied to it.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

In [19]:
# Load the README into an RDD; each element is one line of the file
rdd = sc.textFile('C://spark//spark-3.0.0-bin-hadoop2.7//README.md')

In [20]:
# Preview the first 10 elements (lines) of the RDD
rdd.take(num=10)

['# Apache Spark',
 '',
 'Spark is a unified analytics engine for large-scale data processing. It provides',
 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 'supports general computation graphs for data analysis. It also supports a',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 'MLlib for machine learning, GraphX for graph processing,',
 'and Structured Streaming for stream processing.',
 '',
 '<https://spark.apache.org/>']

In [21]:
# Helper that lowercases a string and strips a fixed set of punctuation characters
def lower_clean_str(x, punc='!@#$%^&*<(,){.}`~":;[]\\+=/>_-'):
    """Return ``x`` lowercased with every character found in ``punc`` removed.

    Parameters
    ----------
    x : str
        Text to clean (one line of the file when used with ``rdd.map``).
    punc : str, optional
        Characters to delete. The default is the same set this notebook has
        always used; it does not contain the question mark, the apostrophe or
        the vertical bar, so those characters are kept.

    Returns
    -------
    str
        The lowercased text without the ``punc`` characters. Whitespace is
        left untouched, so removing a leading symbol can leave a leading space.
    """
    # The backslash in the default is written as an escaped backslash: the
    # previous literal relied on the invalid escape sequence "\+", which
    # triggers a warning on current Python versions.
    # A single translate() pass deletes all unwanted characters instead of
    # running one replace() scan per punctuation character.
    deletion_table = str.maketrans('', '', punc)
    return x.lower().translate(deletion_table)

In [22]:
# Clean every line with lower_clean_str, then preview the first 10 cleaned lines
rdd = rdd.map(lower_clean_str)
rdd.take(num=10)

[' apache spark',
 '',
 'spark is a unified analytics engine for largescale data processing it provides',
 'highlevel apis in scala java python and r and an optimized engine that',
 'supports general computation graphs for data analysis it also supports a',
 'rich set of higherlevel tools including spark sql for sql and dataframes',
 'mllib for machine learning graphx for graph processing',
 'and structured streaming for stream processing',
 '',
 'httpssparkapacheorg']

In [23]:
# Split each line on single spaces; flatMap merges the per-line lists into one
# RDD of tokens. Leading or repeated spaces produce empty-string tokens.
rdd = rdd.flatMap(lambda line: line.split(" "))
rdd.take(num=10)

['',
 'apache',
 'spark',
 '',
 'spark',
 'is',
 'a',
 'unified',
 'analytics',
 'engine']

In [24]:
# Drop the empty-string tokens produced by the split
rdd = rdd.filter(lambda token: token != '')
rdd.take(num=10)

['apache',
 'spark',
 'spark',
 'is',
 'a',
 'unified',
 'analytics',
 'engine',
 'for',
 'largescale']

In [25]:
# Turn every word into a (word, 1) pair so the 1s can be summed per word later.
# NOTE: `rdd` is rebuilt from itself, so running this cell twice in the same
# kernel wraps the pairs again, giving ((word, 1), 1).
rdd = rdd.map(lambda token: (token, 1))
rdd.take(num=10)

[('apache', 1),
 ('spark', 1),
 ('spark', 1),
 ('is', 1),
 ('a', 1),
 ('unified', 1),
 ('analytics', 1),
 ('engine', 1),
 ('for', 1),
 ('largescale', 1)]

In [26]:
# Sum the 1s of each word with reduceByKey() to get its frequency, then sort
# the (word, count) pairs by key, i.e. by the word itself.
rdd_count = rdd.reduceByKey(lambda left, right: left + right).sortByKey()
# collect() returns every (word, count) pair to the driver
rdd_count.collect()

[('1000', 2),
 ('1000000000', 2),
 ('1000count', 2),
 ('a', 10),
 ('abbreviated', 1),
 ('about', 1),
 ('against', 1),
 ('also', 5),
 ('alternatively', 1),
 ('an', 4),
 ('analysis', 1),
 ('analytics', 1),
 ('and', 10),
 ('apache', 2),
 ('apis', 1),
 ('appveyor', 1),
 ('are', 1),
 ('at', 2),
 ('available', 1),
 ('basic', 1),
 ('be', 2),
 ('because', 1),
 ('binpyspark', 1),
 ('binrunexample', 3),
 ('binsparkshell', 1),
 ('build', 3),
 ('buildhttpsamplabcsberkeleyedujenkinsjobsparkmastertestsbthadoop27hive23badgeiconhttpsamplabcsberkeleyedujenkinsjobsparkmastertestsbthadoop27hive23',
  1),
 ('buildhttpsimgshieldsioappveyorciapachesoftwarefoundationsparkmastersvg?styleplasticlogoappveyorhttpsciappveyorcomprojectapachesoftwarefoundationspark',
  1),
 ('building', 5),
 ('buildmvn', 1),
 ('built', 2),
 ('can', 6),
 ('changed', 1),
 ('class', 3),
 ('clean', 1),
 ('cluster', 2),
 ('comes', 1),
 ('command', 2),
 ('computation', 1),
 ('configuration', 2),
 ('configure', 1),
 ('contains', 1),
 ('co

In [27]:
# Swap each (word, count) pair into (count, word) so the count becomes the key.
# NOTE: `rdd_count` is rebuilt from itself, so running this cell a second time
# in the same kernel swaps the pairs back to (word, count).
rdd_count = rdd_count.map(lambda pair: (pair[1], pair[0]))
rdd_count.take(num=10)

[(2, '1000'),
 (2, '1000000000'),
 (2, '1000count'),
 (10, 'a'),
 (1, 'abbreviated'),
 (1, 'about'),
 (1, 'against'),
 (5, 'also'),
 (1, 'alternatively'),
 (4, 'an')]

In [32]:
# Words ordered by ascending count, i.e. the least frequent words come first
rdd_count.sortByKey(ascending=True).take(num=30)

[(1, 'abbreviated'),
 (1, 'about'),
 (1, 'against'),
 (1, 'alternatively'),
 (1, 'analysis'),
 (1, 'analytics'),
 (1, 'apis'),
 (1, 'appveyor'),
 (1, 'are'),
 (1, 'available'),
 (1, 'basic'),
 (1, 'because'),
 (1, 'binpyspark'),
 (1, 'binsparkshell'),
 (1,
  'buildhttpsamplabcsberkeleyedujenkinsjobsparkmastertestsbthadoop27hive23badgeiconhttpsamplabcsberkeleyedujenkinsjobsparkmastertestsbthadoop27hive23'),
 (1,
  'buildhttpsimgshieldsioappveyorciapachesoftwarefoundationsparkmastersvg?styleplasticlogoappveyorhttpsciappveyorcomprojectapachesoftwarefoundationspark'),
 (1, 'buildmvn'),
 (1, 'changed'),
 (1, 'clean'),
 (1, 'comes'),
 (1, 'computation'),
 (1, 'configure'),
 (1, 'contains'),
 (1, 'contribution'),
 (1, 'core'),
 (1,
  'coveragehttpsimgshieldsiobadgedynamicxmlsvg?labelpyspark20coverageurlhttps3a2f2fsparktestgithubio2fpysparkcoveragesitequery2fhtml2fbody2fdiv5b15d2fdiv2fh12fspancolorbbrightgreenstyleplastichttpssparktestgithubiopysparkcoveragesite'),
 (1, 'dataframes'),
 (1, 'de

In [29]:
# Words ordered by descending count, i.e. the 10 most frequent words
rdd_count.sortByKey(ascending=False).take(num=10)

[(24, 'the'),
 (18, 'to'),
 (16, 'spark'),
 (15, 'for'),
 (10, 'a'),
 (10, 'and'),
 (9, 'run'),
 (8, 'you'),
 (7, 'is'),
 (7, 'on')]