In [1]:
import pyspark
import math
# Reuse the already-running SparkContext if there is one; otherwise start a new one.
sc = pyspark.SparkContext.getOrCreate()

In [2]:
# Load the raw search log as an RDD of text lines and peek at the first five.
# The first line of the file is the tab-separated column header.
data = sc.textFile("user-ct-test-collection-01.txt")
data.take(5)

[u'AnonID\tQuery\tQueryTime\tItemRank\tClickURL',
 u'142\trentdirect.com\t2006-03-01 07:17:12\t\t',
 u'142\twww.prescriptionfortime.com\t2006-03-12 12:31:06\t\t',
 u'142\tstaple.com\t2006-03-17 21:19:29\t\t',
 u'142\tstaple.com\t2006-03-17 21:19:45\t\t']

In [None]:
# Format each record as: (query, (user id, date & hour))

In [3]:
# Build distinct (query, (user id, "YYYY-MM-DD HH")) records from the raw log lines.
headers = data.first()


def _to_query_record(raw_line):
    """Convert one tab-separated log line into (query, (user id, date & hour)).

    Field 0 is the user id, field 1 the query text and field 2 the timestamp;
    everything from the first ":" onwards is dropped from the timestamp so that
    only the date and the hour remain.
    """
    fields = raw_line.split("\t")
    date_and_hour = fields[2].split(":")[0]
    return (fields[1], (int(fields[0]), date_and_hour))


logs = (
    data
    .filter(lambda raw_line: raw_line != headers)  # drop the header row
    .map(_to_query_record)
    .filter(lambda record: len(record[0]) > 1)     # drop empty / one-character queries
    .distinct()                                    # one record per (query, user, hour)
)
logs.take(5)

[(u'what is crime', (3195885, u'2006-03-01 18')),
 (u'wwwf.farmfresh.com', (804397, u'2006-03-01 18')),
 (u'who wants to be a millionaire', (723360, u'2006-03-18 10')),
 (u'sands of iwo jima', (1720447, u'2006-04-02 14')),
 (u'www myspace com', (8356618, u'2006-05-20 03'))]

In [None]:
# We define a transaction as all the queries issued by the same user within the same date & hour.
# The format of the table is now: (query, list of the transaction keys in which the query appears)


In [4]:
# For every query, gather the (user id, date & hour) transaction keys it occurs in
# and materialise them as a plain Python list.
query_with_user_IDs = logs.groupByKey().mapValues(list)
query_with_user_IDs.take(5)

[(u'billion automotive',
  [(569010, u'2006-05-03 12'), (569010, u'2006-05-23 13')]),
 (u'azprobonolawyer', [(7426445, u'2006-03-08 14')]),
 (u'toyato dealers', [(2631908, u'2006-05-18 20')]),
 (u'www.lesmills.usa',
  [(1034839, u'2006-05-30 19'),
   (1034839, u'2006-03-26 06'),
   (1034839, u'2006-04-01 04')]),
 (u'www.woindsorpilates.com', [(10509979, u'2006-05-11 17')])]

In [None]:
# We remove queries that do not appear in at least 20 transactions, to get more realistic results.

In [5]:
# Transaction-count threshold used to filter out rarely issued queries.
minTrans = 20

In [6]:
# Keep only the queries that occur in at least `minTrans` transactions.
# The comparison is inclusive (>=): a query with exactly `minTrans` transactions
# satisfies "at least minTrans" and must be kept. The previous strict `>` silently
# dropped those queries.
# row[0] is the query text, row[1] the list of its transaction keys.
query_with_user_IDs = query_with_user_IDs.filter(lambda row: len(row[1]) >= minTrans)
query_with_user_IDs.take(5)

