In [1]:
orders_data = "recsys_challenge/yoochoose-buys.dat"
lines = sc.textFile(orders_data).map(lambda row: row.split(","))
lines.take(5)

[[u'420374', u'2014-04-06T18:44:58.314Z', u'214537888', u'12462', u'1'],
 [u'420374', u'2014-04-06T18:44:58.325Z', u'214537850', u'10471', u'1'],
 [u'281626', u'2014-04-06T09:40:13.032Z', u'214535653', u'1883', u'1'],
 [u'420368', u'2014-04-04T06:13:28.848Z', u'214530572', u'6073', u'1'],
 [u'420368', u'2014-04-04T06:13:28.858Z', u'214835025', u'2617', u'1']]

**yoochoose-buys.dat**
* Session ID - the id of the session. In one session there are one or many buying events. Could be represented as an integer number.
* Timestamp - the time when the buy occurred. Format of YYYY-MM-DDThh:mm:ss.SSSZ
* Item ID – the unique identifier of item that has been bought. Could be represented as an integer number.
* Price – the price of the item. Could be represented as an integer number.
* Quantity – the quantity in this buying.  Could be represented as an integer number.

In [2]:
from datetime import date

#calculate dateID, we need this to have itteger date for simplicity
def calcDateID(year, month, day):
    day1 = date(2006, 1, 1)
    dayWanted = date(year, month, day)
    delta = dayWanted - day1
    return delta.days + 1 # we need to add day one as first day not 0


In [3]:
orders = (lines
          .map(lambda row: row + [row[1][:10], calcDateID(int(row[1][:4]), int(row[1][5:7]), int(row[1][8:10]))])
          .cache()
          )
orders.take(3)

[[u'420374',
  u'2014-04-06T18:44:58.314Z',
  u'214537888',
  u'12462',
  u'1',
  u'2014-04-06',
  3018],
 [u'420374',
  u'2014-04-06T18:44:58.325Z',
  u'214537850',
  u'10471',
  u'1',
  u'2014-04-06',
  3018],
 [u'281626',
  u'2014-04-06T09:40:13.032Z',
  u'214535653',
  u'1883',
  u'1',
  u'2014-04-06',
  3018]]

# Dates

In [4]:
dates = orders.map(lambda row: (row[6], 1)).reduceByKey(lambda a, b: a+b)

In [6]:
dates.takeOrdered(10, lambda row: row[0])

[(3013, 7093),
 (3014, 5757),
 (3015, 6642),
 (3016, 4983),
 (3017, 6650),
 (3018, 11303),
 (3019, 7292),
 (3020, 5902),
 (3021, 6455),
 (3022, 5700)]

In [7]:
dates.count()

183

In [8]:
hist = dates.map(lambda row: row[1]).histogram(10)
hist

([32.0,
  2032.2,
  4032.4,
  6032.6,
  8032.8,
  10033.0,
  12033.2,
  14033.4,
  16033.6,
  18033.8,
  20034],
 [25, 13, 60, 38, 23, 10, 6, 1, 4, 3])

In [9]:
import matplotlib.pyplot as plt
plt.plot(hist[1])
plt.show()

# Top Sellers

In [10]:
orders.take(2)

[[u'420374',
  u'2014-04-06T18:44:58.314Z',
  u'214537888',
  u'12462',
  u'1',
  u'2014-04-06',
  3018],
 [u'420374',
  u'2014-04-06T18:44:58.325Z',
  u'214537850',
  u'10471',
  u'1',
  u'2014-04-06',
  3018]]

In [11]:
train_end = 3100
train_period = 90
train_start = train_end - train_period

In [11]:
import operator
top_sellers = (orders
               .filter(lambda row: train_end >= row[6] and row[6] >= train_start)
               .map(lambda row: (row[2], 1))
               .reduceByKey(operator.add)
               )
top_sellers.takeOrdered(30, lambda row: -row[1])

