# TASK 1: Preparing the data and first look

## Step 1: Beginnig

### Loading libraries.

In [1]:
import findspark
import pandas as pd
import warnings

In [2]:
findspark.init("/opt/manual/spark")

In [3]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

In [4]:
spark = SparkSession.builder \
.appName("KMeansMethod") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions","2") \
.getOrCreate()

### Veriyi datasets klasörü altına taşıyınız.

In [5]:
# ! hdfs dfs -put ~/datasets/flo_ismail.csv /user/train/datasets

### Spark session oluşturunuz.

In [6]:
df = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option('delimiter', '|') \
.load("/user/train/datasets/flo_ismail.csv")

## Adım 2: İlk Bakış

In [7]:
df.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,,1.0,212.98,0.0,,,,2019-02-23,A
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,,1.0,199.98,0.0,,,,2019-12-01,A
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,,0.0,174.99,,,,2018-12-31,A
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,,0.0,283.95,[],,,2021-05-05,A


In [8]:
df.persist()

DataFrame[master_id: string, order_channel: string, platform_type: string, last_order_channel: string, first_order_date: timestamp, last_order_date: timestamp, last_order_date_online: timestamp, last_order_date_offline: timestamp, order_num_total_ever_online: double, order_num_total_ever_offline: double, customer_value_total_ever_offline: double, customer_value_total_ever_online: double, interested_in_categories_12: string, online_product_group_amount_top_name_12: string, offline_product_group_name_12: string, last_order_date_new: string, store_type: string]

In [9]:
df.printSchema()

root
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_online: double (nullable = true)
 |-- interested_in_categories_12: string (nullable = true)
 |-- online_product_group_amount_top_name_12: string (nullable = true)
 |-- offline_product_group_name_12: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)
 |-- store_type: string (nullable = true)



In [10]:
df_count = df.count()
print(df_count)

100000


In [11]:
len(df.columns)


17

## Adım 3: Eksik Gözlemler

In [12]:
for col_name in df.dtypes:
    null_count = df.filter( (F.col(col_name[0]).isNull()) | (F.col(col_name[0]) == "")).count()
    
    if null_count > 0:
        print("{}  {} type null values: {} % {}".format(col_name[0], col_name[1], null_count, (null_count/df_count * 100)))

last_order_date_online  timestamp type null values: 70784 % 70.784
last_order_date_offline  timestamp type null values: 21703 % 21.703
order_num_total_ever_online  double type null values: 70784 % 70.784
order_num_total_ever_offline  double type null values: 21703 % 21.703
interested_in_categories_12  string type null values: 56590 % 56.589999999999996
online_product_group_amount_top_name_12  string type null values: 88295 % 88.295
offline_product_group_name_12  string type null values: 77209 % 77.209


# Görev 2: Veri Analizi

## Adım 1: Unique Değişken Kontrolü

### master_id değişkeninin unique olup olmadığını kontrol ediniz.

In [17]:
if df_count == df.select("master_id").distinct().count():
    print("master_id is unique")
else:print("not unique")

master_id is unique


## Adım 2: Veriyi Anlama – Groupby

In [18]:
df.groupBy("platform_type", "order_channel").count().show()

+-------------+-------------+-----+
|platform_type|order_channel|count|
+-------------+-------------+-----+
|  OmniChannel|      Offline| 4793|
|       Online|      Ios App| 3008|
|      Offline|      Offline|65991|
|       Online|       Mobile| 6451|
|       Online|      Desktop| 3253|
|       Online|  Android App| 8728|
|  OmniChannel|       Mobile| 2061|
|  OmniChannel|  Android App| 3261|
|  OmniChannel|      Ios App|  956|
|  OmniChannel|      Desktop| 1498|
+-------------+-------------+-----+



## Adım 3: Omnichannel

In [19]:
df.filter(F.col("order_num_total_ever_online").isNull()).count()

70784

In [20]:
df.filter(F.col("order_num_total_ever_offline").isNull()).count()

21703

In [21]:
df.toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,,1.0,212.98,0.00,,,,2019-02-23,A
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,,1.0,199.98,0.00,,,,2019-12-01,A
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,,1.0,140.49,0.00,[ERKEK],,ERKEK,2020-07-24,A
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,,0.00,174.99,,,,2018-12-31,A
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,,0.00,283.95,[],,,2021-05-05,A
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,f1295546-708d-11ea-8021-000d3a38a36f,Offline,Offline,Offline,2020-02-09 14:21:01,2020-02-09 14:21:01,NaT,2020-02-09 14:21:01,,1.0,69.99,0.00,,,,2020-02-09,A
99996,04ed98e6-2b86-11ea-853d-000d3a38a36f,Offline,Offline,Offline,2019-11-30 16:55:09,2019-12-29 12:22:46,NaT,2019-12-29 12:22:46,,2.0,309.89,0.00,,,,2019-12-29,A
99997,d46e9bba-f670-11e9-841e-000d3a38a36f,Offline,Offline,Offline,2019-10-24 17:53:04,2019-10-24 17:53:04,NaT,2019-10-24 17:53:04,,1.0,149.99,0.00,,,,2019-10-24,A
99998,0dc97a82-0b25-11eb-8ffc-000d3a38a36f,Android App,Online,Android App,2020-10-11 15:40:12,2021-05-13 18:03:49,2021-05-13 18:03:49,NaT,4.0,,0.00,767.42,"[COCUK, KADIN]",KADIN,,2021-05-13,A


