#Prepare environment

In [1]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
import findspark
findspark.init()

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:8 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:9 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 http://arch

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import statistics as stats
import math

# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, asc, desc, col, size, array_contains,\
isnan, udf, hour, array_min, array_max, countDistinct, regexp_extract, count, when
from pyspark.sql.functions import avg, struct, sum, explode, collect_list, array_repeat,\
size, collect_set, array
from pyspark.sql.types import *

In [4]:
#Support function
def show_head(df):
  display(df.limit(5).toPandas())

def count_missings(spark_df,sort=False):
    """
    Counts number of nulls and nans in each column
    """
    df = spark_df.select([functions.count(functions.when(functions.isnan(c) | functions.isnull(c), c)).alias(c) for (c,c_type) in spark_df.dtypes if c_type not in ('timestamp',  'date')]).toPandas()

    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df

In [5]:
MAX_MEMORY = '10G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10005) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark(appName):
    spark = SparkSession \
        .builder \
        .appName(appName) \
        .config(conf=conf) \
        .getOrCreate()
    return spark

spark = init_spark("Preprocessing Online Retail Dataset For RFM")
spark

# Starting to preprocess data

In [6]:
#Load data
filePath = '/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_2_RFM_SegmetationAnalysys/OnlineRetail.csv'
df = spark.read.csv(filePath, header = True, inferSchema=True).cache()
show_head(df)

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,01-12-2010 08:26,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,01-12-2010 08:26,3.39,17850,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,01-12-2010 08:26,2.75,17850,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,01-12-2010 08:26,3.39,17850,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,01-12-2010 08:26,3.39,17850,United Kingdom


In [7]:
#overview data
df.describe().show()

+-------+------------------+------------------+--------------------+-----------------+----------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|         Quantity|     InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+-----------------+----------------+-----------------+------------------+-----------+
|  count|            541909|            541909|              540455|           541909|          541909|           541909|            406829|     541909|
|   mean|  559965.752026781|27623.240210938104|             20713.0| 9.55224954743324|            null|4.611113626082965|15287.690570239585|       null|
| stddev|13428.417280799484| 16799.73762842768|                null|218.0811578502348|            null|96.75985306117944|1713.6003033215932|       null|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|           -80

In [8]:
#Check missing values
count_missings(df)

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,0,0,1454,0,0,0,135080,0


In [9]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [10]:
df = df.drop_duplicates()

### Các bước xử lý
1. Chuẩn hóa dữ liệu ngày (InvoiceDate)
2. Cột InvoiceNo là kiểu String => bên trong có kỹ tự C, là đơn hàng bị hủy, bỏ các dòng này, rồi chuyển sang integer
3. UnitPrice, Quantity bị âm? (Xử lý dữ liệu sai)
3. Bỏ các dòng bị thiếu dữ liệu
4. Bỏ các cột không cần thiết như (StockCode, Description)
5. Tạo thêm feature mới là TotalPrice = Quantity*UnitPrice
6. Group theo id khách hàng và tính R F M cho từng khách hàng
7. Lưu data

In [11]:
#Correct data type
df = df.withColumn('InvoiceDate', functions.to_date(df.InvoiceDate, 'dd-mm-yyyy HH:ss'))
show_head(df)

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-01-01,2.75,17850,United Kingdom
1,536406,85123A,WHITE HANGING HEART T-LIGHT HOLDER,8,2010-01-01,2.55,17850,United Kingdom
2,536423,22619,SET OF 6 SOLDIER SKITTLES,4,2010-01-01,3.75,18085,United Kingdom
3,536446,22294,HEART FILIGREE DOVE SMALL,48,2010-01-01,1.25,15983,United Kingdom
4,536520,22241,GARLAND WOODEN HAPPY EASTER,2,2010-01-01,1.25,14729,United Kingdom


