In [1]:
from collections import Counter
import numpy as np
import json
from spotify import SpotifyClient

In [2]:
# constants
SENTENCE_BEGIN = "<s>"
SENTENCE_END = "</s>"
UNK = "<UNK>"


# UTILITY FUNCTIONS
def create_ngrams(tokens: list, n: int) -> list:
  """Creates n-grams for the given token sequence.
  Args:
    tokens (list): a list of tokens as strings
    n (int): the length of n-grams to create

  Returns:
    list: list of tuples of strings, each tuple being one of the individual n-grams
  """
  return [tuple(tokens[i:i+n]) for i in range(len(tokens)-n+1)]

def read_file(path: str) -> list:
  """
  Reads the contents of a file in line by line.
  Args:
    path (str): the location of the file to read

  Returns:
    list: list of strings, the contents of the file
  """
  f = open(path)

  js = json.load(f)
  playlists = js['playlists']
  sentences = []

  for playlist in playlists:
    if not playlist['tracks'] or 'name' not in playlist:
      continue
    sentences.append(f"{playlist['name']} {' '.join(track['track_uri'] for track in playlist['tracks'])}")
  return sentences

def tokenize_line(line: str, ngram: int, 
                   by_char: bool = True, 
                   sentence_begin: str=SENTENCE_BEGIN, 
                   sentence_end: str=SENTENCE_END):
  """
  Tokenize a single string. Glue on the appropriate number of 
  sentence begin tokens and sentence end tokens (ngram - 1), except
  for the case when ngram == 1, when there will be one sentence begin
  and one sentence end token.
  Args:
    line (str): text to tokenize
    ngram (int): ngram preparation number
    by_char (bool): default value True, if True, tokenize by character, if
      False, tokenize by whitespace
    sentence_begin (str): sentence begin token value
    sentence_end (str): sentence end token value

  Returns:
    list of strings - a single line tokenized
  """
  inner_pieces = None
  if by_char:
    inner_pieces = list(line)
  else:
    # otherwise split on white space
    inner_pieces = line.split()

  if ngram == 1:
    tokens = [sentence_begin] + inner_pieces + [sentence_end]
  else:
    tokens = ([sentence_begin] * (ngram - 1)) + inner_pieces + ([sentence_end] * (ngram - 1))
  # always count the unigrams
  return tokens


def tokenize(data: list, ngram: int, 
                   by_char: bool = True, 
                   sentence_begin: str=SENTENCE_BEGIN, 
                   sentence_end: str=SENTENCE_END):
  """
  Tokenize each line in a list of strings. Glue on the appropriate number of 
  sentence begin tokens and sentence end tokens (ngram - 1), except
  for the case when ngram == 1, when there will be one sentence begin
  and one sentence end token.
  Args:
    data (list): list of strings to tokenize
    ngram (int): ngram preparation number
    by_char (bool): default value True, if True, tokenize by character, if
      False, tokenize by whitespace
    sentence_begin (str): sentence begin token value
    sentence_end (str): sentence end token value

  Returns:
    list of strings - all lines tokenized as one large list
  """
  total = []
  # also glue on sentence begin and end items
  for line in data:
    line = line.strip()
    # skip empty lines
    if len(line) == 0:
      continue
    tokens = tokenize_line(line, ngram, by_char, sentence_begin, sentence_end)
    total += tokens
  return total


