In [249]:
import re
import string
import nltk

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import MinHashLSH, CountVectorizer
from pyspark.ml.linalg import Vectors

In [250]:
spark = SparkSession.builder.appName('minhash').getOrCreate()
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /home/nestor/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

## 1. Lectura del dataset yahoo_answers

In [471]:
schema = StructType([
    StructField("category", StringType(), True),
    StructField("title",    StringType(), True),
    StructField("content",  StringType(), True),
    StructField("answer",   StringType(), True),
])
cols_to_select = ['category', 'title', 'content']
df_yahoo = spark.read.csv('yahoo_answers_csv/test.csv', schema = schema)
df_yahoo = df_yahoo.select(cols_to_select)
df_yahoo = df_yahoo.na.fill('')
df_yahoo.printSchema()

root
 |-- category: string (nullable = false)
 |-- title: string (nullable = false)
 |-- content: string (nullable = false)



In [472]:
df_yahoo.show(20, truncate = False)

+--------+------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|category|title                                                                                                 |content                                                                                          |
+--------+------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|9       |What makes friendship click?                                                                          |How does the spark keep going?                                                                   |
|2       |Why does Zebras have stripes?                                                                         |What is the purpose or those stripes? W

## Limpieza

In [498]:
def build_document(row, question_words=False):
    """
        Function that handles the cleaning of a document's row
        by removing special characters, stop words and question words

        Parameters:
        -----------
        row: str

        question_words: Boolean

        Returns:
        --------
        list with the same number of columns as row
    """
    if len(row) == 1:
        return [clean_column(row[0], question_words)]
    if len(row) > 2:
        columns = [row[0], row[1], row[2]]
    else:
        columns = [row[0], row[1]]
    str_ = ""
    for idx, c in enumerate(row[1:]):
        str_ += c
    columns.append(clean_column(str_, question_words))
    return columns

def clean_column(column, question_words=False):
    """
        Function that removes unwanted characters, stop words and
        question words if asked

        Parameters:
        -----------
        column : str

        question_words : Boolean 
            Indicates if the function should remove question words from
            the column

        Returns:
        --------
        cleaned column: str
    """
    if column == '' or column == ' ' or column is None:
        return ''
    column = column.strip().lower().replace("\"", ' ').replace(r"\n", ' ')
    column = column.replace("<br />", ' ').replace("-", ' ').replace("/", ' ')
    column = column.replace("<br>", ' ').replace("..", ' ').replace("?", ' ')
    column = column.replace(",", ' ')
    column = re.sub('[^A-Za-z0-9 ]+', '', column).strip().split(' ')
    # Stop words
    stop_words = nltk.corpus.stopwords.words('english') + [
        'a', 'e', 'i', 'o', 'u', ''
    ]
    question_words = [
        'what', 'when', 'where', 'which', 'who', 'why', 
        'how', 'will', 'whos', 'whats', 'can'
    ]
    if not question_words:
        for qw in question_words: 
            if qw in stop_words: stop_words.remove(qw)
    column = [word for word in column if word not in stop_words]
    return column

In [474]:
if len(cols_to_select) > 2:
    cols = ['category', 'question', 'content', 'document']
else:
    cols = ['category', 'question', 'document']
df_yahoo = df_yahoo.rdd \
    .map(lambda x: build_document(x, question_words=False)) \
        .toDF(cols)

In [475]:
df_yahoo.show(10, truncate=False)

+--------+------------------------------------------------------------------------------------+------------------------------------------------------------------------------------+--------------------------------------------------------------+
|category|question                                                                            |content                                                                             |document                                                      |
+--------+------------------------------------------------------------------------------------+------------------------------------------------------------------------------------+--------------------------------------------------------------+
|9       |What makes friendship click?                                                        |How does the spark keep going?                                                      |[makes, friendship, click, spark, keep, going]                |
|2       |Why does Zebra

## CountVectorizer

