# Spark RDDs Lab

### Introduction

In this lesson, we'll practice working with Spark RDDs and the Spark UI. Along the way, we'll explore several key properties of RDDs:
    
* in-memory storage
* distributed jobs performed in parallel
* resiliency through lineage, recorded as a DAG (directed acyclic graph) of transformations
* lazy operations that only kick off when an action is invoked
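
The last two properties are easier to see in a toy model. The sketch below is plain Python, not Spark's implementation: `ToyRDD` is a hypothetical class that records each transformation as a step in its lineage and only runs those steps when an action (`collect`) is called. Because the lineage is kept, a result can always be recomputed from the source data, which is the idea behind RDD resiliency.

```python
class ToyRDD:
    """Hypothetical, pure-Python stand-in for an RDD (illustration only)."""

    def __init__(self, source, lineage=()):
        self.source = list(source)      # the original data
        self.lineage = tuple(lineage)   # recorded transformations, in order

    def map(self, fn):
        # Transformation: record the step and return a new ToyRDD; nothing runs yet.
        return ToyRDD(self.source, self.lineage + (("map", fn),))

    def filter(self, pred):
        # Also a transformation: only recorded.
        return ToyRDD(self.source, self.lineage + (("filter", pred),))

    def collect(self):
        # Action: replay the whole lineage against the source data.
        result = self.source
        for kind, fn in self.lineage:
            if kind == "map":
                result = [fn(x) for x in result]
            else:
                result = [x for x in result if fn(x)]
        return result


calls = []

def track_call(x):
    calls.append(x)  # side effect so we can see when work actually happens
    return x * 10

rdd = ToyRDD([1, 2, 3]).map(track_call).filter(lambda x: x > 10)
print(len(calls))      # 0: transformations are only recorded
print(rdd.collect())   # [20, 30]: the action runs the lineage
print(len(calls))      # 3
```

Calling `collect()` a second time replays the lineage from the source again. In real PySpark, once an RDD exists, `rdd.toDebugString()` returns Spark's own lineage (as bytes, so call `.decode()` before printing).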

Ok, let's get started.

### Setting up

Let's begin by loading our dataset of Spotify tracks into a pandas DataFrame. We'll create our Spark context, and pass it our configuration, right after.

In [1]:
import pandas as pd

In [2]:
url = "s3://jigsaw-labs/spotify_tracks.csv"

In [3]:
df = pd.read_csv(url)

If we look at the column types, we can see that each record is a song track with its artists, a track id, a name, and various audio attributes of the song.

In [4]:
df[:1].dtypes

acousticness        float64
artists              object
danceability        float64
duration_ms           int64
energy              float64
explicit              int64
id                   object
instrumentalness    float64
key                   int64
liveness            float64
loudness            float64
mode                  int64
name                 object
popularity            int64
release_date         object
speechiness         float64
tempo               float64
valence             float64
year                  int64
dtype: object

Ok, now let's feed these tracks into Spark. First, we'll need to create our Spark context. Set the application name to `musicTracks` and connect locally, allocating 2 cores to the cluster (`local[2]`).

In [5]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("musicTracks").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
sc.appName

'musicTracks'

In [7]:
sc.master

'local[2]'

Now that the cluster is set up, let's open the Spark UI so that it's available as we work. In a notebook, evaluating `sc` displays a link to the UI (by default, it runs on port 4040).

In [8]:
sc

Next, let's convert the DataFrame into a list of dictionaries, one per track, and create an RDD from it.

In [9]:
tracks = df.to_dict('records')
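
`to_dict('records')` turns each DataFrame row into a dictionary keyed by column name, which is the shape `sc.parallelize` receives here. The tiny frame below is made-up sample data (the second track is hypothetical) using two of the dataset's columns:

```python
import pandas as pd

# Hypothetical sample rows with two of the dataset's columns
sample = pd.DataFrame({"name": ["Keep A Song In Your Soul", "Another Track"],
                       "year": [1920, 2021]})

# One dict per row, keyed by column name
records = sample.to_dict("records")
print(records)
```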

In [10]:
tracks_rdd = sc.parallelize(tracks)

tracks_rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274
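
Since we configured `local[2]`, `parallelize` defaults to 2 partitions here; you can also set the count explicitly with the `numSlices` argument. The sketch below assumes `sc` and `tracks` from the cells above. `partition_sizes` is a hypothetical helper that uses `glom()` to count records per partition, and `slice_bounds` is a hypothetical helper showing the contiguous, near-equal split that Spark's Scala `ParallelCollectionRDD` uses for a collection. PySpark slices batches of serialized records rather than individual records, so the real sizes can differ a little from this formula.

```python
def partition_sizes(rdd):
    """Hypothetical helper: number of records in each partition of an RDD.

    glom() turns each partition into a single list, so mapping len over it
    gives one count per partition. collect() is an action and runs a job.
    """
    return rdd.glom().map(len).collect()


def slice_bounds(length, num_slices):
    """(start, end) index pairs splitting `length` items into contiguous slices,
    using start = i * length // n and end = (i + 1) * length // n."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]


print(slice_bounds(10, 3))  # [(0, 3), (3, 6), (6, 10)]

# In the notebook (assumes `sc` and `tracks` from above):
#   tracks_rdd = sc.parallelize(tracks, numSlices=2)
#   tracks_rdd.getNumPartitions()   # 2
#   partition_sizes(tracks_rdd)     # roughly half of the records in each partition
```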

If we now look at the Spark UI, note that no jobs are listed and the event timeline is blank. This goes back to Spark being lazy: even though we directed Spark to read in the data, it won't launch a job until we invoke an action.

> In fact, if we look at the Executors tab of the Spark UI, we'll see that none of the storage memory has been used.

<img src="./executors.png" width="60%">

Ok, so now let's call our first action by asking for the number of records in our dataset.

In [45]:
tracks_rdd.count()

174389

In [46]:
tracks_rdd.take(1)

[{'acousticness': 0.991,
  'artists': "['Mamie Smith']",
  'danceability': 0.598,
  'duration_ms': 168333,
  'energy': 0.2239999999999999,
  'explicit': 0,
  'id': '0cS0A1fUEUd1EW3FcF8AEI',
  'instrumentalness': 0.000522,
  'key': 5,
  'liveness': 0.379,
  'loudness': -12.628,
  'mode': 0,
  'name': 'Keep A Song In Your Soul',
  'popularity': 12,
  'release_date': '1920',
  'speechiness': 0.0936,
  'tempo': 149.976,
  'valence': 0.634,
  'year': 1920}]

### Querying the Data

From here, we can start querying the data. Let's begin by getting a sense of the range of years it covers: sort the tracks by year and then look at the first record.

> Take a look at the [PySpark documentation](https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#sortBy) for `sortBy`.

In [49]:
sortedTracks = tracks_rdd.sortBy(lambda track: track['year'])

In [50]:
sortedTracks.take(1)

[{'acousticness': 0.991,
  'artists': "['Mamie Smith']",
  'danceability': 0.598,
  'duration_ms': 168333,
  'energy': 0.2239999999999999,
  'explicit': 0,
  'id': '0cS0A1fUEUd1EW3FcF8AEI',
  'instrumentalness': 0.000522,
  'key': 5,
  'liveness': 0.379,
  'loudness': -12.628,
  'mode': 0,
  'name': 'Keep A Song In Your Soul',
  'popularity': 12,
  'release_date': '1920',
  'speechiness': 0.0936,
  'tempo': 149.976,
  'valence': 0.634,
  'year': 1920}]

> So it looks like the earliest track is from 1920.

Now let's find the year of the most recent track.

In [51]:
sortedTracksDesc = tracks_rdd.sortBy(lambda track: track['year'], ascending = False)

In [52]:
sortedTracksDesc.take(1)

[{'acousticness': 0.778,
  'artists': "['not applicable', 'Riccardo Muti', 'Wiener Philharmoniker']",
  'danceability': 0.711,
  'duration_ms': 217360,
  'energy': 0.0983,
  'explicit': 0,
  'id': '55VqxXi21UxYikKbrMXv54',
  'instrumentalness': 0.0,
  'key': 1,
  'liveness': 0.62,
  'loudness': -28.235,
  'mode': 1,
  'name': "Neujahrsgruß / New Year's Address / Allocution du Nouvel An",
  'popularity': 29,
  'release_date': '2021-01-08',
  'speechiness': 0.899,
  'tempo': 111.518,
  'valence': 0.359,
  'year': 2021}]

> So we can see that the most recent track is from 2021.

If we now look at the previous two Spark jobs, both of which involved sorting, we can see that they triggered a shuffle.

<img src="./shuffle_events.png" width="80%">

> This makes sense: sorting requires bringing records with nearby keys together, which means moving data across partitions and, on a real cluster, across nodes.
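
To see why sorting forces a shuffle, here is a toy model of range partitioning, the strategy PySpark's `sortBy` builds on (through `sortByKey`): Spark samples the keys to choose boundaries, sends every record to the partition whose key range contains it, and then sorts each partition locally. This is a plain-Python illustration with hypothetical names, not Spark's code, and it skips the sampling step by taking the boundaries as given.

```python
from bisect import bisect_left


def range_partition(records, key, bounds):
    """Toy range partitioner: len(bounds) + 1 output partitions, where
    partition p receives the keys falling between bounds[p-1] and bounds[p]."""
    partitions = [[] for _ in range(len(bounds) + 1)]
    for record in records:
        # The "shuffle": each record moves to the partition owning its key range.
        partitions[bisect_left(bounds, key(record))].append(record)
    # After the move, each partition can be sorted independently.
    return [sorted(part, key=key) for part in partitions]


# Two hypothetical input partitions whose years are interleaved
input_partitions = [[{"year": 2021}, {"year": 1920}], [{"year": 1985}, {"year": 1950}]]
all_records = [r for part in input_partitions for r in part]

sorted_parts = range_partition(all_records, key=lambda r: r["year"], bounds=[1970])
print([[r["year"] for r in part] for part in sorted_parts])  # [[1920, 1950], [1985, 2021]]
```

Every input partition sent records to both output partitions; on a cluster, that cross-partition movement is the shuffle the Spark UI reports.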

One way to limit the cost of shuffling is to limit the amount of data that needs to be transferred. In the queries above, we moved and returned entire records. But what we really care about is finding the minimum and maximum years in our dataset, not the tracks associated with them. So let's write a new query that keeps only the year of each track and returns the maximum year.

Because each record is now a single integer, this should reduce the amount of data that gets shuffled.

In [11]:
sortedYearsDesc = tracks_rdd.map(lambda track: track['year']).sortBy(lambda year: year, ascending = False)

In [12]:
sortedYearsDesc.take(1)

[2021]
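
Going one step further, finding the earliest and latest year doesn't require sorting at all. RDD actions such as `min()`, `max()`, and `aggregate()` combine values within each partition and then merge the per-partition results on the driver, so no shuffle is needed. The sketch below assumes `tracks_rdd` from above; `year_range`, `seq_op`, and `comb_op` are hypothetical names.

```python
import math


def seq_op(acc, year):
    """Fold one year into a partition's running (min, max)."""
    lo, hi = acc
    return (min(lo, year), max(hi, year))


def comb_op(a, b):
    """Merge two partial (min, max) results from different partitions."""
    return (min(a[0], b[0]), max(a[1], b[1]))


def year_range(rdd):
    """(earliest, latest) year in a single pass over the data, without a shuffle."""
    return (rdd.map(lambda track: track['year'])
               .aggregate((math.inf, -math.inf), seq_op, comb_op))


# Usage in the notebook:
#   year_range(tracks_rdd)
# Simpler alternative (two jobs, still no shuffle):
#   years = tracks_rdd.map(lambda track: track['year'])
#   years.min(), years.max()
```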

### Aggregate Metrics

Let's wrap up by computing a couple of aggregate metrics, beginning with simply counting the number of tracks in each year. To do this, we'll need a group by.

> Remember that grouping triggers a shuffle, so try to limit the amount of data that needs to be transferred for this query.

> Also, use PySpark to sort the returned values by year, as in the answer below.

In [29]:
tracks_by_year = tracks_rdd.map(lambda track: track['year']) \
    .groupBy(lambda year: year) \
    .sortBy(lambda year_amount: year_amount[0]) \
    .map(lambda year_amount: (year_amount[0], len(year_amount[1]))).collect()

In [30]:
tracks_by_year[:5]

[(1920, 349), (1921, 156), (1922, 121), (1923, 185), (1924, 236)]

In [31]:
tracks_by_year[-5:]

[(2017, 2156), (2018, 2714), (2019, 2329), (2020, 4294), (2021, 1840)]
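
`groupBy` sends every individual year across the network before counting. A common alternative, sketched below under the assumption that `tracks_rdd` is the RDD from above, is to emit `(year, 1)` pairs and use `reduceByKey`, which sums counts inside each partition before the shuffle, so at most one pair per year per partition is transferred. `local_counts` and `tracks_per_year` are hypothetical helpers; the first shows in plain Python what that map-side combine produces.

```python
from collections import Counter
from operator import add


def local_counts(years):
    """What one partition holds after map-side combining: one count per distinct year."""
    return Counter(years)


def tracks_per_year(rdd):
    """Sorted list of (year, number_of_tracks) pairs, computed with reduceByKey."""
    return (rdd.map(lambda track: (track['year'], 1))
               .reduceByKey(add)   # combine within partitions, then shuffle the partial sums
               .sortByKey()        # sort the (year, count) pairs by year
               .collect())


# Two hypothetical partitions: the shuffle only moves the merged counts,
# not every individual year.
partition_a = local_counts([1920, 1920, 1921])
partition_b = local_counts([1920, 1921, 1921])
print(sorted((partition_a + partition_b).items()))  # [(1920, 3), (1921, 3)]

# Usage in the notebook:
#   tracks_per_year(tracks_rdd)[:5]
```

Another option is `tracks_rdd.map(lambda track: track['year']).countByValue()`, which returns the counts to the driver as a dictionary.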

From here, we could use our RDDs to calculate other aggregate metrics, like the average tempo or loudness per year. However, doing so purely with RDDs is fairly tricky. If you'd like to give it a shot, take a look at [this link](https://stackoverflow.com/questions/40087483/spark-average-of-values-instead-of-sum-in-reducebykey-using-scala) to see how you might do so.
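
The trick in the linked answer is to reduce `(sum, count)` pairs instead of plain values, then divide at the end. Here is a minimal sketch of that idea, assuming `tracks_rdd` from above; the helper names are hypothetical, and `aggregateByKey` is swapped in for the `reduceByKey` used in the link because it lets us start from a neutral `(0.0, 0)` accumulator.

```python
ZERO = (0.0, 0)  # (running sum, running count)


def add_value(acc, value):
    """Fold one value into a (sum, count) accumulator within a partition."""
    return (acc[0] + value, acc[1] + 1)


def merge_accs(a, b):
    """Merge (sum, count) accumulators coming from different partitions."""
    return (a[0] + b[0], a[1] + b[1])


def to_average(acc):
    total, count = acc
    return total / count


def average_by_year(rdd, field):
    """Sorted (year, average of `field`) pairs, e.g. field='tempo' or 'loudness'."""
    return (rdd.map(lambda track: (track['year'], track[field]))
               .aggregateByKey(ZERO, add_value, merge_accs)
               .mapValues(to_average)
               .sortByKey()
               .collect())


# Pure-Python check of the combine logic on two hypothetical partitions:
acc_1 = add_value(add_value(ZERO, 120.0), 100.0)   # (220.0, 2)
acc_2 = add_value(ZERO, 140.0)                     # (140.0, 1)
print(to_average(merge_accs(acc_1, acc_2)))        # 120.0

# Usage in the notebook:
#   average_by_year(tracks_rdd, 'tempo')[:5]
```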

Operations like this become much easier with PySpark DataFrames, which we'll move to in the next section.

### Resources

[Reduce by Key Stackoverflow](https://stackoverflow.com/questions/40087483/spark-average-of-values-instead-of-sum-in-reducebykey-using-scala)