In [None]:
!pip install boto3



In [None]:
import base64
import datetime
import requests
from urllib.parse import urlencode
import csv



In [None]:
client_id = '2e0ed8c4b9094d1eaf9f1d9b58d1ae31'
client_secret = 'ab5bf8a5cc1e49c289b5a3d82f2c6e3c'

In [None]:
class SpotifyApi(object):
  access_token = None
  access_token_expires = datetime.datetime.now()
  access_token_did_expire = True 
  client_id = None
  client_secret = None
  token_url = 'https://accounts.spotify.com/api/token'

  def __init__(self, client_id,client_secret, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.client_id = client_id
    self.client_secret = client_secret
  
  def get_client_credentials(self):
  
    """
    Returns a base64 encoded string
    """
    client_id = self.client_id
    client_secret = self.client_secret
    if client_id == None or client_secret == None:
      raise Exception ("You must set client_id and client_secret")

    client_creds = f"{client_id}:{client_secret}"
    client_creds_b64 = base64.b64encode(client_creds.encode())
    return client_creds_b64.decode()

  def get_token_headers(self):
    client_creds_b64 = self.get_client_credentials()
    return {
    "Authorization" : f"Basic {client_creds_b64}"#Basic <base64 encoded client_id:client_secret>
}
  
  def get_token_data(self):
    return {
    "grant_type" : "client_credentials"
}

  def perform_auth(self):
    token_url = self.token_url
    token_data = self.get_token_data()
    token_headers = self.get_token_headers()

    r = requests.post(token_url, data = token_data, headers = token_headers) 
    

    if r.status_code not in range(200,299):
      raise Exception("Could not authenticate client")
        #return False
    now = datetime.datetime.now()
    data = r.json()
    access_token = data['access_token']
    self.access_token = access_token
    expires_in = data['expires_in'] #seconds
    expires = now +datetime.timedelta(seconds = expires_in)
    self.access_token_expires = expires
    self.access_token_did_expire = expires < now
    return True

  def get_access_token(self):
    token = self.access_token
    expires = self.access_token_expires
    now = datetime.datetime.now()
    if expires < now:
      self.perform_auth()
      return self.get_access_token()
    elif token == None:
      self.perform_auth()
      return self.get_access_token()
    return token

  def get_resource_header(self):
    access_token = self.get_access_token()
    headers = {
        "Authorization" : f"Bearer {access_token}"
    }
    return headers

  def get_resource(self, lookup_id, resource_type = 'albums', version = 'v1'):
    endpoint = f"https://api.spotify.com/{version}/{resource_type}/{lookup_id}"
    headers = self.get_resource_header()
    r = requests.get(endpoint, headers=headers)
    if r.status_code not in range(200,299):
      return {}
    return r.json()

  def get_resource_for_artists(self, lookup_id, resource_type = 'artists', version = 'v1'):
    endpoint = f"https://api.spotify.com/{version}/{resource_type}/{lookup_id}/albums"
    headers = self.get_resource_header()
    r = requests.get(endpoint, headers=headers)
    if r.status_code not in range(200,299):
      return {}
    return r.json()

  def get_artist_albums(self,_id, album_type=None, country=None, limit=20, offset=0):
    """ Get Spotify catalog information about an artist's albums
            Parameters:
                - artist_id - the artist ID, URI or URL
                - album_type - 'album', 'single', 'appears_on', 'compilation'
                - country - limit the response to one particular country.
                - limit  - the number of albums to return
                - offset - the index of the first album to return
    """
    self.album_type = album_type
    self.country = country
    self.limit = limit  
    self.offset = offset

    return self.get_resource_for_artists(_id)

  def get_album(self,_id):
    
    return self.get_resource(_id,resource_type='albums')

  def get_artist(self, _id):
    return self.get_resource(_id, resource_type = 'artists')

  def get_playlist(self, _id):
    return self.get_resource(_id, resource_type = 'playlists')


  def base_search(self, query_params):
    headers = self.get_resource_header()
    endpoint = "https://api.spotify.com/v1/search"
    lookup_url = f"{endpoint}?{query_params}"

    r = requests.get(lookup_url, headers=headers)

    if r.status_code not in range(200,299):
      return {}
    return r.json()

  def search(self, query = None, operator=None, operator_query=None, search_type = 'artist'):
    if query == None:
      raise Exception("A query is required")
    if isinstance(query, dict):
      query = ' '.join([f"{k}:{v}" for k,v in query.items()])
    if operator != None and operator_query != None:
      if operator.lower() == 'or' or operator.lower == 'not':
        operator = operator.upper()
        if isinstance(operator_query,str):
          query = f"{query} {operator} {operator_query}"
    
    query_params = urlencode({"q":query, "type": search_type.lower()})
    print(query_params)
    return self.base_search(query_params)


In [None]:
spotify = SpotifyApi(client_id, client_secret)


In [None]:
final_data_dictionary = {
    'Year Relased' : [],
    'Album Length' : [],
    'Album Name' : [],
    'Artist' : [],
}

In [None]:
def get_artists_from_playlist(playlist_uri):
  '''
  playlist_uri: Playlist to analyse
  return: A dictionary(artist uri: artist name) of all primary artists in a playlist
  '''
  artists = {}
  playlist_tracks = spotify.get_playlist(playlist_uri)
  for song in playlist_tracks['tracks']['items']:
    if song['track']:
      #print(song['track']['artists'][0]['name'])
      artists[song['track']['artists'][0]['uri']] = song['track']['artists'][0]['name']
  return artists

In [None]:
def spotify_playlists_uri():
    playlists = '37i9dQZF1DX0XUsuxWHRQd'
    return playlists

In [None]:
def gather_data_local():
  with open("rapcaviar_albums.csv", 'w') as file:
    header = list(final_data_dictionary.keys())
    writer = csv.DictWriter(file, fieldnames=header)
    writer.writeheader()
    albums_obtained = []

    artists = get_artists_from_playlist(spotify_playlists_uri())
    for artist_split in list(artists.keys()):
      artist = artist_split.split(':')[2]
      print(artist)
      artists_albums = spotify.get_artist_albums(artist,album_type='album',limit=50)
      try:
        for album in artists_albums['items']:
          key = album['name'] + album['artists'][0]['name'] + album['release_date'][:4]
          album_uri = album['uri'].split(':')[2]
      except KeyError:
        pass
        if key not in albums_obtained:
          albums_obtained.append(key)
          album_data = spotify.get_album(album_uri)
          album_length_ms = 0
          #For every song in the album
          for song in album_data['tracks']['items']:
              album_length_ms = song['duration_ms'] + album_length_ms
          writer.writerow({'Year Released': album_data['release_date'][:4],
                           'Album Length' : album_length_ms,
                           'Album Name' : album_data['name'],
                           'Artist' : album_data['artists'][0]['name']       
                           })
  return final_data_dictionary
  s3_resource = boto3.resource('s3')
  date = datetime.now()
  filename = f'{date.year}/{date.month}/{date.day}/rapcaviar_albums.csv'
  response = s3_resource.Object('spotify-analysis-data', filename).upload_file("/tmp/rapcaviar_albums.csv")

  return response



In [None]:
def lambda_handler(event, context):
    gather_data_local()
    
