# Spark (with PySpark)


In this tutorial, you will learn how to use Apache Spark, a framework for large-scale data processing, within a notebook.
Upon completing this lab, you will be able to:
 + Program in Spark with the Python language
 + Demonstrate how to read and process data using Spark
 + Compare and contrast RDDs and DataFrames
 + Build a simple machine learning application with Spark



# Requirements: (Recommended) just run it in Google Colab


# Otherwise: install Spark and PySpark and configure them to run in Jupyter notebooks

Depending on your OS, follow the corresponding tutorial:

+ [Windows](https://changhsinlee.com/install-pyspark-windows-jupyter/)
+ [Mac Os X](https://jmedium.com/pyspark-in-mac/)

At the end of the installation, you should be able to run the following code, which is a kind of "Hello world" for PySpark.

In [2]:
!pip install findspark
!pip install pyspark
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
spark.stop()

## Part 1: Spark Context

When writing a Spark program, the first thing to do is to define a `SparkContext`.

In Spark, communication occurs between a driver and executors. The driver holds the Spark jobs that need to run; each job is split into tasks that are submitted to the executors, and the results of these tasks are sent back to the driver.

Here, we will use the `findspark` package, which can be installed with the following command:

` pip3 install findspark`

Then we can call the `findspark.init()` function, which locates the Spark installation and makes `pyspark` importable.




In [5]:
import findspark

findspark.init()

import pyspark

sc=pyspark.SparkContext("local[*]","First program in Spark")

Try printing out sc to see its type.

In [6]:
# TO DO
print(sc)

<SparkContext master=local[*] appName=First program in Spark>


You can use Python's `dir()` function to get a list of all the attributes (including methods) accessible through the `sc` object.

In [7]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_assert_on_driver',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addArchive',
 'addFile',
 'addJobTag',


## Part 2: Understanding Spark RDDs

### WordCount in Spark
In this part, we will write a word count in Spark and apply it to the novel Dracula by Bram Stoker (from Project Gutenberg).

First read the [pg345.txt](./SparkData/pg345.txt) file.

In [8]:
data = sc.textFile('/content/drive/MyDrive/pg345.txt')
type(data)

To see the content of the file, we need to run the action `collect` on the RDD `data`.

In [9]:
data.collect()

['The Project Gutenberg EBook of Dracula, by Bram Stoker',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org/license',
 '',
 '',
 'Title: Dracula',
 '',
 'Author: Bram Stoker',
 '',
 'Release Date: August 16, 2013 [EBook #345]',
 '',
 'Language: English',
 '',
 '',
 '*** START OF THIS PROJECT GUTENBERG EBOOK DRACULA ***',
 '',
 '',
 '',
 '',
 'Produced by Chuck Greif and the Online Distributed',
 'Proofreading Team at http://www.pgdp.net (This file was',
 'produced from images generously made available by The',
 'Internet Archive)',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '                                DRACULA',
 '',
 '',
 '',
 '',
 '',
 '                                DRACULA',
 '',
 '                                  _by_',
 '',
 '                              Bram Stoke

Using the RDD `data` from the previous cell, apply `flatMap()` to each line of the input: convert it to lower case, remove the commas, split it into words on spaces, and store the result in the RDD `words`.

In [10]:
# TO DO - flatMap step
words = data.flatMap(lambda line: line.lower().replace(',', '').split(' '))
words.collect()

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 'by',
 'bram',
 'stoker',
 '',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'almost',
 'no',
 'restrictions',
 'whatsoever.',
 '',
 'you',
 'may',
 'copy',
 'it',
 'give',
 'it',
 'away',
 'or',
 're-use',
 'it',
 'under',
 'the',
 'terms',
 'of',
 'the',
 'project',
 'gutenberg',
 'license',
 'included',
 'with',
 'this',
 'ebook',
 'or',
 'online',
 'at',
 'www.gutenberg.org/license',
 '',
 '',
 'title:',
 'dracula',
 '',
 'author:',
 'bram',
 'stoker',
 '',
 'release',
 'date:',
 'august',
 '16',
 '2013',
 '[ebook',
 '#345]',
 '',
 'language:',
 'english',
 '',
 '',
 '***',
 'start',
 'of',
 'this',
 'project',
 'gutenberg',
 'ebook',
 'dracula',
 '***',
 '',
 '',
 '',
 '',
 'produced',
 'by',
 'chuck',
 'greif',
 'and',
 'the',
 'online',
 'distributed',
 'proofreading',
 'team',
 'at',
 'http://www.pgdp.net',
 '(this',
 'file',
 'was',
 'prod

Use the action `take()` to retrieve the first 10 words produced by the `flatMap()` transformation.

In [11]:
# TO DO
words.take(10)

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 'by',
 'bram',
 'stoker',
 '']

Perform a classic `map()` that turns each word into a tuple `(word, 1)`.

In [12]:
# TO DO
word_counts = words.map(lambda word: (word, 1))
word_counts.collect()

[('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('ebook', 1),
 ('of', 1),
 ('dracula', 1),
 ('by', 1),
 ('bram', 1),
 ('stoker', 1),
 ('', 1),
 ('this', 1),
 ('ebook', 1),
 ('is', 1),
 ('for', 1),
 ('the', 1),
 ('use', 1),
 ('of', 1),
 ('anyone', 1),
 ('anywhere', 1),
 ('at', 1),
 ('no', 1),
 ('cost', 1),
 ('and', 1),
 ('with', 1),
 ('almost', 1),
 ('no', 1),
 ('restrictions', 1),
 ('whatsoever.', 1),
 ('', 1),
 ('you', 1),
 ('may', 1),
 ('copy', 1),
 ('it', 1),
 ('give', 1),
 ('it', 1),
 ('away', 1),
 ('or', 1),
 ('re-use', 1),
 ('it', 1),
 ('under', 1),
 ('the', 1),
 ('terms', 1),
 ('of', 1),
 ('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('license', 1),
 ('included', 1),
 ('with', 1),
 ('this', 1),
 ('ebook', 1),
 ('or', 1),
 ('online', 1),
 ('at', 1),
 ('www.gutenberg.org/license', 1),
 ('', 1),
 ('', 1),
 ('title:', 1),
 ('dracula', 1),
 ('', 1),
 ('author:', 1),
 ('bram', 1),
 ('stoker', 1),
 ('', 1),
 ('release', 1),
 ('date:', 1),
 ('august', 1),
 ('16', 1),
 ('2013', 1),


Write the reducing function: use `reduceByKey()` to sum the counts of each word.

In [13]:
# TO DO
word_counts = word_counts.reduceByKey(lambda a, b: a + b)
word_counts.collect()

[('of', 3719),
 ('dracula', 29),
 ('by', 520),
 ('bram', 5),
 ('stoker', 5),
 ('', 9306),
 ('this', 629),
 ('for', 1514),
 ('use', 48),
 ('anyone', 5),
 ('at', 1084),
 ('no', 424),
 ('cost', 3),
 ('and', 5840),
 ('with', 1314),
 ('almost', 69),
 ('restrictions', 2),
 ('you', 1302),
 ('it', 1876),
 ('give', 79),
 ('re-use', 2),
 ('under', 68),
 ('license', 13),
 ('online', 6),
 ('www.gutenberg.org/license', 2),
 ('title:', 1),
 ('release', 3),
 ('date:', 1),
 ('august', 5),
 ('16', 4),
 ('2013', 1),
 ('#345]', 1),
 ('language:', 1),
 ('***', 6),
 ('start', 25),
 ('distributed', 6),
 ('proofreading', 2),
 ('http://www.pgdp.net', 2),
 ('(this', 2),
 ('file', 5),
 ('from', 635),
 ('images', 2),
 ('generously', 2),
 ('available', 7),
 ('internet', 2),
 ('archive)', 2),
 ('[illustration:', 1),
 ('new', 76),
 ('_publishers_', 2),
 ('united', 13),
 ('states', 13),
 ('america', 3),
 ('according', 1),
 ('to', 4518),
 ('[_all', 1),
 ('rights', 3),
 ('reserved._]', 1),
 ('country', 19),
 ('press',

Get all words that occur more than once and sort them alphabetically:

In [14]:
# TO DO
# Keep the words that occur more than once, then sort them alphabetically
duplicate_words = (word_counts
                   .filter(lambda x: x[1] > 1)
                   .map(lambda x: x[0])
                   .sortBy(lambda w: w))
first_duplicates = duplicate_words.take(20)

Now, get all words that occur more than once and sort them by frequency. Select the first 20. Hint: look at the `takeOrdered` action.

In [15]:
# TO DO
frequent_words = word_counts.filter(lambda x: x[1] > 1).takeOrdered(20, key=lambda x: -x[1])
frequent_words

[('', 9306),
 ('the', 7984),
 ('and', 5840),
 ('i', 4527),
 ('to', 4518),
 ('of', 3719),
 ('a', 2933),
 ('he', 2523),
 ('in', 2512),
 ('that', 2415),
 ('it', 1876),
 ('was', 1848),
 ('as', 1573),
 ('for', 1514),
 ('we', 1499),
 ('is', 1489),
 ('his', 1457),
 ('not', 1350),
 ('with', 1314),
 ('you', 1302)]

### A simple exercise

+ Create a Python collection of 10,000 integers
+ Create a Spark base RDD from that collection
+ Subtract one from each value using map
+ Perform action collect to view results
+ Perform action count to view counts
+ Apply transformation filter and view results with collect

In [16]:
# TO DO
# Reuses the SparkContext `sc` created in Part 1 (no new context is needed)

# Create a Python collection of 10,000 integers
data = range(10000)

# Create a Spark base RDD from that collection
distData = sc.parallelize(data)


# Subtract one from each value using map
subtractedRDD = distData.map(lambda x: x - 1)

# Perform action collect to view results
results = subtractedRDD.collect()
print("Collected results (first 10):", results[:10])


# Perform action count to view counts
counts = subtractedRDD.count()
print("Number of elements:", counts)

# Apply transformation filter and view results with collect
filteredRDD = subtractedRDD.filter(lambda x: x > 5000)
filteredResults = filteredRDD.collect()
print("Filtered results (first 10):", filteredResults[:10])


Collected results (first 10): [-1, 0, 1, 2, 3, 4, 5, 6, 7, 8]
Number of elements: 10000
Filtered results (first 10): [5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010]


### An improved WordCount

Print the top 10 most frequent words with their probability of appearance (the word's count divided by the total number of words).


In [17]:
# TO DO
# Assumes the `word_counts` RDD of (word, count) pairs from the previous cells is available

total_words = word_counts.map(lambda x: x[1]).sum()
top_words = word_counts.takeOrdered(10, key=lambda x: -x[1])

for word, count in top_words:
    probability = count / total_words
    print(f"{word}: {probability}")

: 0.05356587808668624
the: 0.04595636907845507
and: 0.0336153801876475
i: 0.02605767570367812
to: 0.026005871179416336
of: 0.02140678063662004
a: 0.016882518851090773
he: 0.014522534968053877
in: 0.014459218327289473
that: 0.01390088067691245


Get rid of special characters (.,:!?').


In [18]:
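# TO DO
import re

def remove_special_characters(text):
    # Strip the characters . , : ! ? and ' from a string
    return re.sub(r"[.,:!?']", '', text)

# Apply the cleaning to every word (`words` is already lower-cased, commas removed)
# and drop the empty tokens produced by split(' ')
clean_words = words.map(remove_special_characters).filter(lambda w: w != '')
clean_word_counts = clean_words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
top_clean_words = clean_word_counts.takeOrdered(10, key=lambda x: -x[1])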
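The function above only strips the six listed characters, so tokens such as `(this` or `[ebook` from the earlier output keep their brackets. A plain-Python check on a few sample tokens, together with an equivalent `str.translate` version that avoids regular expressions (the helper names and samples are illustrative):

```python
import re

SPECIAL = ".,:!?'"


def strip_with_regex(text):
    # Same character class as the cell above: . , : ! ? and '
    return re.sub(r"[.,:!?']", "", text)


# A translation table whose third argument lists the characters to delete
TABLE = str.maketrans("", "", SPECIAL)


def strip_with_translate(text):
    return text.translate(TABLE)


samples = ["whatsoever.", "title:", "don't", "(this", "[ebook"]
cleaned = [strip_with_regex(s) for s in samples]
cleaned_translate = [strip_with_translate(s) for s in samples]

print(cleaned)  # ['whatsoever', 'title', 'dont', '(this', '[ebook']
```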

Identify the transformations and the actions in your script.


In [20]:
# TO DO
# Creating RDDs (lazy as well, but they start a lineage instead of deriving from an RDD):
#  - sc.textFile('/content/drive/MyDrive/pg345.txt'): reads the text file into an RDD of lines.
#  - sc.parallelize(data): creates an RDD from a Python collection (simple exercise).
#
# Transformations (lazy, each one returns a new RDD):
#  1. flatMap(lambda line: line.lower().replace(',', '').split(' ')): lower-cases each line,
#     removes commas and splits it into words.
#  2. map(lambda word: (word, 1)): creates a (word, 1) pair for each word.
#  3. reduceByKey(lambda a, b: a + b): sums the counts of each distinct word.
#  4. filter(lambda x: x[1] > 1): keeps the words that occur more than once.
#  5. map(lambda x: x - 1): subtracts 1 from each element (simple exercise).
#  6. filter(lambda x: x > 5000): keeps the elements greater than 5000 (simple exercise).
#  7. map(lambda x: x[1]): extracts the count from each (word, count) pair.
#
# Actions (each one triggers a job and returns a result to the driver):
#  1. data.collect(): returns all the lines of the file.
#  2. words.collect(): returns all the words.
#  3. words.take(10): returns the first 10 words.
#  4. collect() on the (word, 1) pairs, then on the reduced counts.
#  5. takeOrdered(20, key=lambda x: -x[1]): returns the 20 most frequent words.
#  6. collect() and count() on subtractedRDD, collect() on filteredRDD (simple exercise).
#  7. sum(): adds up all the word counts.
#  8. takeOrdered(10, key=lambda x: -x[1]): returns the 10 most frequent words;
#     their probabilities are then computed in plain Python on the driver.

How many times are the transformations evaluated? (Hint: it depends)


In [21]:
# TO DO
# Transformations are lazy: calling flatMap, map, reduceByKey or filter only adds a
# step to the RDD's lineage (the plan); nothing is computed at that point.
# The work is done when an action (collect, take, count, takeOrdered, sum, ...) runs.
#
# So it depends on how many actions use an RDD: without caching, every action
# re-executes the lineage from the source file. For example, `words` is used by
# words.collect(), words.take(10) and the later word-count actions, so the file is
# read and flatMap is evaluated once per such action, not once in total.
#
# One exception: for a shuffle such as reduceByKey, Spark can reuse the shuffle
# files written by an earlier job and skip the stages that precede the shuffle.

Can you reduce this number? (Hint: check out "persist")

In [23]:
# TO DO
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Assumes the setup cells above have run (PySpark installed, Drive mounted).
# Reuse the active context instead of creating a new SparkContext.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc)

# persist() marks the RDD for caching: the lines are stored the first time an action computes them
data = sc.textFile('/content/drive/MyDrive/pg345.txt').persist()

words = data.flatMap(lambda line: line.lower().replace(',', '').split(' '))

# Also persist the aggregated counts: otherwise flatMap/map/reduceByKey would still be
# re-run by every action that uses word_counts (only the file read would be saved)
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).persist()

# First action: computes the lineage once and fills both caches
frequent_words = word_counts.filter(lambda x: x[1] > 1).takeOrdered(20, key=lambda x: -x[1])
print(frequent_words)

# A second action now reads the cached word_counts partitions
total_words = word_counts.map(lambda x: x[1]).sum()

# Stop the SparkSession (and its SparkContext) when you are finished
spark.stop()


<SparkContext master=local[*] appName=pyspark-shell>
[('', 9306), ('the', 7984), ('and', 5840), ('i', 4527), ('to', 4518), ('of', 3719), ('a', 2933), ('he', 2523), ('in', 2512), ('that', 2415), ('it', 1876), ('was', 1848), ('as', 1573), ('for', 1514), ('we', 1499), ('is', 1489), ('his', 1457), ('not', 1350), ('with', 1314), ('you', 1302)]


In [24]:
sc.stop()

## Part 3: Spark SQL and DataFrames

In this part, you will explore Spark DataFrames and Spark SQL. We will work on a sample of the world population whose data comes from [pplapi](http://pplapi.com/). The file [agents.json](./SparkData/agents.json) was extracted from this API and can be downloaded with the following command:


`wget https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json`

In [25]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.json("/content/drive/MyDrive/agents.json")

Print the first 5 rows of the DataFrame `df`.

In [26]:
# TO DO
df.show(5)

+------------+----------+------------------+------------------+------+
|country_name|        id|          latitude|         longitude|   sex|
+------------+----------+------------------+------------------+------+
|       China| 227417393| 33.15219798270325|100.85840672174572|  Male|
|       Haiti|6821129477|19.325567983697297|-72.43795260265814|Female|
|       India|2078667700|23.645271492037235| 80.85636526088884|Female|
|       China| 477556555| 33.45864668881662| 93.33604038078953|Female|
|       India|1379059984|28.816938290678692|  80.7728698035823|Female|
+------------+----------+------------------+------------------+------+
only showing top 5 rows



Write the Spark instructions that display the number of French agents.

In [29]:
# TO DO
from pyspark.sql.functions import col

# Filter for French agents and count them
french_agents_count = df.filter(col("country_name") == "France").count()

# Display the count
print(f"Number of French agents: {french_agents_count}")

Number of French agents: 94


Write the Spark instructions that display the number of Indian female agents.

In [32]:
# TO DO
# Filter for Indian female agents and count them
indian_female_agents_count = df.filter((col("country_name") == "India") & (col("sex") == "Female")).count()

# Display the count
print(f"Number of Indian female agents: {indian_female_agents_count}")

Number of Indian female agents: 828


Using the notion of temporary view (function `createTempView`), create a temporary view associated with the DataFrame `df`.

In [33]:
df.createTempView("temp_table")


Write some SQL queries against the resulting view, as in the example below.

In [35]:
# TO DO - Write some requests.

spark.sql("SELECT country_name, id FROM temp_table ORDER BY id DESC LIMIT 10").show()

+-----------------+----------+
|     country_name|        id|
+-----------------+----------+
| French Polynesia|7170821229|
|       Cabo Verde|7167692449|
|         Suriname|7166451460|
|         Suriname|7166235088|
|            Macau|7166034642|
|       Montenegro|7164357515|
|Equatorial Guinea|7163867872|
|           Bhutan|7163256789|
|           Bhutan|7163004645|
|           Bhutan|7162877973|
+-----------------+----------+



In [36]:
spark.stop()
sc.stop()

You can also create a DataFrame from an existing RDD, as shown in the example below.

In [37]:
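from pyspark.sql import SparkSession

sc = pyspark.SparkContext("local[*]", "Dataframe examples in Spark")
# SQLContext is deprecated since Spark 3.0; a SparkSession built on the active
# context replaces it and also makes RDD.toDF() available
spark = SparkSession.builder.getOrCreate()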





In [38]:
documents_rdd = sc.parallelize([
        [1, 'cats are cute', 0],
        [2, 'dogs are playfull', 0],
        [3, 'lions are big', 1],
        [4, 'cars are fast', 1]])
users_rdd = sc.parallelize([
        [0, 'Alice', 20],
        [1, 'Bob', 23],
        [2, 'Charles', 32]])

In [39]:
documents_df = documents_rdd.toDF(['doc_id', 'text', 'user_id'])
users_df = users_rdd.toDF(['user_id', 'name', 'age'])

# printing the inferred schema for documents
documents_df.printSchema()


root
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)



Aggregate functions from `pyspark.sql.functions` can be applied to one or several columns. Here, we compute the average age in the DataFrame `users_df`.

In [40]:
from pyspark.sql import functions as fn

# show() prints the result and returns None, so keep the DataFrame in its own variable
user_age_df = users_df.select(fn.avg('age'))
user_age_df.show()

+--------+
|avg(age)|
+--------+
|    25.0|
+--------+




Compute the maximum age in the DataFrame `users_df`.

In [41]:
# TO DO
user_age_df = users_df.select(fn.max('age'))
user_age_df.show()

+--------+
|max(age)|
+--------+
|      32|
+--------+



Join (as in SQL) the two DataFrames `users_df` and `documents_df`.

In [42]:
# TO DO
joined_df = users_df.join(documents_df, on='user_id', how='inner')
joined_df.show()


+-------+-----+---+------+-----------------+
|user_id| name|age|doc_id|             text|
+-------+-----+---+------+-----------------+
|      0|Alice| 20|     1|    cats are cute|
|      0|Alice| 20|     2|dogs are playfull|
|      1|  Bob| 23|     3|    lions are big|
|      1|  Bob| 23|     4|    cars are fast|
+-------+-----+---+------+-----------------+



Perform a left outer join (as in SQL) of the two DataFrames `users_df` and `documents_df`.

In [43]:
# TO DO
joined_df = users_df.join(documents_df, on='user_id', how='left')
joined_df.show()

+-------+-------+---+------+-----------------+
|user_id|   name|age|doc_id|             text|
+-------+-------+---+------+-----------------+
|      0|  Alice| 20|     2|dogs are playfull|
|      0|  Alice| 20|     1|    cats are cute|
|      1|    Bob| 23|     4|    cars are fast|
|      1|    Bob| 23|     3|    lions are big|
|      2|Charles| 32|  NULL|             NULL|
+-------+-------+---+------+-----------------+

