# Explore Jaccard Similarity of Playlists

## Initialize Notebook

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pandas as pd
import matplotlib as plt

import mpd

In [None]:
# Will allow us to embed images in the notebook
%matplotlib inline
# change default plot size
plt.rcParams['figure.figsize'] = (15,10)

## Load Data

In [None]:
mpd_all=mpd.load(spark, "onebig", 1)

## Vectorize the playlist tracks

Convert the playlists to sparse feature vectors using countVectors.  This prepares the data set for use in machine learning functions.

Use a standard mllib pipeline to fit the data to the CountVectorizer() and transform the dataframe into one that contains the sparse vector features.

This is based on the initial countVector exploration from the mpd-expore notebook.

In [None]:
pDF=mpd_all.select("pid", "tracks.track_uri")

model, result = mpd.vectorizecol(pDF, "track_uri", "features")

In [None]:
result.show(10)

In [None]:
trackcv = model

In [None]:
type(trackcv.vocabulary)

In [None]:
trackcv.vocabulary[0]

In [None]:
from pyspark.sql.types import StringType

tvdf = sqlContext.createDataFrame(trackcv.vocabulary, StringType())

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Attach each vocabulary entry's position as "id".
# monotonically_increasing_id() only guarantees unique, increasing values; they
# are not consecutive once the data spans several partitions, so they would not
# line up with the CountVectorizer vocabulary index. zipWithIndex() assigns
# consecutive 0-based indices in row order instead.
tvdf = (
    tvdf.rdd
    .zipWithIndex()
    .map(lambda pair: (pair[0][0], pair[1]))
    .toDF(["value", "id"])
)

In [None]:
tvdf.show(5, truncate=False)

In [None]:
from pyspark.sql.functions import explode
# Distinct (track_name, track_uri) pairs: flatten every playlist's track array,
# project the two fields of interest, then drop duplicates.
tname = (
    mpd_all
    .select(explode("tracks").alias("tracks"))
    .select("tracks.track_name", "tracks.track_uri")
    .distinct()
)

In [None]:
tname.printSchema()

In [None]:
tvdf.join(tname, tname.track_uri == tvdf.value).orderBy("id").show(5)

## Understand sparse vector type

The features column now contains a sparse vector.  We can see the first playlist in its sparse vector format.

Note that the type of returned data is a Row and it contains a field named 'features' which contains the sparse vector.

In [None]:
result.select("features").head(1)

In [None]:
result.select('features').show(1, truncate=False)

It's easier to understand the display types by looking at a constructed sparse vector.

The vector is constructed from a string representation of a sparse vector, which is a list that contains three elements: the vector length, the indices of the non-zero entries, and the values at those indices.

In [None]:
from pyspark.mllib.linalg import Vectors

Vectors.parse('(3, [0], [2])')

Note the above conversion of the string representation of a sparse vector to a sparse vector datatype shows how these two formats are displayed.

When using the show() method the string representation is returned.  When using the head on the feature we actually see a native sparse vector datatype in a row.  This shows countvectorizer is actually producing sparse vector types, it's only show() that turns it into a string.

Since the feature is actually a row with a sparse vector in it, we may be able to take the feature column as a sparse matrix already and perform the distance measures on it.

## Compute minhash LSH

First attempt at producing a distance metric for the playlists to understand their similarity.

In [None]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

In [None]:
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)

The result dataframe still contains all playlists

In [None]:
result.count()

### Apply minhash to limited subset to avoid error

Trying to apply the minhash to the entire data set raises an error that I don't understand.  Using only 10 elements avoids it but doesn't give much meaningful information when computing the Jaccard similarity.

In [None]:
model = mh.fit(result.limit(10))

In [None]:
transformA = model.transform(result.limit(10))

In [None]:
transformB = model.transform(result.limit(10))

In [None]:
transformA.select("hashes").limit(1)

In [None]:
model.approxSimilarityJoin(transformA, transformB, 0.6, distCol="JaccardDistance")

Do an approximate similarity join on the truncated data.

In [None]:
jd=model.approxSimilarityJoin(transformA, transformB,10.0, distCol="JaccardDistance")\
    .select(col("datasetA.pid").alias("idA"),
            col("datasetB.pid").alias("idB"),
            col("JaccardDistance"))

