In [4]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import time
from requests.exceptions import ReadTimeout
import pandas as pd
# Spotify API credentials must be filled in before running this cell.
credentials_manager = SpotifyClientCredentials(client_id="", client_secret="")
# requests_timeout is an argument of the Spotify client, not of the credentials manager.
sp = spotipy.Spotify(auth_manager=credentials_manager, requests_timeout=20)



In [None]:
# Load the CSV produced by the TikTok web scraping and attach the country code
# (column 'id') to every row so it can be used later as the search market.

# Forward slash instead of 'scrapping\ISO...': "\I" is an invalid escape sequence in a
# regular string literal, and a backslash separator only resolves on Windows.
country_code = pd.read_csv(
    'scrapping/ISO-3166-1-alpha-2-country-codes-spotify.tsv',
    delimiter='\t',
    names=['id', 'Pais', 'Year', 'Comment'],
)
df = pd.read_csv('top_songs_last120.csv')
# drop() returns a new frame, so the result has to be kept; otherwise 'Year' and
# 'Comment' are still present and end up in the joined table.
country_code = country_code.drop(columns=['Year', 'Comment'])
df_country = df.join(country_code.set_index('Pais'), on='Pais')
df_country

En esta primera consulta, buscamos las canciones, tratando de ajustar la búsqueda al número máximo de caracteres de la API. Sobre todo de cara a colaboraciones múltiples, donde solo vamos a necesitar a un artista.

In [None]:
# Result columns filled from the first Spotify search hit of each row; rows without a
# match keep None in all of them.
result_columns = ('Track_ID', 'Explicit', 'Release_Date', 'Release_Date_Precision',
                  'Album_Type', 'Popularity', 'Duration')
for column in result_columns:
    df_country[column] = None

for index, row in df_country.iterrows():
    artista = row['Artista']
    cancion = row['Cancion']
    # A row without artist or title cannot be turned into a query (concatenating NaN to
    # the query string raises an uncaught TypeError), so it is left unmatched.
    if pd.isna(artista) or pd.isna(cancion):
        continue

    # Shorten over-long fields before building the query: a long title is cut to its
    # first word, otherwise a long artist credit is cut to the part before "&".
    # ">=" also covers a length of exactly 250, which previously matched no branch and
    # left `response` unbound or stale from the previous row.
    if len(cancion) >= 250:
        cancion = cancion.split()[0]
    elif len(artista) >= 250:
        artista = artista.split("&")[0]

    # Countries missing from the code table come out of the join with NaN in 'id';
    # send no market at all in that case instead of NaN.
    market = row['id'] if pd.notna(row['id']) else None

    try:
        response = sp.search(q='artist:' + artista + ' track:' + cancion, type='track', market=market)
        time.sleep(0.5)  # Respect the API rate limits

        items = response['tracks']['items']
        if items:
            track = items[0]  # First result returned by the search
            info = {
                'Track_ID': track['id'],
                'Explicit': track['explicit'],
                'Release_Date': track['album']['release_date'],
                'Release_Date_Precision': track['album']['release_date_precision'],
                'Album_Type': track['album']['album_type'],
                'Popularity': track['popularity'],
                'Duration': track['duration_ms'],
            }
        else:
            # No hit: every field is None, Duration included. Duration used to be left
            # out of this branch, so it kept the previous row's value (or raised
            # NameError when the very first row had no result).
            info = dict.fromkeys(result_columns)

        # Store the values in the DataFrame
        for column, value in info.items():
            df_country.loc[index, column] = value

    except ReadTimeout:
        # The row is not retried in this pass: it keeps None and is picked up again by
        # the later pass over the rows whose Track_ID is still null.
        print(f"ReadTimeout en la fila {index}. Reintentando...")
        time.sleep(2)
        continue
    except KeyError as e:
        print(f"Error en la estructura de la respuesta: {e}")
        continue
df_country

