<a href="https://colab.research.google.com/github/stefanoclementeunimi/AMD_project/blob/main/project_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xvf spark-3.4.1-bin-hadoop3.tgz

In [2]:
import os
# Tell Spark where to find the JDK and the unpacked Spark distribution.
# NOTE(review): both paths are hard-coded for a Colab-style layout
# (/usr/lib/jvm and /content) — confirm them when running elsewhere.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [None]:
!pip install pyspark
!pip install findspark

import findspark
findspark.init("/content/spark-3.4.1-bin-hadoop3")
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# Create (or reuse) a SparkSession named 'Basics' that runs in local mode on all available cores.
# NOTE(review): with master 'local[*]' the executor presumably lives inside the driver JVM, so
# 'spark.executor.memory' may have no effect and 'spark.driver.memory' may be the intended knob — confirm.
# NOTE(review): 16g of off-heap memory is requested; verify the host actually has that much RAM.
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('Basics') \
                    .config('spark.executor.memory', '8g') \
                    .config("spark.memory.offHeap.enabled","true")\
                    .config("spark.memory.offHeap.size","16g") \
                    .getOrCreate()


In [None]:
# Connection & download
os.environ['KAGGLE_USERNAME'] = "XXX"
os.environ['KAGGLE_KEY'] = "XXX"
!kaggle datasets download -d yelp-dataset/yelp-dataset


In [5]:
# The academic dataset review is extracted from the zip file
from zipfile import ZipFile

# Only the review file is pulled out of the archive; the other members stay zipped.
with ZipFile('yelp-dataset.zip', mode='r') as archive:
  archive.extract(member='yelp_academic_dataset_review.json')

In [6]:
# We import here all the pyspark/python stuff we need

import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import *
import string, hashlib
from random import randint

# Default values for our algorithm
hashNum = 20        # Number of hashes in a signature
bandW = 2           # Width of our bands, must divide hashNum
threshold = 0.1     # Threshold for our similar pair
limit_flag = True   # If limit_flag is set to True, we use a fraction of the dataset.


In [7]:
# The json dataset is turned into a pyspark dataframe.

# The JSON reviews are loaded into a Spark DataFrame.
sp_df = spark.read.json('yelp_academic_dataset_review.json')

# Only the review text is kept, tagged with a generated identifier called document_id.
# The identifiers come from monotonically_increasing_id(): they are unique,
# but they are not guaranteed to be consecutive row numbers.
sp_df = sp_df.select('text').withColumn('document_id', F.monotonically_increasing_id())

# With limit_flag set, only 10000 documents are kept.
if limit_flag:
  sp_df = sp_df.limit(10000)

# Rows with missing values are dropped.
sp_df = sp_df.dropna()

In [8]:
# This function removes all the punctuation symbols.

def remove_punctuation(text):
  """Return `text` in lower case with every `string.punctuation` character deleted."""
  # A single-argument maketrans with None values deletes the listed characters.
  deletion_table = str.maketrans(dict.fromkeys(string.punctuation))
  without_punctuation = text.translate(deletion_table)
  return without_punctuation.lower()


# This function replaces the text field with the set of all the shingles in a document.
# We define a shingle as any three consecutive words in a text.
# Furthermore, our shingles will be hashed into 32-bit integers.

def make_shingles(text):
  """Return the distinct word 3-shingles of `text`, each hashed to a 32-bit integer.

  The text is split on whitespace and every run of three consecutive words is
  one shingle. A text with fewer than three words is treated as a single
  shingle made of all its words. Each shingle is hashed with SHA-256 and the
  first four digest bytes are read as a little-endian unsigned integer.

  Args:
    text: the (already cleaned) document text.

  Returns:
    A list of integers in [0, 2**32 - 1], one per distinct shingle, in no
    particular order.
  """
  words = text.split()
  if len(words) >= 3:
    shingles = {" ".join(words[i:i + 3]) for i in range(len(words) - 2)}
  else:
    # Join the tokens instead of reusing the raw text, so that short documents
    # get the same whitespace normalisation as the three-word shingles above.
    shingles = {" ".join(words)}
  return [int.from_bytes(hashlib.sha256(shingle.encode('UTF-8')).digest()[:4], 'little')
          for shingle in shingles]

# To run inside Spark, the Python functions are wrapped as user-defined functions (UDFs).

make_shinglesUDF = F.udf(make_shingles, ArrayType(LongType()))
remove_punctuationUDF = F.udf(remove_punctuation, StringType())

# The text is cleaned first, then replaced by its list of hashed shingles.
# The resulting column keeps the name 'text'.

df_processed = sp_df.select('document_id', remove_punctuationUDF(F.col('text')).alias('text'))
df_processed = df_processed.select('document_id', make_shinglesUDF(F.col('text')).alias('text'))


In [9]:
# Now we want to apply the MinHash algorithm to our dataframe.
# As a first step, we need to produce a family of n hash functions.
# A common strategy is to use functions of the form h(x) = (a*x + b) % c
# a and b will be chosen at random according to our needs
# c is a prime slightly bigger than the maximum value of the shingles, which is 2^32-1.


