# Spotify Data Pipeline Workshop

This notebook guides you through the steps needed to build a complete data pipeline.

### Python Library Installation and Spark Session Initialisation

In [None]:
# Install necessary packages.
!pip install -q findspark  # Used to locate Spark in the environment
!pip install py4j          # Enables Python to communicate with the JVM (required for PySpark)
!pip install plotly         # For interactive visualizations
!pip install google-generativeai  # For using Gemini (or other generative AI models)

!pip install prefect       # For workflow orchestration
!pip install typing_extensions  # For type hinting

# Installing and setting up MySQL
!pip -q install PyMySQL    # Python driver for MySQL
!pip install prettytable   # For displaying data in tables
!pip install sqlalchemy    # Object-relational mapper (ORM) for database interaction
!pip install ipython-sql   # For running SQL queries in Jupyter notebooks

# Load the SQL magic extension for Jupyter notebooks
%load_ext sql
%config SqlMagic.style = '_DEPRECATED_DEFAULT'  # Set the style for displaying SQL results

# Connect to MySQL database running in a Docker container
%sql mysql+pymysql://root:root@host.docker.internal:8083

# Import necessary libraries
import os
import sys

import findspark
findspark.init()  # Initialize findspark to locate Spark
findspark.find()  # Find the Spark installation

import pyspark
from pyspark.sql import DataFrame, SparkSession
from prefect import task, flow  # For creating Prefect tasks and flows
from typing import List
import pyspark.sql.types as T  # For Spark data types
import pyspark.sql.functions as F  # For Spark SQL functions

# Create a SparkSession
spark = SparkSession \
        .builder \
        .appName("MWC Data Analytics Lab") \
        .config("spark.jars", "./pkgs/mysql-connector-j-9.2.0.jar") \
        .getOrCreate()

spark  # Display the SparkSession object

In [None]:
%sql SHOW DATABASES;

### Create a Database for the Workshop
We will be creating tables throughout the workshop and need a database to store them. Create a database in MySQL using the `%sql` magic command at the beginning of the line.

Name of the Database: "mwc_lab"

In [None]:
%sql CREATE DATABASE IF NOT EXISTS mwc_lab;

In [None]:
%sql 

# Creating our Secrets -- Spotify and Gemini
### Getting Started with Spotify's Web API
> https://developer.spotify.com/documentation/web-api/tutorials/getting-started

* App Name: Up to you, not relevant. Can use "MWC"
* App Description: Up to you, not relevant. Can use "End-to-end Data Analytics Workshop"
* Redirect URIs: http://localhost:8888/callback
* APIs Used: Web API

Once you have completed all the steps, you should have access to your Client ID and Client Secret. Copy them into the cell below.

### Getting Started with Gemini

> https://pypi.org/project/google-generativeai/ 

Only the first three steps are needed: **we just need to create an API key**.
Once you have the API key, copy it into the `gemini_api_key` variable in the cell below.

In [None]:
client_id = ""   # Enter your Spotify Client ID
client_secret = "" # Enter your Spotify Client Secret
gemini_api_key = "" # Enter your Gemini API key

> **BIG DISCLAIMER: Storing secrets in Python variables is NOT good practice. We do it here only to keep the workshop simple. Please DO NOT SHARE these values.**
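
As a hedged alternative to hard-coding, the same three values could be read from environment variables. The variable names below (`SPOTIFY_CLIENT_ID`, `SPOTIFY_CLIENT_SECRET`, `GEMINI_API_KEY`) and the helper `load_secret` are assumptions for illustration; the workshop environment does not define them.

```python
import os


def load_secret(name: str, default: str = "") -> str:
    """Return the environment variable `name`, or `default` when it is unset."""
    value = os.environ.get(name, default)
    if not value:
        # Warn instead of failing so the notebook can still be explored
        print(f"Warning: {name} is not set")
    return value


# Hypothetical variable names; export them in your shell before starting Jupyter
client_id = load_secret("SPOTIFY_CLIENT_ID")
client_secret = load_secret("SPOTIFY_CLIENT_SECRET")
gemini_api_key = load_secret("GEMINI_API_KEY")
```

This keeps the secrets out of the notebook file itself, which matters if the notebook is ever committed or shared.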

# Building a Spotify API Connector

We will need several interactions with the API, so we will build our own Python connector for the endpoints the API provides.

There is an existing library for this, Spotipy (https://spotipy.readthedocs.io/en/2.22.1/).
However, to make the workshop a bit more interesting, let's build our own and demystify working with APIs.
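
The connector in the next cell authenticates with the Client Credentials flow by passing `auth=(client_id, client_secret)` to `requests.post`. That argument produces an HTTP Basic `Authorization` header. The sketch below rebuilds that header by hand to show what is sent; the helper name `basic_auth_header` and the credentials are placeholders, not real values.

```python
import base64


def basic_auth_header(client_id: str, client_secret: str) -> dict:
    """Build the HTTP Basic Authorization header used for the token request."""
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    encoded = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {encoded}"}


# Placeholder credentials, for illustration only
header = basic_auth_header("my-client-id", "my-client-secret")
print(header)
```

The token endpoint answers with a JSON body whose `access_token` field is then sent as a `Bearer` token on every API request.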

In [None]:
from pyspark.sql import SparkSession, DataFrame, Column
from typing import List
from datetime import date

import pyspark.sql.functions as F
import requests
import json

# Get the Spark Context from the existing Spark Session
sc = spark.sparkContext

class SpotifyAPI:
    """
    A class to interact with the Spotify API.
    """
    def __init__(self) -> None:
        """
        Initializes the SpotifyAPI object with client credentials and base URLs.
        """
        self.client_id = client_id  # Assuming client_id is defined elsewhere
        self.client_secret = client_secret  # Assuming client_secret is defined elsewhere
        self.base_url = "https://api.spotify.com/v1"  # Spotify API base URL
        self.token_url = "https://accounts.spotify.com/api/token"  # Spotify token URL
        self.access_token = self.get_access_token()  # Get the access token

    def get_access_token(self) -> str:
        """
        Retrieves an access token from the Spotify API using client credentials.
        """
        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret)
        )
        return response.json()["access_token"]

    def parse_json(self, endpoint: str, object_id: str = None, params: dict = None) -> dict:
        """
        Sends a GET request to the Spotify API and returns the parsed JSON response,
        or None if the request fails.
        """
        url = f'{self.base_url}/{endpoint}/'

        if object_id:
            url += f"{object_id}"

        try:
            response = requests.get(
                url,
                headers=dict(Authorization=f"Bearer {self.access_token}"),
                params=params
            )
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error fetching data from {url}: {e}")
            return None

    @staticmethod
    def read_json_to_df(obj) -> DataFrame:
        """
        Converts a JSON object to a Spark DataFrame.
        """
        return spark.read.json(sc.parallelize([json.dumps(obj)]))

# Create an instance of the SpotifyAPI class
sp = SpotifyAPI()
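
List endpoints in the Spotify Web API return a paging object with `items` and a `next` link, so a single request may not return every row. The `params` argument of `parse_json` can carry `limit` and `offset` for this. Below is a minimal sketch of the paging loop; `iter_items`, `fake_fetch`, and `FAKE_ITEMS` are hypothetical names, and the fake fetcher stands in for the API so the example runs offline.

```python
from typing import Callable, Iterator


def iter_items(fetch_page: Callable[[int, int], dict], limit: int = 50) -> Iterator[dict]:
    """Yield every item across pages, advancing `offset` until `next` is empty."""
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        items = page.get("items", [])
        yield from items
        # Stop when the API reports no further page or returns nothing
        if not page.get("next") or not items:
            break
        offset += len(items)


# Hypothetical stand-in for the API: 5 fake items served `limit` at a time
FAKE_ITEMS = [{"n": i} for i in range(5)]


def fake_fetch(offset: int, limit: int) -> dict:
    chunk = FAKE_ITEMS[offset:offset + limit]
    has_more = offset + limit < len(FAKE_ITEMS)
    return {"items": chunk, "next": "more" if has_more else None}


all_items = list(iter_items(fake_fetch, limit=2))
print(len(all_items))
```

With the connector, `fetch_page` could wrap a call such as `sp.parse_json("playlists", f"{playlist_id}/tracks", params={"offset": offset, "limit": limit})`. The largest allowed `limit` depends on the endpoint.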

## Prefect -- Orchestrating tasks and Flows
Prefect is a **modern data workflow orchestration tool designed to help you build, run, and monitor robust and reliable data pipelines**. It provides a flexible and intuitive way to define complex workflows as code, making it easier to manage dependencies, handle errors, and observe the execution of your data processes.

Brief Introduction to Flows and Tasks:

Flows:
* A flow is the fundamental unit of a Prefect workflow. It represents the overall sequence of operations that make up your data pipeline.
* Flows are defined using the @flow decorator.
* They define the overall structure and dependencies of your workflow.
* Flows orchestrate the execution of tasks.
  
Tasks:
* A task is a discrete unit of work within a flow. It represents a single operation, such as reading data from a database, transforming data, or writing data to a file.
* Tasks are defined using the @task decorator.
* They encapsulate the logic for performing a specific operation.
* Tasks can have dependencies on other tasks.   
* Tasks can return values that can be used by other tasks in the flow.

>For simplicity, all our tasks will run against a temporary server that Prefect sets up automatically. Alternatively, we could deploy flows to a separate server, monitor them through its UI, and add schedules.

>You can see what the UI looks like on a self-hosted Prefect server at http://0.0.0.0:4200/dashboard

In [None]:
from prefect.cache_policies import NO_CACHE


@task
def get_api_response(endpoint: str, details: str = None):
    # Thin task wrapper around the connector; any exception propagates to the caller
    return sp.parse_json(endpoint, details)


@task(cache_policy=NO_CACHE)
def write_df(df: DataFrame, target_db: str, table_name: str) -> None: 
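    # JDBC connection settings for the workshop's MySQL container
    connection_properties = {
        "user": "root",
        "passw": "root",
        "driver": "com.mysql.cj.jdbc.Driver",  # Driver class name in Connector/J 8 and later
        "base_url": "jdbc:mysql://host.docker.internal:8083/",
        "db": target_db,
    }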

    df.write \
      .format("jdbc") \
      .option("driver",connection_properties["driver"]) \
      .option("url", connection_properties["base_url"] + connection_properties["db"]) \
      .option("dbtable", f"{table_name}") \
      .option("user", connection_properties["user"]) \
      .option("password", connection_properties["passw"]) \
      .mode("overwrite") \
      .save()


@task
def read_mysql_table(db: str, table_name: str): 
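    # JDBC connection settings for the workshop's MySQL container
    connection_properties = {
        "user": "root",
        "passw": "root",
        "driver": "com.mysql.cj.jdbc.Driver",  # Driver class name in Connector/J 8 and later
        "base_url": "jdbc:mysql://host.docker.internal:8083/",
        "db": db,
    }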
    
    df = spark.read \
    .format("jdbc") \
    .option("driver",connection_properties["driver"]) \
    .option("url", connection_properties["base_url"] + connection_properties["db"]) \
    .option("dbtable", f"{table_name}") \
    .option("user", connection_properties["user"]) \
    .option("password", connection_properties["passw"]) \
    .load()

    return df
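
`write_df` and `read_mysql_table` repeat the same connection settings. One possible way to share them, sketched below, is a small helper that returns the option map. The name `jdbc_options` is an assumption for illustration, and the credentials mirror the workshop defaults.

```python
def jdbc_options(db: str, table_name: str,
                 driver: str = "com.mysql.cj.jdbc.Driver") -> dict:
    """Return the option map shared by JDBC reads and writes."""
    return {
        "driver": driver,  # Connector/J 8.x and later driver class
        "url": f"jdbc:mysql://host.docker.internal:8083/{db}",
        "dbtable": table_name,
        "user": "root",
        "password": "root",
    }


opts = jdbc_options("mwc_lab", "playlist_tracks")
print(opts["url"])
```

Spark's reader and writer both accept `.options(**opts)`, so the dictionary could be passed in one call instead of chaining `.option(...)` five times.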

## Our first Table -- playlist_tracks

As a first step, let's create a table holding the cleansed and formatted information extracted from our playlist.
Steps: 
* Define the playlist we will be working with
* Call the function get_playlist_tracks_api_call to get the data
* Reformat the spark DataFrame to have the following schema:
```bash
root
 |-- track_id: string (nullable = true)
 |-- artist_id: string (nullable = false)
 |-- album_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_popularity: long (nullable = true)
 |-- track_duration_ms: long (nullable = true)
 |-- album_release_date: string (nullable = true)
 |-- track_uri: string (nullable = true)
 |-- added_at: string (nullable = true)
```

HINT: The response contains nested fields (structs and arrays of structs). Use dot notation to reach a field inside a struct, or inside every element of an array of structs.
E.g.:
```python
df.printSchema() # To know what schema we are working with

...
.select(F.col("item.subitem")) # To access nested fields
```
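
To build intuition for the nesting before writing the Spark `select`, the same shape can be explored on a plain Python dictionary. The item below is a hypothetical, heavily trimmed playlist entry with placeholder IDs, and `get_path` is an illustrative helper rather than a Spark or Spotify function.

```python
from functools import reduce


def get_path(record: dict, dotted: str):
    """Follow a dotted path such as 'track.album.id' through nested dicts."""
    return reduce(lambda node, key: node[key], dotted.split("."), record)


# Hypothetical, trimmed playlist item; IDs are placeholders
item = {
    "added_at": "2024-01-01T00:00:00Z",
    "track": {
        "id": "track-1",
        "name": "Example Song",
        "album": {
            "id": "album-1",
            "artists": [{"id": "artist-1"}, {"id": "artist-2"}],
        },
    },
}

print(get_path(item, "track.album.id"))

# On an array of structs, a dotted path in Spark yields an array of field values;
# the list comprehension below mimics that behaviour
artist_ids = [artist["id"] for artist in get_path(item, "track.album.artists")]
print(artist_ids[0])
```

This is why the provided code joins `track.album.artists.id` with `concat_ws` and then takes the first element: the dotted path returns one id per artist.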

You can do all this in the empty function get_playlist_tracks_df in the cell below:

In [None]:
playlist_id = "167pv4bC72ZabUAzJ6tFyq" # Any public playlist created by a user works. This one is the default.

In [None]:
@task
def get_playlist_tracks_api_call(playlist_id: str):
    """
    Retrieves tracks data for a given playlist ID from the Spotify API.

    Args:
        playlist_id: The ID of the Spotify playlist.

    Returns:
        A DataFrame containing the raw API response for the playlist tracks.
    """
    playlist_response = get_api_response("playlists", f"{playlist_id}/tracks")
    return sp.read_json_to_df(playlist_response["items"])

> To check whether your credentials work, call the function get_playlist_tracks_api_call

In [None]:
# TO BE COMPLETED
df = # ...
df.show(10, False)

In [None]:
@task
def get_playlist_tracks_df(playlist_id: str) -> DataFrame:
    """ TO BE COMPLETED!!!
    Processes the raw API response for playlist tracks and extracts relevant information into a structured DataFrame.

    Args:
        playlist_id: The ID of the Spotify playlist.

    Returns:
        A DataFrame containing the extracted track information.
    """
    df = get_playlist_tracks_api_call(playlist_id) # Check schema of this df to know what we are working with
    playlist_tracks_df = (
        df
        .withColumn("artist_id_conc", F.concat_ws(",", "track.album.artists.id"))
        .withColumn("artist_id", F.split("artist_id_conc", ",")[0])
        .select(
            F.col("track.id").alias("track_id"),
            F.col("artist_id"),
            # TO BE COMPLETED 
            # ...
            # ...
    ))
    return playlist_tracks_df



playlist_tracks_df = get_playlist_tracks_df(playlist_id)
playlist_tracks_df.show(20)

### Let's write this information into a table

For this purpose, we have already defined a function to write to our MySQL instance. We just need to call it with the params we want.
Function: write_df

In [None]:
write_df(
# TO BE COMPLETED
# ...
)

### Explore the data in MySQL to double-check the write operation

Check that the table has been written and that it contains data.

In [None]:
%sql 

In [None]:
%sql

### Moving forward, we will work with a smaller subset of Data for performance

Let's choose our top 30 tracks based on popularity (top 30 most popular tracks in our playlist).


In [None]:
playlist_tracks_top30 = # TO BE COMPLETED!!!

## Brief analysis over playlist_tracks top 30
### Let's Visualise popularity against Release Year over our top 30

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

df = playlist_tracks_top30.withColumn("release_year", F.year(F.to_date(F.col("album_release_date"))))
pandas_df = df.select("track_popularity", "release_year").toPandas()

# Scatter Plot
plt.figure(figsize=(12, 6))
sns.scatterplot(x="release_year", y="track_popularity", data=pandas_df)
plt.title("Track Popularity vs. Album Release Year")
plt.xlabel("Album Release Year")
plt.ylabel("Track Popularity")
plt.savefig("track_popularity_vs_release_year.png")
plt.show()

In [None]:
# Box Plot (Alternative)
plt.figure(figsize=(16, 8))
sns.boxplot(x="release_year", y="track_popularity", data=pandas_df)
plt.title("Track Popularity Distribution by Album Release Year")
plt.xlabel("Album Release Year")
plt.ylabel("Track Popularity")
plt.xticks(rotation=45) #rotate x axis labels
plt.savefig("track_popularity_dist_by_release_year.png")
plt.show()

## Second Table -- artist_albums

Now, let's move on to the second step: getting the albums of each artist.
* Complete the code for the function get_artist_albums_df
    * Get a list of artist ids with no duplicates -- Select distinct artist ids
    * Call the function that triggers the API call -- get_artists_albums
    * Format the output dataframe to follow the schema defined below:

```bash
root
 |-- artist_name: string (nullable = false)
 |-- name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- total_tracks: long (nullable = true)
```

In [None]:
def get_stats_over_col(col_name: Column, df: DataFrame): 
    avg = df.select(F.avg(col_name)).collect()[0][0]
    stddev = df.select(F.stddev(col_name)).collect()[0][0]
    return round(avg), round(stddev)

avg_popularity, stddev_popularity = get_stats_over_col(col_name = F.col("track_popularity"), df = playlist_tracks_top30)
avg_release_date, stddev_release_date = get_stats_over_col(col_name = F.year(F.col("album_release_date")), df = playlist_tracks_top30)
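
`F.stddev` is an alias for the sample standard deviation (`stddev_samp`), which divides by n - 1 rather than n. The difference can be seen on a small list with Python's `statistics` module; the numbers below are made up for illustration and are not workshop data.

```python
import statistics

popularity = [2, 4, 4, 4, 5, 5, 7, 9]  # hypothetical popularity scores

mean = statistics.mean(popularity)
sample_std = statistics.stdev(popularity)       # divides by n - 1, like F.stddev
population_std = statistics.pstdev(popularity)  # divides by n, like F.stddev_pop
print(mean, round(sample_std, 3), population_std)
```

With only 30 tracks the two values differ slightly, so it is worth knowing which one a report is quoting.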

> The next retrieval operation may take a few minutes since we need to do several API calls, one per artist
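
When many calls are made in a row, the Spotify API may answer with HTTP 429 and a `Retry-After` header giving the number of seconds to wait. The connector above does not handle this case. A minimal sketch of a retry loop follows; `call_with_retry` and the `(status, headers, body)` tuple shape are assumptions chosen so the example runs without a network connection.

```python
import time
from typing import Callable


def call_with_retry(call: Callable[[], tuple], max_attempts: int = 3,
                    sleep: Callable[[float], None] = time.sleep):
    """Retry `call` while it reports HTTP 429, waiting for the Retry-After value.

    `call` returns (status_code, headers, body); this shape is hypothetical.
    """
    for attempt in range(1, max_attempts + 1):
        status, headers, body = call()
        if status != 429:
            return body
        if attempt < max_attempts:
            # Fall back to one second when the header is missing
            sleep(float(headers.get("Retry-After", 1)))
    raise RuntimeError(f"Still rate limited after {max_attempts} attempts")


# Hypothetical responses: rate limited once, then successful
responses = iter([(429, {"Retry-After": "2"}, None), (200, {}, {"items": []})])
waits = []  # records requested waits instead of actually sleeping
result = call_with_retry(lambda: next(responses), sleep=waits.append)
print(result, waits)
```

Injecting `sleep` keeps the example fast; in real use the default `time.sleep` would pause between attempts.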

In [None]:
from functools import reduce

@task(cache_policy=NO_CACHE)
def get_artists_albums(artist_ids: List):
    """
    Retrieves and combines album data for a list of artist IDs.

    Args:
        artist_ids (list): A list of artist IDs (strings).

    Returns:
        DataFrame: A single DataFrame containing combined album information for all artists.
    """
    ls_dfs = []
    for artist_id in artist_ids:
        # One API call per artist; each response becomes its own DataFrame
        artist_albums = get_api_response("artists", f"{artist_id}/albums")
        ls_dfs.append(sp.read_json_to_df(artist_albums["items"]))

    return reduce(DataFrame.unionAll, ls_dfs)


@task(cache_policy=NO_CACHE)
def get_artist_albums_df(df: DataFrame):
    """
    Retrieves and transforms album data for artists present in a DataFrame.

    Args:
        df (DataFrame): A DataFrame containing artist IDs.

    Returns:
        DataFrame: A DataFrame containing enriched and transformed album information.
    """
    artists_list = [row.artist_id for row in # {TO BE COMPLETED!!!}.collect()]  
    artist_albums_raw =  # TO BE COMPLETED!!! -- Call to the previous function
    artist_albums_df = (
        artist_albums_raw.withColumn("artist_name", F.concat_ws(", ", "artists.name"))
        .select(
            F.col("artist_name"),
            # TO BE COMPLETED 
            # ...
            # ...

        )
    )
    return artist_albums_df


artist_albums_df = get_artist_albums_df(playlist_tracks_top30)
artist_albums_df.printSchema()
artist_albums_df.show()

In [None]:
artist_albums_df.select(F.col("artist_name"),F.col("name"), F.col("id")).show(20, False)

In [None]:
write_df(
# TO BE COMPLETED 
# ...
)

### Explore the data in MySQL to double-check the write operation

In [None]:
%sql

## Third and last table -- album_tracks

As with our second table, we now need to get the tracks of each album based on the album ids.

* Complete the get_album_tracks_df function
    * Calculate duration_minutes col
    * Format the dataframe so it follows the schema below: 

```bash
root
 |-- track_id: string (nullable = true)
 |-- album_id: string (nullable = false)
 |-- track_name: string (nullable = true)
 |-- album_name: string (nullable = false)
 |-- artist_name: string (nullable = false)
 |-- duration_minutes: double (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- track_number: long (nullable = true)
```
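
The `duration_minutes` column is a unit conversion from the API's `duration_ms` field: one minute is 60 × 1000 milliseconds. A plain Python sketch of the arithmetic, with `ms_to_minutes` as a hypothetical helper and rounding to two decimals as an assumption:

```python
def ms_to_minutes(duration_ms: int, ndigits: int = 2) -> float:
    """Convert milliseconds to minutes: 1 minute = 60 * 1000 ms."""
    return round(duration_ms / 60_000, ndigits)


print(ms_to_minutes(210_000))
```

The same division applies to the Spark column; whether to round is a presentation choice.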

In [None]:
@task(cache_policy=NO_CACHE)
def get_album_tracks(album_ids: List):
    """
    Retrieves and combines track data for a list of album IDs.

    Args:
        album_ids (List): A list of tuples, where each tuple contains (album_id, album_name).

    Returns:
        DataFrame: A single DataFrame containing combined track information for all albums.
    """
    ls_dfs = []
    for album_id, album_name in album_ids:
        album_tracks = get_api_response("albums", f"{album_id}/tracks")
        album_tracks_df = sp.read_json_to_df(album_tracks["items"])
        # Tag every track with the album it came from
        enriched = (
            album_tracks_df
            .withColumn("album_id", F.lit(str(album_id)))
            .withColumn("album_name", F.lit(str(album_name)))
        )
        ls_dfs.append(enriched)

    return reduce(DataFrame.unionAll, ls_dfs)


@task(cache_policy=NO_CACHE)
def get_album_tracks_df(df: DataFrame):
    """
    Retrieves and transforms track data for albums present in a DataFrame.

    Args:
        df (DataFrame): A DataFrame containing artist albums.

    Returns:
        DataFrame: A DataFrame containing enriched and transformed track information.
    """
    album_ids = [(row.id, row.name) for row in df.select(F.col("id"), F.col("name")).distinct().collect()]
    album_tracks_raw = get_album_tracks(album_ids)

    album_tracks_df = (
        album_tracks_raw
        .withColumn("artist_name", F.concat_ws(", ", "artists.name"))
        .withColumn("duration_minutes", #TO BE COMPLETED)
        .select(
            F.col("id").alias("track_id"),
            # TO BE COMPLETED 
            # ...
            # ...

        )
    )
    return album_tracks_df
    

album_tracks_df = get_album_tracks_df(artist_albums_df)

In [None]:
album_tracks_df.count()

### Let's write this information into a table

For this purpose, we have already defined a function to write to our MySQL instance. We just need to call it with the params we want.
Function: write_df

In [None]:
# TO BE COMPLETED 

### Explore the data in MySQL to double-check the write operation

In [None]:
%sql

In [None]:
%sql ;

## Analysis over Album Tracks

We have finally reached the data we were interested in: all the albums and their songs from the most popular artists in one of our playlists.
From here we could build a report to find which of these songs might interest us, or use the dataset as a training base for a personal recommendation algorithm.

Let's dive into this dataset and generate some visualisations to digest the information better.

To visualise data in a notebook, we need to convert the PySpark DataFrame to a pandas DataFrame.

In [None]:
# Convert to Pandas DataFrame
pandas_df = album_tracks_df.toPandas()

#### Track Duration Distribution (Histogram or KDE Plot)

In [None]:
plt.figure(figsize=(10, 6))
sns.histplot(pandas_df['duration_minutes'], bins=30, kde=True)
plt.title("Distribution of Track Duration (Minutes)")
plt.xlabel("Track Duration (Minutes)")
plt.ylabel("Frequency")
plt.savefig("track_duration_distribution.png")
plt.show()


#### Explicit Content Distribution (Count Plot)

In [None]:
plt.figure(figsize=(8, 6))
sns.countplot(x='explicit', data=pandas_df)
plt.title("Distribution of Explicit Content")
plt.xlabel("Explicit")
plt.ylabel("Count")
plt.savefig("distribution_of_explicit_content.png")
plt.show()

#### Distribution of Track Number

In [None]:
plt.figure(figsize=(10, 6))
sns.histplot(pandas_df['track_number'], bins=20, kde=False)
plt.title("Distribution of Track Number")
plt.xlabel("Track Number")
plt.ylabel("Frequency")
plt.savefig("distribution_of_track_number.png")
plt.show()

#### Artist Track Count (Bar Plot)

In [None]:
artist_counts = pandas_df['artist_name'].value_counts().head(10) #Top 10 artists.
plt.figure(figsize=(12, 6))
sns.barplot(x=artist_counts.index, y=artist_counts.values)
plt.title("Top 10 Artists Track Count")
plt.xlabel("Artist Name")
plt.ylabel("Track Count")
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig("top_10_artist_track_count.png")
plt.show()

#### Track Duration vs. Track Number (Scatter plot)

In [None]:
plt.figure(figsize=(10, 6))
sns.scatterplot(x='track_number', y='duration_minutes', data=pandas_df)
plt.title("Track Duration vs. Track Number")
plt.xlabel("Track Number")
plt.ylabel("Track Duration (Minutes)")
plt.savefig("track_duration_vs_track_number.png")
plt.show()

# Google Gemini Analysis Insights

Using Google's Gemini API, we can send requests to their LLM and read back the responses.
This lets us interact with a public generative AI service from code, which opens up many opportunities to automate tasks and get to helpful insights quickly.

To do that, we initialise a client with the key we obtained at the beginning of the workshop and define a prompt that includes all the material we want to provide as context.
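
Because the prompt relies on the PNG files saved by the plotting cells, a small pre-flight check can catch a missing file before the request is sent. The file names come from the `plt.savefig` calls in this notebook; the helper `split_existing` is a hypothetical name.

```python
from pathlib import Path

# File names written by the plotting cells earlier in the notebook
PLOT_FILES = [
    "track_popularity_vs_release_year.png",
    "track_popularity_dist_by_release_year.png",
    "track_duration_distribution.png",
    "distribution_of_explicit_content.png",
    "distribution_of_track_number.png",
    "top_10_artist_track_count.png",
    "track_duration_vs_track_number.png",
]


def split_existing(names, base_dir="."):
    """Return (found, missing) so missing plots are noticed before the API call."""
    found, missing = [], []
    for name in names:
        target = found if (Path(base_dir) / name).is_file() else missing
        target.append(name)
    return found, missing


found, missing = split_existing(PLOT_FILES)
print(f"{len(found)} plots found, {len(missing)} missing")
```

If `missing` is not empty, re-running the corresponding plotting cell recreates the file.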


In [None]:
import textwrap
import google.generativeai as genai
import PIL.Image
from IPython.display import display
from IPython.display import Markdown

In [None]:
def to_markdown(text):
  text = text.replace('•', '  *')
  return Markdown(textwrap.indent(text, '> ', predicate=lambda _: True))

genai.configure(api_key=gemini_api_key)

In [None]:

# Storing the plot images from the previous exercises into python variables
img1 = PIL.Image.open("track_duration_vs_track_number.png")
# TO BE COMPLETED
# ...


# Initialising the client and preparing our context
model = genai.GenerativeModel('gemini-1.5-flash-latest')
response = model.generate_content(img1)
# TO BE COMPLETED
# ...

# Prompt definition
prompt = """ Create a detailed report over the track and album data extracted from Spotify. 
Create summary points per each image, explaining in simple terms what each plot represents.
Provide key findings and actionable insights based on the data extracted from these plots.
"""

# Response 
response = model.generate_content([prompt, img1, img2, img3, img4, img5, img6, img7], stream=True)
response.resolve()
formatted_text = to_markdown(response.text)
display(formatted_text)