In [16]:
df.filter(~df['InvoiceNo'].rlike('^[0-9\/]+$')).show()

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|  C536807|    22778|  GLASS CLOCHE SMALL|      -1| 2010-01-02|     3.95|     15834|United Kingdom|
|  C536820|    21843|RED RETROSPOT CAK...|      -1| 2010-01-02|    10.95|     18168|United Kingdom|
|  C537251|    22911|PAPER CHAIN KIT L...|      -2| 2010-01-06|     2.95|      null|United Kingdom|
|  C538062|    22139|RETROSPOT TEA SET...|     -14| 2010-01-09|     4.25|     17702|United Kingdom|
|  C538083|    22633|HAND WARMER UNION...|      -4| 2010-01-09|      2.1|     15750|United Kingdom|
|  C538755|    22423|REGENCY CAKESTAND...|      -1| 2010-01-14|    12.75|     16782|United Kingdom|
|  C538897|        D|            Discount|      -1| 2010-01-15|     5.76|     16422|United Kingdom|


In [17]:
#Clean InvoiceNo
df = df.filter(df['InvoiceNo'].rlike('^[0-9\/]+$'))
df = df.withColumn(
    'InvoiceNo',
    regexp_extract('InvoiceNo', r'(\d+)', 1).cast(IntegerType())
)

df.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [18]:
#Drop missing records and unused columns
df = df.drop('Description')
df = df.drop('StockCode')
df = df.dropna()

In [19]:
count_missings(df)

Unnamed: 0,InvoiceNo,Quantity,UnitPrice,CustomerID,Country
0,0,0,0,0,0


In [20]:
#Clean Quantity and UnitPrice for getting negative number
t = df.filter('Quantity <= 0 OR UnitPrice <= 0')
print(f"There are total {t.count()} invalid rows")
t.show()

There are total 40 invalid rows
+---------+--------+-----------+---------+----------+--------------+
|InvoiceNo|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+--------+-----------+---------+----------+--------------+
|   577696|       1| 2011-01-21|      0.0|     16406|United Kingdom|
|   575579|      24| 2011-01-10|      0.0|     13081|United Kingdom|
|   561669|      11| 2011-01-28|      0.0|     12507|         Spain|
|   539722|      10| 2010-01-21|      0.0|     14911|          EIRE|
|   571035|       1| 2011-01-13|      0.0|     12446|           RSA|
|   561284|       1| 2011-01-26|      0.0|     16818|United Kingdom|
|   572893|       5| 2011-01-26|      0.0|     18059|United Kingdom|
|   541109|       1| 2011-01-13|      0.0|     15107|United Kingdom|
|   548318|       5| 2011-01-30|      0.0|     13113|United Kingdom|
|   561916|       1| 2011-01-01|      0.0|     15581|United Kingdom|
|   562973|     240| 2011-01-11|      0.0|     14911|          EIRE|
| 

Tại sao UniPrice lại bằng không? Lỗi nhập liệu hay là do tặng quà?

Dù sao đi nữa thì cũng nên loại bỏ những dòng này đi. Vì chúng chiếm một số lượng nhỏ 40rows.



In [21]:
df = df.filter('Quantity > 0 AND UnitPrice > 0')

In [22]:
#Create new feature TotalPrice
df = df.withColumn('TotalPrice', df['Quantity'] * df['UnitPrice'])
# Drop unneeed colums
df = df.drop('Quantity')
df = df.drop('UnitPrice')
show_head(df)

Unnamed: 0,InvoiceNo,InvoiceDate,CustomerID,Country,TotalPrice
0,536365,2010-01-01,17850,United Kingdom,22.0
1,536406,2010-01-01,17850,United Kingdom,20.4
2,536423,2010-01-01,18085,United Kingdom,15.0
3,536446,2010-01-01,15983,United Kingdom,60.0
4,536520,2010-01-01,14729,United Kingdom,2.5


In [23]:
df.describe().show()

+-------+-----------------+------------------+-----------+------------------+
|summary|        InvoiceNo|        CustomerID|    Country|        TotalPrice|
+-------+-----------------+------------------+-----------+------------------+
|  count|           392692|            392692|     392692|            392692|
|   mean|560590.8750471107|15287.843864911942|       null|22.631499735161402|
| stddev|13087.06375899506| 1713.539548924817|       null| 311.0992243348035|
|    min|           536365|             12346|  Australia|             0.001|
|    max|           581587|             18287|Unspecified|          168469.6|
+-------+-----------------+------------------+-----------+------------------+



