In [1]:
import findspark
findspark.init()

import pyspark
import random

# just to test if spark is working correctly.

sc = pyspark.SparkContext(appName="Pi")
num_samples = 10000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()



3.112


In [4]:
# open the pandas dataframes.
import pandas as pd

user_prod = pd.read_pickle('user_prod.pkl')
results = pd.read_pickle('results.pkl')

In [5]:
user_prod.head()

Unnamed: 0,customer,product_id,num_trades
0,0,256,2
1,0,257,3
2,0,259,1
3,0,261,2
4,0,262,1


In [6]:
results.head()

Unnamed: 0,product_id,customer,sector,risk,maturity_time,yield_premium_round,num_trades
19,12,102,A,HIGH,1,1.9,1
20,12,421,A,HIGH,1,1.9,1
21,12,557,A,HIGH,1,1.9,1
22,12,931,A,HIGH,1,1.9,6
23,12,960,A,HIGH,1,1.9,1


In [7]:

# create a SparkSession.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark


In [10]:
# load pandas into spark dataframes.
user_prod_spark = spark.createDataFrame(user_prod)
results_spark = spark.createDataFrame(results)

In [11]:
user_prod_spark.show()

+--------+----------+----------+
|customer|product_id|num_trades|
+--------+----------+----------+
|       0|       256|         2|
|       0|       257|         3|
|       0|       259|         1|
|       0|       261|         2|
|       0|       262|         1|
|       0|       279|         1|
|       0|       322|         1|
|       0|       324|         2|
|       0|       325|         3|
|       0|       326|         3|
|       0|       328|         2|
|       0|       329|         1|
|       0|       330|         2|
|       0|      1275|         1|
|       1|        14|         1|
|       1|        18|         1|
|       1|        21|         1|
|       1|        23|         1|
|       1|        24|         1|
|       1|        25|         2|
+--------+----------+----------+
only showing top 20 rows



In [12]:
results_spark.show()

+----------+--------+------+----+-------------+-------------------+----------+
|product_id|customer|sector|risk|maturity_time|yield_premium_round|num_trades|
+----------+--------+------+----+-------------+-------------------+----------+
|        12|     102|     A|HIGH|            1|                1.9|         1|
|        12|     421|     A|HIGH|            1|                1.9|         1|
|        12|     557|     A|HIGH|            1|                1.9|         1|
|        12|     931|     A|HIGH|            1|                1.9|         6|
|        12|     960|     A|HIGH|            1|                1.9|         1|
|        12|    1002|     A|HIGH|            1|                1.9|         2|
|        13|     182|     A|HIGH|            1|                2.0|         1|
|        13|     204|     A|HIGH|            1|                2.0|         1|
|        13|     557|     A|HIGH|            1|                2.0|         1|
|        13|     931|     A|HIGH|            1|     

In [103]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=5, regParam=1, rank=5, userCol="customer", itemCol="product_id", ratingCol="num_trades",
          coldStartStrategy="drop", implicitPrefs=True, alpha=40)

model = als.fit(user_prod_spark)

In [99]:
userRecs = model.recommendForAllUsers(2)

In [25]:
userRecs.show(truncate=False)

