# Spark (with PySpark)


In this tutorial, you will learn how to use Apache Spark, a framework for large-scale data processing, within a notebook.
Upon completing this lab, you will be able to:
 + Program in Spark with the Python language
 + Demonstrate how to read and process data using Spark
 + Compare and contrast RDDs and DataFrames
 + Build a simple machine learning application with Spark
 
 
 
# Requirements : Installing Spark and PySpark and configuring them to run in Jupyter notebooks

Depending on your OS, follow one of the tutorials below:

+ [Windows](https://changhsinlee.com/install-pyspark-windows-jupyter/)
+ [Mac OS X](https://jmedium.com/pyspark-in-mac/)

At the end of the installation, you should be able to run the following code, which is a kind of "Hello world" in PySpark.

#### Student Name: Ayush K. Rai
#### Program : MSc in Artificial Intelligence, Ecole CentraleSupelec, Paris
#### Assignment 2 for Big Data Algorithms, Techniques and Platforms Course - Fall 2018
##### I have successfully tested the code on Ubuntu 18.04 and PySpark 2.3.1

In [44]:
import findspark
findspark.init('/home/ayush/spark-2.4.0-bin-hadoop2.7/')

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [45]:
spark.stop()

## Part 1 :  Spark Context

When writing a Spark program, the first thing to do is to define a `SparkContext`.

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.
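
As a rough mental model only, the driver/executor split can be pictured as a coordinator that cuts a job into one task per partition, hands the tasks to a pool of workers, and combines the partial results. The sketch below is plain Python, not Spark: the thread pool stands in for the executors, and the names `run_task` and `partitions` are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def run_task(partition):
    """One task: the work an executor would do on a single partition."""
    return sum(x * x for x in partition)

# The "driver" splits the job's data into partitions (one task per partition).
data = list(range(1, 101))
partitions = [data[i:i + 25] for i in range(0, len(data), 25)]

# A thread pool stands in for the executors.
with ThreadPoolExecutor(max_workers=4) as executors:
    partial_results = list(executors.map(run_task, partitions))

# The partial results come back to the driver, which combines them.
total = sum(partial_results)
print(total)
```

In real Spark the executors are separate processes, possibly on other machines, and the scheduling is handled by the cluster manager rather than by a local pool.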

Here, we will use the `findspark` package, which can be installed with the following command:

`pip3 install findspark`

Then we can use the `findspark.init()` function to locate the Spark installation.




In [46]:
import findspark

findspark.init("/home/ayush/spark-2.4.0-bin-hadoop2.7/")

import pyspark

sc=pyspark.SparkContext("local[*]","First program in Spark")

Try printing out sc to see its type.

In [47]:
# TO DO
# print() with parentheses works on both Python 2 and Python 3
print(sc)
print(type(sc))

<SparkContext master=local[*] appName=First program in Spark>
<class 'pyspark.context.SparkContext'>


You can use Python's `dir()` function to get a list of all the attributes (including methods) accessible through the `sc` object.

In [48]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__enter__',
 '__exit__',
 '__format__',
 '__getattribute__',
 '__getnewargs__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'defaultMinPartitions',
 'defaultParallelism',
 'dump_prof

## Part 2 : Understanding Spark RDD’s

### WordCount in Spark
In this part, we will write a word count in Spark and apply it to the novel Dracula by Bram Stoker (from Project Gutenberg).

First read the [pg345.txt](./SparkData/pg345.txt) file.

In [49]:
data = sc.textFile('./SparkData/pg345.txt')
type(data)

pyspark.rdd.RDD

To see the content of the file, we need to run the `collect` action on the RDD `data`.

In [50]:
data.collect()

[u'The Project Gutenberg EBook of Dracula, by Bram Stoker',
 u'',
 u'This eBook is for the use of anyone anywhere at no cost and with',
 u'almost no restrictions whatsoever.  You may copy it, give it away or',
 u're-use it under the terms of the Project Gutenberg License included',
 u'with this eBook or online at www.gutenberg.org/license',
 u'',
 u'',
 u'Title: Dracula',
 u'',
 u'Author: Bram Stoker',
 u'',
 u'Release Date: August 16, 2013 [EBook #345]',
 u'',
 u'Language: English',
 u'',
 u'',
 u'*** START OF THIS PROJECT GUTENBERG EBOOK DRACULA ***',
 u'',
 u'',
 u'',
 u'',
 u'Produced by Chuck Greif and the Online Distributed',
 u'Proofreading Team at http://www.pgdp.net (This file was',
 u'produced from images generously made available by The',
 u'Internet Archive)',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'                                DRACULA',
 u'',
 u'',
 u'',
 u'',
 u'',
 u'                                DRACULA',
 u'',
 u'                                  _by_',
 u'',

Starting from the RDD `data` of the previous cell, convert each line to lower case, remove the punctuation, then apply `flatMap()` to split each line into words on spaces, and store the result in the RDD `words`.

In [51]:
def clean_str(x):
    '''This function cleans the string and lowers it'''
    punc='!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~0123456789'
    lowercased_str = x.lower()
    for ch in punc:
        lowercased_str = lowercased_str.replace(ch, '')
                
    return lowercased_str


# TO DO  - map step
data = data.map(lambda x:clean_str(x))
words = data.flatMap(lambda x: x.strip(',').split(' ')).filter(lambda x:x!='')

Run the `take()` action to retrieve the first 10 words produced by the `flatMap()` transformation.

In [52]:
# TO DO 
words.take(10)

[u'the',
 u'project',
 u'gutenberg',
 u'ebook',
 u'of',
 u'dracula',
 u'by',
 u'bram',
 u'stoker',
 u'this']

Perform a classic `map()` to create a tuple where each word has a count of 1

In [53]:
# TO DO
word_count = words.map(lambda word:(word.lower(),1))
word_count.take(10)

[(u'the', 1),
 (u'project', 1),
 (u'gutenberg', 1),
 (u'ebook', 1),
 (u'of', 1),
 (u'dracula', 1),
 (u'by', 1),
 (u'bram', 1),
 (u'stoker', 1),
 (u'this', 1)]

Write the reducing function

In [54]:
# TO DO 
word_count = word_count.reduceByKey(lambda x,y:x+y).coalesce(1)

word_count.take(20)

[(u'aided', 1),
 (u'funereal', 1),
 (u'throbbing', 1),
 (u'systematic', 1),
 (u'pardon', 12),
 (u'fastfalling', 1),
 (u'brotherprofessional', 1),
 (u'usfor', 1),
 (u'lidderdale', 1),
 (u'all', 1166),
 (u'yellow', 6),
 (u'four', 24),
 (u'brightening', 1),
 (u'protest', 3),
 (u'sleep', 179),
 (u'abide', 2),
 (u'absorbing', 2),
 (u'mansion', 1),
 (u'lapped', 2),
 (u'appetite', 5)]
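
To make the reduce step concrete, here is a minimal plain-Python sketch of what `reduceByKey` computes: all values that share a key are merged pairwise with the given function. The helper name `reduce_by_key` is hypothetical. Spark does the same merging inside each partition and then across partitions, which is why the function should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func, as reduceByKey does conceptually."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

pairs = [("the", 1), ("count", 1), ("the", 1), ("dracula", 1), ("the", 1)]
counts = dict(reduce_by_key(pairs, lambda x, y: x + y))
print(counts)
```

As in the Spark output above, the order of the resulting pairs carries no meaning, so a sort is still needed afterwards.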

Get all words that occur more than once and sort them alphabetically:

In [55]:
# TO DO

filtered_word_count = word_count.filter(lambda x:x[1]>1)

alphabetically_sorted_rdd = filtered_word_count.sortByKey(ascending=True)

alphabetically_sorted_rdd.take(20)

[(u'a', 2962),
 (u'abaft', 2),
 (u'abandoned', 2),
 (u'abating', 2),
 (u'abbey', 9),
 (u'abhorred', 2),
 (u'abide', 2),
 (u'able', 47),
 (u'abnormally', 2),
 (u'aboard', 7),
 (u'aboon', 2),
 (u'aboot', 3),
 (u'about', 239),
 (u'above', 17),
 (u'abraham', 4),
 (u'abreast', 3),
 (u'abroad', 6),
 (u'absence', 2),
 (u'absent', 2),
 (u'absolute', 8)]

Now, get all words that occur more than once and sort them by frequency. Select the first 20. Hint: look at the `takeOrdered` action.

In [56]:
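# TO DO

# takeOrdered is an action: despite its name, freq_sorted_rdd is a plain Python list, not an RDD
freq_sorted_rdd = filtered_word_count.takeOrdered(20, key = lambda x: -x[1])

# Written so that it runs on both Python 2 and Python 3
print('Top 20 Frequent Words')
print('\n'.join('{0}: {1}'.format(w, c) for w, c in freq_sorted_rdd))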

Top 20 Frequent Words
the: 8037
and: 5896
i: 4712
to: 4540
of: 3738
a: 2962
in: 2558
he: 2543
that: 2455
it: 2141
was: 1877
as: 1581
we: 1535
for: 1534
is: 1526
his: 1467
you: 1450
me: 1446
not: 1404
with: 1320
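
`takeOrdered(n, key)` returns the `n` elements with the smallest keys as a list on the driver, which is why the key above negates the count. A plain-Python analogue, using `heapq.nsmallest` on a few of the pairs shown earlier, may help picture it (this is an illustration, not the Spark call itself):

```python
import heapq

word_counts = [("the", 8037), ("abaft", 2), ("and", 5896), ("i", 4712), ("abbey", 9)]

# Negating the count turns "smallest key first" into "largest count first".
top3 = heapq.nsmallest(3, word_counts, key=lambda x: -x[1])
print(top3)  # [('the', 8037), ('and', 5896), ('i', 4712)]
```

Only the requested elements are kept, so this is cheaper than sorting the whole collection and slicing it.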


### A simple exercise

+ Create a Python collection of 10,000 integers
+ Create a Spark base RDD from that collection
+ Subtract one from each value using map
+ Perform action collect to view results
+ Perform action count to view counts
+ Apply transformation filter and view results with collect

In [57]:
# TO DO
# range works on both Python 2 and Python 3 (the original run used Python 2's xrange)
data = range(1, 10001)

# When creating an RDD we can also set the number of partitions, which is 10 in our case
xrangeRDD = sc.parallelize(data, 10)

# Create function to subtract 1
def sub(value):
    return (value - 1)

subRDD = xrangeRDD.map(sub)

print(xrangeRDD.collect())
print(subRDD.collect())

print(xrangeRDD.count())
print(subRDD.count())

# Define a function to filter a single value
def myfilter(value):
    """Return whether value is below 500.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than 500.
    """
    return value < 500


# Filter is a transformation so no tasks are run
filteredRDD = subRDD.filter(myfilter)

# View the results using collect(). Collect is an action that triggers the filter transformation to run
print(filteredRDD.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 22

10000
10000
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 21

### An improved WordCount

Print the top 10 most frequent words with their probability of appearance


In [58]:
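# TO DO
# NOTE: word_count holds one (word, count) pair per distinct word, so count()
# returns the vocabulary size, not the number of tokens. The values printed below
# are therefore ratios to the vocabulary size; a true probability would divide by
# the sum of all counts, e.g. word_count.map(lambda x: x[1]).sum().
total_words = int(word_count.count())

word_prob_rdd = word_count.map(lambda x :(x[0],x[1]/float(total_words)))

# takeOrdered is an action and returns a plain Python list
prob_sorted_rdd = word_prob_rdd.takeOrdered(10, key = lambda x: -x[1])

print('Top 10 Frequent Words')
print('\n'.join('{0}: {1}'.format(w, c) for w, c in prob_sorted_rdd))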

Top 10 Frequent Words
the: 0.749510398209
and: 0.549846125152
i: 0.439429264198
to: 0.423388976965
of: 0.348596474867
a: 0.276228667351
in: 0.23855264385
he: 0.237153781591
that: 0.228947123007
it: 0.199664273058
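
The values above do not sum to 1: each count is divided by `word_count.count()`, which is the number of distinct words, not the number of words in the text. A minimal plain-Python sketch of the difference, with toy counts made up for illustration:

```python
counts = {"the": 6, "and": 3, "dracula": 1}  # toy counts, made up for illustration

distinct_words = len(counts)          # what count() on the reduced RDD measures
total_tokens = sum(counts.values())   # what a probability should be divided by

ratio_to_vocab = {w: c / float(distinct_words) for w, c in counts.items()}
probability = {w: c / float(total_tokens) for w, c in counts.items()}

print(ratio_to_vocab["the"])   # 2.0, not a probability
print(probability["the"])      # 0.6
```

In the notebook, one possible denominator would be `word_count.map(lambda x: x[1]).sum()`. That is a suggestion and was not part of the recorded run.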


Get rid of special characters (.,:!?')

In [86]:
### TO DO

# Important point

# The first version of the WordCount program above already removes the special symbols:
# without that step, a special symbol came out as the most frequent word, which is a trivial result.
# To remove the symbols I did the following:


#Create a function to clean the string from punctuations and numbers
#def clean_str(x):
#    '''This function cleans the string and lowers it'''
#    punc='!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~0123456789'
#    lowercased_str = x.lower()
#    for ch in punc:
#        lowercased_str = lowercased_str.replace(ch, '')
                
#    return lowercased_str


# TO DO  - map step

#Load the text from Bram Stoker Dracula Novel
#data = sc.textFile('./SparkData/pg345.txt')

#Apply a map transformation on the data rdd using lambda function (clean_str function) in pyspark
#data = data.map(lambda x:clean_str(x))

# The rest of the steps are unchanged. Please look at the output of my wordcount program above,
# which is free of any special symbols or numbers

Identify the transformations and the actions in your script


In [60]:
# TO DO

# A transformation is a lazy operation that builds a new RDD from an existing one (RDDs are immutable),
# while an action runs the computation on the RDD and returns a result to the driver

# Transformation Functions : Map, Filter, Union etc
# Action Functions : Reduce, Collect, Take, Count etc

# My script for ---- Print the top 10 most frequent words with their probability of appearance ----
# contains the following transformations and actions, in order

# 1) Collect (Action)
# 2) Map (Transformation)
# 3) FlatMap (Transformation)
# 4) Filter (Transformation)
# 5) Take (Action)
# 6) Map (Transformation)
# 7) Take (Action)
# 8) ReduceByKey (Transformation)
# 9) Coalesce (Transformation)
# 10) Take (Action)
# 11) Count (Action)
# 12) Map (Transformation)
# 13) TakeOrdered (Action)

# I have used take(n) many times just to inspect the output, which is not required to compute the word count,
# but I am including it here

# Therefore, based on this, I am using 6 actions and 7 transformations

How many times are the transformations evaluated? (Hint: it depends)


In [61]:
# TO DO
# Answer
# An important point to notice is that Spark evaluates lazily. This means that Spark adds each transformation to
# a directed acyclic graph (DAG) of computation, and only when the driver requests some data (through an action)
# does this DAG actually get executed. One advantage is that Spark can make many optimization decisions after it
# has had a chance to look at the DAG in its entirety. This would not be possible if it executed everything as soon
# as it received it. It saves time and unnecessary processing.
# As a consequence, the number of evaluations depends on the number of actions: unless an RDD is persisted,
# each action generally recomputes the transformations it depends on.

Can you reduce this number? (Hint: check out "persist")

In [87]:
# TO DO
# To reduce the number of times the transformations are evaluated, we can use RDD persistence (persist() and cache())

# With the cache() method we can keep the whole RDD in memory.

# We can persist the RDD in memory and reuse it efficiently across parallel operations.

# The difference between cache() and persist() is that cache() always uses the default storage level,
# MEMORY_ONLY, while persist() lets us choose among various storage levels (listed below).
# It is a key tool for iterative and interactive algorithms: when we persist an RDD, each node stores
# the partitions it computes in memory and reuses them in later actions on that RDD.
# This can make later actions much faster (often by more than 10x, according to the Spark programming guide).

# Benefits of RDD persistence in Spark
# 1) Time efficient
# 2) Cost efficient
# 3) Less computation time

# Storage levels of persisted RDDs
# 1) MEMORY_ONLY
# 2) MEMORY_AND_DISK
# 3) MEMORY_ONLY_SER
# 4) MEMORY_AND_DISK_SER
# 5) DISK_ONLY

# Reference : https://data-flair.training/blogs/spark-in-memory-computing/
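
The two answers above (lazy evaluation, then persistence) can be illustrated with a toy model. The sketch below is plain Python, not Spark: `LazyDataset` is a hypothetical stand-in for an RDD that records its computation, runs it only when `collect()` is called, and optionally keeps the result.

```python
class LazyDataset(object):
    """Toy stand-in for an RDD: records a computation and runs it only on collect()."""

    def __init__(self, compute):
        self._compute = compute      # function that produces the data
        self._cached = None
        self._use_cache = False

    def map(self, func):
        # Transformation: nothing runs here, we only describe the new dataset.
        return LazyDataset(lambda: [func(x) for x in self.collect()])

    def cache(self):
        self._use_cache = True
        return self

    def collect(self):
        # Action: this is where the recorded computation actually runs.
        if self._use_cache and self._cached is not None:
            return self._cached
        result = self._compute()
        if self._use_cache:
            self._cached = result
        return result

calls = {"n": 0}

def expensive(x):
    calls["n"] += 1
    return x * 2

base = LazyDataset(lambda: [1, 2, 3])

doubled = base.map(expensive)
print(calls["n"])            # 0: map() alone evaluates nothing
doubled.collect()
doubled.collect()
uncached_calls = calls["n"]  # 6: each action re-runs the map over 3 elements

calls["n"] = 0
doubled_cached = base.map(expensive).cache()
doubled_cached.collect()
doubled_cached.collect()
cached_calls = calls["n"]    # 3: the second action reuses the stored result
```

Real Spark is more subtle (for instance, it can reuse shuffle output between jobs), so this only shows the general principle: one evaluation per action without persistence, one evaluation in total with it.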

In [63]:
sc.stop()

## Part 3 : Spark SQL and dataframes

In this part, you will explore Spark DataFrames and the SQL context. In particular, we will work on a sample of the world population that comes from [pplapi](http://pplapi.com/). The file [agents.json](./SparkData/agents.json) was extracted from this API and can be retrieved with the following command:


`wget https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json`

In [64]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.json("./SparkData/agents.json")

Print the first 5 rows of the dataframe `df`.

In [65]:
# TO DO
df.show(5,truncate= True)

+------------+----------+------------------+------------------+------+
|country_name|        id|          latitude|         longitude|   sex|
+------------+----------+------------------+------------------+------+
|       China| 227417393| 33.15219798270325|100.85840672174572|  Male|
|       Haiti|6821129477|19.325567983697297|-72.43795260265814|Female|
|       India|2078667700|23.645271492037235| 80.85636526088884|Female|
|       China| 477556555| 33.45864668881662| 93.33604038078953|Female|
|       India|1379059984|28.816938290678692|  80.7728698035823|Female|
+------------+----------+------------------+------------------+------+
only showing top 5 rows



Write the Spark instructions that display the number of French agents.

In [66]:
# TO DO
df.filter(df.country_name =='France').count()

94

Write the Spark instructions that display the number of Indian female agents.

In [67]:
# TO DO
df.filter((df.country_name =='India') & (df.sex == 'Female')).count()

828

Using a temporary view (function `createTempView`), create a temporary view associated with the dataframe `df`.

In [68]:
#df.createTempView("temp_table")
df.createOrReplaceTempView("agents_table")

Write some SQL queries on the resulting table, as shown in the example below.

In [69]:
spark.sql("SELECT country_name,id FROM agents_table ORDER BY id DESC LIMIT 10").show()

+-----------------+----------+
|     country_name|        id|
+-----------------+----------+
| French Polynesia|7170821229|
|       Cabo Verde|7167692449|
|         Suriname|7166451460|
|         Suriname|7166235088|
|            Macau|7166034642|
|       Montenegro|7164357515|
|Equatorial Guinea|7163867872|
|           Bhutan|7163256789|
|           Bhutan|7163004645|
|           Bhutan|7162877973|
+-----------------+----------+



In [70]:
# TO DO - Write some requests.
spark.sql("SELECT country_name,latitude,id FROM agents_table ORDER BY id DESC LIMIT 10").show()

+-----------------+-------------------+----------+
|     country_name|           latitude|        id|
+-----------------+-------------------+----------+
| French Polynesia|-15.004219445056265|7170821229|
|       Cabo Verde|  16.00676587564149|7167692449|
|         Suriname|  4.008871704322331|7166451460|
|         Suriname|   3.96442417744574|7166235088|
|            Macau| 21.944944804684596|7166034642|
|       Montenegro|  42.32131745506727|7164357515|
|Equatorial Guinea|  3.651402073464487|7163867872|
|           Bhutan| 27.419739555133912|7163256789|
|           Bhutan| 27.281480489455422|7163004645|
|           Bhutan|  27.37149433886258|7162877973|
+-----------------+-------------------+----------+



In [71]:
spark.stop()
sc.stop()

You can also create a dataframe from an existing RDD, as shown in the example below.

In [72]:
sc=pyspark.SparkContext("local[*]","Dataframe examples in Spark")
sqlContext = pyspark.SQLContext(sc)

In [73]:
documents_rdd = sc.parallelize([
        [1, 'cats are cute', 0],
        [2, 'dogs are playfull', 0],
        [3, 'lions are big', 1],
        [4, 'cars are fast', 1]])
users_rdd = sc.parallelize([
        [0, 'Alice', 20],
        [1, 'Bob', 23],
        [2, 'Charles', 32]])

In [74]:
documents_df = documents_rdd.toDF(['doc_id', 'text', 'user_id'])
users_df = users_rdd.toDF(['user_id', 'name', 'age'])

# printing the inferred schema for documents
documents_df.printSchema()

root
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)



Some functions can be applied to one column or to several columns. Here, we compute the average age in the dataframe `users_df`.

In [75]:
from pyspark.sql import functions as fn

# show() prints the table and returns None, so its result is not assigned
users_df.select(fn.avg('age')).show()

+--------+
|avg(age)|
+--------+
|    25.0|
+--------+




Compute the max of age in the dataframe `users_df`. 

In [76]:
# TO DO
# show() prints the table and returns None, so its result is not assigned
users_df.select(fn.max('age')).show()

+--------+
|max(age)|
+--------+
|      32|
+--------+



Join (as in SQL) the two dataframes `users_df` and `documents_df`

In [77]:
# TO DO 
users_df.show(10,truncate=True)

documents_df.show(10,truncate=True)

inner_join = users_df.join(documents_df, users_df.user_id == documents_df.user_id)

inner_join.show(20,truncate=True)

+-------+-------+---+
|user_id|   name|age|
+-------+-------+---+
|      0|  Alice| 20|
|      1|    Bob| 23|
|      2|Charles| 32|
+-------+-------+---+

+------+-----------------+-------+
|doc_id|             text|user_id|
+------+-----------------+-------+
|     1|    cats are cute|      0|
|     2|dogs are playfull|      0|
|     3|    lions are big|      1|
|     4|    cars are fast|      1|
+------+-----------------+-------+

+-------+-----+---+------+-----------------+-------+
|user_id| name|age|doc_id|             text|user_id|
+-------+-----+---+------+-----------------+-------+
|      0|Alice| 20|     1|    cats are cute|      0|
|      0|Alice| 20|     2|dogs are playfull|      0|
|      1|  Bob| 23|     3|    lions are big|      1|
|      1|  Bob| 23|     4|    cars are fast|      1|
+-------+-----+---+------+-----------------+-------+



Perform a left outer join (as in SQL) of the two dataframes `users_df` and `documents_df`.

In [78]:
# TO DO
left_join = users_df.join(documents_df, users_df.user_id == documents_df.user_id,how='left')

left_join.show(20,truncate=True)

+-------+-------+---+------+-----------------+-------+
|user_id|   name|age|doc_id|             text|user_id|
+-------+-------+---+------+-----------------+-------+
|      0|  Alice| 20|     1|    cats are cute|      0|
|      0|  Alice| 20|     2|dogs are playfull|      0|
|      1|    Bob| 23|     3|    lions are big|      1|
|      1|    Bob| 23|     4|    cars are fast|      1|
|      2|Charles| 32|  null|             null|   null|
+-------+-------+---+------+-----------------+-------+
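
The difference between the two joins above can be sketched in plain Python on the same data. The helper `join` is hypothetical and only mimics the row logic: an inner join keeps matching pairs, while a left join also keeps users without documents and fills the missing columns with `None`, which Spark displays as `null`.

```python
users = [(0, "Alice", 20), (1, "Bob", 23), (2, "Charles", 32)]
documents = [(1, "cats are cute", 0), (2, "dogs are playfull", 0),
             (3, "lions are big", 1), (4, "cars are fast", 1)]

def join(users, documents, how="inner"):
    """Join on user_id; with how="left", unmatched users are kept with None."""
    rows = []
    for user_id, name, age in users:
        matches = [d for d in documents if d[2] == user_id]
        if matches:
            rows.extend((user_id, name, age, d[0], d[1]) for d in matches)
        elif how == "left":
            rows.append((user_id, name, age, None, None))
    return rows

inner_rows = join(users, documents)
left_rows = join(users, documents, how="left")
print("inner: {0} rows, left: {1} rows".format(len(inner_rows), len(left_rows)))
```

Charles has no document, so he appears only in the left join, as in the Spark output above.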



## Part 4 : MLlib - Spark for data analysis and machine learning

In this part, we will see how to create a text classification application with Spark. We will use data from [Usenet](https://en.wikipedia.org/wiki/Usenet_newsgroup) newsgroups, and the objective is to predict the topic of each message.
 
The train data is [here](./SparkData/20ng-train-all-terms.txt) and the test data [here](./SparkData/20ng-test-all-terms.txt).
They were obtained using the following commands: 
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-train-all-terms.txt`
and
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-test-all-terms.txt`

Load these data as RDDs and convert them to DataFrames.

In [79]:
sc.stop()

findspark.init()
from pyspark.ml.feature import CountVectorizer

import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

sc=pyspark.SparkContext("local[*]","machine learning in Spark")
sqlContext = pyspark.SQLContext(sc)

from pyspark.sql import Row

def load_dataframe1(path):
    # Each line of the file is read as "<category>\t<text>"
    pandas_df = pd.read_csv(path, header=None, sep='\t', names=['category', 'text'])

    custom_schema = StructType([
        StructField("category", StringType(), True),
        StructField("text", StringType(), True)])

    spark_df = sqlContext.createDataFrame(pandas_df, schema=custom_schema)

    return spark_df

    
train_data = load_dataframe1("./SparkData/train-all-terms.txt")
test_data = load_dataframe1("./SparkData/test-all-terms.txt")

train_data.printSchema()

root
 |-- category: string (nullable = true)
 |-- text: string (nullable = true)



A first step is to represent our data, i.e. the messages, as a bag-of-words using the Spark class `CountVectorizer` documented [here](https://spark.apache.org/docs/2.1.0/ml-features.html#countvectorizer).

In [80]:
# TO DO  - bag of words representation of test and train data
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer


# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

train_data_tokenized = regexTokenizer.transform(train_data)
test_data_tokenized = regexTokenizer.transform(test_data)

# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

train_data_remove_words = stopwordsRemover.transform(train_data_tokenized)
test_data_remove_words = stopwordsRemover.transform(test_data_tokenized)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

cv_model = countVectors.fit(train_data_remove_words)

train_features = cv_model.transform(train_data_remove_words)
test_features = cv_model.transform(test_data_remove_words)

print(type(train_features))

<class 'pyspark.sql.dataframe.DataFrame'>
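
To picture what the `CountVectorizer` step produces, here is a minimal plain-Python sketch of a bag-of-words on toy documents. The helpers `fit_vocabulary` and `transform` are hypothetical. They follow the documented idea (keep terms that appear in at least `min_df` documents, order them by corpus frequency, cap the vocabulary at `vocab_size`), but they return dense lists where Spark returns sparse vectors, and the tie-breaking rule is my own choice.

```python
from collections import Counter

docs = [["cats", "are", "cute"], ["dogs", "are", "cute"], ["cars", "are", "fast"]]  # toy data

def fit_vocabulary(docs, vocab_size, min_df):
    """Keep terms present in at least min_df documents, most frequent first."""
    doc_freq = Counter(term for doc in docs for term in set(doc))
    term_freq = Counter(term for doc in docs for term in doc)
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-term_freq[t], t))   # ties broken alphabetically here
    return kept[:vocab_size]

def transform(doc, vocabulary):
    """Turn one tokenized document into a vector of term counts."""
    counts = Counter(doc)
    return [counts[t] for t in vocabulary]

vocabulary = fit_vocabulary(docs, vocab_size=10, min_df=2)
vectors = [transform(doc, vocabulary) for doc in docs]
print(vocabulary)
print(vectors)
```

As in the cell above, the vocabulary is learned from the training documents only and then reused to transform the test documents, so both sets share the same feature columns.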


Displaying the distinct labels in the dataset

In [81]:
train_data.select("category").distinct().sort("category").show(truncate=False)

+------------------------+
|category                |
+------------------------+
|alt.atheism             |
|comp.graphics           |
|comp.os.ms-windows.misc |
|comp.sys.ibm.pc.hardware|
|comp.sys.mac.hardware   |
|comp.windows.x          |
|misc.forsale            |
|rec.autos               |
|rec.motorcycles         |
|rec.sport.baseball      |
|rec.sport.hockey        |
|sci.crypt               |
|sci.electronics         |
|sci.med                 |
|sci.space               |
|soc.religion.christian  |
|talk.politics.guns      |
|talk.politics.mideast   |
|talk.politics.misc      |
|talk.religion.misc      |
+------------------------+



We will now apply a [NaiveBayes](https://spark.apache.org/docs/latest/ml-classification-regression.html#naive-bayes) classifier to our problem. 
Take the time to read the documentation before applying it. You will first have to associate a number with each label. You can use the [`StringIndexer`](https://spark.apache.org/docs/2.1.0/ml-features.html#stringindexer) class of PySpark for that.


In [82]:
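# TO DO - Transformation of the label into a number
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")
train_features = label_stringIdx.fit(train_data).transform(train_features)
# NOTE: the indexer is re-fitted on the test set below. StringIndexer orders labels
# by frequency, so the two mappings can differ; fitting once on the training data
# and reusing that model for both sets would keep the label ids consistent.
test_features = label_stringIdx.fit(test_features).transform(test_features)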
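
By default, `StringIndexer` assigns index 0 to the most frequent label, 1 to the next, and so on. The plain-Python sketch below mimics that rule on toy labels to show why fitting the indexer separately on two datasets can give the same category two different numbers. The helper `fit_label_index` is hypothetical and its alphabetical tie-breaking is an assumption of this sketch.

```python
from collections import Counter

def fit_label_index(labels):
    """Map each label to an index, most frequent label first (ties alphabetical here)."""
    freq = Counter(labels)
    ordered = sorted(freq, key=lambda label: (-freq[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

train_labels = ["sci.med", "sci.med", "rec.autos", "sci.space", "sci.med", "rec.autos"]
test_labels = ["rec.autos", "rec.autos", "sci.space", "sci.med"]

train_index = fit_label_index(train_labels)
test_index = fit_label_index(test_labels)   # re-fitting on a different set

print(train_index["sci.med"], test_index["sci.med"])
```

Here `sci.med` is the most frequent label in the toy training set but not in the toy test set, so the two mappings disagree. Reusing the mapping learned on the training data avoids this.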

In [83]:
# TO DO - Application of the NaiveBayes model
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
naive_bayes_model = nb.fit(train_features)

In [84]:
# TO DO - Application of the trained model to the test dataset and displaying the prediction compared to the real value for the 20 first messages
predictions = naive_bayes_model.transform(test_features)
predictions.select("text","category","probability","label","prediction").orderBy("probability", ascending=True).show(n = 2000, truncate = 30)

+------------------------------+------------------------+------------------------------+-----+----------+
|                          text|                category|                   probability|label|prediction|
+------------------------------+------------------------+------------------------------+-----+----------+
|clinton am press briefing b...|      talk.politics.misc|[0.0,0.0,0.0,0.0,0.0,0.0,0....| 18.0|      18.0|
|clinton press briefing by g...|      talk.politics.misc|[0.0,0.0,0.0,0.0,0.0,0.0,0....| 18.0|      18.0|
|clinton president s press c...|      talk.politics.misc|[0.0,0.0,0.0,0.0,0.0,0.0,0....| 18.0|      18.0|
|clinton am press briefing b...|      talk.politics.misc|[0.0,0.0,0.0,0.0,0.0,0.0,0....| 18.0|      18.0|
|according to greek governme...|   talk.politics.mideast|[0.0,0.0,0.0,0.0,0.0,0.0,0....| 15.0|      15.0|
|your armenian grandparents ...|   talk.politics.mideast|[0.0,0.0,0.0,0.0,0.0,0.0,0....| 15.0|      15.0|
|of azeri soil is now occupi...|   talk.politi

The learned model can be evaluated with the [evaluation](https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html) module of MLlib. Print the accuracy of the obtained model.

In [85]:
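# TO DO
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# NOTE: with no metricName, the evaluator reports the F1 score ("f1" is the default);
# pass metricName="accuracy" to obtain the accuracy instead
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)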

0.5751428897177205
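
Since `MulticlassClassificationEvaluator` defaults to the F1 metric, the value above is a weighted F1 score rather than an accuracy. The two generally differ, as the small plain-Python sketch below shows on toy labels. The functions `accuracy` and `weighted_f1` are hypothetical helpers written for this illustration: the F1 of each class is weighted by the share of true examples of that class.

```python
def accuracy(labels, predictions):
    """Fraction of examples whose prediction equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / float(len(labels))

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights equal to each class's share of the labels."""
    total = 0.0
    for cls in set(labels):
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        fp = sum(1 for y, p in zip(labels, predictions) if y != cls and p == cls)
        fn = sum(1 for y, p in zip(labels, predictions) if y == cls and p != cls)
        precision = tp / float(tp + fp) if tp + fp else 0.0
        recall = tp / float(tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * labels.count(cls) / float(len(labels))
    return total

labels      = [0, 0, 0, 0, 1, 2]   # toy data, made up for illustration
predictions = [0, 0, 1, 1, 1, 2]

acc = accuracy(labels, predictions)
f1 = weighted_f1(labels, predictions)
print(acc, f1)
```

On this toy data the accuracy is 4/6 while the weighted F1 is 25/36, so the metric name matters when reporting a result.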