# Collect data from Spotify

In [None]:
# Genre seeds used to build the Spotify "genre:<name>" search queries.
top_20_spotify_genres = [
    "pop",
    "hip-hop",
    "rock",
    "r-n-b",
    "edm",
    "indie",
    "latin",
    "k-pop",
    "jazz",
    "reggae",
    "metal",
    "alternative",
    "dance",
    "house",
    "classical",
    "singer-songwriter",
    "punk",
    "soul",
    "trance",
    "techno",
]

# Notebook sanity check: the cell displays how many genres are listed.
len(top_20_spotify_genres)

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import api_config as conf
import pandas as pd
import time

# Authenticate with Spotify
# Creates the module-level client `sp` using SpotifyClientCredentials; the
# client ID and secret are read from the local api_config module.
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
    client_id=conf.SPOTIFY_CLIENT_ID,
    client_secret=conf.SPOTIFY_CLIENT_SECRET
))

# Function to search for tracks by genre with pagination
def search_tracks_by_genre(genre, total_limit=1000, batch_size=50, client=None, pause=1.0):
    """Collect up to ``total_limit`` tracks matching a Spotify genre query.

    Pages through ``search(q="genre:<genre>", type="track")`` results in
    chunks of ``batch_size`` and stops early as soon as a page comes back
    empty.

    Args:
        genre: Genre name inserted into the ``genre:`` search filter.
        total_limit: Maximum number of tracks to request overall.
        batch_size: Number of tracks to request per page.
        client: Object exposing a spotipy-style ``search`` method. Defaults
            to the module-level ``sp`` client.
        pause: Seconds to sleep after each page; ``0`` disables the delay.

    Returns:
        A list of dicts with the keys ``id``, ``track_name``, ``artist``,
        ``album``, ``popularity``, ``release_date``, ``spotify_url`` and
        ``preview_url``.
    """
    if client is None:
        client = sp  # fall back to the notebook-wide authenticated client

    tracks = []
    for offset in range(0, total_limit, batch_size):
        # Shrink the final page so we never request more than total_limit
        # tracks when total_limit is not a multiple of batch_size.
        page_size = min(batch_size, total_limit - offset)
        print(f"Fetching tracks for genre: {genre}, Offset: {offset}")
        results = client.search(q=f"genre:{genre}", type="track", limit=page_size, offset=offset)
        items = results["tracks"]["items"]
        if not items:
            break
        for track in items:
            if not track:
                # Defensive: skip null placeholders rather than crash the page.
                continue
            tracks.append({
                "id": track["id"],
                "track_name": track["name"],
                "artist": ", ".join(artist["name"] for artist in track["artists"]),
                "album": track["album"]["name"],
                "popularity": track["popularity"],
                "release_date": track["album"]["release_date"],
                "spotify_url": track["external_urls"]["spotify"],
                # Tolerate a missing preview_url field; stored as None.
                "preview_url": track.get("preview_url"),
            })
        if pause > 0:
            time.sleep(pause)
    return tracks

# Define genres and retrieve data
genres = top_20_spotify_genres
# genres = ["Jazz", "Pop", "Rock", "Hip-Hop/Rap", "Electronic/Dance"]  # List of genres to search
all_tracks = []

for genre in genres:
    try:
        genre_tracks = search_tracks_by_genre(genre, total_limit=500)
    except Exception as exc:
        # Best effort: one failing genre must not abort the whole crawl, but
        # report which genre failed and why. Catching Exception (not a bare
        # except) lets Ctrl-C / kernel interrupts stop the run.
        print(f"query error! genre={genre!r}: {exc}")
        continue
    for track in genre_tracks:
        track["Genre"] = genre  # Add genre column
    all_tracks.extend(genre_tracks)

# Convert to DataFrame
df = pd.DataFrame(all_tracks)

# Display results
print(f"Total tracks fetched: {len(df)}")
df.head()

In [None]:
# Persist the collected Spotify tracks (without the DataFrame index column)
# so the later cells can reload them from raw_spotify.csv.
df.to_csv('raw_spotify.csv', index=False)

# Collect data from YouTube