maxShingleValue = 2**32-1
prime = 4294967311 # prime is the smallest prime bigger than 2^32-1

def generate_random_values(n, maxValue):
  """Return `n` distinct random integers drawn from [0, maxValue].

  Args:
    n: how many distinct values to draw.
    maxValue: inclusive upper bound of the draw.

  Returns:
    A list of `n` distinct integers, in the order they were drawn.

  Raises:
    ValueError: if `n` is larger than the number of integers in [0, maxValue];
      the rejection loop below could otherwise never terminate.
  """
  if n > 0 and n > maxValue + 1:
    raise ValueError(
      "cannot draw %d distinct values from the range [0, %d]" % (n, maxValue))
  coeff = []
  seen = set()  # constant-time duplicate check; `coeff` keeps the draw order
  while len(coeff) < n:
    r = randint(0, maxValue)
    if r not in seen:
      seen.add(r)
      coeff.append(r)
  return coeff

def generate_hash_function(n=hashNum, maxValue=maxShingleValue, c=prime):
  """Build `n` random hash functions of the form h(x) = (a*x + b) % c.

  Args:
    n: number of hash functions to generate.
    maxValue: inclusive upper bound for the coefficients; must be at least 1.
    c: modulus applied to a*x + b.

  Returns:
    A list of `n` callables, each taking one integer and returning an integer.
  """
  # The multiplier must be non-zero: with a == 0 the function collapses to the
  # constant b % c and carries no information about x. Drawing from
  # [0, maxValue - 1] and shifting by one yields distinct values in [1, maxValue].
  a_coeff = [a + 1 for a in generate_random_values(n, maxValue - 1)]
  b_coeff = generate_random_values(n, maxValue)
  # The coefficients are bound as default arguments so that every lambda keeps
  # its own (a, b) pair.
  return [lambda x, a=a, b=b: (a * x + b) % c for a, b in zip(a_coeff, b_coeff)]

# This function calculates the signature for every document

def min_hash(shingles, hash_list):
  """Return the MinHash signature of `shingles`.

  The signature has one entry per function in `hash_list`: the smallest value
  that function takes over the shingles. When there are no shingles the entry
  is the placeholder 4294967312.
  """
  return [min((hash_fn(shingle) for shingle in shingles), default=4294967312)
          for hash_fn in hash_list]

# We generate the list of hashes functions.
hashes = generate_hash_function()

In [10]:
# Finally, MinHash is applied. We obtain a dataframe where each document has its own signature

# min_hash is wrapped as a Spark UDF; the lambda passes the module-level list
# of hash functions (`hashes`) as the second argument.
min_hashUDF = F.udf(lambda x: min_hash(x, hashes), ArrayType(LongType()))

# The 'text' column is replaced by the value returned by min_hash (the signature).
df_minhashed = df_processed.select('document_id', min_hashUDF(F.col('text')).alias('text'))


In [11]:
# Having calculated the minHash signature, now we expand our dataset.
# We aim to produce one row for each element of a signature.
# We keep track of the signature indices in order
# to match them and apply our banding technique.

# posexplode emits one row per signature element: its position in the array
# (signatureIndex) and its value (minHash).
df_expanded = df_minhashed.select('document_id', F.posexplode('text').alias('signatureIndex', 'minHash'))



In [12]:
# Here we're using LSH: first we split our dataset into bands.
# A band is a subset of indices, reasonably dimensioned.
# Then, within that band, we collect a slice of the signature for each document
# and hash it.

def verify_pairs(nodeList):
  """Tell whether `nodeList` holds at least two elements, i.e. at least one pair."""
  bucket_size = len(nodeList)
  return bucket_size >= 2

verify_pairsUDF = F.udf(lambda x: verify_pairs(x), BooleanType())

def lsh_banding(df, indices):
  """Return the candidate pairs produced by a single LSH band.

  Args:
    df: expanded signature DataFrame with the columns 'document_id',
      'signatureIndex' and 'minHash'.
    indices: the signature positions that make up the band.

  Returns:
    A DataFrame with the columns 'first_document' and 'second_document'. It
    holds every ordered pair of distinct documents whose band slices hash to
    the same bucket, so both (a, b) and (b, a) are present.
  """
  band = df.filter(F.col('signatureIndex').isin(indices))

  # collect_list gives no ordering guarantee after a shuffle, so two documents
  # with identical band values could be collected in different orders and land
  # in different buckets. Collecting (index, value) structs and sorting them
  # fixes the order of the slice before it is hashed.
  slices = band.groupby('document_id')\
               .agg(F.sort_array(F.collect_list(F.struct('signatureIndex', 'minHash'))).alias('orderedSlice'))\
               .withColumn('reducedSignature', F.col('orderedSlice.minHash'))\
               .withColumn('hashedSignature', F.hash(F.col('reducedSignature')))

  # Documents sharing a bucket are candidates. A bucket with a single document
  # yields no pair, so it is discarded with a native size check.
  buckets = slices.groupBy('hashedSignature')\
                  .agg(F.collect_list(F.col('document_id')).alias('similar_documents'))\
                  .filter(F.size(F.col('similar_documents')) > 1)

  # Each hashedSignature is unique after the groupBy, so exploding the same
  # list twice enumerates every pair of a bucket without needing a self-join.
  pairs = buckets.select(F.explode(F.col('similar_documents')).alias('first_document'),
                         F.col('similar_documents'))\
                 .select(F.col('first_document'),
                         F.explode(F.col('similar_documents')).alias('second_document'))\
                 .filter(F.col('first_document') != F.col('second_document'))
  return pairs

