## Сбор данных


    Настройка логирования

In [None]:
import logging

# Send INFO-and-above records to py_log.log; filemode "w" truncates the file
# each time this cell runs.
logging.basicConfig(
    filename="py_log.log",
    filemode="w",
    encoding='UTF-8',
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)


    Импорт базовых библиотек и функция для обновления токена доступа

In [2]:
import requests

import pandas as pd
import numpy as np

from tqdm import tqdm
from time import sleep

import re

import matplotlib.pyplot as plt
import seaborn as sns

# Spotify OAuth credentials sent to the token endpoint by refresh_access_token().
# NOTE(review): these values look like placeholders — presumably they are
# replaced with real credentials before the notebook is run; confirm.
CLIENT_ID = "CLIENT_ID"
CLIENT_SECRET = "CLIENT_SECRET"
REFRESH_TOKEN = "REFRESH_TOKEN"

def refresh_access_token():
    """Exchange the stored refresh token for a fresh Spotify access token.

    Posts a ``refresh_token`` grant built from ``REFRESH_TOKEN``,
    ``CLIENT_ID`` and ``CLIENT_SECRET`` to the Spotify accounts service.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.HTTPError: if the token endpoint answers with a 4xx/5xx status.
        requests.RequestException: on connection problems or a timeout.
        KeyError: if the JSON response carries no ``access_token`` field.
    """
    url = "https://accounts.spotify.com/api/token"
    data = {
        "grant_type": "refresh_token",
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
    }
    # A timeout keeps a stalled connection from blocking the notebook forever.
    response = requests.post(url, data=data, timeout=10)
    # Report a rejected refresh as an HTTP error rather than as an opaque
    # KeyError on the missing "access_token" field.
    response.raise_for_status()
    return response.json()["access_token"]

In [None]:
# Obtain a token up front and derive the search endpoint from the API root.
access_token = refresh_access_token()
base_api_url = 'https://api.spotify.com/v1/'
search_api_url = f'{base_api_url}search'

    Парсинг артистов spotify api
    

In [13]:
def pars_artists():
    """Collect hip-hop artists from the Spotify search endpoint.

    Requests four pages of 50 results (offsets 0, 50, 100, 150) for the query
    ``genre:hip hop``. Each page is tried up to three times; a page that still
    fails after the third attempt is logged and skipped.

    Returns:
        A list with the artist items of every page that was fetched.
    """
    artists = []
    for offset in range(0, 200, 50):
        params = {
            'q': 'genre:hip hop',
            'limit': 50,
            'offset': offset,
            'type': 'artist',
        }
        for attempt in range(3):
            try:
                page = requests.get(
                    search_api_url,
                    headers={"Authorization": f"Bearer {access_token}"},
                    params=params,
                    timeout=10,
                )
                # Turn 4xx/5xx answers into an exception so they are retried
                # and logged with a meaningful message.
                page.raise_for_status()
                items = page.json()['artists']['items']
                artists.extend(items)
                logging.info(f'артисты с {offset} до {offset+50} добавлены')
                # Stop retrying once the page is stored. Without this break
                # every page was requested three times and its artists were
                # added three times.
                break
            except Exception as error:
                logging.error(f"ошибка {error} на попытке {attempt+1}/3")
    logging.info("Все артисты успешно добавлены")
    return artists

In [14]:
# Run the artist search; this issues live requests to the Spotify API.
artist = pars_artists()

    Парсинг треков по артистам spotify api
    

In [15]:
import requests
from time import sleep

