In [1]:
import boto3
import pandas as pd

In [2]:
train_file = pd.read_csv('data/train.csv', index_col=None, sep=',') \
    .drop_duplicates()
train_file.columns = ['uid', 'repoid', 'uname', 'reponame', 'date']
train_file = train_file.drop_duplicates()
train_file.head()

Unnamed: 0,uid,repoid,uname,reponame,date
0,24466870,59996401,zzkkui,yapi,2018-03-13T01:07:02Z
1,24466870,5239185,zzkkui,quill,2018-03-13T01:07:02Z
2,24466870,76567547,zzkkui,vue-quill-editor,2018-03-13T01:07:02Z
3,24466870,105479936,zzkkui,react-email-editor,2018-03-13T01:07:02Z
4,24466870,16179237,zzkkui,virtual-dom,2018-03-13T01:07:02Z


In [3]:
# take only uid / repoid
train_file = train_file[['uid', 'repoid']]
train_file.head()

Unnamed: 0,uid,repoid
0,24466870,59996401
1,24466870,5239185
2,24466870,76567547
3,24466870,105479936
4,24466870,16179237


In [4]:
import findspark
findspark.init()

In [5]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import reverse

from pyspark.mllib.recommendation import ALS
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry, RowMatrix

In [6]:
sc = SparkContext(appName="RR")
sqlContext = SQLContext(sc)

In [7]:
def first_two_column(row):
    return (int(row[0]), int(row[1]))

In [8]:
training_rdd = sqlContext.createDataFrame(train_file).rdd.map(first_two_column)

In [9]:
# returns
# 1. idx -> keys
# 2. idx -> feature_vecs
def train(training_rdd):
    # training_rdd - (uid, prod_id)
    
    # this takes a short while too
    model = ALS.trainImplicit( \
        training_rdd.map(lambda rr: (rr[0], rr[1], 1)),
        rank=16,
        iterations=10,
        lambda_=0.1,
        alpha=80.0
    )
    return model.productFeatures()

In [10]:
def similarity(feature_vecs, columnSimilarities_threshold):
    # transpose `prod_features_rdd`
    def transpose(rm):
        cm = CoordinateMatrix(
            rm.rows.zipWithIndex().flatMap(
                lambda x: [MatrixEntry(x[1], j, v) for j, v in enumerate(x[0])]
            )
        )
        return cm.transpose().toRowMatrix()
    rowmat = RowMatrix(feature_vecs)
    colmat = transpose(rowmat)
    sims = colmat.columnSimilarities(columnSimilarities_threshold)
    return sims

In [11]:
def most_similar(sims):
    # list of `MatrixEntry(5, 649, 0.2423914277654137)`
    sim_entries = sims.entries

    # duplicate sim_entries to get all pairs of (i, j, val)
    # list of `(23, (66, -0.14212846655328612))`
    # n entries for every product === total length n*n
    sim_entries_full = sim_entries.flatMap(lambda rr: ((rr.i, (rr.j, rr.value)), (rr.j, (rr.i, rr.value))))

    def foldByKeyCompare(r1, r2):
        if r1 is None:
            return r2
        return r1 if r1[1] > r2[1] else r2
    
    # for each prod, find prod with highest similarity
    # list of `(23, (66, -0.14212846655328612))`, 1 entry for every product
    sim_entries_highest = sim_entries_full.foldByKey(None, foldByKeyCompare)
    
    return sim_entries_highest

In [12]:
def join_key(keys, most_similars):
    '''
    keys - list of repo ids
        [28, 12044, 147180, 284868 ...]
    most_similars - most similar repo for each repo, expressed as index in `keys`
        (428, (743, 0.0031531581486397564))
    '''
    
    # order keys
    # [(0, 2421308), (1, 5218272), (2, 5691060), (3, 8193932) ...]
    orderedKeys = keys.zipWithIndex().map(lambda rr: (rr[1], rr[0]))
    
    # join with one repo
    # reshape to expose another repo index
    # `(28, (14325757, 0.0020377382811490874))`
    sim_entries_left_half = most_similars \
        .join(orderedKeys) \
        .map(lambda rr: (rr[1][0][0], (rr[1][1], rr[1][0][1])))
    
    # join another repo
    # reshape to flat result
    # `(109245282, 14325757, 0.014610917357937986)`
    repo_pairs = sim_entries_left_half.join(orderedKeys).map(lambda rr: (rr[1][0][0], rr[1][1], rr[1][0][1]))
    
    return repo_pairs

In [13]:
def timer(func, desc):
    import time
    start = time.time()
    result = func()
    end = time.time()
    print(desc, ":", end - start)
    return result

