# CMU Machine Learning with Large Datasets
## Homework 4 - Machine Learning at Scale: Part A

Before starting with this notebook, make sure you have already completed the data conversion step on AWS.

Note that we will not be autograding this notebook because of the open-ended nature of it (although you will have to submit this notebook). To make grading easier and to learn about your thought process, throughout the notebook, we include questions you have to anwswer in your writeup. We have indicated locations in the notebook corresponding to these questions with a ✰ symbol.

### 0. Start a Spark Session and Install Libraries

As a first step, you should 

- start a Spark session on your cluster, and 

- check how many executor instances you have and whether that matches your configuration

In [1]:
# YOUR CODE HERE
spark
sc = spark.sparkContext
# YOUR CODE HERE

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1616608229804_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Throughout this assignment, you will be generating plots. `Matplotlib` and other useful Python libraries do not come pre-installed on the cluster. Therefore, you will have to ssh into your master node (think about why it should be the master) using your keypair created earlier and install `matplotlib`. You might have to do this later again for other libraries you use.

Run the cell below to ensure you installation was successful. If an error occurs, you might want to double check your installation.

In [2]:
import matplotlib.pyplot as plt

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1. Data Loading and Preparation

Earlier, we have extracted relevant features from and converted format of the full raw Million Song Dataset. We now want to load our converted dataset from the S3 Storage.

Use something like this: 

```
df = spark.read.format("csv")
        .option("header", "false")
        .option("inferSchema", "true")
        .load("s3://<bucket_name>/<path>/<file_name>.csv")
```

Note that although you can load all chunks of the dataset using `*`, we recommend you only load in a subset while developing so that processing takes shorter time when you are just verifying your ideas.

In [17]:
# YOUR CODE HERE
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("escape", "\"").load("s3://10605hw4/*.csv")
# YOUR CODE HERE

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now if we inspect the `df` we just created by running the below cell:

In [18]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)

We see a few problems:

- Because we did not include headers in the CSV files, Spark does not know the name of the features, and hence the "_c0", "_c1", ... that we see
- Although we set `inferSchema=True` when loading data, all array types were still interpreted as plain strings.

Let's first recover all the names of the features. You could reuse the feature name array you used in your `million_song_reader.py` from the conversion step.

In [19]:
# YOUR CODE HERE
metadata = [
        'artist_familiarity',  # metadata/songs
        'artist_hotttnesss',  # metadata/songs
        'artist_id',  # metadata/songs
        'artist_latitude',  # metadata/songs
        'artist_location',  # metadata/songs
        'artist_longitude',  # metadata/songs
        'artist_name',  # metadata/songs
        'title',  # metadata/songs
        'song_hotttnesss',# metadata/songs
        'artist_terms',  # metadata
        'artist_terms_freq',  # metadata
        'artist_terms_weight',  # metadata
        'danceability',  # analysis/songs
        'duration',  # analysis/songs
        'end_of_fade_in',  # analysis/songs
        'energy',  # analysis/songs
        'key',  # analysis/songs
        'key_confidence',  # analysis/songs
        'loudness',  # analysis/songs
        'mode',  # analysis/songs
        'mode_confidence',  # analysis/songs
        'start_of_fade_out',  # analysis/songs
        'tempo',  # analysis/songs
        'time_signature',  # analysis/songs
        'time_signature_confidence',  # analysis/songs
        'year',  # musicbrainz/songs
    ]
for i,name in enumerate(metadata):
    df = df.withColumnRenamed(f'_c{i}', name)
# YOUR CODE HERE

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now if we run the below cell again, we should see proper feature names being attached to the columns.

