In [1]:
import json
import pandas as pd
import re
import nltk
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from gensim.models.doc2vec import TaggedDocument
import multiprocessing
from gensim.models import Doc2Vec
from tqdm import tqdm
from sklearn import utils
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score
from nltk.stem import WordNetLemmatizer 
from sklearn.feature_extraction.text import CountVectorizer,TfidfVectorizer
from sklearn.base import TransformerMixin
from sklearn.pipeline import Pipeline
import spacy
from sklearn.ensemble import RandomForestClassifier

In [2]:
def clean_lyrics(lyrics):
    """Normalise one song's raw lyric fragments into a single lowercase string.

    The fragments are joined with spaces, apostrophes are dropped, bracketed
    tags/instructions and common structure markers (verse, chorus, intro,
    instrumental, bridge, hook, repeat, credits) are removed, punctuation is
    replaced by spaces, whitespace is collapsed and the text is lowercased.

    Parameters
    ----------
    lyrics : iterable of str or None
        Lyric fragments of one song.

    Returns
    -------
    str or None
        The cleaned lyrics without leading/trailing whitespace. None is
        returned when the input is None, when nothing is left after cleaning,
        when the text is the "we do not have the lyrics for ..." placeholder,
        or when it consists only of the word "instrumental".
    """
    if lyrics is None:
        return lyrics

    # combine lists of tokens into single string
    lyrics = ' '.join(lyrics)

    # remove apostrophes
    lyrics = lyrics.replace('\'', '')

    # remove song structure tags or instructions in brackets.
    # The match is non-greedy so that only the bracketed span itself is
    # dropped, not the lyrics between two separate bracketed spans.
    lyrics = re.sub(r'[*\[({].*?\n*.*?[\])}*]', ' ', lyrics)

    # remove variations of Verse 1, VERSE 2, etc...
    # \b keeps ordinary words that merely end in "verse" (e.g. "universe").
    for verse in ['verse', 'VERSE', 'Verse']:
        lyrics = re.sub(r'\b' + verse + r' \d*', '', lyrics)

    # some structure markers formatted as allcaps without brackets
    for word in ['OUTRO', 'INSTRUMENTAL', 'PRE', 'HOOK',
                 'PRODUCED', 'REFRAIN', 'POST', 'REPEAT', '2x', '3x', '4x',
                 'CHORUS', 'INTRO', 'INTERLUDE']:
        lyrics = lyrics.replace(word, '')

    # remove variations of Chorus
    lyrics = re.sub(r'\n*Chorus:*.*', ' ', lyrics)
    lyrics = re.sub(r'^Chorus:*.*', ' ', lyrics)
    lyrics = re.sub(r'\nRepeat [Cc]horus:*.*', ' ', lyrics)

    # remove variations of Intro
    lyrics = re.sub(r'Intro[\s:].*', ' ', lyrics)

    # remove variations of Instrumental.
    # Longer variants go first; otherwise the bare '\nInstrumental' pattern
    # consumes their prefix and leaves e.g. a stray "break" behind.
    lyrics = re.sub(r'-+.*[iI]nstrumental.*-+', ' ', lyrics)
    lyrics = re.sub(r'\nBrief instrumental.*\n', ' ', lyrics)
    lyrics = re.sub(r'\nInstrumental break', ' ', lyrics)
    lyrics = re.sub(r'\nInstrumental--', ' ', lyrics)
    lyrics = re.sub(r'\nInstrumental', ' ', lyrics)
    lyrics = re.sub(r'\n~Instrumental~', ' ', lyrics)

    # remove variations of Bridge
    lyrics = re.sub(r'\n\[*Bridge:\[*', ' ', lyrics)

    # remove variations of Hook
    lyrics = re.sub(r'Hook:.*', ' ', lyrics)

    # remove variations of Repeat
    lyrics = re.sub(r'Repeat\s.*', ' ', lyrics)
    lyrics = re.sub(r'\nRepeat$', ' ', lyrics)

    # remove credits
    lyrics = re.sub(r'.*[Pp]roduced [Bb]y.*', ' ', lyrics)
    lyrics = re.sub(r'.*[Ww]ritten [Bb]y.*', ' ', lyrics)

    # remove strays and typos
    lyrics = re.sub(r'\[Outro\[', ' ', lyrics)
    lyrics = re.sub(r'Sax & background & instrumental\)', ' ', lyrics)
    lyrics = re.sub(r'\nSource: ', ' ', lyrics)
    lyrics = re.sub(r'Shotgun 2: 58 Trk 1 \n  \nJr. Walker & The All Stars '
                    r'\nAnd/or The Funk Brothers - instrumental \nPop Chart '
                    r'#4 Feb 13, 1965 \nSoul Label - 35008   \n ', ' ', lyrics)
    lyrics = re.sub(r'- musical interlude -', ' ', lyrics)
    lyrics = re.sub(r'\nRefrain:', ' ', lyrics)

    # replace all punctuations with spaces
    lyrics = re.sub(r'[^\w\s]', ' ', lyrics)

    # replace consecutive whitespaces with single space and trim the ends, so
    # the placeholder checks below are not defeated by a leading space
    lyrics = re.sub(r'\s+', ' ', lyrics).strip()

    # convert all tokens to lowercase
    lyrics = lyrics.lower()

    # treat empty results and known "no lyrics" placeholders as missing
    if (not lyrics
            or lyrics.startswith('we do not have the lyrics for')
            or lyrics == 'instrumental'):
        return None
    return lyrics


