# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates, but __you are not allowed to copy any part of your answers either from your classmates or from the internet__.
- You can put the homework files anywhere you want in your https://jupyterhub.ischool.syr.edu/ workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answers. Think about cases your code should still handle, even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

In [None]:
# Load the packages needed for this part
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

from pyspark.ml import feature, regression, Pipeline, pipeline, evaluation, tuning, clustering
from pyspark.sql import types, Row, functions as fn
from pyspark import sql
import pandas as pd
import matplotlib.pyplot as plt

# Part 2: Feature engineering and recommendation

In this project, we are going to study a dataset of Spotify songs for which we have a number of features.

In [None]:
spotify = spark.read.csv('spotify_songs.csv', header=True, inferSchema=True)

In [None]:
spotify.limit(5).toPandas()

## Question 1: (10 pts)
First, we will try to understand how duration, tempo, and key are related to danceability. Unfortunately, these features are on different scales, and the feature `key` is categorical.

Create a pipeline called `featurize` (the tests expect a fitted `PipelineModel`) that performs the following feature engineering steps:
- Standardize `duration_ms` and `tempo` (you have to combine `feature.VectorAssembler` with `feature.StandardScaler`).
- Create dummy variables for `key` (you have to use `feature.OneHotEncoder`; this encoder uses the *last category* as the baseline, so be careful when interpreting it).

Add a final step to this featurizer that combines the two kinds of engineered features into a column called `features`.
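
As a rough illustration of what these two transformations do to the data, here is a plain-Python sketch (not Spark code, and not the solution). The helper names and sample values are made up, and it assumes `key` takes 12 distinct values, which is consistent with the 13-element feature vector the tests expect (2 scaled columns + 11 dummies). Note that this sketch also centers the data, whereas Spark's `StandardScaler` only divides by the standard deviation unless `withMean=True` is set.

```python
from statistics import mean, stdev


def standardize(values):
    """Center values and scale them to unit sample standard deviation."""
    mu, sigma = mean(values), stdev(values)
    return [(v - mu) / sigma for v in values]


def one_hot_drop_last(index, n_categories):
    """Encode a category index as dummies, using the last category as baseline."""
    vec = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vec[index] = 1.0
    return vec  # the last category is encoded as all zeros


# Made-up tempo values, only for illustration
tempos = [90.0, 120.0, 150.0]
scaled = standardize(tempos)

# Assumption: 12 possible keys, indexed 0..11
key_first = one_hot_drop_last(0, 12)   # 1.0 in position 0
key_last = one_hot_drop_last(11, 12)   # all zeros (baseline category)
```

Because the baseline category is all zeros, a regression coefficient on any dummy is interpreted relative to that last category.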

In [None]:
# create the featurize pipeline (standardization + one-hot encoding)
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# test the featurizer here
featurize.transform(spotify).select('features').first().features.toArray()

In [None]:
# 10 pts
assert type(featurize) == pipeline.PipelineModel
assert feature.StandardScalerModel in list(map(type, featurize.stages))
assert feature.OneHotEncoderModel in list(map(type, featurize.stages))
assert feature.VectorAssembler in list(map(type, featurize.stages))
assert len(featurize.transform(spotify).select('features').first().features.toArray()) == 13

## Question 2: (15 pts)
We will now compare a model without feature engineering to one with feature engineering.

First, create a vanilla pipeline that takes `duration_ms`, `tempo`, and `key` without any feature engineering and assembles them into a column `features`. Call this pipeline `vanilla_features`.

In [None]:
# create pipeline for vanilla featurizer
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# test your pipeline
vanilla_features.transform(spotify).first().features.toArray()

In [None]:
# 5 pts
assert type(vanilla_features) == pipeline.PipelineModel
assert len(vanilla_features.transform(spotify).select('features').first().features.toArray()) == 3

Now, create two regression pipeline estimators (don't fit them), `model1` and `model2`, where `model1` uses the featurizer from Question 1 to create the features and `model2` creates the features using the previous pipeline (`vanilla_features`). Remember that you are predicting `danceability`.

In [None]:
# create pipeline for models
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# 5 pts
assert type(model1) == pipeline.Pipeline
assert len(model1.getStages()) == 2
assert type(model2) == pipeline.Pipeline
assert len(model2.getStages()) == 2

With the code below, we will evaluate the performance of each model and print the results:

In [None]:
regression_evaluator = evaluation.RegressionEvaluator(labelCol='danceability', metricName='rmse')
training_df, validation_df = spotify.randomSplit([0.8, 0.2], seed=0)

print("RMSE model 1: ", regression_evaluator.evaluate(model1.fit(training_df).transform(validation_df)))
print("RMSE model 2: ", regression_evaluator.evaluate(model2.fit(training_df).transform(validation_df)))
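
For reference, the `rmse` metric requested from the evaluator above is the square root of the mean squared difference between labels and predictions. A minimal plain-Python sketch of that formula follows; the function name and the sample numbers are made up for illustration and are not part of the dataset.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up danceability labels and predictions
labels = [0.5, 0.7, 0.9]
predictions = [0.6, 0.7, 0.7]
error = rmse(labels, predictions)
```

RMSE is expressed in the same units as the label, so here it can be read directly as a typical error in `danceability`.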

**(5 pts)** Based on the results above, what can you say about the model with feature engineering? Is there a big difference in performance? If not, why would it be worth doing feature engineering anyway? Answer below.

YOUR ANSWER HERE

## Question 3: (25 pts) Clustering

We will now recommend songs based on k-means. Create a pipeline that fits a 10-cluster `KMeans` model to the following features **after standardization**:

In [None]:
feature_list = ['acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'speechiness',
 'tempo',
 'time_signature',
 'valence']

Name the pipeline `spotify_clustering` and make sure that the `KMeans` model has a prediction column called `cluster`.
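
To see why standardization matters before a distance-based method such as k-means, consider the following plain-Python sketch (not Spark code). The four songs and all helper names are made up; the point is only that, on raw values, Euclidean distance is dominated by `duration_ms` because its scale is several orders of magnitude larger than that of `danceability`.

```python
import math
from statistics import mean, stdev

# Made-up songs as (duration_ms, danceability)
songs = [
    (200_000.0, 0.90),
    (201_000.0, 0.20),
    (230_000.0, 0.88),
    (300_000.0, 0.15),
]


def euclidean(a, b):
    """Euclidean distance between two equally long tuples."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def standardize_columns(rows):
    """Center each column and scale it to unit sample standard deviation."""
    stats = [(mean(col), stdev(col)) for col in zip(*rows)]
    return [tuple((v - m) / s for v, (m, s) in zip(row, stats)) for row in rows]


def nearest(rows, i):
    """Index of the row closest to rows[i], excluding rows[i] itself."""
    others = (j for j in range(len(rows)) if j != i)
    return min(others, key=lambda j: euclidean(rows[i], rows[j]))


# On raw values, the closest song is the one with nearly the same duration
raw_neighbor = nearest(songs, 0)

# After standardization, danceability contributes on a comparable scale
scaled_neighbor = nearest(standardize_columns(songs), 0)
```

In this toy data the nearest neighbor of the first song changes from the second song (similar duration, very different danceability) to the third song (similar danceability) once both columns are standardized.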

In [None]:
# create pipeline spotify_clustering
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# 10 pts
assert type(spotify_clustering) == pipeline.PipelineModel
assert feature.StandardScalerModel in set(map(type, spotify_clustering.stages))
assert spotify_clustering.stages[-1].extractParamMap()[(spotify_clustering.stages[-1].k)] == 10
assert spotify_clustering.stages[-1].extractParamMap()[(spotify_clustering.stages[-1].predictionCol)] == 'cluster'

As you all know, the professor is a big fan of Meat Loaf (the artist, obviously) and his song "I will do anything for love (But I won't do that)" because it is close to the professor's mantra: "I will do anything for data (But I won't overfit)".

In [None]:
meat_loaf = spotify.where(fn.col('artist') == "Meat Loaf")
print(meat_loaf.first().song_title)

In the cell below, extract the cluster number of Meat Loaf's song and store it in `meat_loaf_cluster_id`. Also, create a Spark DataFrame `similar_songs` with the songs from that cluster.
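
The underlying logic is a lookup followed by a filter. The sketch below shows it on plain Python dictionaries rather than a Spark DataFrame; every row, title, artist, and variable name in it is hypothetical.

```python
# Hypothetical rows, as if each song had already been assigned a cluster
rows = [
    {"song_title": "Song A", "artist": "Artist X", "cluster": 3},
    {"song_title": "Song B", "artist": "Artist Y", "cluster": 1},
    {"song_title": "Song C", "artist": "Artist Z", "cluster": 3},
]

# Step 1: look up the cluster of the song we care about
target = next(row for row in rows if row["artist"] == "Artist X")
target_cluster = target["cluster"]

# Step 2: keep every song assigned to that same cluster
same_cluster = [row for row in rows if row["cluster"] == target_cluster]
```

Note that the target song itself belongs to the filtered result, which is what the last test in this question checks for.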

In [None]:
# create variable meat_loaf_cluster_id and dataframe similar_songs
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# use the following code to find suggestions
similar_songs.select('song_title', 'artist').limit(10).toPandas()

In [None]:
# 10 pts
assert 0 <= meat_loaf_cluster_id <= 9
assert similar_songs.count() < spotify.count()
# similar_songs must keep the `cluster` column
assert 'cluster' in similar_songs.columns
assert similar_songs.where('cluster = ' + str(meat_loaf_cluster_id)).where('artist = "Meat Loaf"').count() == 1

One of the problems with `KMeans` is that clusters are sometimes unbalanced. Analyze the clustering by creating a dataframe `cluster_analysis` where the first column is the cluster (`cluster`) and the second is the number of songs in that cluster (`n_songs`).
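
Conceptually, this is a group-and-count over the cluster assignments. Here is a small plain-Python sketch of the idea using `collections.Counter`; the assignments are made up, and the Spark version would use DataFrame operations instead.

```python
from collections import Counter

# Made-up cluster assignments, one entry per song
assignments = [0, 2, 2, 1, 2, 0, 2]

# Count how many songs fall into each cluster
counts = Counter(assignments)

# (cluster, n_songs) pairs, largest cluster first
cluster_sizes = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
```

A very uneven list of sizes, as in this toy example where one cluster holds more than half of the songs, is the kind of imbalance the plot below is meant to reveal.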

In [None]:
# create dataframe cluster_analysis
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# plot the results
(cluster_analysis
 .toPandas()
 .sort_values('n_songs', ascending=False)
 .reset_index()
 .n_songs.plot(y='n_songs', kind='bar')
);
plt.xlabel('cluster rank')
plt.ylabel('# songs');

In [None]:
# 5 pts
assert cluster_analysis.count() == 10
assert type(cluster_analysis) == sql.dataframe.DataFrame