In [20]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist_familiarity: string (nullable = true)
 |-- artist_hotttnesss: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- song_hotttnesss: string (nullable = true)
 |-- artist_terms: string (nullable = true)
 |-- artist_terms_freq: string (nullable = true)
 |-- artist_terms_weight: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- end_of_fade_in: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- key_confidence: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- mode_confidence: string (nullable = true)
 |-- start_of_fade_out: string (nullable = true)
 |-- tempo: string (n

Note that there are still a few features, e.g. `artist_latitude`, not being converted to the correct type. Let's do this manually and convert numeric features to `pyspark.sql.types.DoubleType` (Hint: there should be 19 of them). ✰ List the 19 numeric features in your writeup.

Don't worry about array features for now.

In [21]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql.functions import udf

# YOUR CODE HERE
numeric_features = [
        'artist_familiarity',  # metadata/songs
        'artist_hotttnesss',  # metadata/songs
#         'artist_id',  # metadata/songs
        'artist_latitude',  # metadata/songs
#         'artist_location',  # metadata/songs
        'artist_longitude',  # metadata/songs
#         'artist_name',  # metadata/songs
#         'title',  # metadata/songs
        'song_hotttnesss',# metadata/songs
#         'artist_terms',  # metadata
#         'artist_terms_freq',  # metadata # array
#         'artist_terms_weight',  # metadata # array
        'danceability',  # analysis/songs
        'duration',  # analysis/songs
        'end_of_fade_in',  # analysis/songs
        'energy',  # analysis/songs
        'key',  # analysis/songs
        'key_confidence',  # analysis/songs
        'loudness',  # analysis/songs
        'mode',  # analysis/songs
        'mode_confidence',  # analysis/songs
        'start_of_fade_out',  # analysis/songs
        'tempo',  # analysis/songs
        'time_signature',  # analysis/songs
        'time_signature_confidence',  # analysis/songs
        'year',  # musicbrainz/songs
    ]
udf1 = udf(lambda x:x[1:-1], StringType())
for name in numeric_features:
    df = df.withColumn(name, udf1(name))
    df = df.withColumn(name, df[name].cast(DoubleType()))
# YOUR CODE HERE

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We are all set for now. Let's run the following cell to inspect everything except the arrays looks ok.

In [22]:
df.printSchema()
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist_familiarity: double (nullable = true)
 |-- artist_hotttnesss: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- song_hotttnesss: double (nullable = true)
 |-- artist_terms: string (nullable = true)
 |-- artist_terms_freq: string (nullable = true)
 |-- artist_terms_weight: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- duration: double (nullable = true)
 |-- end_of_fade_in: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: double (nullable = true)
 |-- key_confidence: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- mode_confidence: double (nullable = true)
 |-- start_of_fade_out: double (nullable = true)
 |-- tempo: double (n

For us to grade your checkpoint, run the following cell and ✰ include the output in your writeup.

Some sanity checks based on our reference solution:
- There should be 19 numeric features
- There should be around 580k data records
- `song_hotttnesss` should be a floating point number between 0 and 1, with mean around 0.36
- `artist_name` and `title` should be human-readable text, rather than undecoded bytes
- `artist_terms` should be a string literal of an array containing human-readable tags, rather than undecoded bytes
- The max of `year` should be 2011 (because MSD was published in 2011)

We will have some wiggle rooms in grading because everyone might have processed the data slightly differently.

In [23]:
double_cols = [t for t in df.dtypes if t[1]=='double']
str_cols = [t for t in df.dtypes if t[1] == 'string']
print('total feature {}, numeric feature {}, string feature {}'.format(len(df.dtypes),len(double_cols),len(str_cols)))
print('total {} records'.format(df.count()))
print('\nsample data record:')
head = df.head()
features = ['song_hotttnesss', 'artist_hotttnesss', 'artist_id', 'artist_latitude', 'artist_name',
           'title', 'danceability', 'duration', 'loudness', 'year', 'artist_terms', 'artist_terms_freq']
for f in features:
    print(f'  {f}: {head[f]}')
print()
df.select('song_hotttnesss', 'artist_hotttnesss', 'year').summary().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

total feature 26, numeric feature 19, string feature 7
total 581965 records

sample data record:
  song_hotttnesss: 0.0
  artist_hotttnesss: 0.40991324306220756
  artist_id: ['ARKS8IL1187FB4D6B8']
  artist_latitude: None
  artist_name: ['Marco Bailey']
  title: ['Spicy']
  danceability: 0.0
  duration: 335.3073
  loudness: -8.508
  year: 0.0
  artist_terms: ['hard trance', 'tech house', 'techno', 'hard house', 'trance', 'progressive trance', 'progressive house', 'electro', 'electronic', 'happy hardcore', 'gabba', 'hardstyle', 'schranz', 'tribal house', 'uk garage', 'new beat', 'belgium', 'deep house', 'minimal', 'house', 'tribal', 'hardcore', 'acid', 'spain', 'germany', 'italy', 'sweden', 'minimal techno', 'goa trance', 'club dance', 'united states', 'latin', 'ambient', 'dj', 'breakbeat', 'nederland', 'drum and bass', 'acid house', 'jumpstyle', 'hardtechno', 'techno artist', 'french', 'deep techno', 'slovenia']
  artist_terms_freq: [0.8987031903688264, 0.9023844447360804, 1.0, 0.808429

## End of Part A