<a href="https://colab.research.google.com/github/samuelebompani/yelp-similarity/blob/main/AMD_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **AMD Project**
## Similar items
### Samuele Bompani 984322

# *Global variables*

In [None]:
slow =  {
  "n": 100000,
  "k": 5,
  "h": 100,
  "b": 10000
}
fast = {
  "n": 100000,
  "k": 5,
  "h": 10,
  "b": 2000
}
variables = slow

# *Import Libraries*



In [None]:
! pip install -q pyspark
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import json
from google.colab import files
import string
import re
import nltk
from nltk.corpus import stopwords
import pyspark

In [None]:
spark = pyspark.sql.SparkSession.builder.master('local[*]').appName("yelp-similarity").getOrCreate()
sc = spark.sparkContext

# *Import the dataset*

## *Upload the credentials*

Upload a file named kaggle.json with your Kaggle credentials

In [None]:
files.upload()
print("ok")

Saving kaggle.json to kaggle.json
ok


In [None]:
! mkdir ~/.kaggle

! mv kaggle.json ~/.kaggle/ #copying kaggle.json

! chmod 600 ~/.kaggle/kaggle.json #reading the file with full access

mkdir: cannot create directory ‘/root/.kaggle’: File exists


## *Download the dataset*

In [None]:
! kaggle datasets download -f yelp_academic_dataset_review.json -d yelp-dataset/yelp-dataset #downloading the compatition dataset
! unzip -n yelp_academic_dataset_review.json.zip

yelp_academic_dataset_review.json.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  yelp_academic_dataset_review.json.zip


## *Open the dataset file*

In [None]:
# Set the dimension of the subset
n_rows = variables.get("n")
data_spark = spark.read.json("yelp_academic_dataset_review.json").limit(n_rows)
data_spark.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



# *Finding similar items*

In [None]:
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

## Reduce text to tokens

### K-shingles

In [None]:
def k_shingles(text, k):
  shingles = []
  ks_string = " ".join(text)
  for i in range(len(ks_string)-k+1):
    shingles.append(ks_string[i:(i+k)])
  return shingles

### Tokenization and stopwords removal

In [None]:
sw = stopwords.words('english')

# Remove whitespaces and punctuation
def normalize_text(text):
  return text.lower().translate(str.maketrans('', '', string.punctuation))

def tokenize_text(text):
  filtered = []
  for w in normalize_text(text).split():
    # Remove stop words
    if w not in sw:
      filtered.append(w)
  # Apply k-shingles
  return k_shingles(filtered, variables.get("k"))

In [None]:
data_rep = data_spark.repartition("text")
data_sel = data_rep.select(data_rep.text)
text_w_index = data_sel.rdd.map(lambda x: x.text).zipWithIndex()
# Apply the previous defined functions
tuples = text_w_index.flatMap(lambda x: map(lambda y: (y, x[1]), tokenize_text(x[0])))

## Minhash

### Group touples by words

In [None]:
# Create tuples with the shape: (ns, [sd1, sd2, ...])
# with ns = number associated to a shingle s
# and  sdi = number associated to the document i, if s is present in it
grouped_tuples = tuples.groupByKey().zipWithIndex().map(lambda x: (x[1], x[0][1]))

### Count the tokens

In [None]:
token_count = grouped_tuples.count()
print("Number of distinct shingles: "+str(token_count))

Number of distinct shingles: 418945


### Hash functions creation

In [None]:
# Number of hash function
n_hash = variables.get("h")

In [None]:
import random as rd
def h(a, b):
  def new_h(x):
    return (a * x + b) % token_count
  return new_h

hash_list = []
for i in range(n_hash):
  # a and b are two (pseudo)random numbers
  hash_list.append(h(rd.randint(0,token_count), rd.randint(0,token_count)))

### Apply hash functions

In [None]:
def seqOpHash(d, x):
  for i, h in enumerate(hash_list):
    # apply the hash function h
    val = h(x[0])
    for j in x[1]:
      # keys for d are tuples with this shape:
      # (number of the hash function, number of the document)
      if (i, j) not in d:
        d[(i, j)] = val
      else:
        if(val < d.get((i, j))):
          d[(i, j)] = val
  return d

def combOpHash(x, y):
  return y

# Apply the minhash algorithm
min_hash = grouped_tuples.aggregate({}, seqOpHash, combOpHash)

### Extract the vectors

In [None]:
par = []
for i in min_hash:
  par.append((i, min_hash[i]))
d = sc.parallelize(par)

In [None]:
grouped = d.groupBy(lambda x: x[0][1])
# Create the similarity matrix
vecs = grouped.map(lambda x: (x[0], list(map(lambda y: y[1], x[1]))))

## LSH

### Trashold choice

In [47]:
b = variables.get("b")
r = int(n_rows/b)
t = (1/b)**(1/r)
print("b: ", b, "\nr: ", r, "\nt: ", round(t, 4))

b:  10000 
r:  10 
t:  0.3981


### Split vectors

In [None]:
def split_vector(vec):
  sub_vecs = []
  for i in range(0, len(vec), r):
    sub_vecs.append(str(vec[i : i+r]))
  return sub_vecs

### Candidate pairs identification

In [None]:
def seqOp(x, y):
  for i, s in enumerate(y[1]):
    if s not in x[i].keys():
      x[i][s] = [y[0]]
    else:
      x[i][s].append(y[0])
  return x

def combOp(x, y):
  return y

subv = vecs.map(lambda x: (x[0], split_vector(x[1])))
# Populate the buckets
bucks = subv.aggregate(([{}]*b), seqOp, combOp)

In [None]:
from itertools import combinations

def find_candidates(buckets):
  candidates = []
  for bucket in buckets:
    for h in bucket.keys():
      if len(bucket[h]) > 1:
        for pair in combinations(bucket[h], 2):
          candidates.append(pair)
  return candidates

In [None]:
candidates = sc.parallelize(find_candidates(bucks)).distinct()

### Filter by the trashold

In [None]:
vs = vecs.collect()

def agree(x, y):
  intersection = 0
  for i, el in enumerate(x):
    if(y[i] == el):
      intersection += 1
  return intersection / len(x)

def compare(x, y):
  vx = list(filter(lambda i: i[0]==x, vs))[0][1]
  vy = list(filter(lambda i: i[0]==y, vs))[0][1]
  return (x, y, agree(vx, vy))

candidates_agr = candidates.map(lambda x: compare(x[0], x[1]))

In [None]:
filtered_candidates = candidates_agr.filter(lambda x: (x[2] > t)).collect()

# *Evaluation*

## Jaccard similarity

In [None]:
def jaccard(x, y):
  xs = set(x)
  xy = set(y)
  intersection = len(xs.intersection(xy))
  union = len(xs.union(xy))
  if union == 0:
    return 0
  return intersection/union


##

In [None]:
texts = text_w_index.map(lambda x: (x[1], x[0])).collectAsMap()
errors = []
jac_values= []
for i in filtered_candidates:
  x = texts.get(i[0])
  y = texts.get(i[1])
  j = jaccard(tokenize_text(x),
        tokenize_text(y))
  errors.append(abs(i[2]-j))
  jac_values.append(j)

In [61]:
correct = len([e for e in errors if e == 0])
print("Mean error: ", round(sum(errors) / len(errors), 4))
print("Jaccard similarity mean value: ",
  round(sum(jac_values) / len(jac_values), 4))
print("Correctness: ", round(correct/len(errors),4))

Mean error:  0.0174
Jaccard similarity mean value:  0.9202
Correctness:  0.3077