In [None]:
import pandas as pd
import time
import json
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import api_config as conf

# YouTube Data API v3 client, built with the developer key from api_config.
youtube = build("youtube", "v3", developerKey=conf.YOUTUBE_API_KEY)

# Track list written by the Spotify step; its "track_name" column is what gets
# searched on YouTube below.
df = pd.read_csv("raw_spotify.csv")

# Accumulates one dict per looked-up song; save_progress() writes it to CSV.
results = []

def get_song_details(song_name):
    """Look up ``song_name`` on YouTube and return the top video's details.

    Takes the first video search hit, then fetches that video's statistics.

    Returns:
        A dict that always carries ``song_name`` and ``error``. On success it
        also holds ``video_id``, ``title``, ``artist`` (the channel title),
        ``release_date``, ``views``, ``likes`` and ``comments``, with
        ``error`` set to ``None``. On failure ``error`` describes the problem.
        ``None`` is returned when the API answers with HTTP 403, after the
        current progress has been saved.
    """
    try:
        search_request = youtube.search().list(
            q=song_name,
            part="snippet",
            maxResults=1,
            type="video",
        )
        search_response = search_request.execute()

        if not search_response.get("items"):
            return {"song_name": song_name, "error": "No results found"}

        top_hit = search_response["items"][0]
        video_id = top_hit["id"]["videoId"]
        snippet = top_hit["snippet"]
        video_title = snippet["title"]
        channel_title = snippet["channelTitle"]
        publish_date = snippet["publishedAt"]

        # Second call: the search endpoint does not return counters.
        stats_request = youtube.videos().list(part="statistics", id=video_id)
        video_response = stats_request.execute()

        if not video_response.get("items"):
            return {"song_name": song_name, "error": "No video details found"}

        stats = video_response["items"][0]["statistics"]

        details = {
            "song_name": song_name,
            "video_id": video_id,
            "title": video_title,
            "artist": channel_title,
            "release_date": publish_date,
        }
        # Counters missing from the statistics payload fall back to "N/A".
        details["views"] = stats.get("viewCount", "N/A")
        details["likes"] = stats.get("likeCount", "N/A")
        details["comments"] = stats.get("commentCount", "N/A")
        details["error"] = None
        return details

    except HttpError as err:
        if err.resp.status != 403:
            return {"song_name": song_name, "error": f"API error: {err}"}
        # HTTP 403 is treated as quota exhaustion: persist what we have and
        # signal the caller to stop by returning None.
        print("API quota exceeded. Saving progress...")
        save_progress()
        return None

    except Exception as err:
        return {"song_name": song_name, "error": f"Unexpected error: {err}"}

def save_progress():
    """Dump the module-level ``results`` list to the progress CSV file."""
    output_path = "youtube_base_on_spotify.csv"
    pd.DataFrame(results).to_csv(output_path, index=False)
    print("Progress saved to 'youtube_base_on_spotify.csv'")

# Load previous progress if available
try:
    previous_results = pd.read_csv("youtube_base_on_spotify.csv")
except (FileNotFoundError, pd.errors.EmptyDataError):
    # No progress file yet, or one with no columns to parse (e.g. saved
    # before any song was processed): start from scratch instead of crashing.
    completed_songs = set()
else:
    # Names already looked up are skipped by the query loop; the stored rows
    # are carried over so the next save keeps them.
    completed_songs = set(previous_results["song_name"])
    results = previous_results.to_dict("records")
    print(f"Loaded previous results: {len(completed_songs)} songs")

# Query all songs
try:
    for song_name in df["track_name"]:
        if song_name in completed_songs:
            print(f"Skipping '{song_name}' (already processed)")
            continue

        print(f"Searching for '{song_name}'...")
        song_data = get_song_details(song_name)

        if song_data is None:
            # get_song_details returns None only after an HTTP 403.
            print("API quota exceeded. Stopping execution.")
            break

        results.append(song_data)
        # Remember the name so repeated track names in df are not queried
        # (and billed against the API quota) again in this run.
        completed_songs.add(song_name)
        time.sleep(0.5)