Pasamos a revisar el porcentaje de NaN para ver qué ajuste debemos hacer en la consulta.

In [None]:
# Percentage of rows per country whose Track_ID is still missing, lowest first.
porcentaje_nan = df_country.groupby('Pais')['Track_ID'].apply(
    lambda track_ids: track_ids.isna().mean() * 100  # share of NaN as a percentage
)
porcentaje_nan = porcentaje_nan.reset_index(name='Porcentaje_NaN')  # Series -> DataFrame
porcentaje_nan = porcentaje_nan.sort_values(by='Porcentaje_NaN')
print(porcentaje_nan)

Localizamos las canciones que no hemos podido identificar y procedemos a definir una función que mejore la detección de artistas múltiples, de modo que podamos reiniciar la búsqueda.

In [None]:
# Rows still without a Spotify match after the first pass. The explicit copy makes
# df_none an independent frame, so the .loc assignments done on it later are
# unambiguous and do not trigger SettingWithCopyWarning.
df_none = df_country[df_country['Track_ID'].isnull()].copy()
df_none

In [113]:
def split_first(val):
    """Return the part of ``val`` that precedes the first matching separator.

    The separators are tried in a fixed priority order (' x ', ' -', ' (', ' VS ',
    ' / '); only the first one found in ``val`` is used for the cut. When none of
    them occurs, ``val`` is returned unchanged.
    """
    for separator in (' x ', ' -', ' (', ' VS ', ' / '):
        if separator in val:
            # Everything before the first occurrence of the separator.
            return val.partition(separator)[0]
    return val


En este proceso también añadimos la opción de market y asignamos el país del ranking al que pertenece. Así, al ser España el país por defecto, no nos quedamos sin sacar información de canciones no disponibles aquí pero sí en otros países.

In [None]:
# Second search pass over the unmatched rows, with artist and title reduced by
# split_first before they go into the query.
for index, row in df_none.iterrows():
    artista = row['Artista']
    cancion = row['Cancion']
    # A row without artist or title cannot be turned into a query (split_first and the
    # string concatenation both raise an uncaught TypeError on NaN), so it is skipped.
    if pd.isna(artista) or pd.isna(cancion):
        continue

    # Build the two query terms. A long title is additionally cut to its first word;
    # otherwise a long artist credit is cut to the part before "&" (without split_first,
    # as before). ">=" also covers a length of exactly 250, which previously matched no
    # branch and left `response` unbound or stale from the previous row.
    cancion_query = split_first(cancion)
    if len(cancion) >= 250:
        artista_query = split_first(artista)
        cancion_query = cancion_query.split()[0]
    elif len(artista) >= 250:
        artista_query = artista.split("&")[0]
    else:
        artista_query = split_first(artista)

    # Rows whose country has no code carry NaN in 'id'; send no market at all in that
    # case instead of NaN.
    market = row['id'] if pd.notna(row['id']) else None

    try:
        response = sp.search(q='artist:' + artista_query + ' track:' + cancion_query, type='track', market=market)
        time.sleep(0.5)  # Respect the API rate limits

        items = response['tracks']['items']
        if items:
            track = items[0]  # First result returned by the search
            info = {
                'Track_ID': track['id'],
                'Explicit': track['explicit'],
                'Release_Date': track['album']['release_date'],
                'Release_Date_Precision': track['album']['release_date_precision'],
                'Album_Type': track['album']['album_type'],
                'Popularity': track['popularity'],
                'Duration': track['duration_ms'],
            }
        else:
            # No hit: every field stays empty.
            info = dict.fromkeys((
                'Track_ID', 'Explicit', 'Release_Date', 'Release_Date_Precision',
                'Album_Type', 'Popularity', 'Duration',
            ))

        # Store the values in the DataFrame
        for column, value in info.items():
            df_none.loc[index, column] = value

    except ReadTimeout:
        # The row is skipped, not retried; it stays unmatched.
        print(f"ReadTimeout en la fila {index}. Reintentando...")
        time.sleep(2)
        continue
    except KeyError as e:
        print(f"Error en la estructura de la respuesta: {e}")
        continue
