In [10]:
from configparser import ConfigParser

# Load the private settings (API credentials etc.) from the local INI file.
# `read` returns the list of files it managed to parse; as the last expression
# of the cell, that list is what the notebook displays.
config = ConfigParser()
config.read(filenames='config_private.ini')

['config_private.ini']

In [13]:
from spotipy import Spotify, SpotifyOAuth, CacheFileHandler


# OAuth scopes requested for this notebook.
scope = ["playlist-modify-public", "user-read-private"]

# Token cache handler built for the configured Spotify username.
cache_handler = CacheFileHandler(username=config["SPOTIFY"]["username"])
auth_manager = SpotifyOAuth(
    client_id=config["SPOTIFY"]["client_id"],
    client_secret=config["SPOTIFY"]["client_secret"],
    redirect_uri=config["SPOTIFY"]["redirect_uri"],
    scope=scope,
    cache_handler=cache_handler,
)
spotify = Spotify(auth_manager=auth_manager)

# Eagerly refresh a cached token that has expired.
# When nothing has been cached yet, `get_cached_token()` yields None; passing
# that to `is_token_expired` would raise, so the refresh is skipped in that case.
# NOTE(review): spotipy is then expected to run the authorization flow on the
# first API call — confirm against the installed spotipy version.
token_info = auth_manager.cache_handler.get_cached_token()
if token_info is not None and auth_manager.is_token_expired(token_info=token_info):
    auth_manager.refresh_access_token(refresh_token=token_info["refresh_token"])

AttributeError: module 'time' has no attribute 'utctime'

In [None]:
# Helper methods

import re

from urllib.parse import quote_plus

def strip_non_alphanumeric(s):
    """Return `s` without any character other than ASCII letters, digits, `_` and space.

    Note that non-ASCII letters (e.g. accented characters) are removed as well.
    """
    return re.sub('[^a-zA-Z0-9_ ]+', '', s)


def format_track_query(artist_name, track_name):
    """Build the free-text search term for a track.

    The artist and track names are joined with a single space and the result is
    passed through `strip_non_alphanumeric`.
    """
    combined = "{} {}".format(artist_name, track_name)
    return strip_non_alphanumeric(s=combined)


def get_artist_id(artist_name):
    """Find the Spotify ID of the artist named `artist_name`.

    Searches the US market for artists using the name with non-alphanumeric
    characters stripped, then scans the returned page for an exact,
    case-insensitive name match.

    Args:
        artist_name: Artist name to look for; any letter case is accepted.

    Returns:
        The `id` of the first search hit whose name equals `artist_name`
        ignoring case, or None when no hit on the page matches.
    """
    # Hand spotipy the plain query text. The client sends `q` as a request
    # parameter, so pre-quoting it (as was done before) encodes it twice and a
    # space reaches the API as a literal "+".
    query = strip_non_alphanumeric(artist_name)
    # The default page size (10 results) should be enough.
    result = spotify.search(q=query, market="US", type="artist")

    # Hit names are lower-cased for comparison, so lower-case the target too
    # instead of relying on the caller to have done it.
    wanted_name = artist_name.lower()
    for item in result["artists"]["items"]:
        if item["name"].lower() == wanted_name:
            return item["id"]

    return None


def get_albums(artist_id):
    """Return all album items listed for `artist_id` in the US market.

    Pages through `spotify.artist_albums` 50 items at a time and keeps requesting
    further pages for as long as the response carries a truthy `next` value.
    """
    page_size = 50
    position = 0
    collected = []

    while True:
        page = spotify.artist_albums(artist_id=artist_id, country="US", limit=page_size, offset=position)
        has_more = page["next"]
        position += page_size
        collected.extend(page["items"])
        if not has_more:
            break

    return collected


def get_album_tracks(album_id):
    """Return all track items of the album `album_id` in the US market.

    Pages through `spotify.album_tracks` 50 items at a time and keeps requesting
    further pages for as long as the response carries a truthy `next` value.
    """
    page_size = 50
    position = 0
    collected = []

    while True:
        page = spotify.album_tracks(album_id=album_id, market="US", limit=page_size, offset=position)
        has_more = page["next"]
        position += page_size
        collected.extend(page["items"])
        if not has_more:
            break

    return collected


def get_track_id(artist_name, track_name):
    """Resolve a Spotify track ID from an artist name and a track name.

    Strategy:
      1. Run a track search (US market) and return the first hit whose track
         name and first listed artist both match, ignoring case.
      2. If the search yields no match, look the artist up, walk the artist's
         albums, and inside every album whose name equals the track name return
         the first track with that same name.

    Args:
        artist_name: Artist name; any letter case is accepted.
        track_name: Track title; any letter case is accepted.

    Returns:
        The track `id`, or None when the artist or the track cannot be found
        (a message is printed in both of those cases).
    """
    # All comparisons below are done on lower-cased names, so normalise the
    # inputs once instead of relying on the caller to pass lower-case text.
    wanted_artist = artist_name.lower()
    wanted_track = track_name.lower()

    # Hand spotipy the plain query text. The client sends `q` as a request
    # parameter, so pre-quoting it (as was done before) encodes it twice and a
    # space reaches the API as a literal "+".
    query = format_track_query(artist_name, track_name)
    result = spotify.search(q=query, market="US", type="track")

    for item in result["tracks"]["items"]:
        # Only the first listed artist of a hit is compared.
        if (
            item["name"].lower() == wanted_track
            and item["artists"][0]["name"].lower() == wanted_artist
        ):
            return item["id"]

    # Some tracks aren't searchable :( Look in the artist's albums instead:
    # a track can be found inside an album that carries the same name as the
    # track (for example a single-track album).
    artist_id = get_artist_id(wanted_artist)

    if not artist_id:
        print(f"Artist not found: {artist_name}")
        return None

    for album in get_albums(artist_id=artist_id):
        if album["name"].lower() != wanted_track:
            continue
        for album_track in get_album_tracks(album_id=album["id"]):
            if album_track["name"].lower() == wanted_track:
                return album_track["id"]

    print(f"Track `{track_name}` not found for artist `{artist_name}`")
    return None

