# Document Similarity

## Read Files

In [1]:
# Input corpus in JSON Lines format (one JSON object with a 'text' field per line).
# Other samples in ./Databases/prova/: prova.jsonl, prova10.jsonl,
# prova2000.jsonl, prova5000.jsonl, prova30000.jsonl
path_name_documents = './Databases/prova/prova50.jsonl'

In [2]:
import json
import numpy as np
import string

def readFile(path_name):
    """Load a JSON Lines file into a list of rows, one per JSON object.

    Each row holds the object's values in key order. Rows are stacked with
    ``np.vstack``, so every object must have the same number of keys, and
    numpy coerces mixed value types to one common dtype (e.g. numbers become
    strings when any value is a string).

    Args:
        path_name: path to the ``.jsonl`` file (read as UTF-8).

    Returns:
        list[list]: one inner list per non-blank line; ``[]`` when the file
        contains no JSON objects.
    """
    # JSON text is UTF-8; the platform default encoding (e.g. cp1252 on
    # Windows) can fail to decode non-ASCII characters.
    with open(path_name, 'r', encoding='utf-8') as f:
        # Skip blank lines, which json.loads would reject.
        dicts = [json.loads(line) for line in f if line.strip()]

    # np.vstack raises on an empty sequence, so handle that case explicitly.
    if not dicts:
        return []

    # Convert each dictionary's values into an array and stack them vertically.
    arrays = np.vstack([np.array(list(d.values())) for d in dicts])

    return arrays.tolist()

# NOTE(review): `documents` is not used anywhere else in the visible notebook.
documents = readFile(path_name=path_name_documents)


In [3]:
import functools
import time