+--------+-------------------------------------+
|customer|recommendations                      |
+--------+-------------------------------------+
|471     |[[769,1.0666696], [768,1.0638285]]   |
|1342    |[[705,0.9800867], [704,0.9799158]]   |
|463     |[[798,0.6076645], [923,0.6061254]]   |
|833     |[[1686,1.1420965], [1687,1.1418743]] |
|496     |[[275,1.0792761], [287,1.0721302]]   |
|148     |[[1781,0.26237535], [1778,0.2621636]]|
|1088    |[[1255,1.0374148], [1251,1.0330113]] |
|1238    |[[740,1.1012335], [752,1.0791321]]   |
|540     |[[769,1.0827347], [768,1.0767851]]   |
|1460    |[[1301,1.0922663], [1303,1.0851476]] |
|392     |[[1669,1.1441243], [1668,1.1384263]] |
|243     |[[1669,1.0145273], [1667,1.0125757]] |
|1483    |[[751,0.4335033], [740,0.4238279]]   |
|1084    |[[1279,1.0281405], [1278,1.0262922]] |
|1025    |[[302,1.1886566], [303,1.181824]]    |
|1395    |[[1669,1.0120323], [1667,1.0112952]] |
|737     |[[1202,1.0823245], [1209,1.064706]]  |
|897     |[[786,0.85

In [104]:
productRecs = model.recommendForAllItems(50)

In [43]:
productRecs.show()

+----------+--------------------+
|product_id|     recommendations|
+----------+--------------------+
|      1580|[[421,1.0608202],...|
|       471|[[1002,0.97791654...|
|      1591|[[265,1.0995704],...|
|      1342|[[223,0.34076113]...|
|       463|[[1026,1.1134721]...|
|       833|[[223,1.0465591],...|
|      1645|[[591,1.039318], ...|
|       496|[[1072,1.1807432]...|
|       148|[[1249,1.1105018]...|
|      1088|[[481,0.6451014],...|
|      1238|[[1433,0.9798046]...|
|      1829|[[334,1.0966368],...|
|       540|[[1468,1.1133763]...|
|      1460|[[1138,1.0802062]...|
|      1721|[[479,1.0086329],...|
|       392|[[1249,0.38186893...|
|      1522|[[846,0.61655897]...|
|       243|[[823,1.2624805],...|
|       623|[[953,1.0530143],...|
|      1483|[[764,1.0253494],...|
+----------+--------------------+
only showing top 20 rows



In [105]:
product_recs = productRecs.toPandas()

In [106]:
product_recs.head()

Unnamed: 0,product_id,recommendations
0,1580,"[(338, 1.1182212829589844), (410, 1.1166957616..."
1,471,"[(1376, 1.133520245552063), (846, 1.0583708286..."
2,1591,"[(338, 1.1322026252746582), (450, 1.1061625480..."
3,1342,"[(223, 0.20942384004592896), (1376, 0.19572058..."
4,463,"[(303, 1.1950074434280396), (346, 1.1897130012..."


In [107]:
# function that returns whether the customer that traded the product was in the top_N recommendation list.
def recommended(row, product_recs, top_N):
    prod_id = row['product_id']
    cust_id = row['customer']
    
    rec = product_recs[product_recs['product_id'] == prod_id]['recommendations']
    
    rec_list = []
    for elem in rec.iloc[0][:top_N]:
        rec_list.append(elem['customer'])    
    
    # test if cust_id is in the recommended list
    return np.sum(np.isin(rec_list, cust_id))

In [109]:
# test out top N recommendations and record results.
perc_rec = []
for i in range(1,51):
    results['recommended'] = results.apply(recommended, product_recs=product_recs, top_N = i, axis=1)
    tot_trades = results['num_trades'].sum()
    tot_trades_recommended = results[results['recommended'] == 1]['num_trades'].sum()
    perc_rec.append(tot_trades_recommended/tot_trades)
    print(i)
    
perc_rec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50


[0.0020979557723632704,
 0.005969792848188168,
 0.011223210554593755,
 0.01906069573671508,
 0.026898180918836403,
 0.032492729645138455,
 0.04045813895972095,
 0.046538799389375475,
 0.05259387499253776,
 0.05865747887119745,
 0.06404734898556164,
 0.0691472577330138,
 0.07501471127523303,
 0.0817179358161986,
 0.08831882105119523,
 0.09399012425697399,
 0.10030104812505863,
 0.10552888100497199,
 0.11055203527294746,
 0.11793752185370596,
 0.12396701263037602,
 0.12952744825468843,
 0.13562516523533777,
 0.1402560188304323,
 0.14620875512762566,
 0.1513598335280623,
 0.15759400291667022,
 0.16241247857270782,
 0.1679643859215228,
 0.17295342708750863,
 0.17790835515150483,
 0.18415958109110755,
 0.19057284426516113,
 0.1960991667874839,
 0.2017875265442575,
 0.2079449414533887,
 0.21284869986440041,
 0.2181618154992879,
 0.22424247592894242,
 0.23305218451776866,
 0.24016476628261,
 0.24509410952011393,
 0.25054367756296,
 0.2561211697382672,
 0.26308877081965254,
 0.2703292767169550

In [110]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=5, regParam=1, rank=5, userCol="customer", itemCol="product_id", ratingCol="num_trades",
          coldStartStrategy="drop")

model = als.fit(user_prod_spark)

productRecs = model.recommendForAllItems(50)
product_recs = productRecs.toPandas()


# test out top N recommendations and record results.
perc_rec = []
for i in range(1,51):
    results['recommended'] = results.apply(recommended, product_recs=product_recs, top_N = i, axis=1)
    tot_trades = results['num_trades'].sum()
    tot_trades_recommended = results[results['recommended'] == 1]['num_trades'].sum()
    perc_rec.append(tot_trades_recommended/tot_trades)
    print(i)
    
perc_rec


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50


[0.08208465166258731,
 0.11960906385119865,
 0.1399660574635203,
 0.16281330752108616,
 0.1904108070307103,
 0.20302412649138218,
 0.212840171588903,
 0.22405485386799937,
 0.2354059885550543,
 0.24853953282106825,
 0.2603000247319989,
 0.26800105750616166,
 0.27651227645257853,
 0.2854754940003582,
 0.2946945598130602,
 0.30393921045225447,
 0.31045481293227695,
 0.3172774333302063,
 0.32306813239294885,
 0.3287309073232302,
 0.33360055263225225,
 0.33848725449226913,
 0.3431778060158455,
 0.3480730361513598,
 0.3535396607452007,
 0.3590830398185183,
 0.36445585338188763,
 0.3690867069769822,
 0.37430601158139815,
 0.379917616858695,
 0.3847275642392352,
 0.39195101358554285,
 0.3962919058137254,
 0.40148562559164913,
 0.4058862157483135,
 0.41050001279241327,
 0.41509675328551815,
 0.41935236275872656,
 0.4237444246398936,
 0.4290063706217966,
 0.4340124683387772,
 0.43865185020936914,
 0.44219108454079503,
 0.44571326232122604,
 0.4495595145705587,
 0.4537810109417775,
 0.4578234135