### Read in Data / Data Prep

In [1]:
# SparkContext handle; used further down to broadcast the fitted item label list.
sc = spark.sparkContext

In [2]:
# The code was removed by DSX for sharing.
# NOTE(review): this hidden cell presumably creates `dash_users` and the
# `properties_6e54ed0ab7e1422aa339fd754b921514` JDBC properties dict (with a
# 'jdbcurl' key) — both are used below but defined nowhere else in this notebook.

In [3]:
# Drop TENANT_ID so it is not picked up as a model input: every column except
# USER_ID is used as a feature in the Feature Transform section.
dash_users = dash_users.drop("TENANT_ID")

In [4]:
# Interest (topic) configuration; joined to the tag table on TAGID and GROUPID below.
interestmap_config = spark.read.jdbc(
    url=properties_6e54ed0ab7e1422aa339fd754b921514['jdbcurl'],
    table='CAO.INTERESTMAP_CONFIG',
    properties=properties_6e54ed0ab7e1422aa339fd754b921514,
)

In [5]:
# Item tag assignments; TARGETID identifies the tagged item.
item_tags = spark.read.jdbc(
    url=properties_6e54ed0ab7e1422aa339fd754b921514['jdbcurl'],
    table='CAO.TAG2',
    properties=properties_6e54ed0ab7e1422aa339fd754b921514,
)

In [6]:
# Map each tagged item to its interest/topic. The right outer join keeps every
# row of item_tags; items whose (TAGID, GROUPID) has no interest mapping get
# null INTEREST_ID / INTEREST_LABEL. A list of Column conditions is ANDed.
item_topic = (
    interestmap_config
    .join(
        item_tags,
        on=[
            interestmap_config.TAGID == item_tags.TAGID,
            interestmap_config.GROUPID == item_tags.GROUPID,
        ],
        how="right_outer",
    )
    .select("INTEREST_ID", "INTEREST_LABEL", "TARGETID")
    .distinct()
)

In [7]:
# The other item frames are joined on ITEM_ID, so align the tag table's TARGETID name.
item_topic = item_topic.withColumnRenamed(existing='TARGETID', new='ITEM_ID')

In [8]:
# Per-user item completion history, de-duplicated on (user, item, completion date).
items1 = (
    spark.read.jdbc(
        url=properties_6e54ed0ab7e1422aa339fd754b921514['jdbcurl'],
        table='CAO.USERS_ITEMHISTORY_FULL',
        properties=properties_6e54ed0ab7e1422aa339fd754b921514,
    )
    .select("USER_ID", "ITEM_ID", "ITEM_COMPLETIONDATE")
    .distinct()
)

In [9]:
# Distinct item ids from CAO.WRK_RE2_ITEMLIST2; used to restrict the history to these items.
itemlist = (
    spark.read.jdbc(
        url=properties_6e54ed0ab7e1422aa339fd754b921514['jdbcurl'],
        table='CAO.WRK_RE2_ITEMLIST2',
        properties=properties_6e54ed0ab7e1422aa339fd754b921514,
    )
    .select("ITEMID")
    .distinct()
)

In [10]:
# Rename to the ITEM_ID join key used by the history frame.
itemlist = itemlist.withColumnRenamed(existing='ITEMID', new='ITEM_ID')

In [11]:
# Keep only history rows whose item appears in CAO.WRK_RE2_ITEMLIST2.
items = items1.join(itemlist, on="ITEM_ID", how="inner")

In [12]:
# Attach interest/topic info. Inner join: items absent from item_topic are dropped,
# and an item mapped to several interests yields one row per interest.
dash_valid_items = items.join(item_topic, on="ITEM_ID", how="inner")

In [13]:
# Inspect the user table: every column except USER_ID becomes a model feature.
dash_users.printSchema()

root
 |-- USER_ID: string (nullable = false)
 |-- BAND: integer (nullable = true)
 |-- TENURE: integer (nullable = true)
 |-- BUSINESSUNITGROUP: integer (nullable = true)
 |-- BUSINESSUNIT: integer (nullable = true)
 |-- CITY: integer (nullable = true)
 |-- STATE: integer (nullable = true)
 |-- COUNTRYCODE: integer (nullable = true)
 |-- COUNTRY: integer (nullable = true)
 |-- REGION: integer (nullable = true)
 |-- GEOGRAPHY: integer (nullable = true)
 |-- PRIMARYJOBCATEGORY: integer (nullable = true)
 |-- SECONDARYJOBCATEGORY: integer (nullable = true)
 |-- JOBSKILLS: integer (nullable = true)
 |-- STATUS: integer (nullable = true)



In [14]:
# Inspect the joined interaction table (history + interest labels).
dash_valid_items.printSchema()

root
 |-- ITEM_ID: string (nullable = false)
 |-- USER_ID: string (nullable = false)
 |-- ITEM_COMPLETIONDATE: timestamp (nullable = true)
 |-- INTEREST_ID: string (nullable = true)
 |-- INTEREST_LABEL: string (nullable = true)