In [24]:
df.filter('TotalPrice <= 0').show()

+---------+-----------+----------+-------+----------+
|InvoiceNo|InvoiceDate|CustomerID|Country|TotalPrice|
+---------+-----------+----------+-------+----------+
+---------+-----------+----------+-------+----------+



#### Group customer and caculate R F M


In [25]:
#Get max_date
max_date = df.groupby('InvoiceDate').max().collect()[0][0]
max_date

datetime.date(2011, 1, 30)

In [26]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(IntegerType(), functionType=PandasUDFType.GROUPED_AGG)
def recency(date_):
    diff = (max_date - date_.max()).days
    return diff if diff >= 0 else 0

@pandas_udf(IntegerType(), functionType=PandasUDFType.GROUPED_AGG)                                        
def frequency(ino):                                        
    return len(ino.unique())

@pandas_udf(FloatType(), functionType=PandasUDFType.GROUPED_AGG)                                        
def monetary(total):                                        
    return round(np.sum(total),2)


@pandas_udf(StringType(), functionType=PandasUDFType.GROUPED_AGG)                                        
def get_country(country):                                        
    return country.iloc[0]



In [27]:
#Group by each customer and apply R F M udf

rfm = df.groupby('CustomerID').agg(
 recency('InvoiceDate'),
 frequency('InvoiceNo'),
 monetary('TotalPrice'),
 get_country('Country')
)

show_head(rfm)

Unnamed: 0,CustomerID,recency(InvoiceDate),frequency(InvoiceNo),monetary(TotalPrice),get_country(Country)
0,12940,14,2,899.690002,United Kingdom
1,13285,3,4,2709.120117,United Kingdom
2,13623,13,5,727.73999,United Kingdom
3,13832,10,1,52.200001,United Kingdom
4,14450,7,3,483.25,United Kingdom


In [28]:
df.where(df['CustomerID'] == 13832).show()

+---------+-----------+----------+--------------+------------------+
|InvoiceNo|InvoiceDate|CustomerID|       Country|        TotalPrice|
+---------+-----------+----------+--------------+------------------+
|   577556| 2011-01-20|     13832|United Kingdom|              17.4|
|   577556| 2011-01-20|     13832|United Kingdom|19.799999999999997|
|   577556| 2011-01-20|     13832|United Kingdom|              15.0|
+---------+-----------+----------+--------------+------------------+



In [29]:
rfm.describe().show()

+-------+------------------+--------------------+--------------------+--------------------+--------------------+
|summary|        CustomerID|recency(InvoiceDate)|frequency(InvoiceNo)|monetary(TotalPrice)|get_country(Country)|
+-------+------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              4338|                4338|                4338|                4338|                4338|
|   mean|15300.408022130014|  19.534347625633934|   4.272014753342554|  2048.6880853099174|                null|
| stddev|1721.8084917653177|  62.180747235543315|   7.697997826095036|   8985.230337669407|                null|
|    min|             12346|                   0|                   1|                3.75|           Australia|
|    max|             18287|                 394|                 209|           280206.03|         Unspecified|
+-------+------------------+--------------------+--------------------+--------------------+-----

In [30]:
rfm.count()

4338

In [31]:
#Save data
rootPath = '/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_2_RFM_SegmetationAnalysys'
data = rfm.toPandas()
data.columns = ['customer', 'recency', 'frequency', 'monetary', 'country']

data.head()

Unnamed: 0,customer,recency,frequency,monetary,country
0,12940,14,2,899.690002,United Kingdom
1,13285,3,4,2709.120117,United Kingdom
2,13623,13,5,727.73999,United Kingdom
3,13832,10,1,52.200001,United Kingdom
4,14450,7,3,483.25,United Kingdom


In [32]:
data.to_csv(rootPath + '/RFM_OnlineRetail.csv')

In [33]:
#Done the job close the session
spark.stop()