In [18]:
gameDF = spark.read.csv("/home/training/data/city-campaign-2.csv", inferSchema=True, header=True)
gameDF.printSchema()
interDF = gameDF.select("City","Campaign","Users","Sessions","Bounce Rate","Pages / Session","`Avg. Session Duration`")
gamesDF = interDF.withColumn("Users", interDF.Users.cast("double"))
gamesDF.printSchema()


root
 |-- City: string (nullable = true)
 |-- Campaign: string (nullable = true)
 |-- Users: integer (nullable = true)
 |-- New Users: integer (nullable = true)
 |-- Sessions: integer (nullable = true)
 |-- Bounce Rate: string (nullable = true)
 |-- Pages / Session: double (nullable = true)
 |-- Avg. Session Duration: double (nullable = true)
 |-- Goal Conversion Rate: string (nullable = true)
 |-- Goal Completions: integer (nullable = true)
 |-- Goal Value: double (nullable = true)

root
 |-- City: string (nullable = true)
 |-- Campaign: string (nullable = true)
 |-- Users: double (nullable = true)
 |-- Sessions: integer (nullable = true)
 |-- Bounce Rate: string (nullable = true)
 |-- Pages / Session: double (nullable = true)
 |-- Avg. Session Duration: double (nullable = true)



In [19]:
from pyspark.sql import functions as fn
df1 = gamesDF.withColumnRenamed('Pages / Session', 'Pages_Session')
df2 = df1.withColumnRenamed('Bounce Rate', 'BounceRate')
df3 =  df2.withColumn('BounceRate', fn.regexp_replace('BounceRate', '%', ''))
clean_df = df3.toDF(*(c.replace('.', '_') for c in df3.columns))
final_df = clean_df.withColumnRenamed("Avg_ Session Duration", "AvgSessDuration")

df = final_df.withColumn("BounceRate", final_df.BounceRate.cast("double"))
df.printSchema()
df.show()

root
 |-- City: string (nullable = true)
 |-- Campaign: string (nullable = true)
 |-- Users: double (nullable = true)
 |-- Sessions: integer (nullable = true)
 |-- BounceRate: double (nullable = true)
 |-- Pages_Session: double (nullable = true)
 |-- AvgSessDuration: double (nullable = true)

+---------+--------------------+-----+--------+----------+-------------+---------------+
|     City|            Campaign|Users|Sessions|BounceRate|Pages_Session|AvgSessDuration|
+---------+--------------------+-----+--------+----------+-------------+---------------+
|(not set)|           (not set)|145.0|     199|      3.52|          6.1|         317.81|
| Istanbul|           (not set)| 37.0|      44|      2.27|         6.86|          391.3|
|   London|           (not set)| 33.0|      33|       0.0|         3.55|         121.39|
| Istanbul|rainbow-girls-nye...| 30.0|      42|      2.38|         4.33|          90.07|
|(not set)|princess-in-colou...| 29.0|      42|      2.38|         5.67|         13

In [20]:
df.select("City").distinct().count()

3046

In [21]:
import pyspark.sql.functions as fn

df.agg( * [ (1 - (fn.count(c) / fn.count('*')) ).alias(c + '_missing') for c in df.columns] ) \
    .show(vertical=True)

df.select("City").filter(df.City == "(not set)").count()

-RECORD 0----------------------
 City_missing            | 0.0 
 Campaign_missing        | 0.0 
 Users_missing           | 0.0 
 Sessions_missing        | 0.0 
 BounceRate_missing      | 0.0 
 Pages_Session_missing   | 0.0 
 AvgSessDuration_missing | 0.0 



568

In [23]:
from pyspark.sql.functions import *
testDF = df.na.drop()


DF1 = df.na.replace(["(not set)"], ["notknown"], 'City')
DF2_c = DF1.na.replace(["(not set)"], ["notknown"], 'Campaign')
#Replacing with the most played game
DF2 = DF2_c.na.replace(["notknown"], ["baby-boss-photo-shoot"], 'Campaign')



In [8]:
numerical = ['Users','BounceRate','Pages_Session','Sessions','AvgSessDuration']
desc= DF2.describe(numerical)
desc.show()
DF2.count()

+-------+------------------+------------------+-----------------+------------------+-----------------+
|summary|             Users|        BounceRate|    Pages_Session|          Sessions|  AvgSessDuration|
+-------+------------------+------------------+-----------------+------------------+-----------------+
|  count|              9823|              9823|             9823|              9823|             9823|
|   mean|1.6469510332892192|2.2417082357731855|5.771635956428785|2.2083884760256542|237.5987305303876|
| stddev| 4.459273227068245|13.296253736391343|5.879564921858606| 6.018826338560221| 519.968694552614|
|    min|               1.0|               0.0|              1.0|                 1|              0.0|
|    max|             244.0|             100.0|             95.0|               319|           8788.0|
+-------+------------------+------------------+-----------------+------------------+-----------------+



