In [8]:
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
from pyspark.sql.types import DoubleType


In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [20]:
documents = sqlContext.createDataFrame([
    (0, "The sky is blue blue"),
    (1, "The sky is blue and beautiful"),
    (2, "Look at the bright blue sky!,"),
   ], ["doc_id", "doc_text"])


documents.printSchema()

root
 |-- doc_id: long (nullable = true)
 |-- doc_text: string (nullable = true)



In [21]:
doc=documents.rdd

In [22]:
doc.take(1)

[Row(doc_id=0, doc_text='The sky is blue blue')]

In [3]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [4]:
import re
#from stemming.porter2 import stem
def clean_word(w):
   # w=w.lower().strip()
    w=re.sub('\n',"",w)
    return re.sub("[^a-z| |0-9]|,\,|\.|\;|\:|\;|\?|\!|\[|\]|\}|\{(?i)\b|[0-9]|((?:https?://|www\d{0,3}))|[//]|[#]|[$]|", "", (w.lower()))


In [23]:
toy=(documents.rdd.map(lambda x: (x.doc_id, clean_word (x.doc_text)))).toDF().withColumnRenamed("_1","doc_id").withColumnRenamed("_2","doc_text")

In [None]:
toy.take(1)

In [24]:
df = (toy.rdd
      .map(lambda x : (x[0],x[1].split(" ")))
     .toDF()
  .withColumnRenamed("_1","doc_id")
  .withColumnRenamed("_2","features"))

In [None]:
df.printSchema()

In [25]:
htf = HashingTF(inputCol="features", outputCol="tf",numFeatures=10)
tf = htf.transform(df)
tf.show(truncate=False)

+------+------------------------------------+--------------------------------------+
|doc_id|features                            |tf                                    |
+------+------------------------------------+--------------------------------------+
|0     |[the, sky, is, blue, blue]          |(10,[0,1,2,8],[1.0,1.0,1.0,2.0])      |
|1     |[the, sky, is, blue, and, beautiful]|(10,[0,1,2,3,8],[1.0,1.0,1.0,1.0,2.0])|
|2     |[look, at, the, bright, blue, sky]  |(10,[0,2,6,8,9],[1.0,1.0,2.0,1.0,1.0])|
+------+------------------------------------+--------------------------------------+



In [26]:
idf = IDF(inputCol="tf", outputCol="tfidf")
tfidf = idf.fit(tf).transform(tf)
tfidf.show(truncate=True)