[(u'643078800', 15203),
 (u'214829878', 7358),
 (u'214829887', 4476),
 (u'214834880', 3500),
 (u'214834877', 3446),
 (u'214821277', 3201),
 (u'214829882', 3160),
 (u'214829880', 3139),
 (u'214826610', 3046),
 (u'214831946', 2779),
 (u'214831948', 2643),
 (u'214835019', 2479),
 (u'214835167', 2427),
 (u'214835109', 2392),
 (u'214821305', 2151),
 (u'214836932', 2055),
 (u'214829885', 1997),
 (u'214845131', 1889),
 (u'214839973', 1884),
 (u'214835017', 1867),
 (u'214826990', 1803),
 (u'214844439', 1764),
 (u'214836924', 1710),
 (u'214829765', 1691),
 (u'214839313', 1665),
 (u'214821309', 1664),
 (u'214820261', 1620),
 (u'214835747', 1604),
 (u'214834867', 1601),
 (u'214821285', 1588)]

# KNN

In [12]:
prod_session = (orders
                .filter(lambda row: train_end >= row[6] and row[6] >= train_start)
                .map(lambda row: (row[2], row[0]))
                .groupByKey()
                )
prod_session.take(2)

[(u'214697848', <pyspark.resultiterable.ResultIterable at 0x7f97db3a8550>),
 (u'214697840', <pyspark.resultiterable.ResultIterable at 0x7f97dab0c8d0>)]

In [13]:
prod_session.count()

14591

In [26]:
prod_session_1 = prod_session.sample(withReplacement=False, fraction=0.1)

In [27]:
prod_session_1.count()

1483

In [58]:
compare = prod_session.cartesian(prod_session)
#compare = prod_session_1.cartesian(prod_session_1)

In [59]:
compare.take(2)

[((u'214697848', <pyspark.resultiterable.ResultIterable at 0x7f97a809e0d0>),
  (u'214697848', <pyspark.resultiterable.ResultIterable at 0x7f97a8097850>)),
 ((u'214697848', <pyspark.resultiterable.ResultIterable at 0x7f97a8097990>),
  (u'214697840', <pyspark.resultiterable.ResultIterable at 0x7f97a8097810>))]

In [60]:
compare.count()

212897281

In [61]:
import math

def cossim(row):
    p1 = row[0][0]
    p2 = row[1][0]
    if p1 == p2:
        return (p1, p2, 1.0)
    s1 = row[0][1]
    s2 = row[1][1]
    l1 = len(s1)
    l2 = len(s2)
    if l1 == 0 or l2 == 0:
        retunr (p1, p2, 0)
    numerator = 0
    for sess1 in s1:
        if sess1 in s2: 
            numerator = numerator+1
    if numerator == 0:
        return (p1, p2, 0.0)
    denominator1 = math.sqrt(l1)
    denominator2 =  math.sqrt(len(s2))

    return (p1, p2, float(numerator)/ (denominator1 * denominator2))

In [62]:
threshold = 0.1
sim = compare.map(cossim).filter(lambda row: row[2] > threshold and row[0] != row[1])
sim.take(10)

[(u'214697840', u'214697837', 0.35355339059327373),
 (u'214680457', u'214533245', 0.16666666666666669),
 (u'214612672', u'214532107', 0.43835616438356173),
 (u'214511092', u'214675950', 0.4999999999999999),
 (u'214516042', u'214534880', 0.2886751345948129),
 (u'214612672', u'214744768', 0.3333654881912033),
 (u'214684260', u'214712402', 0.13736056394868904),
 (u'214712790', u'214690708', 0.11952286093343936),
 (u'214648466', u'214837260', 0.21320071635561041),
 (u'214531584', u'214600824', 0.1315587028960544)]

In [63]:
knn_sim_grouped = (sim
               .map(lambda row: (row[0], (row[1], row[2])))
               .groupByKey()
              )
knn_sim_grouped.take(2)

[(u'214539285', <pyspark.resultiterable.ResultIterable at 0x7f97a80ab710>),
 (u'214677596', <pyspark.resultiterable.ResultIterable at 0x7f97a80ab150>)]

In [64]:
max = 10
def sort_tuples(row):
    prod = row[0]
    spl = row[1]
    spl_sorted = sorted(spl, key=(lambda t: -1*t[1]))
    out = " ".join("%s:%g" % (p,s) for (p, s) in spl_sorted[:max])
        
    return (prod, out)
           

In [65]:
res = knn_sim_grouped.map(sort_tuples)
res.take(30)

