# Finding Similar Items: Textually Similar Documents

In [30]:
import findspark
findspark.init()

from pyspark import *
from pyspark.sql.functions import desc, col
from pyspark.sql import *
from graphframes import *

import os
from IPython.display import display, HTML
import pandas as pd
import numpy as np
import sys
from sympy.ntheory.generate import nextprime

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
# https://graphframes.github.io/graphframes/docs/_site/quick-start.html
# https://stackoverflow.com/questions/65011599/how-to-start-graphframes-on-spark-on-pyspark-on-juypter-on-docker
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell'
os.environ['PYSPARK_PYTHON'] = sys.executable

In [4]:
spark = SparkSession.builder.appName('hw1').getOrCreate()

21/11/09 22:14:47 WARN Utils: Your hostname, mark-machine resolves to a loopback address: 127.0.1.1; using 192.168.0.102 instead (on interface wlp8s0)
21/11/09 22:14:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mark/.ivy2/cache
The jars for the packages stored in: /home/mark/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6282b76b-44c7-4f26-8bb6-65b47b4de868;1.0
	confs: [default]
	found graphframes#graphframes;0.8.1-spark3.0-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 152ms :: artifacts dl 6ms
	:: modules in use:
	graphframes#graphframes;0.8.1-spark3.0-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-------------------------------------

http://mlg.ucd.ie/datasets/bbc.html

In [5]:
%%bash
file=./data/bbc-fulltext.zip

if [ -e "$file" ]; then
    echo "$file exists, skipping"
else 
    echo "$file does not exist, using wget to download it"
    wget http://mlg.ucd.ie/files/datasets/bbc-fulltext.zip -P data/
    unzip $file -d data/
fi

./data/bbc-fulltext.zip exists, skipping


In [6]:
def read_text(path):
    print(path)
    text = np.nan
    try:
        with open(path, 'r') as f:
            lines = f.readlines()

        lines = [line.strip() for line in lines]
        lines = list(filter(lambda x: x != "", lines))

        text = " ".join(lines)
    except Exception as e:
        print(f"skipping {path} due to: {e}")
    finally:
        return text

path = "data/bbc/sport/199.txt"
read_text(path)

data/bbc/sport/199.txt
skipping data/bbc/sport/199.txt due to: 'utf-8' codec can't decode byte 0xa3 in position 257: invalid start byte


nan

In [7]:
def read_text2(path):
    text = None
    try:
        text = pd.read_csv(path, encoding= 'unicode_escape')
    except Exception as e:
        print(f"skipping file at: {path} due to: {e}")
    else:
        return text

path = "data/bbc/business/001.txt"
read_text(path)

data/bbc/business/001.txt


'Ad sales boost Time Warner profit Quarterly profits at US media giant TimeWarner jumped 76% to $1.13bn (£600m) for the three months to December, from $639m year-earlier. The firm, which is now one of the biggest investors in Google, benefited from sales of high-speed internet connections and higher advert sales. TimeWarner said fourth quarter sales rose 2% to $11.1bn from $10.9bn. Its profits were buoyed by one-off gains which offset a profit dip at Warner Bros, and less users for AOL. Time Warner said on Friday that it now owns 8% of search-engine Google. But its own internet business, AOL, had has mixed fortunes. It lost 464,000 subscribers in the fourth quarter profits were lower than in the preceding three quarters. However, the company said AOL\'s underlying profit before exceptional items rose 8% on the back of stronger internet advertising revenues. It hopes to increase subscribers by offering the online service free to TimeWarner internet customers and will try to sign up AOL\

In [8]:
data_path = "data/bbc/"
topic_names = ["business", "entertainment", "politics", "sport", "tech"]

data = {}
paths = []
topics = []
texts = []

for topic_name in topic_names:
    dir_path = os.path.join(data_path, topic_name)
    
    for dirpath, dirnames, filenames in os.walk(dir_path):
        paths_ = [os.path.join(dirpath, filename) for filename in filenames]
        topics_ = [topic_name for i in range(len(paths_))]
        
        texts_ = [read_text(path_) for path_ in paths_]
        
        paths += paths_
        topics += topics_
        texts += texts_

data["path"] = paths
data["topic"] = topics
data["text"] = texts
data["id"] = [i for i in range(len(texts))]

