In [None]:
import pandas as pd
from pyspark.mllib.feature import Word2Vec, Word2VecModel
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
import h5py
import os
import sys
import logging
from sklearn.model_selection import ParameterGrid
import shutil

In [None]:
# Use all but two of the reported CPUs, but never fewer than one partition:
# os.cpu_count() can return None, and on machines with <= 2 cores the plain
# subtraction would yield 0 or a negative value, which is not a valid
# partition count / `local[N]` master setting.
partitions = max(1, (os.cpu_count() or 1) - 2)

In [None]:
# List the top-level groups/keys stored in the HDF5 file.
# https://stackoverflow.com/questions/40208420/how-to-find-hdf5-file-groups-keys-within-python
# Open explicitly read-only: this cell only inspects the file, and older h5py
# releases default to an append mode that can create or modify the file.
with h5py.File('binarized.hdf', 'r') as f:
    print(f.keys())

In [None]:
# Start a local Spark context with one worker thread per partition, then wrap
# it in a SparkSession.
sc = SparkContext(
    master='local[{cpus}]'.format(cpus=partitions),
    appName='word2vec',
)
spark = SparkSession(sc)

In [None]:
# Column / index-level names used throughout the notebook.
LIKED, MOVIE_ID, USER_ID = 'Liked', 'movieId', 'userId'
TIMESTAMP = 'Timestamp'
TITLE, GENRE = 'title', 'genres'

In [None]:
# Movie metadata, indexed by movie id so titles can be looked up by id later.
df_movies = pd.read_csv('ml-20m/movies.csv').set_index(MOVIE_ID)

In [None]:
# Load the 'trg' table from the HDF store.
df = pd.read_hdf('binarized.hdf', 'trg')

In [None]:
# Discard the timestamp column and keep only the rows flagged as liked.
df = df.drop(columns=[TIMESTAMP])
liked_mask = df[LIKED] == 1
df = df[liked_mask]

In [None]:
# Development-only row cap. Set N_SAMPLES to None to use every row
# (the "production" run) instead of commenting this cell out.
N_SAMPLES = 50000
df = df if N_SAMPLES is None else df.head(N_SAMPLES)

In [None]:
# Expose the movie id index level as a string column (Word2Vec tokens are strings).
# `assign` returns a new frame instead of writing into a frame that was derived
# from a filtered slice, which avoids SettingWithCopyWarning and keeps the cell
# safe to re-run.
df = df.assign(**{MOVIE_ID: df.index.get_level_values(MOVIE_ID).astype(str)})

In [None]:
# PARAMETERS
# Window size = the largest number of (liked) ratings held by any single user,
# so the context window can span a user's whole list.
df_gb = df.groupby(by=[USER_ID])
WINDOW_SIZE = (
    df_gb[LIKED]
    .count()
    .max()
)

In [None]:
# Show the window size derived from the per-user counts.
print(WINDOW_SIZE)

In [None]:
# One "sentence" per user group: the list of movie-id strings in that group.
dict_str_groups = dict(
    (group_key, list(group_rows[MOVIE_ID]))
    for group_key, group_rows in df_gb
)

In [None]:
# Keys naming the Word2Vec hyper-parameters explored by the grid search.
LEARNING_RATE, VECTOR_SIZE, MIN_COUNT = 'learning_rate', 'vector_size', 'min_count'

# Every combination of the candidate values below is trained once.
param_grid = ParameterGrid(dict([
    (LEARNING_RATE, [0.025, 0.01, 0.05]),
    (VECTOR_SIZE, [64, 128]),
    (MIN_COUNT, [5, 10, 20]),
]))

In [None]:
# Display every hyper-parameter combination in the grid.
[combo for combo in param_grid]

In [None]:
# Distribute the per-user movie-id lists as the training corpus.
document = sc.parallelize(list(dict_str_groups.values()), numSlices=partitions)

In [None]:
# Dedicated logger for the grid search. Calling logging.debug() on the
# unconfigured root logger drops the messages (root defaults to WARNING), so
# the parameter/timing logs would never be shown. Propagation is disabled so a
# handler on the root logger cannot print each message a second time, and the
# handler is only attached once so re-running the cell does not duplicate output.
logger = logging.getLogger('word2vec_grid_search')
logger.setLevel(logging.DEBUG)
logger.propagate = False
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

# Fixed seed so repeated runs of the same parameter combination are comparable.
W2V_SEED = 42

# Train and persist one Word2Vec model per hyper-parameter combination.
# After the loop, `model` refers to the model of the LAST combination only.
for params in param_grid:
    logger.debug('Params: {params}'.format(params=params))
    logger.debug('Start Train: {ts}'.format(ts=pd.Timestamp('now')))

    # Constant across the grid
    word2vec = Word2Vec()
    word2vec.setNumPartitions(partitions)
    word2vec.setWindowSize(WINDOW_SIZE)
    word2vec.setSeed(W2V_SEED)

    # Varied by the grid
    word2vec.setLearningRate(params[LEARNING_RATE])
    word2vec.setVectorSize(params[VECTOR_SIZE])
    word2vec.setMinCount(params[MIN_COUNT])

    # Fit and save
    model = word2vec.fit(document)
    logger.debug('Stop Train: {ts}'.format(ts=pd.Timestamp('now')))
    outdir = 'w2v_model_pt_{partitions}_mc_{min_count}_lr_{lr}_vs_{vs}.sparkmodel'.format(
        partitions=partitions, min_count=params[MIN_COUNT], lr=params[LEARNING_RATE],
        vs=params[VECTOR_SIZE]
    )
    # Remove any model left by a previous run before saving to the same directory.
    if os.path.isdir(outdir):
        shutil.rmtree(outdir)
    model.save(sc, outdir)

In [None]:
def show_synonyms(search_str, num_synonyms, w2v_model=None):
    """Find movies by title pattern and list their nearest neighbours in a Word2Vec model.

    Parameters
    ----------
    search_str : str
        Regular expression applied to the titles in ``df_movies`` via
        ``Series.str.match`` (i.e. matched from the start of each title).
    num_synonyms : int
        Number of synonyms requested from the model for each matching movie.
    w2v_model : optional
        Object exposing ``findSynonyms(word, num)``. When omitted, the
        notebook-level ``model`` is used, which is whichever model the grid
        search fitted last.

    Returns
    -------
    list
        One ``(synonym, title)`` tuple per synonym, where ``synonym`` is the
        item yielded by ``findSynonyms`` (its first element is a movie id
        string) and ``title`` is that movie's title from ``df_movies``.

    Notes
    -----
    NOTE(review): a matching movie whose id is not in the model vocabulary
    (e.g. filtered out by min-count) presumably makes ``findSynonyms`` raise;
    confirm and handle if needed.
    """
    if w2v_model is None:
        w2v_model = model

    movie_index = df_movies[df_movies[TITLE].str.match(search_str)]
    print(movie_index)

    synonym_list = []
    for mi in movie_index.index:
        for synonym in w2v_model.findSynonyms(str(mi), num_synonyms):
            # synonym[0] is the neighbour's movie id as a string; map it back to a title.
            synonym_list.append((synonym, df_movies.loc[int(synonym[0]), TITLE]))
    return synonym_list

In [None]:
# word2vec2 = Word2VecModel.load(sc, 'trained_wor2vec_pyspark.sparkmodel')

In [None]:
# Nearest neighbours of every movie whose title contains "Matrix".
show_synonyms(search_str='.*Matrix.*', num_synonyms=5)

In [None]:
# Nearest neighbours of every movie whose title contains "Private Ryan".
show_synonyms(search_str='.*Private Ryan.*', num_synonyms=5)