[(u'214539285', u'214539287:0.25'),
 (u'214677596',
  u'214678074:0.341882 214678076:0.218218 214652977:0.209657 214704157:0.142857 214677902:0.142857 214698013:0.133631 214845596:0.13159 214698470:0.125988 214677836:0.104828'),
 (u'214691668',
  u'214530793:0.316228 214691666:0.282843 214691353:0.158114 214842446:0.129099 214691638:0.119523 214849262:0.119523 214838720:0.111803 214849269:0.111803'),
 (u'214746335', u'214830037:1'),
 (u'214691660',
  u'214691402:0.377964 214691608:0.338062 214691664:0.283473 214838756:0.239046 214691662:0.188982 214691677:0.155936 214691392:0.154303 214843912:0.14825 214691683:0.137505 214691634:0.129641'),
 (u'214663852', u'214510447:0.150756'),
 (u'214535591', u'214697351:0.258199 214587840:0.19245'),
 (u'214819389', u'214710950:0.105409'),
 (u'214716752', u'214576693:0.213201 214561644:0.100504'),
 (u'214741631', u'214826850:0.57735 214834854:0.152499'),
 (u'214533489', u'214589780:0.174078'),
 (u'214819385', u'214557913:0.144338 214725532:0.14072')

In [66]:
knn_model = "knn_model.txt"
def store_knn(knn, path):
    with open(path, "w") as out:
        for p, simp in knn.iteritems():
            out.write("%s\t%s\n" % (p, simp))
store_knn(res.collectAsMap(), knn_model)            


# Apriori

In [12]:
orders.take(2)

[[u'420374',
  u'2014-04-06T18:44:58.314Z',
  u'214537888',
  u'12462',
  u'1',
  u'2014-04-06',
  3018],
 [u'420374',
  u'2014-04-06T18:44:58.325Z',
  u'214537850',
  u'10471',
  u'1',
  u'2014-04-06',
  3018]]

In [13]:
agg_sessions = (orders
               .filter(lambda row: train_end >= row[6] and row[6] >= train_start)
               .map(lambda row: (row[0], row[2]))
               .groupByKey()
               )
agg_sessions.take(2)

[(u'2797118', <pyspark.resultiterable.ResultIterable at 0x7f72d04adb10>),
 (u'2522798', <pyspark.resultiterable.ResultIterable at 0x7f72d04adf50>)]

In [18]:
import operator
def prod_rel(row):
    prods = row[1]
    for p1 in prods:
        for p2 in prods:
            if p1 != p2:
                yield((int(p1), int(p2)), 1)
                
prod_rel = (agg_sessions
            .flatMap(prod_rel)
            .reduceByKey(operator.add)
           )
prod_rel.take(5)

[((214832015, 214839975), 4),
 ((214670070, 214832726), 1),
 ((214583784, 214832214), 1),
 ((214717860, 214696132), 1),
 ((214844345, 214839975), 1)]

In [19]:
prod_counts = (prod_rel
               .map(lambda row: (row[0][0], row[1]))
               .reduceByKey(operator.add)
              )
prod_counts.take(5)

[(214827020, 602),
 (214827022, 229),
 (214827024, 257),
 (214827026, 879),
 (214827028, 204)]

In [20]:
prod_counts.count()

12424

In [27]:
prod_rel_count1 = (prod_rel
                  .map(lambda row: (row[0][0], (row[0][1], row[1])))
                  .join(prod_counts)
                 )
prod_rel_count1.take(2)

[(214646784, ((214646780, 1), 1)), (214827020, ((214702920, 1), 602))]

In [28]:
prod_rel_count2 = (prod_rel_count1
                  .map(lambda row: (row[1][0][0], (row[0], row[1][1], row[1][0][1])))
                  .join(prod_counts)
                 )
prod_rel_count2.take(5)

[(214646784, ((214646780, 9, 1), 1)),
 (214843404, ((214842568, 71, 1), 6)),
 (214843404, ((214843933, 24, 1), 6)),
 (214843404, ((214843402, 44, 1), 6)),
 (214843404, ((214691647, 11, 1), 6))]

In [30]:
prod_rel_count = (prod_rel_count2
                  .map(lambda row: (row[0], row[1][1], row[1][0][0], row[1][0][1], row[1][0][2]))
                 )
prod_rel_count.take(30)

[(214646784, 1, 214646780, 9, 1),
 (214843404, 6, 214842568, 71, 1),
 (214843404, 6, 214843933, 24, 1),
 (214843404, 6, 214843402, 44, 1),
 (214843404, 6, 214691647, 11, 1),
 (214843404, 6, 214843935, 32, 1),
 (214843404, 6, 214842579, 19, 1),
 (214835214, 45, 214835212, 19, 1),
 (214835214, 45, 214697308, 19, 1),
 (214835214, 45, 214835572, 871, 1),
 (214835214, 45, 214836532, 380, 1),
 (214835214, 45, 214829392, 1065, 1),
 (214835214, 45, 214748300, 1342, 1),
 (214835214, 45, 214699284, 10, 1),
 (214835214, 45, 214717048, 501, 1),
 (214835214, 45, 214716240, 34, 1),
 (214835214, 45, 214743508, 26, 1),
 (214835214, 45, 214848120, 15, 1),
 (214835214, 45, 214827009, 1503, 1),
 (214835214, 45, 214820345, 185, 1),
 (214835214, 45, 214753513, 1301, 1),
 (214835214, 45, 214829825, 417, 1),
 (214835214, 45, 214544357, 254, 1),
 (214835214, 45, 214716941, 267, 1),
 (214835214, 45, 214839973, 6019, 1),
 (214835214, 45, 214537181, 12, 1),
 (214835214, 45, 214821309, 2440, 1),
 (214835214, 45, 

In [31]:
prod_rel_count.count()

388880

In [34]:
total_count = prod_counts.map(lambda row: row[1]).reduce(lambda a, b: a+b)
total_count

1123442

In [37]:
def apriori(row):
    # p: product, c:count, sup: support, conf: confidence, l: lift
    p1c = row[1]
    p2c = row[3]
    p1p2c = row[4]
    
    p1sup = float(p1c)/total_count
    p2sup = float(p2c)/total_count
    p1p2sup = float(p1p2c)/total_count
    
    p1conf = p1p2sup/p1sup
    p1lift = p1p2sup/(p1sup*p2sup)
    return (row[0], p1c, row[2], p2c, p1p2c, p1conf, p1lift)
  
    

In [61]:
prod_apriori = prod_rel_count.map(apriori)
prod_apriori.take(10)

[(214646784, 1, 214646780, 9, 1, 1.0, 124826.88888888889),
 (214843404, 6, 214842568, 71, 1, 0.16666666666666666, 2637.1877934272297),
 (214843404, 6, 214843933, 24, 1, 0.16666666666666666, 7801.680555555555),
 (214843404, 6, 214843402, 44, 1, 0.16666666666666666, 4255.462121212121),
 (214843404, 6, 214691647, 11, 1, 0.16666666666666666, 17021.848484848484),
 (214843404, 6, 214843935, 32, 1, 0.16666666666666666, 5851.260416666667),
 (214843404, 6, 214842579, 19, 1, 0.16666666666666666, 9854.754385964912),
 (214835214, 45, 214835212, 19, 1, 0.02222222222222222, 1313.9672514619883),
 (214835214, 45, 214697308, 19, 1, 0.02222222222222222, 1313.9672514619883),
 (214835214, 45, 214835572, 871, 1, 0.02222222222222222, 28.662890674830972)]

In [62]:
path_apriori = "apriori_out.txt"
(prod_apriori
 .map(lambda row: "%d\t%d\t%d\t%d\t%d\t%g\t%g" % (row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
 .saveAsTextFile(path_apriori)
 )

In [63]:
prod_apriori.filter(lambda row: row[5]> 0.9 and row[6]> 1 and row[4] >1).take(10)

[(214712514, 9, 214712516, 10, 9, 1.0, 112344.20000000003),
 (214835730, 4, 214587662, 4, 4, 1.0, 280860.5),
 (214713300, 4, 214836924, 4290, 4, 1.0, 261.8745920745921),
 (214836234, 4, 214835694, 80, 4, 1.0, 14043.025000000001),
 (214639680, 4, 214510689, 9, 4, 1.0, 124826.88888888889),
 (214607154, 4, 214607156, 4, 4, 1.0, 280860.5),
 (214640100, 4, 214530457, 73, 4, 1.0, 15389.616438356165),
 (214591470, 2, 214591468, 2, 2, 1.0, 561721.0),
 (214709940, 16, 214622237, 164, 16, 1.0, 6850.256097560975),
 (214712694, 2, 214669768, 6, 2, 1.0, 187240.33333333334)]