# Vector Ensemble
Ensemble approach using Spark. This notebook uses the consolidated vector CSV, which includes normal (word), synonym, and hypernym vectors; see [master-lyricsdf-word_syn_hype_vectors.csv](../../data/conditioned/master-lyricsdf-word_syn_hype_vectors.csv).

In [1]:
%matplotlib inline
import numpy as np
import scipy as sp
import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import pandas as pd
# pandas display options: wide, HTML-rendered DataFrame previews with up to 100 columns
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)
pd.set_option('display.notebook_repr_html', True)
import seaborn as sns
# seaborn plot styling: "whitegrid" style and the "poster" context for large figure elements
sns.set_style("whitegrid")
sns.set_context("poster")

In [2]:
## MLJ: Additional Extras
import time
import itertools
import json
import pickle

In [3]:
import os
# os.environ['PYSPARK_PYTHON'] = '/anaconda/bin/python'

In [4]:
import findspark
# Make pyspark importable in this kernel, then report the Spark home findspark located.
findspark.init()
spark_home = findspark.find()
print(spark_home)
# Setup-specific alternatives (left disabled; enable only if your install needs them):
# - findspark removes the need to hard-code SPARK_HOME for a Homebrew install, e.g.
#os.environ['SPARK_HOME']="/usr/local/Cellar/apache-spark/1.5.1/libexec/"
# - depending on how the notebook was started, submit args may be needed
#   (setting this broke Spark on the original author's machine):
# os.environ['PYSPARK_SUBMIT_ARGS']="--master local pyspark --executor-memory 4g"

/home/vagrant/spark


In [5]:
import pyspark
# Local Spark with 4 worker threads; executor memory requested explicitly.
conf = pyspark.SparkConf()
conf.setMaster('local[4]')
conf.setAppName('pyspark')
conf.set("spark.executor.memory", "2g")
sc = pyspark.SparkContext(conf=conf)

In [6]:
# Show the effective Spark configuration as (key, value) pairs.
# NOTE(review): `_conf` is an underscore-prefixed (private) SparkContext attribute, not a stable public API.
sc._conf.getAll()

[(u'spark.executor.memory', u'2g'),
 (u'spark.master', u'local[4]'),
 (u'spark.rdd.compress', u'True'),
 (u'spark.driver.memory', u'8g'),
 (u'spark.serializer.objectStreamReset', u'100'),
 (u'spark.submit.deployMode', u'client'),
 (u'spark.app.name', u'pyspark')]

In [7]:
import sys
# Run one trivial task in each of 2 partitions so the workers report their Python version.
rdd = sc.parallelize(xrange(2), numSlices=2)
rdd.map(lambda _ignored: sys.version).collect()

['2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]',
 '2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]']

In [8]:
# Driver-side Python version, to compare with the worker versions collected above.
sys.version

'2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]'

In [9]:
from pyspark.sql import SQLContext
# SQL context used below to convert the pandas frame into a Spark DataFrame.
sqlsc = SQLContext(sparkContext=sc)

## Set Up Data for the Pipeline
### Load the DataFrame into pandas for initial manipulation

In [10]:
# Load the approved "master" lyrics table, including the word / synonym / hypernym vector columns.
lyrics_csv_path = "../../data/conditioned/master-lyricsdf-word_syn_hype_vectors.csv"
lyrics_pd_df = pd.read_csv(lyrics_csv_path)

In [11]:
# Preview the first row to confirm the columns loaded as expected.
lyrics_pd_df.iloc[:1]

Unnamed: 0,index,position,year,title.href,title,artist,lyrics,decade,song_key,lyrics_url,lyrics_abstract,noun_vector,adj_vector,noun_syn_vector,adj_syn_vector,noun_syn_hype_vector,adj_syn_hype_vector
0,0,1,1970,https://en.wikipedia.org/wiki/Bridge_over_Trou...,Bridge over Troubled Water,Simon and Garfunkel,When you're weary. Feeling small. When tears a...,1970,1970-1,http://lyrics.wikia.com/Simon_And_Garfunkel:Br...,When you're weary. Feeling small. When tears a...,time bridge water,rough troubled,time bridge water,troubled rough,bridge time water,rough troubled


