In [1]:
import numpy as np
from scipy import spatial
import time
import pandas as pd
import pickle
import collections
from collections import OrderedDict
from operator import itemgetter
from pyspark.sql.functions import udf,col,cos,sin
from pyspark.sql.types import *
from pyspark.sql.dataframe import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import monotonically_increasing_id


In [2]:
np.__version__

'1.16.2'

In [3]:
#spark.sparkContext.getConf().getAll()

In [4]:
#known products and their PCAs - master for nneighb search
obj_sql = "select * from advanlwork.rgx_knowns" 
obj_ds = spark.sql(obj_sql)

In [5]:
#customer dimensions - looking for recommendation for these
cust_sql = "select account, pca_device as pca1,pca_viewing as pca2,pca_tickets as pca3,pca_truckroll as pca4,pca_spectra as pca5,pca_network as pca6,pca_costs as pca7,pca_ivr as pca8,pca_pscs as pca9,pca_consumer as pca10,pca_census as pca11,pca_email as pca12,pca_call as pca13 from jberry003.customer_pca limit 100" 
cust_ds = spark.sql(cust_sql)

In [6]:
#product weights - decided by business
prod_sql = "select * from advanlwork.rgx_product" 
prod_ds = spark.sql(prod_sql)

In [7]:
prod_ds_pd=prod_ds.toPandas()
prod_ds_pd.head()

Unnamed: 0,prod,w_stats,w_rev,w_season
0,BOXING EVENT,1.0,135.0,"[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ..."
1,PRO WRESTLING EVENT,1.0,160.0,"[0.0, 0.0, 0.0, 2.4, 2.4, 0.0, 2.4, 0.0, 2.4, ..."
2,HISTORY VAULT,1.0,5.0,"[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ..."
3,GAIAM TV FIT & YOGA,1.0,7.0,"[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ..."
4,UP FAITH & FAMILY,1.0,6.0,"[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ..."


In [8]:
#index of current month
index_curr=pd.to_datetime('today').month-1
index_curr

4

In [9]:
#get seasonal weight for current month
prod_ds_pd['w_season1']=prod_ds_pd['w_season'].apply(lambda x:x[index_curr])
prod_ds_pd.drop(['w_season'],inplace=True,axis=1)
prod_ds_pd.head()

Unnamed: 0,prod,w_stats,w_rev,w_season1
0,BOXING EVENT,1.0,135.0,1.0
1,PRO WRESTLING EVENT,1.0,160.0,2.4
2,HISTORY VAULT,1.0,5.0,1.0
3,GAIAM TV FIT & YOGA,1.0,7.0,1.0
4,UP FAITH & FAMILY,1.0,6.0,1.0


In [10]:
#make min season weight as 1
prod_ds_pd['w_season1']=prod_ds_pd['w_season1'].apply(lambda x:x if x>=1 else 1) 

#weight computation
prod_ds_pd['decision_weight']=prod_ds_pd['w_stats']*prod_ds_pd['w_rev']*prod_ds_pd['w_season1'] 

In [11]:
#create weighted dictionary
weight_dict={}
for i in range(prod_ds_pd.shape[0]):
         weight_dict[prod_ds_pd['prod'][i]] = prod_ds_pd['decision_weight'][i]


In [12]:
weight_dict

{u'ACORN SVOD': 5.0,
 u'AMC PREMIERE SVOD': 5.0,
 u'BOXING EVENT': 135.0,
 u'DISNEY FAMILY MOVIES': 5.0,
 u'DOGTV': 5.0,
 u'EROS NOW': 8.0,
 u'FX+ SVOD': 6.0,
 u'GAIA': 9.0,
 u'GAIAM TV FIT & YOGA': 7.0,
 u'HISTORY VAULT': 5.0,
 u'LIFETIME MOVIE CLUB': 4.0,
 u'MLB EXTRA INNINGS': 120.0,
 u'MLS DIRECT KICK': 5.0,
 u'NBA LEAGUE PASS': 120.0,
 u'NHL CENTER ICE': 150.0,
 u'PANTAYA SVOD': 6.0,
 u'PRO WRESTLING EVENT': 384.0,
 u'STINGRAY KARAOKE': 7.0,
 u'UP FAITH & FAMILY': 6.0,
 u'URBAN MOVIE CHANNEL SVOD': 5.0,
 u'XFI PODS': 120.0}

In [13]:
obj_ds.show(5)