class LanguageModel:

  def __init__(self, n_gram):
    """Initializes an untrained LanguageModel
    Args:
      n_gram (int): the n-gram order of the language model to create
    """
    self.n = n_gram
    self.prob_matrix = {}
    self.generate_matrix = {}

  
  def train(self, tokens: list, verbose: bool = False) -> None:
    """Trains the language model on the given data. Assumes that the given data
    has tokens that are white-space separated, has one sentence per line, and
    that the sentences begin with <s> and end with </s>
    Args:
      tokens (list): tokenized data to be trained on as a single list
      verbose (bool): default value False, to be used to turn on/off debugging prints
    """
    # initialize counter and remove the unknowns
    self.counts = Counter(tokens)
    # replace unknown tokens with UNKS
    tokens = [token if self.counts[token] > 1 else UNK for token in tokens]
    unks = []
    for tok in self.counts:
      if self.counts[tok] == 1:
        unks.append(tok)
    if unks:
      for unk in unks:
        del self.counts[unk]
      self.counts[UNK] = len(unks)

    self.vocab_len = len(self.counts)
    n_grams = create_ngrams(tokens, self.n)
    self.n_gram_counts = Counter(n_grams)

    # if n is 1, then just use the counts created above
    if self.n == 1:
      for tok in n_grams:
        self.prob_matrix[tok[0]] = (self.counts[tok[0]] + 1) / (len(tokens) + self.vocab_len) # laplace smoothing
    # if n > 1, calculate and save the probability of ngram | (n-1)gram
    else:
      self.n_minus_1_counts = Counter(create_ngrams(tokens, self.n - 1))
      for ngram in self.n_gram_counts:
        # laplace here too
        self.prob_matrix[ngram] = (self.n_gram_counts[ngram] + 1) / (self.n_minus_1_counts[ngram[:-1]] + self.vocab_len)


  def score(self, sentence_tokens: list) -> float:
    """Calculates the probability score for a given string representing a single sequence of tokens.
    Args:
      sentence_tokens (list): a tokenized sequence to be scored by this model
      
    Returns:
      float: the probability value of the given tokens for this model
    """
    # replace unknowns in the sentence with UNKS
    sentence_tokens = [tok if tok in self.counts and self.counts[tok] > 1 else UNK for tok in sentence_tokens]
    probability = 1
    if self.n == 1:
      for tok in sentence_tokens:
        # get probability and multiply
        probability *= self.prob_matrix[tok]
    else:
      sentence_ngrams = create_ngrams(sentence_tokens, self.n)
      for ngram in sentence_ngrams:
        # handling cases where the ngram does not appear in the training data
        if ngram not in self.n_gram_counts:
          denom = self.vocab_len
          if ngram[:-1] in self.n_minus_1_counts:
            denom += self.n_minus_1_counts[ngram[:-1]]
          cur_prob = 1 / denom
        else:
          # already applied laplace in train so no need here
          cur_prob = self.prob_matrix[ngram]
        probability *= cur_prob
    return probability
  
  
  def generate_sentence(self, starter_input) -> list:
    """Generates a single sentence from a trained language model using the Shannon technique.
      
    Returns:
      list: the generated sentence as a list of tokens
    """
    # for models where n > 2, create a <s> padding so we start with a valid (n-1)-gram
    # sentence = [SENTENCE_BEGIN] + [SENTENCE_BEGIN for _ in range(self.n - 2)]
    sentence = [SENTENCE_BEGIN for _ in range(self.n - 2)] + starter_input
    
    def generate_next_word(sentence):
      if sentence[-1] == SENTENCE_END:
        return sentence
      
      # get the last (n-1)-gram in the sentence
      cur_n_minus_1_gram = tuple(sentence[-self.n + 1:])

      next_word = SENTENCE_BEGIN
      while next_word == SENTENCE_BEGIN: # to make sure we don't select <s>
        if self.n == 1:
          next_word = np.random.choice(list(self.prob_matrix.keys()), p=list(self.prob_matrix.values()))
        else:
          # find the different options for next word
          options = []
          cur_probs = []
          for ngram in self.n_gram_counts:
            # we found a matching (n-1)-gram, add the last word as an option
            if ngram[:-1] == cur_n_minus_1_gram:
              options.append(ngram[-1])
              cur_probs.append(self.prob_matrix[ngram])
          cur_probs = [p / sum(cur_probs) for p in cur_probs] # normalizing probabilities
          next_word = np.random.choice(options, p=cur_probs)

      sentence.append(next_word)
      return generate_next_word(sentence)
    
    return generate_next_word(sentence)
  

  def generate(self, n: int) -> list:
    """Generates n sentences from a trained language model using the Shannon technique.
    Args:
      n (int): the number of sentences to generate
      
    Returns:
      list: a list containing lists of strings, one per generated sentence
    """
    # PROVIDED
    return [self.generate_sentence() for i in range(n)]


