In [1]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession,Row
from pyspark.sql.functions import max
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
# Creating the SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Case-Study-Data-Processing") \
    .getOrCreate()

In [3]:
# Read the Csv file

fact_data = spark.read.format("csv").option("header", "true").load("C:\\BigData\\use-case-data-processing-main\\fact.csv")
lookup_data = spark.read.format("csv").option("header", "true").load("C:\\BigData\\use-case-data-processing-main\\lookup.csv")

In [4]:
# Doing BroadCast Join As One Big Data Set and One Small DatSet is present

join_data = fact_data.join(broadcast(lookup_data), fact_data.WEB_PAGEID == lookup_data.WEB_PAGEID, "outer").drop(lookup_data.WEB_PAGEID)
join_data.show(5,False)
join_data.printSchema()
join_data.count()

+---------+----------------+----------+------------+
|USER_ID  |VIEW_TIME       |WEB_PAGEID|WEBPAGE_TYPE|
+---------+----------------+----------+------------+
|149921285|10/02/2016 15:03|3739328152|movies      |
|null     |null            |null      |news        |
|null     |null            |null      |news        |
|51792571 |16/02/2016 12:07|3771917324|news        |
|51753953 |16/02/2016 12:07|3771917324|news        |
+---------+----------------+----------+------------+
only showing top 5 rows

root
 |-- USER_ID: string (nullable = true)
 |-- VIEW_TIME: string (nullable = true)
 |-- WEB_PAGEID: string (nullable = true)
 |-- WEBPAGE_TYPE: string (nullable = true)



1100

In [5]:
# join_df = fact_data.join(lookup_data, fact_data.WEB_PAGEID ==  lookup_data.WEB_PAGEID, "outer").drop(fact_data.WEB_PAGEID) 
# join_df.show(5,truncate=False)
# join_df.count()

In [6]:
# Convering View_TIME format to 'DD/MM/YYYY' in Column(Data_S)

new_data = join_data.withColumn('Date_S',from_unixtime(unix_timestamp(join_data["VIEW_TIME"],'dd/MM/yyyy mm:ss'),'dd/MM/yyyy'))
new_data.show(5,False)
new_data.printSchema()

+---------+----------------+----------+------------+----------+
|USER_ID  |VIEW_TIME       |WEB_PAGEID|WEBPAGE_TYPE|Date_S    |
+---------+----------------+----------+------------+----------+
|149921285|10/02/2016 15:03|3739328152|movies      |10/02/2016|
|null     |null            |null      |news        |null      |
|null     |null            |null      |news        |null      |
|51792571 |16/02/2016 12:07|3771917324|news        |16/02/2016|
|51753953 |16/02/2016 12:07|3771917324|news        |16/02/2016|
+---------+----------------+----------+------------+----------+
only showing top 5 rows

root
 |-- USER_ID: string (nullable = true)
 |-- VIEW_TIME: string (nullable = true)
 |-- WEB_PAGEID: string (nullable = true)
 |-- WEBPAGE_TYPE: string (nullable = true)
 |-- Date_S: string (nullable = true)



In [7]:
# Convering Data_S from String datatype to Date formatin Date field.

data = new_data.withColumn('Date_S',from_unixtime(unix_timestamp(new_data["VIEW_TIME"],'dd/MM/yyyy mm:ss'),'dd/MM/yyyy')) \
.select(col("USER_ID"),("VIEW_TIME"),("WEB_PAGEID"),("WEBPAGE_TYPE"),to_date(col("Date_S"),'dd/MM/yyyy').alias("Date")) 

data.show(5,False)
data.printSchema()

+---------+----------------+----------+------------+----------+
|USER_ID  |VIEW_TIME       |WEB_PAGEID|WEBPAGE_TYPE|Date      |
+---------+----------------+----------+------------+----------+
|149921285|10/02/2016 15:03|3739328152|movies      |2016-02-10|
|null     |null            |null      |news        |null      |
|null     |null            |null      |news        |null      |
|51792571 |16/02/2016 12:07|3771917324|news        |2016-02-16|
|51753953 |16/02/2016 12:07|3771917324|news        |2016-02-16|
+---------+----------------+----------+------------+----------+
only showing top 5 rows

root
 |-- USER_ID: string (nullable = true)
 |-- VIEW_TIME: string (nullable = true)
 |-- WEB_PAGEID: string (nullable = true)
 |-- WEBPAGE_TYPE: string (nullable = true)
 |-- Date: date (nullable = true)



In [8]:
# Calculate difference in days between 2011-12-31 and the Invoice Date

