In [1]:
from pyspark import SparkContext, SparkConf  
from operator import add
import csv  


# OAuth (service-principal) settings passed to dbutils.fs.mount as extra_configs.
# The client secret is fetched from a Databricks secret scope rather than written here.
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<your client app>",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "<your scope name>", key = "<your key name>"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<your tenent id>/oauth2/token"}

# Optionally, you can add <directory-name> to the source URI of your mount point.
# Unmount only when the mount point is currently mounted: dbutils.fs.unmount raises on
# an unmounted path, which made this cell fail the first time it ran in a workspace.
if any(mount.mountPoint == "/mnt/customer360" for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point = "/mnt/customer360")
dbutils.fs.mount(
  source = "abfss://customer360@renjiedl.dfs.core.windows.net",
  mount_point = "/mnt/customer360",
  extra_configs = configs)

# Reuse the running SparkContext (or create one) and read the track events as lines of text.
sc = SparkContext.getOrCreate()
trackfile = sc.textFile("/mnt/customer360/tracks.csv")


def make_tracks_kv(str):
    """Convert one comma-separated line into a ``[key, [record]]`` pair.

    The key is field 1 of the line. The record is
    ``[int(field 2), field 3, int(field 4), field 5]``; it is wrapped in an
    outer list so that records sharing a key can be merged by list
    concatenation. Any fields after index 5 are ignored.

    Raises:
        IndexError: if the line has fewer than six fields.
        ValueError: if field 2 or field 4 is not an integer.
    """
    fields = str.split(",")
    key = fields[1]
    record = [int(fields[2]), fields[3], int(fields[4]), fields[5]]
    return [key, [record]]

# Turn every input line into a [key, [record]] pair, then merge the single-record
# lists of each key by concatenation so each key ends up with all of its records.
tracks_kv = trackfile.map(make_tracks_kv)
tbycust = tracks_kv.reduceByKey(add)
tbycust.take(10)




In [2]:
def compute_stats_byuser(tracks):
    """Summarise one user's track events.

    Args:
        tracks: iterable of 4-item records ``(trackid, dtime, mobile, zip)``.
            ``dtime`` must be ``"<date> <HH:MM:...>"`` (exactly one space) and
            ``mobile`` must be numeric, since it is summed.

    Returns:
        ``[unique, morn, aft, eve, night, mcount]`` where ``unique`` is the
        number of distinct track ids, ``mcount`` is the sum of the ``mobile``
        values, and the other four count events by hour of day:
        morn = 05-11, aft = 12-16, eve = 17-21, night = everything else.

    Raises:
        ValueError: if a record does not have four items, ``dtime`` does not
            split into two parts on a space, or the hour is not an integer.
    """
    mcount = morn = aft = eve = night = 0
    # A set gives O(1) membership; the earlier list scan was O(n) per event.
    seen_tracks = set()
    # Unpack in the loop header so the record variable is never rebound, and avoid
    # shadowing the builtin ``zip`` with the (unused) fourth field.
    for trackid, dtime, mobile, _zipcode in tracks:
        seen_tracks.add(trackid)
        _date, clock = dtime.split(" ")
        hourofday = int(clock.split(":")[0])
        mcount += mobile
        if 5 <= hourofday < 12:
            morn += 1
        elif 12 <= hourofday < 17:
            aft += 1
        elif 17 <= hourofday < 22:
            eve += 1
        else:
            # Hours before 05 and from 22 onwards both count as night.
            night += 1
    return [len(seen_tracks), morn, aft, eve, night, mcount]

# Replace each user's list of track records with the summary list returned by
# compute_stats_byuser, keeping the keys unchanged.
custdata = tbycust.mapValues(compute_stats_byuser)
custdata.take(1)

In [3]:
def clicks_summary(str):
    """Return ``(custid, 1)`` for an "ADV_REDUCED_1DAY" click line, else ``None``.

    The line is split on commas; field 1 is the customer id and field 2 is the
    advert name, which must equal "ADV_REDUCED_1DAY" exactly.

    Raises:
        IndexError: if the line has fewer than three fields.
    """
    fields = str.split(",")
    if fields[2] != "ADV_REDUCED_1DAY":
        return None
    return (fields[1], 1)

def user_clicked(line, which):
    """Flag whether a click line refers to the advert ``which``.

    Args:
        line: comma-separated text with exactly four fields; the second is the
            customer id and the third is the advert that was clicked.
        which: text to look for inside the advert field (substring match).

    Returns:
        ``(custid, 1)`` when ``which`` occurs in the advert field, otherwise
        ``(custid, 0)``.

    Raises:
        ValueError: if the line does not split into exactly four fields.
    """
    _eid, custid, adclicked, _ltime = line.split(",")
    flag = 1 if which in adclicked else 0
    return (custid, flag)


# Read the click events, flag each line 1/0 for the "ADV_REDUCED_1DAY" advert, and
# sum the flags per customer so the result is one small (custid, total) pair each.
clicksfile = sc.textFile("/mnt/customer360/clicks.csv")
flagged_clicks = clicksfile.map(lambda row: user_clicked(row, "ADV_REDUCED_1DAY"))
clickdata = flagged_clicks.reduceByKey(add)
# Order the pairs by customer id and preview the first ten.
sortedclicks = clickdata.sortByKey()
sortedclicks.take(10)

