<a href="https://colab.research.google.com/github/tobiasz-talaj/Spotify-Delta/blob/main/SpotifyDelta.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Configuration and imports

In [None]:
# Install dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://mirror.its.dal.ca/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz # Check https://downloads.apache.org/spark/ for current version
!tar xvf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Set environment paths so that Spark can locate the JVM and its own installation
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Must name the directory unpacked by the install cell (Spark 2.4.7, not 2.3.2)
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [None]:
# Use findspark to make pyspark importable as a regular library, then import the modules
# used by the rest of the notebook.
# NOTE(review): the Spark directory is given as a relative path, so this presumably only
# works when the working directory is the one the tarball was extracted into - confirm.
import findspark
findspark.init("spark-2.4.7-bin-hadoop2.7")
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import functions as F  # column functions, used as F.<name> below
from pyspark.sql.window import Window
from google.colab import drive

In [None]:
# Mount your Google Drive into the Colab filesystem so the input files become readable
mount_point = '/content/gdrive'
drive.mount(mount_point)

Mounted at /content/gdrive


In [None]:
# Folder on the mounted Google Drive that holds the input files.
# Absolute (Drive is mounted at /content/gdrive) so that it keeps working
# even if the notebook's working directory changes.
root_path = '/content/gdrive/My Drive/Vectra Assignment/'

## Analysis

In [None]:
# Set up Spark session (local mode, using every available core).
# Note: despite its name, `sc` is a SparkSession, not a SparkContext.
sc = SparkSession.builder.master("local[*]").getOrCreate()
# SQLContext expects the underlying SparkContext as its first argument;
# the session itself is passed explicitly so both objects share the same state.
sqlc = SQLContext(sc.sparkContext, sc)

In [None]:
# Load the per-year data and keep only the columns needed for the analysis.
# inferSchema makes Spark parse the numeric columns as numbers; without it every
# column is a string, so ordering by year would be lexicographic and the
# previous-value columns would be text.
data_by_year = (
    sc.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(root_path + 'data_by_year.csv', encoding='utf-8')
    .select('year', 'energy', 'danceability')
)

In [None]:
# Create function adding 'previous value' and 'delta' columns to original dataframe
def add_prev_value_and_delta(df, columns=("energy", "danceability"), window=None):
  """Return `df` extended with `prev_value_<col>` and `delta_<col>` columns.

  For every name in `columns`, `prev_value_<col>` holds the value of `<col>`
  taken from the preceding row of `window` (null when there is no preceding
  row), and `delta_<col>` holds `<col> - prev_value_<col>`. Wherever that
  difference is null, the delta is reported as 0 instead.

  Args:
    df: Spark DataFrame containing the columns named in `columns` and the
      column(s) that `window` orders by.
    columns: Names of the columns to process. Defaults to the two columns the
      notebook analyses.
    window: Window specification that defines what "previous row" means.
      Defaults to a single, unpartitioned window ordered by "year", so the
      function does not depend on any variable defined in another cell.

  Returns:
    A new DataFrame. All `prev_value_*` columns are appended first, followed
    by all `delta_*` columns, each group in the order given by `columns`.
  """
  if window is None:
    window = Window.orderBy("year")

  # First pass: previous values (kept separate to preserve the column order).
  for name in columns:
    df = df.withColumn("prev_value_" + name, F.lag(F.col(name)).over(window))

  # Second pass: deltas; coalesce substitutes 0 where the difference is null.
  for name in columns:
    difference = F.col(name) - F.col("prev_value_" + name)
    df = df.withColumn("delta_" + name, F.coalesce(difference, F.lit(0)))

  return df

In [None]:
# Define the window (one partition holding every row, sorted by year),
# apply the transformation and display the first rows of the result
my_window = Window.partitionBy().orderBy("year")
data_by_year_with_deltas = add_prev_value_and_delta(data_by_year)
data_by_year_with_deltas.show()

