<a href="https://colab.research.google.com/github/thomouvic/CSC502/blob/main/pyspark_sim_join.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Similarity Join

**Original Problem Description**

Suppose we have a collection of n = 1,000,000 images, each of size 1 MB, so the dataset has a total size of 1 TB.
The goal is to discover pairs of images that are similar according to some similarity function s(x, y).
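A quick back-of-the-envelope count shows why this is expensive. This is a sketch that assumes decimal units (1 MB = 10^6 bytes) and counts each unordered pair once:

```python
# Problem size for the image similarity join (assumes 1 MB = 10**6 bytes)
n = 1_000_000          # number of images
image_bytes = 10**6    # 1 MB per image

dataset_bytes = n * image_bytes    # total input size
num_pairs = n * (n - 1) // 2       # unordered pairs {x, y} with x != y

print(f"dataset size: {dataset_bytes / 10**12:.0f} TB")
print(f"pairs to compare: {num_pairs:,}")
```

Roughly 5 × 10^11 evaluations of s(x, y) are needed, which is why the work is spread over many machines.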

**MapReduce Algorithm**

The input to the **mappers** is a set of key-value pairs (i, P_i), where i is the ID of a picture and P_i is the picture itself.

**Map**(i, P_i) emits ({i,j}, P_i) for each j in [1,n] with j ≠ i.

*{i,j} is treated as a set, so {2,3} and {3,2} are the same key.*
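Because Map sends a copy of P_i to every partner j ≠ i, each picture is replicated n − 1 times. A rough estimate of the resulting shuffle volume for the problem above, as a sketch that assumes 1 MB = 10^6 bytes and ignores key and serialization overhead:

```python
# Shuffle volume produced by Map(i, P_i) -> ({i, j}, P_i) for every j != i
n = 1_000_000          # number of images
image_bytes = 10**6    # 1 MB per image (assumption: decimal megabytes)

records_per_image = n - 1                 # one copy of P_i per partner j
total_records = n * records_per_image     # key-value pairs sent to reducers
shuffle_bytes = total_records * image_bytes

print(f"records emitted: {total_records:,}")
print(f"shuffle volume: {shuffle_bytes / 10**18:.3f} EB")  # 1 EB = 10**18 bytes
```

The shuffle is n − 1 ≈ 10^6 times larger than the 1 TB input, which is the dominant cost of this naive algorithm.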

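The input to each reducer is therefore ({i,j}, [P_i, P_j]).

The **Reduce** function simply applies s(P_i, P_j) to the two pictures; in the Spark code below it also keeps only the pairs whose similarity exceeds a threshold t.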
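The three phases (Map, shuffle, Reduce) can be simulated on a single machine before moving to Spark. This is a minimal sketch, not the notebook's own code: it assumes picture IDs are 0..n-1, uses a `frozenset` as the unordered key {i, j}, a `defaultdict` as the shuffle, hand-written cosine similarity, and a threshold `t`; the names `map_fn`, `reduce_fn`, and `similarity_join` are hypothetical.

```python
from collections import defaultdict
from math import sqrt


def cosine_sim(x, y):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(a * b for a, b in zip(x, y))
    return dot / (sqrt(sum(a * a for a in x)) * sqrt(sum(b * b for b in y)))


def map_fn(i, p, n):
    """Emit ({i, j}, (i, P_i)) for every j != i; frozenset makes {i, j} == {j, i}."""
    for j in range(n):
        if j != i:
            yield frozenset((i, j)), (i, p)


def reduce_fn(values, s, t):
    """Receive [(i, P_i), (j, P_j)] and emit ((i, j), similarity) if it exceeds t."""
    (i, pi), (j, pj) = sorted(values, key=lambda v: v[0])
    sim = s(pi, pj)
    if sim > t:
        yield (i, j), sim


def similarity_join(data, s, t):
    n = len(data)
    groups = defaultdict(list)  # shuffle: collect all values that share a key
    for i, p in data:
        for key, value in map_fn(i, p, n):
            groups[key].append(value)
    out = []
    for values in groups.values():
        out.extend(reduce_fn(values, s, t))
    return sorted(out)


data = [(0, [1, 2]), (1, [2, 3]), (2, [3, 4]), (3, [4, 5])]
pairs = similarity_join(data, cosine_sim, 0.9)
print([ij for ij, _ in pairs])  # [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
```

Each key receives exactly two values, which is what lets `reduce_fn` unpack them directly.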

```python
# Install Java 8 and Spark 3.3.2 (Colab-specific setup)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xf spark-3.3.2-bin-hadoop2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

import findspark
findspark.init()  # uses the SPARK_HOME set above

from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session; sc is its underlying SparkContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
```

```python
from scipy.spatial import distance

# Toy data: each "picture" is a 2-D vector, keyed by an ID in 0..n-1
sample_data = [(0, [1, 2]), (1, [2, 3]), (2, [3, 4]), (3, [4, 5])]

# Distribute the (i, P_i) pairs as an RDD
R = sc.parallelize(sample_data)

n = R.count()  # number of pictures
```

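```python
# Canonical key for the unordered pair {a, b}, so that h(a, b) == h(b, a).
# Sorting the integers (rather than their strings) keeps keys in numeric order, e.g. "2,10".
h = lambda a, b: ','.join(str(k) for k in sorted((a, b)))

print(h(2, 1))
```

```
1,2
```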
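A string is not the only possible key. Python tuples and frozensets are hashable, so either can represent the unordered pair {i, j} without string formatting; a sorted tuple is the more common choice as a pair-RDD key. A small sketch, where `pair_key_tuple` and `pair_key_set` are hypothetical helper names:

```python
def pair_key_tuple(a, b):
    """Order-independent key as a sorted tuple (min, max)."""
    return (a, b) if a <= b else (b, a)


def pair_key_set(a, b):
    """Order-independent key as an immutable set."""
    return frozenset((a, b))


print(pair_key_tuple(2, 1), pair_key_tuple(1, 2))  # (1, 2) (1, 2)
print(pair_key_set(10, 2) == pair_key_set(2, 10))   # True
```

The tuple form also keeps the two IDs in numeric order, which makes it easy to recover i < j later.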


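```python
# A simple similarity function for testing: cosine similarity.
# scipy's distance.cosine returns the cosine distance 1 - cos(angle), so 1 minus it is cos(angle).
s = lambda pi, pj: 1 - distance.cosine(pi, pj)

print(s([1, 2], [2, 3]))
```

```
0.9922778767136677
```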
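The quantity `1 - distance.cosine(x, y)` is the cosine similarity x·y / (‖x‖‖y‖). For the example vectors this is (1·2 + 2·3) / (√5 · √13) = 8/√65 ≈ 0.99228. A dependency-free sketch of the same computation (`cosine_similarity` is a hypothetical name):

```python
import math


def cosine_similarity(x, y):
    """x . y / (|x| |y|), the same quantity as 1 - scipy's distance.cosine."""
    dot = sum(a * b for a, b in zip(x, y))
    norm_x = math.sqrt(sum(a * a for a in x))
    norm_y = math.sqrt(sum(b * b for b in y))
    return dot / (norm_x * norm_y)


print(cosine_similarity([1, 2], [2, 3]))  # approximately 0.99228 (= 8 / sqrt(65))
```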


```python
# Map: each (i, P_i) record x is emitted once per partner j != i,
# producing (h(i, j), (i, P_i)) elements. Assumes IDs are 0..n-1.
R_emit = R.flatMap(lambda x: [(h(x[0], j), x) for j in range(n) if j != x[0]])

print(R_emit.collect())
```

```
[('0,1', (0, [1, 2])), ('0,2', (0, [1, 2])), ('0,3', (0, [1, 2])), ('0,1', (1, [2, 3])), ('1,2', (1, [2, 3])), ('1,3', (1, [2, 3])), ('0,2', (2, [3, 4])), ('1,2', (2, [3, 4])), ('2,3', (2, [3, 4])), ('0,3', (3, [4, 5])), ('1,3', (3, [4, 5])), ('2,3', (3, [4, 5]))]
```
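The same emission can be reproduced in plain Python to check the two properties the Reduce step relies on: Map emits n(n − 1) records, and every unordered key appears exactly twice, once per member of the pair. A small sketch with the toy data re-declared so it runs on its own (`pair_key` is a hypothetical helper mirroring `h`):

```python
from collections import Counter

sample_data = [(0, [1, 2]), (1, [2, 3]), (2, [3, 4]), (3, [4, 5])]
n = len(sample_data)


def pair_key(a, b):
    """Same idea as h: an order-independent string key for {a, b}."""
    return ','.join(str(k) for k in sorted((a, b)))


# Plain-Python equivalent of the flatMap: one record per (i, j) with j != i
emitted = [(pair_key(i, j), (i, p)) for i, p in sample_data for j in range(n) if j != i]

key_counts = Counter(key for key, _ in emitted)
print(len(emitted), len(key_counts))             # 12 6
print(all(c == 2 for c in key_counts.values()))  # True
```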


```python
# Shuffle: group values by key, giving (h(i, j), [(i, P_i), (j, P_j)]) elements
S = R_emit.groupByKey()

# groupByKey returns iterables, so convert them to lists for printing
print(S.map(lambda x: (x[0], list(x[1]))).collect())
```

```
[('0,2', [(0, [1, 2]), (2, [3, 4])]), ('0,3', [(0, [1, 2]), (3, [4, 5])]), ('1,2', [(1, [2, 3]), (2, [3, 4])]), ('1,3', [(1, [2, 3]), (3, [4, 5])]), ('0,1', [(0, [1, 2]), (1, [2, 3])]), ('2,3', [(2, [3, 4]), (3, [4, 5])])]
```


```python
t = 0.9  # similarity threshold

# Reduce: given the (i, P_i) tuples that share a key, return every pair (i, j)
# with i < j whose similarity s(P_i, P_j) exceeds the threshold t.
# Each group here holds exactly two tuples, so at most one pair is returned per key.
f = lambda li: [(i, j) for (i, Pi) in li for (j, Pj) in li if i < j and s(Pi, Pj) > t]

T = S.flatMap(lambda x: f(x[1]))

# Collect the result on the driver
result = T.collect()

print(result)
```

```
[(0, 2), (0, 3), (1, 2), (1, 3), (0, 1), (2, 3)]
```
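Since every unordered pair is examined exactly once, the MapReduce result should match a direct single-machine loop over all pairs. A sketch of that check using `itertools.combinations`; the function name `brute_force_pairs` is hypothetical, and cosine similarity is recomputed by hand so the block runs without Spark or SciPy:

```python
from itertools import combinations
from math import sqrt


def cos_sim(x, y):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(a * b for a, b in zip(x, y))
    return dot / (sqrt(sum(a * a for a in x)) * sqrt(sum(b * b for b in y)))


def brute_force_pairs(data, s, t):
    """All pairs (i, j), i < j, with s(P_i, P_j) > t, computed without MapReduce."""
    return sorted(
        (min(i, j), max(i, j))
        for (i, pi), (j, pj) in combinations(data, 2)
        if s(pi, pj) > t
    )


sample_data = [(0, [1, 2]), (1, [2, 3]), (2, [3, 4]), (3, [4, 5])]
print(brute_force_pairs(sample_data, cos_sim, 0.9))
# [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
```

The order of the Spark output depends on how keys were partitioned, so sort `result` before comparing it with this list.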