### Add Labels Based on Chart Position
The labeling scheme can change from run to run. A straightforward use is to see how well the top 50 versus the bottom 50 songs can be predicted.
**Note:** Spark ML estimators look for a column named `label` by default; the pipeline below instead passes the label column explicitly via `labelCol`.

In [12]:
# Label columns derived from chart position (1-100); one column per labeling scheme.
pcols = ['bin_10_percent','bin_25_percent','is_top_10_percent','is_top_25_percent','is_top_50_percent']

# Binary label columns only; the logistic-regression pipeline below is trained on these
# (presumably because binary LogisticRegression is what the Spark version in use supports -- TODO confirm).
pbinarycols = ['is_top_10_percent','is_top_25_percent','is_top_50_percent']


def _percentile_bins(width):
    """Map each bin's upper bound (as a float) to the chart positions it covers.

    Example: width=25 -> {25.0: range(1, 26), 50.0: range(26, 51), 75.0: ..., 100.0: ...},
    covering positions 1-100.
    """
    return dict((float(upper), range(upper - width + 1, upper + 1))
                for upper in range(width, 101, width))


def _top_split(cutoff):
    """Binary split of positions 1-100: 1.0 for 1..cutoff, 0.0 for cutoff+1..100."""
    return {1.0: range(1, cutoff + 1), 0.0: range(cutoff + 1, 101)}


# pos_dict[scheme] -> {label: positions carrying that label}
pos_dict = {
    'bin_10_percent': _percentile_bins(10),
    'bin_25_percent': _percentile_bins(25),
    'is_top_10_percent': _top_split(10),
    'is_top_25_percent': _top_split(25),
    'is_top_50_percent': _top_split(50),
}

# Human-readable description of each labeling scheme, used in printed reports.
pos_dict_descrips = {
    'bin_10_percent': "10 Percent Splits (10 Topics)",
    'bin_25_percent': "25 Percent Splits (4 Topics)",
    'is_top_10_percent': "Top 10 versus Bottom 90 (2 Topics)",
    'is_top_25_percent': "Top 25 versus Bottom 75 (2 Topics)",      
    'is_top_50_percent': "Top 50 versus Bottom 50 (2 Topics)"
}


In [13]:

def labelForPosition(pos,pos_dict_key, bins=None):
    """Return the label whose position range contains ``pos``.

    Args:
        pos: chart position to label (the ranges in ``pos_dict`` cover 1-100).
        pos_dict_key: key into the module-level ``pos_dict`` (e.g. 'is_top_10_percent').
        bins: optional mapping {label: positions} used instead of
            ``pos_dict[pos_dict_key]``; when given, ``pos_dict_key`` is not looked up.

    Returns:
        The matching label, or None if no range contains ``pos``.
    """
    if bins is None:
        bins = pos_dict[pos_dict_key]
    # .items() works on both Python 2 and 3 (.iteritems() is Python 2 only)
    for label, positions in bins.items():
        if pos in positions:
            return label
    return None

# Add one label column per labeling scheme in pos_dict, derived from each song's chart position.
for pos_dict_key in pos_dict:
    # args= forwards the scheme key, i.e. labelForPosition(position, pos_dict_key)
    lyrics_pd_df[pos_dict_key] = lyrics_pd_df['position'].apply(labelForPosition, args=(pos_dict_key,))

# sanity check: a seeded random sample so the displayed rows survive Restart & Run All
lyrics_pd_df.sample(3, random_state=0)