[(u'yahoo',
  [(8046906, u'2006-03-30 21'),
   (2437511, u'2006-04-26 17'),
   (22906667, u'2006-05-21 01'),
   (943253, u'2006-04-03 19'),
   (16113909, u'2006-04-24 22'),
   (1214053, u'2006-03-04 19'),
   (13885626, u'2006-04-18 07'),
   (4924411, u'2006-04-06 06'),
   (10147903, u'2006-03-23 14'),
   (693071, u'2006-03-10 15'),
   (2468346, u'2006-03-14 09'),
   (22715157, u'2006-05-12 23'),
   (2350791, u'2006-03-31 07'),
   (1871121, u'2006-03-15 17'),
   (5427160, u'2006-03-07 17'),
   (10648757, u'2006-05-12 12'),
   (22676176, u'2006-05-29 23'),
   (2281742, u'2006-05-02 17'),
   (11215238, u'2006-03-08 00'),
   (1198843, u'2006-04-09 20'),
   (1454084, u'2006-05-19 15'),
   (15564332, u'2006-03-30 07'),
   (3872195, u'2006-04-04 23'),
   (3134849, u'2006-05-31 19'),
   (1658966, u'2006-03-23 17'),
   (11542410, u'2006-03-08 20'),
   (1467914, u'2006-03-05 23'),
   (2353729, u'2006-05-14 07'),
   (10100047, u'2006-05-23 16'),
   (22344191, u'2006-04-27 13'),
   (2713235, u'200

In [None]:
# Here we create a table with all the possible ordered pairs of distinct queries.
# Format: ((query x, transaction key list for x), (query y, transaction key list for y))

In [7]:
# Pair every remaining query with every other one. Both (x, y) and (y, x) are
# produced; only pairs whose two sides carry the same query text are discarded.
combinations = (
    query_with_user_IDs
    .cartesian(query_with_user_IDs)
    .filter(lambda xy: xy[0][0] != xy[1][0])
)
combinations.take(5)

[((u'yahoo',
   [(8046906, u'2006-03-30 21'),
    (2437511, u'2006-04-26 17'),
    (22906667, u'2006-05-21 01'),
    (943253, u'2006-04-03 19'),
    (16113909, u'2006-04-24 22'),
    (1214053, u'2006-03-04 19'),
    (13885626, u'2006-04-18 07'),
    (4924411, u'2006-04-06 06'),
    (10147903, u'2006-03-23 14'),
    (693071, u'2006-03-10 15'),
    (2468346, u'2006-03-14 09'),
    (22715157, u'2006-05-12 23'),
    (2350791, u'2006-03-31 07'),
    (1871121, u'2006-03-15 17'),
    (5427160, u'2006-03-07 17'),
    (10648757, u'2006-05-12 12'),
    (22676176, u'2006-05-29 23'),
    (2281742, u'2006-05-02 17'),
    (11215238, u'2006-03-08 00'),
    (1198843, u'2006-04-09 20'),
    (1454084, u'2006-05-19 15'),
    (15564332, u'2006-03-30 07'),
    (3872195, u'2006-04-04 23'),
    (3134849, u'2006-05-31 19'),
    (1658966, u'2006-03-23 17'),
    (11542410, u'2006-03-08 20'),
    (1467914, u'2006-03-05 23'),
    (2353729, u'2006-05-14 07'),
    (10100047, u'2006-05-23 16'),
    (22344191, u'2006

In [None]:
## Calculate the confidence for each combination:
## confidence(x -> y) = (transactions containing both x and y) / (transactions containing x)

In [8]:
def _count_shared_transactions(pair):
    """Summarise one query pair as (query x, #transactions of x, query y, #shared).

    `pair` is ((query x, keys of x), (query y, keys of y)); "#shared" is the number
    of transaction keys that appear in both key lists.
    """
    (query_x, keys_x), (query_y, keys_y) = pair
    shared_keys = set(keys_x) & set(keys_y)
    return (query_x, len(keys_x), query_y, len(shared_keys))


# NOTE: this rebinds `combinations` to a differently shaped RDD, so the cell must be
# run exactly once after the cell that builds the query pairs.
combinations = combinations.map(_count_shared_transactions)
combinations.take(5)

[(u'yahoo', 9459, u'dinosaurs', 0),
 (u'yahoo', 9459, u'www.target.com', 0),
 (u'yahoo', 9459, u'rewards', 0),
 (u'yahoo', 9459, u'va lottery', 0),
 (u'yahoo', 9459, u'irl', 0)]

In [9]:
# Turn (query x, #transactions of x, query y, #shared) into
# (query x, query y, confidence), where confidence = #shared / #transactions of x.
# The denominator is converted to float so the division is never an integer division.
combinations = combinations.map(
    lambda counts: (counts[0], counts[2], counts[3] / float(counts[1]))
)
combinations.take(5)

[(u'yahoo', u'dinosaurs', 0.0),
 (u'yahoo', u'www.target.com', 0.0),
 (u'yahoo', u'rewards', 0.0),
 (u'yahoo', u'va lottery', 0.0),
 (u'yahoo', u'irl', 0.0)]

In [None]:
# Filter out all the low-confidence pairs for the output

In [10]:
# Keep the (query x, query y, confidence) rows whose confidence is strictly above 0.6.
output06 = combinations.filter(lambda rule: rule[2] > 0.6)
output06.take(5)

[(u'answers .com', u'find a grave', 0.673469387755102),
 (u'nyse hl', u'nyse wit', 0.6692913385826772),
 (u'barely 19 gallery', u'lightspeedgirls', 0.8085106382978723),
 (u'philadelphia.craigslist.org cas', u'karate', 0.9642857142857143),
 (u'hqgal', u'toolbar', 0.7560975609756098)]

In [11]:
# Narrow the 0.6 result further: keep rows whose confidence is strictly above 0.8.
output08 = output06.filter(lambda rule: rule[2] > 0.8)
output08.take(5)

[(u'barely 19 gallery', u'lightspeedgirls', 0.8085106382978723),
 (u'philadelphia.craigslist.org cas', u'karate', 0.9642857142857143),
 (u'bt store', u'btstore', 0.9615384615384616),
 (u'pierpoint landing', u'free republic', 0.8875),
 (u'lightspeedgirls', u'barely 19 gallery', 0.8085106382978723)]

In [12]:
# Narrow the 0.8 result further: keep rows whose confidence is strictly above 0.9.
output09 = output08.filter(lambda rule: rule[2] > 0.9)
output09.take(5)

[(u'philadelphia.craigslist.org cas', u'karate', 0.9642857142857143),
 (u'bt store', u'btstore', 0.9615384615384616),
 (u'latest lottery results', u'florida lottery', 0.9565217391304348),
 (u'craglist', u'craigslist', 0.9583333333333334),
 (u'planters bank', u'suntrust bank', 0.9545454545454546)]

In [None]:
# Write the output

In [13]:
# Shuffle each result into a single partition before it is saved
# (presumably so that every result ends up in one part file -- confirm).
output06, output08, output09 = (
    output06.repartition(1),
    output08.repartition(1),
    output09.repartition(1),
)

In [14]:
# Write each confidence-filtered result as text under its own directory in output/.
# NOTE(review): Spark normally refuses to write into an output directory that already
# exists, so re-running this cell likely requires deleting output/ first -- confirm.
output06.saveAsTextFile("output/output06")
output08.saveAsTextFile("output/output08")
output09.saveAsTextFile("output/output09")