9823

In [9]:

quantiles = DF2.approxQuantile("AvgSessDuration", [0.25, 0.75], 0.05)
IQR = quantiles[1] - quantiles[0]
bounds = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]

DF3 = DF2.where( (DF2.AvgSessDuration > bounds[0]) | (DF2.AvgSessDuration < bounds[1]) )
DF3.count()

9823

In [10]:
quantiles = DF3.approxQuantile("BounceRate", [0.25, 0.75], 0.05)
IQR = quantiles[1] - quantiles[0]
bounds = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]

DF4 = DF3.where( (DF3.AvgSessDuration > bounds[0]) | (DF3.AvgSessDuration < bounds[1]))
DF4.count()

7733

In [21]:
DF4.agg( {'BounceRate': 'skewness', 'Users': 'skewness', 'AvgSessDuration': 'skewness', 'Pages_Session': 'skewness'} ).show()

+--------------------+-----------------+-------------------------+-----------------------+
|skewness(BounceRate)|  skewness(Users)|skewness(AvgSessDuration)|skewness(Pages_Session)|
+--------------------+-----------------+-------------------------+-----------------------+
|   8.130705910332097|27.95353977327889|        3.947101486321689|      4.360683795901222|
+--------------------+-----------------+-------------------------+-----------------------+



In [22]:
n_numerical = len(numerical)
corr = []

for i in range(0, n_numerical):
    temp = [None] * i
    for j in range(i, n_numerical):
        temp.append(DF4.corr(numerical[i], numerical[j]))
    corr.append(temp)
    from tabulate import tabulate
print(tabulate(corr, headers=numerical, showindex=numerical, tablefmt="fancy_grid", numalign="center"))

╒═════════════════╤═════════╤══════════════╤═════════════════╤════════════╤═══════════════════╕
│                 │  Users  │  BounceRate  │  Pages_Session  │  Sessions  │  AvgSessDuration  │
╞═════════════════╪═════════╪══════════════╪═════════════════╪════════════╪═══════════════════╡
│ Users           │    1    │  0.0805697   │   -0.0173786    │  0.966667  │    -0.0080914     │
├─────────────────┼─────────┼──────────────┼─────────────────┼────────────┼───────────────────┤
│ BounceRate      │         │      1       │   -0.0662484    │ 0.0715256  │    -0.0421201     │
├─────────────────┼─────────┼──────────────┼─────────────────┼────────────┼───────────────────┤
│ Pages_Session   │         │              │        1        │ 0.00175671 │     0.687481      │
├─────────────────┼─────────┼──────────────┼─────────────────┼────────────┼───────────────────┤
│ Sessions        │         │              │                 │     1      │     0.0242913     │
├─────────────────┼─────────┼───────────

In [23]:
import pyspark.ml.feature as ft
Campaign_indexer = ft.StringIndexer(inputCol="Campaign", outputCol="campaign-num")
City_indexer = ft.StringIndexer(inputCol="City", outputCol="city-num")
#featuresCreator = ft.VectorAssembler(inputCols=["Landing Page-num","Country-num"], outputCol='features')


In [24]:
assembler = ft.VectorAssembler(inputCols=["campaign-num","city-num","BounceRate","AvgSessDuration"], outputCol='features-vec')
scaler = ft.StandardScaler(inputCol="features-vec", outputCol="features")

In [29]:
import pyspark.ml.clustering as clus
kmeans = clus.KMeans(k = 5, featuresCol = 'features')



In [30]:
from pyspark.ml import Pipeline
pipeline = Pipeline( stages = [Campaign_indexer,City_indexer,assembler, scaler,kmeans] )

In [31]:
model = pipeline.fit(DF4)
clusters = model.transform(DF4)


In [32]:
import pyspark.ml.evaluation as ev

predictions = model.transform(DF4)
evaluator = ev.ClusteringEvaluator()
print(evaluator.evaluate(predictions))

0.6071640728197522


In [393]:
km = model.stages[4]

summary = km.summary
print("Number of clusters: ", summary.k)
print("Size of (number of data points in) each cluster: ", summary.clusterSizes)
print("The cluster centers are: ", km.clusterCenters())

Number of clusters:  5
Size of (number of data points in) each cluster:  [487, 4194, 1415, 121, 1516]
The cluster centers are:  [array([0.66201807, 0.84843219, 0.00433509, 3.54345163]), array([0.32245537, 0.41413074, 0.0473323 , 0.33184432]), array([4.00450046e-01, 2.58636362e+00, 1.75944274e-03, 3.02789681e-01]), array([0.67090684, 0.4164316 , 7.41594766, 0.23115123]), array([2.40281348, 0.51901095, 0.0179725 , 0.34700184])]
