# Project imports

In [1]:
import pyspark as ps
from pyspark.sql.types import *
import pyspark.sql.functions as F
import sys


# Create spark connection and context

In [2]:
def start_session(master='local',appname='capstone'):
    """
    Creates a pyspark session, or returns the one that is already running.

    ARGS:
        master - Spark master to connect to, e.g. 'local' (str)
        appname - application name given to the session (str)
    Return:
        spark - pyspark connection (SparkSession)
    """
    # getOrCreate() hands back the active session when one exists, so
    # re-running the notebook cell does not start a second session.
    return (
        ps.sql.SparkSession.builder
        .master(master)
        .appName(appname)
        .getOrCreate()
    )
def add_context(spark):
    """
    Fetches the context that belongs to an existing pyspark session.

    ARGS:
        spark - pyspark connection
    Return:
        sc - pyspark connection context (the session's sparkContext attribute)
    """
    return spark.sparkContext

# Pull in data to pyspark dataframes


In [3]:
#creates spark dataframe for features by artist
def create_artist_features_df(spark):
    """
    Reads data_by_artist.csv in data directory and casts its numeric columns

    The file is read with header=True and no schema inference, so the
    feature columns are cast explicitly: audio features to float and
    key/mode/count to long. Columns not listed below are left untouched.

    ARGS:
        spark - pyspark connection
    Return:
        df_features - pyspark dataframe object (cached)
    """
    float_columns = (
        'acousticness', 'danceability', 'duration_ms', 'energy',
        'instrumentalness', 'liveness', 'loudness', 'speechiness',
        'tempo', 'valence', 'popularity',
    )
    long_columns = ('key', 'mode', 'count')

    # NOTE(review): assumes sys.path[0] is the project's notebooks directory
    # (or the project root) -- confirm for other ways of launching the code.
    path = sys.path[0].replace('/notebooks','')
    df_features = spark.read.csv(f'{path}/data/data_by_artist.csv',header=True)

    # withColumn() already names the result column, so no alias is needed.
    for columns, dtype in ((float_columns, "float"), (long_columns, "long")):
        for column in columns:
            df_features = df_features.withColumn(column, df_features[column].cast(dtype))

    # Only the typed frame is cached; the raw all-string frame is never reused.
    return df_features.cache()

#creates spark dataframe for playlist level data
def create_playlists_df(spark):
    """
    Builds the playlist-level spark dataframe from mpd.slice.0-999.json

    Every element of the file's "playlists" array becomes one row whose
    struct fields are promoted to columns; the 'tracks', 'info' and
    'playlist' columns are then removed.

    ARGS:
        spark - pyspark connection
    Return:
        df_playlists_clean - pyspark dataframe object (cached)
    """
    base_dir = sys.path[0].replace('/notebooks','')
    raw_slice = spark.read.json(f'{base_dir}/data/mpd.slice.0-999.json',multiLine=True).cache()

    # one row per playlist, then flatten the playlist struct into columns
    per_playlist = raw_slice.select("info", F.explode("playlists").alias('playlist'))
    df_playlists_clean = per_playlist.select("playlist.*","*")

    for unwanted in ('tracks', 'info', 'playlist'):
        df_playlists_clean = df_playlists_clean.drop(unwanted)

    return df_playlists_clean.cache()

