## Word Count

In [1]:
import os
import requests
from urllib.parse import urlparse
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
def download_file(url, filename=None):
    if filename is None:
        filename = os.path.basename(urlparse(url).path)
    response = requests.get(url)
    with open(filename, 'w') as stream:
        stream.write(response.content.decode('utf-8'))

In [3]:
download_file('https://www.gutenberg.org/files/2600/2600-0.txt', 'war_and_peace.txt')

### Initialize Spark

In [4]:
# initialization of spark context
conf = SparkConf().setAppName('WordCount').setMaster('local[*]') 
spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .config(conf=conf)\
        .getOrCreate()

In [5]:
sc = spark.sparkContext

In [6]:
spark

In [7]:
sc

### Create RDDs

In [10]:
# read data from disk, as a result we get RDD of lines
linesRDD = sc.textFile('war_and_peace.txt')

In [11]:
linesRDD

war_and_peace.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [12]:
# from RDD of lines create RDD of lists of words 
wordsRDD = linesRDD.flatMap(lambda line: line.split(' '))

In [13]:
print(wordsRDD.toDebugString())

b'(1) PythonRDD[4] at RDD at PythonRDD.scala:53 []\n |  war_and_peace.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []\n |  war_and_peace.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []'


In [14]:
# from RDD of lists of words make RDD of words tuples where 
# the first element is a word and the second is counter, at the
# beginning it should be 1
wordCountRDD = wordsRDD.map(lambda word: (word, 1))

In [15]:
wordCountRDD

PythonRDD[5] at RDD at PythonRDD.scala:53

In [16]:
# combine elements with the same word value
resultRDD = wordCountRDD.reduceByKey(lambda a, b: a + b)

In [17]:
# results sorted by highest
resultsortedRDD = resultRDD.sortBy(lambda x: x[1], ascending=False)

### Show and Save Results

In [18]:
# call an action - get first 10
resultsortedRDD.take(10)

[('the', 31704),
 ('and', 20564),
 ('', 16774),
 ('to', 16322),
 ('of', 14857),
 ('a', 10017),
 ('in', 8228),
 ('he', 7631),
 ('his', 7630),
 ('that', 7229)]

In [None]:
# write it back to disk
resultRDD.saveAsTextFile('word_counts_war_and_peace.txt')

In [19]:
spark.stop()