data = data.withColumn("RecencyDays", expr("datediff('2019-10-12', Date)"))

data.show(5,False)

+---------+----------------+----------+------------+----------+-----------+
|USER_ID  |VIEW_TIME       |WEB_PAGEID|WEBPAGE_TYPE|Date      |RecencyDays|
+---------+----------------+----------+------------+----------+-----------+
|149921285|10/02/2016 15:03|3739328152|movies      |2016-02-10|1340       |
|null     |null            |null      |news        |null      |null       |
|null     |null            |null      |news        |null      |null       |
|51792571 |16/02/2016 12:07|3771917324|news        |2016-02-16|1334       |
|51753953 |16/02/2016 12:07|3771917324|news        |2016-02-16|1334       |
+---------+----------------+----------+------------+----------+-----------+
only showing top 5 rows



In [9]:
data.select('USER_ID').distinct().count()

800

In [10]:
# Create a TempView:

data.createOrReplaceTempView("records")

In [11]:
# pageview_news_rec_365

pageview_news_rec_365 = spark.sql("select count(WEB_PAGEID) as pageview_news_rec_365 from records where WEBPAGE_TYPE = 'news' and RecencyDays < '365'")
pageview_news_rec_365.show(10)

+---------------------+
|pageview_news_rec_365|
+---------------------+
|                   65|
+---------------------+



In [12]:
# pageview_news_rec_365

news_365 = spark.sql("select USER_ID,count(*) as pageview_news_rec_365 from records where WEBPAGE_TYPE = 'news' and RecencyDays < '365' group by USER_ID")
news_365.show(11)
news_365.count()

+----------+---------------------+
|   USER_ID|pageview_news_rec_365|
+----------+---------------------+
| -69271739|                    1|
|-133621877|                    1|
| 157684212|                    1|
|-285299518|                    1|
|-277209141|                    3|
|-151394098|                    3|
| 250009214|                    1|
| -73385561|                    1|
|  78585170|                    1|
|-163187677|                    3|
|-167528374|                   49|
+----------+---------------------+



11

In [13]:
# pageview_news_rec_730

pageview_news_rec_365 = spark.sql("select count(WEB_PAGEID) as pageview_news_rec_730 from records where WEBPAGE_TYPE = 'news' and RecencyDays > '365' and RecencyDays < '730'")
pageview_news_rec_365.show(10)

+---------------------+
|pageview_news_rec_730|
+---------------------+
|                   35|
+---------------------+



In [14]:

news_730 = spark.sql("select USER_ID,count(*) as pageview_news_rec_730 from records where WEBPAGE_TYPE = 'news' and RecencyDays > '365' and RecencyDays < '730' group by USER_ID")
news_730.show(100)
news_730.count()

+----------+---------------------+
|   USER_ID|pageview_news_rec_730|
+----------+---------------------+
|-247361411|                    3|
|-231838278|                    1|
|-252290668|                    2|
| 155111993|                    1|
|  24160260|                    2|
|-166177270|                    2|
|   9007149|                    1|
|  80169642|                    1|
|-263669125|                    1|
|  61732038|                    4|
|-251309617|                    1|
| 253384086|                    1|
|   1870160|                    1|
| 273780833|                    2|
|  48489723|                    2|
|    951885|                    1|
|   5864422|                    1|
|-244059746|                    1|
|-253796619|                    1|
| 208233177|                    1|
|-216136513|                    2|
| 166709198|                    1|
| 104684497|                    1|
|-175376335|                    1|
+----------+---------------------+



24

In [15]:
# Join news_365 vs news_730
join_df = news_730.join(news_365, news_730.USER_ID ==  news_365.USER_ID, "right").drop(news_730.USER_ID) 
join_df.show(500,truncate=False)
join_df.count()

+---------------------+----------+---------------------+
|pageview_news_rec_730|USER_ID   |pageview_news_rec_365|
+---------------------+----------+---------------------+
|null                 |-69271739 |1                    |
|null                 |-133621877|1                    |
|null                 |157684212 |1                    |
|null                 |-285299518|1                    |
|null                 |-277209141|3                    |
|null                 |-151394098|3                    |
|null                 |250009214 |1                    |
|null                 |-73385561 |1                    |
|null                 |78585170  |1                    |
|null                 |-163187677|3                    |
|null                 |-167528374|49                   |
+---------------------+----------+---------------------+



11

In [16]:
# pageview_news_rec_1460

pageview_news_rec_1460 = spark.sql("select count(WEB_PAGEID) as pageview_news_rec_1460 from records where WEBPAGE_TYPE = 'news' and RecencyDays > '730' and RecencyDays < '1460'")
pageview_news_rec_1460.show(10)