def get_tracks_by_artist(artist):
    """Collect tracks credited to *artist* from the Spotify search endpoint.

    Pages through the ``artist:<name>`` track search 50 results at a time
    (offsets 0 to 950) and keeps only tracks whose artist list contains an
    exact match of *artist*. Each page is tried up to three times with a
    10-second pause after a failure; a page that still fails is skipped.
    Paging stops early as soon as a page comes back empty.

    Args:
        artist: Artist name used both in the search query and for the exact
            name comparison.

    Returns:
        A list of the matching track objects, de-duplicated by track id.
    """
    tracks = {}
    for offset in range(0, 1000, 50):
        params = {
            'q': f'artist:{artist}',
            'limit': 50,
            'type': 'track',
            'offset': offset,
        }
        for attempt in range(3):
            try:
                page = requests.get(
                    search_api_url,
                    headers={"Authorization": f"Bearer {access_token}"},
                    params=params,
                    timeout=10,
                )
                # Fail loudly on 4xx/5xx so the retry log shows the real cause
                # instead of a KeyError on the missing 'tracks' field.
                page.raise_for_status()

                items = page.json()['tracks']['items']
                if not items:
                    logging.warning(f'досрочное завершение парсинга по артисту {artist}')
                    return list(tracks.values())

                for track in items:
                    # Search results include loose matches; keep only tracks
                    # where the artist name matches exactly.
                    if any(credited['name'] == artist for credited in track['artists']):
                        tracks[track['id']] = track
                        # Double quotes outside: reusing the same quote type
                        # inside an f-string is a SyntaxError before Python 3.12.
                        logging.info(f"трек {track['name']} успешно добавлен")
                break

            except Exception as error:
                logging.error(f"Ошибка  {error} на аристе {artist} на попытке {attempt+1}/3")
                sleep(10)
    logging.info('функция успешно завершилась')
    return list(tracks.values())

In [17]:
# Load the saved artists table; the first CSV column is used as the index.
artists_data = pd.read_csv('datasets/artists_dataset.csv', index_col=0)

In [None]:
# Gather tracks for every artist name. The access token is renewed after each
# 50th artist, and the loop pauses between artists.
all_tracks = []
c = 0
for artist_id in tqdm(artists_data['name']):
    tracks = get_tracks_by_artist(artist_id)
    all_tracks += tracks
    c += 1
    if not c % 50:
        access_token = refresh_access_token()
        sleep(2)
    sleep(3)


    Парсинг мелодий с rapid_api по track_id

In [23]:
def pars_rapid_by_track(track_id):
    """Fetch the RapidAPI track-analysis record for one Spotify track id.

    Makes up to three attempts. Every attempt waits 3 seconds before the
    request; a non-200 answer is followed by a 5-second pause and an exception
    by a 2-second pause before the next attempt.

    Args:
        track_id: Spotify track id appended to the endpoint URL.

    Returns:
        The parsed JSON body of the first 200 response, or — when all three
        attempts fail — a dict with the fifteen expected field names mapped
        to ``None``.
    """
    url = f"https://track-analysis.p.rapidapi.com/pktx/spotify/{track_id}"
    headers = {
        "x-rapidapi-key": "x-rapidapi-key",
        "x-rapidapi-host": "x-rapidapi-host",
    }

    for attempt in range(3):
        try:
            sleep(3)
            page = requests.get(url, headers=headers, timeout=10)
            if page.status_code == 200:
                data = page.json()
                logging.info(f"информация по песне {track_id} добавлена")
                return data

            logging.warning(f"Ответ {page.status_code},  для трека {track_id} на попытке {attempt+1}/3")
            sleep(5)
        except Exception as error:
            logging.error(f'Ошибка {error} на песне {track_id} на попытке {attempt+1}/3')
            sleep(2)

    logging.critical(f'Информация по песне {track_id} не найдена, она будет пропущена')
    # Placeholder row with every field set to None.
    empty_fields = (
        'id', 'key', 'mode', 'camelot', 'tempo', 'duration', 'popularity',
        'energy', 'danceability', 'happiness', 'acousticness',
        'instrumentalness', 'liveness', 'speechiness', 'loudness',
    )
    return dict.fromkeys(empty_fields)
        

    Подготовка для скраппинга

In [20]:
# Load the saved tracks table; the first CSV column is used as the index.
tracks_data = pd.read_csv('datasets/tracks_dataset.csv', index_col=0)

In [None]:
# Query the RapidAPI track-analysis service for every track id, in table order.
tracks_info = []