#creates spark dataframe for track level data in playlists
def create_tracks_df(spark):
    """
    Builds the track-level spark dataframe from mpd.slice.0-999.json

    Playlists are exploded first, then each playlist's "tracks" array, so the
    result has one row per track occurrence in a playlist. The playlist's
    name, description and follower count are carried along under
    playlist_* names, together with its pid.

    ARGS:
        spark - pyspark connection
    Return:
        df_tracks - pyspark dataframe object (cached)
    """
    base_dir = sys.path[0].replace('/notebooks','')
    raw_slice = spark.read.json(f'{base_dir}/data/mpd.slice.0-999.json',multiLine=True).cache()

    # playlist level: one row per playlist with its struct fields as columns
    per_playlist = raw_slice.select("info", F.explode("playlists").alias('playlist'))
    playlists = per_playlist.select("playlist.*","*")

    # track level: one row per track, keeping the playlist columns needed later
    per_track = playlists.select("description","name","pid","num_followers",  F.explode("tracks").alias('tracks'))
    df_tracks = per_track.select("tracks.*","*")

    # prefix the playlist columns so their origin is clear next to track fields
    renames = (
        ("name", "playlist_name"),
        ("description", "playlist_description"),
        ("num_followers", "playlist_followers"),
    )
    for old_name, new_name in renames:
        df_tracks = df_tracks.withColumnRenamed(old_name, new_name)

    return df_tracks.drop('tracks').cache()
    

# Create joined Spark dataframes via Spark SQL

In [4]:
def create_joined_df(spark):
    """
    Joins every playlist track to the audio features of its artist

    Registers the temp views "tracks" and "artist_features", then joins them
    on artist name. The join is an inner join, so tracks whose artist_name
    has no match in artist_features.artists are not returned. Rows are
    ordered by playlist id and position in the playlist.

    ARGS:
        spark - pyspark connection
    Return:
        df_joined - pyspark dataframe object
    """
    query = """
                                    SELECT t.artist_name
                                    ,t.pid
                                    ,t.pos
                                    ,t.playlist_name
                                    ,t.playlist_followers
                                    ,a.acousticness AS acousticness
                                    ,a.danceability AS danceability
                                    ,a.energy AS energy
                                    ,a.instrumentalness AS instrumentalness
                                    ,a.liveness AS liveness
                                    ,a.loudness AS loudness
                                    ,a.speechiness AS speechiness
                                    ,a.tempo AS avg_tempo
                                    ,a.valence AS valence
                                    ,a.popularity AS popularity
                                    FROM tracks t
                                    JOIN artist_features a
                                    ON t.artist_name = a.artists
                                    ORDER BY t.pid ASC, t.pos ASC
                                    """
    # the query reads from these two session-scoped views
    create_tracks_df(spark).createOrReplaceTempView("tracks")
    create_artist_features_df(spark).createOrReplaceTempView("artist_features")
    return spark.sql(query)

def create_joined_playlist_df(spark):
    """
    Averages artist features over the tracks of each playlist, keyed by pid

    Registers the temp views "tracks" and "artist_features", inner-joins them
    on artist name and aggregates per playlist id. Only tracks whose artist
    is found in artist_features contribute to the averages.

    ARGS:
        spark - pyspark connection
    Return:
        df_playlist_joined - pyspark dataframe object
    """
    query = """
                                    SELECT t.pid
                                    ,MAX(t.playlist_name) as playlist_name
                                    ,MAX(t.playlist_followers) AS num_followers
                                    ,AVG(a.acousticness) AS acousticness
                                    ,AVG(a.danceability) AS danceability
                                    ,AVG(a.energy) AS energy
                                    ,AVG(a.instrumentalness) AS instrumentalness
                                    ,AVG(a.liveness) AS liveness
                                    ,AVG(a.loudness) AS loudness
                                    ,AVG(a.speechiness) AS speechiness
                                    ,AVG(a.tempo) AS avg_tempo
                                    ,AVG(a.valence) AS valence
                                    ,AVG(a.popularity) AS popularity
                                    FROM tracks t
                                    JOIN artist_features a
                                    ON t.artist_name = a.artists
                                    GROUP BY pid
                                    ORDER BY pid ASC
                                    """
    # the query reads from these two session-scoped views
    create_tracks_df(spark).createOrReplaceTempView("tracks")
    create_artist_features_df(spark).createOrReplaceTempView("artist_features")
    df_playlist_joined = spark.sql(query)
    return df_playlist_joined