def bind(func, *params):
    def newfunc():
        return func(*params)
    return newfunc

In [14]:
productFeatures = timer(bind(train, training_rdd), "ALS")

ALS : 9.76835012435913


In [15]:
import numpy as np
from numpy import linalg as LA

In [20]:
prod_features_arr = productFeatures.values().collect()
np_features = np.array(prod_features_arr)

In [22]:
np_keys = np.array(productFeatures.keys().collect())

In [17]:
np_la_norm = np.array(list(map(LA.norm, np_features)))

In [25]:
np.save('productFeatures', np_features)
np.save('prod_feature_norm', list(np_la_norm))
np.save('repo_ids', np_keys)

In [70]:
list(np_la_norm)

[]

In [72]:
np_features.shape

(8187300, 16)

In [58]:
np.argsort(np_features.dot(np_features[0]) / (np_la_norm * np_la_norm[0]))

array([8016338, 5723894, 2367101, ..., 5629124, 5331759,       0])

In [38]:
np_features[0] - np_features[2]

array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.])

In [45]:
productFeatures_small = productFeatures.sample(False, 0.01)
timer(productFeatures_small.cache, "cache ALS")
print("using", len(productFeatures_small.collect()), "samples")

# productFeatures_small = sc.parallelize([(kk, tuple(vv)) for kk, vv in [
#     (0, (1 for ii in range(16))), \
#     (1, (1 for ii in range(16))), \
#     (2, (2 for ii in range(16))), \
#     (3, (ii for ii in range(16))) \
# ]])

cache ALS : 0.009809017181396484
using 5 samples


In [46]:
productFeatures_small.collect()

[(12,
  array('d', [-0.9812264442443848, -0.014173307456076145, -0.1922406405210495, 0.10215497016906738, -0.11043383181095123, 0.2534223794937134, -0.797737717628479, -0.12205567210912704, -0.6269376277923584, -0.2890850305557251, -0.4578281342983246, 0.48876720666885376, -0.002395555144175887, -0.38595688343048096, -0.2819075584411621, 0.7047893404960632])),
 (13,
  array('d', [-0.12482436746358871, -0.03038131631910801, 0.12130644917488098, 0.15238629281520844, -0.2096806913614273, 0.2443142831325531, -0.037369176745414734, 0.14703594148159027, -0.2823255658149719, -0.12168966978788376, 0.37711790204048157, 0.14086169004440308, -0.19714556634426117, -0.3474962115287781, -0.8193628787994385, 0.21149735152721405])),
 (10,
  array('d', [-0.9812264442443848, -0.014173307456076145, -0.1922406405210495, 0.10215497016906738, -0.11043383181095123, 0.2534223794937134, -0.797737717628479, -0.12205567210912704, -0.6269376277923584, -0.2890850305557251, -0.4578281342983246, 0.48876720666885376,

In [47]:
keys = productFeatures_small.keys()
feature_vecs = productFeatures_small.values()

sims = similarity(feature_vecs, 0.1)
most_similars = most_similar(sims)
timer(most_similars.cache, "similarity")

repo_pairs = join_key(keys, most_similars)
# repo_pairs.map(lambda rr: rr[:2]).collect()


similarity : 0.011590003967285156


[(12, 10), (13, 10), (14, 10), (11, 10), (10, 11)]

In [48]:
repo_pairs.collect()

[(12, 10, 1.0),
 (13, 10, 0.3958634364711903),
 (14, 10, -0.0009971295093847005),
 (11, 10, 1.0),
 (10, 11, 1.0)]

In [4]:
train_file.head()

Unnamed: 0,uid,repoid,uname,reponame,date
0,24466870,59996401,zzkkui,yapi,2018-03-13T01:07:02Z
1,24466870,5239185,zzkkui,quill,2018-03-13T01:07:02Z
2,24466870,76567547,zzkkui,vue-quill-editor,2018-03-13T01:07:02Z
3,24466870,105479936,zzkkui,react-email-editor,2018-03-13T01:07:02Z
4,24466870,16179237,zzkkui,virtual-dom,2018-03-13T01:07:02Z


In [20]:
train_file_names = train_file[['uname', 'reponame']]

In [21]:
train_file_names.head()

Unnamed: 0,uname,reponame
0,zzkkui,yapi
1,zzkkui,quill
2,zzkkui,vue-quill-editor
3,zzkkui,react-email-editor
4,zzkkui,virtual-dom


In [26]:
sqlContext.createDataFrame(train_file_names).take(5)

TypeError: field reponame: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

In [None]:
training_rdd