In [1]:
from pyspark.context import SparkContext
from pyspark.sql.functions import count, col, length, asc, udf, lit
from pyspark.sql.types import IntegerType, StringType
import math

In [2]:
def levenshtein(s, t):
        ''' From Wikipedia article; Iterative with two matrix rows. '''
        if s == t: return 0
        elif len(s) == 0: return len(t)
        elif len(t) == 0: return len(s)
        v0 = [None] * (len(t) + 1)
        v1 = [None] * (len(t) + 1)
        for i in range(len(v0)):
            v0[i] = i
        for i in range(len(s)):
            v1[0] = i + 1
            for j in range(len(t)):
                cost = 0 if s[i] == t[j] else 1
                v1[j + 1] = min(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost)
            for j in range(len(v0)):
                v0[j] = v1[j]
                
        return v1[len(t)]
    
def levenshtein_threshhold(distance_threshhold, confidence):
    return distance_threshhold * (1- math.pow(confidence, 2))

In [3]:
print("Load the data")
users_queries_search_main_df = spark.read.option("header", "true") \
    .option("delimiter", "\t") \
    .csv("user-ct-test-collection-01.txt")

Load the data


In [4]:
print("Define global variables")
n1 = 2 # min num of rows to display
n2 = 20 # max num of rows to display
min_num_of_queries = 6
min_num_of_queries_pair = 3
min_num_of_chars_in_query = 2
empty_queries = ['-', 'null']
confidences = [0.6, 0.8, 0.9, 1]
levenshtein_distance_threshhold = 15
display_rules_num_of_records_threshhold = 200
stop_queries = ['.com', 'google', 'google.com', 'www.google.com']

Define global variables


In [5]:
users_queries_df = users_queries_search_main_df.select('AnonID', 'Query')\
                    .drop_duplicates(subset=['AnonID', 'Query'])\
                    .filter((col('Query').isin(empty_queries) == False) & (col('Query').isin(stop_queries) == False) & (length(col("Query")) >= min_num_of_chars_in_query))\
                    .select(col('AnonID').alias('user'), col('Query').alias('query'))

num_of_rows = users_queries_df.count()
print("number of rows in the dataset, after initial filtering & dropping duplicates is: " + repr(num_of_rows))
users_queries_df.show(n1, truncate=False)

print("repartition users_queries_df by user column")
users_queries_df.repartition('user')

print("cache users_queries_df")
users_queries_df.cache()

number of rows in the dataset, after initial filtering & dropping duplicates is: 1642464
+----+----------------------------+
|user|query                       |
+----+----------------------------+
|1337|michael keaton date of birth|
|2334|disneychanne.com            |
+----+----------------------------+
only showing top 2 rows

repartition users_queries_df by user column
cache users_queries_df


DataFrame[user: string, query: string]

In [6]:
queries_count_df = users_queries_df.groupBy('query').agg(count("*").alias("count_query"))\
                    .filter("count_query > " + repr(min_num_of_queries))

num_of_queries = queries_count_df.count()
print("number of queries, after filtering is: " + repr(num_of_queries))
queries_count_df.show(n1, truncate=False)

print("repartition queries_count_df by query column")
queries_count_df.repartition('query')

print("cache queries_count_df")
queries_count_df.cache()

number of queries, after filtering is: 12591
+------------------+-----------+
|query             |count_query|
+------------------+-----------+
|www.capitalone.com|112        |
|black pussy       |21         |
+------------------+-----------+
only showing top 2 rows

repartition queries_count_df by query column
cache queries_count_df


DataFrame[query: string, count_query: bigint]

In [7]:
users_queries_df2 = users_queries_df.select(col('user').alias('user2'), col('query').alias('query2'))
users_queries_df2.repartition('user2')
users_queries_df2.cache()
filter_condition = "count_2_queries > " + repr(min_num_of_queries_pair)\
                     + " and levenshtein(query, query2) >= " + repr(levenshtein_threshhold(levenshtein_distance_threshhold,confidences[0]))
queries_pair_count_df = users_queries_df2.join(users_queries_df,on=[col('user') == col('user2'), col('query') > col('query2')], how=('cross'))\
                                .select(col('user'), col('query'), col('query2'))\
                                .groupBy('query', 'query2').agg(count("*").alias("count_2_queries"))\
                                .filter(filter_condition)

# Do not cache queries_pair_count_df, since it contains a cartesian join, which is too large to be cached.

num_of_pairwise_queries = queries_pair_count_df.count()
print("number of pairwise queries is: " + repr(num_of_pairwise_queries))
queries_pair_count_df.show(n2, truncate=False)

print("repartition queries_pair_count_df by query column")
queries_pair_count_df.repartition('query')

print("Free memory")
users_queries_df.unpersist()
users_queries_df2.unpersist()

number of pairwise queries is: 16608
+------------------+-----------------------+---------------+
|query             |query2                 |count_2_queries|
+------------------+-----------------------+---------------+
|maps              |kelly blue book        |11             |
|myspace.com       |area codes             |11             |
|usps              |ticketmaster           |12             |
|verizon           |kelly blue book        |9              |
|yahoo messenger   |limewire               |9              |
|yellow pages      |adobe                  |4              |
|msn.com           |bank of america        |8              |
|yahoo mail        |goggle.com             |4              |
|florida lottery   |dillards               |5              |
|nba               |ask jeeves             |5              |
|ups tracking      |mapquest               |10             |
|lowes             |burlington coat factory|10             |
|reverse directory |mapquest               |11  