In [None]:
jd.show()

Attempt a sampled subset of the data.

In [None]:
# Sample roughly 1% of the rows (no replacement, seed 1) through the RDD API,
# then convert the sampled RDD back into a DataFrame.
train = result.rdd.sample(False, .01, 1).toDF()

In [None]:
train.show()

# Query the fitted LSH model for the 2 nearest neighbours of one playlist.
# Uses names that exist in this notebook: the already-hashed `transformA` as the
# data set to search and the first row's feature vector as the query key.
# NOTE(review): assumes that first vector has at least one non-zero entry,
# since MinHash cannot hash an empty set - confirm.
key = result.select("features").head(1)[0].features
model.approxNearestNeighbors(transformA, key, 2).show()

### Inspect the minhash 

In [None]:
from pyspark.sql.functions import col, expr, when

In [None]:
result.show(1)

In [None]:
transformA.select("hashes").show(5, truncate=False)

## Figure out how to work with sparse vectors

Understand how to apply methods to columns via RDD conversion and the map() function.

In [None]:
result.dtypes

Able to access the column as a sparse vector type but this doesn't let me operate on it

In [None]:
result.select("features").rdd.map(lambda x: type(x.features)).take(5)

Able to call the sparseVector methods on the x element.

To return this as a dataframe on which we can run summary stats, the key is to [cast the lambda result as a Row](https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection). The toDF() works on rows.

In [None]:
from pyspark.sql import Row


# One-column DataFrame ("length") holding the number of non-zero entries of
# each playlist's feature vector; wrapping the value in a Row lets toDF() work.
sparselen = (
    result.select("features")
    .rdd
    .map(lambda row: Row(length=row.features.numNonzeros()))
    .toDF()
)

In [None]:
sparselen.printSchema()

The summary stats are promising and show we are close to the same averages as with the full play lists data set.

In [None]:
sparselen.describe().show()

Add an index to the RDD to support joins with the original results.  The goal is to select only the sparse vectors that are not all zero valued.

In [None]:
slen=result.select("features").rdd.map(lambda x:x.features.numNonzeros()).zipWithIndex().toDF()

In [None]:
slen.show(10)

### Compare the extracted features to the original feature set

Use the summary stats on the original data to confirm that we haven't lost much detail from the default vocabulary size.

In [None]:
origlen = result.select("track_uri").rdd.map(lambda x: Row(length=len(x[0]))).toDF()

In [None]:
origlen.describe().show()

Need to remove the zero length vectors to run the LSH hash algorithms.

How many zero vectors do we have?

In [None]:
sparselen.where(sparselen.length == 0).count()

In [None]:
origlen.where(origlen.length == 5).count()

Only about half of the playlists have disappeared completely from the minimum length in the original dataset.

Need to get the vector length into the original dataframe so I can select by size when I run the algorithms.

## Add the length data to the result dataframe