In [463]:
def count_vectorize(df, input_col, output_col):
    """
        Function that vectorizes a Spark Data Frame

        Parameters:
        -----------
        df : Spark's Data Frame

        input_col : str
            Represents the column name inside df that will be vectorized

        outpu_col : str
            Name of the column that will hold the Sparse Vectors
        
        Returns:
        --------
        df_vectorized : Sparks' Data Frame Vectorized
    """
    cv = CountVectorizer(inputCol=input_col, outputCol=output_col)
    model = cv.fit(df)
    df_vectorized = model.transform(df).rdd.filter(lambda x: x[-1].numNonzeros() != 0).toDF([df.columns[0], df.columns[1], df.columns[2], input_col, output_col])
    return (df_vectorized, model)

In [464]:
df_yahoo_cv, model_cv = count_vectorize(df_yahoo, input_col="document", output_col="features")
df_yahoo_cv.show(5, truncate=True)

+--------+--------------------+--------------------+--------------------+--------------------+
|category|            question|             content|            document|            features|
+--------+--------------------+--------------------+--------------------+--------------------+
|       9|What makes friend...|How does the spar...|[makes, friendshi...|(69795,[35,129,25...|
|       2|Why does Zebras h...|What is the purpo...|[zebras, stripes,...|(69795,[65,1096,1...|
|       4|What did the itsy...|                    |[itsy, bitsy, sip...|(69795,[8817,2620...|
|       4|What is the diffe...|                    |[difference, bach...|(69795,[277,482,2...|
|       3|Why do women get ...|                    |   [women, get, pms]|(69795,[2,119,554...|
+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



## MinHash

In [330]:
def min_hash_lsh(df_vectorized, input_col, output_col, num_hash_tables=None, seed=None):
    """
        Function that applys MinHash LSH algorithm to a a Spark Data Frame

        Parameters:
        -----------
        df_vectorized : Spark's Data Frame

        input_col : str
            Represents the column name inside df that will be vectorized

        outpu_col : str
            Name of the column that will hold the Sparse Vectors

        num_hash_tables : int
            Represents the number of hash tables to use

        seed : int
        
        Returns:
        --------
        (df_min_hash, model)
            model : Spark's MinHashLSH model
            df_min_hash : Spark's Data Frame
    """
    mh = MinHashLSH(inputCol=input_col, outputCol=output_col)
    if seed is not None: mh.setSeed(seed)
    if num_hash_tables is not None: mh.setNumHashTables(num_hash_tables)
    model = mh.fit(df_vectorized)
    df_min_hash = model.transform(df_vectorized)
    return (df_min_hash, model)

In [331]:
df_yahoo_mh, model = min_hash_lsh(df_yahoo_cv, input_col="features", output_col="hashes", seed = 12345)
df_yahoo_mh.show(5, truncate=True)

+--------+--------------------+--------------------+--------------------+--------------------+----------------+
|category|            question|             content|            document|            features|          hashes|
+--------+--------------------+--------------------+--------------------+--------------------+----------------+
|       9|What makes friend...|How does the spar...|[makes, friendshi...|(69795,[35,128,25...| [[3.8806879E7]]|
|       2|Why does Zebras h...|What is the purpo...|[zebras, stripes,...|(69795,[65,1095,1...|[[4.67236097E8]]|
|       4|What did the itsy...|                    |[itsy, bitsy, sip...|(69795,[8349,2131...|[[2.43597044E8]]|
|       4|What is the diffe...|                    |[difference, bach...|(69795,[275,482,2...|[[6.31433282E8]]|
|       3|Why do women get ...|                    |   [women, get, pms]|(69795,[2,119,539...|[[5.86621142E8]]|
+--------+--------------------+--------------------+--------------------+--------------------+----------

## Distancias

In [332]:
def similarity_distances(model, distance, threshold, df_min_hashed):
    """
        Finds similarity matrix given a distance method to use

        Parameters:
        ----------
        model : Spark's MinHashLSH model

        distance : str
            -> distance = "JaccardDistance"
            -> distance = "EuclideanDistance"

        threshold : int

        df_min_hashed : Spark's MinHashLSH DataFrame

        Returns:
        --------
        df : Spark's DataFrame
    """
    questions_similarity = model.approxSimilarityJoin(
            datasetA = df_min_hashed, datasetB = df_min_hashed,
            threshold=threshold, distCol = distance
        ).select(
            col("datasetA.category").alias("Category A"),
            col("datasetA.question").alias("Original Q A"),
            col("datasetB.category").alias("Category B"),
            col("datasetB.question").alias("Original Q B"),
            col(distance)
        ).where(
            col(distance) > 0
        )
    return questions_similarity.orderBy(asc(col(distance)))

In [333]:
q_sim_df = similarity_distances(model, distance="JaccardDistance", threshold=0.5, df_min_hashed=df_yahoo_mh)
q_sim_df.show(500, truncate=False)

+----------+--------------------------------------------------------------------------------------------------------------+----------+--------------------------------------------------------------------------------------------------------------+--------------------+
|Category A|Original Q A                                                                                                  |Category B|Original Q B                                                                                                  |JaccardDistance     |
+----------+--------------------------------------------------------------------------------------------------------------+----------+--------------------------------------------------------------------------------------------------------------+--------------------+
|4         |As it was ok to shoot men for saying they were frightened in the first world war then is it ok to shoot both s|4         |As it was ok to shoot men for saying they were frightened in the 

In [334]:
q_sim_df = similarity_distances(model, distance="EuclideanDistance", threshold=0.5, df_min_hashed=df_yahoo_mh)
q_sim_df.show(500, truncate=False)

+----------+--------------------------------------------------------------------------------------------------------------+----------+--------------------------------------------------------------------------------------------------------------+--------------------+
|Category A|Original Q A                                                                                                  |Category B|Original Q B                                                                                                  |EuclideanDistance   |
+----------+--------------------------------------------------------------------------------------------------------------+----------+--------------------------------------------------------------------------------------------------------------+--------------------+
|4         |As it was ok to shoot men for saying they were frightened in the first world war then is it ok to shoot both s|4         |As it was ok to shoot men for saying they were frightened in the 

## MinHash Entire Pipeline + Category Analysis

In [465]:
schema = StructType([
    StructField("category", StringType(), True),
    StructField("title",    StringType(), True),
    StructField("content",  StringType(), True),
    StructField("answer",   StringType(), True),
])

In [466]:
# 0. Document building
cols_to_select = ['category', 'title'] + ['content']
question_words = False

# 1. Data reading
df_yahoo = spark.read.csv('yahoo_answers_csv/test.csv', schema = schema)
df_yahoo = df_yahoo.select(cols_to_select)
df_yahoo = df_yahoo.na.fill('')

# 2. Data cleaning
if len(cols_to_select) > 2:
    cols = ['category', 'question', 'content', 'document']
else:
    cols = ['category', 'question', 'document']
df_yahoo = df_yahoo.rdd \
    .map(lambda x: build_document(x, question_words=False)) \
        .toDF(cols)

# 3. CounVectorizer
df_yahoo_cv, model_cv = count_vectorize(
    df_yahoo, input_col="document", 
    output_col="features"
)

# 4. MinHash
(df_yahoo_mh, model) = min_hash_lsh(
    df_yahoo_cv, input_col="features", 
    output_col="hashes", seed = 12345
)

# 5. Similarity - Distances
distance = "JaccardDistance"
q_sim_df = similarity_distances(
    model, distance=distance, threshold=0.5, 
    df_min_hashed=df_yahoo_mh
)

In [467]:
q_sim_df.show(20, truncate=True)

+----------+--------------------+----------+--------------------+--------------------+
|Category A|        Original Q A|Category B|        Original Q B|     JaccardDistance|
+----------+--------------------+----------+--------------------+--------------------+
|         4|As it was ok to s...|         4|As it was ok to s...|0.050000000000000044|
|         4|As it was ok to s...|         4|As it was ok to s...|0.050000000000000044|
|         4|help on Handmaid'...|         4|help om Handmaid'...| 0.05555555555555558|
|         4|help om Handmaid'...|         4|help on Handmaid'...| 0.05555555555555558|
|         1|can ahuman forgiv...|         1|can ahuman forgiv...|  0.0714285714285714|
|         1|can ahuman forgiv...|         1|can ahuman forgiv...|  0.0714285714285714|
|         9|"How many licks d...|         8|How many licks do...|               0.125|
|         9|how many licks do...|         8|How many licks do...|               0.125|
|         7|"""How many licks...|         9

## NearestNeighbors

In [570]:
def find_nn(query, n, model_cv, df_yahoo_mh):
    """
        Function that finds 'n' nearest neighbors
        given a base query

        Parameters:
        -----------
        query : str
        
        n : int 
            number of nearest neighbors
        
        model_cv : Spark's CountVectorizer model

        df_minhash : Spark's MinHashLSH dataframe

        Return:
        -------
        neigbors : list
    """
    queries = [[query]]
    df_queries = spark.sparkContext.parallelize(queries) \
        .toDF(["question"])
    df_queries = df_queries.rdd \
        .map(build_document) \
            .toDF(["document"])
    df_queries_cv = model_cv.transform(df_queries)
    features = [row.features for row in df_queries_cv.collect()][0]
    most_sim_questions = model.approxNearestNeighbors(df_yahoo_mh, features, n) \
        .select("category", "question", "content", "distCol") \
            .where(col("distCol") > 0) \
                .orderBy(asc(col("distCol"))) \
                    .collect()
    return most_sim_questions

In [576]:
q1 = "will pakistan beat india in cricket"
most_sim_questions = find_nn(q1, n=5, model_cv=model_cv, df_yahoo_mh=df_yahoo_mh)

In [577]:
print(f"Base question:\n\t{q1}")
print("---------------------------------\n\n")
i = 1
for row in most_sim_questions:
    print(f"{i}. {row.question}\n\t{row.distCol}\n\t{row.content}")
    i += 1

Base question:
	will pakistan beat india in cricket
---------------------------------


1. will pakistan beat india in criket matches?
	0.5
	
2. Why India and Pakistan are always against each other?
	0.6
	
3. watch online cricket match between India & Pakistan for free?
	0.625
	
4. Between india and pakistan, which batsman has the most aggregate runs in test cricket?
	0.625
	
5. If best batsman & bowlers of Pakistan & India played in 1 team?Can they beat Australia?
	0.7272727272727273
	


## DataSketch: MinHashLSHForest

In [225]:
from datasketch import MinHashLSHForest, MinHash

In [30]:
df_yahoo = df_yahoo.where(df_yahoo.category == "6")
df_yahoo.show(5, truncate=False)

+--------+---------------------------------------------+
|category|title                                        |
+--------+---------------------------------------------+
|6       |[what, best, road, motorcycle, trail]        |
|6       |[why, doesnt, nba, implement, minor, leagues]|
|6       |[formula1, car]                              |
|6       |[how, can, chicago, cubs, break, curse]      |
|6       |[why, 5, rings, olympics, symbol]            |
+--------+---------------------------------------------+
only showing top 5 rows



In [31]:
q1 = "will pakistan beat india in cricket"
data1 = q1.split(" ")

In [32]:
ms = []
for row in df_yahoo.collect():
    data = row.title
    m = MinHash(num_perm=128)
    for d in data:
        m.update(d.encode('utf8'))
    ms.append(m)

m1 = MinHash(num_perm=128)
for d in data1:
    m1.update(d.encode('utf8'))
ms.append(m1)

KeyboardInterrupt: 

In [150]:
forest = MinHashLSHForest(num_perm=128)
id_ = 1
for m in ms[:-1]:
    forest.add("m"+str(id_), m)
    id_ += 1

In [151]:
forest.index()

In [152]:
"m5999" in forest

True

In [153]:
# Using m1 as the query, retrieve top 2 keys that have the higest Jaccard
result = forest.query(m1, 10)
print("Top 2 candidates", result)

Top 2 candidates ['m3642', 'm2819', 'm1', 'm257', 'm2485', 'm3709', 'm574', 'm4002', 'm442', 'm2394']


In [166]:
o = 1
for row in df_yahoo.collect():
    if o == 442:
        print(row.title)
        break
    o += 1

['who', 'will', 'next', 'captain', 'pakistan', 'cricket', 'teem']