# Iterate over the id values themselves. A lookup such as tracks_data['id'][i]
# is label-based, so it raises KeyError (or picks the wrong row) whenever the
# index loaded from the CSV is not exactly 0..n-1.
for track_id in tqdm(tracks_data['id']):
    info = pars_rapid_by_track(track_id)
    tracks_info.append(info)
    sleep(3)

In [21]:
# Load the saved melodies table; the first CSV column is used as the index.
melodies = pd.read_csv('datasets/melodies.csv', index_col=0)

In [26]:
# Reload the tracks table with clearer column names and without the unnamed
# column that holds the index saved in the CSV.
tracks = (
    pd.read_csv('datasets/tracks_dataset.csv')
    .rename(columns={'artists': "artists_ids", 'id': 'track_id'})
    .drop(columns=['Unnamed: 0'])
)

In [27]:
def remove_feats(song_name):
    """Drop the first parenthesised fragment (e.g. ``(feat. X)``) from a title.

    Only the first ``(...)`` group is removed, and the whitespace around it is
    left as is. A title with no ``(``, or with no ``)`` after that ``(``, is
    returned unchanged.

    Args:
        song_name: Track title.

    Returns:
        The title without its first parenthesised group.
    """
    start = song_name.find('(')
    if start == -1:
        return song_name
    # Search for the closing bracket only after the opening one. A missing
    # ")" (find() == -1) or a ")" located before "(" previously produced a
    # duplicated or garbled title.
    finish = song_name.find(')', start)
    if finish == -1:
        return song_name
    return song_name[:start] + song_name[finish + 1:]

def prepare_song_name(row):
    """Replace ``row['track_name']`` with its ``remove_feats`` result.

    The row is modified in place and also returned, which makes the function
    usable with ``DataFrame.apply(..., axis=1)``.
    """
    row['track_name'] = remove_feats(row['track_name'])
    return row


def set_only_first_artist(artists):
    """Return the part of *artists* before its first comma.

    A string without a comma is returned unchanged.
    """
    first_artist, _, _ = artists.partition(',')
    return first_artist

def prepare_artists_names(row):
    """Replace ``row['artists_names']`` with its ``set_only_first_artist`` result.

    The row is modified in place and also returned, which makes the function
    usable with ``DataFrame.apply(..., axis=1)``.
    """
    row['artists_names'] = set_only_first_artist(row['artists_names'])
    return row

In [None]:
# Build the Genius lookup table: attach artists_tracks to tracks on track_id,
# keep the right-hand name as track_name, order by popularity (highest first),
# drop rows without artist names and clean every title.
tracks_for_genius = (
    pd.merge(tracks, artists_tracks, left_on='track_id', right_on='track_id', how='left')
    .drop(columns=['name_x'])
    .rename(columns={'name_y': 'track_name'})
    .sort_values(by='popularity', ascending=False)
    .loc[lambda frame: frame['artists_names'].notna()]
    .apply(prepare_song_name, axis=1)
    .reset_index(drop=True)
)
tracks_for_genius

In [33]:
from selenium import webdriver as wd
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import Keys
from bs4 import BeautifulSoup
from tqdm import tqdm


# Chrome configuration for the Genius scraper.
options = wd.ChromeOptions()
options.page_load_strategy = 'eager'
# The Blink feature is spelled "AutomationControlled"; with the previous
# misspelling ("AutomatisationControlled") the switch matched no feature and
# had no effect.
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
options.add_argument("--disable-features=VizDisplayCompositor")
# NOTE(review): "--disable-images" may not be a switch Chrome recognises —
# confirm it really stops image loading.
options.add_argument("--disable-images")

    Скраппинг genius по названиям треков и артистам


In [None]:
# Scrape lyrics from genius.com: for each entry of tracks_for_genius, run a
# site search, open a song card from the "Songs" block and collect the text of
# the lyrics containers. Exactly one item (lyrics text or None) is appended to
# `texts` per entry, so `texts` stays aligned with tracks_for_genius.
driver = wd.Chrome(options=options)

texts = []

