In [1]:
import os

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import Word2Vec
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.clustering import KMeans
import numpy as np

In [2]:
# When running pyspark, start it with the AWS/Hadoop packages on the classpath:
# pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
#
# S3 credentials are read from the environment (AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY) so that no secret is ever stored in the notebook.
# A missing variable raises KeyError here, before any S3 read is attempted.
# NOTE(review): the key pair that used to be hard-coded in this cell has been
# exposed and should be revoked/rotated.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", os.environ["AWS_ACCESS_KEY_ID"])
hadoop_conf.set("fs.s3n.awsSecretAccessKey", os.environ["AWS_SECRET_ACCESS_KEY"])

In [4]:
# Read the JSON dataset of songs (with their stored vectors) from S3 into a DataFrame.
songs_json_path = 's3n://songdatamsan694/songs_with_vecs'
data = spark.read.json(path=songs_json_path)

In [5]:
# The 'vector' column comes back from JSON as a struct rather than an ML vector,
# so it has to be rebuilt as a DenseVector before any vector math can be done.
def _struct_to_dense(x):
    """Build a DenseVector from element 1 of the loaded 'vector' struct.

    NOTE(review): element 1 is presumably the array of vector values in the
    JSON-inferred struct -- confirm against data.printSchema().
    """
    return Vectors.dense(x[1])


toVector = udf(_struct_to_dense, VectorUDT())

# Keep the descriptive columns, swap in the rebuilt vector column, and cache the
# result because every later cell reads from it.
songs_with_vecs = data.select(
    'artist',
    'song',
    'text',
    toVector('vector').alias('vector'),
).cache()

In [6]:
# Preview the first rows (20 rows, long cell values truncated -- the show() defaults).
songs_with_vecs.show(n=20, truncate=True)

+------+--------------------+--------------------+--------------------+
|artist|                song|                text|              vector|
+------+--------------------+--------------------+--------------------+
|  abba|ahes my kind of girl|[look, at, her, f...|[-0.0024252008445...|
|  abba|     andante andante|[take, it, easy, ...|[-0.0568260343315...|
|  abba|      as good as new|[ill, never, know...|[-0.0306186712032...|
|  abba|                bang|[making, somebody...|[-0.0459822696028...|
|  abba|      bangaboomerang|[making, somebody...|[-0.0377701730561...|
|  abba|  burning my bridges|[well, you, hoot,...|[0.01259161644709...|
|  abba|           cassandra|[down, in, the, s...|[0.02807766261544...|
|  abba|          chiquitita|[chiquitita, tell...|[-0.0278077961043...|
|  abba|         crazy world|[i, was, out, wit...|[-0.0174255188314...|
|  abba|     crying over you|[im, waitin, for,...|[0.01892783018056...|
|  abba|               dance|[oh, my, love, it...|[-0.0709373955

In [7]:
# Expose the DataFrame to Spark SQL under the name 'songs_with_vecs'.
# createOrReplaceTempView replaces any view of the same name left over from a
# previous run of this cell, so the cell is safe to re-execute.
# (registerTempTable is deprecated since Spark 2.0 in favour of this call.)
songs_with_vecs.createOrReplaceTempView('songs_with_vecs')

In [8]:
# Fetch the vector of the reference song that every other song is compared against.
reference_song_query = "SELECT vector FROM songs_with_vecs WHERE artist = 'abba' AND song = 'dancing queen'"
song_vec = sqlContext.sql(reference_song_query)
song_vec.show()

+--------------------+
|              vector|
+--------------------+
|[0.05038172153682...|
+--------------------+



In [9]:
# Bring the reference song's vector to the driver and broadcast it so that every
# executor can compare it against all other songs.
# first() returns None when the query matched nothing; fail with a clear message
# instead of the bare IndexError that collect()[0] would raise.
reference_row = song_vec.first()
if reference_row is None:
    raise ValueError("Reference song not found in songs_with_vecs; check the artist/song filter")
song_vec_broadcast = sc.broadcast(reference_row[0])
song_vec_broadcast.value

DenseVector([0.0504, -0.1141, -0.0461, 0.0738, -0.0757, -0.0993, -0.0976, -0.0484, -0.1739, -0.0348, -0.033, -0.0017, 0.0439, 0.0192, 0.014, -0.0582, 0.0933, 0.0272, 0.0556, 0.1162, -0.0085, 0.033, 0.011, -0.0601, 0.0667, -0.0946, 0.0811, 0.0911, -0.0177, 0.0544, -0.0109, 0.1807, -0.0125, -0.0819, 0.0454, 0.0714, 0.0626, 0.1547, -0.003, -0.0188, 0.011, 0.0078, 0.1095, -0.0571, -0.0427, 0.0539, 0.1073, -0.0371, 0.0533, -0.0304])

In [10]:
# Function to calculate the distance from the broadcast song to any other song.
# Types need care here: UDFs don't play well with np.float values, and the
# return type defaults to string unless it is declared explicitly.
def _distance_to_reference(v):
    """Return the norm of (v - broadcast reference vector) as a Python float."""
    offset = v - song_vec_broadcast.value
    return float(np.linalg.norm(offset))


sqlContext.registerFunction('vDistance', _distance_to_reference, FloatType())

In [11]:
# Rank every song by its distance to the reference song, closest first, showing
# artist and song. Ordering by the 'distance' alias keeps the vDistance(vector)
# expression in a single place instead of repeating the UDF call in ORDER BY.
ranked_songs_query = "SELECT artist, song, vDistance(vector) AS distance FROM songs_with_vecs ORDER BY distance ASC"
sqlContext.sql(ranked_songs_query).show()

+----------------+--------------------+-----------+
|          artist|                song|   distance|
+----------------+--------------------+-----------+
|            abba|       dancing queen|        0.0|
|   kylie minogue|       dancing queen| 0.03220256|
|            abba|      reina danzante|0.035378244|
|            glee|       dancing queen|0.072509475|
|regine velasquez|       dancing queen|  0.1293951|
|            ub40|       music so nice| 0.19217849|
|  rolling stones|      gloom and doom| 0.21669285|
|       ll cool j|    ahh lets get ill| 0.21674484|
|      neil young|        harvest moon|  0.2203535|
|  elvis costello|         ghost train| 0.22305809|
|     wyclef jean|         rebel music| 0.22346081|
|        lou reed|        modern dance| 0.22520155|
|           yello|         daily disco|    0.22683|
|    ugly kid joe|funky fresh count...| 0.22688478|
|         erasure|         sunday girl| 0.22884552|
|      will smith|          summertime| 0.22978778|
|  tragicall