<a href="https://colab.research.google.com/github/rjrahul24/spark-ud-2002/blob/master/Functional_V_S_Procedural_Programming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Procedural Programming

In this notebook we will try to differentiate Procedural Programming from Functional Programming and understand why it's important for a framework like Spark to use Functional Programming.

You'll notice that the first time you run `count_occurences("Messi")`, you get the correct count. However, when you run the same code again, the result is no longer correct. This is because the global variable `player_count` lives outside of the `count_occurences` function: it is never reset, so each call keeps adding to the total left over from the previous call.

# Instructions

Run the code cells in this notebook to see the problem that procedural programming techniques, such as shared global state, cause in parallel and multithreaded applications.

In [1]:
players_log = [
        "Messi",
        "Ronaldo",
        "Maradona",
        "Messi",
        "Lewandowski",
        "Mbappe",
        "Ronaldo",
        "Messi",
        "Neymar"
]

In [2]:
player_count = 0

In [3]:
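def count_occurences(player_name):
    # player_count refers to the module-level variable, not a local copy,
    # so its value persists (and keeps growing) between calls
    global player_count
    for player in players_log:
        if player == player_name:
            player_count = player_count + 1
    return player_count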

In [5]:
# First Run

count_occurences("Messi")

3

In [6]:
# Second Run

count_occurences("Messi")

6

In [7]:
# Third Run

count_occurences("Messi")

9

# How to Solve the Issue

How might you solve this issue? You could get rid of the global variable and instead use player_count as an input to the function:

```python
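def count_occurences(player_name, player_count):
    # player_count is now a parameter: the caller supplies the starting value,
    # and the function returns the updated count instead of modifying a global
    for player in players_log:
        if player == player_name:
            player_count = player_count + 1
    return player_count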
```
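A quick check of the revised function, assuming `players_log` is the list defined earlier (repeated here so the snippet runs on its own). Because the running total now starts from whatever the caller passes in, calling it twice with `0` gives the same answer both times.

```python
players_log = [
    "Messi", "Ronaldo", "Maradona", "Messi", "Lewandowski",
    "Mbappe", "Ronaldo", "Messi", "Neymar",
]

def count_occurences(player_name, player_count):
    # No global state: the starting count comes in as an argument
    # and the updated count goes out as the return value.
    for player in players_log:
        if player == player_name:
            player_count = player_count + 1
    return player_count

first_run = count_occurences("Messi", 0)
second_run = count_occurences("Messi", 0)
print(first_run, second_run)  # 3 3
```

Repeated calls no longer interfere with each other, since nothing outside the function is modified.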

How would this work with parallel programming? Spark splits data across multiple machines. If your player list were split across two machines, Machine A would first need to finish counting and return its result to Machine B. Only then could Machine B take Machine A's output as its starting count and add to it.
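To make the dependency concrete, here is a plain-Python sketch (no Spark involved) that pretends the list is split into two halves, one per "machine". The names `machine_a_data` and `machine_b_data`, and the extra `data` parameter on the counting function, are hypothetical additions for illustration. The point is that Machine B's call cannot start until Machine A's result exists.

```python
players_log = [
    "Messi", "Ronaldo", "Maradona", "Messi", "Lewandowski",
    "Mbappe", "Ronaldo", "Messi", "Neymar",
]

def count_occurences(player_name, player_count, data):
    # Same counting logic as above, but the data slice is passed in explicitly.
    for player in data:
        if player == player_name:
            player_count = player_count + 1
    return player_count

# Hypothetical split of the log across two machines.
midpoint = len(players_log) // 2
machine_a_data = players_log[:midpoint]
machine_b_data = players_log[midpoint:]

# Machine B needs Machine A's output as its starting count,
# so the two steps must run one after the other.
result_a = count_occurences("Messi", 0, machine_a_data)
result_b = count_occurences("Messi", result_a, machine_b_data)
print(result_a, result_b)  # 2 3
```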

However, that isn't parallel computing: Machine B has to sit idle until Machine A finishes. Spark avoids this by using a functional programming paradigm.

In Spark, if your data is split across two machines, Machine A runs a function that counts how many times 'Messi' appears in its portion of the data, while Machine B simultaneously runs a function that counts how many times 'Messi' appears in its portion. After each machine finishes counting, the two partial results are combined. You'll see how this works in the next parts of the lesson.
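The count-independently-then-combine pattern can be sketched in plain Python. This is not Spark: a `ThreadPoolExecutor` from the standard library stands in for the two machines, and `count_in_partition` is a hypothetical helper. Because it is a pure function with no shared state, the two counts can run at the same time and simply be added together afterwards.

```python
from concurrent.futures import ThreadPoolExecutor

players_log = [
    "Messi", "Ronaldo", "Maradona", "Messi", "Lewandowski",
    "Mbappe", "Ronaldo", "Messi", "Neymar",
]

def count_in_partition(player_name, partition):
    # Pure function: depends only on its arguments and mutates nothing shared.
    return sum(1 for player in partition if player == player_name)

# Hypothetical split of the log across two "machines".
midpoint = len(players_log) // 2
partitions = [players_log[:midpoint], players_log[midpoint:]]

# Each "machine" counts its own partition; neither waits for the other's result.
with ThreadPoolExecutor(max_workers=2) as pool:
    partial_counts = list(
        pool.map(lambda part: count_in_partition("Messi", part), partitions)
    )

# Combine step: add the independent partial results.
total = sum(partial_counts)
print(partial_counts, total)  # [2, 1] 3
```

The threads only illustrate that the partial counts are independent; they are not a performance claim. In PySpark the same idea would typically be written as an RDD `filter` followed by the `count` action, with Spark handling the splitting and combining itself.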

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=925ad779f9b865b639746db7bbd6fe3ce28dfd4bca222b4b609c17f38e3b9e2b
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
import pyspark

In [3]:
sc = pyspark.SparkContext(appName="maps_and_lazy_evaluation_sample")

players_log = [
        "Messi",
        "Ronaldo",
        "Maradona",
        "Messi",
        "Lewandowski",
        "Mbappe",
        "Ronaldo",
        "Messi",
        "Neymar"
]

# parallelize players_log into an RDD so Spark can operate on it
distributed_player_log = sc.parallelize(players_log)

In [4]:
def convert_to_lowercase(player):
    return player.lower()

convert_to_lowercase("Ronaldo")

'ronaldo'

In [6]:
# The following code cells demonstrate how to apply this function using a map step.
# The map step goes through each player in the list and applies the convert_to_lowercase() function.

distributed_player_log.map(convert_to_lowercase)

PythonRDD[1] at RDD at PythonRDD.scala:53

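You'll notice that this code cell ran almost instantly. This is because of lazy evaluation: `map` is a transformation, so Spark only records that it should be applied and does not actually execute it until a later step needs the result. That is why the output above is an RDD object rather than a list of lowercase names.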
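Python's built-in `map` behaves in a similar (though much simpler, single-machine) way, which makes it a handy analogy: calling it returns an iterator without running the function, and the work only happens when something consumes the results. The `calls` list below is just instrumentation added for this demo; nothing here is Spark.

```python
calls = []

def convert_to_lowercase(player):
    calls.append(player)  # record each time the function actually runs
    return player.lower()

players = ["Messi", "Ronaldo", "Neymar"]

lazy_result = map(convert_to_lowercase, players)
calls_before_consuming = len(calls)
print(calls_before_consuming)  # 0: nothing has run yet, like a Spark transformation

materialized = list(lazy_result)  # consuming the iterator plays the role of an action
print(len(calls), materialized)  # 3 ['messi', 'ronaldo', 'neymar']
```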

"RDD" in the output stands for resilient distributed dataset. RDDs are exactly what the name says: fault-tolerant datasets whose elements are partitioned across the machines of a cluster. They are Spark's fundamental data abstraction.

To get Spark to actually run the map step, you need to call an "action". One available action is the `collect()` method, which takes the results from all of the machines in the cluster and "collects" them into a single list on the driver (master) node.
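A toy, single-process sketch of what `collect()` does conceptually: each partition is transformed where it lives, then the driver concatenates the per-partition results in partition order. The list-of-lists `partitions` layout and the helper `toy_collect` are hypothetical stand-ins, not Spark internals.

```python
def convert_to_lowercase(player):
    return player.lower()

# Hypothetical layout: the log split across three workers.
partitions = [
    ["Messi", "Ronaldo", "Maradona"],
    ["Messi", "Lewandowski", "Mbappe"],
    ["Ronaldo", "Messi", "Neymar"],
]

def toy_collect(partitions, func):
    # "Map" step: apply func to every element inside each partition.
    mapped = [[func(item) for item in part] for part in partitions]
    # "Collect" step: gather all partitions into one list on the driver,
    # keeping the original partition order.
    return [item for part in mapped for item in part]

result = toy_collect(partitions, convert_to_lowercase)
print(result)
```

Because everything ends up in a single list in the driver's memory, `collect()` is best reserved for results small enough to fit on one machine.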

In [7]:
distributed_player_log.map(convert_to_lowercase).collect()

['messi',
 'ronaldo',
 'maradona',
 'messi',
 'lewandowski',
 'mbappe',
 'ronaldo',
 'messi',
 'neymar']

In [8]:
distributed_player_log.map(lambda x: x.lower()).collect()

['messi',
 'ronaldo',
 'maradona',
 'messi',
 'lewandowski',
 'mbappe',
 'ronaldo',
 'messi',
 'neymar']
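The `lambda` in the last cell is simply an anonymous version of `convert_to_lowercase`, so both cells produce the same list. The equivalence can be checked in plain Python with the built-in `map` (no Spark needed):

```python
players_log = [
    "Messi", "Ronaldo", "Maradona", "Messi", "Lewandowski",
    "Mbappe", "Ronaldo", "Messi", "Neymar",
]

def convert_to_lowercase(player):
    return player.lower()

# Same transformation, once with a named function and once with a lambda.
named = list(map(convert_to_lowercase, players_log))
anonymous = list(map(lambda x: x.lower(), players_log))

print(named == anonymous)  # True
```

Lambdas are convenient for short, one-off transformations; a named function is easier to reuse and test on its own, as the earlier `convert_to_lowercase("Ronaldo")` cell did.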