<a href="https://colab.research.google.com/github/ppastram/onerpm-publishing/blob/main/Chartmetric_Related_Artists.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Required Libraries and Functions
!pip install ratelimit
import requests
import time
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')
from ratelimit import limits, sleep_and_retry

class ChartMetricAPI():
    """Small client for the Chartmetric REST API used by this notebook.

    On construction the client exchanges the refresh token (``key``) for an
    access token. Methods decorated with ``Decorators.refreshToken`` request a
    new token whenever the current one is missing or older than
    ``token_expire_interval`` seconds. HTTP-level failures are reported with
    ``print`` and surface to callers as a ``None`` return value.
    """

    host = "https://api.chartmetric.com/api/token"
    # NOTE(security): credential hard-coded in source. It is kept only so that
    # existing callers keep working; rotate it and pass `key=` explicitly
    # (loaded from a secret store) instead of relying on this default.
    key = "5qisPE8cvOfdvORPcTl5tPtOTdbEditnvs05WKHgjybsuep6wImg0ZaaA3Xu2aAA"
    access_token = None
    access_token_expiration = None
    get_token_header = {"Content-Type": "application/json"}
    token_expire_interval = 3500
    max_api_retry_count = 10
    max_calls_per_freq = 100
    rate_limit_freq = 60
    # Seconds to wait for any single HTTP response before giving up.
    request_timeout = 60
    # Upper bound (seconds) for the exponential back-off between retries.
    max_retry_delay = 8
    # Status codes that are worth retrying (rate limiting / gateway errors).
    retryable_status_codes = (429, 502, 503, 504)

    def __init__(
        self,
        host=host,
        key=key,
        token_expire_interval=token_expire_interval,
        max_api_retry_count=max_api_retry_count,
        max_calls_per_freq=max_calls_per_freq,
        rate_limit_freq=rate_limit_freq,
    ):
        """Store the configuration and request the first access token.

        A failed token request is reported with ``print`` and leaves
        ``access_token`` as ``None``; decorated methods retry the token
        request on their next call.
        """
        self.host = host
        self.key = key
        self.token_expire_interval = token_expire_interval
        self.max_api_retry_count = max_api_retry_count
        self.max_calls_per_freq = max_calls_per_freq
        self.rate_limit_freq = rate_limit_freq

        self.access_token = self.getAccessToken()
        if self.access_token is None:
            print("Request for access token failed.")
        else:
            self.access_token_expiration = time.time() + self.token_expire_interval

    ##ChartMetric API Throttler Function
    @staticmethod
    @sleep_and_retry
    @limits(calls=max_calls_per_freq, period=rate_limit_freq)
    def check_limit():
        """Empty function whose decorators throttle calls to the API.

        The limit is built from the class-level ``max_calls_per_freq`` and
        ``rate_limit_freq`` defaults (the decorator is evaluated while the
        class body runs), so per-instance overrides passed to ``__init__`` do
        not change it.
        """
        return

    ##ChartMetric API Connection Function
    def getAccessToken(self):
        """Exchange the refresh token for an access token.

        Returns:
            The token string, or ``None`` when the request fails or the
            response does not contain a ``token`` field. On success
            ``access_token_expiration`` is moved forward.
        """
        try:
            # `json=` lets requests serialise the body, so a key containing
            # quotes or backslashes cannot produce malformed JSON.
            response = requests.post(
                self.host,
                headers={"Content-Type": "application/json"},
                json={"refreshtoken": self.key},
                timeout=self.request_timeout,
            )
            print('Access Token Request Status: ' + str(response.status_code))
            response.raise_for_status()
            token = response.json()['token']
        except (requests.exceptions.RequestException, KeyError, TypeError, ValueError) as e:
            print(e)
            return None

        self.access_token_expiration = time.time() + self.token_expire_interval
        return token

    # Decorator defined to force token refresh when it is about to expire.
    class Decorators():
        @staticmethod
        def refreshToken(decorated):
            """Wrap an API method so it always runs with a usable token.

            The wrapper requests a new token when none is stored, when no
            expiration was ever recorded, or when the expiration has passed.
            If no token can be obtained the wrapped method is not called and
            ``None`` is returned.
            """
            from functools import wraps

            @wraps(decorated)
            def wrapper(api, *args, **kwargs):
                expiration = api.access_token_expiration
                needs_refresh = (
                    api.access_token is None
                    or expiration is None
                    or time.time() > expiration
                )
                if needs_refresh:
                    api.access_token = api.getAccessToken()
                if api.access_token is None:
                    print(f"Skipping {decorated.__name__}: no valid access token.")
                    return None
                return decorated(api, *args, **kwargs)

            return wrapper

    def _sendGet(self, api_url, headers, parameters):
        """Send one throttled GET request and return the raw response."""
        self.check_limit()
        return requests.get(
            url=api_url,
            headers=headers,
            params=parameters,
            timeout=self.request_timeout,
        )

    @staticmethod
    def _payloadOrNone(resp):
        """Return the decoded JSON body if it has a truthy 'obj' entry, else None."""
        try:
            body = resp.json()
        except ValueError:
            print("Response body is not valid JSON.")
            return None
        if isinstance(body, dict) and body.get('obj'):
            return body
        return None

    # API Call Execution Function
    @Decorators.refreshToken
    def executeAPICall(self, api_url, headers, parameters):
        """Run a GET request, retrying on rate-limit / gateway errors.

        Args:
            api_url: Full endpoint URL.
            headers: Request headers (including the Authorization header).
            parameters: Query-string parameters.

        Returns:
            The decoded JSON body when the call succeeds and its ``'obj'``
            entry is non-empty; otherwise ``None``.
        """
        resp = self._sendGet(api_url, headers, parameters)

        if resp.status_code == requests.codes.ok:
            return self._payloadOrNone(resp)

        # Handling for HTTP 400 errors
        if resp.status_code == 400:
            print(f"HTTP 400 Error for URL: {api_url} with parameters: {parameters}")
            print("Response: ", resp.text)  # Log detailed response
            return None

        if resp.status_code not in self.retryable_status_codes:
            print(f"Unhandled Error Code: {resp.status_code}")
            return None

        for attempt in range(self.max_api_retry_count):
            # Exponential back-off (1s, 2s, 4s, ...) capped at max_retry_delay.
            time.sleep(min(2 ** attempt, self.max_retry_delay))
            resp = self._sendGet(api_url, headers, parameters)
            if resp.status_code == requests.codes.ok:
                return self._payloadOrNone(resp)
            print(f"Retry {attempt + 1}/{self.max_api_retry_count} unsuccessful. Error Code: {resp.status_code}")
            if resp.status_code not in self.retryable_status_codes:
                # Repeating a request that failed for a non-transient reason
                # cannot succeed, so stop early.
                break
        return None

    @Decorators.refreshToken
    def getCMArtistIdByName(self, artist_name, limit=10, offset=0):
        """Search artists by name and return the id of the first hit.

        Returns:
            The artist id as ``int``, or ``None`` when the search fails or
            yields no usable result.
        """
        api_url = "https://api.chartmetric.com/api/search"
        params = {
            'q': artist_name,
            'limit': limit,
            'offset': offset,
            'type': 'artists'
        }
        headers = {"Authorization": "Bearer " + self.access_token}

        resp = self.executeAPICall(api_url, headers, params)
        if resp is None:
            return None

        try:
            return int(resp['obj']['artists'][0]['id'])
        except (IndexError, KeyError, TypeError, ValueError):
            # Missing 'artists' key, empty result list or malformed id.
            print(f"No artist found with name {artist_name}")
            return None

    @Decorators.refreshToken
    def getNeighboringArtists(self, artist_id, metric='cm_artist_rank', limit=10, type=None):
        """Call the ``neighboring-artists`` endpoint for ``artist_id``.

        ``type`` is only sent when it is truthy. Returns whatever
        ``executeAPICall`` returns (response body or ``None``).
        """
        api_url = f"https://api.chartmetric.com/api/artist/{artist_id}/neighboring-artists"
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json'
        }
        parameters = {
            'metric': metric,
            'limit': limit
        }
        if type:
            parameters['type'] = type

        return self.executeAPICall(api_url, headers, parameters)

    def saveNeighboringArtistsToCSV(self, artist_name, metric='cm_artist_rank', limit=10, type=None, base_directory="/content/drive/Shared drives/Global | Rights Management/- BizDev A&R/Sourcing Leads"):
        """Look up ``artist_name`` and write its neighboring artists to a CSV.

        The file is written to
        ``{base_directory}/neighboring_artists_{artist_name}.csv`` with an
        extra ``artist`` column holding ``artist_name``. Nothing is written
        when the artist or the data cannot be found.
        """
        artist_id = self.getCMArtistIdByName(artist_name)
        if not artist_id:
            print(f"Could not find artist ID for {artist_name}")
            return

        data = self.getNeighboringArtists(artist_id, metric, limit, type)

        # NOTE(review): executeAPICall returns the whole response body, so the
        # list may sit under 'obj' rather than at the top level. The response
        # schema is not visible here; confirm against the API documentation.
        cluster_artists = None
        if data:
            if 'cluster_artists' in data:
                cluster_artists = data['cluster_artists']
            elif isinstance(data.get('obj'), dict):
                cluster_artists = data['obj'].get('cluster_artists')

        if cluster_artists is None:
            print("No data found to save.")
            return

        tracks_df = pd.json_normalize(cluster_artists)
        tracks_df['artist'] = artist_name  # Add artist name to the DataFrame
        file_path = f"{base_directory}/neighboring_artists_{artist_name}.csv"
        tracks_df.to_csv(file_path, index=False)
        print(f"File saved successfully at: {file_path}")

    @Decorators.refreshToken
    def getRelatedArtists(self, artist_id, limit=50, fromDaysAgo=None, toDaysAgo=0):
        """Call the ``relatedartists`` endpoint for ``artist_id``.

        ``fromDaysAgo`` is only sent when it is not ``None``. Returns whatever
        ``executeAPICall`` returns (response body or ``None``).
        """
        api_url = f"https://api.chartmetric.com/api/artist/{artist_id}/relatedartists"
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json'
        }
        parameters = {
            'limit': limit,
            'toDaysAgo': toDaysAgo
        }
        if fromDaysAgo is not None:
            parameters['fromDaysAgo'] = fromDaysAgo

        return self.executeAPICall(api_url, headers, parameters)

    def saveRelatedArtistsToCSV(self, artist_name, limit=100, fromDaysAgo=None, toDaysAgo=0, base_directory="/content/drive/Shared drives/Global | Rights Management/- BizDev A&R/Sourcing Leads"):
        """Look up ``artist_name`` and write its related artists to a CSV.

        The file is written to
        ``{base_directory}/related_artists_{artist_id}.csv`` with an extra
        ``artist`` column holding ``artist_name``. Nothing is written when the
        artist or the data cannot be found.
        """
        artist_id = self.getCMArtistIdByName(artist_name)
        if not artist_id:
            print(f"Could not find artist ID for {artist_name}")
            return

        data = self.getRelatedArtists(artist_id, limit, fromDaysAgo, toDaysAgo)
        if not data or 'obj' not in data:
            print("No data found to save.")
            return

        related_artists_df = pd.json_normalize(data['obj'])
        related_artists_df['artist'] = artist_name  # Add artist name to the DataFrame
        file_path = f"{base_directory}/related_artists_{artist_id}.csv"
        related_artists_df.to_csv(file_path, index=False)
        print(f"File saved successfully at: {file_path}")

    #Function to get tracks by artist from ChartMetricAPI.  Includes tracks where the artist is featured.
    @Decorators.refreshToken
    def getArtistTracks(self, cm_artist_id, limit=100):
        """Call the ``tracks`` endpoint for ``cm_artist_id``.

        Returns whatever ``executeAPICall`` returns (response body or ``None``).
        """
        api_url = f"https://api.chartmetric.com/api/artist/{cm_artist_id}/tracks"
        heads = {"Authorization": "Bearer " + self.access_token}
        params = {
            'limit': limit
        }
        return self.executeAPICall(api_url, heads, params)