Unnamed: 0,index,position,year,title.href,title,artist,lyrics,decade,song_key,lyrics_url,lyrics_abstract,noun_vector,adj_vector,noun_syn_vector,adj_syn_vector,noun_syn_hype_vector,adj_syn_hype_vector,bin_10_percent,is_top_50_percent,is_top_25_percent,bin_25_percent,is_top_10_percent
1367,1367,68,1983,https://en.wikipedia.org/wiki/Wanna_Be_Startin...,Wanna Be Startin' Somethin',Michael Jackson,I said you wanna be startin' somethin'. You go...,1980,1983-68,http://lyrics.wikia.com/Michael_Jackson:Wanna_...,I said you wanna be startin' somethin'. You go...,cunnin declinin head coo sa,treacherous high loud,head sa coo,high loud treacherous,coo head sa,high loud treacherous,70,0,0,75,0
3080,3080,81,2000,https://en.wikipedia.org/wiki/Where_I_Wanna_Be...,Where I Wanna Be,Donell Jones,(verse one). I just left my baby girl a messag...,2000,2000-81,http://lyrics.wikia.com/Donell_Jones:Where_I_W...,(verse one). I just left my baby girl a messag...,year love share,teenage real,love share year,adolescent real,love share year,adolescent real,90,0,0,100,0
4020,4020,21,2010,https://en.wikipedia.org/wiki/Empire_State_of_...,Empire State of Mind,Jay-Z,"Yeah. Yeah, I'm up at Brooklyn, now I'm down i...",2010,2010-21,http://lyrics.wikia.com/Jay-Z:Empire_State_Of_...,"Yeah. Yeah, I'm up at Brooklyn, now I'm down i...",courtside fife street brand light nigga shit y...,high new big hot famous welcome yellow fair na...,yankee school foreigner king light winter life...,long particular high fair bad bright hot virgi...,air bellow boundary cab city corner crap dolla...,bad bare blazing bright celebrated cold fair g...,30,1,1,25,0


### Filter Out Non-Lyric Records
**Records can lack lyrics because of:**
* Instrumentals
* Licensing restrictions on lyrics.wikia
* No lyrics having been added to lyrics.wikia

In [14]:
# Noun vector columns: plain nouns, nouns with synonyms, and nouns with synonyms + hypernyms.
ncols = [
    'noun_vector',
    'noun_syn_vector',
    'noun_syn_hype_vector',
]

In [15]:
# Count rows with missing lyrics (these may include instrumentals or otherwise unavailable lyrics).
n_missing_lyrics = lyrics_pd_df['lyrics'].isnull().sum()
print("How many empty lyrics are there? {}".format(n_missing_lyrics))

How many empty lyrics are there? 159


In [16]:
# (rows, columns) before dropping records without lyrics
lyrics_pd_df.shape

(4500, 22)

In [17]:
# Drop rows whose 'lyrics' value is missing (NaN); the other columns are not considered here.
lyrics_pd_df = lyrics_pd_df.dropna(subset=['lyrics'])

In [18]:
# (rows, columns) after dropping records without lyrics
lyrics_pd_df.shape

(4341, 22)

In [19]:
# Also drop rows whose noun-vector columns are empty (NaN), which can result from earlier
# vocabulary-shrinking steps. This is done one column at a time so each step's effect is printed.
for col in ncols:
    lyrics_pd_df = lyrics_pd_df.dropna(subset=[col])
    print("shape after removing {} empties --> {}".format(col, lyrics_pd_df.shape))

shape after removing noun_vector empties --> (4121, 22)
shape after removing noun_syn_vector empties --> (4105, 22)
shape after removing noun_syn_hype_vector empties --> (4105, 22)


In [20]:
print "Final ROWS after removing empties -->", lyrics_pd_df.shape[0]

Final ROWS after removing empties --> 4105


### Set Up the Master Matrix

In [None]:
# FIXME: this cell held an incomplete assignment (`nhype = ` with no right-hand side), which is a
# SyntaxError and breaks Restart & Run All. `nhype` is not referenced by any of the visible cells;
# fill in the intended value or delete this cell.
# nhype = ...

In [21]:
def getUniqueWordsSorted(df,word_col):
    """Return the sorted list of distinct whitespace-separated words in ``df[word_col]``.

    Args:
        df: pandas DataFrame containing ``word_col``.
        word_col: name of a column of space-separated word strings; float cells
            (NaN for missing values) are skipped.

    Returns:
        Sorted list of the unique words across all rows.
    """
    # A set gives O(1) membership checks instead of scanning a growing list for every word.
    unique_words = set()
    for ws in df[word_col]:
        # missing values come through as float NaN; everything else is a word string
        if not isinstance(ws, float):
            unique_words.update(ws.split())
    return sorted(unique_words)