DataFrame[user2: string, query2: string]

In [8]:
print("Add similarity column to result data frame")
func_levenshtein_udf = udf(levenshtein, IntegerType())
result_df = queries_pair_count_df.join(queries_count_df, on='query', how='inner')\
            .withColumn('similarity',func_levenshtein_udf(queries_pair_count_df['query'], queries_pair_count_df['query2']))

queries_pair_count_df.unpersist()

Add similarity column to result data frame


DataFrame[query: string, query2: string, count_2_queries: bigint]

In [9]:
result_df = result_df.withColumn("count_2_queries", result_df["count_2_queries"].cast(StringType()))
result_df = result_df.withColumn("count_query", result_df["count_query"].cast(StringType()))
result_df = result_df.withColumn("similarity", result_df["similarity"].cast(StringType()))
result_df.cache()

DataFrame[query: string, query2: string, count_2_queries: string, count_query: string, similarity: string]

In [10]:
num_of_results = result_df.count()
print("number of results is: " + repr(num_of_results))
result_df.show(n2, truncate=False)

number of results is: 16573
+-----------------+--------------------+---------------+-----------+----------+
|query            |query2              |count_2_queries|count_query|similarity|
+-----------------+--------------------+---------------+-----------+----------+
|aruba            |american idol       |4              |28         |10        |
|ask jeeves.com   |american idol.com   |6              |171        |12        |
|ask jeeves.com   |aaa                 |4              |171        |13        |
|badcock furniture|badcock             |6              |11         |10        |
|goody's          |american idol       |4              |10         |13        |
|medicare         |bank of america     |5              |82         |12        |
|medicare         |continental airlines|7              |82         |16        |
|medicare         |american airlines   |4              |82         |10        |
|movie times      |http                |5              |62         |10        |
|movie times

In [11]:
for i in range(len(confidences) - 1):
    conf = confidences[i]
    next_conf = confidences[i + 1]
    filter_results_condition = 'count_2_queries / count_query > ' + repr(conf)\
                                + ' and count_2_queries / count_query <= ' + repr(next_conf)
    
    if i > 0:
        filter_results_condition += ' and levenshtein(query, query2) >= ' + repr(levenshtein_threshhold(levenshtein_distance_threshhold,conf))
    
    current_result_df = result_df.filter(filter_results_condition)
    current_result_df.orderBy(['count_2_queries', 'count_query'], ascending=False)
    file_name = "results_conf" + repr(conf) + "_to_" + repr(next_conf) + ".txt"
    current_result_df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").mode("overwrite").save(file_name)
    
    confidence_count = current_result_df.count()
    if confidence_count < display_rules_num_of_records_threshhold:
        str_rule = 'rule'
        if(confidence_count != 1):
            str_rule += 's'
        print("\n\n============================================")
        print(repr(confidence_count) + " " + str_rule + " with confidence between " + repr(conf) + " and " + repr(next_conf))
        print("============================================\n")
        current_result_list = current_result_df.rdd.collect()
        j = 1
        for item in current_result_list:
           print('{index:d}) {q1} ==> {q2}, conf={confidence:.3f}, #q1={q1_count}, #(q1 and q2)={combined_count}, similarity={similarity}\n'.format(index = j, q1 = item[0], q2 = item[1], confidence = int(item[2])/int(item[3]),  q1_count = item[3], combined_count = item[2], similarity = item[4]))
           j += 1
    else:
        print("\n\n============================================================")
        print("number of results with confidence between " + repr(conf) + " and " + repr(next_conf) + " is: " + repr(confidence_count))
        print("============================================================\n")
        current_result_dict.show(n2, truncate=False)



12 rules with confidence between 0.6 and 0.8

1) saks fifth ave ==> neiman marcus, conf=0.625, #q1=8, #(q1 and q2)=5, similarity=14

2) www.continental airlines ==> mapquest, conf=0.625, #q1=8, #(q1 and q2)=5, similarity=22

3) www.cis.ohio-state.edu ==> mime, conf=0.733, #q1=15, #(q1 and q2)=11, similarity=20

4) bombay kids ==> american idol, conf=0.625, #q1=8, #(q1 and q2)=5, similarity=10

5) www.ghana.gov.gh ==> ghana, conf=0.625, #q1=8, #(q1 and q2)=5, similarity=11

6) mortgage loans ==> ebay, conf=0.714, #q1=7, #(q1 and q2)=5, similarity=12

7) tmobil ==> myspace.com, conf=0.625, #q1=8, #(q1 and q2)=5, similarity=11

8) destiny's child ==> ciara, conf=0.667, #q1=9, #(q1 and q2)=6, similarity=14

9) elliot yamin ==> american idol, conf=0.750, #q1=8, #(q1 and q2)=6, similarity=12

10) server.tmrack.com ==> myspace.com, conf=0.667, #q1=9, #(q1 and q2)=6, similarity=11

11) northern tool ==> mapquest, conf=0.667, #q1=15, #(q1 and q2)=10, similarity=11

12) myspace music videos ==

In [12]:
print("Free memory")
result_df.unpersist()

Free memory


DataFrame[query: string, query2: string, count_2_queries: string, count_query: string, similarity: string]