In [4]:
from pyspark.sql import *
# ---- Live table: one row of listening stats per customer --------------------
ltablelist = []
# The header is stored as an ordinary first row because the CSV is written with header=false.
# NOTE(review): these six labels do not line up with the data rows built below, which
# hold (customer id, unique, morn, aft, eve, night) and leave out the mobile count.
# Confirm the layout the consumer of ltable.csv expects before changing either side.
ltablelist.append(("unique", "morn", "aft", "eve","night","mobile"))

# Fetch keys and values together in a single job so the two lists cannot get out of
# step (they were previously pulled by separate keys()/values() take() jobs).
custrows = custdata.collect()
custdatalen = len(custrows)
keys = [custid for custid, _ in custrows]
values = [stats for _, stats in custrows]

# Walk every customer: the old range(0, custdatalen-1) silently skipped the last one.
for custid, stats in custrows:
    ltablelist.append((str(custid),) + tuple(str(stat) for stat in stats[:5]))

ltabledf = sqlContext.createDataFrame(ltablelist)


# ---- Training table: click label plus normalised stats per customer ---------
ttablelist = []
# NOTE(review): as above, the labels are shifted relative to the rows, which hold
# (clicked, customer id, then stats[0..4] divided by tot).
ttablelist.append(("clicked","unique", "morn", "aft", "eve","night","mobile"))

# Bring the per-customer click totals to the driver once. The previous
# sortedclicks.lookup(...) call launched one Spark job per customer and raised
# IndexError for any customer with no row in the clicks data.
clickcounts = sortedclicks.collectAsMap()
cvalues = list(values)
ckeys = list(keys)

for custid, stats in zip(ckeys, cvalues):
    # NOTE(review): tot adds stats[0..3], i.e. unique + morn + aft + eve given the
    # order returned by compute_stats_byuser, and leaves night out. Kept as-is;
    # confirm whether morn + aft + eve + night was intended.
    tot = stats[0] + stats[1] + stats[2] + stats[3]
    # Customers absent from the clicks data are treated as "not clicked".
    clicked = 1 if clickcounts.get(str(custid), 0) > 0 else 0
    ttablelist.append((str(clicked), str(custid)) + tuple(str(stat / tot) for stat in stats[:5]))

ttabledf = sqlContext.createDataFrame(ttablelist)
ttabledf.take(10)

#### Export the DataFrames to CSV (ttable is for the Azure Machine Learning service) ####
# coalesce(1) makes Spark write a single part file inside each output directory.
ltabledf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "false").mode("overwrite").save("/mnt/customer360/ltable.csv")
ttabledf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "false").mode("overwrite").save("/mnt/customer360/ttable.csv")

In [5]:
#### Create the PySpark machine-learning training table ####
# Each row is (label, "1:<v>", "2:<v>", ..., "5:<v>") and is written space-delimited,
# i.e. one "label index:value ..." line per customer.

train = []

# Collect everything this cell needs from the RDDs directly, so it no longer depends
# on driver-side lists left behind by an earlier cell. Keys and stats come from one
# job (so they stay aligned) and the click totals from one more, instead of one
# sortedclicks.lookup(...) job per customer.
custrows = custdata.collect()
clickcounts = sortedclicks.collectAsMap()
ckeys = [custid for custid, _ in custrows]
cvalues = [stats for _, stats in custrows]

# Iterate over every customer: the old range(0, custdatalen-1) dropped the last one.
for custid, stats in custrows:
    # NOTE(review): tot adds stats[0..3], i.e. unique + morn + aft + eve given the
    # order returned by compute_stats_byuser, and leaves night out. Kept as-is;
    # confirm whether morn + aft + eve + night was intended.
    tot = stats[0] + stats[1] + stats[2] + stats[3]
    # Customers with no row in the clicks data are labelled 0 rather than raising IndexError.
    clicked = 1 if clickcounts.get(str(custid), 0) > 0 else 0
    features = tuple(str(index) + ":" + str(stat / tot)
                     for index, stat in enumerate(stats[:5], start=1))
    train.append((str(clicked),) + features)

traindf = sqlContext.createDataFrame(train)
traindf.take(10)

############################### Write the text file ###############################
# coalesce(1) makes Spark write a single part file inside the output directory.
traindf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "false").option("delimiter", " ").mode("overwrite").save("/mnt/customer360/train.txt")

In [6]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
# Read the output written by the training-table cell ("/mnt/customer360/train.txt").
# This used to load "train1.txt", a file no cell in this notebook produces, so a
# fresh Run All could not reproduce the result.
# NOTE(review): if train1.txt was a deliberately hand-prepared copy, restore that path.
data = spark.read.format("libsvm").load("/mnt/customer360/train.txt")

# Split the data into train (60%) and test (40%); the fixed seed keeps the split repeatable
splits = data.randomSplit([0.6, 0.4], 1234)
train, test = splits

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# score the held-out rows
predictions = model.transform(test)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))