In [22]:
# Vocabulary per noun column: all_words_dict[col] -> sorted unique words in that column.
all_words_dict = {}

for col in ncols:
    print("\n--- {} ---\n".format(col))
    vocab = getUniqueWordsSorted(lyrics_pd_df, col)
    all_words_dict[col] = vocab
    print("\tHow long is dict? {}".format(len(vocab)))
    print("\tWhat are the first 5 entries? [{}]".format(vocab[:5]))


--- noun_vector ---

	How long is dict? 5138
	What are the first 5 entries? [['60', '8-bit', '>jeep', '>mayback', '\\n|officialsite']]

--- noun_syn_vector ---

	How long is dict? 3230
	What are the first 5 entries? [['abdomen', 'ability', 'abnormality', 'abortion', 'abrasion']]

--- noun_syn_hype_vector ---

	How long is dict? 3230
	What are the first 5 entries? [['abdomen', 'ability', 'abnormality', 'abortion', 'abrasion']]


In [23]:
def compareWordLists(alist,blist):
    """Split two word lists into shared words and words unique to each side.

    Args:
        alist: first list of hashable items (here, words).
        blist: second list of hashable items.

    Returns:
        Tuple ``(same, ina, inb)`` of sorted lists:
          * same: items of ``alist`` that also appear in ``blist``
          * ina:  items of ``alist`` that do not appear in ``blist``
          * inb:  items of ``blist`` that do not appear in ``same``
        Duplicates in the inputs are carried through to the outputs.
    """
    # Set lookups replace O(len(alist) * len(blist)) scans over plain lists.
    bset = set(blist)
    same = [a for a in alist if a in bset]
    ina = [a for a in alist if a not in bset]
    sameset = set(same)
    inb = [b for b in blist if b not in sameset]
    return sorted(same), sorted(ina), sorted(inb)

In [24]:
tcomp = compareWordLists(all_words_dict[ncols[1]],all_words_dict[ncols[2]])
print "For syn versus syn-hype words..."
print "How many are same? ", len(tcomp[0])
print "How many are only in syn? ", len(tcomp[1])
print "How many are only in hype? ", len(tcomp[2])

For syn versus syn-hype words...
How many are same?  3230
How many are only in syn?  0
How many are only in hype?  0


### Set Up Dictionaries for String and Vectorized Data

In [None]:
# convert an array of word strings into an array of sorted, de-duplicated word lists
def stringsToUniqueVectors(strings):
    """Convert space-separated word strings into sorted lists of distinct words.

    Args:
        strings: iterable of word strings; float entries (NaN for missing values) map to [].

    Returns:
        List with one sorted, de-duplicated word list per input entry, in input order.
    """
    def _unique_sorted(text):
        # floats stand in for NaN/missing values in pandas object columns
        if isinstance(text, float):
            return []
        return sorted(set(text.split()))

    return [_unique_sorted(text) for text in strings]

In [None]:
# nstrings: column name -> array of raw word strings
# nvectors: column name -> list of per-row word lists
# Both start with an empty list per noun column and are filled in the next cell.
nstrings = {col: [] for col in ncols}
nvectors = {col: [] for col in ncols}

In [None]:
# Populate: keep each column's raw strings and their de-duplicated, sorted word lists.
for col in ncols:
    raw_strings = lyrics_pd_df[col].values
    nstrings[col] = raw_strings
    nvectors[col] = stringsToUniqueVectors(raw_strings)

In [None]:
# Verification: show one row's raw string next to its word list for every noun column.
idx = 6
for col in ncols:
    print("`{}`[{}]: {} -->\n\t{}".format(col, idx, nstrings[col][idx], nvectors[col][idx]))

### Convert and Manipulate with Spark