df = pd.DataFrame(data=data)
df

data/bbc/business/489.txt
data/bbc/business/194.txt
data/bbc/business/461.txt
data/bbc/business/007.txt
data/bbc/business/145.txt
data/bbc/business/463.txt
data/bbc/business/294.txt
data/bbc/business/505.txt
data/bbc/business/348.txt
data/bbc/business/159.txt
data/bbc/business/123.txt
data/bbc/business/077.txt
data/bbc/business/002.txt
data/bbc/business/509.txt
data/bbc/business/370.txt
data/bbc/business/050.txt
data/bbc/business/072.txt
data/bbc/business/011.txt
data/bbc/business/314.txt
data/bbc/business/052.txt
data/bbc/business/355.txt
data/bbc/business/155.txt
data/bbc/business/412.txt
data/bbc/business/491.txt
data/bbc/business/105.txt
data/bbc/business/497.txt
data/bbc/business/061.txt
data/bbc/business/404.txt
data/bbc/business/125.txt
data/bbc/business/070.txt
data/bbc/business/477.txt
data/bbc/business/349.txt
data/bbc/business/237.txt
data/bbc/business/103.txt
data/bbc/business/167.txt
data/bbc/business/217.txt
data/bbc/business/211.txt
data/bbc/business/043.txt
data/bbc/bus

data/bbc/business/129.txt
data/bbc/business/228.txt
data/bbc/business/442.txt
data/bbc/business/427.txt
data/bbc/business/115.txt
data/bbc/business/248.txt
data/bbc/business/462.txt
data/bbc/business/184.txt
data/bbc/business/063.txt
data/bbc/business/085.txt
data/bbc/business/226.txt
data/bbc/business/486.txt
data/bbc/business/500.txt
data/bbc/business/360.txt
data/bbc/business/149.txt
data/bbc/business/392.txt
data/bbc/business/227.txt
data/bbc/business/445.txt
data/bbc/business/046.txt
data/bbc/business/153.txt
data/bbc/business/448.txt
data/bbc/business/503.txt
data/bbc/business/454.txt
data/bbc/business/236.txt
data/bbc/business/476.txt
data/bbc/business/322.txt
data/bbc/business/183.txt
data/bbc/business/317.txt
data/bbc/business/350.txt
data/bbc/business/108.txt
data/bbc/business/099.txt
data/bbc/business/114.txt
data/bbc/business/054.txt
data/bbc/business/073.txt
data/bbc/business/309.txt
data/bbc/business/210.txt
data/bbc/business/296.txt
data/bbc/business/012.txt
data/bbc/bus

data/bbc/entertainment/304.txt
data/bbc/entertainment/307.txt
data/bbc/entertainment/208.txt
data/bbc/entertainment/086.txt
data/bbc/entertainment/232.txt
data/bbc/entertainment/117.txt
data/bbc/entertainment/177.txt
data/bbc/entertainment/318.txt
data/bbc/politics/194.txt
data/bbc/politics/007.txt
data/bbc/politics/145.txt
data/bbc/politics/294.txt
data/bbc/politics/348.txt
data/bbc/politics/159.txt
data/bbc/politics/123.txt
data/bbc/politics/077.txt
data/bbc/politics/002.txt
data/bbc/politics/370.txt
data/bbc/politics/050.txt
data/bbc/politics/072.txt
data/bbc/politics/011.txt
data/bbc/politics/314.txt
data/bbc/politics/052.txt
data/bbc/politics/355.txt
data/bbc/politics/155.txt
data/bbc/politics/412.txt
data/bbc/politics/105.txt
data/bbc/politics/061.txt
data/bbc/politics/404.txt
data/bbc/politics/125.txt
data/bbc/politics/070.txt
data/bbc/politics/349.txt
data/bbc/politics/237.txt
data/bbc/politics/103.txt
data/bbc/politics/167.txt
data/bbc/politics/217.txt
data/bbc/politics/211.tx

