In [None]:
import os
import json
import pandas as pd
import psycopg2
from psycopg2.extras import execute_batch
from datetime import timedelta

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Your Credentials

In [None]:
# Folder holding the unzipped Spotify export (Ex: */Spotify Extended Streaming History)
path_to_history_folder = "path_to_unzipped_folder"

In [None]:
### Your PostgreSQL DB Credentials

# Database connection parameters.
# Each value is taken from the corresponding libpq environment variable when it
# is set, so real credentials do not have to be typed into (and then shared or
# committed with) the notebook. The placeholder strings are used otherwise.
db_params = {
    'dbname': os.environ.get('PGDATABASE', 'your_db_name'),
    'user': os.environ.get('PGUSER', 'your_db_username'),
    'password': os.environ.get('PGPASSWORD', 'your_super_secret_password'),
    'host': os.environ.get('PGHOST', 'DB_HOST'),  # Your EC2 IP address
}

## Combine JSON Files

In [None]:
def combine_json_files_to_dataframe(directory: str) -> pd.DataFrame:
    """
    Reads all JSON files in the provided directory and combines them into a single DataFrame.

    Files are read in sorted filename order, so the row order of the result is
    reproducible across runs and machines. Bare ``os.listdir`` returns entries in
    arbitrary order. Row order matters downstream: it decides which of several
    records sharing a timestamp counts as the "first" one.

    Each file is expected to hold either a list of record dictionaries or a single
    record dictionary. Some files are reported with a printed message and skipped:
    files that are not valid JSON, files that cannot be decoded as UTF-8, and files
    whose top level is any other type. Directories are ignored, even when their
    name ends in ".json".

    :param directory: Path to the directory containing JSON files.
    :return: A pandas DataFrame containing combined data from all JSON files
             (empty if no usable records were found).
    """
    all_data = []

    for filename in sorted(os.listdir(directory)):
        file_path = os.path.join(directory, filename)
        # Only regular files with a .json extension are considered.
        if not filename.endswith(".json") or not os.path.isfile(file_path):
            continue

        try:
            with open(file_path, "r", encoding="utf-8") as file:
                data = json.load(file)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error reading {filename}: {e}")
            continue

        if isinstance(data, list):
            all_data.extend(data)
        elif isinstance(data, dict):
            all_data.append(data)
        else:
            print(f"Skipping {filename}: expected a list or object at top level, got {type(data).__name__}")

    return pd.DataFrame(all_data)

# Load and concatenate every JSON file from the export folder
# (Ex: */Spotify Extended Streaming History).
df = combine_json_files_to_dataframe(path_to_history_folder)

# Flip to True to also keep a CSV copy of the combined, unfiltered history.
SAVE_COMBINED_CSV = False
if SAVE_COMBINED_CSV:
    df.to_csv("extended_listening_history.csv", index=False)

df.head()

Unnamed: 0,ts,username,platform,ms_played,conn_country,ip_addr_decrypted,user_agent_decrypted,master_metadata_track_name,master_metadata_album_artist_name,master_metadata_album_album_name,...,episode_name,episode_show_name,spotify_episode_uri,reason_start,reason_end,shuffle,skipped,offline,offline_timestamp,incognito_mode
0,2018-11-29T19:34:54Z,1268261379,Windows 10 (10.0.17134; x64; AppX),187361,US,72.33.2.214,unknown,Oblivion (Creation),Jhené Aiko,Trip,...,,,,trackdone,trackdone,True,,False,1543520000000.0,False
1,2018-11-29T19:41:37Z,1268261379,Windows 10 (10.0.17134; x64; AppX),402760,US,72.33.2.214,unknown,Babe I'm Gonna Leave You - Remaster,Led Zeppelin,Led Zeppelin x Led Zeppelin,...,,,,trackdone,trackdone,True,,False,1543520000000.0,False
2,2018-11-29T19:44:59Z,1268261379,Windows 10 (10.0.17134; x64; AppX),201732,US,72.33.2.214,unknown,Photosynthesis,Saba,Bucket List Project,...,,,,trackdone,trackdone,True,,False,1543520000000.0,False
3,2018-11-29T19:49:48Z,1268261379,Windows 10 (10.0.17134; x64; AppX),287868,US,72.33.2.214,unknown,Getaway (feat. JMSN),NIve,Getaway (feat. JMSN),...,,,,trackdone,trackdone,True,,False,1543521000000.0,False
4,2018-11-29T19:53:24Z,1268261379,Windows 10 (10.0.17134; x64; AppX),216000,US,72.33.2.214,unknown,Cain,EXES,Cain,...,,,,trackdone,trackdone,True,,False,1543521000000.0,False


In [11]:
# DataFrame.keys() returns the column Index of the combined history.
df.keys()

Index(['ts', 'username', 'platform', 'ms_played', 'conn_country',
       'ip_addr_decrypted', 'user_agent_decrypted',
       'master_metadata_track_name', 'master_metadata_album_artist_name',
       'master_metadata_album_album_name', 'spotify_track_uri', 'episode_name',
       'episode_show_name', 'spotify_episode_uri', 'reason_start',
       'reason_end', 'shuffle', 'skipped', 'offline', 'offline_timestamp',
       'incognito_mode'],
      dtype='object')

## Spotify Listening History Aggregation

Includes:

* Data Cleansing
    * This includes handling records that share a duplicated timestamp. I chose to include these records, but you could discard them if you wish.
* Inserting the listening history into the Postgres DB

In [None]:
print(f'Original DataFrame Size: {df.shape}')

# Raw export column -> column name used in the Postgres `listening_history` table.
# An explicit mapping ties each new name to its source column instead of relying
# on the positional order of a separate list.
column_map = {
    'spotify_track_uri': 'track_uri',
    'master_metadata_track_name': 'track_name',
    'master_metadata_album_artist_name': 'artist_name',
    'master_metadata_album_album_name': 'album_name',
    'ts': 'played_at',
    'ms_played': 'ms_played',
}

# NOTE: this cell reassigns `df`. Re-running it without first re-running the load
# cell raises KeyError, because the raw column names no longer exist.
df = (
    df[list(column_map)]  # keep only the needed columns, in table order
    .rename(columns=column_map)
    # Filtering out when a song was not played.
    # May consider adding a cutoff for when a song wasn't played a meaningful amount
    # of time, or filtering out songs which were skipped.
    .loc[lambda d: d['ms_played'] > 0]
    # Drop rows with no track, artist, album, or URI data (presumably non-music
    # entries such as podcast episodes -- confirm against the raw export).
    .dropna()
)

print(f'Filtered DataFrame Size: {df.shape}')

df.head()

  df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/spotify_2024/spotify_data/datasets/input_folder/extended_listening_history.csv')


Orignial DataFrame Size: (356995, 21)
Filtered DataFrame Size: (346524, 6)


Unnamed: 0,track_uri,track_name,artist_name,album_name,played_at,ms_played
0,spotify:track:1CmUZGtH29Kx36C1Hleqlz,Thrift Shop (feat. Wanz),Macklemore & Ryan Lewis,Thrift Shop (feat. Wanz),2013-02-22T14:20:05Z,30448
2,spotify:track:1CmUZGtH29Kx36C1Hleqlz,Thrift Shop (feat. Wanz),Macklemore & Ryan Lewis,Thrift Shop (feat. Wanz),2013-02-22T14:20:49Z,13252
3,spotify:track:1CmUZGtH29Kx36C1Hleqlz,Thrift Shop (feat. Wanz),Macklemore & Ryan Lewis,Thrift Shop (feat. Wanz),2013-02-22T14:24:50Z,235613
4,spotify:track:5BSndweF91KDqyxANsZcQH,Ho Hey,The Lumineers,The Lumineers,2013-02-22T14:25:27Z,37023
5,spotify:track:5ZkeWw5ptQnryw5HTubsoI,Rappers Delight,Hip Hop Classics,The Definitive Collection,2013-02-22T14:27:19Z,111307


Note: 
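Because `played_at` must be unique in the database table (it is the `ON CONFLICT` key of the insert below), records that share a timestamp would otherwise be dropped. I have decided to keep all of them. To accomplish this, each repeat of a timestamp is shifted forward by 1 ms per earlier occurrence, so every record's timestamp becomes distinct.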

In [None]:
# Differentiating records with duplicate timestamps
df['played_at'] = pd.to_datetime(df['played_at'])

# Find duplicates in 'played_at' (every row of each duplicated group is counted)
duplicates = df[df.duplicated('played_at', keep=False)]
print("Duplicated Timestamps:", duplicates.shape[0])

# Step added per prior occurrence of the same timestamp
delta = timedelta(milliseconds=1)

# Within each group of identical timestamps, number the rows 0, 1, 2, ... in their
# current order, then shift each row forward by that many `delta` steps. The first
# occurrence keeps its original timestamp. Rows with a unique timestamp get 0 and
# are unchanged. This is a vectorized replacement for a per-row counter loop. It is
# positional, so it does not depend on the index labels being unique.
occurrence = df.groupby('played_at', sort=False, dropna=False).cumcount()
df['played_at'] = df['played_at'] + occurrence * pd.Timedelta(delta)

# Check for any remaining duplicates. A shifted timestamp could still collide with
# another record if the source timestamps already had sub-second precision, so
# verify rather than assume.
remaining_duplicates = df[df.duplicated('played_at', keep=False)]
print("Remaining duplicates:", remaining_duplicates.shape[0])

Duplicated Timestamps: 12829
Remaining duplicates: 0


In [None]:
# Insert the cleaned history into Postgres.
# `ON CONFLICT (played_at) DO NOTHING` skips rows whose played_at already exists
# in `listening_history` (the statement requires a unique index on played_at).

# Columns to insert, in the same order as the INSERT column list below.
insert_columns = ["track_uri", "track_name", "artist_name", "album_name", "played_at", "ms_played"]

# Zipping data for insertion
tuples = list(zip(*(df[col] for col in insert_columns)))

# Number of rows grouped into each server round trip
batch_size = 1000

insert_sql = """INSERT INTO listening_history (track_uri, track_name, artist_name, album_name, played_at, ms_played)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON CONFLICT (played_at) DO NOTHING"""

# Creating Connection to Postgres Database
conn = psycopg2.connect(**db_params)
try:
    # `with conn` commits if the block succeeds and rolls back if it raises.
    # It does not close the connection, hence the surrounding try/finally.
    with conn:
        with conn.cursor() as cur:
            execute_batch(cur, insert_sql, tuples, page_size=batch_size)
finally:
    conn.close()