##### Collect information, get file, and import the data

Find your time zone TZ Identifier here: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones  
You will also need your user (auth) token from ListenBrainz.


In [None]:
# Install the required libraries and packages
! pip install liblistenbrainz --upgrade
! pip install musicbrainzngs --upgrade
! pip install pandas --upgrade
! pip install pyarrow --upgrade

# Import the necessary libraries
import liblistenbrainz
import musicbrainzngs as mbz
import pandas as pd
import time
from google.colab import files
import io

In [None]:
# Ask for the ListenBrainz user token. Surrounding whitespace is stripped
# because a copy-pasted value often carries a stray space or newline, which
# would no longer match the real token.
LB_User_Token = input('Please enter your user token: ').strip()
# Ask for the time zone (a TZ identifier from the list linked above). It is
# stripped for the same reason: the name is later passed as-is to tz_convert.
localtimezone = input('Please enter your Local Time Zone: ').strip()

In [None]:
# Upload file to Colab. The result is indexed by file name below, and each
# entry holds that file's raw bytes (it is wrapped in io.BytesIO for pandas).
uploaded = files.upload()

# Pick the play-activity CSV out of the upload.
expected_name = 'Apple Music Play Activity.csv'
if expected_name in uploaded:
    csv_name = expected_name
elif len(uploaded) == 1:
    # A single file under another name is accepted too, e.g. when a duplicate
    # upload was renamed to "Apple Music Play Activity (1).csv".
    csv_name = next(iter(uploaded))
else:
    # Same exception type as the plain dict lookup, but with a usable message.
    raise KeyError("Expected an uploaded file named '" + expected_name
                   + "' but received: " + ", ".join(sorted(uploaded)))

# Read the CSV from the Apple Data Export
listens = pd.read_csv(
    io.BytesIO(uploaded[csv_name]),
    # Only select necessary columns
    usecols=[
        'Event Start Timestamp',
        'Play Duration Milliseconds',
        'Media Duration In Milliseconds',
        'Song Name',
        'Album Name',
        'Container Type',
        'Radio Type',
    ],
    # Set the numbers to Int64 to avoid float's '.0'
    dtype={
        'Play Duration Milliseconds': 'Int64',
        'Media Duration In Milliseconds': 'Int64',
    },
)

print('There are now ' + str(len(listens)) + ' entries from your Apple Data Export.')

##### Process CSV to transform and only collect necessary data


In [None]:
# Exclude Apple Radio shows and stations. Songs played on radio are still kept:
# a row survives if it is not radio at all, or if it is radio of type SONG.
is_radio = listens['Container Type'] == 'RADIO'
is_radio_song = is_radio & (listens['Radio Type'] == 'SONG')
listens = listens[is_radio_song | ~is_radio]
print('There are now ' + str(len(listens)) + ' non-talk show entries from your Apple Data Export.')

# Delete Radio and Container Type columns since they are no longer needed
listens = listens.drop(columns=['Radio Type', 'Container Type'])

# Exclude listens of 30s (30000 ms) or less
listens = listens[listens['Play Duration Milliseconds'] > 30000]
print('There are now ' + str(len(listens)) + ' entries over 30s from your Apple Data Export.')

# Remove entries with missing essential information. The result is taken as an
# independent copy so the column assignments below write to this frame itself
# rather than to a filtered view of the previous one.
listens = listens.dropna(subset=['Song Name']).copy()
#listens = listens.dropna(subset=['Artist']) - FOR WHEN ARTIST IS INCLUDED IN DATA EXPORT
print('There are now ' + str(len(listens)) + ' valid entries over 30s from your Apple Data Export.')

# Turn Event Start Timestamp ISO 8601 to DateTime Format (timezone-aware, UTC)
listens['Event Start Timestamp'] = pd.to_datetime(listens['Event Start Timestamp'], format='ISO8601', utc=True)
#print (listens['Event Start Timestamp'].head)