data/bbc/sport/344.txt
data/bbc/sport/160.txt
data/bbc/sport/146.txt
data/bbc/sport/062.txt
data/bbc/sport/106.txt
data/bbc/sport/241.txt
data/bbc/sport/395.txt
data/bbc/sport/456.txt
data/bbc/sport/268.txt
data/bbc/sport/029.txt
data/bbc/sport/008.txt
data/bbc/sport/098.txt
data/bbc/sport/406.txt
data/bbc/sport/051.txt
data/bbc/sport/507.txt
data/bbc/sport/166.txt
data/bbc/sport/485.txt
data/bbc/sport/127.txt
data/bbc/sport/186.txt
data/bbc/sport/261.txt
data/bbc/sport/009.txt
data/bbc/sport/144.txt
data/bbc/sport/202.txt
data/bbc/sport/201.txt
data/bbc/sport/390.txt
data/bbc/sport/386.txt
data/bbc/sport/511.txt
data/bbc/sport/450.txt
data/bbc/sport/131.txt
data/bbc/sport/157.txt
data/bbc/sport/205.txt
data/bbc/sport/132.txt
data/bbc/sport/253.txt
data/bbc/sport/498.txt
data/bbc/sport/358.txt
data/bbc/sport/101.txt
data/bbc/sport/100.txt
data/bbc/sport/016.txt
data/bbc/sport/188.txt
data/bbc/sport/047.txt
data/bbc/sport/327.txt
data/bbc/sport/328.txt
data/bbc/sport/221.txt
data/bbc/sp

data/bbc/tech/114.txt
data/bbc/tech/054.txt
data/bbc/tech/073.txt
data/bbc/tech/309.txt
data/bbc/tech/210.txt
data/bbc/tech/296.txt
data/bbc/tech/012.txt
data/bbc/tech/206.txt
data/bbc/tech/118.txt
data/bbc/tech/214.txt
data/bbc/tech/030.txt
data/bbc/tech/321.txt
data/bbc/tech/303.txt
data/bbc/tech/215.txt
data/bbc/tech/091.txt
data/bbc/tech/398.txt
data/bbc/tech/064.txt
data/bbc/tech/353.txt
data/bbc/tech/340.txt
data/bbc/tech/346.txt
data/bbc/tech/238.txt
data/bbc/tech/018.txt
data/bbc/tech/265.txt
data/bbc/tech/044.txt
data/bbc/tech/262.txt
data/bbc/tech/074.txt
data/bbc/tech/020.txt
data/bbc/tech/122.txt
data/bbc/tech/151.txt
data/bbc/tech/316.txt
data/bbc/tech/057.txt
data/bbc/tech/368.txt
data/bbc/tech/069.txt
data/bbc/tech/244.txt
data/bbc/tech/148.txt
data/bbc/tech/302.txt
data/bbc/tech/154.txt
data/bbc/tech/345.txt
data/bbc/tech/150.txt
data/bbc/tech/033.txt
data/bbc/tech/066.txt
data/bbc/tech/266.txt
data/bbc/tech/038.txt
data/bbc/tech/374.txt
data/bbc/tech/094.txt
data/bbc/t

Unnamed: 0,path,topic,text,id
0,data/bbc/business/489.txt,business,Criminal probe on Citigroup deals Traders at U...,0
1,data/bbc/business/194.txt,business,'Post-Christmas lull' in lending UK mortgage l...,1
2,data/bbc/business/461.txt,business,Beijingers fume over parking fees Choking traf...,2
3,data/bbc/business/007.txt,business,Jobs growth still slow in the US The US create...,3
4,data/bbc/business/145.txt,business,Winn-Dixie files for bankruptcy US supermarket...,4
...,...,...,...,...
2220,data/bbc/tech/232.txt,tech,BBC web search aids odd queries The BBC's onli...,2220
2221,data/bbc/tech/117.txt,tech,Joke e-mail virus tricks users A virus that di...,2221
2222,data/bbc/tech/177.txt,tech,Microsoft debuts security tools Microsoft is r...,2222
2223,data/bbc/tech/394.txt,tech,TV's future down the phone line Internet TV ha...,2223


In [9]:
df.dropna(inplace=True)

In [10]:
data_all = spark.createDataFrame(df, list(df.columns.values))