In [22]:
df1 = df.na.fill(value=0, subset=["order_num_total_ever_online", "order_num_total_ever_offline", "customer_value_total_ever_online", "customer_value_total_ever_offline"])
          

In [23]:
df1 = df1.withColumn("order_num_total", df1.order_num_total_ever_online+df1.order_num_total_ever_offline)

In [24]:
df1 = df1.withColumn("customer_value_total", df1.customer_value_total_ever_online \
                                            +df1.customer_value_total_ever_offline)

In [25]:
df1.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type,order_num_total,customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,0.0,1.0,212.98,0.0,,,,2019-02-23,A,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,0.0,1.0,199.98,0.0,,,,2019-12-01,A,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,0.0,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,0.0,0.0,174.99,,,,2018-12-31,A,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,0.0,0.0,283.95,[],,,2021-05-05,A,2.0,283.95


# Görev 2: veri analizi



## Adım 4: Kanal incelemesi

In [26]:
df1.groupBy("order_channel").agg({"master_id":"count", "order_num_total": "mean", "customer_value_total": "mean"}).show()

+-------------+----------------+-------------------------+--------------------+
|order_channel|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           70784|       218.16394849122435|  1.6003475361663653|
|      Ios App|            3964|        568.4640312815316|   3.377648839556004|
|       Mobile|            8512|       391.74187617480754|   2.798637218045113|
|      Desktop|            4751|        376.9135550410421|   2.538623447695222|
|  Android App|           11989|        532.8462840937439|    3.50971724080407|
+-------------+----------------+-------------------------+--------------------+



## Adım 5: Platforma göre satış değerleri

In [27]:
df1.groupBy("platform_type").agg({"master_id":"count", "order_num_total": "mean", \
                                  "customer_value_total": "mean"}).show()


+-------------+----------------+-------------------------+--------------------+
|platform_type|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           65991|       215.73030686006084|  1.5837917291751906|
|  OmniChannel|           12569|         500.793751292853|  3.4947092051873656|
|       Online|           21440|       404.78965764924243|  2.6207089552238805|
+-------------+----------------+-------------------------+--------------------+



# Görev 3: K-Means Metriklerinin Hesaplanması

## Adım 1: RFM

#### master_id, last_order_date_new, order_num_total, customer_value_total değişkenlerinden rfmadında yeni bir dataframe oluşturunuz.

In [28]:
rfm = df1.select("master_id", "last_order_date_new", "order_num_total", "customer_value_total")

In [29]:
rfm.limit(5).toPandas()

Unnamed: 0,master_id,last_order_date_new,order_num_total,customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,2019-02-23,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,2019-12-01,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,2020-07-24,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,2018-12-31,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,2021-05-05,2.0,283.95


## Adım 2: Son satın alım tarihi

#### last_order_date değişkeni üzerinden son satın alım tarihini bulunuz.

In [34]:
last_order_date = df.agg({'last_order_date_new':'max'}).collect()[0][0]

print(last_order_date)

2021-05-30


## Adım 3: Recency

#### Bulduğunuz son satın alım tarihinin iki gün sonrasını baz alarak, her müşterinin son satın alma sürelerini gün cinsinden hesaplayınız.

#### Hesapladığınız değişkeni Recency adında kaydediniz.

In [36]:
rfm = rfm.withColumn("Recency", F.expr(f"datediff('{last_order_date}', last_order_date_new) + 2"))

In [37]:
rfm.limit(3).toPandas()

Unnamed: 0,master_id,last_order_date_new,order_num_total,customer_value_total,Recency
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,2019-02-23,1.0,212.98,829
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,2019-12-01,1.0,199.98,548
2,602897a6-cdac-11ea-b31f-000d3a38a36f,2020-07-24,1.0,140.49,312


## Adım 4: Yeniden isimlendirme

####  order_num_total değişkenini Frequency
####  customer_value_total değişkenini Monetary adında değiştiriniz.

In [38]:
rfm = rfm.withColumnRenamed("order_num_total", "Frequency") \
         .withColumnRenamed("customer_value_total", "Monetary")

## Adım 5: Filtre

#### last_order_date_newdeğişkenini rfmdataframeden çıkarınız.

In [40]:
rfm = rfm.drop("last_order_date_new")

In [42]:
rfm.show()