def create_joined_track_by_artist(spark):
    """
    Summarises artist features per artist and counts the artist's playlist tracks

    Registers the temp views "tracks" and "artist_features", inner-joins them
    on artist name and groups by artist. total_tracks_in_playlists counts the
    joined rows with a non-null track_name for that artist.

    ARGS:
        spark - pyspark connection
    Return:
        artist_features_join - pyspark dataframe object
    """
    query = """
                                SELECT t.artist_name
                                ,AVG(a.acousticness) AS acousticness
                                ,AVG(a.danceability) AS danceability
                                ,AVG(a.energy) AS energy
                                ,AVG(a.instrumentalness) AS instrumentalness
                                ,AVG(a.liveness) AS liveness
                                ,AVG(a.loudness) AS loudness
                                ,AVG(a.speechiness) AS speechiness
                                ,AVG(a.tempo) AS avg_tempo
                                ,AVG(a.valence) AS valence
                                ,AVG(a.popularity) AS popularity
                                ,COUNT(ALL t.track_name) AS total_tracks_in_playlists
                                FROM tracks t
                                JOIN artist_features a
                                ON t.artist_name = a.artists
                                GROUP BY t.artist_name
                                ORDER BY t.artist_name ASC
                                """
    # the query reads from these two session-scoped views
    create_tracks_df(spark).createOrReplaceTempView("tracks")
    create_artist_features_df(spark).createOrReplaceTempView("artist_features")
    return spark.sql(query)

def create_features_by_list(spark):
    """
    Averages the joined track/artist features per playlist name

    Registers the result of create_joined_df as the temp view "joined" and
    aggregates it by playlist_name. Because the grouping key is the name,
    playlists that share a name fall into one row; pid and followers are the
    maximum values within that group.

    ARGS:
        spark - pyspark connection
    Return:
        df_playlist_features - pyspark dataframe object
    """
    query = """
                                    SELECT playlist_name as name
                                    ,MAX(pid) as pid
                                    ,MAX(playlist_followers) as followers
                                    ,AVG(acousticness) AS acousticness
                                    ,AVG(danceability) AS danceability
                                    ,AVG(energy) AS energy
                                    ,AVG(instrumentalness) AS instrumentalness
                                    ,AVG(liveness) AS liveness
                                    ,AVG(loudness) AS loudness
                                    ,AVG(speechiness) AS speechiness
                                    ,AVG(avg_tempo) AS avg_tempo
                                    ,AVG(valence) AS valence
                                    ,AVG(popularity) AS popularity
                                    FROM joined
                                    GROUP BY playlist_name
                                    """
    # the query reads from this session-scoped view
    create_joined_df(spark).createOrReplaceTempView("joined")
    return spark.sql(query)

def create_features_by_list_popular(spark, min_followers=5, max_followers=40):
    """
    Per-playlist-name feature averages, restricted to a follower-count window

    Reuses create_features_by_list (which also registers the "joined" temp
    view) and keeps only the rows whose followers value lies strictly
    between min_followers and max_followers. With the defaults this matches
    the former hard-coded condition: followers > 5 AND followers < 40.

    ARGS:
        spark - pyspark connection
        min_followers - exclusive lower bound on followers (int, default 5)
        max_followers - exclusive upper bound on followers (int, default 40)
    Return:
        df_playlist_features_popular - pyspark dataframe object
    """
    followers = F.col('followers')
    # followers is MAX(playlist_followers) per group, so filtering the
    # aggregated frame is equivalent to a HAVING clause on that aggregate.
    within_window = (followers > min_followers) & (followers < max_followers)
    return create_features_by_list(spark).filter(within_window)

# Main block (runs only when `__name__ == '__main__'`)

In [5]:
if __name__ == '__main__':
    # Start (or reuse) the Spark session with the default master/app name,
    # then fetch its SparkContext. The cells below rely on `spark`.
    spark = start_session()
    sc = add_context(spark)

In [6]:
if __name__ == '__main__':
    # Feature averages grouped by playlist name: print the schema and a
    # preview of the rows. Requires `spark` from the session cell.
    feat_list = create_features_by_list(spark)
    feat_list.printSchema()
    feat_list.show()

