In [3]:
def read_docs_topics_file(papers_file, delimiter=" "):
    """
    Reads the documents' latent topic distributions from a text file.

    Every line describes one paper: its topic weights written as numbers separated by
    ``delimiter``. Empty tokens (produced by repeated or trailing delimiters) are ignored.

    Lines at the END of the file that yield no values (e.g. a trailing blank line) are
    dropped instead of being counted as a paper with zero features. A value-less line
    BETWEEN papers is still reported as an error, because skipping it silently would
    shift the row-to-paper alignment.

    :param papers_file: path of the file that contains the papers' topics
    :param delimiter: The separator used in the file
    :return: ndarray (2d), one row for each paper, one column for each topic
             (an empty file yields an empty 1d array)
    :raises ValueError: if a token cannot be parsed as a float, or if the papers do not
                        all have the same number of features
    """
    papers_list = []
    with open(papers_file) as f:
        for line in f:
            paper_dis = [float(x) for x in line.replace("\n", "").split(delimiter) if x != ""]
            papers_list.append(paper_dis)
    # Drop value-less rows at the end of the file; they come from trailing blank lines,
    # not from papers. Guarded by any() so a file without any values is left untouched.
    if any(papers_list):
        while not papers_list[-1]:
            papers_list.pop()
    # Check if all lines have the same length
    if len({len(x) for x in papers_list}) > 1:
        print("Error in papers file, papers have different number of features: {}".format(papers_file))
        raise ValueError("Error in papers file, papers have different number of features.")
    return np.array(papers_list)

In [5]:
import numpy as np
from scipy import sparse
from sklearn.metrics.pairwise import cosine_similarity
# Path of the LDA document-topic file that is loaded below.
# NOTE(review): this is an absolute, machine-specific path; the cell only runs where that
# volume is mounted. Consider deriving it from a configurable data directory.
lda_file = '/Volumes/Work/datasets/citeulike/citeulike_2004_2007/lda_sklearn/theta_150.dat'
print("Reading papers LDA topics file: {}".format(lda_file))
# Parse the file with the default single-space delimiter; the result is kept in the
# notebook-global `papers_lda_topics`, which the similarity cell reads.
papers_lda_topics = read_docs_topics_file(lda_file)
# The recorded output of this cell shows a (210137, 150) matrix.
print("papers LDA matrix dimensions: {}".format(papers_lda_topics.shape))

Reading papers LDA topics file: /Volumes/Work/datasets/citeulike/citeulike_2004_2007/lda_sklearn/theta_150.dat
papers LDA matrix dimensions: (210137, 150)


In [None]:
# Pairwise cosine similarity between all papers' topic vectors, kept in sparse form
# (dense_output=False). The result has one row and one column per paper.
# NOTE(review): with the 210137 papers loaded above this is a 210137 x 210137 matrix; if
# the topic vectors have few exact zeros the "sparse" result is effectively dense and may
# not fit in memory — confirm before running on the full dataset.
similarity_matrix = cosine_similarity(sparse.csr_matrix(papers_lda_topics), dense_output=False)

print("Similarity matrix size: {} X {}, Nonzeros: {}, stored values: {}".format(similarity_matrix.shape[0],
                                                                                similarity_matrix.shape[1],
                                                                                similarity_matrix.count_nonzero(),
                                                                                similarity_matrix.nnz))
# Report the memory held by the three CSR buffers (values, row pointers, column indices).
# The sum of their nbytes IS the size; wrapping it in sys.getsizeof() would only measure
# the Python int that holds the sum.
print('Size of similarity matrix: {} Bytes'.format(similarity_matrix.data.nbytes
                                                   + similarity_matrix.indptr.nbytes
                                                   + similarity_matrix.indices.nbytes))


In [None]:
# Alternative approach: pairwise cosine similarity computed with Spark ML / MLlib.
# Pipeline: tokenize -> hashed term frequencies -> TF-IDF -> row normalization ->
# (rows x rows^T) matrix product.
# NOTE(review): `rdd` is not defined anywhere in the visible notebook; it is presumably an
# RDD of (ID, Office_Loc) pairs created elsewhere — confirm before running this cell.
import pyspark.sql.functions as psf
# Name the two columns, strip every space from "Office_Loc" and split it on commas, so the
# column becomes an array of tokens.
df = rdd.toDF(["ID", "Office_Loc"]).withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))

# Compute TF-IDF:
from pyspark.ml.feature import HashingTF, IDF
# Term-frequency vectors of the token arrays are written to column "tf".
hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
tf = hashingTF.transform(df)

# Fit the IDF model on the term frequencies, then rescale them into column "feature".
idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
tfidf = idf.transform(tf)

# Compute L2 norm:
# The Normalizer is used with its default settings; the normalized vectors go to "norm".
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="feature", outputCol="norm")
data = normalizer.transform(tfidf)


# Compute matrix product:
# With unit-length rows, the product of the matrix with its own transpose holds the
# cosine similarity of every pair of rows.
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Each row's vector is converted to a dense array (toArray) and indexed by its ID.
# NOTE(review): row.ID is used as the matrix row index, so IDs are presumably integers
# usable as row positions — verify against the data.
mat = IndexedRowMatrix(
    data.select("ID", "norm")\
        .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
dot = mat.multiply(mat.transpose())
# Collects the full product into a local array; only viable when the rows x rows result
# fits in local memory.
dot.toLocalMatrix().toArray()

In [None]:
# Hand the locally computed similarity matrix to Spark as a broadcast variable.
# NOTE(review): `sc` is not defined in the visible notebook — presumably the active
# SparkContext; confirm it is created before this cell runs.
print("*** Broadcasting the similarity matrix ***")
# Alternatives that were tried and left disabled: parallelizing the matrix, or using it
# directly without broadcasting.
#print("*** Parallelizing the similarity matrix ***")
#sc.parallelize(similarity_matrix)
similarity_matrix_broadcasted = sc.broadcast(similarity_matrix)
#similarity_matrix_broadcasted = similarity_matrix
print("*** Broadcasting the similarity matrix  Done !***")