def time_it(func):
    """Decorator that prints the elapsed time of each successful call to ``func``.

    The wrapped function's return value is passed through unchanged, and
    ``functools.wraps`` preserves its ``__name__`` and ``__doc__``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution; time.time() can jump
        # when the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"Execution time: {end_time - start_time:.5f} seconds")
        return result
    return wrapper

## Tokenization

In [4]:
import json
import nltk
import re
import string
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer


# English stop words from the NLTK corpus, held in a set for fast membership tests.
stop_words = {word for word in stopwords.words('english')}

def stemmingLemming(filtered_tokens):
    """Normalize tokens by WordNet-lemmatizing and then Porter-stemming each one.

    Args:
        filtered_tokens: iterable of token strings.

    Returns:
        list[str]: the normalized tokens, in input order.
    """
    lemmatizer = WordNetLemmatizer()
    stemmer = PorterStemmer()
    # Lemmatize first, then stem the resulting lemma.
    return [stemmer.stem(lemmatizer.lemmatize(token)) for token in filtered_tokens]
    
 
    

def tokenize(path_name, remove_stopwords=False):
    """Read a JSON Lines corpus and return one list of normalized tokens per document.

    Each non-blank line of ``path_name`` must be a JSON object with a ``'text'``
    field. The text is lower-cased and stripped of ASCII punctuation
    (``string.punctuation``); digits are kept. It is then tokenized with NLTK
    ``word_tokenize`` and lemmatized + stemmed by ``stemmingLemming``.

    Args:
        path_name: path to the ``.jsonl`` file (read as UTF-8).
        remove_stopwords: if True, drop tokens contained in the module-level
            ``stop_words`` set before stemming. Defaults to False (no stop-word
            removal).

    Returns:
        list[list[str]]: tokenized documents, in file order.
    """
    # Translation table that deletes all ASCII punctuation; built once.
    punctuation_table = str.maketrans('', '', string.punctuation)

    # JSON text is UTF-8; the platform default encoding (e.g. cp1252 on
    # Windows) can fail to decode non-ASCII characters.
    with open(path_name, "r", encoding="utf-8") as f:
        lines = f.readlines()

    tokenized_docs = []
    for line in lines:
        # Skip blank lines, which json.loads would reject.
        if not line.strip():
            continue

        doc = json.loads(line)
        text = doc['text'].lower()
        text = text.translate(punctuation_table)

        tokens = word_tokenize(text)
        if remove_stopwords:
            tokens = [token for token in tokens if token not in stop_words]

        tokenized_docs.append(stemmingLemming(tokens))

    return tokenized_docs


# Tokenize, lemmatize and stem every document of the selected corpus.
tokenized_docs = tokenize(path_name=path_name_documents)


# Sparse Vectors

## TF-IDF

In [5]:
from sklearn.feature_extraction.text import TfidfVectorizer


def calculateTFIDF(tokenized_docs, **vectorizer_kwargs):
    r"""Build a TF-IDF matrix from pre-tokenized documents.

    Each document's tokens are joined with single spaces and passed to a
    scikit-learn ``TfidfVectorizer``, which re-tokenizes that string.

    Args:
        tokenized_docs: list of token lists (e.g. the output of ``tokenize``).
        **vectorizer_kwargs: forwarded to ``TfidfVectorizer``. With none given,
            the vectorizer's defaults apply. Its default ``token_pattern`` only
            matches runs of two or more word characters, so single-character
            tokens are dropped; pass ``token_pattern=r"\S+"`` to keep every
            pre-computed token.

    Returns:
        (tfidf_matrix, feature_names, vectorizer): the document-term matrix
        (one row per document), the vocabulary terms in column order, and the
        fitted vectorizer.
    """
    vectorizer = TfidfVectorizer(**vectorizer_kwargs)
    # Fit and transform the joined documents into a TF-IDF matrix.
    tfidf_matrix = vectorizer.fit_transform([' '.join(doc) for doc in tokenized_docs])

    # Feature names (tokens), aligned with the matrix columns.
    feature_names = vectorizer.get_feature_names_out()

    return tfidf_matrix, feature_names, vectorizer
    
        

# TF-IDF representation of the corpus plus its vocabulary and fitted vectorizer.
tfidf_matrix_docs, feature_names_docs, vectorizer = calculateTFIDF(tokenized_docs=tokenized_docs)

## Cosine Similarity

In [6]:
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

def similarity(tfidf_matrix):
    """Compute the pairwise cosine similarity between the rows of ``tfidf_matrix``.

    Returns:
        (sim_table, cos_sim): a pandas DataFrame labelled 'Doc 1'..'Doc n' on
        both axes, and the raw similarity matrix returned by
        ``cosine_similarity``.
    """
    # Cosine similarity between every pair of documents.
    cos_sim = cosine_similarity(tfidf_matrix)

    # Label rows and columns with 1-based document names.
    labels = ['Doc ' + str(k) for k in range(1, cos_sim.shape[0] + 1)]
    sim_table = pd.DataFrame(cos_sim, index=labels, columns=labels)

    return sim_table, cos_sim

# Full document-by-document cosine similarity table (rendered as the cell output).
cos_sim_table, cos_sim = similarity(tfidf_matrix=tfidf_matrix_docs)
cos_sim_table


Unnamed: 0,Doc 1,Doc 2,Doc 3,Doc 4,Doc 5,Doc 6,Doc 7,Doc 8,Doc 9,Doc 10,...,Doc 41,Doc 42,Doc 43,Doc 44,Doc 45,Doc 46,Doc 47,Doc 48,Doc 49,Doc 50
Doc 1,1.0,0.110656,0.109044,0.060716,0.177886,0.058977,0.116177,0.125621,0.077924,0.0979,...,0.120425,0.024624,0.230842,0.055705,0.129963,0.103205,0.069016,0.101201,0.098645,0.152137
Doc 2,0.110656,1.0,0.283733,0.220578,0.162825,0.131854,0.107714,0.157167,0.216831,0.164395,...,0.140222,0.070117,0.1062,0.081366,0.145323,0.159547,0.111732,0.10535,0.138997,0.249414
Doc 3,0.109044,0.283733,1.0,0.173911,0.22775,0.099746,0.109399,0.146971,0.164795,0.136291,...,0.13863,0.058028,0.106178,0.077943,0.1451,0.14422,0.127869,0.159312,0.156291,0.222083
Doc 4,0.060716,0.220578,0.173911,1.0,0.090387,0.059282,0.058089,0.087157,0.159207,0.112255,...,0.086176,0.065704,0.069261,0.036991,0.07034,0.080001,0.078273,0.064788,0.079276,0.15617
Doc 5,0.177886,0.162825,0.22775,0.090387,1.0,0.091296,0.135467,0.124806,0.144199,0.120603,...,0.165375,0.100469,0.122009,0.05387,0.12883,0.131225,0.096039,0.141964,0.15757,0.183197
Doc 6,0.058977,0.131854,0.099746,0.059282,0.091296,1.0,0.044006,0.089065,0.078635,0.088639,...,0.091467,0.050299,0.039206,0.053383,0.079098,0.127049,0.058086,0.072913,0.093496,0.133366
Doc 7,0.116177,0.107714,0.109399,0.058089,0.135467,0.044006,1.0,0.131688,0.093923,0.094308,...,0.089265,0.055974,0.1093,0.034703,0.094217,0.052091,0.088159,0.068817,0.118031,0.177457
Doc 8,0.125621,0.157167,0.146971,0.087157,0.124806,0.089065,0.131688,1.0,0.103217,0.139351,...,0.133801,0.027777,0.085724,0.103432,0.137513,0.137954,0.076477,0.081885,0.098929,0.263209
Doc 9,0.077924,0.216831,0.164795,0.159207,0.144199,0.078635,0.093923,0.103217,1.0,0.111822,...,0.121011,0.05784,0.084841,0.050483,0.122392,0.113322,0.093318,0.100476,0.131275,0.158128
Doc 10,0.0979,0.164395,0.136291,0.112255,0.120603,0.088639,0.094308,0.139351,0.111822,1.0,...,0.139255,0.117162,0.079364,0.103379,0.099888,0.14257,0.124507,0.092218,0.209916,0.226091


## Sequential algorithms

In [8]:
from scipy.sparse import coo_matrix

def extract_document_terms(tfidf_matrix):
    """Group the non-zero entries of a document-term matrix by document.

    Args:
        tfidf_matrix: any matrix accepted by ``scipy.sparse.coo_matrix``, with
            one row per document and one column per term.

    Returns:
        list of ``(doc_id, [(term_id, value), ...])`` ordered by document, where
        ``doc_id`` is the 1-based row index (matching the 'Doc k' labels of the
        similarity table). Rows without any non-zero entry are omitted.
    """
    matrix_coo = coo_matrix(tfidf_matrix)

    # Group explicitly by row instead of assuming the COO entries are row-sorted.
    terms_by_row = {}
    for row, col, value in zip(matrix_coo.row, matrix_coo.col, matrix_coo.data):
        terms_by_row.setdefault(row, []).append((col, value))

    # Every document, including the last one, gets the same 1-based id scheme,
    # so ids are unique.
    return [(int(row) + 1, terms) for row, terms in sorted(terms_by_row.items())]

# Per-document (doc_id, [(term_id, tfidf), ...]) records used by the map step.
doc_info_list = extract_document_terms(tfidf_matrix=tfidf_matrix_docs)

In [17]:
def my_map(doc_info):
    """Map step: turn one ``(doc_id, terms)`` record into a single-element list.

    Args:
        doc_info: ``(doc_id, [(term_id, value), ...])``; ``terms`` must be non-empty.

    Returns:
        ``[(doc_id, largest_term_id, [(term_id, value), ...])]``.
    """
    doc_id, terms = doc_info

    largest_term_id = max(tid for tid, _ in terms)
    copied_terms = [(tid, val) for tid, val in terms]

    return [(doc_id, largest_term_id, copied_terms)]

In [18]:
def apply_my_map(doc_info_list):
    """Run ``my_map`` over every document record and concatenate the outputs."""
    return [
        record
        for doc_info in doc_info_list
        for record in my_map(doc_info)
    ]

#mapped_list = apply_my_map(doc_info_list)

In [16]:
def my_reduce(docs, threshold):
    """Sequential all-pairs similarity over the records produced by ``my_map``.

    The similarity of two documents is the dot product of their term weights
    over the terms they share. With TF-IDF rows L2-normalized (TfidfVectorizer's
    default), this equals their cosine similarity.

    Args:
        docs: list of ``(doc_id, max_term_id, [(term_id, value), ...])`` records.
        threshold: minimum similarity for a pair to be reported.

    Returns:
        list of ``((doc_i_id, doc_j_id), sim)`` for every i < j (positions in
        ``docs``) whose documents share at least one term and sim >= threshold.
    """
    # Build each document's term -> weight lookup once, not once per pair.
    term_maps = [dict(doc_terms) for _, _, doc_terms in docs]

    pairs = []
    for i in range(len(docs)):
        for j in range(i + 1, len(docs)):
            terms_1, terms_2 = term_maps[i], term_maps[j]

            common_terms = terms_1.keys() & terms_2.keys()
            if not common_terms:
                continue

            # Each document occurs once and i < j, so every pair is scored
            # exactly once. No "largest common term" ownership test is needed;
            # that test is only valid when the map emits one record per term.
            sim = sum(terms_1[term] * terms_2[term] for term in common_terms)

            if sim >= threshold:
                pairs.append(((docs[i][0], docs[j][0]), sim))

    return pairs


#pairs = my_reduce(mapped_list,0.1)


## Execution

In [12]:
import os
import sys

# Run the Spark driver and its Python workers with the same interpreter as this
# kernel, instead of a hard-coded, machine-specific path, so both sides use one
# Python version and environment.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [13]:
# Rebuild the per-document term records that feed the Spark job below.
doc_info_list = extract_document_terms(tfidf_matrix=tfidf_matrix_docs)

In [32]:
from pyspark import SparkConf, SparkContext

def my_map(doc_info):
    """Spark map step (used with ``flatMap``): emit one record per document.

    Args:
        doc_info: ``(doc_id, [(term_id, value), ...])``; ``terms`` must be non-empty.

    Returns:
        ``[(doc_id, max_term_id, [(term_id, value), ...])]``.
    """
    doc_id, terms = doc_info

    max_term_id = max(pair[0] for pair in terms)

    term_pairs = []
    for term_id, value in terms:
        term_pairs.append((term_id, value))

    return [(doc_id, max_term_id, term_pairs)]

def my_reduce(docs1, docs2, threshold):
    """Spark reduce step: score one ``(docs1, docs2)`` pair produced by ``cartesian()``.

    Args:
        docs1, docs2: ``my_map`` records ``(doc_id, max_term_id, [(term_id, value), ...])``.
        threshold: minimum similarity for a pair to be reported.

    Returns:
        ``[((doc1_id, doc2_id), sim)]`` when doc1_id < doc2_id, the documents
        share at least one term and sim >= threshold; otherwise ``[]``. Being a
        list, the result plugs directly into ``flatMap``.
    """
    doc1_id, _, doc1 = docs1
    doc2_id, _, doc2 = docs2

    # cartesian() yields (a, a), (a, b) and (b, a). Keep only doc1_id < doc2_id
    # so self-pairs are skipped and each unordered pair is emitted once.
    if doc1_id >= doc2_id:
        return []

    terms_1 = dict(doc1)
    terms_2 = dict(doc2)

    common_terms = terms_1.keys() & terms_2.keys()
    if not common_terms:
        return []

    # Each document is a single record, so this pair must be scored here. A
    # "largest common term" ownership test is only valid when the map emits one
    # record per term, and would drop qualifying pairs in this setup.
    # With L2-normalized TF-IDF rows the dot product is the cosine similarity.
    sim = sum(terms_1[term] * terms_2[term] for term in common_terms)

    if sim >= threshold:
        return [((doc1_id, doc2_id), sim)]
    return []







# Local Spark run with 8 worker threads.
# NOTE(review): in local mode executors run inside the driver JVM, so
# spark.executor.memory likely has no effect — confirm whether driver memory
# is what should be tuned.
conf = SparkConf().setAppName("MyApp").setMaster("local[8]").set("spark.executor.memory", "16g")

# getOrCreate() reuses a context left alive by an earlier run instead of raising
# because a SparkContext already exists (conf is ignored in that case).
sc = SparkContext.getOrCreate(conf=conf)
try:
    print("INPUT")
    input_rdd = sc.parallelize(doc_info_list)

    print("MAP")
    mapped_rdd = input_rdd.flatMap(my_map)

    print("REDUCE")
    threshold = 0.3  # minimum similarity for a document pair to be reported
    reduced_pairs = mapped_rdd.cartesian(mapped_rdd).flatMap(
        lambda pair: my_reduce(pair[0], pair[1], threshold)
    )
    results = reduced_pairs.collect()

    for result in sorted(results):
        print(result)
finally:
    # Always release the context, even when collect() raises, so the cell can
    # be re-run without a manual sc.stop().
    sc.stop()


INPUT
MAP
REDUCE


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (DESKTOP-JP7MLME executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\lita4\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 810, in main
  File "C:\Users\lita4\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 594, in read_int
    length = stream.read(4)
  File "C:\Users\lita4\anaconda3\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
TimeoutError: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\lita4\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 810, in main
  File "C:\Users\lita4\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 594, in read_int
    length = stream.read(4)
  File "C:\Users\lita4\anaconda3\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
TimeoutError: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [31]:
# Manually stop the SparkContext `sc`; presumably used to clean up when the
# Spark cell failed before reaching its own sc.stop().
sc.stop()