In [3]:
data = read_file('spotify_million_playlist_dataset_challenge/challenge_set.json')
tokens = tokenize(data, 5, False)

model = LanguageModel(5)
model.train(tokens)
playlist = model.generate_sentence([SENTENCE_BEGIN, SENTENCE_BEGIN, "pool", "party"])

print(playlist)


['<s>', '<s>', '<s>', '<s>', '<s>', 'pool', 'party', 'spotify:track:4WjH9Bzt3kx7z8kl0awxh4', 'spotify:track:3fDrZa4ksxA5lgi0utGu6k', 'spotify:track:0uVyZywUNOp6S0dU5r8TS8', 'spotify:track:494OU6M7NOf4ICYb4zWCf5', 'spotify:track:66hayvUbTotekKU3H4ta1f', 'spotify:track:4WjH9Bzt3kx7z8kl0awxh4', 'spotify:track:3NBDgwEAGMj0aKRsU8zoO9', 'spotify:track:6OKaib6HaebKVCL5VOoIGR', 'spotify:track:5aJhuze13LOoLthmiG8YIA', '<UNK>', 'spotify:track:7tZZgrwL3rAnrNDHLOp4QA', 'spotify:track:0wsXdby1T3PWLauIkGUZzg', 'spotify:track:3q4zNC35eIv15Zt4cEDicP', 'spotify:track:6O6M7pJLABmfBRoGZMu76Y', 'spotify:track:17Fd6Yb7mSbinKG8LoWfFl', 'spotify:track:4PLOGIceFMY5RHNLjbNaMs', 'spotify:track:0qt5f5EL92o8Snzopsv0en', 'spotify:track:6nM9Sr6MPcfpqwQTD7PZGi', 'spotify:track:6JV2JOEocMgcZxYSZelKcc', 'spotify:track:2S5LNtRVRPbXk01yRQ14sZ', 'spotify:track:0cOBMETjhxublnnwhbnzJO', 'spotify:track:7py16W5fWYLFFS6BElKAjn', 'spotify:track:3bvjsHQd792zIO4SKFxXKs', 'spotify:track:1rIKgCH4H52lrvDcz50hS8', 'spotify:track:1lo

data = read_file('spotify_million_playlist_dataset_challenge/challenge_set.json')
tokens = tokenize(data, 5, by_char=False)



In [6]:
client = SpotifyClient()
client.get_song_title(playlist[8])

{'tracks': [{'album': {'album_type': 'single',
    'artists': [{'external_urls': {'spotify': 'https://open.spotify.com/artist/5UVzX8pQe6bb5ueNdfViih'},
      'href': 'https://api.spotify.com/v1/artists/5UVzX8pQe6bb5ueNdfViih',
      'id': '5UVzX8pQe6bb5ueNdfViih',
      'name': 'Chris Malinchak',
      'type': 'artist',
      'uri': 'spotify:artist:5UVzX8pQe6bb5ueNdfViih'}],
    'available_markets': ['CA', 'US'],
    'external_urls': {'spotify': 'https://open.spotify.com/album/5z6aqbVipcLm2KtFLYMhfj'},
    'href': 'https://api.spotify.com/v1/albums/5z6aqbVipcLm2KtFLYMhfj',
    'id': '5z6aqbVipcLm2KtFLYMhfj',
    'images': [{'height': 640,
      'url': 'https://i.scdn.co/image/ab67616d0000b273c9c9c8f342801ab3e62d0e43',
      'width': 640},
     {'height': 300,
      'url': 'https://i.scdn.co/image/ab67616d00001e02c9c9c8f342801ab3e62d0e43',
      'width': 300},
     {'height': 64,
      'url': 'https://i.scdn.co/image/ab67616d00004851c9c9c8f342801ab3e62d0e43',
      'width': 64}],
    'n