+----+-------------------+-------------------+-------------------+-----------------------+--------------------+--------------------+
|year|             energy|       danceability|  prev_value_energy|prev_value_danceability|        delta_energy|  delta_danceability|
+----+-------------------+-------------------+-------------------+-----------------------+--------------------+--------------------+
|1920|0.41869957020057297|  0.515750143266476|               null|                   null|                 0.0|                 0.0|
|1921| 0.2411363461538462|  0.432170512820513|0.41869957020057297|      0.515750143266476|-0.17756322404672678|-0.08357963044596295|
|1922|0.22617264462809922| 0.5756198347107437| 0.2411363461538462|      0.432170512820513|-0.01496370152574697| 0.14344932189023074|
|1923| 0.2624064864864865| 0.5773405405405401|0.22617264462809922|     0.5756198347107437|0.036233841858387295|0.001720705829796...|
|1924| 0.3443466101694912| 0.5498940677966102| 0.2624064864864865|   

## Testing

In [None]:
# Small hand-built frames with known values, used as fixtures by the unit tests below
fixture_columns = ['year', 'energy', 'danceability']

# Two consecutive years with realistic float values
testing_dataframe_0 = sc.createDataFrame(
    [
        (1937, 0.31051245083207246, 0.5426403933434191),
        (1938, 0.2812483922829584, 0.48051045016077204),
    ],
    fixture_columns,
)

# Two rows of small integers, so the expected results are obvious
testing_dataframe_1 = sc.createDataFrame(
    [
        (1, 1, 1),
        (2, 2, 2),
    ],
    fixture_columns,
)

In [None]:
import unittest

class FunctionalityTests(unittest.TestCase):
    
  def test_prev_value_a(self):
    tdf_0 = add_prev_value_and_delta(testing_dataframe_0)
    self.assertIsNone(tdf_0.where(tdf_0.year == 1937).select('prev_value_energy').collect()[0]['prev_value_energy'])
    self.assertEqual(tdf_0.where(tdf_0.year == 1938).select('prev_value_energy').collect()[0]['prev_value_energy'], 0.31051245083207246)
    self.assertIsNone(tdf_0.where(tdf_0.year == 1937).select('prev_value_danceability').collect()[0]['prev_value_danceability'])
    self.assertEqual(tdf_0.where(tdf_0.year == 1938).select('prev_value_danceability').collect()[0]['prev_value_danceability'], 0.5426403933434191)

  def test_prev_value_b(self):
    tdf_1 = add_prev_value_and_delta(testing_dataframe_1)
    self.assertIsNone(tdf_1.where(tdf_1.year == 1).select('prev_value_energy').collect()[0]['prev_value_energy'])
    self.assertEqual(tdf_1.where(tdf_1.year == 2).select('prev_value_energy').collect()[0]['prev_value_energy'], 1)
    self.assertIsNone(tdf_1.where(tdf_1.year == 1).select('prev_value_danceability').collect()[0]['prev_value_danceability'])
    self.assertEqual(tdf_1.where(tdf_1.year == 2).select('prev_value_danceability').collect()[0]['prev_value_danceability'], 1)

  def test_delta_a(self):
    tdf_0 = add_prev_value_and_delta(testing_dataframe_0)
    self.assertEqual(tdf_0.where(tdf_0.year == 1937).select('delta_energy').collect()[0]['delta_energy'], 0)
    self.assertEqual(tdf_0.where(tdf_0.year == 1938).select('delta_energy').collect()[0]['delta_energy'], -0.02926405854911407)
    self.assertEqual(tdf_0.where(tdf_0.year == 1937).select('delta_danceability').collect()[0]['delta_danceability'], 0)
    self.assertEqual(tdf_0.where(tdf_0.year == 1938).select('delta_danceability').collect()[0]['delta_danceability'], -0.06212994318264703)

  def test_delta_b(self):
    tdf_1 = add_prev_value_and_delta(testing_dataframe_1)
    self.assertEqual(tdf_1.where(tdf_1.year == 1).select('delta_energy').collect()[0]['delta_energy'], 0)
    self.assertEqual(tdf_1.where(tdf_1.year == 2).select('delta_energy').collect()[0]['delta_energy'], 1)
    self.assertEqual(tdf_1.where(tdf_1.year == 1).select('delta_danceability').collect()[0]['delta_danceability'], 0)
    self.assertEqual(tdf_1.where(tdf_1.year == 2).select('delta_danceability').collect()[0]['delta_danceability'], 1)

if __name__ == '__main__':
    # Pass an explicit argv so unittest does not try to parse the kernel's own
    # command line, and exit=False so the kernel keeps running after the tests.
    runner_options = dict(argv=['arg'], exit=False)
    unittest.main(**runner_options)