Extract Preview URL

In [11]:
import subprocess
import json
import time
import pandas as pd
import math
import os
import re
import glob

def get_spotify_previews_batch(df_batch):
    """
    Run the Node.js preview script on one batch of tracks and return its parsed output.

    The batch is serialized as a JSON array of records and piped to
    ``node spotify_preview.js`` on stdin (the script path is relative, so it is
    looked up from the current working directory). Whatever the script prints
    on stdout is decoded as JSON.

    Args:
        df_batch: pandas DataFrame with the tracks of this batch.

    Returns:
        The decoded JSON value printed by the script on stdout.

    Raises:
        ValueError: If the script's stdout is not valid JSON.
    """
    data_json = df_batch.to_json(orient="records")
    result = subprocess.run(
        ["node", "spotify_preview.js"],
        input=data_json,
        stdout=subprocess.PIPE,   # Capture stdout explicitly
        # Keep stderr on its own pipe: merging it into stdout would let any
        # warning from Node corrupt the JSON payload parsed below.
        stderr=subprocess.PIPE,
        text=True,
        encoding='utf-8'
    )
    # Print all output so any warnings or messages are visible
    print(result.stdout)
    if result.stderr:
        print(result.stderr)
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON output: {result.stdout}") from exc

def get_last_processed_batch(directory):
    """
    Return the highest batch number among ``batch_<N>.csv`` files in ``directory``.

    If the directory does not exist it is created and 0 is returned. 0 is also
    returned when no file name carries a batch number.
    """
    if not os.path.exists(directory):
        os.makedirs(directory)
        return 0

    number_pattern = re.compile(r"batch_(\d+)\.csv")
    highest = 0
    for name in os.listdir(directory):
        # Only files named like batch_*.csv are candidates.
        if not (name.startswith("batch_") and name.endswith(".csv")):
            continue
        found = number_pattern.search(name)
        if found is None:
            continue
        highest = max(highest, int(found.group(1)))
    return highest

def get_new_merged_filepath(directory, base_name='merged_file.csv'):
    """
    Return a path inside ``directory`` that does not exist yet.

    ``base_name`` itself is used when it is free. Otherwise a counter is
    inserted before the extension -- ``merged_file(1).csv``,
    ``merged_file(2).csv``, ... -- and the first unused name is returned.
    The numbered names are derived from ``base_name``, so a custom base name
    is honoured for the fallback names as well.

    Args:
        directory: Directory in which the file will be placed.
        base_name: Preferred file name.

    Returns:
        The first non-existing candidate path (the file is not created).
    """
    candidate = os.path.join(directory, base_name)
    if not os.path.exists(candidate):
        return candidate

    stem, ext = os.path.splitext(base_name)
    i = 1
    while True:
        candidate = os.path.join(directory, f"{stem}({i}){ext}")
        if not os.path.exists(candidate):
            return candidate
        i += 1

def merge_initial_batches(directory):
    """
    Consolidate leftover ``batch_*.csv`` files in ``directory`` into one merged CSV.

    The batch files are concatenated and de-duplicated on ``track_uri``, the
    result is written to a fresh merged file path, and the batch files are then
    removed. Returns the merged DataFrame, or an empty DataFrame when the
    directory holds no batch files.
    """
    batch_paths = glob.glob(os.path.join(directory, 'batch_*.csv'))
    if len(batch_paths) == 0:
        print(f"No batch CSV files found in {directory} during initial merge.")
        return pd.DataFrame()

    frames = []
    for path in batch_paths:
        frames.append(pd.read_csv(path))
    stacked = pd.concat(frames, ignore_index=True)

    # Sort before de-duplicating so that, for each track_uri, the row that
    # sorts first on previewUrl is the one kept.
    ordered = stacked.sort_values(by=['previewUrl'], ascending=False)
    merged = ordered.drop_duplicates(subset=['track_uri'], keep='first')

    merged_filepath = get_new_merged_filepath(directory)
    merged.to_csv(merged_filepath, index=False)
    print(f"Initial merged file saved to {merged_filepath}")

    # The merged file is on disk now; the individual batches can go.
    for path in batch_paths:
        try:
            os.remove(path)
            print(f"Deleted batch file: {path}")
        except Exception as e:
            print(f"Could not delete {path}: {e}")
    return merged