+----------------+------------+--------------------+
|  account_number|        prod|       customer_dims|
+----------------+------------+--------------------+
|8993110620320548|BOXING EVENT|[0.35335969508809...|
|8155100350728428|BOXING EVENT|[0.22663833854761...|
|8499051000009476|BOXING EVENT|[0.0, 0.432564544...|
|8155200510682273|BOXING EVENT|[0.14254242837012...|
|8155600230220983|BOXING EVENT|[0.15717488814089...|
+----------------+------------+--------------------+
only showing top 5 rows



In [14]:
cust_ds.show(5)

+----------------+-------------------+-------------------+--------------------+----+--------------------+----+--------------------+--------------------+------------------+-------------------+-------------------+--------------------+--------------------+
|         account|               pca1|               pca2|                pca3|pca4|                pca5|pca6|                pca7|                pca8|              pca9|              pca10|              pca11|               pca12|               pca13|
+----------------+-------------------+-------------------+--------------------+----+--------------------+----+--------------------+--------------------+------------------+-------------------+-------------------+--------------------+--------------------+
|8499053543493100| 0.3349447369424126|0.24640695696173107|0.002523993689563562| 0.0| 0.22205805321331099| 0.0|0.002738667288396...|0.002039927582795...|0.7392743833390959|                0.0|0.24824282911567036|0.031115239890390705| 3.468

In [15]:
obj_ds_pd=obj_ds.toPandas()

In [16]:
obj_ds_pd[['pca1','pca2','pca3','pca4','pca5','pca6','pca7','pca8','pca9','pca10','pca11','pca12','pca13']] = pd.DataFrame(obj_ds_pd.customer_dims.values.tolist(), index= obj_ds_pd.index)

In [17]:
obj_ds_pd.drop(['customer_dims'],inplace=True,axis=1)

In [18]:
obj_ds_pd.head()

Unnamed: 0,account_number,prod,pca1,pca2,pca3,pca4,pca5,pca6,pca7,pca8,pca9,pca10,pca11,pca12,pca13
0,8993110620320548,BOXING EVENT,0.35336,0.004388,0.006781,0.0,0.240028,0.0,0.001009,0.00123,0.638791,0.0,0.731868,0.001205,0.000225
1,8155100350728428,BOXING EVENT,0.226638,0.224098,0.001349,0.0,0.159882,0.0,0.00322,0.000435,0.107812,0.0,0.389141,0.006321,0.000111
2,8499051000009476,BOXING EVENT,0.0,0.432565,0.004603,0.0,0.0,0.0,0.003728,0.002936,0.235647,0.22016,0.304523,0.0,0.000156
3,8155200510682273,BOXING EVENT,0.142542,0.216289,0.000302,0.0,0.102269,0.0,0.001537,0.000338,0.12614,0.0,0.24502,0.014248,2.8e-05
4,8155600230220983,BOXING EVENT,0.157175,0.392259,0.000194,0.0,0.102438,0.0,0.001557,0.0,0.564833,0.0,0.416608,0.017176,0.0


In [19]:
#dictionary for looking for products of nneighb
d_prod_indexing = {} 
for i in range(obj_ds_pd.shape[0]):
         d_prod_indexing[i] = obj_ds_pd['prod'][i]

In [20]:
d_prod_indexing[0]

u'BOXING EVENT'

In [21]:
#kd-tree creation for querying
start=time.time()
obj=np.array(obj_ds_pd[['pca1','pca2','pca3','pca4','pca5','pca6','pca7','pca8','pca9','pca10','pca11','pca12','pca13']])
tree = spatial.cKDTree(obj,leafsize=4)

print("Done! Overall Took " + str(round(((time.time())-start)/60)) + " minutes")

Done! Overall Took 0.0 minutes


In [22]:
#sample query
random_cust=[0,0.1,0.05,0.04,0.01,0.06,0.08,0.09,0.001,0.1,0.1,0.1,0.1]
tree.query(random_cust,2)

(array([0.25167995, 0.25340626]), array([13333,  8946]))

In [23]:
#fuction for freq weight
def func_freq(inp):
    d_tmp={}
    for k,v in inp.items():
        d_tmp[k]=v/5.0 #used 5.0 instead of 5 because Python 2.0 returns integer instead of float while dividing
    return d_tmp

#weighted results, sorting and restring to top n
def weighted_fetch(dict1,dict2,limit):
    d1={}
    for k,v in dict1.items():
        d1[k]=dict2[k]*v
    sorted_d1 = OrderedDict(sorted(d1.items(), key = itemgetter(1), reverse = True))
    l=[]
    [l.extend([k]) for k,v in sorted_d1.items()]
    l1=l[0:limit]  
    return l1

In [24]:
#all magic happens here - search nneighb, get products and their freq weights, multiple with product weight dict, get results
start=time.time()
kdt_b = sc.broadcast(tree)
pcas= cust_ds.select('pca1','pca2','pca3','pca4','pca5','pca6','pca7','pca8','pca9','pca10','pca11','pca12','pca13').rdd.map(lambda l: (l[0],l[1],l[2],l[3],l[4],l[5],l[6],l[7],l[8],l[9],l[10],l[11],l[12]))
                                                                                                                                      
t = pcas.map(lambda x: kdt_b.value.query(x,5)[1]) #returns index of nearest neighbour
t1=t.map(lambda x:[d_prod_indexing[i] for i in x.tolist()]) #map index to corresponding product
t2=t1.map(lambda x:(collections.Counter(x))) #frequency of products
t3=t2.map(lambda x:func_freq(x)) # freq pct
t4=t2.map(lambda x:weighted_fetch(x,weight_dict,2)) #return top2 based on weights - freq weight*product weights

print("Done! Took " + str(round(((time.time())-start)/60)) + " minutes")

Done! Took 0.0 minutes


In [25]:
t3.take(5)

[{u'AMC PREMIERE SVOD': 0.2,
  u'BOXING EVENT': 0.2,
  u'NHL CENTER ICE': 0.2,
  u'STINGRAY KARAOKE': 0.2,
  u'XFI PODS': 0.2},
 {u'FX+ SVOD': 0.2,
  u'GAIA': 0.2,
  u'PRO WRESTLING EVENT': 0.2,
  u'STINGRAY KARAOKE': 0.2,
  u'URBAN MOVIE CHANNEL SVOD': 0.2},
 {u'EROS NOW': 0.4, u'GAIA': 0.2, u'NBA LEAGUE PASS': 0.4},
 {u'GAIA': 0.2,
  u'MLS DIRECT KICK': 0.2,
  u'STINGRAY KARAOKE': 0.2,
  u'URBAN MOVIE CHANNEL SVOD': 0.2,
  u'XFI PODS': 0.2},
 {u'ACORN SVOD': 0.2,
  u'EROS NOW': 0.2,
  u'MLS DIRECT KICK': 0.2,
  u'XFI PODS': 0.4}]

In [26]:
t4.take(5)

[[u'URBAN MOVIE CHANNEL SVOD', u'EROS NOW'],
 [u'PRO WRESTLING EVENT', u'BOXING EVENT'],
 [u'NBA LEAGUE PASS', u'GAIAM TV FIT & YOGA'],
 [u'NHL CENTER ICE', u'AMC PREMIERE SVOD'],
 [u'EROS NOW', u'GAIAM TV FIT & YOGA']]

In [27]:
df_recomend=t4.zipWithIndex().toDF()

In [28]:
df_recomend.show(5,False)

+----------------------------------------+---+
|_1                                      |_2 |
+----------------------------------------+---+
|[PRO WRESTLING EVENT, MLB EXTRA INNINGS]|0  |
|[LIFETIME MOVIE CLUB, EROS NOW]         |1  |
|[BOXING EVENT, PANTAYA SVOD]            |2  |
|[GAIA, STINGRAY KARAOKE]                |3  |
|[URBAN MOVIE CHANNEL SVOD, PANTAYA SVOD]|4  |
+----------------------------------------+---+
only showing top 5 rows



In [29]:
df_tmp = cust_ds.select("account").rdd.zipWithIndex().toDF()
df_tmp = df_tmp.select(col("_1").alias('acct'),col("_2"))

In [30]:
df_tmp.show(4)

+------------------+---+
|              acct| _2|
+------------------+---+
|[8529113120187980]|  0|
|[8529113050437579]|  1|
|[8529113070045386]|  2|
|[8529113070141847]|  3|
+------------------+---+
only showing top 4 rows



In [31]:
df_final = df_tmp.join(df_recomend, "_2", "inner").drop(df_recomend._2)
df_final=df_final.withColumn("account", df_final.acct.account).drop(df_final.acct)
df_final =df_final.drop(df_final._2).select(col("account"),col("_1").alias("Top5_NNeb"))


In [32]:
df_final.printSchema()

root
 |-- account: string (nullable = true)
 |-- Top5_NNeb: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [33]:
df_final.show(10,False)

+----------------+--------------------------------------+
|account         |Top5_NNeb                             |
+----------------+--------------------------------------+
|8529112680028964|[HISTORY VAULT, AMC PREMIERE SVOD]    |
|8529112661968238|[GAIA, GAIAM TV FIT & YOGA]           |
|8772105760360186|[XFI PODS, EROS NOW]                  |
|8529112660684356|[PRO WRESTLING EVENT, NBA LEAGUE PASS]|
|8772106810342257|[BOXING EVENT, UP FAITH & FAMILY]     |
|8529113120187980|[BOXING EVENT, STINGRAY KARAOKE]      |
|8529112660812379|[PRO WRESTLING EVENT, GAIA]           |
|8529113120203464|[NBA LEAGUE PASS, EROS NOW]           |
|8772105950322533|[NHL CENTER ICE, NBA LEAGUE PASS]     |
|8529112770053237|[PRO WRESTLING EVENT, EROS NOW]       |
+----------------+--------------------------------------+
only showing top 10 rows



In [34]:
df_final.count()

100

In [35]:
start=time.time()
df_final.write.mode("overwrite").saveAsTable("sbalas203.recommend_top5")
print("Done! Took " + str(round(((time.time())-start)/60)) + " minutes")

Done! Took 0.0 minutes
