In [2]:
## Installing Spotify's API library in Python

# pip install spotipy

In [3]:
## Importing libraries

import sys
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd

from datetime import datetime

sys.path.append("/home/tabas/personal-dev/pyprojects")
import pipelines.utils.personal_env as penv

In [4]:
# Importing Spotify Credentials
# Both values are read from the local `personal_env` module (imported as `penv`),
# so the id/secret themselves never appear in the notebook.

CLIENT_ID = penv.spotify_client_id
CLIENT_SECRET = penv.spotify_client_secret

In [5]:
# Spotify Authentication
# Build a client-credentials auth manager from the id/secret and hand it to the
# Spotify client; `sp` is the object the search cell issues its requests through.

auth_manager = SpotifyClientCredentials(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
sp = spotipy.Spotify(auth_manager=auth_manager)

In [6]:
# Available markets (use Bash Operator with Spotify to get available markets and pass through XCOM)

# Two-letter market codes, kept in alphabetical order (13 per row). The block is
# split on whitespace, so `markets` is a plain list of strings.
markets = """
    AD AE AG AL AM AO AR AT AU AZ BA BB BD
    BE BF BG BH BI BJ BN BO BR BS BT BW BY
    BZ CA CD CG CH CI CL CM CO CR CV CW CY
    CZ DE DJ DK DM DO DZ EC EE EG ES ET FI
    FJ FM FR GA GB GD GE GH GM GN GQ GR GT
    GW GY HK HN HR HT HU ID IE IL IN IQ IS
    IT JM JO JP KE KG KH KI KM KN KR KW KZ
    LA LB LC LI LK LR LS LT LU LV LY MA MC
    MD ME MG MH MK ML MN MO MR MT MU MV MW
    MX MY MZ NA NE NG NI NL NO NP NR NZ OM
    PA PE PG PH PK PL PR PS PT PW PY QA RO
    RS RW SA SB SC SE SG SI SK SL SM SN SR
    ST SV SZ TD TG TH TJ TL TN TO TR TT TV
    TW TZ UA UG US UY UZ VC VE VN VU WS XK
    ZA ZM ZW
""".split()

In [7]:
## Getting information

# Page through the "tag:new" album search for every market and gather the
# results into a single DataFrame `df`.
#
# Each page is kept in a list and concatenated once at the end: calling
# pd.concat inside the loop re-copies everything collected so far on every
# request, which gets slower and slower as the frame grows.

limit = 50            # page size requested from the search endpoint
max_results = 1000    # Spotify limit is 1000

pages = []

for market in markets:

    for offset in range(0, max_results, limit):

        response = sp.search(q="tag:new", market=market, type="album", limit=limit, offset=offset)
        items = response['albums']['items']

        # An empty page contributes no rows; stop paging this market instead of
        # requesting the remaining offsets.
        if not items:
            break

        pages.append(pd.DataFrame.from_dict(items))

    print("Successfully got request from ", market, "market")

# pd.concat raises on an empty list, so fall back to an empty frame when no
# market returned anything. The per-page index (0..limit-1) is kept as is.
df = pd.concat(pages) if pages else pd.DataFrame()

Successfully got request from  AD market
Successfully got request from  AE market
Successfully got request from  AG market
Successfully got request from  AL market
Successfully got request from  AM market
Successfully got request from  AO market
Successfully got request from  AR market
Successfully got request from  AT market
Successfully got request from  AU market
Successfully got request from  AZ market
Successfully got request from  BA market
Successfully got request from  BB market
Successfully got request from  BD market
Successfully got request from  BE market
Successfully got request from  BF market
Successfully got request from  BG market
Successfully got request from  BH market
Successfully got request from  BI market
Successfully got request from  BJ market
Successfully got request from  BN market
Successfully got request from  BO market
Successfully got request from  BR market
Successfully got request from  BS market
Successfully got request from  BT market
Successfully got

In [8]:
# Summarise the collected frame: row count, column names, dtypes and non-null counts.
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 185000 entries, 0 to 49
Data columns (total 14 columns):
 #   Column                  Non-Null Count   Dtype 
---  ------                  --------------   ----- 
 0   album_type              185000 non-null  object
 1   total_tracks            185000 non-null  int64 
 2   is_playable             185000 non-null  bool  
 3   external_urls           185000 non-null  object
 4   href                    185000 non-null  object
 5   id                      185000 non-null  object
 6   images                  185000 non-null  object
 7   name                    185000 non-null  object
 8   release_date            185000 non-null  object
 9   release_date_precision  185000 non-null  object
 10  type                    185000 non-null  object
 11  uri                     185000 non-null  object
 12  artists                 185000 non-null  object
 13  restrictions            22 non-null      object
dtypes: bool(1), int64(1), object(12)
memory usage

In [13]:
# Stack the frame on top of itself, doubling the row count (presumably to add
# volume for the export timings further down -- confirm).
# NOTE: this cell is not idempotent -- every execution doubles `df` again. The
# 1,480,000 rows reported by the next cell are 8x the 185,000 fetched rows,
# i.e. three executions, which a single Restart & Run All will not reproduce.
df = pd.concat([df] * 2)

In [14]:
# Re-inspect the frame after the self-concatenation in the previous cell.
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1480000 entries, 0 to 49
Data columns (total 14 columns):
 #   Column                  Non-Null Count    Dtype 
---  ------                  --------------    ----- 
 0   album_type              1480000 non-null  object
 1   total_tracks            1480000 non-null  int64 
 2   is_playable             1480000 non-null  bool  
 3   external_urls           1480000 non-null  object
 4   href                    1480000 non-null  object
 5   id                      1480000 non-null  object
 6   images                  1480000 non-null  object
 7   name                    1480000 non-null  object
 8   release_date            1480000 non-null  object
 9   release_date_precision  1480000 non-null  object
 10  type                    1480000 non-null  object
 11  uri                     1480000 non-null  object
 12  artists                 1480000 non-null  object
 13  restrictions            176 non-null      object
dtypes: bool(1), int64(1), object

In [15]:
## Importing Credentials from Google Cloud

# NOTE(review): these imports sit in the middle of the notebook; consider moving
# them to the import cell at the top so all dependencies are visible in one place.
from google.cloud import storage
from google.oauth2 import service_account

# Service-account credentials loaded from the key file whose path is penv.bq_path.
CREDENTIALS = service_account.Credentials.from_service_account_file(penv.bq_path)
# Cloud Storage client authenticated with those credentials; used below to open the bucket.
STORAGE = storage.Client(credentials=CREDENTIALS)

In [16]:
# Accessing Bucket Path
# Look up the bucket named by penv.bucket_path through the storage client; the
# upload cell creates its blobs on this `bucket` object.

bucket = STORAGE.get_bucket(penv.bucket_path)

In [17]:
# Timestamp taken once, when this cell runs.
# NOTE(review): %X is the locale's time representation (HH:MM:SS in the runs
# recorded below), so the resulting name contains a space and colons.

currentTimestamp = f"{datetime.today():%Y-%m-%d %X}"

# The timestamp is appended to the file name so that successive runs do not
# overwrite each other; it also helps keep track of incremental models.

file_name = "spotify_api_test_data__" + currentTimestamp

In [19]:
## Defining a function called avro_df_prep to prepare the dataframe for the Avro format

def avro_df_prep(frame=None, name=None, archive_dir='/home/tabas/personal-dev/pyprojects/pipelines/archive'):
    """Serialize a DataFrame of Spotify albums to an Avro file with fastavro.

    Called with no arguments it behaves as before: it exports the notebook-level
    ``df`` to ``<archive_dir>/<file_name>.avro``. The difference is that the
    string conversion is now applied to a copy, so the caller's frame is left
    untouched (previously ``df`` itself was overwritten, which changed what every
    later export of ``df`` contained).

    Parameters
    ----------
    frame : pandas.DataFrame, optional
        Frame to export. Defaults to the notebook-level ``df``.
    name : str, optional
        Output file name without extension. Defaults to the notebook-level
        ``file_name``.
    archive_dir : str, optional
        Directory that receives the ``.avro`` file.

    Returns
    -------
    str
        Path of the Avro file that was written.

    Raises
    ------
    KeyError
        If ``frame`` lacks one of the columns that are converted to string.
    """

    # pip install fastavro

    from fastavro import writer, parse_schema

    source = df if frame is None else frame
    target_name = file_name if name is None else name

    # Columns declared as 'string' in the Avro schema below. They hold Python
    # objects (str, dict, list, NaN) in the frame, which Avro cannot store as
    # such, so they are converted to str. Missing values become the text 'nan'.

    columns_to_convert = [
        'album_type', 'external_urls', 'href',
        'id', 'images', 'name', 'release_date', 'release_date_precision',
        'type', 'uri', 'artists', 'restrictions']

    # astype with a mapping returns a new frame; `source` is not modified.
    avro_frame = source.astype({column: str for column in columns_to_convert})

    # Declaring dataframe schema

    schema = {
        'name': 'spotify'
        , 'type': 'record'
        , 'fields': [
                        {'name': 'album_type', 'type': 'string'},
                        {'name': 'total_tracks', 'type': 'int'},
                        {'name': 'is_playable', 'type': 'boolean'},
                        {'name': 'external_urls', 'type': 'string'},
                        {'name': 'id', 'type': 'string'},
                        {'name': 'images', 'type': 'string'},
                        {'name': 'name', 'type': 'string'},
                        {'name': 'release_date', 'type': 'string'},
                        {'name': 'release_date_precision', 'type': 'string'},
                        {'name': 'href', 'type': 'string'},
                        {'name': 'type', 'type': 'string'},
                        {'name': 'uri', 'type': 'string'},
                        {'name': 'artists', 'type': 'string'},
                        {'name': 'restrictions', 'type': 'string'},
                    ]
    }

    parsed_schema = parse_schema(schema)
    records = avro_frame.to_dict('records')

    # Writing an Avro file on 'archive' directory

    avro_path = f'{archive_dir}/{target_name}.avro'

    with open(avro_path, 'wb') as out:
        writer(out, parsed_schema, records)

    return avro_path
            

In [35]:
# Stand-alone run of the Avro export (writes the .avro file to the archive directory).
# NOTE(review): the upload loop in the next cell calls avro_df_prep() again for
# the 'avro' format, so this call is not needed for the upload itself.
avro_df_prep()

: 

In [None]:
## Writing Dataframe to Bucket folder with desired file format

# Every format is uploaded to "<bucket_folder>/<file_name>.<format>" and timed
# with a Begin/End print pair so the formats can be compared.

file_formats = [
                'csv'
                , 'parquet'
                , 'json'
                , 'orc'
                , 'avro'
]

# Formats that pandas can serialize in memory: format -> (payload builder, content type).
# The builders are lambdas so each payload is only produced when its turn comes.
# Content types are valid MIME types (no leading slash), and the binary formats
# are no longer labelled as plain text.
in_memory_exports = {
    'csv': (lambda: df.to_csv(), 'text/csv'),
    'parquet': (lambda: df.to_parquet(), 'application/octet-stream'),
    'json': (lambda: df.to_json(orient='table'), 'application/json'),
    'orc': (lambda: df.reset_index().to_orc(index=None), 'application/octet-stream'),
}

for file_format in file_formats:

    blob = bucket.blob(f"{penv.bucket_folder}/{file_name}.{file_format}")

    print("Begin at: ", datetime.today().strftime('%Y-%m-%d %X'))

    if file_format == 'avro':
        # Avro goes through a local file: avro_df_prep() writes it to the archive
        # directory and the file is then uploaded from disk.
        avro_df_prep()
        # The second positional parameter of upload_from_filename is the content
        # type, not a file mode: the previous call sent 'wb' as the content type
        # and a MIME string into the following parameter. Pass it by keyword.
        blob.upload_from_filename(
            f'/home/tabas/personal-dev/pyprojects/pipelines/archive/{file_name}.avro',
            content_type='application/octet-stream',
        )
    else:
        build_payload, content_type = in_memory_exports[file_format]
        blob.upload_from_string(build_payload(), content_type=content_type)

    print("Sucessfully written in ", file_format)
    print("End at: ", datetime.today().strftime('%Y-%m-%d %X'))

#1st result

- Begin at:  2025-01-24 21:38:26
- Sucessfully written in  csv
- End at:  2025-01-24 21:38:48

- Begin at:  2025-01-24 21:38:48
- Sucessfully written in  parquet
- End at:  2025-01-24 21:38:51

- Begin at:  2025-01-24 21:38:51
- Sucessfully written in  json
- End at:  2025-01-24 21:39:17

- Begin at:  2025-01-24 21:39:17
- Sucessfully written in  orc
- End at:  2025-01-24 21:39:37

- Begin at:  2025-01-24 21:39:37
- Erro

#2nd result

- Begin at:  2025-01-24 23:04:22
- Sucessfully written in  csv
- End at:  2025-01-24 23:04:45
- Begin at:  2025-01-24 23:04:45
- Sucessfully written in  parquet
- End at:  2025-01-24 23:04:48
- Begin at:  2025-01-24 23:04:48
- Sucessfully written in  json
- End at:  2025-01-24 23:05:18
- Begin at:  2025-01-24 23:05:18
- Sucessfully written in  orc
- End at:  2025-01-24 23:05:39
- Begin at:  2025-01-24 23:05:39
- Sucessfully written in  avro
- End at:  2025-01-24 23:05:58


### 3rd
Begin at:  2025-01-27 14:59:22
Sucessfully written in  csv
End at:  2025-01-27 15:00:12
Begin at:  2025-01-27 15:00:12
Sucessfully written in  parquet
End at:  2025-01-27 15:00:19
Begin at:  2025-01-27 15:00:19
Sucessfully written in  json
End at:  2025-01-27 15:01:16
Begin at:  2025-01-27 15:01:16
Sucessfully written in  orc
End at:  2025-01-27 15:02:01
Begin at:  2025-01-27 15:02:01
Sucessfully written in  avro
End at:  2025-01-27 15:02:48


### 4th

Begin at:  2025-01-27 15:04:02
Sucessfully written in  csv
End at:  2025-01-27 15:05:40
Begin at:  2025-01-27 15:05:40
Sucessfully written in  parquet
End at:  2025-01-27 15:05:53
Begin at:  2025-01-27 15:05:53
Sucessfully written in  json
End at:  2025-01-27 15:07:37
Begin at:  2025-01-27 15:07:37
Sucessfully written in  orc
End at:  2025-01-27 15:09:00
Begin at:  2025-01-27 15:09:00
Sucessfully written in  avro
End at:  2025-01-27 15:10:29