# Adjust to correct timezone
# NOTE(review): the Unix timestamp below is measured against a UTC epoch, so
# this conversion should not change the resulting number - confirm whether it
# is still wanted for anything other than checking the time zone name.
listens['Event Start Timestamp'] = listens['Event Start Timestamp'].dt.tz_convert(localtimezone)
#print (listens['Event Start Timestamp'].head)

# Convert to unix timestamp (whole seconds since 1970-01-01 UTC)
listens['Event Start Timestamp'] = (listens['Event Start Timestamp'] - pd.Timestamp("1970-01-01", tz='Etc/UTC')) // pd.Timedelta('1s')
# A missing timestamp turns into NaN here. Store 0 instead, which is the
# "no timestamp" value the upload step checks for, and keep the column integer.
listens['Event Start Timestamp'] = listens['Event Start Timestamp'].fillna(0).astype('int64')
#print (listens['Event Start Timestamp'].head)

# Replace all NaN values with empty strings or 0s
listens['Album Name'] = listens['Album Name'].fillna("")
listens['Media Duration In Milliseconds'] = listens['Media Duration In Milliseconds'].fillna(0)

# Sort by ascending Event Start Timestamp. This will upload the old songs first for better organization in ListenBrainz
listens = listens.sort_values(by=['Event Start Timestamp'], ascending=True)

# Export to CSV
#listens.to_csv('AppleMusicHistoryScrobble.csv', index=False)
print(listens.head())

##### Get missing artist data from MusicBrainz

This is a really clunky workaround to get missing Artist data to upload since Apple Music does not export artist data. It will search and return the first artist using the song and album names. It REALLY does not like EPs, Deluxes, Singles, Covers, or other special items and will leave them blank since there is no result. In my experience, it is around 75-85% accurate for artists that it finds, around 50% overall because it often does not find the artist.

Once Apple provides this data, this section will be deleted because it is no longer necessary.

Is it inefficient? Yes. Is it somewhat inaccurate? Also yes. Do I care? No. This is a bandaid fix until the real artist data comes with the data export.  
In reality this is just some garbage so that I can upload at least some songs to ListenBrainz while Apple doesn't export artists.


In [None]:
# Create Artist Column if missing
if 'Artist' not in listens.columns:
    listens['Artist'] = ""

# Set rate limit for querying MusicBrainz: one request per second, which is
# the rate MusicBrainz asks API clients not to exceed.
mbz.set_rate_limit(limit_or_interval=1.0, new_requests=1)
# Set user agent
mbz.set_useragent("AM to LB Import Tool", 0.1, contact="https://github.com/s-crypt")


def _mbz_phrase(text):
    """Return text escaped for use inside a double-quoted search phrase.

    Backslashes and double quotes are backslash-escaped so that a title
    containing them cannot end the quoted phrase early and break the query.
    """
    return str(text).replace("\\", "\\\\").replace("\"", "\\\"")


# Every distinct song/album pair that has both names but no artist yet.
# Looking each pair up once avoids repeating the same query for every
# repeat listen of a song.
needs_artist = (
    (listens['Song Name'] != "")
    & (listens['Album Name'] != "")
    & (listens['Artist'] == "")
)
pending = listens.loc[needs_artist, ['Song Name', 'Album Name']].drop_duplicates()