finally:
    # Save progress at the end, including when the loop is interrupted or an
    # unexpected error escapes, so finished lookups are not lost.
    save_progress()

# Collect data from Last.fm

In [None]:
import requests
import pandas as pd
from tqdm import tqdm  # ใช้แสดง Progress Bar

# API key and endpoint
# NOTE(review): this key is committed in plain text; consider loading it from
# api_config like the Spotify/YouTube credentials, and rotating it.
API_KEY = "1db5247b8f20dc92f92121447757d907"
# Use HTTPS so the API key is not sent over the network in clear text.
BASE_URL = "https://ws.audioscrobbler.com/2.0/"

# NOTE(review): absolute, machine-specific path; confirm whether this is the
# same data as the raw_spotify.csv written by the Spotify step.
spotify_data = pd.read_csv(r'C:\Git-Repository\dataEn_final_project\collect_data\datasets\raw_data\spotify.csv')
# Only the track title and artist string are needed for the Last.fm lookup.
name_track = spotify_data[['track_name', 'artist']]

# Fetch track metadata from the Last.fm API
def get_track_info(track, artist, timeout=10):
    """Query Last.fm ``track.getInfo`` for one track/artist pair.

    Args:
        track: Track title sent as the ``track`` parameter.
        artist: Artist name sent as the ``artist`` parameter.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A dict with ``Track``, ``Artist``, ``Listeners``, ``Playcount``,
        ``Album``, ``Tags`` (list of tag names) and ``URL``; missing scalar
        fields are ``"N/A"``. ``None`` when the request fails, the status is
        not 200, the body is not valid JSON, or it carries no ``track`` entry.
    """
    params = {
        "method": "track.getInfo",
        "api_key": API_KEY,
        "artist": artist,
        "track": track,
        "format": "json"
    }
    try:
        # A timeout keeps one stalled connection from hanging the whole crawl.
        response = requests.get(BASE_URL, params=params, timeout=timeout)
    except requests.RequestException:
        # Best effort: a network failure is treated like "not found".
        return None

    if response.status_code != 200:
        return None

    try:
        data = response.json()
    except ValueError:
        return None

    info = data.get("track") if isinstance(data, dict) else None
    if not isinstance(info, dict):
        return None  # not found

    # Defensive: tolerate album/toptags not being dicts and a lone tag object
    # instead of a list -- TODO confirm against real responses.
    album = info.get("album")
    if not isinstance(album, dict):
        album = {}
    toptags = info.get("toptags")
    tags = toptags.get("tag", []) if isinstance(toptags, dict) else []
    if isinstance(tags, dict):
        tags = [tags]
    elif not isinstance(tags, list):
        tags = []

    return {
        "Track": track,
        "Artist": artist,
        "Listeners": info.get("listeners", "N/A"),
        "Playcount": info.get("playcount", "N/A"),
        "Album": album.get("title", "N/A"),
        "Tags": [tag["name"] for tag in tags if isinstance(tag, dict) and "name" in tag],
        "URL": info.get("url", "N/A")
    }

# Query Last.fm for every track
results = []
total_tracks = len(name_track)
next_report = 10  # next completion percentage to announce

track_artist_pairs = zip(name_track["track_name"], name_track["artist"])
# The tqdm bar stays disabled; progress is reported with the prints below.
for done, (track, artist) in enumerate(
    tqdm(track_artist_pairs, total=total_tracks, disable=True), start=1
):
    track_data = get_track_info(track, artist)
    if track_data:
        results.append(track_data)

    # Report each time another 10% of the rows has actually been processed.
    # Counting completed rows (not the index label) ends the run at 100% and
    # gives at most ten messages, whatever the row count.
    percent = done * 100 // total_tracks
    if percent >= next_report:
        print(f"✅ Progress: {percent}% Completed...")
        next_report = (percent // 10 + 1) * 10

# Build a new DataFrame from the collected results
track_info_df = pd.DataFrame(results)

In [None]:
# Write the Last.fm results to CSV, omitting the DataFrame index column.
track_info_df.to_csv('last_fm_base_on_spotify.csv', index=False)