# Spark (with PySpark)


In this tutorial, you will learn how to use Apache Spark, a framework for large-scale data processing, within a notebook.
Upon completing this lab you will be able to:
 + Program in Spark with the Python language
 + Demonstrate how to read and process data using Spark
 + Compare and contrast RDDs and DataFrames
 + Build a simple machine learning application with Spark
 
 
 
# Requirements : Installing Spark and PySpark, and configuring them to run in Jupyter notebooks

Follow the installation tutorial that matches your operating system:

+ [Windows](https://changhsinlee.com/install-pyspark-windows-jupyter/)
+ [Mac OS X](https://jmedium.com/pyspark-in-mac/)
 
At the end of the installation, you should be able to run the following code, which is a kind of "Hello World" in PySpark.

In [1]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [2]:
spark.stop()

## Part 1 :  Spark Context

When writing a Spark program, the first thing to do is to create a `SparkContext`.

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.
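The driver/executor pattern described above can be imitated locally. The following is only a plain-Python sketch, not Spark code: the thread pool stands in for the executors, and the names `split_into_partitions` and `task` are hypothetical helpers introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    """Driver side: cut the data set into roughly equal partitions."""
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

def task(partition):
    """Executor side: one task processes exactly one partition."""
    return sum(x * x for x in partition)

data = list(range(1, 101))
partitions = split_into_partitions(data, 4)

# The pool plays the role of the executors; each task handles one partition
with ThreadPoolExecutor(max_workers=4) as executors:
    partial_results = list(executors.map(task, partitions))

# The partial results come back to the "driver", which combines them
total = sum(partial_results)
print(len(partitions), total)
```

In real Spark the executors are separate processes, possibly on other machines, and the scheduling is handled by the framework rather than by user code.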

Here, we will use the `findspark` package, which can be installed with the following command:

`pip3 install findspark`

Then we can call the `findspark.init()` function to locate the Spark installation.




In [3]:
import findspark

findspark.init("/Users/Boubou/Downloads/spark-2.4.0-bin-hadoop2.7")

import pyspark

sc=pyspark.SparkContext("local[*]","First program in Spark")

Try printing out `sc` to see its type.

In [4]:
# TO DO
sc

You can use Python's `dir()` function to get a list of all the attributes (including methods) accessible through the `sc` object.

In [5]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcas

## Part 2 : Understanding Spark RDDs

### WordCount in Spark
In this part, we will write a word count in Spark and apply it to the novel Dracula by Bram Stoker (from Project Gutenberg).

First read the [pg345.txt](./SparkData/pg345.txt) file.

In [6]:
data = sc.textFile('./SparkData/pg345.txt')
type(data)

pyspark.rdd.RDD

To see the content of the file, we need to run the action `collect` on the RDD `data`.

In [7]:
data.collect()

['The Project Gutenberg EBook of Dracula, by Bram Stoker',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org/license',
 '',
 '',
 'Title: Dracula',
 '',
 'Author: Bram Stoker',
 '',
 'Release Date: August 16, 2013 [EBook #345]',
 '',
 'Language: English',
 '',
 '',
 '*** START OF THIS PROJECT GUTENBERG EBOOK DRACULA ***',
 '',
 '',
 '',
 '',
 'Produced by Chuck Greif and the Online Distributed',
 'Proofreading Team at http://www.pgdp.net (This file was',
 'produced from images generously made available by The',
 'Internet Archive)',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '                                DRACULA',
 '',
 '',
 '',
 '',
 '',
 '                                DRACULA',
 '',
 '                                  _by_',
 '',
 '                              Bram Stoke

Starting from the RDD `data` of the previous cell, convert each line to lower case and replace the commas with spaces, then apply `flatMap()` to split each line into words on the space character. Collect the result into `words`.

In [8]:
# Lower-case each line and replace commas with spaces
lowercase = data.map(lambda x: x.replace(',',' ').lower())

# Split each line into words and flatten the result into a single RDD of tokens
tokenized = lowercase.flatMap(lambda x: x.split(' '))

# collect() returns a Python list (not an RDD) holding every token
words = tokenized.collect()
words

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 '',
 'by',
 'bram',
 'stoker',
 '',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'almost',
 'no',
 'restrictions',
 'whatsoever.',
 '',
 'you',
 'may',
 'copy',
 'it',
 '',
 'give',
 'it',
 'away',
 'or',
 're-use',
 'it',
 'under',
 'the',
 'terms',
 'of',
 'the',
 'project',
 'gutenberg',
 'license',
 'included',
 'with',
 'this',
 'ebook',
 'or',
 'online',
 'at',
 'www.gutenberg.org/license',
 '',
 '',
 'title:',
 'dracula',
 '',
 'author:',
 'bram',
 'stoker',
 '',
 'release',
 'date:',
 'august',
 '16',
 '',
 '2013',
 '[ebook',
 '#345]',
 '',
 'language:',
 'english',
 '',
 '',
 '***',
 'start',
 'of',
 'this',
 'project',
 'gutenberg',
 'ebook',
 'dracula',
 '***',
 '',
 '',
 '',
 '',
 'produced',
 'by',
 'chuck',
 'greif',
 'and',
 'the',
 'online',
 'distributed',
 'proofreading',
 'team',
 'at',
 'http://www.pgdp.net',
 '(this',
 'file',

Use the action `take()` to retrieve the first 10 words produced by the `flatMap()` transformation.

In [9]:
# TO DO 
tokenized.take(10)

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 '',
 'by',
 'bram',
 'stoker']

In [10]:
# Count the occurrences of each word, swap each pair to (count, word) and sort by descending count
wordCounts = tokenized.map(lambda x: (x, 1)).reduceByKey(lambda v1,v2:v1+v2).map(lambda x:(x[1],x[0])).sortByKey(False)
wordCounts.take(10)

[(20435, ''),
 (7984, 'the'),
 (5840, 'and'),
 (4528, 'i'),
 (4518, 'to'),
 (3719, 'of'),
 (2933, 'a'),
 (2523, 'he'),
 (2515, 'in'),
 (2415, 'that')]
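To make the chain of operations easier to follow, here is a plain-Python analogue of the same pipeline on two made-up lines. It is a sketch that mimics what `flatMap`, `map`, `reduceByKey` and the final sort do, and it does not use Spark. It also shows why the empty string appears as a "word": splitting on a single space produces an empty token wherever two spaces follow each other.

```python
from collections import defaultdict

lines = ["the cat, the dog", "The bird"]

# flatMap: normalise each line, then flatten all tokens into one list
tokens = [w for line in lines
          for w in line.replace(",", " ").lower().split(" ")]

# map: emit one (word, 1) pair per token
pairs = [(w, 1) for w in tokens]

# reduceByKey: merge the values that share a key with v1 + v2
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one

# swap to (count, word) and sort by descending count
ranked = sorted(((c, w) for w, c in counts.items()), reverse=True)
print(tokens)
print(ranked[0])
```

Using `split()` without an argument instead of `split(' ')` would drop these empty tokens, since it splits on any run of whitespace.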

Perform a classic `map()` to create a tuple where each word has a count of 1

In [11]:
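# TO DO
# wordCounts already holds one (count, word) pair per distinct word,
# so every word receives the value 1 here
unique_words = wordCounts.map(lambda x: (x[1],1))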
unique_words

PythonRDD[15] at RDD at PythonRDD.scala:53

Write the reducing function

In [12]:
# TO DO 
reduc=unique_words.reduceByKey(lambda x,y:x+y)
reduc.take(20)

[('', 1),
 ('i', 1),
 ('of', 1),
 ('he', 1),
 ('in', 1),
 ('was', 1),
 ('as', 1),
 ('we', 1),
 ('is', 1),
 ('his', 1),
 ('at', 1),
 ('have', 1),
 ('but', 1),
 ('her', 1),
 ('when', 1),
 ('there', 1),
 ('*', 1),
 ('this', 1),
 ('are', 1),
 ('must', 1)]

Get all words that occur more than once and sort them alphabetically:

In [13]:
# TO DO
recurrent = wordCounts.filter(lambda x: x[0] > 1).sortBy(lambda x: x[1])
recurrent.take(10)

[(20435, ''),
 (223, '"'),
 (2, '"\'my'),
 (4, '"_17'),
 (2, '"_2'),
 (2, '"_24'),
 (4, '"_25'),
 (2, '"_6'),
 (2, '"_czarina'),
 (12, '"a')]

Now, get all words that occur more than once and sort them by frequency. Select the first 20. Hint: look at the `takeOrdered` action.

In [14]:
# TO DO 
recurrent.takeOrdered(20, key = lambda x: -x[0])

[(20435, ''),
 (7984, 'the'),
 (5840, 'and'),
 (4528, 'i'),
 (4518, 'to'),
 (3719, 'of'),
 (2933, 'a'),
 (2523, 'he'),
 (2515, 'in'),
 (2415, 'that'),
 (1878, 'it'),
 (1848, 'was'),
 (1573, 'as'),
 (1514, 'for'),
 (1499, 'we'),
 (1490, 'is'),
 (1457, 'his'),
 (1350, 'not'),
 (1314, 'with'),
 (1305, 'you')]
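`takeOrdered(n, key)` returns the `n` elements with the smallest key, which is why the count is negated above to obtain the most frequent words first. The standard-library function `heapq.nsmallest` follows the same convention, so it can serve as a local illustration. The sample pairs below are illustrative values, not the full data set.

```python
import heapq

word_counts = [(2, 'czarina'), (7984, 'the'), (12, 'a'),
               (5840, 'and'), (4528, 'i')]

# Keep the 3 entries with the smallest key; negating the count
# makes the most frequent words come first
top3 = heapq.nsmallest(3, word_counts, key=lambda x: -x[0])
print(top3)
```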

### A simple exercise

+ Create a Python collection of 10,000 integers
+ Create a Spark base RDD from that collection
+ Subtract one from each value using `map`
+ Perform the action `collect` to view the results
+ Perform the action `count` to view the number of elements
+ Apply the transformation `filter` and view the results with `collect`

In [15]:
#TO DO
import numpy as np

#Create a random collection of 10,000 integers drawn from [1, 10000)
collection=np.random.randint(1,10000, size=10000)
collection

array([2275, 6986, 2550, ..., 7286, 1945, 4404])

In [16]:
#Convert collection to a Spark base RDD
base= sc.parallelize(collection)

#Subtract one from each value using map
base_sub= base.map(lambda x: x-1)
base_sub.take(10) 

[2274, 6985, 2549, 1442, 3634, 3940, 628, 637, 4834, 5129]

In [17]:
#Perform action collect to view results
results = base_sub.collect()
results

[2274,
 6985,
 2549,
 1442,
 3634,
 3940,
 628,
 637,
 4834,
 5129,
 1747,
 4529,
 3068,
 1802,
 3483,
 1298,
 1880,
 3411,
 535,
 2901,
 6557,
 6141,
 879,
 9521,
 6206,
 5805,
 3517,
 2589,
 1808,
 7648,
 6082,
 7419,
 253,
 2026,
 5217,
 4577,
 4061,
 4110,
 6639,
 2186,
 6113,
 6169,
 1097,
 9287,
 7982,
 2363,
 8255,
 1815,
 5164,
 5934,
 8572,
 8593,
 53,
 957,
 8630,
 6577,
 471,
 6362,
 7876,
 6951,
 6258,
 1514,
 3286,
 2946,
 6350,
 9729,
 1223,
 3140,
 8175,
 4904,
 7725,
 7029,
 1333,
 3930,
 7117,
 8650,
 8953,
 8046,
 9124,
 6073,
 7592,
 2080,
 8236,
 1210,
 1152,
 3668,
 6580,
 2141,
 868,
 7560,
 6836,
 4207,
 9019,
 1034,
 8603,
 5092,
 5495,
 9825,
 1854,
 6354,
 4039,
 6487,
 8479,
 1068,
 3257,
 5001,
 2519,
 3673,
 6222,
 5842,
 3995,
 2310,
 868,
 6602,
 5557,
 46,
 1095,
 9407,
 9122,
 716,
 3277,
 6797,
 3919,
 2282,
 9957,
 7002,
 2883,
 1137,
 4428,
 5384,
 8564,
 9713,
 3638,
 7098,
 4540,
 6518,
 7648,
 3730,
 2183,
 9481,
 5677,
 3012,
 5107,
 8580,
 3555,

In [18]:
#Perform action count to view counts
base_sub.count()

10000

In [19]:
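#Apply transformation filter and view results with collect
#Note: in Python `^` is the bitwise XOR operator, not exponentiation (`**`),
#so this keeps the values for which (x-1) XOR 3 is below 1500
base_filter= base.filter(lambda x: (x-1)^3<1500)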
results2 = base_filter.collect()
results2

[1443,
 629,
 638,
 1299,
 536,
 880,
 254,
 1098,
 54,
 958,
 472,
 1224,
 1334,
 1211,
 1153,
 869,
 1035,
 1069,
 869,
 47,
 1096,
 717,
 1138,
 751,
 1314,
 337,
 898,
 1103,
 468,
 928,
 226,
 1137,
 160,
 1232,
 1308,
 677,
 524,
 1478,
 983,
 836,
 356,
 332,
 1046,
 993,
 1000,
 1260,
 201,
 899,
 408,
 1287,
 703,
 787,
 819,
 568,
 474,
 1029,
 398,
 44,
 777,
 398,
 305,
 573,
 1384,
 149,
 830,
 46,
 895,
 1102,
 873,
 1322,
 444,
 562,
 726,
 848,
 622,
 968,
 1326,
 249,
 1131,
 1402,
 1152,
 203,
 1271,
 12,
 447,
 257,
 266,
 1316,
 1325,
 1141,
 1010,
 772,
 878,
 1387,
 22,
 1410,
 1130,
 685,
 1216,
 584,
 795,
 703,
 470,
 1302,
 58,
 1274,
 852,
 1470,
 845,
 678,
 257,
 712,
 537,
 461,
 598,
 1154,
 235,
 606,
 646,
 533,
 818,
 654,
 70,
 896,
 761,
 609,
 1328,
 106,
 1046,
 276,
 354,
 632,
 639,
 447,
 802,
 574,
 650,
 548,
 172,
 884,
 334,
 444,
 448,
 1303,
 1106,
 75,
 1259,
 425,
 1146,
 1187,
 231,
 190,
 546,
 1273,
 570,
 1303,
 908,
 1272,
 37,
 26,

In [20]:
base_filter.count()

1486
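The predicate used in the filter relies on `^`, which in Python is the bitwise XOR operator; exponentiation is written `**`. The short check below, on a few hand-picked values, shows how differently the two predicates behave. The list `values` is an arbitrary sample chosen for illustration.

```python
values = [2, 5, 12, 13, 1499, 1503]

# Bitwise XOR with 3 only flips the two lowest bits of x - 1
xor_kept = [x for x in values if (x - 1) ^ 3 < 1500]

# A true cube grows much faster: only x - 1 <= 11 passes
pow_kept = [x for x in values if (x - 1) ** 3 < 1500]

print(xor_kept)
print(pow_kept)
```

This explains why roughly 15% of the 10,000 random values pass the filter above: the XOR version behaves almost like `x < 1500`.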

### An improved WordCount

Print the top 10 most frequent words with their probability of appearance


In [21]:
#Count the entries of wordCounts, i.e. the number of distinct tokens (not the total number of word occurrences)
total_nb=wordCounts.count()
total_nb

15675

In [22]:
#Divide each count by total_nb (this ratio can exceed 1, see the comment after the output)
probability= wordCounts.map(lambda x: (x[1], x[0]/total_nb))
prob= probability.collect()
prob

[('', 1.3036682615629984),
 ('the', 0.5093460925039872),
 ('and', 0.3725677830940989),
 ('i', 0.2888676236044657),
 ('to', 0.28822966507177034),
 ('of', 0.2372567783094099),
 ('a', 0.18711323763955343),
 ('he', 0.16095693779904308),
 ('in', 0.16044657097288675),
 ('that', 0.15406698564593302),
 ('it', 0.11980861244019139),
 ('was', 0.11789473684210526),
 ('as', 0.10035087719298245),
 ('for', 0.09658692185007975),
 ('we', 0.09562998405103668),
 ('is', 0.09505582137161085),
 ('his', 0.0929505582137161),
 ('not', 0.0861244019138756),
 ('with', 0.08382775119617225),
 ('you', 0.08325358851674641),
 ('my', 0.07738437001594896),
 ('me', 0.07272727272727272),
 ('all', 0.06953748006379586),
 ('at', 0.06915470494417862),
 ('be', 0.06851674641148325),
 ('have', 0.06730462519936205),
 ('had', 0.06596491228070175),
 ('so', 0.06590111642743221),
 ('but', 0.06558213716108453),
 ('on', 0.06430622009569378),
 ('her', 0.060478468899521534),
 ('she', 0.05090909090909091),
 ('when', 0.048293460925039874),

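*Comment: the empty string `''` is counted as a word because splitting on a single space yields an empty token for every blank line and for every pair of consecutive spaces. The values above are not true probabilities: `total_nb` is the number of distinct tokens (15675), not the total number of word occurrences, which is why the ratio for `''` exceeds 1. Dividing by the sum of all counts instead would give frequencies that sum to 1.*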
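A small plain-Python sketch of the normalisation issue: dividing by the number of distinct words can give ratios above 1, while dividing by the total number of occurrences gives proper frequencies. The counts below are made up for illustration. In the notebook, the denominator could for instance be obtained by summing the counts of `wordCounts` (the RDD API offers a `sum()` action for numeric RDDs), but that variant is not run here.

```python
# (count, word) pairs in the same layout as wordCounts; values are invented
counts = [(6, 'the'), (2, 'cat'), (2, 'dog')]

distinct_words = len(counts)                # what count() on wordCounts measures
total_occurrences = sum(c for c, _ in counts)

ratio_to_distinct = {w: c / distinct_words for c, w in counts}
frequency = {w: c / total_occurrences for c, w in counts}

print(ratio_to_distinct['the'])  # 2.0, not a probability
print(frequency['the'])          # 0.6
```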

Get rid of special characters (.,:!?')


In [23]:
#Getting rid of special characters within the text file
#Note: the pattern also matches spaces, so the words of each line are glued together
import re
no_spec_char=data.map(lambda x: re.sub('[^A-Za-z0-9]+','', x))
no_spec_char.collect()

['TheProjectGutenbergEBookofDraculabyBramStoker',
 '',
 'ThiseBookisfortheuseofanyoneanywhereatnocostandwith',
 'almostnorestrictionswhatsoeverYoumaycopyitgiveitawayor',
 'reuseitunderthetermsoftheProjectGutenbergLicenseincluded',
 'withthiseBookoronlineatwwwgutenbergorglicense',
 '',
 '',
 'TitleDracula',
 '',
 'AuthorBramStoker',
 '',
 'ReleaseDateAugust162013EBook345',
 '',
 'LanguageEnglish',
 '',
 '',
 'STARTOFTHISPROJECTGUTENBERGEBOOKDRACULA',
 '',
 '',
 '',
 '',
 'ProducedbyChuckGreifandtheOnlineDistributed',
 'ProofreadingTeamathttpwwwpgdpnetThisfilewas',
 'producedfromimagesgenerouslymadeavailablebyThe',
 'InternetArchive',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 'DRACULA',
 '',
 '',
 '',
 '',
 '',
 'DRACULA',
 '',
 'by',
 '',
 'BramStoker',
 '',
 'Illustrationcolophon',
 '',
 'NEWYORK',
 '',
 'GROSSETDUNLAP',
 '',
 'Publishers',
 '',
 'Copyright1897intheUnitedStatesofAmericaaccording',
 'toActofCongressbyBramStoker',
 '',
 'Allrightsreserved',
 '',
 'PRINTEDINTHEUNITEDSTATES',
 '

In [24]:
#Getting rid of special characters within our word count
no_spec_char = tokenized.map(lambda x: re.sub('[^A-Za-z0-9]+','', x))

#Collect the cleaned tokens into a Python list
words2 = no_spec_char.collect()
words2

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 '',
 'by',
 'bram',
 'stoker',
 '',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'almost',
 'no',
 'restrictions',
 'whatsoever',
 '',
 'you',
 'may',
 'copy',
 'it',
 '',
 'give',
 'it',
 'away',
 'or',
 'reuse',
 'it',
 'under',
 'the',
 'terms',
 'of',
 'the',
 'project',
 'gutenberg',
 'license',
 'included',
 'with',
 'this',
 'ebook',
 'or',
 'online',
 'at',
 'wwwgutenbergorglicense',
 '',
 '',
 'title',
 'dracula',
 '',
 'author',
 'bram',
 'stoker',
 '',
 'release',
 'date',
 'august',
 '16',
 '',
 '2013',
 'ebook',
 '345',
 '',
 'language',
 'english',
 '',
 '',
 '',
 'start',
 'of',
 'this',
 'project',
 'gutenberg',
 'ebook',
 'dracula',
 '',
 '',
 '',
 '',
 '',
 'produced',
 'by',
 'chuck',
 'greif',
 'and',
 'the',
 'online',
 'distributed',
 'proofreading',
 'team',
 'at',
 'httpwwwpgdpnet',
 'this',
 'file',
 'was',
 'produced',
 '

**Identify the transformations and the actions in your script**


Recall the definitions of transformations and actions in Spark:

Transformation: an operation applied to an RDD that creates a new RDD.
This script contains several transformations, including `.flatMap()`, `.map()`, `.filter()`, `.reduceByKey()`, `.sortByKey()` and `.sortBy()`.

Action: an operation applied to an RDD that instructs Spark to perform the computation and send the result back to the driver. The actions used in this script are `.collect()`, `.take()`, `.takeOrdered()` and `.count()`.


**How many times are the transformations evaluated? (Hint: it depends)**


Apache Spark follows the principle of lazy evaluation: execution does not start until an action is triggered.

Transformations are lazy, meaning that when we call one on an RDD it is not executed immediately. Instead, Spark records which operations were called as a lineage graph, a directed acyclic graph (DAG) of transformations.

The transformations in this script are thus evaluated each time an action such as `.collect()`, `.take()` or `.count()` is called: unless an RDD is persisted, an action in general recomputes the transformations it depends on.

Can you reduce this number? (Hint: check out "persist")

In [25]:
no_spec_char.persist()
no_spec_char.take(10)

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 '',
 'by',
 'bram',
 'stoker']
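The effect of lazy evaluation and of `persist` can be imitated with a toy class. This is a plain-Python sketch under simplifying assumptions, not Spark's implementation: `ToyRDD`, `clean` and the call counter are hypothetical names introduced for illustration.

```python
calls = {"n": 0}

def clean(word):
    """Transformation function; counts how often it is evaluated."""
    calls["n"] += 1
    return word.strip(".,").lower()

class ToyRDD:
    """Toy stand-in for a mapped RDD: nothing runs until an action is called."""
    def __init__(self, items, func):
        self.items, self.func = items, func
        self.persisted = False
        self.cache = None

    def persist(self):
        self.persisted = True      # only sets a flag, computes nothing
        return self

    def collect(self):             # action
        if self.cache is not None:
            return self.cache
        result = [self.func(x) for x in self.items]
        if self.persisted:
            self.cache = result
        return result

    def count(self):               # action
        return len(self.collect())

words = ["The", "Count,", "Dracula."]

lazy = ToyRDD(words, clean)
calls_after_definition = calls["n"]      # still 0: no action yet
lazy.collect(); lazy.count()
calls_without_persist = calls["n"]       # evaluated once per action

calls["n"] = 0
cached = ToyRDD(words, clean).persist()
cached.collect(); cached.count()
calls_with_persist = calls["n"]          # evaluated once, then reused
print(calls_without_persist, calls_with_persist)
```

Note that, as in the toy class, calling `persist()` on a real RDD does not compute anything by itself; the data is stored the first time an action materialises it.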

In [26]:
sc.stop()

## Part 3 : Spark SQL and dataframes

In this part, you will explore Spark DataFrames and Spark SQL. In particular, we will work on a dataset that contains a sample of the world population, using data that comes from [pplapi](http://pplapi.com/). The file [agents.json](./SparkData/agents.json) was extracted from this API and can be retrieved with the following command:


`wget https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json`

In [27]:
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.json("./SparkData/agents.json")

Print the first 5 rows of the DataFrame `df`

In [28]:
df.take(5)

[Row(country_name='China', id=227417393, latitude=33.15219798270325, longitude=100.85840672174572, sex='Male'),
 Row(country_name='Haiti', id=6821129477, latitude=19.325567983697297, longitude=-72.43795260265814, sex='Female'),
 Row(country_name='India', id=2078667700, latitude=23.645271492037235, longitude=80.85636526088884, sex='Female'),
 Row(country_name='China', id=477556555, latitude=33.45864668881662, longitude=93.33604038078953, sex='Female'),
 Row(country_name='India', id=1379059984, latitude=28.816938290678692, longitude=80.7728698035823, sex='Female')]

Write the Spark instructions that display the number of French agents.

In [29]:
df.filter(df['country_name'] == 'France').count()

94

Write the Spark instructions that display the number of Indian female agents.

In [30]:
df.filter((df['country_name'] == 'India') & (df['sex'] == 'Female')).count()

828

Using the function `createTempView`, create a temporary view associated with the DataFrame `df`.

In [31]:
df.createTempView("temp_table")

Write some SQL queries on the resulting table, as shown in the example below.

In [32]:
spark.sql("SELECT country_name,id FROM temp_table ORDER BY id DESC LIMIT 10").show()

+-----------------+----------+
|     country_name|        id|
+-----------------+----------+
| French Polynesia|7170821229|
|       Cabo Verde|7167692449|
|         Suriname|7166451460|
|         Suriname|7166235088|
|            Macau|7166034642|
|       Montenegro|7164357515|
|Equatorial Guinea|7163867872|
|           Bhutan|7163256789|
|           Bhutan|7163004645|
|           Bhutan|7162877973|
+-----------------+----------+



In [33]:
#Write some queries.
spark.sql("SELECT longitude,latitude FROM temp_table ORDER BY country_name DESC LIMIT 10").show()

+------------------+-------------------+
|         longitude|           latitude|
+------------------+-------------------+
|30.130698837849696|-19.907160720192948|
| 30.02374889331959|-19.513641985675026|
|  29.9612635418984|-19.784932932665427|
|30.000406765602886| -20.06288367835551|
|30.347011275599456| -19.81578131773285|
| 30.27392728150642|-19.872292614691045|
| 30.04281298844486| -19.87689875548255|
|29.999999906862982| -20.04439507671722|
|29.746272412449848| -20.16811847260821|
| 30.22303853864903|-20.012281821898142|
+------------------+-------------------+



In [35]:
spark.sql("SELECT country_name, sex, count(*) FROM temp_table GROUP BY 2,1 ORDER BY 3 DESC LIMIT 10").show()

+-------------+------+--------+
| country_name|   sex|count(1)|
+-------------+------+--------+
|        China|  Male|     916|
|        India|  Male|     905|
|        China|Female|     896|
|        India|Female|     828|
|United States|  Male|     236|
|United States|Female|     225|
|    Indonesia|Female|     171|
|    Indonesia|  Male|     165|
|     Pakistan|  Male|     158|
|     Pakistan|Female|     157|
+-------------+------+--------+



In [34]:
spark.sql("SELECT country_name,id FROM temp_table ORDER BY sex DESC LIMIT 10").show()

+------------+----------+
|country_name|        id|
+------------+----------+
|    Thailand|5057542320|
|   Indonesia|3144232974|
|       India|1806935868|
|    Tanzania|5357138381|
|      Brazil|3275490483|
|       India|2278934249|
|       China|1293021300|
|       China|1272365006|
|      Turkey|4701404121|
|       India|2529129488|
+------------+----------+



In [36]:
spark.stop()
sc.stop()

You can also create a DataFrame from an existing RDD, as shown in the example below.

In [37]:
sc=pyspark.SparkContext("local[*]","Dataframe examples in Spark")
sqlContext = pyspark.SQLContext(sc)

In [38]:
documents_rdd = sc.parallelize([
        [1, 'cats are cute', 0],
        [2, 'dogs are playfull', 0],
        [3, 'lions are big', 1],
        [4, 'cars are fast', 1]])
users_rdd = sc.parallelize([
        [0, 'Alice', 20],
        [1, 'Bob', 23],
        [2, 'Charles', 32]])

In [39]:
documents_df = documents_rdd.toDF(['doc_id', 'text', 'user_id'])
users_df = users_rdd.toDF(['user_id', 'name', 'age'])

# printing the inferred schema for documents
documents_df.printSchema()


root
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)



Some functions can be applied to one or several columns. Here, we compute the average age in the DataFrame `users_df`.

In [40]:
from pyspark.sql import functions as fn

# show() prints the result and returns None, so there is nothing to assign
users_df.select(fn.avg('age')).show()

+--------+
|avg(age)|
+--------+
|    25.0|
+--------+




Compute the maximum age in the DataFrame `users_df`.

In [41]:
# show() prints the result and returns None, so there is nothing to assign
users_df.select(fn.max('age')).show()

+--------+
|max(age)|
+--------+
|      32|
+--------+



Join (as in SQL) the two dataframes `users_df` and `documents_df`

In [42]:
join_df = users_df.join(documents_df, ['user_id'])
join_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)



Left outer join (as in SQL) the two dataframes `users_df` and `documents_df`

In [43]:
# 'left' keeps every user, even those without documents ('outer' would be a full outer join)
outer_join = users_df.join(documents_df, ['user_id'], 'left')
outer_join.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)
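The printed schemas are the same for every join type; the difference lies in the rows that are returned. The following plain-Python sketch reproduces an inner join and a left outer join on the sample data from the cells above, without Spark. The helper `join_users_documents` is a hypothetical function written for this illustration.

```python
documents = [(1, 'cats are cute', 0), (2, 'dogs are playfull', 0),
             (3, 'lions are big', 1), (4, 'cars are fast', 1)]
users = [(0, 'Alice', 20), (1, 'Bob', 23), (2, 'Charles', 32)]

def join_users_documents(users, documents, how="inner"):
    """Join on user_id; with how='left', unmatched users are kept with None."""
    rows = []
    for user_id, name, age in users:
        matches = [(doc_id, text) for doc_id, text, uid in documents
                   if uid == user_id]
        if matches:
            rows += [(user_id, name, age, doc_id, text)
                     for doc_id, text in matches]
        elif how == "left":
            rows.append((user_id, name, age, None, None))
    return rows

inner_rows = join_users_documents(users, documents, "inner")
left_rows = join_users_documents(users, documents, "left")
print(len(inner_rows), len(left_rows))
print(left_rows[-1])
```

Charles has no document, so he disappears from the inner join and appears with empty document fields in the left outer join.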



## Part 4 : MLlib - Spark for data analysis and machine learning

In this part, we will see how to create a text classification application with Spark. We will use data from [Usenet](https://en.wikipedia.org/wiki/Usenet_newsgroup) newsgroups, and the objective is to predict the topic of each message.
 
The training data is [here](./SparkData/20ng-train-all-terms.txt) and the test data [here](./SparkData/20ng-test-all-terms.txt).
They were obtained using the following commands: 
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-train-all-terms.txt`
and
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-test-all-terms.txt`

Load these data as RDDs and convert them into DataFrames.

In [44]:
sc.stop()

findspark.init()

sc=pyspark.SparkContext("local[*]","machine learning in Spark")
sqlContext = pyspark.SQLContext(sc)

from pyspark.sql import SparkSession

def load_dataframe(path):
    """Read a tab-separated file (label, text) without header into a DataFrame."""
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(path, sep="\t", inferSchema=True, header=False)
    return df

train_data = load_dataframe("./SparkData/train-all-terms.txt")
test_data = load_dataframe("./SparkData/test-all-terms.txt")

train_data.show()
test_data.show()

+-----------+--------------------+
|        _c0|                 _c1|
+-----------+--------------------+
|alt.atheism|alt atheism faq a...|
|alt.atheism|alt atheism faq i...|
|alt.atheism|re gospel dating ...|
|alt.atheism|re university vio...|
|alt.atheism|re soc motss et a...|
|alt.atheism|re a visit from t...|
|alt.atheism|re political athe...|
|alt.atheism|re an anecdote ab...|
|alt.atheism|re political athe...|
|alt.atheism|re pompous ass km...|
|alt.atheism|re pompous ass li...|
|alt.atheism|re keith schneide...|
|alt.atheism|re keith schneide...|
|alt.atheism|re political athe...|
|alt.atheism|re political athe...|
|alt.atheism|re political athe...|
|alt.atheism|re don t more inn...|
|alt.atheism|re ancient islami...|
|alt.atheism|re political athe...|
|alt.atheism|re there must be ...|
+-----------+--------------------+
only showing top 20 rows

+-----------+--------------------+
|        _c0|                 _c1|
+-----------+--------------------+
|alt.atheism|re about the bib

A first step is to represent our data, i.e. the messages, as a bag-of-words using the Spark feature transformer `CountVectorizer`, documented [here](https://spark.apache.org/docs/2.1.0/ml-features.html#countvectorizer).

In [45]:
#Bag-of-words representation of test and train data

#First split the text column into an array of words, the input type expected by CountVectorizer
from pyspark.sql import functions
train_data = train_data.withColumn('_c1',functions.split(train_data._c1,' '))
test_data = test_data.withColumn('_c1',functions.split(test_data._c1,' '))

In [46]:
#Apply CountVectorizer
from pyspark.ml.feature import CountVectorizer
bow= CountVectorizer(inputCol="_c1", outputCol="features", vocabSize=1000, minDF=2.0)
bow_rep = bow.fit(train_data)
train_data = bow_rep.transform(train_data)
test_data = bow_rep.transform(test_data)
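To see what the bag-of-words step produces, here is a simplified plain-Python analogue. It is a sketch of the idea only, not Spark's implementation: the vocabulary keeps terms that appear in at least `min_df` documents, ranks them by corpus frequency, and truncates to `vocab_size`. The helpers `fit_vocabulary` and `transform`, the alphabetical tie-breaking and the tiny corpus are all assumptions made for illustration.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size, min_df):
    """Build a term -> index map from tokenised documents."""
    term_freq = Counter(t for doc in docs for t in doc)
    doc_freq = Counter(t for doc in docs for t in set(doc))
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    # Most frequent terms first; ties broken alphabetically (assumption)
    kept.sort(key=lambda t: (-term_freq[t], t))
    return {t: i for i, t in enumerate(kept[:vocab_size])}

def transform(doc, vocab):
    """Sparse count vector as {index: count}; unknown terms are ignored."""
    counts = Counter(t for t in doc if t in vocab)
    return {vocab[t]: c for t, c in counts.items()}

train = [["re", "political", "atheism"],
         ["re", "atheism", "faq"],
         ["re", "space", "faq", "faq"]]

vocab = fit_vocabulary(train, vocab_size=3, min_df=2)
vector = transform(["re", "faq", "faq", "unknown"], vocab)
print(vocab)
print(vector)
```

As in the notebook, the vocabulary is fitted on the training documents only and then reused to transform the test documents.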

Display the distinct labels in the datasets

In [47]:
train_data.select("_c0").distinct().sort("_c1").show(truncate=False)

+------------------------+
|_c0                     |
+------------------------+
|alt.atheism             |
|comp.graphics           |
|sci.med                 |
|misc.forsale            |
|sci.space               |
|talk.politics.guns      |
|sci.electronics         |
|comp.os.ms-windows.misc |
|talk.politics.mideast   |
|rec.motorcycles         |
|talk.politics.misc      |
|comp.sys.mac.hardware   |
|talk.religion.misc      |
|rec.autos               |
|soc.religion.christian  |
|sci.crypt               |
|rec.sport.baseball      |
|rec.sport.hockey        |
|comp.windows.x          |
|comp.sys.ibm.pc.hardware|
+------------------------+



We will now apply a [NaiveBayes](https://spark.apache.org/docs/latest/ml-classification-regression.html#naive-bayes) classifier to our problem. 
Take the time to read the documentation and apply it to our problem. You will first have to associate a number with each label. You can use the [`StringIndexer`](https://spark.apache.org/docs/2.1.0/ml-features.html#stringindexer) transformer of PySpark for that.


In [48]:
#Transformation of labels into numbers
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol("_c0").setOutputCol("label")
index = indexer.fit(train_data)
train_index = index.transform(train_data)
test_index = index.transform(test_data)
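By default, `StringIndexer` orders labels by descending frequency, so the most frequent label receives index 0.0. The plain-Python sketch below mimics that behaviour on a made-up list of labels; the alphabetical tie-breaking is an assumption of this sketch.

```python
from collections import Counter

labels = ["sci.space", "alt.atheism", "sci.space",
          "rec.autos", "sci.space", "rec.autos"]

freq = Counter(labels)

# Most frequent label first; ties broken alphabetically (assumption)
ordered = sorted(freq, key=lambda label: (-freq[label], label))
label_index = {label: float(i) for i, label in enumerate(ordered)}

indexed = [label_index[label] for label in labels]
print(label_index)
```

The mapping is learned on the training data and applied unchanged to the test data, which is why the same fitted model transforms both DataFrames above.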

In [49]:
#Application of the Naive Bayes model
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

nb = NaiveBayes(smoothing=0.01, modelType="multinomial") #low values of additive smoothing seem to generate the best accuracy
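The `smoothing` parameter controls additive smoothing of the word likelihoods. As a rough illustration, assuming the usual multinomial estimate (word count + α) / (class total + α · vocabulary size), the sketch below shows how α affects the probability given to a word never seen in a class. The function name and the numbers are illustrative and not taken from the notebook.

```python
def smoothed_likelihood(word_count, class_total, vocab_size, alpha):
    """Additively smoothed estimate of P(word | class)."""
    return (word_count + alpha) / (class_total + alpha * vocab_size)

# A word that never occurs in a class with 100 tokens, vocabulary of 1000 terms
p_no_smoothing = smoothed_likelihood(0, 100, 1000, alpha=0.0)
p_small_alpha = smoothed_likelihood(0, 100, 1000, alpha=0.01)
p_alpha_one = smoothed_likelihood(0, 100, 1000, alpha=1.0)

print(p_no_smoothing, p_small_alpha, p_alpha_one)
```

Without smoothing, a single unseen word would force the class probability to zero; a small α avoids this while shifting little probability mass away from the observed words.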

In [50]:
# Train the model, apply it to the test dataset and display the predictions
# next to the true labels for the first 20 messages

# train the model
model = nb.fit(train_index)

# select example rows to display.
predictions = model.transform(test_index)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))


+-----------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|        _c0|                 _c1|            features|label|       rawPrediction|         probability|prediction|
+-----------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|alt.atheism|[re, about, the, ...|(1000,[0,1,2,3,4,...| 17.0|[-620.13548702559...|[3.72320229642808...|       1.0|
|alt.atheism|[re, amusing, ath...|(1000,[0,1,2,3,4,...| 17.0|[-2778.9342834798...|[1.62995167172794...|      17.0|
|alt.atheism|[re, yet, more, r...|(1000,[0,1,2,3,4,...| 17.0|[-1530.5082606322...|[5.95681356382199...|      17.0|
|alt.atheism|[re, christian, m...|(1000,[0,1,2,3,4,...| 17.0|[-1045.8196548400...|[4.68310879069159...|      19.0|
|alt.atheism|[re, after, years...|(1000,[0,1,2,3,4,...| 17.0|[-1964.8125621074...|[1.21222211422996...|      17.0|
|alt.atheism|[re, amusing, ath...|(1000,[0,1,2,3,4,...| 17.0|[-1501.1398842555..

The learned model can be evaluated with the [evaluation](https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html) module of MLlib. Print the accuracy of the obtained model.
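Accuracy is simply the fraction of messages whose predicted label equals the true label. As a plain-Python illustration, the sketch below applies that definition to the first five rows shown in the prediction table above (true label 17.0 each time); it is not a substitute for the evaluator, which works on the whole test set.

```python
# (label, prediction) values read from the first five rows of the table above
labels = [17.0, 17.0, 17.0, 17.0, 17.0]
predictions = [1.0, 17.0, 17.0, 19.0, 17.0]

correct = sum(1 for y, p in zip(labels, predictions) if y == p)
accuracy = correct / len(labels)
print(accuracy)  # 0.6
```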