In [11]:
data_all.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+--------+--------------------+---+
|                path|   topic|                text| id|
+--------------------+--------+--------------------+---+
|data/bbc/business...|business|Criminal probe on...|  0|
|data/bbc/business...|business|'Post-Christmas l...|  1|
|data/bbc/business...|business|Beijingers fume o...|  2|
|data/bbc/business...|business|Jobs growth still...|  3|
|data/bbc/business...|business|Winn-Dixie files ...|  4|
|data/bbc/business...|business|US economy still ...|  5|
|data/bbc/business...|business|Wall Street cheer...|  6|
|data/bbc/business...|business|China now top tra...|  7|
|data/bbc/business...|business|US to probe airli...|  8|
|data/bbc/business...|business|India unveils ant...|  9|
|data/bbc/business...|business|Train strike grip...| 10|
|data/bbc/business...|business|China had role in...| 11|
|data/bbc/business...|business|Dollar gains on G...| 12|
|data/bbc/business...|business|Euro firms miss o...| 13|
|data/bbc/business...|business|

                                                                                

In [12]:
data = data_all.limit(20)

In [13]:
print(f"data.shape = {data.count(), len(data.columns)}")

data.shape = (20, 4)


In [200]:
def shingles(text, k=5):
    shingles = list(set([text[i:i + k] for i in range(len(text) - k + 1)]))
    return shingles

def hash_shingle(shingle):
    return hash(shingle)

def hash_singles(shingles):
    return [hash_shingle(shingle) for shingle in shingles]

# transform to hashed shingles 
myrdd = data.rdd.map(lambda x: shingles(x["text"]))
myrdd2 = myrdd.map(lambda x: hash_singles(x))

# build shingle map
# for viz
myrdd3 = myrdd.flatMap(lambda x: x)
myrdd4 = myrdd3.map(lambda w: (w,1)).reduceByKey(lambda a, b: a+b)

# for use
myrdd5 = myrdd2.flatMap(lambda x: x)
myrdd6 = myrdd5.map(lambda w: (w,1)).reduceByKey(lambda a, b: a+b)
myrdd7 = myrdd6.map(lambda x: x[0])

In [201]:
myrdd.take(3)