root
 |-- name: string (nullable = true)
 |-- pid: long (nullable = true)
 |-- followers: long (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- avg_tempo: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- popularity: double (nullable = true)

+--------------------+---+---------+-------------------+------------------+------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+
|                name|pid|followers|       acousticness|      danceability|            energy|    instrumentalness|           liveness|           loudness|         speechiness|         avg_tempo|            valence|        popularit

In [7]:
if __name__ == '__main__':
    # Artist-level features read from data_by_artist.csv: check that the
    # casts produced float/long columns and preview the rows.
    df = create_artist_features_df(spark)
    df.printSchema()
    df.show()

root
 |-- artists: string (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- danceability: float (nullable = true)
 |-- duration_ms: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- valence: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- key: long (nullable = true)
 |-- mode: long (nullable = true)
 |-- count: long (nullable = true)

+--------------------+------------+------------+-----------+----------+----------------+----------+----------+-----------+----------+----------+----------+---+----+-----+
|             artists|acousticness|danceability|duration_ms|    energy|instrumentalness|  liveness|  loudness|speechiness|     tempo|   valence|popularity|key|mode|count|
+--------------------+------------+------------+-----------+----

In [8]:
if __name__ == '__main__':
    # Playlist-level frame (one row per playlist): print schema and preview.
    df1 = create_playlists_df(spark)
    df1.printSchema()
    df1.show()

root
 |-- collaborative: string (nullable = true)
 |-- description: string (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- modified_at: long (nullable = true)
 |-- name: string (nullable = true)
 |-- num_albums: long (nullable = true)
 |-- num_artists: long (nullable = true)
 |-- num_edits: long (nullable = true)
 |-- num_followers: long (nullable = true)
 |-- num_tracks: long (nullable = true)
 |-- pid: long (nullable = true)

+-------------+-----------+-----------+-----------+--------------------+----------+-----------+---------+-------------+----------+---+
|collaborative|description|duration_ms|modified_at|                name|num_albums|num_artists|num_edits|num_followers|num_tracks|pid|
+-------------+-----------+-----------+-----------+--------------------+----------+-----------+---------+-------------+----------+---+
|        false|       null|   11532414| 1493424000|          Throwbacks|        47|         37|        6|            1|        52|  0|
|        fal

In [9]:
if __name__ == '__main__':
    # Track-level frame (one row per track in a playlist): print schema and preview.
    df2 = create_tracks_df(spark)
    df2.printSchema()
    df2.show()

root
 |-- album_name: string (nullable = true)
 |-- album_uri: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_uri: string (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- pos: long (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_uri: string (nullable = true)
 |-- playlist_description: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- pid: long (nullable = true)
 |-- playlist_followers: long (nullable = true)

+--------------------+--------------------+------------------+--------------------+-----------+---+--------------------+--------------------+--------------------+-------------+---+------------------+
|          album_name|           album_uri|       artist_name|          artist_uri|duration_ms|pos|          track_name|           track_uri|playlist_description|playlist_name|pid|playlist_followers|
+--------------------+--------------------+------------------+--------------------+--------

In [10]:
if __name__ == '__main__':
    # Tracks joined to their artist's features, ordered by pid and pos:
    # print schema and preview.
    df_joined = create_joined_df(spark)
    df_joined.printSchema()
    df_joined.show()

root
 |-- artist_name: string (nullable = true)
 |-- pid: long (nullable = true)
 |-- pos: long (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_followers: long (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- avg_tempo: float (nullable = true)
 |-- valence: float (nullable = true)
 |-- popularity: float (nullable = true)

+------------------+---+---+-------------+------------------+------------+------------+----------+----------------+----------+----------+-----------+----------+----------+----------+
|       artist_name|pid|pos|playlist_name|playlist_followers|acousticness|danceability|    energy|instrumentalness|  liveness|  loudness|speechiness| avg_tempo|   valence|popularity|
+----------------