Collecting ratelimit
  Downloading ratelimit-2.2.1.tar.gz (5.3 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ratelimit
  Building wheel for ratelimit (setup.py) ... [?25l[?25hdone
  Created wheel for ratelimit: filename=ratelimit-2.2.1-py3-none-any.whl size=5895 sha256=574681a9feeb99e9549489d04df516826e28e91b7f5b374af50c9f4db51dbd13
  Stored in directory: /root/.cache/pip/wheels/27/5f/ba/e972a56dcbf5de9f2b7d2b2a710113970bd173c4dcd3d2c902
Successfully built ratelimit
Installing collected packages: ratelimit
Successfully installed ratelimit-2.2.1
Mounted at /content/drive


In [None]:
#@title Search Related Artists

# List of artists to query
artist_list = ['Papaa Tyga']
base_directory="/content/drive/Shared drives/Global | Rights Management/- BizDev A&R/Sourcing Leads/Related Artists"


api = ChartMetricAPI()
for artist in artist_list:
    # Both lookups return None on failure; skip the artist instead of
    # crashing on `None['obj']` so the remaining artists are still processed.
    artist_id = api.getCMArtistIdByName(artist)
    if artist_id is None:
        print(f"Could not find artist ID for {artist}")
        continue

    related = api.getRelatedArtists(artist_id)
    if not related:
        print(f"No related artists found for {artist}")
        continue

    related_artists = pd.json_normalize(related['obj'])
    file_path = f"{base_directory}/related_artists_{artist}.csv"
    related_artists.to_csv(file_path, index=False)
    print(f"File saved successfully at: {file_path}")


Access Token Request Status: 200
File saved successfully at: /content/drive/Shared drives/Global | Rights Management/- BizDev A&R/Sourcing Leads/Related Artists/related_artists_Papaa Tyga.csv


Files will be saved here: [Related Artists](https://drive.google.com/drive/folders/17cEhxFEPEeh0Dr-uF88SFlcfO8H9YZp0)

In [None]:
#@title Get Track DSP Stats (last three Yrs) from an Artist or Artist List

import os
from datetime import date, timedelta

import pandas as pd

# Assuming ChartMetricAPI class and necessary functions are already defined.
# Module-level client read as a global by
# get_track_stats_over_time_with_transformation; constructing it requests an
# access token immediately.
cm = ChartMetricAPI()

def get_track_stats_over_time_with_transformation(artist_list, platforms, start_date, end_date, base_output_folder):
    """Export per-track platform stats for each artist to one CSV per artist.

    For every artist the function resolves the artist id, lists the artist's
    tracks, requests stats for each track on each platform and writes the rows
    to ``<base_output_folder>/<artist>/track_stats/<artist>_track_stats_<start>_<end>.csv``
    (spaces in the artist name are replaced by underscores).

    Each CSV contains only the rows of its own artist. ``Value_Diff`` is the
    row-to-row difference of ``Value`` within one stats series.

    NOTE(review): this relies on ``cm.getTrackStats``, which is not defined in
    the ``ChartMetricAPI`` class shown in this notebook — confirm it exists
    before running.

    Args:
        artist_list: Artist names to process.
        platforms: Platform identifiers passed to ``cm.getTrackStats``.
        start_date: Passed to ``cm.getTrackStats`` as ``since`` and used in
            the file name.
        end_date: Passed to ``cm.getTrackStats`` as ``until`` and used in the
            file name.
        base_output_folder: Directory under which the artist folders are
            created.
    """
    for artist in artist_list:
        artist_id = cm.getCMArtistIdByName(artist)
        if not artist_id:
            print(f"Artist {artist} not found.")
            continue

        artist_tracks = cm.getArtistTracks(artist_id)
        if not artist_tracks:
            print(f"No tracks found for artist {artist}")
            continue

        tracks_df = pd.json_normalize(artist_tracks['obj'])

        # Frames are collected per artist and concatenated once, so a file
        # never contains rows of previously processed artists and the
        # concatenation cost stays linear.
        artist_frames = []

        for _, track in tracks_df.iterrows():
            cm_track_id = track['id']
            track_name = track['name']
            isrc = track['isrc']

            for platform in platforms:
                track_stats = cm.getTrackStats(cm_track_id, platform, since=start_date, until=end_date)
                if not track_stats:
                    # The client signals failures / empty results with None.
                    print(f"No stats returned for track {track_name} on {platform}")
                    continue

                for item in track_stats['obj']:
                    # Only keep data points that actually carry a 'value'.
                    stats_data = [
                        {
                            'Artist': artist,
                            'Track': track_name,
                            'ISRC': isrc,
                            'Platform': platform,
                            'Value': stat['value'],
                            'Date': pd.to_datetime(stat['timestp']).strftime('%Y-%m-%d'),
                        }
                        for stat in item['data']
                        if 'value' in stat
                    ]
                    if not stats_data:
                        print(f"No valid data for track {track_name} on {platform}")
                        continue

                    temp_df = pd.DataFrame(stats_data)
                    temp_df['Value_Diff'] = temp_df['Value'].diff()  # Calculate the difference
                    artist_frames.append(temp_df)

        if artist_frames:
            artist_df = pd.concat(artist_frames, ignore_index=True)
        else:
            artist_df = pd.DataFrame()

        # Create artist specific folder and track_stats subfolder
        safe_artist = artist.replace(" ", "_")
        track_stats_folder = os.path.join(base_output_folder, safe_artist, "track_stats")
        os.makedirs(track_stats_folder, exist_ok=True)

        # Save data to a single CSV file within artist's folder
        filename = f"{safe_artist}_track_stats_{start_date}_{end_date}.csv"
        filepath = os.path.join(track_stats_folder, filename)
        artist_df.to_csv(filepath, index=False)
        print(f"Data saved to {filepath}")

# Example Usage
# Pulls roughly the last three years (3 * 365 days, leap days ignored) of
# stats for every artist in `artist_list`, which must already be defined by an
# earlier cell. `timedelta` comes from this cell's `datetime` import.
platforms = ['spotify', 'youtube', 'soundcloud','shazam','tiktok','genius']
end_date = date.today()
start_date = end_date - timedelta(days=3 * 365)
base_output_folder = "/content/drive/Shared drives/Global | Rights Management/- BizDev A&R/Sourcing Leads"
get_track_stats_over_time_with_transformation(artist_list, platforms, start_date, end_date, base_output_folder)