df_none

In [None]:
# Percentage of rows per country that are still unmatched after the second pass,
# lowest first.
porcentaje_nan = df_none.groupby('Pais')['Track_ID'].apply(
    lambda track_ids: track_ids.isna().mean() * 100  # share of NaN as a percentage
)
porcentaje_nan = porcentaje_nan.reset_index(name='Porcentaje_NaN')  # Series -> DataFrame
porcentaje_nan = porcentaje_nan.sort_values(by='Porcentaje_NaN')
print(porcentaje_nan)

In [None]:
# Raise the display limit to 99 rows before showing the frame.
pd.set_option('display.max_rows', 99)
# Rows that still have no Track_ID after the second pass.
df_new_none = df_none.loc[df_none['Track_ID'].isna()]
df_new_none

In [None]:
# Write the matches found in the second pass back into the main table.
# DataFrame.update works in place, aligns on the index and only takes non-NA values
# from df_none, so rows that are still unmatched keep their existing values.
df_country.update(df_none)
df_country

In [None]:
# Overall percentage of rows without a Track_ID, rounded to two decimals.
unmatched_rows = len(df_country[df_country['Track_ID'].isnull()])
total_rows = len(df_country)
nan_percent = round(unmatched_rows / total_rows * 100, 2)
nan_percent

In [None]:
# Per-country percentage of rows without a Track_ID, highest first.
country_nan = df_country.groupby('Pais')['Track_ID'].apply(
    lambda track_ids: track_ids.isna().mean() * 100  # share of NaN as a percentage
)
country_nan = country_nan.reset_index(name='Porcentaje_NaN')
country_nan = country_nan.sort_values(by='Porcentaje_NaN', ascending=False)
# Show only the countries that are above the overall missing percentage.
country_nan.loc[country_nan['Porcentaje_NaN'] > nan_percent]

In [None]:
# Single-track lookup by URI; prints the track name.
#https://open.spotify.com/intl-es/track/1TQ2UYCN7nhfj3cfcFC76V?si=27d97ce208e54951
track_uri = 'spotify:track:1TQ2UYCN7nhfj3cfcFC76V'
track = sp.track(track_uri)
print(track["name"])

In [134]:
# index=False (the boolean) leaves the row index out of the file. The previous
# index='False' was a non-empty string, which is truthy, so the index was still written.
df_country.to_csv('top120_spoti_basicinfo.csv', index=False)

In [4]:
from spotipy.oauth2 import SpotifyOAuth

# SpotifyOAuth setup: user-level authorization with the playlist-modify scopes.
oauth_manager = SpotifyOAuth(
    client_id="",
    client_secret="",
    redirect_uri="http://localhost:8080",
    scope="playlist-modify-public playlist-modify-private",
)
sp = spotipy.Spotify(auth_manager=oauth_manager)

# Every matched track id goes into one existing playlist, 100 ids per request.
# NOTE(review): the recorded run of this cell failed with "Playlist size limit reached".
ids_playlist = df_country['Track_ID'].dropna().tolist()
playlist_id = '762Ez0qwDvrluGxbRTts8o'
batch_size = 100
for start in range(0, len(ids_playlist), batch_size):
    sp.playlist_add_items(playlist_id, ids_playlist[start:start + batch_size])
print("Canciones añadidas a la playlist exitosamente")

HTTP Error for POST to https://api.spotify.com/v1/playlists/762Ez0qwDvrluGxbRTts8o/tracks with Params: {'position': None} returned 400 due to Playlist size limit reached. Consider removing items from the playlist before retrying the request


