Skip to content

Commit

Permalink
feat: ✨ Sync favorites tracks to Tidal
Browse files Browse the repository at this point in the history
  • Loading branch information
c0ball authored and timrae committed Jun 10, 2024
1 parent fab1548 commit 54526a0
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 60 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ venv/
.mypy_cache/
.dmypy.json
dmypy.json

# Visual Studio Code
.vscode/
10 changes: 7 additions & 3 deletions src/spotify_to_tidal/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.yml', help='location of the config file')
parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config')
parser.add_argument('--sync-favorites', action='store_true', help='enable synchronization of favorites')
args = parser.parse_args()

with open(args.config, 'r') as f:
Expand All @@ -24,13 +25,16 @@ def main():
spotify_playlist = spotify_session.playlist(args.uri)
tidal_playlists = _sync.get_tidal_playlists_dict(tidal_session)
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists)
_sync.sync_list(spotify_session, tidal_session, [tidal_playlist], config)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config)
elif config.get('sync_playlists', None):
# if the config contains a sync_playlists list of mappings then use that
_sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
else:
# otherwise just use the user playlists in the Spotify account
_sync.sync_list(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)

if args.sync_favorites:
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config)

if __name__ == '__main__':
main()
Expand Down
2 changes: 1 addition & 1 deletion src/spotify_to_tidal/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

def open_spotify_session(config) -> spotipy.Spotify:
credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
scope='playlist-read-private',
scope='playlist-read-private, user-library-read',
client_id=config['client_id'],
client_secret=config['client_secret'],
redirect_uri=config['redirect_uri'],
Expand Down
164 changes: 115 additions & 49 deletions src/spotify_to_tidal/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import datetime
from difflib import SequenceMatcher
from functools import partial
from typing import List, Sequence, Set, Mapping
from typing import Callable, List, Sequence, Set, Mapping
import math
import requests
import sys
Expand Down Expand Up @@ -165,24 +165,42 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
time.sleep(sleep_schedule.get(remaining, 1))
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)

async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)

async def _fetch_all_from_spotify_in_chunks(spotify_session: spotipy.Spotify, fetch_function: Callable) -> List[dict]:
output = []
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] )
output.extend([r['track'] for r in results['items'] if r['track'] is not None])
results = fetch_function(0, spotify_session)
output.extend([item['track'] for item in results['items'] if item['track'] is not None])

# get all the remaining tracks in parallel
# Get all the remaining tracks in parallel
if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] )
offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))]
extra_results = await asyncio.gather(
*[asyncio.to_thread(fetch_function, offset, spotify_session) for offset in offsets]
)
for extra_result in extra_results:
output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None])
output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None])

return output


async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)

print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
return await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=lambda offset, session: _get_tracks_from_spotify_playlist(offset=offset, spotify_session=session, playlist_id=spotify_playlist["id"]))


async def get_tracks_from_spotify_favorites(spotify_session: spotipy.Spotify) -> List[dict]:
def _get_favorite_tracks(offset: int, spotify_session: spotipy.Spotify):
return spotify_session.current_user_saved_tracks(offset=offset)

print("Loading favorite tracks from Spotify")
tracks = await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=_get_favorite_tracks)
tracks.reverse()
return tracks

def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack):
Expand Down Expand Up @@ -238,7 +256,37 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify
seen_tracks.add(tidal_id)
return output

async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
async def sync_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track], config: dict, tidal_session: tidalapi.Session, tidal_playlist: tidalapi.UserPlaylist, sync_favorites: bool = False):
new_spotify_tracks = get_new_spotify_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks)
if not new_spotify_tracks:
print("No new tracks to search in Spotify tracks")
return

new_tidal_tracks = await search_new_tracks_on_tidal(new_spotify_tracks=new_spotify_tracks, tidal_session=tidal_session, config=config)

update_tidal_playlist(
old_tidal_tracks=old_tidal_tracks,
new_tidal_tracks=new_tidal_tracks,
tidal_playlist=tidal_playlist,
tidal_session=tidal_session,
sync_favorites=sync_favorites
)

def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> List[t_spotify.SpotifyTrack]:
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures '''
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
results = []
for spotify_track in spotify_tracks:
if not spotify_track['id']: continue
if not track_match_cache.get(spotify_track['id']) and not failure_cache.has_match_failure(spotify_track['id']):
results.append(spotify_track)
return results

async def search_new_tracks_on_tidal(new_spotify_tracks: List[t_spotify.SpotifyTrack], tidal_session: tidalapi.Session, config: dict) -> List[tidalapi.Track | None]:
''' Searches for the new Spotify tracks on Tidal '''
task_description = f"Searching Tidal for {len(new_spotify_tracks)} new Spotify tracks"
semaphore = asyncio.Semaphore(value=config.get('max_concurrency', 10))

async def _run_rate_limiter(semaphore):
''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit'''
_sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket'
Expand All @@ -251,50 +299,68 @@ async def _run_rate_limiter(semaphore):
t0 = t
[semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket'

# Create a new Tidal playlist if required
if not tidal_playlist:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])

# Extract the new tracks from the playlist that we haven't already seen before
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks()
tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks)
if not tracks_to_search:
print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name']))
return

# Search for each of the tracks on Tidal concurrently
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name'])
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
search_results = await atqdm.gather(
*[repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in new_spotify_tracks],
desc=task_description
)
rate_limiter_task.cancel()

