# MLLib Deep Dive

This walks us through though a possible solution to the [MLLib Deep Dive workshop](https://confluence.corp.ad.ctc/display/HDP/Deep-dive+Spark+MLLib).

In [1]:
%matplotlib inline

import sys, os, time, datetime
import numpy as np

# this forces the environment to spark2
# this is unnecessary if you want to use spark 1.6
sparkhome = '/usr/lib/spark2/'
os.environ['SPARK_HOME'] = sparkhome
os.environ['PYSPARK_PYTHON']="/opt/anaconda2/bin/python"
sys.path.append(sparkhome + '/python')
sys.path.append(sparkhome + '/python/lib/py4j-0.10.1-src.zip')
sys.path.append(sparkhome + '/python/lib/pyspark.zip')

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, HiveContext

import pyspark.sql.functions as F

conf = SparkConf()
conf.setAppName("pyspark-lab_answerkey")
conf.set("spark.executor.memory", "20g")
conf.set("spark.executor.instances",5)
conf.set("spark.driver.memory", "20g")
conf.set("spark.executor.cores", 5)

sc = SparkContext(conf = conf)
spark = SparkSession.builder\
        .enableHiveSupport()\
        .getOrCreate()
sqlContext = spark._wrapped

In [2]:
pos = sqlContext.table('examples.mllib_lab_assignment_pos')

# clean up the massive masked store_id
# spark was having trouble merging on that bigint field
def four_digit_string(val):
    strval = str(val)
    return strval[-4:]
make_four_digit = F.udf(lambda x: four_digit_string(x))
pos = pos.withColumn('store_id', make_four_digit(pos.store_id))
pos.head()

Row(pos_transaction_id=2071964351, day_id=20150405, store_id=u'8551', product_id=1689781, deal_id=-2464554231982775937, item_actual_selling_price_amt=0.35, item_transaction_qty=6.0, item_return_quantity=0.0)

## Random forest regression

Guess the basket size based on store num, month of year and number of deals available in the store

In [3]:
# first, change the date to get the month
from pyspark.sql.types import DateType

func =  F.udf (lambda x: datetime.datetime.strptime(str(x), '%Y%M%d'), DateType())
with_dates = pos.withColumn('date', func(pos.day_id))
with_dates.head()

Row(pos_transaction_id=2071964351, day_id=20150405, store_id=u'8551', product_id=1689781, deal_id=-2464554231982775937, item_actual_selling_price_amt=0.35, item_transaction_qty=6.0, item_return_quantity=0.0, date=datetime.date(2015, 1, 5))

In [4]:
baskets = with_dates.groupby(['pos_transaction_id', 'store_id', 'date']).agg(F.sum(with_dates.item_transaction_qty))
baskets = baskets.withColumnRenamed('sum(item_transaction_qty)', 'basket_size')
baskets.head()

Row(pos_transaction_id=2071964381, store_id=u'8551', date=datetime.date(2015, 1, 5), basket_size=3.0)

In [17]:
# do a grouping to find distinct sales per store num
# this is /actually/ how many deals /sold/ that day, but is a good estimator for us.
from pyspark.sql.window import Window

store_deal_counts = pos.select('store_id', 'deal_id')\
    .distinct()\
    .groupby('store_id')\
    .count()\
    .withColumnRenamed('count', 'num_deals')
store_deal_counts = store_deal_counts.select('store_id',
                                             'num_deals',
                                             F.row_number().over(Window.partitionBy()\
                                                           .orderBy(store_deal_counts.store_id))\
                                                           .alias('store_alias'))
store_deal_counts.collect()

[Row(store_id=u'0007', num_deals=115, store_alias=1),
 Row(store_id=u'0710', num_deals=113, store_alias=2),
 Row(store_id=u'0789', num_deals=119, store_alias=3),
 Row(store_id=u'4006', num_deals=106, store_alias=4),
 Row(store_id=u'4989', num_deals=113, store_alias=5),
 Row(store_id=u'8551', num_deals=145, store_alias=6)]

In [18]:
# merge them together
joined = baskets.join(store_deal_counts, on='store_id')
joined.head()

Row(store_id=u'8551', pos_transaction_id=2071964381, date=datetime.date(2015, 1, 5), basket_size=3.0, num_deals=145, store_alias=6)

In [19]:
joined = joined.withColumn('monthnum', F.month(joined.date))
joined.head()

Row(store_id=u'8551', pos_transaction_id=2071964381, date=datetime.date(2015, 1, 5), basket_size=3.0, num_deals=145, store_alias=6, monthnum=1)

In [20]:
# make it into a label and list of features
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest

label_and_features = joined.rdd.map(lambda x: LabeledPoint(x.basket_size, [x.store_alias, x.monthnum, x.num_deals]))

In [23]:
# the category thing is weird.
# key: the index (starting at 0) of the feature
# value: the number of categories for this feature (also indexed at 0)
model = RandomForest.trainRegressor(label_and_features, categoricalFeaturesInfo={0:7, 1:11}, numTrees=3, seed=32)

model.predict([1, 11, 110])

2.9992157148419953

## K-means clustering

Look at sales in various departments in a store. Do the stores sell different items? 

In [24]:
kmeans_data = sqlContext.table('examples.mllib_kmeans_lab_assignment_pos')
kmeans_data.head()

Row(hash8=8946748474877045509, lighting_count=651.0, snowthrower_count=4.0, golf_count=899.0, total_items=728757.0)

In [26]:
# move this into an array of features
parsed_data = kmeans_data.rdd.map(lambda x: [x.lighting_count, x.snowthrower_count, x.golf_count])
parsed_data.first()

[651.0, 4.0, 899.0]

In [30]:
from pyspark.mllib.clustering import KMeans, KMeansModel

clusters = KMeans.train(parsed_data, k=4, 
                        maxIterations=10,
                        initializationMode="random")
clusters

<pyspark.mllib.clustering.KMeansModel at 0x7f43aac272d0>

In [31]:
clusters.predict([651.0, 4.0, 899.0])

0

In [39]:
get_cluster = F.udf(lambda x, y, z: clusters.predict([x, y, z]))

kmeans_w_cluster = kmeans_data.withColumn('cluster_id', get_cluster(kmeans_data.lighting_count, kmeans_data.snowthrower_count, kmeans_data.golf_count))
kmeans_w_cluster.toPandas()

Unnamed: 0,hash8,lighting_count,snowthrower_count,golf_count,total_items,cluster_id
0,8946748474877045509,651.0,4.0,899.0,728757.0,0
1,-1311085597010169475,1415.0,44.0,928.0,886446.0,0
2,5790637138709706180,2297.0,58.0,1648.0,1385948.0,1
3,-7882865544843505622,1768.0,88.0,1500.0,1325224.0,1
4,2829467192590545424,707.0,79.0,465.0,505662.0,0
5,1790077521091718865,2339.0,62.0,1288.0,1475895.0,1
6,-361858321721275708,2635.0,112.0,1871.0,1921244.0,2
7,1008613393646276787,5624.0,183.0,2811.0,2232938.0,3
8,-7905715711766584227,926.0,66.0,619.0,560129.0,0
9,-965712746991792591,633.0,23.0,701.0,539087.0,0
