# Web Scraping

Install the additional packages (if necessary)

In [3]:
# pip install psycopg2-binary requests spotify  # PostgreSQL driver, plus HTTP requests to Spotify's Web API
# pip install sqlalchemy      # packages for SQL toolkit (database interactions)
# pip install selenium        # packages used to automate web browser interaction on python

# Setup PostgreSQL Server in pgAdmin

1. Launch pgAdmin:
- Open pgAdmin and log in using the credentials you set up during the PostgreSQL installation (username: postgres, password: your password).

2. Create a New Server Connection:
- In the pgAdmin dashboard, right-click on Servers and select Create > Server.
- In the General tab, enter a name for your server connection (e.g., "MyServer").

- Under Connection:

`Host name/address: localhost (if running on your machine).`

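`Port: 5432 (default PostgreSQL port).` Note: the connection strings in the code cells of this notebook use port `5434`; change them (or this setting) so both refer to the port your server actually listens on.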

`Maintenance database: postgres (default database).`

`Username: postgres (or the username you set during installation).`

`Password: The password you set for the postgres user.`

- Click Save.

3. Verify Connection:

Once connected, you should see the server listed under Servers in the left pane of pgAdmin.

4. Create a Database
- Create a New Database:

`Right-click on Databases and select Create > Database.`

`Give your database a name (e.g., spotify_data).`

`Click Save.`

5. Set Up Tables to Store Data
To create a table where you will store your data, you can either run SQL queries in pgAdmin or let the script create the table automatically (pandas' `DataFrame.to_sql()` creates the table if it does not exist).

- Create Table Manually in pgAdmin
Open Query Tool:

Right-click on your newly created database (e.g., spotify_data) and select Query Tool.

Create a Table:

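Run an SQL query to create a table. Here's an example query based on the data you're scraping. Note that the `store_data_in_db` function in this notebook calls `to_sql(..., if_exists='replace')`, so it drops and recreates `playlist_data` using the DataFrame's own column names (`Index`, `Song_Title`, `Artist`, `Album`):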

```
CREATE TABLE playlist_data (
    index SERIAL PRIMARY KEY,
    song_title VARCHAR(255),
    artist VARCHAR(255),
    album VARCHAR(255)
);
```
Execute the Query:

Click the Execute button (the "lightning bolt" icon) to run the SQL query and create the table.

In [6]:
from sqlalchemy import create_engine

# Function to store data in PostgreSQL DB
def store_data_in_db(df, db_url=None, table_name='playlist_data'):
    """Write a DataFrame to a PostgreSQL table, replacing any existing table.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to store. ``None`` or an empty frame is skipped so that an
        existing table is not overwritten with nothing.
    db_url : str, optional
        SQLAlchemy connection URL. When omitted, the ``SPOTIFY_DB_URL``
        environment variable is used, falling back to the notebook's
        original local connection string.
    table_name : str, optional
        Destination table (default ``'playlist_data'``).

    Returns
    -------
    None
        Success and failure are both reported with ``print``; database errors
        are caught and not re-raised.
    """
    import os  # local import so this cell does not depend on another cell's imports

    # Guard: to_sql(if_exists='replace') would drop the current table even
    # when there is nothing to put in its place.
    if df is None or getattr(df, "empty", False):
        print("No data to store in DB.")
        return

    if db_url is None:
        # NOTE(review): the fallback below embeds a password in source; prefer
        # setting SPOTIFY_DB_URL and removing the literal once callers are updated.
        db_url = os.environ.get(
            "SPOTIFY_DB_URL",
            "postgresql://postgres:2128@localhost:5434/spotify_data",
        )

    engine = None
    try:
        # Create SQLAlchemy engine
        engine = create_engine(db_url)

        # Store DataFrame into the PostgreSQL database
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        print("Data successfully stored in PostgreSQL DB.")

    except Exception as e:
        print(f"Error while storing data in DB: {e}")

    finally:
        # Release pooled connections; the engine is not reused after this call.
        if engine is not None:
            engine.dispose()

# Selenium Automation

In [8]:
import time
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Setup Chrome options
# NOTE(review): this builds ChromeOptions, but scrape_playlist() hands the object to
# webdriver.Edge — confirm which browser is intended and use the matching options class.
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument("incognito")  # presumably requests a private-browsing window — verify for the target browser


def scrape_playlist(spotify_playlist_url, output_csv):
    """Scrape the track rows of a Spotify page and save them to a CSV file.

    The page is opened in a Selenium-driven browser, an optional popup is
    dismissed, and the track list is scrolled until the number of rendered
    rows stops changing. Rows are de-duplicated on (title, artist, album).

    Parameters
    ----------
    spotify_playlist_url : str
        Page URL; surrounding whitespace and any ``?query`` suffix are removed.
    output_csv : str
        Path of the CSV file written by ``save_to_csv``.

    Returns
    -------
    pandas.DataFrame
        The cleaned frame returned by ``save_to_csv``.
    """
    # Pasted links often carry stray whitespace and a "?si=..." tracking suffix.
    spotify_playlist_url = spotify_playlist_url.strip().split('?')[0]
    print(f"Stripped URL: {spotify_playlist_url}")

    row_xpath = "//div[@role='row']"

    # Initialize variables
    all_songs = []
    seen_songs = set()  # Set to track unique songs
    scroll_pause_time = 3  # Delay after each scroll
    previous_row_count = 0
    global_index = 1  # Global counter for unique indexes

    # Initialize the WebDriver
    # NOTE(review): an Edge driver is started with the module-level ChromeOptions
    # object — confirm the intended browser.
    driver = webdriver.Edge(options=chrome_options)
    try:
        driver.get(spotify_playlist_url)

        time.sleep(10)  # Allow time for the page to load

        # Close the popup dialog if it appears
        try:
            close_button = driver.find_element(By.XPATH, "//button[contains(@class, 'Button-sc-1dqy6lx-0') and text()='Close']")
            if close_button.is_displayed():
                close_button.click()  # Click the "Close" button
                print("Popup dialog detected and closed.")
                time.sleep(1)  # Allow the page to stabilize after closing the dialog
        except Exception:
            print("No popup dialog detected.")

        while True:
            # Extract song data from currently loaded rows
            songs = extract_song_data(driver, global_index)

            # Filter out duplicate songs
            for song in songs:
                song_tuple = (song["Song_Title"], song["Artist"], song["Album"])
                if song_tuple not in seen_songs:
                    seen_songs.add(song_tuple)
                    all_songs.append(song)

            # Update the global index to continue from the last assigned index
            global_index += len(songs)

            # Scroll to the last visible element
            visible_rows = driver.find_elements(By.XPATH, row_xpath)
            if not visible_rows:
                print("No rows found to scroll.")
                break
            driver.execute_script("arguments[0].scrollIntoView(true);", visible_rows[-1])
            print("Scrolled to the last visible element.")

            # Check the number of rows loaded (re-queried after the scroll)
            current_row_count = len(driver.find_elements(By.XPATH, row_xpath))
            print(f"Rows loaded: {current_row_count - 1}")

            # Break if no new rows are loaded
            if current_row_count == previous_row_count:
                print("Reached the bottom of the page.")
                break
            previous_row_count = current_row_count

            # Delay to allow content to load
            time.sleep(scroll_pause_time)
    finally:
        # Always close the browser, even if scraping raised part-way through;
        # otherwise the browser/driver processes are left running.
        driver.quit()

    # Save the data to a CSV file and hand the cleaned frame back to the caller
    return save_to_csv(all_songs, output_csv)


def extract_song_data(driver, start_index):
    """Collect title, artist and album from every row currently in the page.

    Waits up to 10 seconds for elements with ``role='row'`` to be present,
    then reads each one except the first. Rows are numbered from
    ``start_index`` and each record's ``Index`` is that number minus one.
    Errors are printed rather than raised; whatever was collected so far is
    returned.
    """
    row_xpath = "//div[@role='row']"
    title_xpath = ".//div[@aria-colindex='2']//div[@data-encore-id='text' and contains(@class, 'standalone-ellipsis-one-line')]"
    artist_xpath = ".//div[@aria-colindex='2']//a[@draggable='true']"
    album_xpath = ".//div[@aria-colindex='3']//a[@class='standalone-ellipsis-one-line']"

    collected = []
    try:
        # Block until at least the row elements exist in the DOM
        waiter = WebDriverWait(driver, 10)
        waiter.until(EC.presence_of_all_elements_located((By.XPATH, row_xpath)))

        row_elements = driver.find_elements(By.XPATH, row_xpath)
        print(f"Found {len(row_elements) - 1} rows after scrolling.")

        for position, row_element in enumerate(row_elements, start=start_index):
            # The first row is skipped, so only later rows yield records
            if position != start_index:
                try:
                    title = extract_text(row_element, title_xpath)
                    artist_links = row_element.find_elements(By.XPATH, artist_xpath)
                    if artist_links:
                        artist = artist_links[0].text
                    else:
                        artist = "N/A"
                    album_name = extract_text(row_element, album_xpath)

                    record = {
                        "Index": position - 1,
                        "Song_Title": title,
                        "Artist": artist,
                        "Album": album_name,
                    }
                    collected.append(record)
                except Exception as row_error:
                    print(f"Error extracting data from row {position}: {row_error}")

    except Exception as lookup_error:
        print("Error locating rows:", lookup_error)

    return collected


def extract_text(element, xpath):
    """Return the text of the first descendant matching ``xpath``, or "N/A".

    Any exception raised during the lookup or while reading the text results
    in the "N/A" placeholder instead of propagating.
    """
    placeholder = "N/A"
    try:
        match = element.find_element(By.XPATH, xpath)
        if not match:
            return placeholder
        return match.text
    except Exception:
        return placeholder


def save_to_csv(songs, output_csv):
    """Clean the scraped song records and write them to a CSV file.

    Parameters
    ----------
    songs : list of dict
        Records with the keys ``Index``, ``Song_Title``, ``Artist`` and
        ``Album``. May be empty.
    output_csv : str
        Destination path of the CSV file.

    Returns
    -------
    pandas.DataFrame
        The cleaned frame: rows titled "N/A" removed, duplicates on
        (title, artist, album) dropped, sorted by ``Index`` with a fresh
        0-based row index. For empty input the frame has the expected columns
        and no rows.
    """
    expected_columns = ["Index", "Song_Title", "Artist", "Album"]

    df = pd.DataFrame(songs)
    if df.empty:
        # pd.DataFrame([]) has no columns at all, so the column lookups below
        # would raise KeyError when nothing was scraped.
        df = pd.DataFrame(columns=expected_columns)

    # Remove rows where 'Song_Title' is 'N/A'
    df = df[df["Song_Title"] != "N/A"]

    # Remove duplicates (if any) just before saving
    df = df.drop_duplicates(subset=["Song_Title", "Artist", "Album"], keep="first")

    # Sort the DataFrame by the 'Index' column in ascending order
    df = df.sort_values(by="Index", ascending=True)
    # Reset the DataFrame's internal index to avoid confusion
    df = df.reset_index(drop=True)

    print(df)  # Print DataFrame in Jupyter Notebook
    df.to_csv(output_csv, index=False)
    print(f"Data exported to {output_csv}")
    return df

# Main Function

In [15]:
# Run the Scraper and Store Data in DB

def main():
    """Prompt for a Spotify URL, scrape it to CSV, then store the rows in PostgreSQL.

    Returns ``None``. If the user enters nothing (or only whitespace) the
    function reports that and returns without starting a browser.
    """
    # The user pastes the Spotify link here, e.g.
    # https://open.spotify.com/playlist/2lkRkYxrcmBE4qMEkqWtKY?si=7Do9uG10R-aAC9f6aB2bOQ
    # Pasted text commonly has leading/trailing spaces, so strip it.
    spotify_url = input("Please input Spotify URL here :").strip()
    if not spotify_url:
        print("No Spotify URL provided; nothing to scrape.")
        return

    output_path = "spotify_playlist.csv"  # Replace with your desired output path

    # Scrape the playlist and get the DataFrame
    playlist_df = scrape_playlist(spotify_url, output_path)

    # Store the scraped data into PostgreSQL
    store_data_in_db(playlist_df)

if __name__ == "__main__":
    main()


Please input Spotify URL here : https://open.spotify.com/album/3RDqXDc1bAETps54MSSOW0?si=1AnD_7lZTsaV5mk4qq6ctg


Stripped URL: https://open.spotify.com/album/3RDqXDc1bAETps54MSSOW0
Popup dialog detected and closed.
Found 16 rows after scrolling.
Scrolled to the last visible element.
Rows loaded: 16
Found 16 rows after scrolling.
Scrolled to the last visible element.
Rows loaded: 16
Reached the bottom of the page.
    Index                            Song_Title       Artist Album
0       1                             Adore You  Miley Cyrus   N/A
1       2                         We Can't Stop  Miley Cyrus   N/A
2       3  SMS (Bangerz) (feat. Britney Spears)  Miley Cyrus   N/A
3       4                     4x4 (feat. Nelly)  Miley Cyrus   N/A
4       5             My Darlin' (feat. Future)  Miley Cyrus   N/A
5       6                         Wrecking Ball  Miley Cyrus   N/A
6       7     Love Money Party (feat. Big Sean)  Miley Cyrus   N/A
7       8                           #GETITRIGHT  Miley Cyrus   N/A
8       9                                 Drive  Miley Cyrus   N/A
9      10             FU (

# 2nd Part - Using SPOTIFY API to extract more information

In [None]:
import psycopg2
import requests
import pandas as pd

import os

# Spotify Developer credentials
# Read from the environment when available so secrets need not live in the notebook.
# NOTE(review): the literal fallbacks below are kept only so existing runs keep working;
# they are exposed in source and should be rotated and then removed.
SPOTIFY_CLIENT_ID = os.environ.get("SPOTIFY_CLIENT_ID", "e5a1718fa9a0478db7131428ccecdff6")
SPOTIFY_CLIENT_SECRET = os.environ.get("SPOTIFY_CLIENT_SECRET", "4cdc467c23c74aaaa856b72b25d88167")

# PostgreSQL database connection URL (override with the SPOTIFY_DB_URL environment variable)
DB_URL = os.environ.get(
    "SPOTIFY_DB_URL",
    "postgresql://postgres:2128@localhost:5434/spotify_data",  # Replace with your database credentials
)


In [None]:

def get_spotify_access_token(client_id, client_secret, timeout=10):
    """Authenticate with the Spotify accounts service and return an access token.

    Sends a ``client_credentials`` grant to the token endpoint using HTTP
    basic auth.

    Parameters
    ----------
    client_id, client_secret : str
        Spotify application credentials.
    timeout : float, optional
        Seconds to wait for the HTTP request (default 10). Without a timeout
        ``requests`` can block indefinitely on an unresponsive server.

    Returns
    -------
    str
        The ``access_token`` field of the JSON response.

    Raises
    ------
    RuntimeError
        If the response status is not 200. (A subclass of ``Exception``, so
        existing ``except Exception`` handlers still apply.)
    """
    url = "https://accounts.spotify.com/api/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials"}
    response = requests.post(
        url,
        headers=headers,
        data=data,
        auth=(client_id, client_secret),
        timeout=timeout,
    )
    if response.status_code == 200:
        return response.json()["access_token"]

    # The error body is not guaranteed to be JSON; fall back to the raw text
    # so a decoding error does not hide the real failure.
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    raise RuntimeError(f"Failed to authenticate with Spotify API: {detail}")

def get_song_data_from_spotify(song_title, access_token, timeout=10):
    """Search Spotify for a track by title and return details of the first hit.

    Parameters
    ----------
    song_title : str
        Text used as the search query (``q``).
    access_token : str
        Bearer token for the Spotify Web API.
    timeout : float, optional
        Seconds to wait for the HTTP request (default 10).

    Returns
    -------
    dict or None
        A dict with the keys ``Song_Title``, ``Artist`` (artist names joined
        with ", "), ``Album``, ``Spotify URL`` and ``Popularity`` for the first
        search result, or ``None`` when the search returns no tracks.

    Raises
    ------
    RuntimeError
        If the response status is not 200. (A subclass of ``Exception``, so
        existing ``except Exception`` handlers still apply.)
    """
    url = "https://api.spotify.com/v1/search"
    headers = {"Authorization": f"Bearer {access_token}"}
    # limit=1: only the top search result is used
    params = {"q": song_title, "type": "track", "limit": 1}
    response = requests.get(url, headers=headers, params=params, timeout=timeout)

    if response.status_code != 200:
        # The error body is not guaranteed to be JSON; fall back to raw text.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        raise RuntimeError(f"Failed to search for song: {detail}")

    results = response.json()
    items = results["tracks"]["items"]
    if not items:
        return None  # No results found

    track = items[0]
    return {
        "Song_Title": track["name"],
        "Artist": ", ".join(artist["name"] for artist in track["artists"]),
        "Album": track["album"]["name"],
        "Spotify URL": track["external_urls"]["spotify"],
        "Popularity": track["popularity"],  # Add popularity score
    }

def get_song_titles_from_postgres(db_url, table_name):
    """Return the distinct ``Song_Title`` values of ``table_name`` as a list.

    ``table_name`` is interpolated directly into the SQL text, so it must be
    a trusted identifier.
    """
    db_engine = create_engine(db_url)
    titles_frame = pd.read_sql(
        f'SELECT DISTINCT "Song_Title" FROM {table_name};',
        db_engine,
    )
    title_column = titles_frame["Song_Title"]
    return title_column.tolist()

def store_song_data_in_postgres(song_data, db_url, table_name):
    """Store the retrieved song records in a PostgreSQL table.

    Parameters
    ----------
    song_data : list of dict
        One record per song; converted to a DataFrame before writing.
    db_url : str
        SQLAlchemy connection URL.
    table_name : str
        Destination table. An existing table of that name is replaced.

    Returns
    -------
    None
        When ``song_data`` is empty nothing is written and the existing table
        is left untouched.
    """
    # Guard: with if_exists="replace" an empty list would swap the existing
    # table for a column-less one (for example when every API lookup failed).
    if not song_data:
        print(f"No song data to store; '{table_name}' table left unchanged.")
        return

    engine = create_engine(db_url)
    try:
        df = pd.DataFrame(song_data)
        df.to_sql(table_name, engine, if_exists="replace", index=False)
    finally:
        # Release pooled connections; the engine is not reused after this call.
        engine.dispose()
    print(f"Data successfully stored in the '{table_name}' table.")

def analyze_playlist(db_url, table_name):
    """Print popularity statistics for a stored table and export it to CSV.

    Reads every row of ``table_name``, prints the average popularity, the most
    and least popular rows and the number of songs per artist, then writes the
    table contents to ``playlist_analysis.csv``.

    Parameters
    ----------
    db_url : str
        SQLAlchemy connection URL.
    table_name : str
        Table to analyse; interpolated directly into the SQL text, so it must
        be a trusted identifier.

    Returns
    -------
    None
        Returns early with a message when the table has no ``Popularity``
        column or no non-null popularity values.
    """
    engine = create_engine(db_url)
    try:
        query = f"SELECT * FROM {table_name};"
        df = pd.read_sql(query, engine)
    finally:
        # Release pooled connections; the engine is not reused after this call.
        engine.dispose()

    # Ensure usable Popularity data exists: idxmax()/idxmin() raise on an
    # empty or all-null column.
    if "Popularity" not in df.columns or df["Popularity"].dropna().empty:
        print("No popularity data available for analysis.")
        return

    popularity = df["Popularity"].dropna()

    # Calculate average popularity
    avg_popularity = popularity.mean()
    print(f"Average Popularity: {avg_popularity:.2f}")

    # Find the most popular song
    most_popular_song = df.loc[popularity.idxmax()]
    print("\nMost Popular Song:")
    print(most_popular_song)

    # Find the least popular song
    least_popular_song = df.loc[popularity.idxmin()]
    print("\nLeast Popular Song:")
    print(least_popular_song)

    # Count the number of songs by artist
    artist_counts = df["Artist"].value_counts()
    print("\nNumber of Songs by Artist:")
    print(artist_counts)

    # Save to CSV. Note: the file holds the table rows as read, not the
    # statistics printed above.
    analysis_output_path = "playlist_analysis.csv"
    df.to_csv(analysis_output_path, index=False)
    print(f"\nAnalysis results saved to {analysis_output_path}")

def main():
    """Enrich the scraped titles via the Spotify API, store and analyse them.

    Steps: obtain an access token, read the distinct titles from
    ``playlist_data``, look each one up on Spotify, write the hits to
    ``spotify_song_data`` and run ``analyze_playlist`` on that table.
    Lookup errors for individual titles are printed and skipped.
    """
    source_table = "playlist_data"  # Replace with your table name
    target_table = "spotify_song_data"  # Replace with your desired output table name

    # Spotify API authentication
    token = get_spotify_access_token(SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET)

    # Retrieve song titles from PostgreSQL
    titles = get_song_titles_from_postgres(DB_URL, source_table)
    print(f"Retrieved {len(titles)} song titles from PostgreSQL.")

    # Fetch song data from Spotify API, one title at a time
    enriched = []
    for title in titles:
        try:
            track_info = get_song_data_from_spotify(title, token)
            if not track_info:
                print(f"No data found for: {title}")
                continue
            enriched.append(track_info)
            print(f"Retrieved data for: {title}")
        except Exception as lookup_error:
            print(f"Error retrieving data for {title}: {lookup_error}")

    # Store the retrieved data back into PostgreSQL
    store_song_data_in_postgres(enriched, DB_URL, target_table)

    # Analyze the playlist data
    analyze_playlist(DB_URL, target_table)


if __name__ == "__main__":
    main()

## Loading data from PostgreSQL and showing the average Popularity score as a gauge chart

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
import plotly.graph_objects as go
import pandas as pd

def fetch_data_from_db(db_url, table_name):
    """Load every row of ``table_name`` into a DataFrame.

    Any exception is printed and an empty DataFrame is returned instead.
    """
    try:
        with create_engine(db_url).connect() as db_connection:
            return pd.read_sql(text(f'SELECT * FROM {table_name}'), db_connection)
    except Exception as fetch_error:
        print(f"Error fetching data from database: {fetch_error}")
    # Reached only after a failure: hand back an empty frame
    return pd.DataFrame()

# Plotting a gauge chart for Popularity Score
def plot_popularity_gauge(df):
    """Show a Plotly gauge of the average value in the ``Popularity`` column.

    The gauge displays the mean as the number, the filled bar and a red
    threshold line; the delta is the mean relative to the column minimum.
    The 0-100 axis is split into five labelled bands.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``Popularity`` column.

    Returns
    -------
    None
        The figure is rendered with ``fig.show()``. When the column has no
        non-null values a message is printed and nothing is drawn.
    """
    popularity = df['Popularity'].dropna()
    if popularity.empty:
        # mean/min/max would all be NaN and the gauge would be meaningless
        print("No popularity values available to plot.")
        return

    avg_popularity = popularity.mean()
    min_popularity = popularity.min()

    # Create the gauge chart
    fig = go.Figure(go.Indicator(
        mode="gauge+number+delta",
        value=avg_popularity,
        delta={'reference': min_popularity, 'increasing': {'color': "green"}},
        gauge={
            'axis': {'range': [0, 100], 'tickwidth': 1, 'tickcolor': "white"},
            # The dict previously listed 'bar' twice; Python keeps only the
            # last entry, so this single definition is the one that was
            # already in effect.
            'bar': {
                'color': 'cyan',   # colour of the value bar
                'thickness': 0.5,  # bar thickness as a fraction of the gauge (0 - 1)
            },
            'bgcolor': "white",
            'borderwidth': 2,
            'bordercolor': "gray",
            'steps': [
                {'range': [0, 25], 'color': "#B3B3B3", 'name': "Obscure"},
                {'range': [25, 50], 'color': "#808080", 'name': "Niche"},
                {'range': [50, 70], 'color': "#535353", 'name': "Known"},
                {'range': [70, 85], 'color': "#1ED760", 'name': "Popular"},
                {'range': [85, 100], 'color': "#1DB954", 'name': "Hit"},
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': avg_popularity
            },
            'shape': "angular"
        },
        title={
            'text': "Average Popularity Score",
            'font': {'size': 24}
        },
        number={'font': {'size': 28}, 'suffix': ""},
        domain={'x': [0, 1], 'y': [0, 1]}
    ))

    # Add multi-line annotation (legend) below
    fig.update_layout(
        paper_bgcolor="white",
        font={'color': "black"},
        annotations=[
            dict(
                x=0.5,
                y=-0.2,  # Lower placement
                showarrow=False,
                text="🎶 <b>Popularity Scale:</b><br>"
                     "0–25: Obscure | 25–50: Niche | 50–70: Known | 70–85: Popular | 85–100: Hit",
                font=dict(size=12, color="blue"),
                xref="paper",
                yref="paper",
                align="center"
            )
        ]
    )

    fig.show()

# Main function to execute the analysis and visualization
def main(db_url=None, table_name="spotify_song_data"):
    """Fetch the enriched song table and draw the popularity gauge.

    Parameters
    ----------
    db_url : str, optional
        SQLAlchemy connection URL. When omitted, the ``SPOTIFY_DB_URL``
        environment variable is used, falling back to the notebook's original
        local connection string.
    table_name : str, optional
        Table to read (default ``"spotify_song_data"``).

    Returns
    -------
    None
        Prints a message instead of plotting when the fetched data has no
        ``Popularity`` column.
    """
    import os  # local import so this cell does not depend on another cell's imports

    if db_url is None:
        # NOTE(review): the fallback below embeds a password in source; prefer
        # setting SPOTIFY_DB_URL and removing the literal once callers are updated.
        db_url = os.environ.get(
            "SPOTIFY_DB_URL",
            "postgresql://postgres:2128@localhost:5434/spotify_data",
        )

    # Fetch data from the database
    df = fetch_data_from_db(db_url, table_name)

    # Check if the popularity column exists
    if 'Popularity' in df.columns:
        # Visualize the popularity score with the gauge chart
        plot_popularity_gauge(df)
    else:
        print("No 'Popularity' column found in the data.")

if __name__ == "__main__":
    main()