SpotifyException: http status: 400, code:-1 - https://api.spotify.com/v1/playlists/762Ez0qwDvrluGxbRTts8o/tracks:
 Playlist size limit reached. Consider removing items from the playlist before retrying the request, reason: None

In [5]:
# Group by 'id' and collect the non-null 'Track_ID' values of each group into a list.
# NOTE(review): df_update is not defined anywhere in the visible cells; confirm where
# it is loaded before running this on a fresh kernel.
grouped_tracks = {
    group_id: track_ids.dropna().tolist()
    for group_id, track_ids in df_update.groupby('id')['Track_ID']
}
grouped_tracks


{'AE': ['3pUtdI0esSIqvHNVzBpWih',
  '5GUUDaUy5eMG4zR9FdUOut',
  '3j8dK7vyTB0ARmp2iw43Sy',
  '1kJ5PN754l7A2R1It33txk',
  '15syIaOYh67J37lgZLlkDo',
  '5vNRhkKd0yEAg8suGBpjeY',
  '2X7jfQnEMQYYmvkhr6mzS9',
  '47VPDMI0LWEzreNh2tXYfS',
  '5MtQ3f8sfEgRffbnHOURxH',
  '5tCwupjT5bc13y5rsaVUlX',
  '2adk94I596gZPyC2eXKwmZ',
  '0c6ofcvyXFLNO0IToOJHzJ',
  '4DYxPJpFg8mFdF0GbEnsPV',
  '5OsqETtsbEFuvalW6QXQuM',
  '3p7CaIX8QOL0d77nuDSSru',
  '2EyBVhVXrvVmDL83lL7vOD',
  '50hHmIk4oYu9VVFwBZq6gh',
  '6FcqAyVAcYc5yZVtaWQfxW',
  '4NJtlrapihMPiOlZ396uus',
  '5Iegog3nLU1UH52WG59TkP',
  '18LrtjUyMolXdhGsKg2Ybu',
  '28ITZZMQ9DPgfhhhoT3ie5',
  '4BDSmwn5LtHTYSd7ZAH6oZ',
  '5OAGg2B2BjxzpiEcuzaCkZ',
  '7ES1kLL6A1JZ3VzbJ8gdcM',
  '4KiRDEG5A6FSdNfCq1WKE9',
  '1bYyZHW4IuGdWzzbm6ErMq',
  '3mMpc9fs05DzNvTmTkPnXS',
  '2Lumsra3kuU61wXkEKzKaK',
  '5G2f63n7IPVPPjfNIGih7Q',
  '3HH6ACm3OG9DtmfUGq3jOg',
  '5rwQGBi5mdmU7dqNM7x9Oo',
  '11dHZLY9Rfz8mPnydmSzwY',
  '2c7z5oRcPBbqRaBY2mLWcf',
  '55HpRb1dWGT3JeX9RpDpLa',
  '4LIM4qmpHAB

In [None]:
# Create one playlist per key of grouped_tracks so they can be iterated over in the
# Musicstax scraping.

sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
    client_id="",
    client_secret="",
    redirect_uri="http://localhost:8080",
    scope="playlist-modify-public playlist-modify-private"))
user_id = sp.current_user()['id']
batch_size = 100  # same per-request batch size as the single-playlist cell
# `country_id` rather than `id`: a top-level loop variable named `id` stays in the
# kernel namespace after the loop and shadows the builtin for every later cell.
for country_id, track_ids in grouped_tracks.items():
    playlist_name = f"Top 100 Last 120 days {country_id}"
    playlist = sp.user_playlist_create(user=user_id, name=playlist_name, public=True)
    playlist_id = playlist['id']

    # Add the tracks in batches so a group with more than `batch_size` ids does not go
    # out as one oversized request; an empty group sends no request at all.
    for start in range(0, len(track_ids), batch_size):
        sp.playlist_add_items(playlist_id, track_ids[start:start + batch_size])
    time.sleep(2)
print("Playlists creadas y canciones añadidas exitosamente")