+----------------------+
|pageview_news_rec_1460|
+----------------------+
|                   874|
+----------------------+



In [17]:
news_1460 = spark.sql("select USER_ID,count(*) as pageview_news_rec_1460 from records where WEBPAGE_TYPE = 'news' and RecencyDays > '730' and RecencyDays < '1460' group by USER_ID")
news_1460.show(10)
news_1460.count()

+---------+----------------------+
|  USER_ID|pageview_news_rec_1460|
+---------+----------------------+
| 98343030|                     3|
| 59736608|                     2|
|115056661|                     1|
|162690170|                     1|
|169469168|                     1|
|167725829|                     1|
|264289612|                     2|
|191712149|                     1|
| 66612057|                     1|
|   788539|                     1|
+---------+----------------------+
only showing top 10 rows



746

In [18]:
# Join news_730 vs news_1460
join_df = news_730.join(news_1460, news_730.USER_ID ==  news_1460.USER_ID, "outer").drop(news_730.USER_ID) 
join_df.show(5,truncate=False)
join_df.count()

+---------------------+---------+----------------------+
|pageview_news_rec_730|USER_ID  |pageview_news_rec_1460|
+---------------------+---------+----------------------+
|null                 |98343030 |3                     |
|null                 |115056661|1                     |
|null                 |59736608 |2                     |
|null                 |162690170|1                     |
|null                 |167725829|1                     |
+---------------------+---------+----------------------+
only showing top 5 rows



770

In [19]:
rfm_table = data.groupBy("USER_ID")\
                        .agg(min("RecencyDays").alias("Recency"), \
                             count("WEB_PAGEID").alias("Frequency"))

rfm_table.show()

+----------+-------+---------+
|   USER_ID|Recency|Frequency|
+----------+-------+---------+
|  98343030|    932|        3|
|  59736608|    914|        2|
| 115056661|   1336|        1|
|  88602347|   1259|        1|
| 167725829|   1280|        1|
| 162690170|   1263|        1|
| 169469168|   1263|        1|
|-247361411|    705|        3|
| 159147772|   1308|        2|
| 264289612|   1009|        2|
| 191712149|   1263|        1|
|  66612057|   1263|        1|
|   8856308|   1282|        1|
| 171211230|   1262|        1|
| 161303572|   1263|        1|
|    788539|   1170|        1|
|  78741118|   1349|        2|
|  55620801|   1313|        1|
| 159348322|   1262|        1|
|  96024291|   1361|        1|
+----------+-------+---------+
only showing top 20 rows



In [20]:
r_quartile = rfm_table.approxQuantile("Recency", [0.25, 0.5, 0.75], 0)
f_quartile = rfm_table.approxQuantile("Frequency", [0.25, 0.5, 0.75], 0)

rfm_table = rfm_table.withColumn("R_Quartile", \
                                 when(col("Recency") >= r_quartile[2] , 1).\
                                 when(col("Recency") >= r_quartile[1] , 2).\
                                 when(col("Recency") >= r_quartile[0] , 3).\
                                 otherwise(4))

rfm_table = rfm_table.withColumn("F_Quartile", \
                                 when(col("Frequency") > f_quartile[2] , 4).\
                                 when(col("Frequency") > f_quartile[1] , 3).\
                                 when(col("Frequency") > f_quartile[0] , 2).\
                                 otherwise(1))

rfm_table = rfm_table.withColumn("RFM_Score", concat(col("R_Quartile"), col("F_Quartile")))

rfm_table.show()

+----------+-------+---------+----------+----------+---------+
|   USER_ID|Recency|Frequency|R_Quartile|F_Quartile|RFM_Score|
+----------+-------+---------+----------+----------+---------+
|  98343030|    932|        3|         4|         4|       44|
|  59736608|    914|        2|         4|         4|       44|
| 115056661|   1336|        1|         1|         1|       11|
|  88602347|   1259|        1|         3|         1|       31|
| 167725829|   1280|        1|         2|         1|       21|
| 162690170|   1263|        1|         2|         1|       21|
| 169469168|   1263|        1|         2|         1|       21|
|-247361411|    705|        3|         4|         4|       44|
| 159147772|   1308|        2|         2|         4|       24|
| 264289612|   1009|        2|         4|         4|       44|
| 191712149|   1263|        1|         2|         1|       21|
|  66612057|   1263|        1|         2|         1|       21|
|   8856308|   1282|        1|         2|         1|   