<a href="https://colab.research.google.com/github/radengunawan/BSPPPUCpp/blob/master/Udacity_Maps.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!wget -q http://ftp.mirror.tw/pub/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

In [5]:
!ls

sample_data  spark-2.4.4-bin-hadoop2.7.tgz


In [0]:
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

In [0]:
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()

**Maps**.

In Spark, a map takes data as input and transforms each element with whatever function you pass to it.
Maps are like directions for the data: they tell Spark how each input should get to its output.
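
As a rough plain-Python analogy (this sketch uses Python's built-in map rather than Spark, and the titles list is made up for illustration), a map applies one function to every element and produces one output per input:

```python
# Plain-Python analogy for a map step (no Spark involved).
# `titles` is a hypothetical input chosen for illustration.
titles = ["Havana", "All the stars"]

# map() applies the given function (here, len) to each element in turn,
# producing exactly one output per input.
lengths = list(map(len, titles))

print(lengths)
```

The Spark map used later in this notebook follows the same one-in, one-out idea, except that the elements live in an RDD instead of a Python list.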

The code below first creates a SparkContext object. With this object, we can read in a dataset and parallelize the data across a cluster. (However, since we are now using Spark in local mode on a single machine, the dataset is technically **NOT** distributed yet.)

Run the code below to instantiate a SparkContext object and then read the log_of_songs list into Spark.

In [0]:
### 
# You might have noticed this code in the screencast.
#
# import findspark
# findspark.init('spark-2.3.2-bin-hadoop2.7')
#
# The findspark Python module makes it easier to install
# Spark in local mode on your computer. This is convenient
# for practicing Spark syntax locally. 
# However, the workspaces already have Spark installed and you do not
# need to use the findspark module
#
###
import pyspark
sc = pyspark.SparkContext(appName = "maps_and_lazy_evaluation_example")

In [0]:
log_of_songs = [
                "Despacito",
                "Nice for what",
                "No tears left to cry",
                "Despacito",
                "Havana",
                "In my feelings",
                "Nice for what",
                "despacito",
                "All the stars"
]

In [0]:
# parallelize the log_of_songs to use with Spark:
distributed_song_log = sc.parallelize(log_of_songs)

This next code cell defines a function that converts a song title to lowercase. Then there is an example converting the word "Havana" to "havana".

In [0]:
def convert_song_to_lwrcase(song):
  return song.lower()

In [34]:
convert_song_to_lwrcase("Havana")

'havana'

The following code cells demonstrate ***how to apply this function using a map step***. 

The map step will go through each song in the list and apply the convert_song_to_lwrcase() function.

In [35]:
distributed_song_log.map(convert_song_to_lwrcase)

PythonRDD[6] at RDD at PythonRDD.scala:53

Notice that this code ran quickly.
This is because of lazy evaluation: Spark does not actually execute the map step until it needs to.
Calling map only records the transformation that should be applied later.
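
Python's built-in map is also lazy, so it can serve as a loose analogy for this behaviour. The sketch below does not use Spark; the to_lower_logged helper and the calls list are hypothetical names introduced only to make the deferred work visible.

```python
# Plain-Python analogy for lazy evaluation (no Spark involved).
calls = []  # records every title the function is actually applied to

def to_lower_logged(song):
    """Lowercase a title and log that the function really ran."""
    calls.append(song)
    return song.lower()

songs = ["Despacito", "Havana"]

# Building the map object returns immediately: the function has not run yet.
lazy_result = map(to_lower_logged, songs)
calls_before = len(calls)

# Consuming the map object forces the work, much like an action does in Spark.
lowered = list(lazy_result)
calls_after = len(calls)

print(calls_before, calls_after, lowered)
```

Here calls_before is 0 and calls_after is 2: no title is processed until the result is actually requested.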

"RDD" in the output stands for resilient distributed dataset.

RDDs are exactly what the name suggests: fault-tolerant datasets distributed across a cluster. This is how Spark stores data.

To get Spark to actually run the map step, we need to trigger an action. One available action is the collect method.

IMPORTANT: 
The **collect()** method takes the results from all of the nodes in the cluster and gathers them into a single list on the master node (the driver).
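
One way to picture this is the conceptual model below. It is plain Python, not Spark's implementation: each inner list stands in for one partition of the data, and the split into two partitions, along with the map_partition and collect helpers, are assumptions made purely for illustration.

```python
from itertools import chain

# Conceptual model only: each inner list stands in for one partition.
# The two-partition split is an assumption made for illustration.
partitions = [
    ["Despacito", "Nice for what"],
    ["Havana", "In my feelings"],
]

def map_partition(partition, func):
    """Apply func to every element of a single partition."""
    return [func(item) for item in partition]

def collect(mapped_partitions):
    """Concatenate per-partition results into one list, preserving order."""
    return list(chain.from_iterable(mapped_partitions))

# Each partition is mapped independently, then the results are gathered.
mapped = [map_partition(p, str.lower) for p in partitions]
result = collect(mapped)

print(result)
```

Because collect brings everything back into one list on a single machine, it is best suited to small results like the song log in this notebook.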

In [23]:
distributed_song_log.collect()

['Despacito',
 'Nice for what',
 'No tears left to cry',
 'Despacito',
 'Havana',
 'In my feelings',
 'Nice for what',
 'despacito',
 'All the stars']

In [24]:
distributed_song_log.map(convert_song_to_lwrcase).collect()

['despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars']

Note that Spark is not changing the original dataset. 
The map step returns a new RDD and leaves the original one untouched.
We can see this by running collect on the original dataset:

In [36]:
distributed_song_log.collect()

['Despacito',
 'Nice for what',
 'No tears left to cry',
 'Despacito',
 'Havana',
 'In my feelings',
 'Nice for what',
 'despacito',
 'All the stars']

You don't always have to write a custom function for the map step. You can also use anonymous (lambda) functions, as well as built-in Python functions such as str.lower().
Anonymous functions are a Python feature that supports a functional programming style.

In [25]:
distributed_song_log.map(lambda song: song.lower()).collect()

['despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars']

In [26]:
distributed_song_log.map(lambda x: x.lower()).collect()

['despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars']
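
The named function, the lambda, and the built-in method are interchangeable ways of expressing the same transformation. The sketch below checks this in plain Python with the built-in map (no Spark involved); the sample list is a hypothetical subset of the song log.

```python
# Plain-Python check (no Spark involved) that three ways of writing the
# same lowercase transformation agree. `sample` is a hypothetical input.
def convert_song_to_lwrcase(song):
    return song.lower()

sample = ["Despacito", "Nice for what", "despacito"]

with_named = list(map(convert_song_to_lwrcase, sample))     # custom function
with_lambda = list(map(lambda song: song.lower(), sample))  # anonymous function
with_builtin = list(map(str.lower, sample))                 # built-in method, no wrapper

print(with_named == with_lambda == with_builtin)
```

Assuming the SparkContext from earlier in the notebook, passing str.lower directly to distributed_song_log.map should behave the same way as the lambda versions above.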