# Big Data Algorithm & Platforms, Assignment 2, Spark
## Caroline FAVART - Juan LONDONO

# Spark (with PySpark)


In this tutorial, you will learn how to use Apache Spark, a framework for large-scale data processing, within a notebook.
Upon completing this lab, you will be able to:
 + Program in Spark with the Python language
 + Demonstrate how to read and process data using Spark
 + Compare and contrast RDDs and DataFrames
 + Build a simple machine learning application with Spark
 
 
 
# Requirements: installing Spark and PySpark and configuring them to run in Jupyter notebooks

Depending on your OS, follow the corresponding tutorial:

+ [Windows](https://changhsinlee.com/install-pyspark-windows-jupyter/)
+ [Mac Os X](https://jmedium.com/pyspark-in-mac/)
 
At the end of the installation, you should be able to run the following code, which is a kind of "Hello World" in PySpark.

In [1]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

IndexError: list index out of range

In [2]:
spark.stop()

## Part 1: Spark Context

When writing a Spark program, the first thing to do is to define a `SparkContext`.

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.
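To make the driver/executor split concrete, here is a toy model in plain Python (no Spark involved): the hypothetical `run_job` function plays the driver, cuts the data into partitions, submits one task per partition to a thread pool standing in for the executors, and combines the partial results. It only illustrates the idea under those assumptions; it is not how Spark's scheduler is implemented.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver side: cut the input into roughly equal slices, one per task."""
    size = max(1, -(-len(data) // num_partitions))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def task(partition):
    """Executor side: the work applied to a single partition (a sum of squares)."""
    return sum(x * x for x in partition)


def run_job(data, num_partitions=4, num_executors=2):
    """Toy 'driver': split the job into tasks, run them, and combine the results."""
    partitions = split_into_partitions(data, num_partitions)
    # Each partition becomes one task submitted to the pool of "executors".
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        partial_results = list(executors.map(task, partitions))
    # The partial results come back to the driver, which combines them.
    return partial_results, sum(partial_results)


partials, total = run_job(list(range(10)), num_partitions=4)
print(partials, total)  # [5, 50, 149, 81] 285
```

In Spark the executors are separate JVM processes, possibly on other machines, rather than threads, but the flow (driver splits the job into tasks, executors run them, results return to the driver) is the same.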

Here, we use the `findspark` package, which can be installed with the following command:

`pip3 install findspark`

Then we can use the `findspark.init()` function to locate the Spark installation and make `pyspark` importable from Python.




In [3]:
# Import of findspark and pyspark and definition of the Spark Context

import findspark

findspark.init()

import pyspark

sc=pyspark.SparkContext("local[*]","First program in Spark")

IndexError: list index out of range

Try printing out sc to see its type.

In [4]:
# Printing the Spark Context

print(sc, 'type : ', type(sc))

(<SparkContext master=local[*] appName=First program in Spark>, 'type : ', <class 'pyspark.context.SparkContext'>)


`sc` is an object of class `SparkContext`.

You can use Python's `dir()` function to get a list of all the attributes (including methods) accessible through the `sc` object.

In [5]:
# Printing the list of all attributes accessible through sc
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__enter__',
 '__exit__',
 '__format__',
 '__getattribute__',
 '__getnewargs__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'defaultMinPartitions',
 'defaultParallelism',
 'dump_prof

## Part 2: Understanding Spark RDDs

### WordCount in Spark
In this part, we will write a word count in Spark and apply it to Bram Stoker's novel *Dracula* (from Project Gutenberg).

First read the [pg345.txt](./SparkData/pg345.txt) file.

In [6]:
# Loading the text data
data = sc.textFile('./SparkData/pg345.txt')
type(data)

pyspark.rdd.RDD

To see the content of the file, we need to run the action `collect` on the RDD `data`.

In [7]:
# Displaying the data (as it is an RDD, we use the action collect)
data.collect()

[u'The Project Gutenberg EBook of Dracula, by Bram Stoker',
 u'',
 u'This eBook is for the use of anyone anywhere at no cost and with',
 u'almost no restrictions whatsoever.  You may copy it, give it away or',
 u're-use it under the terms of the Project Gutenberg License included',
 u'with this eBook or online at www.gutenberg.org/license',
 u'',
 u'',
 u'Title: Dracula',
 u'',
 u'Author: Bram Stoker',
 u'',
 u'Release Date: August 16, 2013 [EBook #345]',
 u'',
 u'Language: English',
 u'',
 u'',
 u'*** START OF THIS PROJECT GUTENBERG EBOOK DRACULA ***',
 u'',
 u'',
 u'',
 u'',
 u'Produced by Chuck Greif and the Online Distributed',
 u'Proofreading Team at http://www.pgdp.net (This file was',
 u'produced from images generously made available by The',
 u'Internet Archive)',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'                                DRACULA',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'                                DRACULA',
 u'',
 u'                                  _by_',
 u'',

Using the RDD `data` from the previous cell, apply `flatMap()` to each line of the input: convert it to lower case, remove the commas, split it into words on spaces, and store the result in the RDD `words`.
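The difference between `map()` and `flatMap()` can be sketched with plain Python lists (the sample lines below are made up for illustration): `map()` produces exactly one output element per line, whereas `flatMap()` flattens the per-line word lists into a single sequence of words.

```python
lines = ["Dracula, by Bram Stoker", "", "Title: Dracula"]


def tokenize(line):
    """Same cleaning as in the Spark cell: lower case, remove commas, split on spaces."""
    return line.lower().replace(',', '').split(' ')


# map: exactly one output element per input line, i.e. a list of lists
mapped = [tokenize(line) for line in lines]

# flatMap: apply the function, then flatten the per-line lists into one list of words
flat_mapped = [word for line in lines for word in tokenize(line)]

print(mapped)       # [['dracula', 'by', 'bram', 'stoker'], [''], ['title:', 'dracula']]
print(flat_mapped)  # ['dracula', 'by', 'bram', 'stoker', '', 'title:', 'dracula']
```

The empty line yields an empty string `''`, which is why empty strings have to be filtered out before counting.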

In [8]:
# Creating a RDD containing the words of the text

# We use flatMap to execute the required actions for each line of data and to "flatten" the result
words=data.flatMap(lambda line : line.lower().replace(',','').split(' '))

# We display the RDD words
words.collect()

# Comment: the u prefix in the output is how Python 2 displays unicode strings. It is not part of the words
# themselves, so there is nothing to remove (Python 3 displays the same strings without the prefix).

[u'the',
 u'project',
 u'gutenberg',
 u'ebook',
 u'of',
 u'dracula',
 u'by',
 u'bram',
 u'stoker',
 u'',
 u'this',
 u'ebook',
 u'is',
 u'for',
 u'the',
 u'use',
 u'of',
 u'anyone',
 u'anywhere',
 u'at',
 u'no',
 u'cost',
 u'and',
 u'with',
 u'almost',
 u'no',
 u'restrictions',
 u'whatsoever.',
 u'',
 u'you',
 u'may',
 u'copy',
 u'it',
 u'give',
 u'it',
 u'away',
 u'or',
 u're-use',
 u'it',
 u'under',
 u'the',
 u'terms',
 u'of',
 u'the',
 u'project',
 u'gutenberg',
 u'license',
 u'included',
 u'with',
 u'this',
 u'ebook',
 u'or',
 u'online',
 u'at',
 u'www.gutenberg.org/license',
 u'',
 u'',
 u'title:',
 u'dracula',
 u'',
 u'author:',
 u'bram',
 u'stoker',
 u'',
 u'release',
 u'date:',
 u'august',
 u'16',
 u'2013',
 u'[ebook',
 u'#345]',
 u'',
 u'language:',
 u'english',
 u'',
 u'',
 u'***',
 u'start',
 u'of',
 u'this',
 u'project',
 u'gutenberg',
 u'ebook',
 u'dracula',
 u'***',
 u'',
 u'',
 u'',
 u'',
 u'produced',
 u'by',
 u'chuck',
 u'greif',
 u'and',
 u'the',
 u'online',
 u'distrib

In [9]:
# Removing the empty strings (u'') using the transformation filter

# Strings must be compared by value (!=), not by identity (is not)
words = words.filter(lambda x: x != u'')
words.collect()

[u'the',
 u'project',
 u'gutenberg',
 u'ebook',
 u'of',
 u'dracula',
 u'by',
 u'bram',
 u'stoker',
 u'this',
 u'ebook',
 u'is',
 u'for',
 u'the',
 u'use',
 u'of',
 u'anyone',
 u'anywhere',
 u'at',
 u'no',
 u'cost',
 u'and',
 u'with',
 u'almost',
 u'no',
 u'restrictions',
 u'whatsoever.',
 u'you',
 u'may',
 u'copy',
 u'it',
 u'give',
 u'it',
 u'away',
 u'or',
 u're-use',
 u'it',
 u'under',
 u'the',
 u'terms',
 u'of',
 u'the',
 u'project',
 u'gutenberg',
 u'license',
 u'included',
 u'with',
 u'this',
 u'ebook',
 u'or',
 u'online',
 u'at',
 u'www.gutenberg.org/license',
 u'title:',
 u'dracula',
 u'author:',
 u'bram',
 u'stoker',
 u'release',
 u'date:',
 u'august',
 u'16',
 u'2013',
 u'[ebook',
 u'#345]',
 u'language:',
 u'english',
 u'***',
 u'start',
 u'of',
 u'this',
 u'project',
 u'gutenberg',
 u'ebook',
 u'dracula',
 u'***',
 u'produced',
 u'by',
 u'chuck',
 u'greif',
 u'and',
 u'the',
 u'online',
 u'distributed',
 u'proofreading',
 u'team',
 u'at',
 u'http://www.pgdp.net',
 u'(this',

Use the action `take()` to retrieve the first 10 words of the RDD `words`.

In [10]:
# Executing the action take() to collect the first 10 words of the RDD words

words.take(10)

# Comment: take() returns a Python list containing those words to the driver.

[u'the',
 u'project',
 u'gutenberg',
 u'ebook',
 u'of',
 u'dracula',
 u'by',
 u'bram',
 u'stoker',
 u'this']

Perform a classic `map()` to turn each word into a `(word, 1)` tuple.

In [11]:
# Creating an RDD of (word, 1) tuples using map()

map_words=words.map(lambda word : (word,1))
map_words.collect()

[(u'the', 1),
 (u'project', 1),
 (u'gutenberg', 1),
 (u'ebook', 1),
 (u'of', 1),
 (u'dracula', 1),
 (u'by', 1),
 (u'bram', 1),
 (u'stoker', 1),
 (u'this', 1),
 (u'ebook', 1),
 (u'is', 1),
 (u'for', 1),
 (u'the', 1),
 (u'use', 1),
 (u'of', 1),
 (u'anyone', 1),
 (u'anywhere', 1),
 (u'at', 1),
 (u'no', 1),
 (u'cost', 1),
 (u'and', 1),
 (u'with', 1),
 (u'almost', 1),
 (u'no', 1),
 (u'restrictions', 1),
 (u'whatsoever.', 1),
 (u'you', 1),
 (u'may', 1),
 (u'copy', 1),
 (u'it', 1),
 (u'give', 1),
 (u'it', 1),
 (u'away', 1),
 (u'or', 1),
 (u're-use', 1),
 (u'it', 1),
 (u'under', 1),
 (u'the', 1),
 (u'terms', 1),
 (u'of', 1),
 (u'the', 1),
 (u'project', 1),
 (u'gutenberg', 1),
 (u'license', 1),
 (u'included', 1),
 (u'with', 1),
 (u'this', 1),
 (u'ebook', 1),
 (u'or', 1),
 (u'online', 1),
 (u'at', 1),
 (u'www.gutenberg.org/license', 1),
 (u'title:', 1),
 (u'dracula', 1),
 (u'author:', 1),
 (u'bram', 1),
 (u'stoker', 1),
 (u'release', 1),
 (u'date:', 1),
 (u'august', 1),
 (u'16', 1),
 (u'2013', 1

Write the reducing function
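As a point of reference, the semantics of `reduceByKey(add)` can be sketched in plain Python: all values sharing a key are merged with the given function. The helper `reduce_by_key` below is hypothetical and runs on a single machine; Spark additionally combines values inside each partition before shuffling them, which is why the function should be associative and commutative (as `add` is).

```python
from operator import add


def reduce_by_key(pairs, func):
    """Plain-Python analogue of RDD.reduceByKey: merge all values of a key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


pairs = [(word, 1) for word in ["the", "count", "the", "castle", "the"]]
print(reduce_by_key(pairs, add))  # [('the', 3), ('count', 1), ('castle', 1)]
```

Unlike this sketch, Spark does not guarantee any particular order for the resulting pairs.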

In [12]:
# Reducing function :

# We want to sum the counts by word. We use the reduceByKey transformation and display the result.

from operator import add

reduce_words=map_words.reduceByKey(add)
reduce_words.collect()

[(u'funereal', 1),
 (u'shouted.', 1),
 (u'"_czarina', 2),
 (u'yellow', 6),
 (u'four', 23),
 (u"friend's", 2),
 (u"bringin'", 1),
 (u'increase', 2),
 (u'_might_', 1),
 (u'lore', 1),
 (u'should.', 1),
 (u'sinking', 6),
 (u'spider.', 1),
 (u'wood.', 3),
 (u'fingers"', 1),
 (u'foul', 13),
 (u'fingers.', 4),
 (u'screaming', 2),
 (u'writing.', 3),
 (u'disturb', 9),
 (u'stars;', 1),
 (u'wooden', 6),
 (u"everything.'", 1),
 (u"lor'", 3),
 (u'straight', 20),
 (u'colophon]', 1),
 (u'nigh', 8),
 (u'ate.', 1),
 (u'bacon', 1),
 (u'pulse', 2),
 (u'her;', 16),
 (u'second', 17),
 (u'her?', 2),
 (u'sailed', 4),
 (u'misjudge;', 1),
 (u'ruthless', 2),
 (u'comically', 1),
 (u'nature.', 6),
 (u"madman's", 1),
 (u'understood.', 2),
 (u'up.', 22),
 (u'so."', 5),
 (u'hero', 1),
 (u'here', 211),
 (u'china', 2),
 (u'hers', 7),
 (u'chink', 1),
 (u'shock.', 2),
 (u'elaborate', 1),
 (u'"why', 11),
 (u'"paprika', 1),
 (u'finger.', 2),
 (u'pursuers.', 1),
 (u'dimness', 1),
 (u'"who', 1),
 (u'criticism', 1),
 (u'clas

Get all words that occur more than once and sort them alphabetically:

In [13]:
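# Sorting the words occurring more than once alphabetically:

# First, we use filter to keep only the tuples whose second element (the count) is strictly greater than 1.
# Indexing the tuple (instead of unpacking it in the lambda signature) also works in Python 3.
frequent_words = reduce_words.filter(lambda pair: pair[1] > 1)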

# Then, we sort the corresponding list.
sorted(frequent_words.collect())

[(u'"\'my', 2),
 (u'"_17', 4),
 (u'"_2', 2),
 (u'"_24', 2),
 (u'"_25', 4),
 (u'"_6', 2),
 (u'"_czarina', 2),
 (u'"a', 12),
 (u'"ah', 18),
 (u'"aha!"', 2),
 (u'"all', 8),
 (u'"am', 2),
 (u'"and', 53),
 (u'"are', 5),
 (u'"arthur!', 2),
 (u'"as', 6),
 (u'"at', 3),
 (u'"be', 2),
 (u'"because', 8),
 (u'"because"', 2),
 (u'"believe', 3),
 (u'"bloofer', 6),
 (u'"brave', 2),
 (u'"but', 36),
 (u'"but"', 4),
 (u'"by', 3),
 (u'"can', 4),
 (u'"certainly', 2),
 (u'"come', 14),
 (u'"come!"', 3),
 (u'"come"', 4),
 (u'"count', 4),
 (u'"darkness', 3),
 (u'"dear', 6),
 (u'"did', 3),
 (u'"do', 20),
 (u'"don\'t', 3),
 (u'"dr.', 12),
 (u'"except', 2),
 (u'"fifty', 2),
 (u'"for', 8),
 (u'"forgive', 4),
 (u'"friend', 11),
 (u'"go', 8),
 (u'"god', 4),
 (u'"god!', 2),
 (u'"good', 7),
 (u'"good!"', 4),
 (u'"good-bye', 2),
 (u'"have', 6),
 (u'"he', 15),
 (u'"here', 2),
 (u'"his', 3),
 (u'"how', 16),
 (u'"hush!', 2),
 (u'"i', 169),
 (u'"i\'ll', 3),
 (u'"i\'m', 3),
 (u'"if', 9),
 (u'"in', 10),
 (u'"is', 7),
 (u'"i

Now, get all words that occur more than once and sort them by frequency. Select the first 20. Hint: look at the `takeOrdered` action.
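`takeOrdered(n, key)` returns the `n` elements with the smallest key, in ascending order, so using the opposite of the count as the key selects the most frequent words. On a small, made-up list of counts, Python's `heapq.nsmallest` behaves the same way and can be used to check the idea:

```python
import heapq

# Made-up (word, count) pairs, only for illustration
counts = [("castle", 3), ("the", 10), ("night", 7), ("wolf", 1)]

# Equivalent of counts_rdd.takeOrdered(2, key=lambda x: -x[1]): the 2 most frequent words
top_2 = heapq.nsmallest(2, counts, key=lambda x: -x[1])

# Without the minus sign, the smallest counts come first instead
bottom_2 = heapq.nsmallest(2, counts, key=lambda x: x[1])

print(top_2)     # [('the', 10), ('night', 7)]
print(bottom_2)  # [('wolf', 1), ('castle', 3)]
```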

In [14]:
# Sorting the words occurring more than once by frequency:

# We use the takeOrdered action. The first argument is the number of words we want to select.
# Since takeOrdered sorts in ascending order, we use the opposite of each word's count as the key.
frequent_words.takeOrdered(20,key=lambda x: -x[1])

[(u'the', 7984),
 (u'and', 5840),
 (u'i', 4527),
 (u'to', 4518),
 (u'of', 3719),
 (u'a', 2933),
 (u'he', 2523),
 (u'in', 2512),
 (u'that', 2415),
 (u'it', 1876),
 (u'was', 1848),
 (u'as', 1573),
 (u'for', 1514),
 (u'we', 1499),
 (u'is', 1489),
 (u'his', 1457),
 (u'not', 1350),
 (u'with', 1314),
 (u'you', 1302),
 (u'my', 1213)]

### A simple exercise

+ Create a Python collection of 10,000 integers
+ Create a Spark base RDD from that collection
+ Subtract one from each value using map
+ Perform action collect to view results
+ Perform action count to view counts
+ Apply transformation filter and view results with collect
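For comparison, the steps of this exercise can be mirrored with plain Python on a single machine (a sketch that uses the standard `random` module with a fixed seed instead of NumPy). It also makes the two possible meanings of "count" explicit: the `count()` action returns the total number of elements, whereas counting the occurrences of each value requires `map` plus `reduceByKey` (PySpark's `countByValue()` action does this too).

```python
import random
from collections import Counter

random.seed(0)  # fixed seed, so that the run is reproducible
int_list = [random.randrange(5000) for _ in range(10000)]  # 10,000 integers in [0, 4999]

sub_list = [x - 1 for x in int_list]  # like int_RDD.map(lambda x: x - 1)
total = len(sub_list)                 # what the count() action returns: 10000

# What map(lambda n: (n, 1)) followed by reduceByKey(add) computes: occurrences per value
per_value = Counter(sub_list)

# Like filter(lambda pair: pair[1] > 5), then collect() and sorted()
frequent = sorted((value, c) for value, c in per_value.items() if c > 5)
print(total, len(per_value), frequent[:3])
```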

In [15]:
# We create a Python collection (a list) of 10,000 random integers between 0 and 4999 using numpy's randint function.

import numpy as np

int_list = [ np.random.randint(5000) for i in range(10000)]

In [16]:
# We transform this list into an RDD with the function parallelize.

int_RDD = sc.parallelize(int_list)

# Checking the type of int_RDD :
type(int_RDD)

pyspark.rdd.RDD

In [17]:
# Then, we subtract one from each value.

# We use the map function :
sub_RDD = int_RDD.map(lambda x: x-1)

# We display the result by performing the action collect :
sub_RDD.collect()

[3039,
 4141,
 4778,
 1468,
 3638,
 1012,
 2976,
 3859,
 42,
 3545,
 4813,
 2057,
 598,
 2928,
 1724,
 1466,
 4859,
 2491,
 3309,
 2102,
 1407,
 4609,
 3783,
 4300,
 4927,
 1299,
 1450,
 2345,
 3616,
 4251,
 1589,
 2629,
 2790,
 1430,
 448,
 3434,
 4498,
 3341,
 1501,
 1811,
 4344,
 2088,
 4570,
 1807,
 3888,
 2201,
 1214,
 3269,
 1610,
 780,
 4028,
 3936,
 880,
 1989,
 1425,
 4668,
 4403,
 4420,
 2495,
 4768,
 1100,
 789,
 4440,
 1279,
 704,
 3234,
 179,
 2968,
 1369,
 1834,
 1030,
 4198,
 3003,
 1670,
 2842,
 1931,
 1634,
 1125,
 1065,
 504,
 1789,
 293,
 4455,
 3589,
 3522,
 4477,
 3089,
 1156,
 2395,
 708,
 1256,
 2307,
 1670,
 2769,
 1960,
 4855,
 3846,
 3017,
 3875,
 476,
 4915,
 413,
 2786,
 3609,
 738,
 2148,
 977,
 4373,
 3893,
 1883,
 3269,
 987,
 1028,
 3898,
 2465,
 2253,
 1485,
 2064,
 2072,
 1508,
 1409,
 914,
 4,
 862,
 4741,
 4708,
 422,
 2626,
 1495,
 4174,
 2091,
 4765,
 4070,
 199,
 3526,
 4945,
 3364,
 3651,
 2427,
 3298,
 1168,
 1335,
 1410,
 2221,
 4245,
 4753,
 3

In [18]:
# Following the same method as above, we create (number, count) tuples to count the occurrences of each number

# Mapping each number to a tuple with a count of one:
map_int = sub_RDD.map(lambda n: (n, 1))

# Reducing by key to sum the counts for each number:
reduce_int = map_int.reduceByKey(add)

# Performing the action collect to view the results:
reduce_int.collect()

[(0, 1),
 (4096, 2),
 (4228, 2),
 (4, 2),
 (2796, 1),
 (2056, 3),
 (4780, 3),
 (2728, 1),
 (4108, 5),
 (3780, 1),
 (2264, 5),
 (2064, 1),
 (4440, 2),
 (20, 4),
 (2592, 1),
 (2072, 2),
 (2052, 5),
 (1160, 1),
 (2076, 1),
 (32, 2),
 (2736, 1),
 (2456, 2),
 (4132, 1),
 (3820, 1),
 (2088, 1),
 (1372, 2),
 (956, 1),
 (44, 3),
 (4160, 2),
 (2684, 2),
 (48, 2),
 (3080, 1),
 (3944, 2),
 (820, 1),
 (56, 2),
 (4788, 1),
 (60, 1),
 (4912, 4),
 (64, 2),
 (4448, 4),
 (68, 3),
 (2048, 1),
 (72, 4),
 (2060, 4),
 (4776, 2),
 (2124, 4),
 (2128, 3),
 (2744, 1),
 (84, 3),
 (3156, 2),
 (2660, 1),
 (2404, 1),
 (3208, 1),
 (92, 1),
 (1776, 1),
 (4192, 4),
 (4112, 3),
 (4504, 1),
 (2148, 2),
 (3928, 1),
 (276, 1),
 (104, 2),
 (700, 2),
 (2156, 1),
 (4820, 1),
 (112, 3),
 (360, 1),
 (4212, 2),
 (1504, 1),
 (120, 2),
 (2068, 2),
 (1436, 1),
 (124, 4),
 (128, 1),
 (1728, 2),
 (2732, 1),
 (132, 2),
 (28, 1),
 (2184, 2),
 (4460, 1),
 (1232, 2),
 (2192, 1),
 (24, 3),
 (148, 2),
 (4188, 4),
 (152, 2),
 (708, 3),
 (

In [19]:
# Displaying all numbers appearing more than 5 times, sorted in ascending order 

# We perform the transformation filter, then the action collect to turn the RDD into a list and finally we sort the result.
sorted(reduce_int.filter(lambda pair: pair[1] > 5).collect())

[(97, 6),
 (257, 6),
 (401, 7),
 (404, 7),
 (444, 6),
 (472, 6),
 (618, 6),
 (652, 6),
 (717, 10),
 (749, 6),
 (817, 7),
 (827, 8),
 (921, 6),
 (977, 6),
 (1113, 6),
 (1288, 7),
 (1431, 6),
 (1517, 6),
 (1536, 6),
 (1672, 6),
 (1870, 6),
 (2102, 7),
 (2305, 6),
 (2337, 6),
 (2478, 6),
 (2486, 6),
 (2537, 8),
 (2581, 7),
 (2686, 7),
 (2699, 6),
 (2708, 6),
 (2711, 8),
 (2717, 7),
 (2780, 6),
 (2803, 6),
 (2813, 7),
 (2856, 6),
 (2899, 6),
 (3025, 6),
 (3046, 9),
 (3128, 6),
 (3251, 6),
 (3269, 7),
 (3270, 6),
 (3307, 6),
 (3309, 7),
 (3491, 6),
 (3800, 6),
 (3831, 7),
 (3862, 7),
 (3868, 6),
 (3878, 7),
 (3911, 6),
 (4013, 6),
 (4048, 6),
 (4158, 6),
 (4162, 6),
 (4241, 7),
 (4386, 7),
 (4436, 6),
 (4450, 6),
 (4494, 6),
 (4538, 7),
 (4628, 6),
 (4737, 6),
 (4747, 6),
 (4753, 7),
 (4829, 6),
 (4841, 8),
 (4887, 7),
 (4938, 6),
 (4964, 6)]

### An improved WordCount

Print the top 10 most frequent words with their probability of appearance
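The probability of appearance of a word w is estimated here as its relative frequency, count(w) / N, where N is the total number of words. A minimal plain-Python sketch on a made-up sentence (the helper name `top_k_with_probability` is hypothetical):

```python
from collections import Counter


def top_k_with_probability(words, k):
    """Return the k most frequent words with their share (in %) of all word occurrences."""
    counts = Counter(words)
    total = len(words)
    return [(word, 100.0 * c / total) for word, c in counts.most_common(k)]


words = "the count and the castle and the night".split()
for word, pct in top_k_with_probability(words, 2):
    print("Word: {}, Probability of appearance: {:.2f}%".format(word, pct))
# Word: the, Probability of appearance: 37.50%
# Word: and, Probability of appearance: 25.00%
```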


In [20]:
# Printing the 10 most frequent words with their probability of appearance:

# We take the 10 words with the highest count with takeOrdered (as above)
ten_freq = frequent_words.takeOrdered(10,key=lambda x: -x[1])

# Using the count action, we compute the total number of words in the text
tot_words = words.count()

# For each word, we print the probability of appearance (number of times it appears / total number of words)
for x in ten_freq:
    print("Word: {}, Probability of appearance: {:.2f}%".format(x[0], 100.0*x[1]/tot_words))

Word: the, Probability of appearance: 4.86%
Word: and, Probability of appearance: 3.55%
Word: i, Probability of appearance: 2.75%
Word: to, Probability of appearance: 2.75%
Word: of, Probability of appearance: 2.26%
Word: a, Probability of appearance: 1.78%
Word: he, Probability of appearance: 1.53%
Word: in, Probability of appearance: 1.53%
Word: that, Probability of appearance: 1.47%
Word: it, Probability of appearance: 1.14%


Get rid of special characters (.,:!?')
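A plain-Python sketch of the cleaning step, assuming the same `string.punctuation` regex as the cell below. Removing punctuation can leave empty tokens (for instance `'***'` becomes `''`), so this sketch also drops them; in Spark the equivalent would be an extra `.filter(lambda w: w != '')` after the `map`.

```python
import re
import string

# Character class built from all ASCII punctuation characters, as in the Spark cell
PUNCT_RE = re.compile('[%s]' % re.escape(string.punctuation))


def strip_punctuation(words):
    """Remove punctuation from each word, then drop the tokens that became empty."""
    cleaned = (PUNCT_RE.sub('', w) for w in words)
    return [w for w in cleaned if w != '']


print(strip_punctuation(['whatsoever.', '***', 're-use', '"i', 'title:']))
# ['whatsoever', 'reuse', 'i', 'title']
```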


In [21]:
# We want to remove the special characters, contained in string.punctuation

import string,re

# We use the compile function of re
regex = re.compile('[%s]' % re.escape(string.punctuation))

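# and map it over each word, using sub to replace every punctuation character with '' (empty).
# Note: tokens made only of punctuation (e.g. '***') become empty strings, as the output below shows.
words_bis = words.map(lambda word: regex.sub('', word))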

# Displaying the results
words_bis.collect()

[u'the',
 u'project',
 u'gutenberg',
 u'ebook',
 u'of',
 u'dracula',
 u'by',
 u'bram',
 u'stoker',
 u'this',
 u'ebook',
 u'is',
 u'for',
 u'the',
 u'use',
 u'of',
 u'anyone',
 u'anywhere',
 u'at',
 u'no',
 u'cost',
 u'and',
 u'with',
 u'almost',
 u'no',
 u'restrictions',
 u'whatsoever',
 u'you',
 u'may',
 u'copy',
 u'it',
 u'give',
 u'it',
 u'away',
 u'or',
 u'reuse',
 u'it',
 u'under',
 u'the',
 u'terms',
 u'of',
 u'the',
 u'project',
 u'gutenberg',
 u'license',
 u'included',
 u'with',
 u'this',
 u'ebook',
 u'or',
 u'online',
 u'at',
 u'wwwgutenbergorglicense',
 u'title',
 u'dracula',
 u'author',
 u'bram',
 u'stoker',
 u'release',
 u'date',
 u'august',
 u'16',
 u'2013',
 u'ebook',
 u'345',
 u'language',
 u'english',
 u'',
 u'start',
 u'of',
 u'this',
 u'project',
 u'gutenberg',
 u'ebook',
 u'dracula',
 u'',
 u'produced',
 u'by',
 u'chuck',
 u'greif',
 u'and',
 u'the',
 u'online',
 u'distributed',
 u'proofreading',
 u'team',
 u'at',
 u'httpwwwpgdpnet',
 u'this',
 u'file',
 u'was',
 u'p

### Identify the transformations and the actions in your script (first wordcount)


> We used the following actions: take, takeOrdered, collect, count.

> We used the following transformations: flatMap, map, filter, reduceByKey.

### How many times are the transformations evaluated? 


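> Transformations are lazy: they are only evaluated when an action is called, and they are re-evaluated for every action that depends on them, unless the RDD is cached. In our script, for example, the chain textFile, flatMap, filter that produces `words` is recomputed by each `collect()`, `take()` or `count()` called on `words` or on an RDD derived from it (Spark can sometimes skip a stage whose shuffle output, e.g. from reduceByKey, is still available).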

### Can you reduce this number? 

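> Yes: we can call `cache()` or `persist()` on an RDD that is reused by several actions, such as `words`. `cache()` keeps the RDD in memory, while `persist()` accepts a storage level (for example memory and disk). The first action then computes and stores the partitions, and the following actions reuse them instead of recomputing the whole chain of transformations.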
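The behaviour described in the two answers above can be reproduced with a deliberately tiny, hypothetical `ToyRDD` class in plain Python (it is not Spark's implementation): transformations only record a function, each action recomputes the whole chain, and `cache()` makes later actions reuse the first result.

```python
class ToyRDD:
    """Hypothetical, deliberately tiny model of RDD laziness (not Spark's implementation)."""

    def __init__(self, compute):
        self._compute = compute  # zero-argument function producing the elements
        self._use_cache = False
        self._cached = None
        self.evaluations = 0     # how many times this dataset was actually computed

    # Transformations: nothing is computed, a new ToyRDD just records what to do.
    def map(self, f):
        return ToyRDD(lambda: [f(x) for x in self._materialize()])

    def filter(self, pred):
        return ToyRDD(lambda: [x for x in self._materialize() if pred(x)])

    def cache(self):
        self._use_cache = True
        return self

    def _materialize(self):
        if self._use_cache and self._cached is not None:
            return self._cached          # reuse the stored result
        self.evaluations += 1
        data = self._compute()           # recompute the whole lineage
        if self._use_cache:
            self._cached = data
        return data

    # Actions: they trigger the computation.
    def collect(self):
        return list(self._materialize())

    def count(self):
        return len(self._materialize())


source = ToyRDD(lambda: ["The", "", "Count", "", "castle"])

words = source.map(str.lower).filter(lambda w: w != "")
print(words.evaluations)  # 0: defining transformations computes nothing
words.collect()
words.count()
words.collect()
print(words.evaluations)  # 3: each action recomputed the chain

cached_words = source.map(str.lower).filter(lambda w: w != "").cache()
cached_words.collect()
cached_words.count()
cached_words.collect()
print(cached_words.evaluations)  # 1: later actions reused the cached result
```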

In [22]:
sc.stop()

## Part 3: Spark SQL and DataFrames

In this part, you will explore Spark DataFrames and the SQL context. In particular, we will work on a sample of the world population that comes from [pplapi](http://pplapi.com/). The file [agents.json](./SparkData/agents.json) was extracted from this API using the following command:


`wget https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json`

In [23]:
# Preparing the session
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Reading the data
df = spark.read.json("./SparkData/agents.json")

Print the first 5 lines of the dataframe `df`.

In [24]:
# Printing the first 5 lines of the Spark dataframe with the head function

df.head(5)

[Row(country_name=u'China', id=227417393, latitude=33.15219798270325, longitude=100.85840672174572, sex=u'Male'),
 Row(country_name=u'Haiti', id=6821129477, latitude=19.325567983697297, longitude=-72.43795260265814, sex=u'Female'),
 Row(country_name=u'India', id=2078667700, latitude=23.645271492037235, longitude=80.85636526088884, sex=u'Female'),
 Row(country_name=u'China', id=477556555, latitude=33.45864668881662, longitude=93.33604038078953, sex=u'Female'),
 Row(country_name=u'India', id=1379059984, latitude=28.816938290678692, longitude=80.7728698035823, sex=u'Female')]

Write the Spark instructions that display the number of French agents.

In [25]:
# Displaying the number of French agents

# We use filter on the dataframe to select only French agents and then count to compute the number.
df.filter(df.country_name=='France').count()

94

Write the Spark instructions that display the number of Indian female agents.

In [26]:
# Displaying the number of Indian female agents

# We use filter on the dataframe to select only Indian and female agents and then count to compute the number.

df.filter((df.country_name=='India')&(df.sex=='Female')).count()

828

Using the notion of temporary view (function `createTempView`), create a temporary view associated with the dataframe `df`.

In [27]:
# We create a temporary view for the dataframe df : 

df.createTempView("temp_table")

Write some SQL queries on the resulting table, as shown in the example below.

In [28]:
spark.sql("SELECT country_name,id FROM temp_table ORDER BY id DESC LIMIT 10").show()


+-----------------+----------+
|     country_name|        id|
+-----------------+----------+
| French Polynesia|7170821229|
|       Cabo Verde|7167692449|
|         Suriname|7166451460|
|         Suriname|7166235088|
|            Macau|7166034642|
|       Montenegro|7164357515|
|Equatorial Guinea|7163867872|
|           Bhutan|7163256789|
|           Bhutan|7163004645|
|           Bhutan|7162877973|
+-----------------+----------+



In [29]:
# 5 SQL queries run with spark.sql(QUERY).show() : 

# Displaying the latitude and longitude of French agents 
spark.sql("SELECT latitude,longitude FROM temp_table WHERE country_name = 'France' ").show()

# Displaying the country and id of agents located at a place with latitude & longitude between 0.5 and 1
spark.sql("SELECT country_name,id FROM temp_table WHERE latitude < 1 AND latitude > 0.5 AND longitude < 1 \
          AND longitude > 0.5 ").show()

# Displaying the countries sorted in alphabetical order 
spark.sql("SELECT country_name FROM temp_table GROUP BY country_name ORDER BY country_name").show()

# Displaying the number of agents in each country (with COUNT), sorted alphabetically
spark.sql("SELECT COUNT(id), country_name FROM temp_table GROUP BY country_name ORDER BY country_name").show()

# Displaying the average latitude and longitude of agents in Colombia
spark.sql("SELECT country_name, AVG(latitude), AVG(longitude) FROM temp_table \
          WHERE country_name='Colombia' GROUP BY country_name").show()

+--------------------+--------------------+
|            latitude|           longitude|
+--------------------+--------------------+
|-0.21142875508479517|-0.00395021443374...|
|  1.5099359591520582| -1.7155442515387973|
| 0.06978158062530335|  -1.529365900793559|
|-0.15326107452236482|  2.1243709186708934|
|-0.06137848013048675| -1.4476884573473048|
|  1.1828646583062592| -0.6655754887318799|
|  -0.217260169262005| 0.25087488920444284|
| 0.49047553633530405| 0.12140982654262467|
| -0.4702673586856353|  0.9266846158973026|
|  1.0264852068813861| -0.2140207779096717|
| -0.3000128566536097| 0.38070406466183515|
|  0.9474034213864879|  1.1594401098214056|
|  1.2383110079391852| -0.8057894807391559|
|  0.8804203939951878| -1.9584312089442544|
| 0.36735236174643593|-0.08389139935919032|
|  0.3357881974469305| 0.07190625070884477|
| 0.25437723190280187| -0.2670049016944363|
| -0.9793883947051585| 0.09417020992108467|
|-0.00716727104203...|  -0.564196422329451|
|  1.2161218788026498|  0.596614

In [30]:
spark.stop()
sc.stop()

You can also create a dataframe from an existing RDD, as shown in the example below.

In [31]:
# Creating a new SparkContext and an SQLContext: toDF() can only be called on RDDs once an SQLContext (or SparkSession) exists.
sc=pyspark.SparkContext("local[*]","Dataframe examples in Spark")
sqlContext = pyspark.SQLContext(sc)



In [32]:
documents_rdd = sc.parallelize([
        [1, 'cats are cute', 0],
        [2, 'dogs are playfull', 0],
        [3, 'lions are big', 1],
        [4, 'cars are fast', 1]])
users_rdd = sc.parallelize([
        [0, 'Alice', 20],
        [1, 'Bob', 23],
        [2, 'Charles', 32]])

In [33]:
documents_df = documents_rdd.toDF(['doc_id', 'text', 'user_id'])
users_df = users_rdd.toDF(['user_id', 'name', 'age'])

# printing the inferred schema for documents
documents_df.printSchema()


root
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)



Some functions can be applied to one column or to several columns. Here, we compute the average age in the dataframe `users_df`.

In [34]:
from pyspark.sql import functions as fn

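# show() only prints the result and returns None, so we keep the DataFrame and display it separately
user_age_df = users_df.select(fn.avg('age'))
user_age_df.show()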

+--------+
|avg(age)|
+--------+
|    25.0|
+--------+




Compute the maximum age in the dataframe `users_df`.

In [35]:
# Computing the maximum age :

# We use select and apply the function max. We use show() to display the resulting dataframe.
# (show() returns None, so the DataFrame is stored before being displayed.)
max_age_df = users_df.select(fn.max('age'))
max_age_df.show()

+--------+
|max(age)|
+--------+
|      32|
+--------+



Join (as in SQL) the two dataframes `users_df` and `documents_df`

In [36]:
# We join the two dataframes by 'user_id', using the join function

join_1 = users_df.join(documents_df, 'user_id')
join_1.show()

+-------+-----+---+------+-----------------+
|user_id| name|age|doc_id|             text|
+-------+-----+---+------+-----------------+
|      0|Alice| 20|     1|    cats are cute|
|      0|Alice| 20|     2|dogs are playfull|
|      1|  Bob| 23|     3|    lions are big|
|      1|  Bob| 23|     4|    cars are fast|
+-------+-----+---+------+-----------------+



Perform an outer join (left, as in SQL) of the two dataframes `users_df` and `documents_df`.

In [37]:
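# We perform a full outer join by passing 'outer': rows from both dataframes are kept, with nulls where there is no match.
# Here a left outer join ('left') gives the same result, since every document has a matching user.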

join_2 = users_df.join(documents_df, 'user_id', 'outer')
join_2.show()

# We can see that Charles' age and user_id are displayed, even if he doesn't have a corresponding document.

+-------+-------+---+------+-----------------+
|user_id|   name|age|doc_id|             text|
+-------+-------+---+------+-----------------+
|      0|  Alice| 20|     1|    cats are cute|
|      0|  Alice| 20|     2|dogs are playfull|
|      1|    Bob| 23|     3|    lions are big|
|      1|    Bob| 23|     4|    cars are fast|
|      2|Charles| 32|  null|             null|
+-------+-------+---+------+-----------------+
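To check the claim about left and full outer joins on this particular data, the three join types can be sketched in plain Python on the same rows (the `join_on_user_id` helper is hypothetical and only mimics the SQL semantics; row order may differ from Spark's):

```python
users = [(0, "Alice", 20), (1, "Bob", 23), (2, "Charles", 32)]
documents = [(1, "cats are cute", 0), (2, "dogs are playfull", 0),
             (3, "lions are big", 1), (4, "cars are fast", 1)]


def join_on_user_id(users, documents, how="inner"):
    """Hypothetical helper mimicking SQL inner / left / full outer joins on user_id.

    Rows are (user_id, name, age, doc_id, text); None plays the role of null.
    """
    docs_by_user = {}
    for doc_id, text, user_id in documents:
        docs_by_user.setdefault(user_id, []).append((doc_id, text))

    rows = []
    for user_id, name, age in users:
        matches = docs_by_user.get(user_id, [])
        for doc_id, text in matches:
            rows.append((user_id, name, age, doc_id, text))
        if not matches and how in ("left", "outer"):
            rows.append((user_id, name, age, None, None))  # unmatched user kept

    if how == "outer":
        known_users = {user_id for user_id, _, _ in users}
        for doc_id, text, user_id in documents:
            if user_id not in known_users:
                rows.append((user_id, None, None, doc_id, text))  # unmatched document kept
    return rows


inner = join_on_user_id(users, documents, "inner")
left = join_on_user_id(users, documents, "left")
outer = join_on_user_id(users, documents, "outer")
print(len(inner), len(left), len(outer))  # 4 5 5
print(left == outer)                      # True: every document has a matching user
```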



## Part 4: MLlib - Spark for data analysis and machine learning

In this part, we will see how to create a text classification application with Spark. We will use data from the [Usenet](https://en.wikipedia.org/wiki/Usenet_newsgroup) newsgroups, and the objective is to predict the topic of each news message.
 
The train data is [here](./SparkData/20ng-train-all-terms.txt) and the test data [here](./SparkData/20ng-test-all-terms.txt).
They were obtained using the following commands:
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-train-all-terms.txt`
and
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-test-all-terms.txt`

Load these data as RDDs and convert them into DataFrames.

In [38]:
sc.stop()

findspark.init()
from pyspark.ml.feature import CountVectorizer

sc=pyspark.SparkContext("local[*]","machine learning in Spark")
sqlContext = pyspark.SQLContext(sc)

from pyspark.sql import Row

def load_dataframe(path):
    # We first read the text document in the path and create an RDD
    data_RDD = sc.textFile(path)
    # We split each line of the RDD to isolate the topic (which is at the beginning, before '\t') from the text of the news
    # We use Row to name the fields (columns) of each record.
    intermediary=data_RDD.map(lambda x : Row(topic=x.split("\t")[0],news=x.split("\t")[1].split(" ")))
    # We finally create the corresponding dataframe
    dataframe=sqlContext.createDataFrame(intermediary)
    return dataframe

train_data = load_dataframe("./SparkData/train-all-terms.txt")
test_data = load_dataframe("./SparkData/test-all-terms.txt")

train_data.show()
test_data.show()

+--------------------+-----------+
|                news|      topic|
+--------------------+-----------+
|[alt, atheism, fa...|alt.atheism|
|[alt, atheism, fa...|alt.atheism|
|[re, gospel, dati...|alt.atheism|
|[re, university, ...|alt.atheism|
|[re, soc, motss, ...|alt.atheism|
|[re, a, visit, fr...|alt.atheism|
|[re, political, a...|alt.atheism|
|[re, an, anecdote...|alt.atheism|
|[re, political, a...|alt.atheism|
|[re, pompous, ass...|alt.atheism|
|[re, pompous, ass...|alt.atheism|
|[re, keith, schne...|alt.atheism|
|[re, keith, schne...|alt.atheism|
|[re, political, a...|alt.atheism|
|[re, political, a...|alt.atheism|
|[re, political, a...|alt.atheism|
|[re, don, t, more...|alt.atheism|
|[re, ancient, isl...|alt.atheism|
|[re, political, a...|alt.atheism|
|[re, there, must,...|alt.atheism|
+--------------------+-----------+
only showing top 20 rows

+--------------------+-----------+
|                news|      topic|
+--------------------+-----------+
|[re, about, the, ...|alt.ath

A first step is to represent our data, i.e. the messages, as bag-of-words vectors using the Spark class `CountVectorizer`, documented [here](https://spark.apache.org/docs/2.1.0/ml-features.html#countvectorizer).
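The idea behind `CountVectorizer` can be sketched in plain Python: the vocabulary is built from term counts over the whole corpus (most frequent terms first), and each document becomes a sparse vector of term counts indexed by that vocabulary. The helpers and toy documents below are hypothetical, and the alphabetical tie-breaking is an assumption of this sketch, not necessarily Spark's behaviour.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size=None):
    """Terms ordered by their total count over the corpus, most frequent first."""
    totals = Counter(term for doc in docs for term in doc)
    # Alphabetical tie-breaking is an assumption of this sketch.
    ordered = sorted(totals, key=lambda term: (-totals[term], term))
    return ordered[:vocab_size] if vocab_size is not None else ordered


def to_bag_of_words(doc, vocabulary):
    """Sparse vector as (size, {index: count}), ignoring out-of-vocabulary terms."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[term] for term in doc if term in index)
    return len(vocabulary), dict(sorted(counts.items()))


docs = [["re", "space", "shuttle"], ["re", "hockey", "re"], ["space", "station"]]
vocab = fit_vocabulary(docs)
print(vocab)                            # ['re', 'space', 'hockey', 'shuttle', 'station']
print(to_bag_of_words(docs[1], vocab))  # (5, {0: 2, 2: 1})
```

The pair `(size, {index: count})` plays the role of the sparse `(73713,[0,1,2,...],[...])` vectors displayed below; since frequent terms receive small indices, it is consistent that most messages start with indices 0, 1, 2, ...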

In [39]:
# Bag of word representation of test and train data

# We use the CountVectorizer estimator on the news column to create the features column.
# After a first run with the default vocabSize (2^18 terms), we saw that the vocabulary contains 73713 distinct terms,
# so we set vocabSize to that value.

# Creation and training of the model :
cv = CountVectorizer(inputCol="news", outputCol="features", vocabSize=73713)

model = cv.fit(train_data)

# Application of the model on train_data to create the result dataframe: 
result = model.transform(train_data)
result.show(truncate=True)


+--------------------+-----------+--------------------+
|                news|      topic|            features|
+--------------------+-----------+--------------------+
|[alt, atheism, fa...|alt.atheism|(73713,[0,1,2,3,4...|
|[alt, atheism, fa...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, gospel, dati...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, university, ...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, soc, motss, ...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, a, visit, fr...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, political, a...|alt.atheism|(73713,[0,1,3,4,6...|
|[re, an, anecdote...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, political, a...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, pompous, ass...|alt.atheism|(73713,[0,4,5,11,...|
|[re, pompous, ass...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, keith, schne...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, keith, schne...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, political, a...|alt.atheism|(73713,[0,1,2,3,4...|
|[re, political, a...|alt.atheism|(73713,[0,1,2,

Displaying the distinct labels (topics) of the training dataset:

In [40]:
train_data.select("topic").distinct().sort("topic").show(truncate=False)

+------------------------+
|topic                   |
+------------------------+
|alt.atheism             |
|comp.graphics           |
|comp.os.ms-windows.misc |
|comp.sys.ibm.pc.hardware|
|comp.sys.mac.hardware   |
|comp.windows.x          |
|misc.forsale            |
|rec.autos               |
|rec.motorcycles         |
|rec.sport.baseball      |
|rec.sport.hockey        |
|sci.crypt               |
|sci.electronics         |
|sci.med                 |
|sci.space               |
|soc.religion.christian  |
|talk.politics.guns      |
|talk.politics.mideast   |
|talk.politics.misc      |
|talk.religion.misc      |
+------------------------+



We will now apply a [NaiveBayes](https://spark.apache.org/docs/latest/ml-classification-regression.html#naive-bayes) classifier to our problem. 
Take the time to read the documentation and apply it to our problem. You will first have to associate a number with each label. You can use the [`StringIndexer`](https://spark.apache.org/docs/2.1.0/ml-features.html#stringindexer) class of PySpark for that.
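
By default, `StringIndexer` orders labels by decreasing frequency, so the most frequent topic gets `0.0`. This would explain why `alt.atheism` receives `17.0` in the output below: it is presumably one of the less frequent topics in the training set. The plain-Python sketch below mimics that ordering. The function name is hypothetical, and alphabetical tie-breaking is an assumption of the sketch.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map each distinct string label to a float index, most frequent first.

    Assumption: equal frequencies are broken alphabetically.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


train_topics = ["sci.space", "sci.space", "rec.autos", "alt.atheism", "rec.autos", "sci.space"]
mapping = fit_string_indexer(train_topics)
print(mapping)  # {'sci.space': 0.0, 'rec.autos': 1.0, 'alt.atheism': 2.0}

# A differently balanced dataset yields a different mapping for the same topics.
test_topics = ["alt.atheism", "alt.atheism", "sci.space"]
print(fit_string_indexer(test_topics))  # {'alt.atheism': 0.0, 'sci.space': 1.0}
```

This is why the test labels should be produced with an indexer fitted on the training data: fitting it on the test set alone could give the same topic a different number.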


In [41]:
# Transformation of the label into a number
# StringIndexer maps each topic (column "topic") to a numeric index (new column "label").
from pyspark.ml.feature import StringIndexer

# Creation, training and application of the indexer on result:
indexer = StringIndexer(inputCol="topic", outputCol="label")
indexed_train_data = indexer.fit(result).transform(result)
indexed_train_data.show()

# Displaying the topic -> label mapping (both columns are selected so that the sort column exists):
indexed_train_data.select("topic", "label").distinct().sort("label").show(truncate=False)

+--------------------+-----------+--------------------+-----+
|                news|      topic|            features|label|
+--------------------+-----------+--------------------+-----+
|[alt, atheism, fa...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[alt, atheism, fa...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, gospel, dati...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, university, ...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, soc, motss, ...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, a, visit, fr...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, political, a...|alt.atheism|(73713,[0,1,3,4,6...| 17.0|
|[re, an, anecdote...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, political, a...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, pompous, ass...|alt.atheism|(73713,[0,4,5,11,...| 17.0|
|[re, pompous, ass...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, keith, schne...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, keith, schne...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, po

In [42]:
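# Application of the NaiveBayes model

from pyspark.ml.classification import NaiveBayes

# Creation of the trainer and specification of its parameters:
# multinomial Naive Bayes with additive (Laplace) smoothing of 1.0.
# By default it reads the "features" and "label" columns.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")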

# Training of the model
model_nb = nb.fit(indexed_train_data)
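
To see what `smoothing=1.0` and `modelType="multinomial"` mean, here is a textbook multinomial Naive Bayes with additive (Laplace) smoothing in plain Python. It is a sketch, not Spark's code: the function names and the toy data are hypothetical, vectors are dense lists, and Spark may differ in details such as how the class priors are smoothed.

```python
import math
from collections import Counter


def fit_multinomial_nb(X, y, num_features, smoothing=1.0):
    """Return (log_prior, log_theta) dicts keyed by class label."""
    labels = sorted(set(y))
    doc_counts = Counter(y)
    term_counts = {c: [0.0] * num_features for c in labels}
    for x, c in zip(X, y):
        for j, v in enumerate(x):
            term_counts[c][j] += v
    n_docs, n_classes = len(y), len(labels)
    log_prior, log_theta = {}, {}
    for c in labels:
        # Smoothed class prior P(c)
        log_prior[c] = math.log((doc_counts[c] + smoothing) / (n_docs + smoothing * n_classes))
        # Smoothed word probabilities P(word_j | c): every count gets +smoothing
        total = sum(term_counts[c])
        log_theta[c] = [math.log((tc + smoothing) / (total + smoothing * num_features))
                        for tc in term_counts[c]]
    return log_prior, log_theta


def raw_scores(x, log_prior, log_theta):
    """log P(c) + sum_j x_j * log P(word_j | c) for every class (unnormalised)."""
    return {c: log_prior[c] + sum(v * lt for v, lt in zip(x, log_theta[c]))
            for c in log_prior}


def predict(x, log_prior, log_theta):
    """Class with the highest unnormalised log posterior."""
    scores = raw_scores(x, log_prior, log_theta)
    return max(scores, key=scores.get)


# Toy data: vocabulary ["god", "church", "rocket"], classes 0.0 and 1.0.
X = [[3, 1, 0], [2, 2, 0], [0, 0, 4], [0, 1, 3]]
y = [0.0, 0.0, 1.0, 1.0]
log_prior, log_theta = fit_multinomial_nb(X, y, num_features=3, smoothing=1.0)
print(predict([1, 0, 0], log_prior, log_theta))  # 0.0
print(predict([0, 0, 2], log_prior, log_theta))  # 1.0
```

Without smoothing, a word never seen in a class (here "rocket" in class 0.0) would get probability 0 and its logarithm would be undefined; the `+ smoothing` terms prevent that.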


In [43]:
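# Creation of the test dataset with the bag-of-words model and the indexer.
# Both are fitted on the training data (result), so the test messages use the
# training vocabulary and each topic receives the same label number as in training.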

result_test = model.transform(test_data)
indexed_test_data = indexer.fit(result).transform(result_test)
indexed_test_data.show()

+--------------------+-----------+--------------------+-----+
|                news|      topic|            features|label|
+--------------------+-----------+--------------------+-----+
|[re, about, the, ...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, amusing, ath...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, yet, more, r...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, christian, m...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, after, years...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, amusing, ath...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[southern, baptis...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, amusing, ath...|alt.atheism|(73713,[0,1,4,5,6...| 17.0|
|[re, requests, in...|alt.atheism|(73713,[0,1,4,5,6...| 17.0|
|[re, thoughts, on...|alt.atheism|(73713,[0,1,4,5,6...| 17.0|
|[re, what, s, a, ...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, christian, m...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, theism, and,...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|
|[re, yo

In [44]:
# Application of the trained model to the test dataset

predictions = model_nb.transform(indexed_test_data)

# Displaying the predictions next to the true values (label column) for the first 20 messages
predictions.show()

+--------------------+-----------+--------------------+-----+--------------------+--------------------+----------+
|                news|      topic|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+-----+--------------------+--------------------+----------+
|[re, about, the, ...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|[-1077.0701139151...|[1.04746942055342...|      17.0|
|[re, amusing, ath...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|[-4865.8011948258...|[1.04542362129609...|      17.0|
|[re, yet, more, r...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|[-2801.5395297792...|[1.37884733960126...|      17.0|
|[re, christian, m...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|[-1579.1157090824...|[9.08867790888132...|      17.0|
|[re, after, years...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|[-3900.6290918277...|[1.19137051036722...|      17.0|
|[re, amusing, ath...|alt.atheism|(73713,[0,1,2,3,4...| 17.0|[-2415.5243488590..
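
The `rawPrediction` column holds large negative numbers. As far as we understand Spark's multinomial Naive Bayes, these are unnormalised log posteriors (log prior plus count-weighted log word probabilities), and `probability` is obtained by normalising them. The pure-Python sketch below shows such a normalisation: a softmax computed with the log-sum-exp trick. The input values are hypothetical, not taken from the table.

```python
import math


def softmax_from_log_scores(raw):
    """Turn unnormalised log scores into probabilities that sum to 1.

    Subtracting the maximum first avoids math.exp underflowing to 0.0 for
    every class when the scores are around -1000.
    """
    m = max(raw)
    exps = [math.exp(r - m) for r in raw]
    total = sum(exps)
    return [e / total for e in exps]


raw = [-1077.07, -1075.0, -1090.5]  # hypothetical rawPrediction values for 3 classes
probs = softmax_from_log_scores(raw)
print(probs)
print(max(range(len(probs)), key=probs.__getitem__))  # 1, the predicted class index
print(math.exp(-1077.07))  # 0.0: the naive exp underflows
```

The predicted class is the same whether the maximum is taken over the raw scores or the probabilities, since the softmax preserves order.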

The learned model can be evaluated with the [evaluation](https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html) tools of MLlib. Print the accuracy of the obtained model.

In [45]:
# Evaluation of the model with MulticlassClassificationEvaluator

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# We create an evaluator, specifying the columns to compare and the metric
# (the default metric is "f1", so "accuracy" has to be requested explicitly):
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")

# We compute the accuracy on the test set from the predictions
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy*100.00) + "%")

Test set accuracy = 79.9149840595%
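
For intuition, the pure-Python sketch below shows what the accuracy computation amounts to. It also computes a weighted F1 score, in the spirit of the evaluator's default `f1` metric: per-class F1 weighted by each true class's share of the rows. The functions and toy labels are hypothetical; this is not Spark's code.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of rows where the predicted class equals the true class."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def weighted_f1(labels, predictions):
    """Per-class F1 averaged with weights proportional to each true-class count."""
    support = Counter(labels)
    total = len(labels)
    score = 0.0
    for c, n_c in support.items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        predicted_c = sum(1 for p in predictions if p == c)
        precision = tp / predicted_c if predicted_c else 0.0
        recall = tp / n_c
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += (n_c / total) * f1
    return score


y_true = [17.0, 17.0, 3.0, 3.0, 3.0]
y_pred = [17.0, 3.0, 3.0, 3.0, 3.0]
print(accuracy(y_true, y_pred))  # 0.8
print(weighted_f1(y_true, y_pred))
```

On imbalanced data the two metrics can differ noticeably, which is one reason to state `metricName` explicitly.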
