In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Pin the session time zone: the source timestamps are UTC (e.g. "2009-05-04T23:08:57Z"),
# and without this setting the displayed values depend on the time zone of the
# machine running the notebook.
spark = (
    SparkSession.builder
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

# One-Off Analysis of Top 10 Songs in 50 Longest User Sessions

## 🎯 Objective

This analysis aims to identify the **top 10 songs played in the top 50 longest user sessions** from the Last.fm Dataset-1K. <br>
Identifying the top 10 songs will offer insights into which tracks are most popular among users who engage in extended listening sessions.



## 📚 Background & Definitions

We analyse user listening data from Last.fm.

_What is Last.fm?_ <br>
Last.fm is a service that allows users to track their music streaming activities across various devices and platforms. <br>
Last.fm uses this data to build user profiles that showcase individual music statistics and listening history, and that support personalized music recommendations.


In this analysis, we define:

- **Session**: a sequence of songs played by a user where each song starts within 20 minutes of the start of the previous one.
- **Top 50 longest sessions**: the 50 sessions with the longest duration, where duration is the time elapsed between the first and the last song played in the session.
- **Top 10 songs**: the songs that are played most frequently within the longest sessions.

## 🥁 Data 

The data was compiled and distributed by Oscar Celma and is available for non-commercial use.
You can find more information [here](http://ocelma.net/MusicRecommendationDataset/lastfm-1K.html).

We use the Last.fm Dataset - 1K, which includes the listening habits of 1,000 users during a period from February 2005 to September 2013. <br> 
This data captures detailed information about each song played by these users, including timestamps of when each track was played.

### Fetch the data

In [3]:
import logging
from tqdm import tqdm
import urllib.request
import os
import tarfile

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

In [4]:
# Central settings for the notebook: where the data is stored, where results are
# written, and what has to be downloaded.
config = {
    "data_directory": "data",  # download and extraction target
    "output_directory": "results",  # destination of the exported result file
    "url": "http://mtg.upf.edu/static/datasets/last.fm/lastfm-dataset-1K.tar.gz",
    # Paths relative to data_directory; the download is skipped when all of them exist.
    "expected_files": [
        "lastfm-dataset-1K/userid-profile.tsv",
        "lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv"
    ]
}

In [5]:
# Unpack the settings used by the download / extraction helpers.
url = config["url"]
data_dir = config["data_directory"]

# Locations of the files that must exist once the archive has been extracted.
expected_filepaths = [
    os.path.join(data_dir, relative_path)
    for relative_path in config["expected_files"]
]

In [5]:
class DownloadProgressBar(tqdm):
    """tqdm progress bar whose ``update_to`` can serve as a download report hook."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Move the bar to the absolute position ``b * bsize``.

        Args:
            b: Number of blocks transferred so far.
            bsize: Size of one block.
            tsize: Total size; when provided it becomes the bar's total.
        """
        if tsize is not None:
            self.total = tsize
        transferred = b * bsize
        # ``update`` expects an increment, so subtract what the bar already shows.
        self.update(transferred - self.n)

In [6]:
def download_file(url: str, dest_path: str) -> None:
    """Download a file from a URL to a destination path with a progress bar.

    Args:
        url: Location of the file to download.
        dest_path: Local path the file is written to. Missing parent
            directories are created before the download starts.
    """
    logger.info(f"Downloading the data from {url}")

    # urlretrieve writes straight to dest_path and fails if its directory is missing.
    parent_dir = os.path.dirname(dest_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with DownloadProgressBar(
        unit="B", unit_scale=True, miniters=1, desc=url.split("/")[-1]
    ) as t:
        urllib.request.urlretrieve(url, filename=dest_path, reporthook=t.update_to)

    logger.info("Download completed")

In [7]:
def extract_file(tar_path: str, extract_path: str) -> None:
    """Extract a tar.gz file to a specified path with a progress bar.

    Args:
        tar_path: Path of the gzip-compressed tar archive.
        extract_path: Directory the archive members are extracted into.
    """
    logger.info("Unzipping the tar.gz file")

    # The archive comes from the network, so use the "data" extraction filter when
    # this Python version offers it; it refuses members that would be written
    # outside extract_path. Older interpreters fall back to the previous behaviour.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    with tarfile.open(tar_path, "r:gz") as tar:
        members = tar.getmembers()

        with tqdm(total=len(members), unit="files", desc="Extracting files") as progress:
            for member in members:
                # tqdm.write prints the message without breaking the progress bar line
                tqdm.write(f"Extracting: {member.name}")
                tar.extract(member, path=extract_path, **extract_kwargs)
                progress.update(1)

    logger.info("Files extracted")

In [8]:
def is_data_extracted(data_dir: str, expected_files: list) -> bool:
    """Check whether every expected data file is already present on disk.

    Args:
        data_dir: Data directory. Kept for interface compatibility; it is not
            consulted because ``expected_files`` already holds usable paths.
        expected_files: Paths of the files that must exist.

    Returns:
        True if every path in ``expected_files`` is an existing regular file
        (also True for an empty list), False otherwise.
    """
    return all(os.path.isfile(file_path) for file_path in expected_files)


In [9]:
def extract(data_dir: str, url: str, expected_files: list) -> None:
    """Make sure the dataset files are available locally, fetching them if needed.

    When any path in ``expected_files`` is missing, the archive at ``url`` is
    downloaded into ``data_dir`` and unpacked there; otherwise nothing is done.
    """
    # Guard clause: everything is already on disk.
    if is_data_extracted(data_dir, expected_files):
        logger.info("All required files are present. Skipping download.")
        return

    logger.info(f"Required files are missing in {data_dir}")
    archive_name = url.split("/")[-1]
    archive_path = os.path.join(data_dir, archive_name)
    download_file(url, archive_path)
    extract_file(archive_path, data_dir)

In [10]:
extract(data_dir, url, expected_filepaths)

INFO:root:Required files are missing in data
INFO:root:Downloading the data from http://mtg.upf.edu/static/datasets/last.fm/lastfm-dataset-1K.tar.gz
lastfm-dataset-1K.tar.gz: 673MB [05:27, 2.06MB/s]                              
INFO:root:Download completed
INFO:root:Unzipping the tar.gz file
Extracting files:  75%|███████▌  | 3/4 [00:00<00:00, 29.65files/s]

Extracting: lastfm-dataset-1K
Extracting: lastfm-dataset-1K/userid-profile.tsv
Extracting: lastfm-dataset-1K/README.txt
Extracting: lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv


Extracting files: 100%|██████████| 4/4 [04:17<00:00, 64.41s/files]
INFO:root:Files extracted


### Load the data

In [6]:
def print_first_n_lines(file_path, n=5):
    """Print the first n lines of a text file, stripped of surrounding whitespace.

    The file is decoded as UTF-8 so that non-ASCII content (e.g. artist names)
    is read the same way on every platform. If the file has fewer than n lines,
    only the existing lines are printed.

    Args:
        file_path: Path of the file to preview.
        n: Maximum number of lines to print.

    Errors are reported with ``print`` instead of being raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for _ in range(n):
                line = file.readline()
                if not line:
                    # readline() returns '' only at end of file
                    break
                print(line.strip())
    except FileNotFoundError:
        print("File not found.")
    except Exception as e:
        print(f"An error occurred: {e}")

#### Users

In [7]:
users_path = expected_filepaths[0]

In [13]:
print_first_n_lines(users_path)

#id	gender	age	country	registered
user_000001	m		Japan	Aug 13, 2006
user_000002	f		Peru	Feb 24, 2006
user_000003	m	22	United States	Oct 30, 2005
user_000004	f			Apr 26, 2006


In [14]:
users = spark.read.csv(users_path, sep="\t", header=True).cache()

users.show(5, truncate=False)

+-----------+------+----+-------------+------------+
|#id        |gender|age |country      |registered  |
+-----------+------+----+-------------+------------+
|user_000001|m     |NULL|Japan        |Aug 13, 2006|
|user_000002|f     |NULL|Peru         |Feb 24, 2006|
|user_000003|m     |22  |United States|Oct 30, 2005|
|user_000004|f     |NULL|NULL         |Apr 26, 2006|
|user_000005|m     |NULL|Bulgaria     |Jun 29, 2006|
+-----------+------+----+-------------+------------+
only showing top 5 rows



In [15]:
users.printSchema()

root
 |-- #id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registered: string (nullable = true)



Notes:
- If we need to use this dataset, we should cast `age` into IntegerType and `registered` into DateType
- For the purpose of this analysis, we don't need any of the information of this dataset, so we remove it from the cache

In [16]:
users.unpersist()

DataFrame[#id: string, gender: string, age: string, country: string, registered: string]

#### Events

In [8]:
events_path = expected_filepaths[1]
print_first_n_lines(events_path)

user_000001	2009-05-04T23:08:57Z	f1b1cf71-bd35-4e99-8624-24a6e15f133a	Deep Dish		Fuck Me Im Famous (Pacha Ibiza)-09-28-2007
user_000001	2009-05-04T13:54:10Z	a7f7df4a-77d8-4f12-8acd-5c60c93f4de8	坂本龍一		Composition 0919 (Live_2009_4_15)
user_000001	2009-05-04T13:52:04Z	a7f7df4a-77d8-4f12-8acd-5c60c93f4de8	坂本龍一		Mc2 (Live_2009_4_15)
user_000001	2009-05-04T13:42:52Z	a7f7df4a-77d8-4f12-8acd-5c60c93f4de8	坂本龍一		Hibari (Live_2009_4_15)
user_000001	2009-05-04T13:42:11Z	a7f7df4a-77d8-4f12-8acd-5c60c93f4de8	坂本龍一		Mc1 (Live_2009_4_15)


<br>
We will refer to this dataset as "events" because each record captures a distinct user interaction with music tracks,
including details like who listened (user_id), when (timestamp), and what was played (artist and track info).

The dataset doesn't have column headers, so we define its schema to properly parse the data.

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# The events file has no header row, so the columns are declared here in file order.
# Every field keeps the default nullable=True.
events_schema = (
    StructType()
    .add("user_id", StringType())
    .add("timestamp", TimestampType())  # timestamp with timezone, e.g. 2009-05-04T23:08:57Z
    .add("musicbrainz_artist_id", StringType())
    .add("artist_name", StringType())
    .add("musicbrainz_track_id", StringType())
    .add("track_name", StringType())
)


In [12]:
events = spark.read.csv(events_path, sep='\t', header=False, schema=events_schema).cache()

events.show(5, truncate=False)

+-----------+-------------------+------------------------------------+-----------+--------------------+------------------------------------------+
|user_id    |timestamp          |musicbrainz_artist_id               |artist_name|musicbrainz_track_id|track_name                                |
+-----------+-------------------+------------------------------------+-----------+--------------------+------------------------------------------+
|user_000001|2009-05-04 23:08:57|f1b1cf71-bd35-4e99-8624-24a6e15f133a|Deep Dish  |NULL                |Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|
|user_000001|2009-05-04 13:54:10|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一   |NULL                |Composition 0919 (Live_2009_4_15)         |
|user_000001|2009-05-04 13:52:04|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一   |NULL                |Mc2 (Live_2009_4_15)                      |
|user_000001|2009-05-04 13:42:52|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一   |NULL                |Hibari (Live_2009_4_15)    

In [13]:
events.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- musicbrainz_artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- musicbrainz_track_id: string (nullable = true)
 |-- track_name: string (nullable = true)



## 🧹 Data Cleaning

It appears that all the necessary information for our analysis is contained within the `events` dataset, making it unnecessary to clean/pre-process the `users` dataset.


### Inspecting the Data

In [14]:
total_rows = events.count()
print(f"There are {total_rows:,} rows in the dataframe")

There are 19,150,868 rows in the dataframe


In [15]:
events.select(F.min("timestamp").alias("Min Timestamp"), F.max("timestamp").alias("Max Timestamp")).show()

+-------------------+-------------------+
|      Min Timestamp|      Max Timestamp|
+-------------------+-------------------+
|2005-02-14 00:00:07|2013-09-29 18:32:04|
+-------------------+-------------------+



### Missing Values

In [16]:
# Null count per column in a single pass: `when` without `otherwise` yields NULL for
# rows that do not match, and `count` skips NULLs, so only the null entries are counted.
# collect()[0] brings the single result Row back to the driver.
missing_values = events.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in events.columns]).collect()[0]

In [17]:
spark.createDataFrame([missing_values]).show()

INFO:numexpr.utils:NumExpr defaulting to 4 threads.


+-------+---------+---------------------+-----------+--------------------+----------+
|user_id|timestamp|musicbrainz_artist_id|artist_name|musicbrainz_track_id|track_name|
+-------+---------+---------------------+-----------+--------------------+----------+
|      0|        0|               602166|          0|             2168588|         1|
+-------+---------+---------------------+-----------+--------------------+----------+



Notes:
- no missing `user_id`, `timestamp`, or `artist_name`
- there is only 1 entry which is missing `track_name`
- quite a few missing values for `musicbrainz_artist_id` and `musicbrainz_track_id`

In [18]:
events.filter(F.col("track_name").isNull()).show()

+-----------+-------------------+---------------------+--------------------+--------------------+----------+
|    user_id|          timestamp|musicbrainz_artist_id|         artist_name|musicbrainz_track_id|track_name|
+-----------+-------------------+---------------------+--------------------+--------------------+----------+
|user_000585|2009-01-14 03:56:02|                 NULL|Silence'' Foundat...|                NULL|      NULL|
+-----------+-------------------+---------------------+--------------------+--------------------+----------+



In [19]:
missing_artist_id = missing_values['musicbrainz_artist_id']
missing_track_id = missing_values['musicbrainz_track_id']

print(f"Proportion of missing MusicBrainz Artist IDs: {missing_artist_id / total_rows * 100:.2f}%")
print(f"Proportion of missing MusicBrainz Track ID: {missing_track_id / total_rows * 100:.2f}%")

Proportion of missing MusicBrainz Artist IDs: 3.14%
Proportion of missing MusicBrainz Track ID: 11.32%


### Distinct Values

In [20]:
events.select([F.approx_count_distinct(F.col(c)).alias(c) for c in events.columns]).show()

+-------+---------+---------------------+-----------+--------------------+----------+
|user_id|timestamp|musicbrainz_artist_id|artist_name|musicbrainz_track_id|track_name|
+-------+---------+---------------------+-----------+--------------------+----------+
|    925| 18266792|                99615|     175171|              946997|   1069655|
+-------+---------+---------------------+-----------+--------------------+----------+



### Distinct `musicbrainz_track_id` for `track_name`, `artist_name`

In [21]:
# Number of distinct MusicBrainz track IDs recorded for each (track, artist) pair,
# largest first.
(
    events
    .groupBy("track_name", "artist_name")
    .agg(F.countDistinct("musicbrainz_track_id").alias("distinct_track_id"))
    .orderBy(F.desc("distinct_track_id"))
    .show(truncate=False)
)

+--------------------------------------------------------------------------------------+-----------------------+-----------------+
|track_name                                                                            |artist_name            |distinct_track_id|
+--------------------------------------------------------------------------------------+-----------------------+-----------------+
|I Still Haven'T Found What I'M Looking For                                            |U2                     |6                |
|You Think I Ain'T Worth A Dollar, But I Feel Like A Millionaire                       |Queens Of The Stone Age|5                |
|Another Brick In The Wall, Part 2                                                     |Pink Floyd             |5                |
|Everybody'S Got Something To Hide Except Me And My Monkey                             |The Beatles            |5                |
|San Francisco (Be Sure To Wear Flowers In Your Hair)                              

In [22]:
# Example of a title recorded by multiple artists and in multiple recordings
(
    events
    .where(F.col("track_name") == "Any Colour You Like")
    .select("musicbrainz_artist_id", "artist_name", "musicbrainz_track_id", "track_name")
    .distinct()
    .orderBy("artist_name")
    .show(truncate=False)
)


+------------------------------------+--------------------+------------------------------------+-------------------+
|musicbrainz_artist_id               |artist_name         |musicbrainz_track_id                |track_name         |
+------------------------------------+--------------------+------------------------------------+-------------------+
|28503ab7-8bf2-4666-a7bd-2644bfc7cb1d|Dream Theater       |2202990e-35e6-43ee-ae31-9aac5635e1a5|Any Colour You Like|
|0cee95e3-a61d-4cd1-a39c-75464a05b606|Easy Star All-Stars |a4aadf18-f092-4baf-8520-cc1d2696726f|Any Colour You Like|
|83d91898-7763-47d7-b03b-b92132375c47|Pink Floyd          |7c278a16-ae04-460c-88ea-39155cadcd09|Any Colour You Like|
|83d91898-7763-47d7-b03b-b92132375c47|Pink Floyd          |20f8d183-6124-4cde-af19-04478f4fdb89|Any Colour You Like|
|83d91898-7763-47d7-b03b-b92132375c47|Pink Floyd          |132a3134-f168-4ee8-9e5b-0f1d52ecb5ff|Any Colour You Like|
|83d91898-7763-47d7-b03b-b92132375c47|Pink Floyd          |0fa82

## 📊 Analysis

### Session Definition

Define user sessions based on the 20-minute rule (i.e. 1200 seconds)

In [23]:
from pyspark.sql.window import Window
threshold = 1200 

In [24]:
# partition the data by user_id and order by timestamp (ascending)
window = Window.partitionBy("user_id").orderBy("timestamp")

In [25]:
# get the timestamp of the previously played song for each user
events = events.withColumn("previous_timestamp", F.lag(F.col("timestamp")).over(window))

In [26]:
# compute the time difference in seconds between the start of consecutive songs 
events = events.withColumn("time_diff_seconds", F.unix_timestamp("timestamp") - F.unix_timestamp("previous_timestamp"))

In [27]:
# Flag the first event of every session with 1 (0 otherwise). A session starts when
# the user has no previous event (time_diff_seconds is NULL) or when the gap to the
# previous event exceeds `threshold` seconds; a gap of exactly `threshold` seconds
# stays in the same session.
events = events.withColumn("new_session",
                   ((F.col("time_diff_seconds").isNull()) | (F.col("time_diff_seconds") > threshold)
                   ).cast("integer"))

In [28]:
# Create session identifiers: a running total of the new_session flags over each
# user's time-ordered events, so the value increases by one at every session start.
# The ids restart for every user, so a session is identified by (user_id, session_id).
events = events.withColumn("session_id", F.sum(F.col("new_session")).over(window))

In [29]:
# show top 20 rows for debugging purposes - could be removed to avoid unnecessary computation as show() triggers an action and computes the results immediately
events.show()

+-----------+-------------------+---------------------+--------------------+--------------------+--------------------+-------------------+-----------------+-----------+----------+
|    user_id|          timestamp|musicbrainz_artist_id|         artist_name|musicbrainz_track_id|          track_name| previous_timestamp|time_diff_seconds|new_session|session_id|
+-----------+-------------------+---------------------+--------------------+--------------------+--------------------+-------------------+-----------------+-----------+----------+
|user_000034|2005-09-15 21:01:31| 2feb192c-2363-46d...|    The Afghan Whigs|                NULL|How Soon Is Now (...|               NULL|             NULL|          1|         1|
|user_000034|2005-09-15 21:05:03| fe3503fb-146f-4d6...|    Afrika Bambaataa|8e025002-e43c-41d...|Zulu Nation Throw...|2005-09-15 21:01:31|              212|          0|         1|
|user_000034|2005-09-15 21:18:12| fe3503fb-146f-4d6...|    Afrika Bambaataa|d081e710-a4cf-43d...|Jaz

### Session Ranking

In [30]:
# First and last play timestamp of every (user, session) pair
df_sessions = (
    events
    .groupBy("user_id", "session_id")
    .agg(
        F.min(F.col("timestamp")).alias("min_timestamp"),
        F.max(F.col("timestamp")).alias("max_timestamp"),
    )
)

In [31]:
# Session duration in seconds, measured from the first to the last play timestamp
# of the session
df_sessions = df_sessions.withColumn(
    "session_duration",
    F.unix_timestamp(F.col("max_timestamp")) - F.unix_timestamp(F.col("min_timestamp")),
)

In [45]:
# Identify the top 50 longest sessions. user_id and session_id act as tie-breakers so
# the selection (and the displayed order) is deterministic when several sessions
# share the same duration.
top_50_sessions = (
    df_sessions
    .orderBy(F.desc("session_duration"), "user_id", "session_id")
    .limit(50)
    .cache()
)

top_50_sessions.orderBy(F.desc("session_duration"), "user_id", "session_id").show()

+-----------+----------+-------------------+-------------------+----------------+
|    user_id|session_id|      min_timestamp|      max_timestamp|session_duration|
+-----------+----------+-------------------+-------------------+----------------+
|user_000949|       151|2006-02-12 17:49:31|2006-02-27 11:29:37|         1273206|
|user_000997|        18|2007-04-26 00:36:02|2007-05-10 17:55:03|         1271941|
|user_000949|       559|2007-05-01 02:41:15|2007-05-14 00:05:52|         1113877|
|user_000544|        75|2007-02-12 13:03:52|2007-02-23 00:51:08|          906436|
|user_000949|       139|2005-12-09 08:26:38|2005-12-18 04:40:04|          764006|
|user_000949|       125|2005-11-11 03:30:37|2005-11-18 22:50:07|          674370|
|user_000949|       189|2006-03-18 23:04:14|2006-03-26 18:13:45|          673771|
|user_000544|        55|2007-01-06 01:07:04|2007-01-13 13:57:45|          651041|
|user_000250|      1285|2008-02-21 15:31:45|2008-02-28 21:18:03|          625578|
|user_000949|   

## 🏆 Results and Discussion

### Song Ranking

In [33]:
# Keep only the events that belong to one of the top 50 sessions; the result is
# cached because both ranking methods below reuse it.
songs_in_top_50_sessions = (
    events
    .select("user_id", "session_id", "musicbrainz_track_id", "artist_name", "track_name")
    .join(
        top_50_sessions.select("user_id", "session_id"),
        on=["user_id", "session_id"],
        how="inner",
    )
    .cache()
)

#### Method 1: use Track Name and Artist Name to uniquely identify a song 

In [34]:
# Number of plays per (track, artist) pair within the top 50 sessions;
# GroupedData.count() names its result column "count".
song_counts = songs_in_top_50_sessions.groupBy("track_name", "artist_name").count()

In [35]:
# Identify the 10 most played songs. The name columns break ties between songs with
# the same play count so that the selected rows are deterministic.
top_10_songs = (
    song_counts
    .orderBy(F.desc("count"), "track_name", "artist_name")
    .limit(10)
    .cache()
)

In [36]:
# show the top 10 most frequent track and artist combinations - could be removed to avoid unnecessary computation
top_10_songs.show()

+--------------------+--------------------+-----+
|          track_name|         artist_name|count|
+--------------------+--------------------+-----+
|              Jolene|                Cake| 1215|
|          Heartbeats|           The Knife|  864|
|How Long Will It ...|Jeff Buckley & Ga...|  809|
|Anthems For A Sev...| Broken Social Scene|  659|
|     St. Ides Heaven|       Elliott Smith|  646|
|         Bonus Track|         The Killers|  634|
|Starin' Through M...|                2Pac|  616|
|     Beast Of Burden|  The Rolling Stones|  613|
|           The Swing|           Everclear|  604|
| When You Were Young|         The Killers|  520|
+--------------------+--------------------+-----+



#### Method 2: add the MusicBrainz Track ID to uniquely identify a song

To enhance granularity, we use the `musicbrainz_track_id` to precisely identify a song, distinguishing, for example, between two different recordings of the same song by the same artist.


In [37]:
# musicbrainz_track_id can be null; those plays are excluded before grouping by it
non_null_track_id = songs_in_top_50_sessions.where(
    F.col("musicbrainz_track_id").isNotNull()
)

In [38]:
# Number of plays per (track, artist, track id) triple; GroupedData.count() names
# its result column "count".
song_counts_with_track_id = (
    non_null_track_id
    .groupBy("track_name", "artist_name", "musicbrainz_track_id")
    .count()
)

In [39]:
# Identify the 10 most played recordings. The identifying columns break ties between
# equal play counts so that the exported top 10 is deterministic.
top_10_songs_with_track_id = (
    song_counts_with_track_id
    .orderBy(F.desc("count"), "track_name", "artist_name", "musicbrainz_track_id")
    .limit(10)
    .cache()
)

In [40]:
# show the top 10 most frequent track and artist combinations, also considering the track_id - could be removed to avoid unnecessary computation
top_10_songs_with_track_id.show()

+--------------------+--------------------+--------------------+-----+
|          track_name|         artist_name|musicbrainz_track_id|count|
+--------------------+--------------------+--------------------+-----+
|              Jolene|                Cake|60f0bfa4-8da9-484...| 1215|
|          Heartbeats|           The Knife|db4c9220-df76-4b4...|  864|
|How Long Will It ...|Jeff Buckley & Ga...|c2b14074-15d6-40c...|  809|
|Anthems For A Sev...| Broken Social Scene|91951530-d978-464...|  659|
|     St. Ides Heaven|       Elliott Smith|45c773c3-c8f0-41a...|  646|
|Starin' Through M...|                2Pac|b649b4ba-4912-4ad...|  616|
|     Beast Of Burden|  The Rolling Stones|55956d3e-c5d4-433...|  613|
|           The Swing|           Everclear|9ad11ca6-c5b9-4a2...|  604|
| When You Were Young|         The Killers|4ad08552-6c35-49e...|  520|
|            Mojo Pin|        Jeff Buckley|19dcc05e-6f91-42e...|  489|
+--------------------+--------------------+--------------------+-----+



In [41]:
# Inspect every distinct artist-id / track-id combination recorded for this title
(
    events
    .where(
        (F.col("track_name") == "Bonus Track")
        & (F.col("artist_name") == "The Killers")
    )
    .select("musicbrainz_artist_id", "artist_name", "musicbrainz_track_id", "track_name")
    .distinct()
    .show(truncate=False)
)


+------------------------------------+-----------+--------------------+-----------+
|musicbrainz_artist_id               |artist_name|musicbrainz_track_id|track_name |
+------------------------------------+-----------+--------------------+-----------+
|95e1ead9-4d31-4808-a7ac-32c3614c116b|The Killers|NULL                |Bonus Track|
+------------------------------------+-----------+--------------------+-----------+



Recall that Method 1 identifies songs using only `track_name` and `artist_name`, while Method 2 enhances identification by incorporating the `musicbrainz_track_id`.

Despite this methodological difference, the rankings produced by both methods are largely similar, except for one notable deviation: Method 1 ranks "Bonus Track" by The Killers as the sixth most popular song. <br>
However, "Bonus Track" likely denotes an unspecified piece of music included as an extra on an album, rather than a distinct song by The Killers. <br>
This ambiguity does not provide meaningful insights into song popularity, given the lack of information about the actual song.

Furthermore, the inclusion of the `musicbrainz_track_id` in Method 2 allows for more precise identification of songs, enhancing the accuracy of our rankings. <br>
This is particularly beneficial if we aim to stream the exact version of a song.

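Given these advantages, we decide to proceed with Method 2. Note that Method 2 excludes plays that have no `musicbrainz_track_id`, so songs without an ID (such as "Bonus Track") cannot appear in its ranking.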

### Export Results

In [42]:
import pandas as pd

results_dir = config["output_directory"]
# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(results_dir, exist_ok=True)
file_path = os.path.join(results_dir, 'top_10_songs_in_50_longest_user_sessions.tsv')

# Only 10 rows, so collecting them to the driver as a pandas DataFrame is cheap.
# Sort explicitly so the exported rows are in rank order (most played first).
df = (
    top_10_songs_with_track_id
    .orderBy(F.desc("count"))
    .select("track_name", "artist_name", "musicbrainz_track_id")
    .toPandas()
)
df.to_csv(file_path, sep='\t', index=False, header=True)