def get_spotify_previews_in_batches(dataframe, batch_size=50, delay=2, output_dir=""):
    """
    Fetch preview URLs for ``dataframe`` in batches, writing one CSV per batch.

    Processing resumes after the highest ``batch_<N>.csv`` already present in
    ``output_dir``: the first ``N * batch_size`` rows are skipped and numbering
    continues at ``N + 1``. Each batch is handed to
    ``get_spotify_previews_batch`` and the results are saved as
    ``batch_<index>.csv`` with the columns ``track_name``, ``track_uri`` and
    ``previewUrl``.

    Args:
        dataframe: Tracks to process.
        batch_size: Number of rows sent per batch.
        delay: Seconds to sleep between batches (not after the last one).
        output_dir: Directory for the batch CSVs; the current working
            directory is used when empty.

    Returns:
        None. Results are written to disk only.
    """
    if not output_dir:
        output_dir = os.getcwd()
    last_batch = get_last_processed_batch(output_dir)
    print(f"Resuming from batch {last_batch + 1} in directory: {output_dir}...")
    dataframe = dataframe.iloc[last_batch * batch_size:]
    num_songs = len(dataframe)
    total_batches = math.ceil(num_songs / batch_size)
    # Fixed schema so that a batch with no usable entries still produces a CSV
    # with a header row instead of a column-less file that later reads choke on.
    result_columns = ["track_name", "track_uri", "previewUrl"]
    for i in range(0, num_songs, batch_size):
        batch_df = dataframe.iloc[i:i+batch_size]
        batch_index = last_batch + (i // batch_size) + 1
        print(f"Processing batch {batch_index} of {total_batches + last_batch}...")
        batch_results = get_spotify_previews_batch(batch_df)
        batch_data = []
        for entry in batch_results:
            if entry is None:
                continue
            preview_url = None
            # A missing or empty "error" means the lookup succeeded; take the
            # first preview URL if any were returned.
            if not entry.get("error"):
                preview_urls = entry.get("previewUrls", [])
                preview_url = preview_urls[0] if preview_urls else None
            batch_data.append({
                "track_name": entry.get("track_name"),
                "track_uri": entry.get("track_uri"),
                "previewUrl": preview_url
            })
        batch_df_results = pd.DataFrame(batch_data, columns=result_columns)
        csv_filename = f"batch_{batch_index}.csv"
        batch_filepath = os.path.join(output_dir, csv_filename)
        batch_df_results.to_csv(batch_filepath, index=False)
        print(f"Batch saved to {batch_filepath}")
        # Pause between batches, but not after the final one.
        if i + batch_size < num_songs:
            time.sleep(delay)

def final_merge(directory, combined_df):
    """
    Merge any new batch CSV files in the directory with the combined_df (which includes
    original Parquet data and any initial merged batch data), save the final merged
    DataFrame to a new CSV file, and then delete the batch files.

    The batch files are removed only after the merged file has been written, so a
    failure while merging or saving leaves the batch data on disk.

    Args:
        directory: Directory holding the ``batch_*.csv`` files; the merged file is
            written here as well.
        combined_df: DataFrame to merge the batch data into. It is sorted on
            ``previewUrl`` and de-duplicated on ``track_uri``.

    Returns:
        The merged, de-duplicated DataFrame that was written to disk.
    """
    csv_files = glob.glob(os.path.join(directory, 'batch_*.csv'))
    if csv_files:
        new_batches = pd.concat([pd.read_csv(file) for file in csv_files], ignore_index=True)
        combined_df = pd.concat([combined_df, new_batches], ignore_index=True)
    # Sort before de-duplicating so that, for each track_uri, the row that sorts
    # first on previewUrl is the one kept.
    combined_df = combined_df.sort_values(by=['previewUrl'], ascending=False)\
                             .drop_duplicates(subset=['track_uri'], keep='first')
    merged_filepath = get_new_merged_filepath(directory)
    combined_df.to_csv(merged_filepath, index=False)
    print(f"Final merged file saved to {merged_filepath}")
    # Delete the batch files only now that their contents are safely saved.
    for file in csv_files:
        try:
            os.remove(file)
            print(f"Deleted batch file: {file}")
        except OSError as e:
            # Best effort: a leftover batch file is reported, not fatal.
            print(f"Could not delete {file}: {e}")
    return combined_df

# Script entry point: merge leftover batch files, work out which tracks still
# lack a preview URL, fetch those in batches, then write a final merged CSV.
if __name__ == "__main__":
    # Ask user for the directory where batch CSVs and merged files are saved.
    output_dir = input("Enter the directory where batch CSVs are saved (default: current working directory): ").strip()
    if not output_dir:
        output_dir = os.getcwd()

    # === INITIAL MERGE OF EXISTING BATCH FILES ===
    # Folds any batch_*.csv left over from an earlier run into a merged file
    # and removes them, so batch numbering below starts from a clean directory.
    print("Performing initial merge and cleanup (if any batch files exist)...")
    initial_merged = merge_initial_batches(output_dir)

    # === LOAD & COMBINE ORIGINAL DATA FROM PARQUET ===
    # Prompt user for the exact Parquet file path
    parquet_path = input("Enter the full path to the Parquet file: ").strip()
    if not os.path.isfile(parquet_path):
        raise ValueError(f"Error: The file does not exist at {parquet_path}")
    try:
        df_parquet = pd.read_parquet(parquet_path)
        print("Parquet file loaded successfully!")
    except Exception as e:
        # Any read failure is reported as a ValueError carrying the original message.
        raise ValueError(f"Error reading Parquet file: {e}")
    
    # Ensure the 'previewUrl' column exists
    if 'previewUrl' not in df_parquet.columns:
        df_parquet['previewUrl'] = None
    # Keep only the three columns used by the rest of the pipeline.
    df_parquet = df_parquet[['track_name', 'track_uri', 'previewUrl']]
    
    # Load any previously merged files (merged_file*.csv)
    # NOTE: this pattern also matches the file just written by
    # merge_initial_batches, so those rows may appear twice below; the
    # de-duplication on track_uri removes the repeats.
    merged_files = glob.glob(os.path.join(output_dir, "merged_file*.csv"))
    if merged_files:
        merged_dfs = [pd.read_csv(f) for f in merged_files]
        merged_existing = pd.concat(merged_dfs, ignore_index=True)
        print(f"Loaded {len(merged_existing)} tracks from existing merged files.")
    else:
        merged_existing = pd.DataFrame()

    # Combine the Parquet data with any merged data and initial merged batch data
    combined_df = df_parquet.copy()
    if not merged_existing.empty:
        combined_df = pd.concat([combined_df, merged_existing], ignore_index=True)
    if not initial_merged.empty:
        combined_df = pd.concat([combined_df, initial_merged], ignore_index=True)
    
    # Deduplicate so that records with a valid previewUrl are prioritized
    combined_df = combined_df.sort_values(by=['previewUrl'], ascending=False)\
                             .drop_duplicates(subset=['track_uri'], keep='first')
    
    # Identify processed tracks (those with a valid previewUrl)
    processed_track_uris = combined_df[~combined_df['previewUrl'].isna()]['track_uri'].unique()
    # Filter the Parquet data for tracks that still need preview extraction
    df_to_process = df_parquet[~df_parquet['track_uri'].isin(processed_track_uris)][['track_name', 'track_uri']]
    print(f"Found {len(processed_track_uris)} processed tracks. {len(df_to_process)} tracks remain to be processed.")
    
    # === BATCH PROCESS THE REMAINING TRACKS ===
    # Writes batch_<N>.csv files into output_dir; nothing is returned.
    get_spotify_previews_in_batches(df_to_process, output_dir=output_dir)
    
    # === FINAL MERGE: COMBINE NEW BATCH FILES WITH THE COMBINED DATA ===
    print("Final merging and cleanup of batch CSV files...")
    final_df = final_merge(output_dir, combined_df)


Performing initial merge and cleanup (if any batch files exist)...
No batch CSV files found in /Users/xavierhua/Documents/GitHub/bt4222grp9/phase2_feature_engineering/spotify_previewurl_extraction/Ungchan_previewurl during initial merge.
Parquet file loaded successfully!
Loaded 50447 tracks from existing merged files.
Found 47605 processed tracks. 2842 tracks remain to be processed.
Resuming from batch 1 in directory: /Users/xavierhua/Documents/GitHub/bt4222grp9/phase2_feature_engineering/spotify_previewurl_extraction/Ungchan_previewurl...
Processing batch 1 of 57...
[{"track_name":"Back to Me","track_uri":"spotify:track:7MIesTteOBlxjwyE2jMHzw","previewUrls":[],"error":"No preview URLs found"},{"track_name":"I Am Done","track_uri":"spotify:track:5lkUNK4YsRB0MYSZ3uzf4W","previewUrls":[],"error":"No preview URLs found"},{"track_name":"Freight Train","track_uri":"spotify:track:3u4sWzzdgKjwdbI0AqTo5m","previewUrls":[],"error":"No preview URLs found"},{"track_name":"Levitate","track_uri":"s