+------+--------------------+--------------------+--------------------+
|doc_id|            features|                  tf|               tfidf|
+------+--------------------+--------------------+--------------------+
|     0|[the, sky, is, bl...|(10,[0,1,2,8],[1....|(10,[0,1,2,8],[0....|
|     1|[the, sky, is, bl...|(10,[0,1,2,3,8],[...|(10,[0,1,2,3,8],[...|
|     2|[look, at, the, b...|(10,[0,2,6,8,9],[...|(10,[0,2,6,8,9],[...|
+------+--------------------+--------------------+--------------------+



In [11]:
tfidf1=tfidf.select("doc_id","features","tfidf")

In [12]:
tfidf1.show(truncate=False)

+------+------------------------------------+---------------------------------------------------------------------+
|doc_id|features                            |tfidf                                                                |
+------+------------------------------------+---------------------------------------------------------------------+
|0     |[the, sky, is, blue]                |(10,[0,1,2,8],[0.0,0.28768207245178085,0.0,0.0])                     |
|1     |[the, sky, is, blue, and, beautiful]|(10,[0,1,2,3,8],[0.0,0.28768207245178085,0.0,0.6931471805599453,0.0])|
|2     |[look, at, the, bright, blue, sky]  |(10,[0,2,6,8,9],[0.0,0.0,1.3862943611198906,0.0,0.6931471805599453]) |
+------+------------------------------------+---------------------------------------------------------------------+



In [13]:
mat1=tfidf.rdd.map(lambda x: (x.tfidf))

In [30]:
mat2=tfidf.rdd.map(lambda x: (x.doc_id,set(x.features)))

In [28]:
mat1.take(1)

[SparseVector(10, {0: 0.0, 1: 0.2877, 2: 0.0, 8: 0.0})]

In [31]:
mat2.take(1)

[(0, {'blue', 'is', 'sky', 'the'})]

In [13]:
import numpy as np
from scipy.sparse import csr_matrix
def as_matrix(vec):
    data, indices = vec.values, vec.indices
    shape = 1, vec.size
    return csr_matrix((data, indices, np.array([0, vec.values.size])), shape)

mats = mat1.map(as_matrix)

In [14]:
from scipy.sparse import vstack

mat = mats.reduce(lambda x, y: vstack([x, y]))

In [15]:
def broadcast_matrix(mat):
    bcast = sc.broadcast((mat.data, mat.indices, mat.indptr))
    (data, indices, indptr) = bcast.value
    bcast_mat = csr_matrix((data, indices, indptr), shape=mat.shape)
    return bcast_mat

def parallelize_matrix(scipy_mat, rows_per_chunk=100):
    [rows, cols] = scipy_mat.shape
    i = 0
    submatrices = []
    while i < rows:
        current_chunk_size = min(rows_per_chunk, rows - i)
        submat = scipy_mat[i:i + current_chunk_size]
        submatrices.append((i, (submat.data, submat.indices, 
                                submat.indptr),
                            (current_chunk_size, cols)))
        i += current_chunk_size
    return sc.parallelize(submatrices)

In [16]:
from scipy.sparse import csr_matrix

In [17]:
a_mat_para = parallelize_matrix(mat, rows_per_chunk=100)
b_mat_dist = broadcast_matrix(mat)

In [111]:
from sklearn.metrics.pairwise import cosine_similarity
def find_matches_in_submatrix(sources, targets, inputs_start_index,
                              threshold=.0):
    cosimilarities = cosine_similarity(sources, targets)
    for i, cosimilarity in enumerate(cosimilarities):
        cosimilarity = cosimilarity.flatten()
        index=np.argsort(-cosimilarity).tolist()
        #df4=doc.filter(lambda x: x[0]==index[0]).map (lambda x: x[1])
        #print ('='*60)
        #print ('Document Similarity Analysis using Cosine Similarity\n')
        #print ('='*60)
        #print ("documents:",i+1,":",df4.collect())
        #target_index = index
        #target_index = cosimilarity.argsort()[-1]
        #target_index = -cosimilarity.argsort().tolist()
        source_index = inputs_start_index + i
        #source=doc.filter(lambda x: x[0]==source_index).map (lambda x: x[1]).collect()
        for j in range(3):
        #for target_index in enumerate(target_index):
            target_index =index[j]
            similarity = cosimilarity[target_index]
            #target=doc.filter(lambda x: x[0]==target_index).map (lambda x: x[1]).collect()
            #df5=doc.filter(lambda x: x[0]==index[j+1]).map (lambda x: x[1])
            #print ('-'*100 )
            #print (" similar documents\n","Top Doc:", j+1,":\n",df5.collect() )
            # print (df2.collect())
            #print ('-'*100 )
        #if cosimilarity[target_index] >= threshold:
            yield (source_index, target_index, j+1)
            #yield (source, target, j+1)

In [112]:
x=(a_mat_para.flatMap(
        lambda submatrix:
        find_matches_in_submatrix(csr_matrix(submatrix[1],
                                             shape=submatrix[2]),
                                   b_mat_dist,submatrix[0])))

In [114]:
coll=x.collect()

In [115]:
coll[0]

(0, 0, 1)

In [60]:
y=x.map(lambda x: (x[0],x[1],x[2]))

In [61]:
y.collect()

[(0, 0, 1),
 (0, 1, 2),
 (0, 2, 3),
 (1, 1, 1),
 (1, 0, 2),
 (1, 2, 3),
 (2, 2, 1),
 (2, 0, 2),
 (2, 1, 3)]

In [62]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
schema = StructType([StructField("DOC",IntegerType(), True),StructField("Related_doc", IntegerType(), True),StructField("Related_num", StringType(), True)])
y1=sqlContext.createDataFrame(y, schema)

In [63]:
y1.show()

+---+-----------+-----------+
|DOC|Related_doc|Related_num|
+---+-----------+-----------+
|  0|          0|          1|
|  0|          1|          2|
|  0|          2|          3|
|  1|          1|          1|
|  1|          0|          2|
|  1|          2|          3|
|  2|          2|          1|
|  2|          0|          2|
|  2|          1|          3|
+---+-----------+-----------+



In [64]:
pivoted = (y1
    .groupBy("DOC")
    .pivot(
        "Related_num",
        ['1', '2', '3'])  # Optional list of levels
    .sum("Related_doc"))  # alternatively you can use .agg(expr))
pivoted.show()

+---+---+---+---+
|DOC|  1|  2|  3|
+---+---+---+---+
|  1|  1|  0|  2|
|  2|  2|  0|  1|
|  0|  0|  1|  2|
+---+---+---+---+



In [71]:
arr=pivoted.collect()

In [74]:
arr [1]

Row(DOC=2, 1=2, 2=0, 3=1)

In [36]:
y2=y1.filter(y1.DOC==0)

In [45]:
y3=y2.select("Related_doc")

In [46]:
y4=y3.rdd.map(lambda x: x[0]).collect()

In [47]:
y4

[0, 1, 2]

In [48]:
df7=doc.filter(lambda x: x[0]==(y4[0])).map (lambda x: x[1])

In [49]:
df7.take(1)

['The sky is blue']

In [None]:
y1=x.map(lambda x : x[0]).collect()

In [None]:
y2=x.map(lambda x : x[1]).collect()
y3=x.map(lambda x : x[2]).collect()

In [171]:
#from pyspark.sql.types import *
#field = StructType[StructField("DOC",StringType(), True),StructField("Related_doc1", StringType(), True),StructField("Related_doc2",StringType(), True), StructField("Related_doc3", StringType(), True)]
schema1 = StructType([])
df6 = sqlContext.createDataFrame(sc.emptyRDD(), schema1)
#schema = StructType(field)
df0=df6.rdd
#df6 = sqlContext.createDataFrame(sc.emptyRDD(), schema)
#df0=[]

In [180]:
#df6=sc.emptyRDD[("String", "String", "String")]
i=0
def related_docs(x):
    for i in range(3):##### change here
    #a=(y1[i])
        df4=doc.filter(lambda x: x[0]==i).map (lambda x: x[1]).collect()
        y2=y1.filter(y1.DOC==i)
        y3=y2.select("Related_doc")
        y4=y3.rdd.map(lambda x: x[0]).collect()
        #print ('='*60)
        #print ('Document Similarity Analysis using Cosine Similarity\n')
        #print ('='*60)
        #print ("documents:",i+1,":",df4.collect())
        #print (df4) 
        #print ('='*60)
        for j in (range(3)):
        #k=y1.filter(y1.DOC==i)
        
           df5=doc.filter(lambda x: x[0]==(y4[j])).map (lambda x: x[1]).collect()
        #rel= union(df4, df5, j+1)
        #rel=sc.union([print (df4),print (df5)])
        #print (df5)
        #df0=(df0,df4,df5)
        #rel=sc.union([df4, df5])
           print (str (df4),",",str (df5),",",str (j+1))
    #df0 = sqlContext.createDataFrame(df5, schema)
        #df5 = df5.zipWithIndex()#.map(lambda (v, k): (k, v))
        #df0 = df0.zipWithIndex()#.map(lambda (v, k): (k, v))
        #df0 = df0.join(df5) 
    #print ('-'*100 )    
        #print (" similar documents\n","Top Doc:", j+1,":\n",df5.collect() )
        
    # print (df2.collect())
    #print ('-'*100 )
    #sim= y3[i]
    #print ("similarity==",sim)

In [181]:
related_docs(1)

['The sky is blue'] , ['The sky is blue'] , 1
['The sky is blue'] , ['The sky is blue and beautiful'] , 2
['The sky is blue'] , ['Look at the bright blue sky!,'] , 3
['The sky is blue and beautiful'] , ['The sky is blue and beautiful'] , 1
['The sky is blue and beautiful'] , ['The sky is blue'] , 2
['The sky is blue and beautiful'] , ['Look at the bright blue sky!,'] , 3
['Look at the bright blue sky!,'] , ['Look at the bright blue sky!,'] , 1
['Look at the bright blue sky!,'] , ['The sky is blue'] , 2
['Look at the bright blue sky!,'] , ['The sky is blue and beautiful'] , 3


In [174]:
z=df0.map(lambda x: x[0]==related_docs(x))

In [175]:
z.collect()

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

In [149]:
from __future__ import print_function
a_loop = lambda x: ((x, y) for y in xrange(10))
print_me = lambda ((x, y), z): print("{0}.{1}.{2}".format(x, y, z)))


SyntaxError: invalid syntax (<ipython-input-149-06b8905fdbdd>, line 3)

In [94]:
rel.take(1)

['Look at the bright blue sky!,']

In [None]:
df00=df4.union(df0)  

In [None]:
df0.collect()

In [None]:
df4=doc.filter(lambda x: x[0]==index[0]).map (lambda x: x[1])

In [None]:
tfidf

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
mat1.take(1)

In [None]:
mat1

In [None]:
mats

In [None]:
mat

In [None]:
cosimilarities1 = cosine_similarity(mat, mat)

In [None]:
cosimilarities1

In [None]:
for i, cosimilarity in enumerate(cosimilarities1):
        cosimilarity = cosimilarity.flatten()
        print (cosimilarity)
        print ('###########')
        index=np.argsort(-cosimilarity)
        target_index = index.tolist()
        print (target_index)
        print ('####################')
        for target_index in (range(len(target_index))):
            similarity = cosimilarity[target_index]
            print (similarity)
            print ('@@@@')
        #source_index = inputs_start_index + i
        

In [None]:
len(target_index)-1

In [None]:
cosimilarity

In [None]:
target_index

In [None]:
similarity

In [None]:
a=(index[i]).tolist()

In [None]:
a

In [None]:
df3=doc.filter(lambda x: x[0]==a[0]).map (lambda x: x[1])

In [None]:
str (df3.collect())

In [None]:
df1=documents[documents.doc_id.isin([2,1, 0])]

In [None]:
df2.take(3)

In [None]:
df1.take(3)

In [None]:
df2=df1.select("doc_id")

In [None]:
mat1=df1.rdd.map(lambda x: (x[1]))

In [None]:
header = mat1.take(1)[0]
rows = mat1.filter(lambda line: line != header)

In [None]:
mat1str=[(rows.collect())]

In [None]:
df3.take(1)

In [None]:
df3str=str (df3.collect())

In [None]:
df3str

In [None]:
file = open("dump.txt", "w")
file.write ('\nDocument Similarity Analysis using Cosine Similarity\n')
for i in range(3):##### change here
    a=(index[i]).tolist()
    df3=doc.filter(lambda x: x[0]==a[0]).map (lambda x: str (x[1]))
    df3str=str (df3.collect()) 
    file.write ("\n")
    file.write ('='*60)
    file.write ("\ndocuments:")
    c=str(i+1)
    file.write (c)
    file.write (":\n")   
    file.write (df3str)
    file.write (" \nsimilar documents\n")
    for j in range(2):
        df2=doc.filter(lambda x: x[0]== a[j+1]).map (lambda x: str (x[1]))
        df2str=str (df2.collect())        
        file.write ("\nTop Doc:")
        b= str (j+1)
        file.write (b)
        file.write (":\n")        
        file.write (df2str)
        file.write ("\n")
        file.write ('-'*40 )
        file.write ("\n")
        #file.close()
file.close ()

In [None]:
a