# NOTE: normalize_search is not used anywhere yet.
def normalize_search(search_str):
    """Lower-case `search_str` and drop the words "the", "a" and "an".

    The text is split on single spaces and re-joined with single spaces, so empty
    "words" produced by consecutive spaces are kept as they are.
    """
    articles = {"the", "a", "an"}
    kept_words = [word for word in search_str.lower().split(" ") if word not in articles]
    return " ".join(kept_words)


In [9]:
import pendulum
from time import mktime

# Time window of interest, given in US/Pacific local time.
start_datetime = pendulum.datetime(2022, 12, 21, 17, tz="US/Pacific")
end_datetime = pendulum.datetime(2022, 12, 21, 22, tz="US/Pacific")

# The same two instants converted to UTC and rendered as date-time strings.
start_datetime_utc_str, end_datetime_utc_str = (
    moment.in_timezone("UTC").to_datetime_string()
    for moment in (start_datetime, end_datetime)
)


# Run only ONE of the next two cells (File History or LastFM); each of them populates `search_tracks`.

In [None]:
# File History

import json
from os import listdir
from pathlib import Path
# Directory holding the exported history, and the names of its
# "StreamingHistory*" files in sorted order.
history_path = Path.home() / "Downloads" / "MyData"
history_files = sorted(
    name for name in listdir(history_path) if name.startswith("StreamingHistory")
)

# Concatenate the entries of every history file into one list.
history_all = []
for filename in history_files:
    # Some titles contain non-ASCII characters, so decode explicitly as UTF-8.
    with open(history_path / filename, encoding='utf-8') as f:
        history_all.extend(json.load(f))

# Keep the plays that ended inside the time window (string comparison against
# the UTC bounds) and lasted at least 60000 ms; names are stored lower-cased.
search_tracks = []
for history_obj in history_all:
    ended_in_window = (
        start_datetime_utc_str <= history_obj["endTime"] <= end_datetime_utc_str
    )
    if ended_in_window and history_obj["msPlayed"] >= 60000:
        search_tracks.append(
            {
                "artist_name": history_obj["artistName"].lower(),
                "track_name": history_obj["trackName"].lower(),
            }
        )

In [None]:
# LastFM

from time import mktime

from pylast import md5, LastFMNetwork, User


# Last.fm credentials from the private config.
# (No trailing commas: they would turn the key and the secret into 1-tuples.)
api_key = config["LAST_FM"]["api_key"]
api_secret = config["LAST_FM"]["api_secret"]
username = config["LAST_FM"]["username"]
password_hash = md5(config["LAST_FM"]["password"])

last_fm = LastFMNetwork(
    api_key=api_key,
    api_secret=api_secret,
    username=username,
    password_hash=password_hash,
)

# Bounds of the time window as integer Unix timestamps.
begin_seconds = int(start_datetime.format(fmt="X"))
end_seconds = int(end_datetime.format(fmt="X"))

user = User(user_name=username, network=last_fm)
recent_tracks = user.get_recent_tracks(limit=999, time_from=begin_seconds, time_to=end_seconds)
# Presumably the API lists the newest play first, so reversing gives
# chronological order — TODO confirm.
recent_tracks.reverse()

# Convert the pylast play records into the same
# {"artist_name", "track_name"} dicts (lower-cased) that the File History cell
# produces, because the lookup cell indexes every entry by those two keys.
search_tracks = [
    {
        "artist_name": played.track.artist.get_name().lower(),
        "track_name": played.track.get_title().lower(),
    }
    for played in recent_tracks
]

# Get Tracks

In [None]:
# Look every requested track up on Spotify; a falsy result means "not found".
lookup_results = [
    get_track_id(entry["artist_name"], entry["track_name"])
    for entry in search_tracks
]

# Keep only the successful lookups and report how many searches succeeded.
track_ids = [found_id for found_id in lookup_results if found_id]
search_count = len(lookup_results)

print(f"search_count={search_count}, found_count={len(track_ids)}")

In [None]:
# Drop any falsy entries (e.g. None) from the collected IDs.
track_ids = list(filter(None, track_ids))
def get_chunks(l, n):
    """Yield consecutive slices of `l`, each holding at most `n` items.

    The last slice is shorter when `len(l)` is not a multiple of `n`; an empty
    `l` yields nothing.
    """
    yield from (l[start:start + n] for start in range(0, len(l), n))

# Number of items sent per `playlist_add_items` call (the name suggests this is
# the API's per-request maximum — confirm).
playlist_add_items_max = 100

# Create the destination playlist for the configured user.
playlist = spotify.user_playlist_create(
    user=config["SPOTIFY"]["username"],
    name="Critical Chip 2022",
)

# Add the collected tracks to the new playlist, one chunk per request.
playlist_id = playlist["id"]
for chunk in get_chunks(l=track_ids, n=playlist_add_items_max):
    add_tracks_response = spotify.playlist_add_items(playlist_id=playlist_id, items=chunk)