<a href="https://colab.research.google.com/github/wcj365/word-count/blob/master/wordcount_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple Word Count Program
## Use Apache Spark

In [56]:
# Install spark-related dependencies

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

!tar xf spark-2.4.5-bin-hadoop2.7.tgz

!pip install -q findspark

!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 71kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 50.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=c174dcdb317444370a27c87f52e6ee5c820dfe9640ce2b7e8a505aad2bcf30aa
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [0]:
import os
import shutil
from pyspark import SparkContext, SparkConf


# Sample text file that the notebook writes, reads and finally deletes
FILE = "hamlet.txt"
# Spark application name
APP_NAME = "Word Count"
# Spark master URL
SPARK_URL = "local[*]"

In [0]:
# Export the locations of the Java runtime and the Spark distribution
# through the environment variables that Spark requires

os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
    }
)

In [59]:
# Smoke test: build (or reuse) a SparkContext and round-trip a tiny RDD

conf = SparkConf().setMaster(SPARK_URL).setAppName(APP_NAME)

# getOrCreate hands back the already-running context when there is one
sc = SparkContext.getOrCreate(conf=conf)

# Distribute the numbers 0..3 as an RDD
a = sc.parallelize(range(4))

# Bring the elements back to the driver to confirm that Spark works
a.collect()

[0, 1, 2, 3]

In [0]:
# Create a sample data file holding two lines of text

with open(FILE, "w") as handle:   # "w" = write (truncates an existing file)
    # Only the first line ends with "\n", so the file has no trailing newline
    handle.writelines(["To be or\n", "Not to be"])


In [67]:
# Read the sample file into Spark as an RDD with one element per line.
# Use the FILE constant rather than a repeated literal so this path always
# matches the file written by the sample-data cell.

words = sc.textFile(FILE)

words.count()        # how many lines?

2

In [68]:
# Fetch at most the first 10 lines (fewer if the file is shorter)

words.take(num=10)

['To be or', 'Not to be']

In [69]:
# Collect all lines
# Don't do this if the file is large, because this tries
# to pull the data from all nodes to the driver application

words.collect()

['To be or', 'Not to be']

In [70]:
# Split the lines into words.
# str.split() with no argument splits on any run of whitespace and never
# yields empty strings, so repeated spaces, tabs and blank lines do not
# show up as bogus "" words in the count (split(" ") would produce them).

words2 = words.flatMap(lambda line: line.split())
words2.take(20)

['To', 'be', 'or', 'Not', 'to', 'be']

In [72]:
# Convert each entry to a tuple of (<lower-case word>, 1)
# so that the ones can be summed per word later

words3 = words2.map(lambda token: (token.lower(), 1))
words3.take(num=10)

[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

In [76]:
# Perform the counting: add up the 1s that share the same word.
# Both lambda arguments are counts for one key, not a word and a count.

words4 = words3.reduceByKey(lambda total, increment: total + increment)

words4.take(num=10)

[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

In [0]:
# Save the RDD to folder "output"

# Single definition of the target folder so the check, the cleanup and the
# save can never drift apart.
OUTPUT_DIR = "output"

# Remove any previous result first: Spark refuses to write into a
# directory that already exists, which would break re-running this cell.
if os.path.exists(OUTPUT_DIR):
    shutil.rmtree(OUTPUT_DIR)

words4.saveAsTextFile(OUTPUT_DIR)

In [0]:
# Delete the sample file.
# Guarded so that re-running this cell after the file is already gone
# does not raise FileNotFoundError.

if os.path.exists(FILE):
    os.remove(FILE)