+--------------------+---------+------------------+-------+
|           master_id|Frequency|          Monetary|Recency|
+--------------------+---------+------------------+-------+
|b3ace094-a17f-11e...|      1.0|            212.98|    829|
|c57d7c4c-a950-11e...|      1.0|            199.98|    548|
|602897a6-cdac-11e...|      1.0|            140.49|    312|
|388e4c4e-af86-11e...|      1.0|            174.99|    883|
|80664354-adf0-11e...|      2.0|            283.95|     27|
|47511f36-aeb4-11e...|      1.0|            139.98|    933|
|77f7c318-3407-11e...|      1.0|             269.9|    182|
|399d6dd2-ecf1-11e...|      1.0|             95.73|    272|
|b3d4a6f2-a368-11e...|      3.0|            207.94|    250|
|42fdccd4-f1d1-11e...|      1.0|            134.95|    226|
|071b497c-a5d2-11e...|      7.0|            399.94|     71|
|47d99768-30ea-11e...|      2.0|            304.89|    667|
|f879c85e-5322-11e...|      1.0|            100.49|    488|
|cfb38c70-b1e7-11e...|      3.0|208.9599

# Görev 4: Modele Hazırlık

## Adım 1: Vector Assembler

In [52]:
from pyspark.ml.feature import VectorAssembler


In [59]:
rfm_col=["Recency", "Monetary", "Frequency"]

In [60]:
assembler = VectorAssembler() \
.setInputCols(rfm_col) \
.setOutputCol("unscaled_features")

## Adım 2: Standart Scaler

In [61]:
    from pyspark.ml.feature import StandardScaler
    scaler = StandardScaler().setInputCol("unscaled_features").setOutputCol("features")

## Adım 3: Pipeline

In [62]:
from pyspark.ml import Pipeline

In [63]:
pipeline_obj = Pipeline().setStages([assembler, scaler])


In [64]:
pipeline_model = pipeline_obj.fit(rfm)


In [65]:
pipeline_df = pipeline_model.transform(rfm)

In [66]:
pipeline_df.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829,"[829.0, 212.98, 1.0]","[3.2479381193525994, 0.45988105764451537, 0.32..."
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548,"[548.0, 199.98, 1.0]","[2.1470085517553974, 0.4318105639391031, 0.327..."
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312,"[312.0, 140.49, 1.0]","[1.2223844309264307, 0.3033556662056436, 0.327..."
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883,"[883.0, 174.99, 1.0]","[3.4595046554744817, 0.37785043796231454, 0.32..."
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27,"[27.0, 283.95, 2.0]","[0.10578326806094111, 0.6131243605886005, 0.65..."


# Görev 5: K-Means

## Adım 1: Optimum küme sayısı

In [67]:
from pyspark.ml.clustering import KMeans

In [68]:
def compute_kmeans_model(df, k):
    kmeansObject = KMeans() \
        .setSeed(142) \
        .setK(k)
    
    return kmeansObject.fit(df)

In [69]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [70]:
evaluator = ClusteringEvaluator()

In [71]:
for k in range(2,11):
    kmeans_model = compute_kmeans_model(pipeline_df, k)
    
    transformed_df = kmeans_model.transform(pipeline_df)
    
    score = evaluator.evaluate(transformed_df)
    
    print("k: {}, score: {}" .format(k, score))

k: 2, score: 0.4403139929816347
k: 3, score: 0.6541986035114529
k: 4, score: 0.6705260499487318
k: 5, score: 0.4990680393948446
k: 6, score: 0.6819967508140029
k: 7, score: 0.5436509449286715
k: 8, score: 0.5483133950372348
k: 9, score: 0.5248550689192687
k: 10, score: 0.5198657792973086


## Adım 2: Final Model

In [86]:
kmeans_model = compute_kmeans_model(pipeline_df, 3)

In [87]:
transformed_df = kmeans_model.transform(pipeline_df)

In [88]:
transformed_df.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features,prediction
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829,"[829.0, 212.98, 1.0]","[3.2479381193525994, 0.45988105764451537, 0.32...",1
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548,"[548.0, 199.98, 1.0]","[2.1470085517553974, 0.4318105639391031, 0.327...",1
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312,"[312.0, 140.49, 1.0]","[1.2223844309264307, 0.3033556662056436, 0.327...",0
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883,"[883.0, 174.99, 1.0]","[3.4595046554744817, 0.37785043796231454, 0.32...",1
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27,"[27.0, 283.95, 2.0]","[0.10578326806094111, 0.6131243605886005, 0.65...",0


## Adım 3: Tanımlayıcı istatistikler

In [95]:
from pyspark.sql.functions import sum, mean, count, desc

In [97]:
transformed_df.groupBy("prediction") \
.agg(count("Monetary").alias("count"), \
     mean("Monetary").alias("AVG_Monetary"), \
     mean("Frequency").alias("AVG_Frequency"), \
     mean("Recency").alias("AVG_Recency")) \
.sort(desc("AVG_Monetary")).show()

+----------+-----+-----------------+------------------+------------------+
|prediction|count|     AVG_Monetary|     AVG_Frequency|       AVG_Recency|
+----------+-----+-----------------+------------------+------------------+
|         2| 2182|2089.541549037581|13.179193400549954|188.68744271310723|
|         0|43394|342.1480506521844| 2.233096741484998| 172.7388809512836|
|         1|54424|180.1204143392638|1.4509958841687491| 614.6668932823754|
+----------+-----+-----------------+------------------+------------------+



In [89]:
from pyspark.sql.functions import sum, col, desc, mean, count