Understand the [approaches to add columns to dataframes](https://stackoverflow.com/a/33683462) and use the user defined function, udf, to map the sparse vector method to the column.

Columns can only be added using the following approaches:
- literal value
- transforming an existing column
- using join
- using function/udf

Adding an arbitrary RDD result is basically a join by adding an index to the RDD output.
- add row num to existing dataframe (I have pid)
- zipwithindex on RDD
- join

In [None]:
result.printSchema()

Compute the length of the playlist array using the built-in size() function.

In [None]:
from pyspark.sql.functions import size

# Extend the vectorized result with "plen": the number of entries in each
# playlist's track_uri array.
r2 = result.withColumn("plen", size(result.track_uri))

Build a udf to compute the sparse vector non-zero entry counts.  This is akin to the map() method for an RDD.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Column-level UDF: calls numNonzeros() on the vector held in each row and
# exposes the count as an integer column.
vectorlength = udf(lambda vec: vec.numNonzeros(), IntegerType())

In [None]:
r2=r2.withColumn("vlen", vectorlength(r2.features))

In [None]:
r2.printSchema()

Now have a dataframe that has been extended with length information.  This can be used in selects to eliminate data that throws errors on minhash (sparse vectors with all zero entries).

In [None]:
r2.show(10)

## Compute Jaccard similarity across full data set

Use only the sparse vectors that don't have all entries as zero because [MinHash can't transform empty sets](https://spark.apache.org/docs/2.2.0/ml-features.html#minhash-for-jaccard-distance).

In [None]:
sparsevec = r2.where(r2.vlen > 0)
sparsevec.count()

Sample the data set to get a more manageable data size. 

Shouldn't need to do this since the 

In [None]:
sparsevec= sparsevec.rdd.sample(False, .01, 1).toDF()

In [None]:
sparsevec.count()

In [None]:
model = mh.fit(sparsevec)

In [None]:
transformA = model.transform(sparsevec)

In [None]:
transformB = model.transform(sparsevec)

In [None]:
# Self-join the hashed sample on approximate Jaccard distance (threshold 0.6)
# and keep only the two playlist ids plus the distance.
jd = (
    model.approxSimilarityJoin(transformA, transformB, 0.6, distCol="JaccardDistance")
    .select(
        col("datasetA.pid").alias("idA"),
        col("datasetB.pid").alias("idB"),
        col("JaccardDistance"),
    )
)

In [None]:
jd.where(jd.JaccardDistance > 0.0).show(10)

In [None]:
jd.show(10)

In [None]:
jd.count()

In [None]:
jd2=model.approxSimilarityJoin(transformA, transformB,0.6, distCol="JaccardDistance")

In [None]:
jd2.printSchema()

In [None]:
type(result.features.getItem(0))

Extract a single sparse vector to feed to the nearest neighbor search. Pipe it through rdd to get a list from which to get one element.

In [None]:
# Feature vector of the first row of `result`; used below as the query key for
# the nearest-neighbour search.
testpl = result.select("features").head(1)[0].features

In [None]:
type(testpl)

In [None]:
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(transformA, testpl, 2).show()

Inspecting the search playlist and the two nearest neighbors shows that the two returned playlists are also named for the "throwback" theme.  Clearly there is a pattern detected here.

The songs not in the search playlist but in the nearest playlist would make good recommendations.

In [None]:
mpd_all.where(mpd_all.pid==0).show()

In [None]:
mpd_all.where(mpd_all.pid==131786).show()

In [None]:
mpd_all.where(mpd_all.pid==262260).show()

See how the 10 nearest neighbors stack up.

In [None]:
first10nn = model.approxNearestNeighbors(transformA, testpl, 10)

They are all at a similar distance.  It would be helpful to understand the distribution of distances.  I feel like I need some sort of graphing viz for that. 

In [None]:
first10nn.printSchema()

In [None]:
mpd.plothist(first10nn, "distCol", 11)

### Track rank for recommended lists

In [None]:
f10tracks=first10nn.select(explode("track_uri").alias("track_uri"))

In [None]:
f10tracks.printSchema()

In [None]:
f10tracks.show()

In [None]:
# Rank tracks by how many of the neighbouring playlists contain them,
# most frequent first.
trackrank = (
    f10tracks
    .groupBy("track_uri")
    .count()
    .orderBy(f.desc("count"))
)

In [None]:
Y=trackrank.select("count").toPandas()

In [None]:
Y.size

In [None]:
X=pd.DataFrame({'X': range(1,Y.size+1,1)})

In [None]:
plt.pyplot.scatter(X,Y)

In [None]:
grank=trackrank.join(tvdf, trackrank.track_uri == tvdf.value)

In [None]:
# Show the ranked tracks, highest count first (ties broken by descending id).
# `desc` is referenced through the `f` alias imported at the top of the
# notebook; the bare name is not in scope at this point.
grank.orderBy(f.desc("count"), f.desc("id")).show()

In [None]:
grank.count()

### Gather info from global data set

Use join to get global data on recommended lists like the playlist name.

In [None]:
first10nn.schema

In [None]:
first10nn = first10nn.withColumnRenamed("pid", "recpid")

In [None]:
def pnamelookup(pid):
    """Filter ``mpd_all`` down to the playlist with the given ``pid``.

    NOTE(review): the filtered DataFrame is neither returned nor stored, so
    this function always returns None as written.
    NOTE(review): it also reads the driver-side ``mpd_all`` DataFrame, which
    presumably cannot be used from inside a UDF running on executors - confirm
    before relying on this; a join on ``pid`` is the usual alternative.
    """
    mpd_all.where(mpd_all.pid==pid)
    
# Rebind the name to a Spark UDF wrapping the plain Python function above.
pnamelookup=udf(pnamelookup)

In [None]:
pln=first10nn.join(mpd_all, mpd_all.pid == first10nn.recpid)

In [None]:
pln.printSchema()

In [None]:
pln.select("name", "modified_at", "num_edits", "num_followers","plen").show()

In [None]:
from pyspark.sql.functions import explode

In [None]:
pDF=pln.select("pid", explode("tracks").alias("track")).select("track.*")

In [None]:
pDF.printSchema()

In [None]:
pDF.select("track_name").show()

In [None]:
pDF.count()

In [None]:
recflat=mpd.playlist_flatten(pln.select("pid", "name", "modified_at", "num_edits", "num_followers","plen", "tracks"))

In [None]:
recflat.show(10)

In [None]:
# Print the name of every recommended playlist.
# A DataFrame is not an iterable of rows, so bring the rows to the driver with
# collect() first; `pln` only holds the nearest-neighbour playlists joined to
# their metadata.
for row in pln.select("name").collect():
    print(row["name"])

## Explore challenge set

In [None]:
mpd_test=spark.read.json("../mpd-challenge/challenge_set.json", multiLine=True)

In [None]:
mpd_test.printSchema()

In [None]:
mpd_test.show()

In [None]:
cpl=mpd_test.select(explode("playlists").alias("playlist"))

In [None]:
cpl.printSchema()

In [None]:
type(cpl)

In [None]:
cpl.schema

In [None]:
cpl.show(3, truncate=False)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
    """Turn a single JSON string into a Spark DataFrame.

    Args:
        json: The JSON document, as a string.
        schema: Optional schema handed to the reader; skipped when falsy.

    Returns:
        The DataFrame produced by reading a one-element RDD holding ``json``.
    """
    # SparkSession (and its reader) is available with Spark 2.0+
    json_reader = spark.read
    if schema:
        json_reader.schema(schema)
    single_doc = sc.parallelize([json])
    return json_reader.json(single_doc)


In [None]:
events = jsonToDataFrame("""
{
  "a": [1, 2]
}
""")

display(events.select(explode("a").alias("x")))



In [None]:
events.select(explode("a").alias("x")).show()

In [None]:
events = jsonToDataFrame("""
{
  "a": [{"b": 6, "c": 5}, {"b": 6, "c": 5}]
}
""")

display(events.select(explode("a").alias("x")))

In [None]:
te=events.select(explode("a").alias("x"))

In [None]:
te.printSchema()

In [None]:
te.select("x.c").show()

In [None]:
te.select("x.b", "x.c").show()

In [None]:
recdf=cpl.select("playlist.name", "playlist.num_holdouts", "playlist.pid", "playlist.num_tracks", "playlist.tracks", "playlist.num_samples")

In [None]:
recdf.show(5)

In [None]:
recdf.describe("num_samples").show()

In [None]:
# Let Spark do the heavy lifting: flatten the single-column rows into plain
# values and bucket them with the RDD histogram() function (11 buckets).
gre_histogram = (
    recdf.select("num_samples")
    .rdd
    .flatMap(lambda row: row)
    .histogram(11)
)

# Pair the first element of the result (bucket boundaries) with the second
# (counts) and plot the pairs as a bar chart from a pandas DataFrame.
pd.DataFrame(
    list(zip(gre_histogram[0], gre_histogram[1])),
    columns=['bin', 'frequency'],
).set_index('bin').plot(kind='bar');

In [None]:
pln.printSchema()

In [None]:
import importlib
importlib.reload(mpd)

In [None]:
mpd.plothist(pln, "num_edits", 11)

In [None]:
mpd.plothist(recdf, "num_samples", 27)

In [None]:
mpd.plothist(recdf, "num_tracks", 27)

In [None]:
mpd.plothist(pln, "num_tracks", 27)

In [None]:
mpd.plothist(mpd_all, "num_tracks", 27)

In [None]:
mpd.plothist(mpd_all, "num_followers", 27)