[['in To',
  'ove w',
  'estio',
  'finan',
  'ant C',
  'to th',
  '. Som',
  'over ',
  'arket',
  'passe',
  'to fl',
  ' a co',
  'eport',
  ' auth',
  'gust.',
  'in fi',
  'd BaF',
  'ntrov',
  'ould ',
  'uded ',
  'own b',
  'ggres',
  'pulat',
  'e Ger',
  'yo af',
  'ater.',
  ' can ',
  'h clu',
  'd thr',
  ') of ',
  't aga',
  'the h',
  'r," a',
  'rnmen',
  'to pu',
  'ales ',
  'ewher',
  'at th',
  'd tha',
  'roup ',
  '4bn e',
  'atory',
  'ic pr',
  ' and ',
  'it wo',
  'blic ',
  'ver, ',
  'fter ',
  'er ac',
  'hat i',
  'oing ',
  'tion ',
  'a fiv',
  'd evi',
  'canno',
  'tes o',
  'sible',
  't the',
  'conti',
  'ntinu',
  's are',
  '"We a',
  'are s',
  'ld fa',
  'erred',
  ' five',
  ' stil',
  'sed d',
  'orth ',
  ' Germ',
  'an sa',
  'e dis',
  ' they',
  'ion o',
  'tion,',
  ' regu',
  'n Tok',
  'und c',
  'hdog ',
  'ritic',
  'sive ',
  'red t',
  'ct sa',
  'laund',
  '1bn e',
  'wheth',
  'n was',
  'ying.',
  ' prob',
  'ainst',
  'e dea',

In [202]:
myrdd2.take(2)

[[-1003462127445295100,
  -8676960776106426357,
  -5621430239379800052,
  -61925653583757296,
  -7149235314197585892,
  353827663677939751,
  -2558723947932630986,
  -3504529482779574212,
  3315265327421882428,
  -2619610836676378558,
  -5778840097396219819,
  4303326089482559578,
  -2265669377992384422,
  -3581715568551239584,
  -726301170688884639,
  8653499187163103335,
  123835771549393000,
  6893542212889272428,
  -7844857403159191422,
  -978242564932599676,
  -6742551641101074297,
  2183264872972329102,
  4888446125909639320,
  -4519288318659157862,
  -7564785347635593052,
  -2786173493953437528,
  -840315989681741653,
  -1670244157886570324,
  5376075156978901169,
  5589928532362059958,
  -4495930183083900745,
  -2955383099770257226,
  7822852136103223481,
  -1185982138674265926,
  330337838176452793,
  -7406679281132650304,
  -7383698818063073087,
  4957408517781078212,
  -3479181340012977975,
  -4387572509557964594,
  -7574354631190560558,
  1077695685923340500,
  -19242397316

In [203]:
myrdd3.sortBy(lambda x: x[1], ascending=True).collect()

['. Som',
 'd BaF',
 'e Ger',
 'h clu',
 'd thr',
 ') of ',
 't aga',
 'd tha',
 'a fiv',
 'd evi',
 't the',
 's are',
 'e dis',
 'n Tok',
 'n was',
 'e dea',
 's cul',
 'e sti',
 'a BaF',
 'n Ger',
 't has',
 's pri',
 '. Cit',
 'e han',
 'd be ',
 '" Cit',
 'e was',
 'n in ',
 'm in ',
 'e BaF',
 '. Ger',
 '. The',
 'e in ',
 'a few',
 'w tra',
 't bac',
 'e spo',
 'e to ',
 'n Fra',
 '" "We',
 'l law',
 'y wit',
 'e tra',
 'n has',
 'l ter',
 'n sho',
 's Tra',
 'g on ',
 'p are',
 'o flo',
 'd at ',
 'h inc',
 'r jai',
 '. Its',
 'e are',
 'y reg',
 'n reg',
 's ($1',
 'h 4bn',
 'e wor',
 't is ',
 'e ful',
 'e que',
 'd it ',
 'f 11b',
 'a fin',
 'd bon',
 '" the',
 's ref',
 'n Cit',
 'n cri',
 'e cas',
 'y cou',
 'p ope',
 'n Jap',
 ', it ',
 'w the',
 'y lau',
 'n a f',
 ', whi',
 'f whe',
 'e ban',
 '. "Ge',
 'n con',
 'n sai',
 'e pub',
 ', the',
 'd tra',
 'g a c',
 'g BaF',
 't pur',
 'f BaF',
 'r the',
 'y inv',
 'a cri',
 'e bro',
 'p sai',
 'e a f',
 'e of ',
 'r act',


In [204]:
myrdd4.sortBy(lambda x: x[1], ascending=False).collect()

[(' the ', 20),
 (' and ', 19),
 ('. The', 19),
 ('ation', 18),
 ('ding ', 18),
 (' The ', 18),
 (' for ', 18),
 ('with ', 17),
 (' said', 17),
 (' of t', 17),
 (', the', 17),
 ('said ', 17),
 ('the s', 17),
 ('f the', 17),
 ('s of ', 17),
 ('n the', 17),
 (' with', 17),
 ('tion ', 16),
 ('that ', 16),
 (' that', 16),
 (' was ', 16),
 ('ing a', 16),
 ('r the', 16),
 (' has ', 16),
 ('of th', 16),
 (' in t', 16),
 ('d the', 16),
 ('s in ', 16),
 ('s the', 16),
 ('ing t', 16),
 ('ould ', 15),
 ('t the', 15),
 ('ed th', 15),
 ('e to ', 15),
 ('in th', 15),
 ('e of ', 15),
 ('ment ', 15),
 ('ions ', 15),
 ('s to ', 15),
 (' year', 15),
 ('at th', 14),
 ('e in ', 14),
 ('the c', 14),
 ('tions', 14),
 ('from ', 14),
 (' comp', 14),
 (' from', 14),
 ('s and', 14),
 ('ing i', 14),
 ('g to ', 14),
 ('ng to', 14),
 (', and', 13),
 (' to t', 13),
 (' are ', 13),
 (' had ', 13),
 (' have', 13),
 ('ting ', 13),
 ('e the', 13),
 (' than', 13),
 ('sing ', 13),
 ('have ', 13),
 (' also', 13),
 ('also 

In [205]:
myrdd6.sortBy(lambda x: x[1], ascending=False).collect()

[(-705421579459337137, 20),
 (-475976066617319205, 19),
 (980924546986107678, 19),
 (-4717207625903585721, 18),
 (8219245882170276375, 18),
 (-393193092546356490, 18),
 (8311970347216854600, 18),
 (1825223508648158238, 17),
 (-5338912559815858256, 17),
 (-214068003866350714, 17),
 (-3618109235835302921, 17),
 (6308644411083443552, 17),
 (2682192565469982723, 17),
 (806367320014385375, 17),
 (-1246585803336420909, 17),
 (7247390317063084048, 17),
 (-3072320191028527847, 17),
 (4743965778977030411, 16),
 (9180302799378482337, 16),
 (2066873783091702048, 16),
 (-6251668106499028571, 16),
 (1981489656097508306, 16),
 (4276116257135062250, 16),
 (7045479462970938923, 16),
 (291126114455165312, 16),
 (7513256988136061610, 16),
 (718126672855817433, 16),
 (2269165898748960130, 16),
 (1480000029518860140, 16),
 (-3078628531264291545, 16),
 (-7844857403159191422, 15),
 (-3701775004060843713, 15),
 (-6223698205833625020, 15),
 (6735444517280678911, 15),
 (6945306998720227451, 15),
 (-41055484650

In [206]:
print(myrdd.count())
print(myrdd2.count())
print(myrdd3.count())
print(myrdd4.count())
print(myrdd5.count())
print(myrdd6.count())
print(myrdd7.count())
n_shingles = myrdd7.count()

20
20
32443
19320
32443
19320
19320


In [207]:
prime_modulo = nextprime(n_shingles)
prime_modulo

19333

In [208]:
def vectorize(shingles, shingles_all):
    #return shingles
    return [shingle in shingles for shingle in shingles_all]

myrdd7_list = myrdd7.collect()
myrdd8 = myrdd2.map(lambda x: vectorize(x, shingles_all=myrdd7_list))

In [209]:
myrdd8.take(2)

[Stage 309:>                                                        (0 + 1) / 1]                                                                                

[[True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,


In [210]:
len(myrdd8.take(2)[0])

[Stage 311:>                                                        (0 + 1) / 1]                                                                                

19320

In [211]:
def compare_sets(vec, vec_all):
    ious = []
    vec_np = np.array(vec)
    
    for idx, vec_other in enumerate(vec_all):
        vec_other_np = np.array(vec_other)
        intersection = np.logical_and(vec_np, vec_other_np).sum()
        union = np.logical_or(vec_np, vec_other_np).sum()
        iou = intersection / union
        ious.append((idx, iou))
        
    return ious

In [212]:
vec_all = myrdd8.collect() 
myrdd9 = myrdd8.map(lambda x: compare_sets(x, vec_all=vec_all))

                                                                                

In [213]:
myrdd9.collect()

                                                                                

[[(0, 1.0),
  (1, 0.03728813559322034),
  (2, 0.04550499445061043),
  (3, 0.053242594075260205),
  (4, 0.0561389337641357),
  (5, 0.040865384615384616),
  (6, 0.060011709601873534),
  (7, 0.03534923339011925),
  (8, 0.06798442647499252),
  (9, 0.06030818278427205),
  (10, 0.07),
  (11, 0.057624960203756764),
  (12, 0.05756635642881765),
  (13, 0.060063897763578275),
  (14, 0.03870106761565836),
  (15, 0.05786951713969775),
  (16, 0.0552),
  (17, 0.05197215777262181),
  (18, 0.07321171387421987),
  (19, 0.05901445854234287)],
 [(0, 0.03728813559322034),
  (1, 1.0),
  (2, 0.0354295837023915),
  (3, 0.048434442270058706),
  (4, 0.03554040895813048),
  (5, 0.04878048780487805),
  (6, 0.03776084796290162),
  (7, 0.04970286331712588),
  (8, 0.0346218487394958),
  (9, 0.043802145411203815),
  (10, 0.03653506187389511),
  (11, 0.03468419131069733),
  (12, 0.03739445114595899),
  (13, 0.04005880191106211),
  (14, 0.05122367672168469),
  (15, 0.03295750216825672),
  (16, 0.0549558390578999),
  (

In [214]:
myrdd9.collect()[11]

                                                                                

[(0, 0.057624960203756764),
 (1, 0.03468419131069733),
 (2, 0.0670306339096148),
 (3, 0.06643132220795891),
 (4, 0.0678075556990636),
 (5, 0.03589557650471356),
 (6, 0.06704601682335477),
 (7, 0.04658176943699732),
 (8, 0.07470974255426552),
 (9, 0.0738362760834671),
 (10, 0.054917444364680545),
 (11, 1.0),
 (12, 0.07244723331431831),
 (13, 0.06281473628412404),
 (14, 0.059873949579831935),
 (15, 0.06455469216975493),
 (16, 0.06389776357827476),
 (17, 0.05678571428571429),
 (18, 0.1302539404553415),
 (19, 0.07860922146636433)]

In [215]:
data.take(12)[-1]

Row(path='data/bbc/business/077.txt', topic='business', text='China had role in Yukos split-up China lent Russia $6bn (£3.2bn) to help the Russian government renationalise the key Yuganskneftegas unit of oil group Yukos, it has been revealed. The Kremlin said on Tuesday that the $6bn which Russian state bank VEB lent state-owned Rosneft to help buy Yugansk in turn came from Chinese banks. The revelation came as the Russian government said Rosneft had signed a long-term oil supply deal with China. The deal sees Rosneft receive $6bn in credits from China\'s CNPC. According to Russian newspaper Vedomosti, these credits would be used to pay off the loans Rosneft received to finance the purchase of Yugansk. Reports said CNPC had been offered 20% of Yugansk in return for providing finance but the company opted for a long-term oil supply deal instead. Analysts said one factor that might have influenced the Chinese decision was the possibility of litigation from Yukos, Yugansk\'s former owner,

In [216]:
data.take(19)[-1]

Row(path='data/bbc/business/314.txt', topic='business', text='Yukos unit fetches $9bn at auction A little-known Russian company has bought the main production unit of oil giant Yukos at auction in Moscow. Baikal Finance Group outbid favourite Gazprom, the state-controlled gas monopoly, to buy Yuganskneftegas. Baikal paid 260.75bn roubles ($9.37bn: £4.8bn) for Yugansk - nowhere near the $27bn Russia says Yukos owes in taxes. Yukos reacted immediately by repeating its view that the auction was illegal in international and Russian law, and said Baikal had bought itself trouble. "The company considers that the victor of today\'s auction has bought itself a serious $9bn headache," said Yukos spokesman Alexander Shadrin. He said the company would continue to make "every lawful move" to protect tens of thousands of shareholders in Yukos from "this forcible and illegitimate removal of their property". Meanwhile, Tim Osborne, head of Yukos main shareholders\' group Menatep, said that Yukos may 

In [217]:
def minhash_func(x, param):
    # h(x) = (ax + b) % c
    a, b, c = param
    return (a * x + b) % c

def minhash_funcs(xs, params):
    signature = []
    
    for param in params:
        minhash_value_per_func = min([minhash_func(x, param) for x in xs])
        signature.append(minhash_value_per_func)
    
    return signature 

def minhash_param(c):
    return np.random.randint(1, c), np.random.randint(1, c), c

def minhash_params(n, c):
    return [minhash_param(c) for i in range(n)]

In [218]:
n = 100
params = minhash_params(n=n, c=prime_modulo)
params[:3]

[(5809, 5019, 19333), (7954, 624, 19333), (16333, 4594, 19333)]

In [219]:
myrdd10 = myrdd8.map(lambda x: list(np.argwhere(x).flatten()))

In [220]:
myrdd10.take(5)

                                                                                

[[0,
  1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  10,
  11,
  12,
  13,
  14,
  15,
  16,
  17,
  18,
  19,
  20,
  21,
  22,
  23,
  24,
  25,
  26,
  27,
  28,
  29,
  30,
  31,
  32,
  33,
  34,
  35,
  36,
  37,
  38,
  39,
  40,
  41,
  42,
  43,
  44,
  45,
  46,
  47,
  48,
  49,
  50,
  51,
  52,
  53,
  54,
  55,
  56,
  57,
  58,
  59,
  60,
  61,
  62,
  63,
  64,
  65,
  66,
  67,
  68,
  69,
  70,
  71,
  72,
  73,
  74,
  75,
  76,
  77,
  78,
  79,
  80,
  81,
  82,
  83,
  84,
  85,
  86,
  87,
  88,
  89,
  90,
  91,
  92,
  93,
  94,
  95,
  96,
  97,
  98,
  99,
  100,
  101,
  102,
  103,
  104,
  105,
  106,
  107,
  108,
  109,
  110,
  111,
  112,
  113,
  114,
  115,
  116,
  117,
  118,
  119,
  120,
  121,
  122,
  123,
  124,
  125,
  126,
  127,
  128,
  129,
  130,
  131,
  132,
  133,
  134,
  135,
  136,
  137,
  138,
  139,
  140,
  141,
  142,
  143,
  144,
  145,
  146,
  147,
  148,
  149,
  150,
  151,
  152,
  153,
  154,
  155,
  156,
  157,
  15

In [221]:
len(myrdd10.take(3)[2])

                                                                                

1511

In [222]:
myrdd11 = myrdd10.map(lambda x: minhash_funcs(x, params))

In [223]:
myrdd11.take(2)

[Stage 325:>                                                        (0 + 1) / 1]                                                                                

[[29,
  7,
  195,
  9,
  74,
  28,
  11,
  1,
  2,
  26,
  21,
  15,
  1,
  11,
  9,
  7,
  9,
  12,
  39,
  90,
  15,
  14,
  21,
  13,
  130,
  3,
  2,
  12,
  37,
  15,
  10,
  13,
  22,
  23,
  1,
  19,
  5,
  23,
  1,
  18,
  27,
  10,
  19,
  4,
  17,
  55,
  20,
  2,
  5,
  7,
  9,
  14,
  14,
  15,
  2,
  1,
  10,
  3,
  1,
  3,
  4,
  30,
  3,
  25,
  12,
  7,
  16,
  5,
  15,
  4,
  4,
  20,
  4,
  14,
  1,
  8,
  37,
  46,
  9,
  2,
  1799,
  13,
  0,
  28,
  9,
  20,
  1,
  4,
  12,
  12,
  33,
  21,
  0,
  5,
  0,
  4,
  36,
  21,
  16,
  8],
 [59,
  25,
  156,
  8,
  82,
  20,
  15,
  2,
  2,
  7,
  25,
  4,
  35,
  3,
  43,
  1,
  9,
  16,
  29,
  78,
  12,
  13,
  6,
  14,
  160,
  11,
  39,
  12,
  27,
  9,
  2,
  4,
  3,
  9,
  14,
  20,
  78,
  9,
  18,
  20,
  23,
  9,
  21,
  16,
  23,
  45,
  4,
  71,
  81,
  4,
  4,
  23,
  9,
  5,
  0,
  24,
  9,
  20,
  1,
  81,
  30,
  1,
  11,
  17,
  7,
  12,
  25,
  15,
  15,
  2,
  7,
  32,
  3,
  61,
  78,
  11,
  34,
  5

In [224]:
def compare_signatures(vec, vec_all):
    similarities = []
    vec_np = np.array(vec)
    
    for idx, vec_other in enumerate(vec_all):
        vec_other_np = np.array(vec_other)
        
        similarities.append((idx, (vec_np == vec_other_np).sum() / vec_np.size))
        
    return similarities

In [225]:
vec_all_2 = myrdd11.collect() 
myrdd12 = myrdd11.map(lambda x: compare_signatures(x, vec_all=vec_all_2))

                                                                                

In [226]:
myrdd12.collect()[11]

                                                                                

[(0, 0.03),
 (1, 0.04),
 (2, 0.08),
 (3, 0.08),
 (4, 0.06),
 (5, 0.01),
 (6, 0.08),
 (7, 0.05),
 (8, 0.08),
 (9, 0.07),
 (10, 0.06),
 (11, 1.0),
 (12, 0.08),
 (13, 0.08),
 (14, 0.07),
 (15, 0.08),
 (16, 0.1),
 (17, 0.08),
 (18, 0.15),
 (19, 0.11)]

In [199]:
class Shingling():
    def __init__(self, k, df):
        # build k shingle map from corpus of docs
        pass
    
    def __call__(self, row):
        # get text from row and convert it to k-shingles
        pass
    
class CompareSets():
    def __init__(self,):
        pass
    
    def __call__(self, row):
        # compute Jaccard sim between two sets of shingles
        pass
    