In [3]:
with open('../data/top_hits_lyrics.json') as json_file:
    top_hits_lyrics = json.load(json_file)

In [4]:
top_hits_lyrics[0]

{'id': '1rfofaqEpACxVEHIZBJe6W',
 'lyrics': ['[Intro: Pharrell Williams]',
  '\nHey',
  '\n',
  '\n[Chorus: Camila Cabello & ',
  'Pharrell Williams',
  ']',
  '\n',
  'Havana, ooh na-na (',
  'Ayy',
  ')',
  '\nHalf of my heart is in Havana, ooh na-na (',
  'Ayy, ayy',
  ')',
  '\n',
  'He took me back to East Atlanta, na-na-na, ah',
  '\n',
  'Oh, but my heart is in Havana (',
  'Ayy',
  ')',
  "\nThere's somethin' 'bout his manners (",
  'Uh-huh',
  ')',
  '\n',
  'Havana, ooh na-na (',
  'Uh',
  ')',
  '\n',
  '\n[Verse 1: Camila Cabello & ',
  'Pharrell Williams',
  ']',
  '\n',
  'He didn\'t walk up with that "how you doin\'?" (',
  'Uh',
  ')',
  '\nWhen he came in the room',
  "\nHe said there's a lot of girls I can do with (",
  'Uh',
  ')',
  "\nBut I can't without you",
  '\nI knew him forever in a minute (',
  'Hey',
  ')',
  '\nThat summer night in June',
  '\n',
  'And papa says he got malo in him (',
  'Uh',
  ')',
  "\nHe got me feelin' like...",
  '\n',
  '\n[Pre-Choru

In [5]:
# Build the hits frame, clean every song's lyrics, and keep only the rows that
# have both a 'source' and usable cleaned lyrics.
top_hits_df = pd.DataFrame(top_hits_lyrics)
top_hits_df['clean_lyrics'] = top_hits_df['lyrics'].apply(clean_lyrics)
top_hits_keep = top_hits_df['source'].notnull() & top_hits_df['clean_lyrics'].notnull()
# .copy() detaches the filtered rows from the unfiltered frame, so columns can
# be assigned on the result later without a SettingWithCopyWarning.
top_hits_df = top_hits_df.loc[top_hits_keep].copy()

In [6]:
top_hits_df.shape

(2805, 4)

In [7]:
with open('../data/songs_lyrics_5000.json') as json_file:
    not_hits_1 = json.load(json_file)

In [8]:
with open('../data/songs_lyrics_10000.json') as json_file:
    not_hits_2 = json.load(json_file)

In [9]:
not_hits_lyrics = not_hits_1 + not_hits_2

In [10]:
with open('../data/not_hits_lyrics.json', 'w') as f:
        json.dump(not_hits_lyrics, f)

In [11]:
# Build the non-hits frame, clean every song's lyrics, and keep only the rows
# that have both a 'source' and usable cleaned lyrics.
not_hits_df = pd.DataFrame(not_hits_lyrics)
not_hits_df['clean_lyrics'] = not_hits_df['lyrics'].apply(clean_lyrics)
not_hits_keep = not_hits_df['source'].notnull() & not_hits_df['clean_lyrics'].notnull()
# .copy() detaches the filtered rows from the unfiltered frame, so columns can
# be assigned on the result later without a SettingWithCopyWarning.
not_hits_df = not_hits_df.loc[not_hits_keep].copy()

In [12]:
not_hits_df.shape

(7937, 4)

In [13]:
# TODO: stratified sampling by decade
# Downsample the non-hits to the number of hits so both classes have the same
# size. The fixed random_state makes the drawn sample identical on every run.
not_hits_df = not_hits_df.sample(n=top_hits_df.shape[0], random_state=42)

In [14]:
not_hits_df.shape

(2805, 4)

In [15]:
print(not_hits_df['clean_lyrics'].iloc[0])

 its been the longest winter without you i didnt know where to turn to see somehow i cant forget you after all that weve been through going coming thought i heard a knock whos there no one thinking that i deserve it now i realize that i really didnt know if you didnt notice you mean everything quickly im learning to love again all i know is imma be okay thought i couldnt live without you its gonna hurt when it heals too itll all get better in time and even though i really love you im gonna smile cause i deserve to itll all get better in time i couldnt turn on the tv without something there to remind me was it all that easy to just put aside your feelings if im dreaming dont wanna laugh hurt my feelings but thats the path i believe in and i know that time will heal it if you didnt notice boy you meant everything quickly im learning to love again all i know is imma be okay thought i couldnt live without you its gonna hurt when it heals too itll all get better in time and even though i re

In [16]:
top_hits_df['class'] = 1
not_hits_df['class'] = 0
df = pd.concat([top_hits_df, not_hits_df])

In [17]:
df.shape

(5610, 5)

In [19]:
# Persist the combined frame using the same relative data directory the input
# files are read from, instead of an absolute path that only exists on one
# machine.
# NOTE(review): assumes '../data' resolves to the repository's data folder
# from the notebook's working directory — confirm.
df.to_json('../data/lyrics_df.json', orient='split', index=None)

# Doc2Vec

In [None]:
train, test = train_test_split(df, test_size=0.3, random_state=42)
lemmatizer = WordNetLemmatizer() 

def tokenize_text(text):
    """Split text into lowercased, lemmatized word tokens.

    The text is sentence-tokenized and then word-tokenized with nltk; words
    shorter than two characters are dropped and the rest are lowercased and
    passed through the module-level ``lemmatizer``.
    """
    return [
        lemmatizer.lemmatize(word.lower())
        for sentence in nltk.sent_tokenize(text)
        for word in nltk.word_tokenize(sentence)
        if len(word) >= 2
    ]

def _to_tagged_document(row):
    """Wrap one row's tokenized lyrics in a TaggedDocument tagged with its class."""
    return TaggedDocument(words=tokenize_text(row['clean_lyrics']), tags=[row['class']])


# Same conversion for both splits: one TaggedDocument per row.
train_tagged = train.apply(_to_tagged_document, axis=1)
test_tagged = test.apply(_to_tagged_document, axis=1)

In [None]:
train_tagged.values[1]

In [None]:
cores = multiprocessing.cpu_count()

In [None]:
model_dbow = Doc2Vec(dm=0, vector_size=100, negative=5, hs=0, min_count=2, sample = 0, workers=cores)
model_dbow.build_vocab([x for x in tqdm(train_tagged.values)])

In [None]:
%%time
# Train all 30 epochs in a single call so gensim applies its own learning-rate
# decay from alpha down to min_alpha. The previous loop subtracted 0.002 from
# alpha after each of 30 one-epoch calls; with gensim's default starting alpha
# (0.025 unless overridden) that schedule becomes negative part-way through.
train_corpus = utils.shuffle(list(train_tagged.values))
model_dbow.train(train_corpus, total_examples=len(train_corpus), epochs=30)

In [None]:
def vec_for_learning(model, tagged_docs):
    """Infer a feature vector for every tagged document.

    Parameters
    ----------
    model : object
        Trained model exposing ``infer_vector(words, epochs=...)``.
    tagged_docs : object
        Container whose ``.values`` holds documents with ``.words`` and
        ``.tags`` attributes; the first tag is used as the target.

    Returns
    -------
    tuple
        ``(targets, regressors)``: two equally long tuples holding, per
        document, its first tag and its inferred vector. Both are empty
        tuples when there are no documents.
    """
    docs = tagged_docs.values
    if len(docs) == 0:
        # zip(*[]) yields nothing, which would break the two-name unpacking
        return (), ()
    # `epochs` is the keyword accepted by current gensim; the older `steps`
    # alias is no longer available in gensim 4.
    targets, regressors = zip(
        *((doc.tags[0], model.infer_vector(doc.words, epochs=20)) for doc in docs)
    )
    return targets, regressors

In [None]:
y_train, X_train = vec_for_learning(model_dbow, train_tagged)
y_test, X_test = vec_for_learning(model_dbow, test_tagged)
logreg = LogisticRegression(n_jobs=1, C=1e5)
logreg.fit(X_train, y_train)
y_pred = logreg.predict(X_test)
print('Testing accuracy %s' % accuracy_score(y_test, y_pred))
print('Testing F1 score: {}'.format(f1_score(y_test, y_pred, average='weighted')))

# Bag Of Words

In [None]:
import string
from spacy.lang.en.stop_words import STOP_WORDS
from spacy.lang.en import English

# Create our list of punctuation marks
punctuations = string.punctuation

# Create our list of stopwords
nlp = spacy.load('en')
stop_words = spacy.lang.en.stop_words.STOP_WORDS

# Load English tokenizer, tagger, parser, NER and word vectors
parser = English()

# Creating our tokenizer function
def spacy_tokenizer(sentence):
    """Tokenize a sentence into normalized tokens without stop words or punctuation.

    Each token produced by the module-level ``parser`` is replaced by its
    lowercased, stripped lemma (or by its lowercase form when the lemma is the
    "-PRON-" placeholder). Tokens found in ``stop_words`` or contained in the
    ``punctuations`` string are discarded.
    """
    kept = []
    for token in parser(sentence):
        if token.lemma_ == "-PRON-":
            # keep the surface form for pronouns instead of the placeholder
            normalized = token.lower_
        else:
            normalized = token.lemma_.lower().strip()

        if normalized in stop_words or normalized in punctuations:
            continue
        kept.append(normalized)
    return kept

In [None]:
tfidf_vector = TfidfVectorizer(tokenizer = spacy_tokenizer)

In [None]:
X = df['clean_lyrics'] # the features we want to analyze
ylabels = df['class'] # the labels, or answers, we want to test against

X_train, X_test, y_train, y_test = train_test_split(X, ylabels, test_size=0.3, random_state=72)

In [None]:
classifier = LogisticRegression(solver="lbfgs")

# Create pipeline using Bag of Words
pipe = Pipeline([('vectorizer', tfidf_vector),
                 ('classifier', classifier)])

# model generation
pipe.fit(X_train,y_train)

from sklearn import metrics
# Predicting with a test dataset
predicted = pipe.predict(X_test)

# Model Accuracy
print(" test Accuracy:",metrics.accuracy_score(y_test, predicted))
print(" Precision:",metrics.precision_score(y_test, predicted, average=None))
print(" Recall:",metrics.recall_score(y_test, predicted, average=None))

In [None]:
classifier = RandomForestClassifier(n_estimators=1000)

# Create pipeline using Bag of Words
pipe = Pipeline([('vectorizer', tfidf_vector),
                 ('classifier', classifier)])

# model generation
pipe.fit(X_train,y_train)

from sklearn import metrics
# Predicting with a test dataset
predicted = pipe.predict(X_test)

# Model Accuracy
print(" test Accuracy:",metrics.accuracy_score(y_test, predicted))
print(" Precision:",metrics.precision_score(y_test, predicted, average=None))
print(" Recall:",metrics.recall_score(y_test, predicted, average=None))

# Audio Features

In [None]:
with open('../data/top_hits.json') as json_file:
    top_hits = json.load(json_file)

In [None]:
top_hits_songs_df = pd.DataFrame(top_hits)

In [None]:
top_hits_merged_df = pd.merge(top_hits_df, top_hits_songs_df, on='id', how='inner')

In [None]:
top_hits_merged_df.shape

In [None]:
with open('../data/songs.json') as json_file:
    not_hits = json.load(json_file)

In [None]:
not_hits_songs_df = pd.DataFrame(not_hits)

In [None]:
not_hits_merged_df = pd.merge(not_hits_df, not_hits_songs_df, on='id', how='inner')

In [None]:
not_hits_merged_df.shape

In [None]:
merged_df = pd.concat([top_hits_merged_df, not_hits_merged_df])

In [None]:
audio_features = ['acousticness', 'danceability',  'energy',
            'instrumentalness', 'liveness', 'loudness', 'mode',
            'speechiness', 'tempo', 'time_signature', 'valence']

In [None]:
# Features and labels both come from merged_df so that rows stay aligned.
X = merged_df[audio_features]
y = merged_df['class']
# Split against y, not the earlier `ylabels`: that Series was taken from the
# lyrics frame `df`, a different frame from merged_df.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=72)

In [None]:
classifier = LogisticRegression()
classifier.fit(X_train, y_train)
predicted = classifier.predict(X_test)
print(" test Accuracy:",metrics.accuracy_score(y_test, predicted))

In [None]:
X.head()