# Add the search results to the cache
for idx, spotify_track in enumerate(tracks_to_search):
for idx, spotify_track in enumerate(new_spotify_tracks):
if search_results[idx]:
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
track_match_cache.insert((spotify_track['id'], search_results[idx].id))
else:
color = ('\033[91m', '\033[0m')
print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1])
print(color[0] + f"Could not find track {spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}" + color[1])

return search_results

# Update the Tidal playlist if there are changes
def update_tidal_playlist(old_tidal_tracks: Sequence[tidalapi.Track], new_tidal_tracks: List[tidalapi.Track | None], tidal_playlist: tidalapi.UserPlaylist, tidal_session: tidalapi.Session, sync_favorites: bool = False):
''' Updates the Tidal playlist or favorites with the new tracks '''
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
new_tidal_track_ids = [t.id for t in new_tidal_tracks if t]

if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal playlist")
print("No changes to write to Tidal")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
# Append new tracks to the existing playlist if possible
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
# Append new tracks to the existing playlist or favorites if possible
add_multiple_tracks_to_playlist(
playlist=tidal_playlist,
session=tidal_session,
track_ids=new_tidal_track_ids[len(old_tidal_track_ids):],
sync_favorites=sync_favorites
)
else:
# Erase old playlist and add new tracks from scratch if any reordering occured
set_tidal_playlist(tidal_playlist, new_tidal_track_ids)
# Erase old playlist or favorites and add new tracks from scratch if any reordering occured
set_tidal_playlist(
playlist=tidal_playlist,
session=tidal_session,
track_ids=new_tidal_track_ids,
sync_favorites=sync_favorites
)

def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
for spotify_playlist, tidal_playlist in playlists:
asyncio.run(main=sync_playlist(spotify_session=spotify_session, tidal_session=tidal_session, spotify_playlist=spotify_playlist, tidal_playlist=tidal_playlist, config=config) )

def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
for spotify_playlist, tidal_playlist in playlists:
# sync the spotify playlist to tidal
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
# Create a new Tidal playlist if required
if not tidal_playlist:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])

spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session=spotify_session, spotify_playlist=spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks()
await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_playlist)

def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))

async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
spotify_tracks = await get_tracks_from_spotify_favorites(spotify_session=spotify_session)
old_tidal_tracks = tidal_session.user.favorites.tracks(order='DATE')
await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_session.user.favorites, sync_favorites=True)

def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
if spotify_playlist['name'] in tidal_playlists:
Expand All @@ -319,7 +385,7 @@ async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
results = spotify_session.user_playlists(config['spotify']['username'])
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
playlists.extend([p for p in results['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])

# get all the remaining playlists in parallel
if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
Expand All @@ -333,14 +399,14 @@ def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: t
def get_playlist_ids(config):
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
output = []
for spotify_id, tidal_id in get_playlist_ids(config):
for spotify_id, tidal_id in get_playlist_ids(config=config):
try:
spotify_playlist = spotify_session.playlist(spotify_id)
spotify_playlist = spotify_session.playlist(playlist_id=spotify_id)
except spotipy.SpotifyException as e:
print(f"Error getting Spotify playlist {spotify_id}")
raise e
try:
tidal_playlist = tidal_session.playlist(tidal_id)
tidal_playlist = tidal_session.playlist(playlist_id=tidal_id)
except Exception as e:
print(f"Error getting Tidal playlist {tidal_id}")
raise e
Expand Down
28 changes: 21 additions & 7 deletions src/spotify_to_tidal/tidalapi_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,30 @@ def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int=20):
indices = range(min(playlist.num_tracks, chunk_size))
_remove_indices_from_playlist(playlist, indices)
progress.update(len(indices))

def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids: List[int], chunk_size: int=20):

def clear_favorites(session: tidalapi.Session):
favorite_tracks = session.user.favorites.tracks()
track_ids = [track.id for track in favorite_tracks]
for track_id in track_ids:
session.user.favorites.remove_track(track_id)

def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, session: tidalapi.Session, track_ids: List[int], chunk_size: int = 20, sync_favorites: bool = False):
offset = 0
with tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids)) as progress:
with tqdm(desc="Adding new tracks to Tidal", total=len(track_ids)) as progress:
while offset < len(track_ids):
count = min(chunk_size, len(track_ids) - offset)
playlist.add(track_ids[offset:offset+chunk_size])
if sync_favorites:
for track_id in track_ids[offset:offset + chunk_size]:
session.user.favorites.add_track(track_id)
else:
playlist.add(track_ids[offset:offset + chunk_size])
offset += count
progress.update(count)

def set_tidal_playlist(playlist: tidalapi.Playlist, track_ids: List[int]):
clear_tidal_playlist(playlist)
add_multiple_tracks_to_playlist(playlist, track_ids)
def set_tidal_playlist(playlist: tidalapi.Playlist, session: tidalapi.Session, track_ids: List[int], sync_favorites: bool=False):
if sync_favorites:
clear_favorites(session)
else:
clear_tidal_playlist(playlist=playlist)
add_multiple_tracks_to_playlist(playlist=playlist, session=session, track_ids=track_ids, sync_favorites=sync_favorites)

0 comments on commit 54526a0

Please sign in to comment.