In [None]:
# Convert from a pandas to a Spark DataFrame.
# Object columns may still mix strings with NaN (floats) -- only 'lyrics' and the noun columns were
# filtered above -- and Spark's schema inference cannot merge StringType with DoubleType. Replacing
# NaN with None lets those cells become Spark nulls instead.
lyricsdf = sqlsc.createDataFrame(lyrics_pd_df.where(pd.notnull(lyrics_pd_df), None))

In [None]:
# Print the first 3 rows of the Spark DataFrame to check the conversion.
lyricsdf.show(3)

## Pipeline Using Spark
Reference: [combine all features into a single feature vector](https://databricks.com/blog/2015/07/29/new-features-in-machine-learning-pipelines-in-spark-1-4.html)
![Ensemble Pipeline Overview](https://databricks.com/wp-content/uploads/2015/07/simple-pipeline.png)
The reference pipeline chains these stages:
* Tokenizer
* HashingTF
* Word2Vec
* OneHotEncoder
* VectorAssembler

The pipeline built below currently uses only Tokenizer → HashingTF → LogisticRegression.

In [None]:
# Pipeline adapted from:
# http://spark.apache.org/docs/latest/ml-guide.html
# https://databricks.com/blog/2015/07/29/new-features-in-machine-learning-pipelines-in-spark-1-4.html
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

In [None]:
# Used in common for printing predictions
def printPredicts(predictsdf,pred_hits,ncol,pcol,pipeline_name="Pipeline",reg_param=None, max_iter=None, verbose=False):
    """Score predictions against position-derived labels and print hits/misses.

    Args:
        predictsdf: pandas DataFrame of the evaluated songs; must have ``song_key``
            and ``position`` columns.
        pred_hits: dict mapping song_key -> predicted label.
        ncol: feature (word-vector) column the model was trained on (for display).
        pcol: label scheme / ``pos_dict`` key used to derive the true label.
        pipeline_name: name shown in the printed summary.
        reg_param: regularization value shown in the header; None prints "<unspecified>".
        max_iter: iteration count shown in the header; None prints "<unspecified>".
        verbose: if True, also print one line per song.

    Returns:
        Tuple ``(hits, misses)``. Callers that ignore the result are unaffected.

    Raises:
        KeyError: if a song_key in ``predictsdf`` has no entry in ``pred_hits``.
    """
    def _shown(value):
        # `is None` rather than truthiness, so 0 / 0.0 (e.g. regParam=0) are printed as-is
        return "<unspecified>" if value is None else value

    print("For reg_param: {} & max_iter: {}, how did pipeline '{}' do predicting label '{}' from features '{}'?".format(
        _shown(reg_param), _shown(max_iter), pipeline_name, pos_dict_descrips[pcol], ncol))

    hits = 0
    misses = 0
    for song_key, position in zip(predictsdf['song_key'], predictsdf['position']):
        pred = pred_hits[song_key]
        if labelForPosition(position, pcol) == pred:
            hits += 1
            outcome = "Correct"
        else:
            misses += 1
            outcome = "Incorrect"
        if verbose:
            print("{} ::: song_key --> {}, predicted {}".format(outcome, song_key, pred))
    print("{} hits: {}, misses: {}".format(pipeline_name, hits, misses))
    return hits, misses

### Fit Logistic Regression Models on Songs Before 2014 and Predict 2014

In [None]:
## establish training and test Spark Dataframes (to be filtered further as needed)
## e.g. traindf.select(['song_key',ncol,pcol])

# training: songs from before 2014. A strict `< 2014` keeps any later years out of training,
# so the model never learns from songs that come after the test year.
traindf = lyricsdf.filter(lyricsdf['year'] < 2014)
# test: songs from 2014
testdf = lyricsdf.filter(lyricsdf['year'] == 2014)

### Pipeline: Logistic Regression
Here is the output of
```python
LogisticRegression().explainParams()
```
* __elasticNetParam__: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
* __featuresCol__: features column name (default: features)
* __fitIntercept__: whether to fit an intercept term. (default: True)
* __labelCol__: label column name (default: label)
* __maxIter__: max number of iterations (>= 0) (default: 100)
* __predictionCol__: prediction column name (default: prediction)
* __probabilityCol__: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
* __rawPredictionCol__: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
* __regParam__: regularization parameter (>= 0) (default: 0.1)
* __threshold__: Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match. (default: 0.5)
* __thresholds__: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold. (undefined)
* __tol__: the convergence tolerance for iterative algorithms (default: 1e-06)

In [None]:
def buildLinearRegressionPipelineModel(ncol, pcol, max_iter=10, reg_param=0.01, num_features=200):
    """Build an unfitted Tokenizer -> HashingTF -> LogisticRegression pipeline.

    Despite the function name, the final stage is a LogisticRegression classifier.

    Args:
        ncol: input column of space-separated words (tokenized into "words").
        pcol: label column, passed as LogisticRegression ``labelCol``.
        max_iter: LogisticRegression ``maxIter``.
        reg_param: LogisticRegression ``regParam``.
        num_features: HashingTF ``numFeatures`` (number of hash buckets for "features").
            Defaults to 200. The noun vocabularies built earlier have thousands of
            words, so a small value forces many words to share a bucket.

    Returns:
        A ``pyspark.ml.Pipeline``; call ``.fit(df)`` on it to obtain a fitted model.
    """
    tok = Tokenizer(inputCol=ncol, outputCol="words")
    htf = HashingTF(inputCol=tok.getOutputCol(), outputCol="features", numFeatures=num_features)
    lr = LogisticRegression(labelCol=pcol, maxIter=max_iter, regParam=reg_param)
    return Pipeline(stages=[tok, htf, lr])

In [None]:
# Build one unfitted pipeline per (noun column, binary label) pair,
# i.e. modelbinary_dict[ncol][pcol] for labels such as `is_top_10_percent`.
modelbinary_dict = {}
for ncol in ncols:
    per_label = {}
    for pcol in pbinarycols:
        per_label[pcol] = buildLinearRegressionPipelineModel(ncol, pcol)
    modelbinary_dict[ncol] = per_label

### Quick Test

In [None]:
# Quick test: one feature column, one binary label, and a grid of hyperparameters.
tncol = ncols[2]
tpcol = pbinarycols[2]
# pandas view of the 2014 songs, used to score the Spark predictions
tdf = lyrics_pd_df.loc[lyrics_pd_df['year'] == 2014]
treg_params = [.001, .01, .1, 1., 10., 100., 200., 300., 400., 500., 1000.]
tmax_iters = [10,20,30,40,50]

print("testing with\n\tncol: {}\n\tpcol: {}\n\treg_params: {}\n\ttmax_iters: {}".format(
    tncol, tpcol, treg_params, tmax_iters))

In [None]:
# Grid search: fit one pipeline per (reg_param, max_iter) pair on the training songs and score
# its 2014 predictions. The column projections are loop-invariant, so build them once; the
# training subset is cached because it is re-read by every one of the fits.
train_subset = traindf.select(['song_key', tncol, tpcol]).cache()
test_subset = testdf.select(['song_key', tncol, tpcol])

for reg_param in treg_params:
    print("\n--- reg_param: {} ---".format(reg_param))
    for max_iter in tmax_iters:
        model1 = buildLinearRegressionPipelineModel(
            tncol, tpcol, max_iter=max_iter, reg_param=reg_param).fit(train_subset)

        # Make predictions on test documents and keep the columns of interest.
        prediction1 = model1.transform(test_subset)
        selected1 = prediction1.select("song_key", tncol, "prediction")

        # song_key -> predicted label
        pred_hits1 = dict((row[0], row[2]) for row in selected1.collect())

        # print predictions
        printPredicts(tdf, pred_hits1, tncol, tpcol, reg_param=reg_param, max_iter=max_iter,
                      pipeline_name="LRPipeline Predict 2014", verbose=False)

In [None]:
# Wall-clock start time (local time), for timing the runs that follow.
started_at = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime())
print("execution start --> {}".format(started_at))

In [None]:
%%time
