In [38]:
from datetime import datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import visuals as vs
%config InlineBackend.figure_format = 'retina'
%matplotlib inline

plt.style.use('fivethirtyeight')

df = pd.read_csv('../data/exp3/new/ws_orderinfo_orders_server.csv', header=0)
# filtered_df = df[df['orderdate'].isnull()]
df = df.dropna()
df["orderdate"] = df["orderdate"].apply(lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M:%S'))
df["takendate"] = df["takendate"].apply(lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M:%S'))
df["shipdate"] = df["shipdate"].apply(lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M:%S'))
df["transitDuration"] = (df["shipdate"]-df["takendate"])/ np.timedelta64(1, 's')
df["fulfillDuration"] = (df["shipdate"]-df["orderdate"])/ np.timedelta64(1, 's')

df["amount"] = df["red"]+df["blue"]+df["yellow"]+df["black"]+df["white"]

dic = {}

# Combine with customer info
df_tmp = pd.read_csv('..\data\exp3\ws_orderinfo_demographic.csv', header=0)
df = pd.merge(df, df_tmp, how='inner', left_on="customer", right_on="name")
df = df.drop(columns=['name','orderdate','tokendate','takendate','shipdate', 'id','entryid'])
for key in ["customer","age", "sex", "city", "state", "country",\
                 "income", "credit","education", "occupation"]:
    dic[key] = {}
    ## Add Customer ID (Integer number)
    id = 1
    for _,name in df[[key]].drop_duplicates()[key].iteritems():
        dic[key][name] = id # id starts from 0
        id = id+1
    df[key] = df[key].apply(lambda x: dic[key][x])

    
print "dictionary keys:",dic.keys()
print df.info()

dictionary keys: ['customer', 'city', 'country', 'age', 'sex', 'credit', 'state', 'income', 'education', 'occupation']
<class 'pandas.core.frame.DataFrame'>
Int64Index: 150 entries, 0 to 149
Data columns (total 21 columns):
customer           150 non-null int64
red                150 non-null int64
blue               150 non-null int64
green              150 non-null int64
yellow             150 non-null int64
black              150 non-null int64
white              150 non-null int64
pending            150 non-null int64
shipped            150 non-null int64
transitDuration    150 non-null float64
fulfillDuration    150 non-null float64
amount             150 non-null int64
age                150 non-null int64
sex                150 non-null int64
city               150 non-null int64
state              150 non-null int64
country            150 non-null int64
income             150 non-null int64
credit             150 non-null int64
education          150 non-null int64
occupation  

In [39]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext, HiveContext
from pyspark.mllib.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
import numpy as np
from numpy import array
from math import sqrt

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

df_spark = sqlCtx.createDataFrame(df)

vecAssembler = VectorAssembler(inputCols=df_spark.columns, outputCol="features")
df_spark = vecAssembler.transform(df_spark)
rdd = df_spark.rdd.map(lambda x: array(x["features"]))

clusters = KMeans.train(\
                 rdd, 2, maxIterations=10, initializationMode="random",\
                  seed=50, initializationSteps=5, epsilon=1e-4)

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = df_spark.rdd.map(lambda point: error(array(point["features"]))).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))



Within Set Sum of Squared Error = 199180.153338


In [20]:
rdd.take(10)

[array([ 1.0000e+00,  0.0000e+00,  5.0000e+00,  0.0000e+00,  7.0000e+00,
         6.0000e+00,  0.0000e+00,  0.0000e+00,  1.0000e+00,  8.8000e+01,
        -1.4293e+04,  1.8000e+01,  1.0000e+00,  1.0000e+00,  1.0000e+00,
         1.0000e+00,  1.0000e+00,  1.0000e+00,  1.0000e+00,  1.0000e+00,
         1.0000e+00]),
 array([ 1.000e+00,  8.000e+00,  6.000e+00,  4.000e+00,  7.000e+00,
         7.000e+00,  5.000e+00,  0.000e+00,  1.000e+00,  2.620e+02,
        -9.826e+03,  3.300e+01,  1.000e+00,  1.000e+00,  1.000e+00,
         1.000e+00,  1.000e+00,  1.000e+00,  1.000e+00,  1.000e+00,
         1.000e+00]),
 array([1.00000e+00, 0.00000e+00, 8.00000e+00, 0.00000e+00, 2.00000e+00,
        6.00000e+00, 0.00000e+00, 0.00000e+00, 1.00000e+00, 1.47000e+02,
        5.23587e+05, 1.60000e+01, 1.00000e+00, 1.00000e+00, 1.00000e+00,
        1.00000e+00, 1.00000e+00, 1.00000e+00, 1.00000e+00, 1.00000e+00,
        1.00000e+00]),
 array([1.00000e+00, 0.00000e+00, 0.00000e+00, 1.00000e+00, 2.00000e+00,
   