### Feature Transform

In [15]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

# All user attributes except the key are model inputs.
featuresCols = list(dash_users.columns)
featuresCols.remove("USER_ID")

# Stage 1: pack the attribute columns into one vector column, "rawFeatures".
vectorAssembler = VectorAssembler(outputCol="rawFeatures", inputCols=featuresCols)

# Stage 2: features with at most maxCategories distinct values are treated as
# categorical and re-indexed into "features".
# NOTE(review): keep 4284 in sync with the DecisionTreeClassifier maxBins —
# Spark requires maxBins >= the number of categories of any categorical feature.
vectorIndexer = VectorIndexer(maxCategories=4284, inputCol="rawFeatures", outputCol="features")

### Item Transform

In [16]:
from pyspark.ml.feature import StringIndexer

# The label to predict is the item: each distinct ITEM_ID string is mapped to a
# numeric class index in LABEL_INDEXED, the label column the classifier trains on.
labelCol = "ITEM_ID"
labelIndexer = StringIndexer(inputCol=labelCol, outputCol="LABEL_INDEXED")

### Model Building

In [17]:
#from pyspark.ml.classification import MultilayerPerceptronClassifier
#nnc = MultilayerPerceptronClassifier(labelCol="LABEL_INDEXED", layers=[13, 100,  3805], solver="gd", maxIter=2, seed=123)

In [18]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree over "features" predicting LABEL_INDEXED.
# NOTE(review): maxBins must be >= the VectorIndexer maxCategories (4284) used
# in the Feature Transform section, otherwise Spark rejects high-arity
# categorical features at fit time.
dtc = DecisionTreeClassifier(maxBins=4284, labelCol="LABEL_INDEXED")

### Pipeline Building

In [19]:
#from pyspark.ml import Pipeline
#pipeline_nnc = Pipeline(stages=[vectorAssembler, vectorIndexer, labelIndexer, nnc])

In [20]:
from pyspark.ml import Pipeline

# Stage order matters: the fitted labelIndexer is read back as stages[2] to map
# predicted class indices to ITEM_IDs.
pipeline_dtc = Pipeline(stages=[
    vectorAssembler,  # attribute columns -> rawFeatures
    vectorIndexer,    # rawFeatures -> features
    labelIndexer,     # ITEM_ID -> LABEL_INDEXED
    dtc,              # features + LABEL_INDEXED -> rawPrediction / probability / prediction
])

### Split Data into Training by Topic and Test

#### Project Management

In [21]:
# Training set: completions before 2018-01-01 on items tagged 'Project Management'.
# Both predicates must be applied together; filtering dash_valid_items twice in
# separate assignments would keep only the second filter and drop the date cutoff.
training_data = dash_valid_items.filter(
    (dash_valid_items.ITEM_COMPLETIONDATE < '2018-01-01')
    & (dash_valid_items.INTEREST_LABEL == 'Project Management')
)

In [22]:
# Attach user features. Right outer join keeps every user in dash_users; users
# with no training rows get null item columns.
joined_training_data = training_data.join(dash_users, on="USER_ID", how="right_outer")

In [23]:
# Users without any training row get the sentinel label 'NAN-0000' so the label
# indexer has a value for them; that sentinel is filtered out of the
# recommendations after scoring.
joined_training_data = joined_training_data.fillna("NAN-0000", subset=["ITEM_ID"])

In [24]:
#joined_training_data.count()

In [25]:
# Cache the interaction data ahead of the pipeline fit, which reads data derived
# from it in several stages.
# NOTE(review): joined_training_data (the frame actually fitted) is not itself
# persisted, and dash_valid_items is not used again in this notebook after
# training_data is built — consider caching joined_training_data instead.
dash_valid_items.persist()
training_data.persist()

DataFrame[ITEM_ID: string, USER_ID: string, ITEM_COMPLETIONDATE: timestamp, INTEREST_ID: string, INTEREST_LABEL: string]

In [26]:
#pipeline_nnc_model = pipeline_nnc.fit(joined_training_data)
#pipeline_nnc_model.save("pipeline_nnc_model_mobile")
#pipeline_nnc_model.write().overwrite().save("pipeline_nnc_model_mobile")

In [27]:
# Fit all pipeline stages on the topic-specific training set, then save the
# fitted model, overwriting whatever is already at that path.
# NOTE(review): the path says "mobile" while this section trains on
# 'Project Management' — confirm this does not clobber another topic's model.
pipeline_dtc_model = pipeline_dtc.fit(joined_training_data)
pipeline_dtc_model.write().overwrite().save("pipeline_dtc_model_mobile")

In [28]:
# Score every user in dash_users (not only those present in the training set).
prediction = pipeline_dtc_model.transform(dash_users)

In [29]:
#prediction.take(1) 

In [30]:
# Inspect the scored output: user features plus the pipeline's vector and prediction columns.
prediction.printSchema()

