# Table of Contents

- [1 - Introduction](#1)
- [2 - Data Source](#2)
- [3 - Exploratory Data Analysis](#3)
- [4 - ETL Pipeline with AWS Glue and Terraform](#4)
    - [4.1 - Landing Zone](#4-1)
    - [4.2 - Transformation Zone](#4-2)
    - [4.3 - Serving Zone](#4-3)

<a id='1'></a>
## 1 - Introduction

This project builds an end-to-end data pipeline that addresses fragmented music discovery and trend analysis in the fast-moving streaming industry. It integrates several Spotify data sources (new releases, album tracks, artist metadata) using a medallion architecture with Apache Iceberg tables and dbt transformations. The goal is timely identification of emerging music trends, artist performance patterns, and genre preferences that drive user engagement and playlist optimization strategies.



In [1]:
# Import libraries
import os
import json
import requests
import pandas as pd  

from IPython.display import HTML
from typing import Dict, Any, Callable
from dotenv import load_dotenv

<a id='2'></a>
## 2 - Data Source

The data source is the Spotify API, which exposes information about artists, tracks, new releases, and related metadata. To access the API resources, you need to create a Spotify account, obtain an app client ID and client secret, and use them to generate an access token: a string that carries the credentials and permissions used to access a given resource.

In [2]:
# Load the client ID and client secret used to request the token
load_dotenv('./src/env', override=True)

CLIENT_ID = os.getenv('CLIENT_ID')
CLIENT_SECRET = os.getenv('CLIENT_SECRET')

In [6]:
# Function to request an access token

def get_token(client_id: str, client_secret: str, url: str) -> Dict[Any, Any]:
    """Performs a POST request to obtain an access token.

    Args:
        client_id (str): App client ID
        client_secret (str): App client secret
        url (str): URL of the token endpoint

    Returns:
        Dict[Any, Any]: Dictionary containing the access token,
            or an empty dictionary if the request fails

    """

    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }

    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }

    try:
        response = requests.post(url=url, headers=headers, data=payload)
        print(type(response))
        response.raise_for_status()
        response_json = response.json()
        
        return response_json

    except Exception as err:
        print(f"Error: {err}")
        return {}
    
URL = "https://accounts.spotify.com/api/token"
token = get_token(CLIENT_ID, CLIENT_SECRET, URL)

print(token)

<class 'requests.models.Response'>
{'access_token': 'BQDdnuAI6IxvhQlw6wqViZJ0_cQgDAu4DRUwTkYn5h61KaDCSVqA_g_tgFc95VQYTHI6k-gLKeP1RuQ6O2YMMThaf-CPDQdrxK1SIZDpNBVD2RXlmRhZU5UWz_0-L2UFlzLj8I6PCU0', 'token_type': 'Bearer', 'expires_in': 3600}
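
The response above reports `'expires_in': 3600`, so a token stops working after an hour. The following is a minimal sketch of one way to reuse a token until shortly before it expires. `TokenCache`, `fetch_token`, `clock`, and `safety_margin` are hypothetical names introduced for illustration; they are not part of the notebook or of any Spotify library.

```python
import time
from typing import Any, Callable, Dict, Optional


class TokenCache:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch_token: Callable[[], Dict[str, Any]],   # hypothetical: any zero-argument token fetcher
        clock: Callable[[], float] = time.monotonic,  # injectable so the logic can be tested
        safety_margin: float = 60.0,                  # assumption: refresh 60 s before expiry
    ) -> None:
        self._fetch_token = fetch_token
        self._clock = clock
        self._safety_margin = safety_margin
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns a cached token, requesting a new one when the old one is stale."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            response = self._fetch_token()
            # Raises KeyError if the fetcher returned {} (the error case of get_token)
            self._token = response["access_token"]
            self._expires_at = now + response["expires_in"] - self._safety_margin
        return self._token
```

With the notebook's function, this could be wired up as `cache = TokenCache(lambda: get_token(CLIENT_ID, CLIENT_SECRET, URL))`, calling `cache.get()` before each request.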


Whenever you send a request to the Spotify API, you must include the access token in an authorization header. The function `get_auth_header` takes the access token and returns the authorization header to include in the request.

In [13]:
def get_auth_header(access_token: str) -> Dict[str, str]:
    return {'Authorization': f'Bearer {access_token}'}

2.1. The first endpoint is the `browse/new-releases` path of the API, which returns a list of new album releases featured on Spotify.

In [None]:
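def get_new_releases(url: str, access_token: str, offset: int=0, limit: int=20, next: str="") -> Dict[Any, Any]:
    """Performs a GET request to the new releases endpoint.

    Args:
        url (str): Base URL of the endpoint
        access_token (str): Access token
        offset (int): Index of the first item to return
        limit (int): Maximum number of items per page
        next (str): URL of the next page; when given, it is used
            as-is and offset and limit are ignored

    Returns:
        Dict[Any, Any]: Parsed JSON response, or an empty dictionary on error
    """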

    if next == "":
        request_url = f"{url}?offset={offset}&limit={limit}"
    else:
        request_url = f"{next}"

    headers = get_auth_header(access_token)

    try:
        response = requests.get(url=request_url, headers=headers)
        response.raise_for_status()
        
        return response.json()
    
    except Exception as err:
        print(f"Error: {err}")
        return {}
    
URL = "https://api.spotify.com/v1/browse/new-releases"
new_releases = get_new_releases(URL, access_token= token.get('access_token'))

In [15]:
new_releases.keys()

dict_keys(['albums'])

In [16]:
new_releases['albums'].keys()

dict_keys(['href', 'items', 'limit', 'next', 'offset', 'previous', 'total'])
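
The `next` key listed above holds the URL of the following page, which is what the `next` parameter of `get_new_releases` is for. Below is a minimal sketch of how the pages could be walked until `next` is empty. `collect_all_items`, `fetch_page`, and `max_pages` are hypothetical names introduced here; the page fetcher is passed in as a callable so the loop does not depend on the network.

```python
from typing import Any, Callable, Dict, List, Optional


def collect_all_items(
    fetch_page: Callable[[str], Dict[str, Any]],  # hypothetical: maps a page URL to its parsed JSON
    first_url: str,
    key: str = "albums",     # top-level key that wraps the paging object
    max_pages: int = 50,     # assumption: a guard against an endless loop
) -> List[Dict[str, Any]]:
    """Follows the `next` links and gathers the `items` of every page."""
    items: List[Dict[str, Any]] = []
    url: Optional[str] = first_url
    for _ in range(max_pages):
        page = fetch_page(url).get(key, {})
        items.extend(page.get("items", []))
        url = page.get("next")
        if not url:  # None or empty string means this was the last page
            break
    return items
```

With the notebook's function, a call might look like `collect_all_items(lambda u: get_new_releases(URL, token.get('access_token'), next=u), first_url=f"{URL}?offset=0&limit=20")`. Because `get_new_releases` returns `{}` on error, a failed request simply ends the loop with the items gathered so far.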

In [19]:
items = new_releases['albums'].get('items')
items[0]

{'album_type': 'album',
 'artists': [{'external_urls': {'spotify': 'https://open.spotify.com/artist/4gvjmrtzydbMpyJaXUtwvP'},
   'href': 'https://api.spotify.com/v1/artists/4gvjmrtzydbMpyJaXUtwvP',
   'id': '4gvjmrtzydbMpyJaXUtwvP',
   'name': 'Addison Rae',
   'type': 'artist',
   'uri': 'spotify:artist:4gvjmrtzydbMpyJaXUtwvP'}],
 'available_markets': ['AR',
  'AU',
  'AT',
  'BE',
  'BO',
  'BR',
  'BG',
  'CA',
  'CL',
  'CO',
  'CR',
  'CY',
  'CZ',
  'DK',
  'DO',
  'DE',
  'EC',
  'EE',
  'SV',
  'FI',
  'FR',
  'GR',
  'GT',
  'HN',
  'HK',
  'HU',
  'IS',
  'IE',
  'IT',
  'LV',
  'LT',
  'LU',
  'MY',
  'MT',
  'MX',
  'NL',
  'NZ',
  'NI',
  'NO',
  'PA',
  'PY',
  'PE',
  'PH',
  'PL',
  'PT',
  'SG',
  'SK',
  'ES',
  'SE',
  'CH',
  'TW',
  'TR',
  'UY',
  'US',
  'GB',
  'AD',
  'LI',
  'MC',
  'ID',
  'JP',
  'TH',
  'VN',
  'RO',
  'IL',
  'ZA',
  'SA',
  'AE',
  'BH',
  'QA',
  'OM',
  'KW',
  'EG',
  'MA',
  'DZ',
  'TN',
  'LB',
  'JO',
  'PS',
  'IN',
  'BY',
  'KZ'

2.2. The second endpoint is the `albums/{id}/tracks` path of the API, which returns Spotify catalog information about an album's tracks. To test it with a GET request, we need the Spotify ID of an album; we take these IDs from the albums returned by the new releases endpoint.
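
As a rough illustration of that hand-off, the sketch below pulls album IDs out of a new releases response and builds the tracks URL for each. `extract_album_ids` and `build_album_tracks_url` are hypothetical helper names, and the code assumes each album item carries an `id` field, as the artist objects in the sample output do.

```python
from typing import Any, Dict, List

ALBUMS_URL = "https://api.spotify.com/v1/albums"


def extract_album_ids(new_releases: Dict[str, Any]) -> List[str]:
    """Returns the album IDs found in a new releases response."""
    items = new_releases.get("albums", {}).get("items", [])
    # Assumption: album items expose their Spotify ID under "id"; items without it are skipped
    return [item["id"] for item in items if "id" in item]


def build_album_tracks_url(album_id: str, offset: int = 0, limit: int = 20) -> str:
    """Builds the request URL for the tracks of one album."""
    return f"{ALBUMS_URL}/{album_id}/tracks?offset={offset}&limit={limit}"
```

Each URL can then be requested with `requests.get` and the header from `get_auth_header`, following the same pattern as `get_new_releases`.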

<a id='4'></a>
## 4 - ETL Pipeline with AWS Glue and Terraform

Now we will create the resources and infrastructure required for the data pipeline, which follows a medallion architecture.

The pipeline is composed of the following steps:
- An extraction job to get the data from the two API endpoints. This data is stored in the landing zone of the Data Lake in JSON format.
- A transformation job that takes the JSON data extracted from the API endpoints, normalizes some nested fields, adds metadata, and stores the dataset in Iceberg format.
- The creation of schemas in the Data Warehouse hosted in Redshift.
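
To make the transformation step more concrete, here is a small plain-Python sketch of what "normalizes some nested fields, adds metadata" could look like for one album item. It is only an illustration: the actual Glue job is not shown in this section and would presumably do this at scale with Spark. `flatten_album` and the output column names (`ingest_ts`, `source`, and so on) are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def flatten_album(
    item: Dict[str, Any],
    ingested_at: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """Returns one flat row per (album, artist) pair, plus ingestion metadata."""
    ingested_at = ingested_at or datetime.now(timezone.utc)
    rows = []
    # The nested "artists" list is unrolled so each row holds a single artist
    for artist in item.get("artists", []):
        rows.append({
            "album_id": item.get("id"),            # assumption: album items carry "id"
            "album_name": item.get("name"),        # assumption: album items carry "name"
            "album_type": item.get("album_type"),
            "artist_id": artist.get("id"),
            "artist_name": artist.get("name"),
            "ingest_ts": ingested_at.isoformat(),  # hypothetical metadata column
            "source": "spotify_api",               # hypothetical metadata column
        })
    return rows
```

Flat rows of this shape map directly onto table columns, which is the form a columnar table format such as Iceberg expects.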