# This function groups together all the candidates
# that won the LSH round.
# Later, we will compute explicitly their Jaccard Similarity.

def build_candidate_list_lsh(df, hashNum, bandW):
  """Union the candidate pairs found by lsh_banding over every band.

  The signature positions 0 .. hashNum-1 are cut into consecutive bands of
  `bandW` positions (the last band is shorter when bandW does not divide
  hashNum). Duplicate pairs are removed from the result.
  """
  total = len(range(hashNum))
  bands = [list(range(start, min(start + bandW, total)))
           for start in range(0, total, bandW)]
  # Caching would be appropriate here
  per_band = [lsh_banding(df, band) for band in bands]
  candidates = per_band[0]
  for extra in per_band[1:]:
    candidates = candidates.union(extra)
  return candidates.dropDuplicates()

lsh_candidates = build_candidate_list_lsh(df_expanded, hashNum, bandW)

In [13]:
# With all the candidate pairs available, we can now restrict our search
# to the promising documents.


# First, we group by the ordered pair (index, hash), collecting
# sets of elements that share a particular signature.

# Both aggregated columns hold the same set of document ids: they are the two
# sides of the pairing built below.
df_coupled = df_expanded.groupby('signatureIndex', 'minHash')\
                        .agg(F.collect_set(F.col('document_id')).alias('firstList'),
                             F.collect_set(F.col('document_id')).alias('secondList'))

# The query below estimates the Jaccard similarity of a pair as the fraction of
# signature positions on which the two documents share the same minHash value.

df_coupled = df_coupled.withColumnRenamed('firstList', "first_document")
df_coupled_j = df_coupled.withColumnRenamed('first_document', 'second_document')

# Joining on (signatureIndex, minHash) and exploding both lists yields one row
# per pair of documents and per signature position on which they agree.
# NOTE(review): 'minhash' is spelled in lower case in the first select; this
# presumably relies on Spark resolving column names case-insensitively — confirm.
final_df= df_coupled.join(df_coupled_j, ['signatureIndex', 'minHash'], 'inner')\
                     .select(F.col('minhash'),
                             F.explode(F.col('first_document')).alias('first_document'),
                             F.col('second_document'))
final_df = final_df.select(F.col('minHash'),
                           F.col('first_document'),
                           F.explode(F.col('second_document')).alias('second_document'))

# Here we restrict our search (before the aggregation) to our candidate pairs

# count('*') / hashNum is the share of matching signature positions for the pair.
# Pairs of a document with itself are removed, then only the pairs strictly
# above the threshold are kept.
final_df = final_df.join(lsh_candidates, ['first_document', 'second_document'], 'inner')\
                   .groupby('first_document', 'second_document')\
                   .agg((F.count('*') / hashNum).alias('jaccard_similarity'))\
                   .filter(F.when(F.col('first_document') == F.col('second_document'), False).otherwise(True))
final_df = final_df.filter(F.col('jaccard_similarity') > F.lit(threshold))


# The two ids are stored as a sorted array so that (a, b) and (b, a) collapse
# into a single row when duplicates are dropped.
final_df = final_df.withColumn("Pair", F.array(F.col("first_document"), F.col("second_document")))\
                   .withColumn("Pair", F.array_sort(F.col("Pair")))\
                   .dropDuplicates(['Pair'])\
                   .select("Pair", "jaccard_similarity")


In [14]:
final_df.show()

+------------+------------------+
|        Pair|jaccard_similarity|
+------------+------------------+
|[1311, 8877]|               0.2|
|[1982, 4971]|              0.15|
|[1026, 6231]|              0.25|
|[3131, 8113]|              0.15|
| [308, 5108]|              0.15|
| [339, 7197]|               0.2|
|[6373, 8860]|               0.2|
|[6202, 9198]|              0.15|
|[4054, 6231]|              0.15|
|[1609, 8548]|              0.15|
| [396, 1602]|               0.2|
|[1026, 2649]|              0.25|
|[6197, 9198]|              0.15|
|[1595, 2961]|              0.15|
|[4584, 7273]|              0.15|
|[6231, 7885]|              0.15|
| [553, 8163]|              0.15|
|[2085, 4881]|               0.2|
|[5268, 7859]|              0.15|
| [553, 6373]|              0.15|
+------------+------------------+
only showing top 20 rows



In [None]:
# The found pairs are written in CSV format to /content/pairs.csv
# (Spark creates a directory of part files at that path, not a single file).
final_df.withColumn("Pair", F.col("Pair").cast("string")).write.csv("/content/pairs.csv")


In [None]:
from google.colab import drive
drive.mount('/content/drive')