for song, album in pending.itertuples(index=False, name=None):
    print("\rSong: " + str(song) + "\r\n\tAlbum: " + str(album))

    # Build the search query from the song and album of the current pair
    mbzquery = ("+recording:\"" + _mbz_phrase(song) + "\""
                + " AND " + "release:\"" + _mbz_phrase(album) + "\"")
    #print(mbzquery)

    # Get the first result when searching the song and album names. A failed
    # request only skips this pair instead of aborting the whole run.
    try:
        result = mbz.search_recordings(query=mbzquery, limit=1)
    except mbz.WebServiceError as err:
        print("\r\t\tLookup failed, skipping: " + str(err))
        continue
    #print(result)

    # No result: leave the artist blank
    recordings = result.get('recording-list') or []
    if not recordings:
        continue
    credits = recordings[0].get('artist-credit') or []
    if not credits or not isinstance(credits[0], dict):
        continue

    # Prefer the credited name; fall back to the linked artist's name
    artist = credits[0].get('name') or credits[0].get('artist', {}).get('name')
    if not artist:
        continue
    print("\r\t\tArtist: " + artist)

    # Set the artist to all rows that contain this Song and Album combo (for songs listened to multiple times)
    same_track = (listens['Song Name'] == song) & (listens['Album Name'] == album)
    listens.loc[same_track, 'Artist'] = artist

# Export to CSV
listens.to_csv('AppleMusicHistoryScrobble.csv', index=False)
#files.download('AppleMusicHistoryScrobble.csv') #<====================== REMOVE THE POUND SIGN AT THE BEGINNING OF THE LINE IF YOU WANT TO DOWNLOAD THE CSV AND SCROBBLE ELSEWHERE

##### Loop to upload the songs to ListenBrainz


In [None]:
# Setup liblistenbrainz client
client = liblistenbrainz.ListenBrainz()
# Set Auth token
client.set_auth_token(LB_User_Token, check_validity=True)

# Number of listens sent per request
batch_size = 500
# Count the number of submissions
listenssubmitted = 0
# Count the rows left out because they have no usable timestamp
skipped_no_timestamp = 0
# Create submissions list
batchlistenlist = []
# Make sure index is the number of rows
listens = listens.reset_index(drop=True)


def _submit_batch(batch):
    """Send one batch of listens and return how many were sent.

    Raises RuntimeError when the response status is not 'ok', so that a
    rejected batch is never counted as submitted.
    """
    response = client.submit_multiple_listens(batch)
    if response['status'] != 'ok':
        raise RuntimeError("ListenBrainz rejected a batch of listens: " + str(response))
    return len(batch)


## Iterate through listens
for _, row in listens.iterrows():

    # Only rows with a song, an album and an artist are submitted
    if row['Song Name'] == "" or row['Album Name'] == "" or row['Artist'] == "":
        continue

    # Leave out rows without a 'listened at' time (missing or 0).
    # NOTE(review): ListenBrainz documents listened_at as required for
    # imported listens, so such a row would presumably get its whole batch
    # rejected - confirm against the API documentation.
    listened_at = row['Event Start Timestamp']
    if pd.isna(listened_at) or listened_at == 0:
        skipped_no_timestamp += 1
        continue

    # Append to the batch listen list; int() keeps the timestamp a plain integer
    batchlistenlist.append(liblistenbrainz.Listen(
        track_name=row['Song Name'],
        artist_name=row['Artist'],
        release_name=row['Album Name'],
        listened_at=int(listened_at),
    ))

    # Once the batch is full, submit it and start a new one
    if len(batchlistenlist) == batch_size:
        listenssubmitted += _submit_batch(batchlistenlist)

        # Print the status of the upload
        print("\rSucessfully submitted " + str((listenssubmitted)) + " listens to ListenBrainz")

        # Reset batch list
        batchlistenlist = []

        # Wait 1s in between submissions in order to avoid spamming
        time.sleep(1)

# Submit the last, partially filled batch. Skipped when nothing is left, so no
# empty request is sent when the total is 0 or an exact multiple of the batch size.
if batchlistenlist:
    listenssubmitted += _submit_batch(batchlistenlist)

# Print the status of the upload
print("\rSucessfully submitted " + str((listenssubmitted)) + " listens to ListenBrainz")
if skipped_no_timestamp:
    print("\rSkipped " + str(skipped_no_timestamp) + " listens without a timestamp")

# Clean up and end program
#sys.exit()