# Import Libraries

In [1]:
!pip install pyspark
!pip install pyspark[pandas_on_spark] plotly
!pip install -U pandas

In [2]:
!pip install dataprep

In [22]:
!pip install sweetviz

In [23]:
import gc
import pyspark
import pandas as pd
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col,sum
from IPython.display import HTML, display
from sklearn.cluster import KMeans
from pyspark.sql import functions as F
from dataprep.eda import create_report
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
import sweetviz as sv


%matplotlib inline

In [4]:
spark=SparkSession.builder.appName('Market Segmentation').getOrCreate()

In [5]:
spark

# Dataset Loading

In [6]:
df_market=spark.read.option('header','true').csv('../input/ecommerce-behavior-data-from-multi-category-store/2019-Oct.csv')

In [7]:
df_market.show()

In [8]:
df_market.count()

# Datatypes

In [9]:
df_market.printSchema()

In [10]:
df_market.dtypes

# Drop Duplicate Values and columns of no use

In [15]:
df_market = df_market.dropDuplicates()

In [16]:
df_market=df_market.drop(*['event_time','product_id', 'user_id', 'user_session'])

# Missing Values

In [17]:
df_market.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_market.columns)).show()

In [18]:
df_market = df_market.na.drop(how="any")

In [19]:
df_market.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_market.columns)).show()

# Statistical Analysis

In [20]:
df_market.describe().show()

# SQL Queries Analysis

## Event Type Is Purchase

In [24]:
df_event_type_purchase = df_market.select('*').where(df_market.event_type == 'purchase')
df_event_type_purchase.show()

### Dataframe for visualisation

In [25]:
df_event_type_purchase_pandas = df_event_type_purchase.toPandas()

In [26]:
df_event_type_purchase_pandas.head()

In [27]:
df_event_type_purchase_pandas = df_event_type_purchase_pandas.drop(['category_id', 'event_type'], axis=1)
df_event_type_purchase_pandas = df_event_type_purchase_pandas.astype({'price':'float'})
df_event_type_purchase_pandas.to_csv('df_event_type_purchase_pandas.csv')
df_event_type_purchase_pandas.head()

### Visualize

#### DataPrep

In [29]:
create_report(df_event_type_purchase_pandas)

## Event Type Is Cart

In [30]:
df_event_type_cart = df_market.select('*').where(df_market.event_type == 'cart')
df_event_type_cart.show()

### Dataframe for visualisation

In [31]:
df_event_type_cart_pandas = df_event_type_cart.toPandas()

In [32]:
df_event_type_cart_pandas.head()

In [33]:
df_event_type_cart_pandas = df_event_type_cart_pandas.drop(['category_id', 'event_type'], axis=1)

In [34]:
df_event_type_cart_pandas = df_event_type_cart_pandas.astype({'price':'float'})

In [35]:
df_event_type_cart_pandas.to_csv('df_event_type_cart_pandas.csv')

In [36]:
df_event_type_cart_pandas.head()

### Visualize

#### DataPrep

In [37]:
create_report(df_event_type_cart_pandas)

## Event Type Is view

In [38]:
df_event_type_view = df_market.select('*').where(df_market.event_type == 'view')

In [39]:
df_event_type_view.show()

### Dataframe for visualisation

In [40]:
df_event_type_view_pandas = df_event_type_view.toPandas()
df_event_type_view_pandas.head()

In [41]:
df_event_type_view_pandas = df_event_type_view_pandas.drop(['category_id', 'event_type'], axis=1)
df_event_type_view_pandas = df_event_type_view_pandas.astype({'price':'float'})
df_event_type_view_pandas.to_csv('df_event_type_view_pandas.csv')
df_event_type_view_pandas.head()

### Visualize

#### DataPrep

In [42]:
create_report(df_event_type_view_pandas)

# Analyzing the whole dataset

In [43]:
df_market=df_market.drop(*['category_id'])

In [44]:
df_market = df_market.toPandas()

In [46]:
df_market = df_market.astype({'price':'float'})

In [47]:
df_market.to_csv('df_market.csv')

In [48]:
df_market.head()

##  Visualize

#### DataPrep

In [49]:
create_report(df_market)

# Segmentation

In [None]:
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1])]).toDF(['features'])

In [None]:
def get_dummy(df,indexCol,categoricalCols,continuousCols):

    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]

    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()))
                 for indexer in indexers ]

    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")

    pipeline = Pipeline(stages=indexers + encoders + [assembler])

    model=pipeline.fit(df)
    data = model.transform(df)

    return data.select(indexCol,'features')

In [None]:
transformed= transData(df_market)
transformed.show(5, False)

In [None]:
featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=4).fit(transformed)

data = featureIndexer.transform(transformed)

In [None]:
cost = np.zeros(20)
for k in range(2,20):
    kmeans = KMeans()\
            .setK(k)\
            .setSeed(1) \
            .setFeaturesCol("indexedFeatures")\
            .setPredictionCol("cluster")

    model = kmeans.fit(data)
    cost[k] = model.computeCost(data) # requires Spark 2.0 or late

In [None]:
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()