root
 |-- USER_ID: string (nullable = false)
 |-- BAND: integer (nullable = true)
 |-- TENURE: integer (nullable = true)
 |-- BUSINESSUNITGROUP: integer (nullable = true)
 |-- BUSINESSUNIT: integer (nullable = true)
 |-- CITY: integer (nullable = true)
 |-- STATE: integer (nullable = true)
 |-- COUNTRYCODE: integer (nullable = true)
 |-- COUNTRY: integer (nullable = true)
 |-- REGION: integer (nullable = true)
 |-- GEOGRAPHY: integer (nullable = true)
 |-- PRIMARYJOBCATEGORY: integer (nullable = true)
 |-- SECONDARYJOBCATEGORY: integer (nullable = true)
 |-- JOBSKILLS: integer (nullable = true)
 |-- STATUS: integer (nullable = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [31]:
#prediction.count()

In [32]:
# stages[2] is the fitted labelIndexer; its labels list maps a class index
# (a position in the probability vector) back to the ITEM_ID string.
item_lables = pipeline_dtc_model.stages[2].labels

In [33]:
# Broadcast the label list so topK can resolve indices on executors via .value.
item_lables_broad = sc.broadcast(item_lables)

In [34]:
from pyspark.sql import Row

# Output record type: one (user, item, score) triple per recommended item.
Score = Row("USER_ID","ITEM_ID", "ASSOCIATION_SCORE")


def topK(row, k=200):
    """Return the ``k`` highest-scoring items for a single user.

    Parameters
    ----------
    row : Row or tuple
        ``row[0]`` is the USER_ID and ``row[1]`` the per-class score vector
        (the ``probability`` column in the caller). Position ``i`` of the
        vector corresponds to ``item_lables_broad.value[i]``.
    k : int, optional
        Maximum number of items to return (default 200). Fewer are returned
        when the vector has fewer than ``k`` entries.

    Returns
    -------
    list of Score
        ``Score(USER_ID, ITEM_ID, ASSOCIATION_SCORE)`` records in descending
        score order.
    """
    user_id, scores = row[0], row[1]
    # Only DenseVector forwards numpy methods such as argsort(); converting via
    # toArray() also supports SparseVector inputs.
    if hasattr(scores, "toArray"):
        scores = scores.toArray()
    labels = item_lables_broad.value
    top_idx = scores.argsort()[::-1][:k]
    return [Score(user_id, labels[i], float(scores[i])) for i in top_idx]

In [35]:
def topK_2(row):
    """Pair a user's id with the output of ``topK(row)``.

    NOTE(review): not called anywhere in this notebook. ``topK`` reads the
    score vector positionally from ``row[1]``, so this only behaves as
    intended for ``(USER_ID, score_vector)`` rows — on a full prediction row,
    position 1 is BAND, not a vector.
    """
    return (row["USER_ID"], topK(row))

In [36]:
# Explode each user's probability vector into (USER_ID, ITEM_ID, ASSOCIATION_SCORE)
# rows, keeping topK's default number of best items per user.
topK_pred = (
    prediction
    .select("USER_ID", "probability")
    .rdd
    .flatMap(topK)
    .toDF()
)

In [37]:
# Drop the sentinel class assigned to users with no training history.
topK_pred = topK_pred.where(topK_pred["ITEM_ID"] != 'NAN-0000')

In [38]:
#topK_pred.take(5)

In [39]:
from pyspark.sql.functions import lit

# Tag every row with the constant model identifier written to the output table.
topK_pred = topK_pred.withColumn(colName="MODEL", col=lit("2"))

In [40]:
from pyspark.sql.functions import desc, rank, row_number
from pyspark.sql import Window

# Keep at most 10 items per user, best score first. row_number() is used rather
# than rank(): equal scores (e.g. several items at probability 0.0) share a rank,
# so a tie around position 10 would let many more than 10 rows through.
# ITEM_ID is a secondary sort key so tied items are chosen deterministically.
window = Window.partitionBy("USER_ID").orderBy(desc("ASSOCIATION_SCORE"), "ITEM_ID")
t10_scores_df = topK_pred.withColumn("RANK", row_number().over(window)).where("RANK <= 10")
# NOTE(review): mode="append" means re-running this notebook may insert
# duplicate rows into the output table.
t10_scores_df.write.jdbc(
    properties_6e54ed0ab7e1422aa339fd754b921514['jdbcurl'],
    mode="append",
    table='LXRECOM.WRK_RE5_TOPIC_SPLIT_MODEL_OUTPUT',
    properties=properties_6e54ed0ab7e1422aa339fd754b921514,
)

In [41]:
#dash_output = spark.read.jdbc(properties_5eb5ff30cc6a4067865f3070299e76a6['jdbcurl'], table='CAO2.LK_TOPIC_SPLIT_MODEL_OUTPUT_SPARK', properties=properties_5eb5ff30cc6a4067865f3070299e76a6)
#dash_output.head()