# Locators do not depend on the current track, so they are built once.
SONG_NAME_INPUT = ('xpath', '//input[@name="q"]')
SEARCH_BUTTON = ('xpath', '//div[@class="global_search-submit_button u-clickable"]')
SONGS_BLOCK = ('xpath', "//div[contains(@class, 'search_results_label') and text()='Songs']")
SONG_CARDS = ('xpath', '//div[text()="Songs"]/following::div[1]')
# NOTE(review): this class name looks auto-generated and may change between
# site builds — confirm it still matches the lyrics containers.
CONTAINER = ('xpath', '(//div[@class="Lyrics__Container-sc-cbcfa1dc-1 dfzvqs"])')

try:
    driver.get('https://genius.com/')
    driver.implicitly_wait(4)
    driver.set_script_timeout(5)
    wait = WebDriverWait(driver=driver, timeout=3, poll_frequency=1)

    for i in tqdm(range(len(tracks_for_genius))):

        # Step 1: submit the search query, up to five attempts.
        try:
            for att in range(5):
                try:
                    driver.get('https://genius.com/')
                    # `sleep` is the name this file imports from `time`; the
                    # module itself is never imported, so `time.sleep` raised
                    # NameError on every attempt.
                    sleep(1.5)
                    driver.find_element(*SONG_NAME_INPUT).send_keys(tracks_for_genius[i])
                    sleep(1.5)
                    driver.find_element(*SONG_NAME_INPUT).send_keys(Keys.ENTER)
                    WebDriverWait(driver, 5).until(EC.url_changes('https://genius.com/'))
                    sleep(1.6)
                    break
                except Exception as ex:
                    logging.error(f'Error: {ex} for input name -  {tracks_for_genius[i].upper()}')
            else:
                # All attempts failed: hand over to the handler below instead
                # of parsing whatever page happens to be open.
                raise RuntimeError('search failed after 5 attempts')
        except Exception as ex:
            logging.error(f'Error: {ex} - text not added for - {tracks_for_genius[i].upper()}')
            texts.append(None)
            continue

        # Step 2: make sure the results page has a "Songs" block.
        is_song_block = driver.find_elements(*SONGS_BLOCK)
        if not is_song_block:
            texts.append(None)
            logging.info(f'No text {i} - for {tracks_for_genius[i]}')
            continue

        try:
            song_cards = driver.find_element(*SONG_CARDS)
            html_song_cards = driver.execute_script('return arguments[0].outerHTML', song_cards)
        except Exception as ex:
            logging.error(f'Error: {ex} - text not added for - {tracks_for_genius[i]}')
            texts.append(None)
            continue

        # Step 3: open the chosen song card and read the lyrics, up to three attempts.
        for atts in range(3):
            try:
                current_url = driver.current_url
                # check_lang is defined outside this cell; presumably it returns
                # the 0-based position of the card to open — confirm.
                TOP_SONG_CARD = ('xpath', f'//div[text()="Songs"]/following::mini-song-card[{check_lang(html_song_cards)+1}]')
                wait.until(EC.visibility_of_element_located(TOP_SONG_CARD)).click()
                WebDriverWait(driver, 5).until(EC.url_changes(current_url))

                elements = driver.find_elements(*CONTAINER)
                text = ''
                for el in elements:
                    html_el = driver.execute_script('return arguments[0].outerHTML', el)
                    soup = BeautifulSoup(html_el, 'html.parser')
                    lyrics_container = soup.find("div", {"data-lyrics-container": "true"})
                    text_piece = lyrics_container.get_text(separator="\n", strip=True)
                    text += (text_piece + '\n')

                texts.append(text)
                logging.info(f'Text added {i} - for {tracks_for_genius[i]}')
                break
            except Exception as ex:
                # Previously swallowed silently; log it so failures are traceable.
                logging.error(f'Error: {ex} on lyrics attempt {atts+1}/3 for {tracks_for_genius[i]}')
        else:
            # Keep `texts` aligned with the input when every attempt failed.
            texts.append(None)
            logging.error(f'No text {i} after 3 attempts - for {tracks_for_genius[i]}')
finally:
    # Always close the browser, even if the loop stops with an error.
    driver.quit()

len(texts)


## EDA