# Starbucks Customer Segmentation

# Importing libraries

In [284]:
pip install pyspark



In [285]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
import time
from pyspark import SparkFiles

In [286]:
spark = SparkSession.builder.\
master("local").\
appName("Word Count").\
config("spark.some.config.option", "some-value").\
getOrCreate()

# Portfolio

In [287]:
url1=("https://raw.githubusercontent.com/seifip/starbucks-customer-segmentation/master/data/portfolio.json")
spark.sparkContext.addFile(url1)
portfolio = spark.read.option("inferSchema", "true").json(SparkFiles.get("portfolio.json"))

In [288]:
portfolio

DataFrame[channels: array<string>, difficulty: bigint, duration: double, id: string, offer_type: string, reward: bigint]

In [289]:
portfolio.printSchema()

root
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- difficulty: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- reward: long (nullable = true)



In [290]:
portfolio.show(5)

+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|informational|     0|
|[web, email, mobile]|         5|     7.0|9b98b8c7a33c4b65b...|         bogo|     5|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|     discount|     5|
+--------------------+----------+--------+--------------------+-------------+------+
only showing top 5 rows



In [291]:
portfolio.count()

10

# Profile

In [292]:
url2=("https://raw.githubusercontent.com/seifip/starbucks-customer-segmentation/master/data/profile.json")
spark.sparkContext.addFile(url2)
profile = spark.read.option("inferSchema", "true").json(SparkFiles.get("profile.json"))

In [293]:
profile

DataFrame[age: bigint, became_member_on: string, gender: string, id: string, income: bigint]

In [294]:
profile.printSchema()

root
 |-- age: long (nullable = true)
 |-- became_member_on: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- income: long (nullable = true)



In [295]:
profile.show(5)

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
|118|        20170212|  null|68be06ca386d4c319...|  null|
| 55|        20170715|     F|0610b486422d4921a...|112000|
|118|        20180712|  null|38fe809add3b4fcf9...|  null|
| 75|        20170509|     F|78afa995795e4d85b...|100000|
|118|        20170804|  null|a03223e636434f42a...|  null|
+---+----------------+------+--------------------+------+
only showing top 5 rows



In [296]:
profile.count()

17000

# Transcript

In [297]:
url3=("https://raw.githubusercontent.com/seifip/starbucks-customer-segmentation/master/data/transcript.json")
spark.sparkContext.addFile(url3)
transcript = spark.read.option("inferSchema", "true").json(SparkFiles.get("transcript.json"))

In [298]:
transcript

DataFrame[event: string, person: string, time: bigint, value: struct<amount:double,offer id:string,offer_id:string,reward:bigint>]

In [299]:
transcript.printSchema()

root
 |-- event: string (nullable = true)
 |-- person: string (nullable = true)
 |-- time: long (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- amount: double (nullable = true)
 |    |-- offer id: string (nullable = true)
 |    |-- offer_id: string (nullable = true)
 |    |-- reward: long (nullable = true)



In [300]:
transcript.show(5)

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|78afa995795e4d85b...|   0|[, 9b98b8c7a33c4b...|
|offer received|a03223e636434f42a...|   0|[, 0b1e1539f2cc45...|
|offer received|e2127556f4f64592b...|   0|[, 2906b810c7d441...|
|offer received|8ec6ce2a7e7949b1b...|   0|[, fafdcd668e3743...|
|offer received|68617ca6246f4fbc8...|   0|[, 4d5c57ea9a6940...|
+--------------+--------------------+----+--------------------+
only showing top 5 rows



In [301]:
transcript.count()

306534

In [302]:
type(transcript)

pyspark.sql.dataframe.DataFrame

In [303]:
portfolio.show()

+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|informational|     0|
|[web, email, mobile]|         5|     7.0|9b98b8c7a33c4b65b...|         bogo|     5|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|     discount|     5|
|[web, email, mobi...|         7|     7.0|2298d6c36e964ae4a...|     discount|     3|
|[web, email, mobi...|        10|    10.0|fafdcd668e3743c1b...|     discount|     2|
|[email, mobile, s...|         0|     3.0|5a8bc65990b245e5a...|informational|     0|
|[web, email, mobi...|         5|     5.0|f19421c1d4aa40978...|  

In [304]:
profile.show()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
|118|        20170212|  null|68be06ca386d4c319...|  null|
| 55|        20170715|     F|0610b486422d4921a...|112000|
|118|        20180712|  null|38fe809add3b4fcf9...|  null|
| 75|        20170509|     F|78afa995795e4d85b...|100000|
|118|        20170804|  null|a03223e636434f42a...|  null|
| 68|        20180426|     M|e2127556f4f64592b...| 70000|
|118|        20170925|  null|8ec6ce2a7e7949b1b...|  null|
|118|        20171002|  null|68617ca6246f4fbc8...|  null|
| 65|        20180209|     M|389bc3fa690240e79...| 53000|
|118|        20161122|  null|8974fc5686fe429db...|  null|
|118|        20170824|  null|c4863c7985cf408fa...|  null|
|118|        20150919|  null|148adfcaa27d485b8...|  null|
| 58|        20171111|     M|2eeac8d8feae4a8ca...| 51000|
| 61|        20170911|     F|aa4862eba776480b8...| 57000|
| 26|        2

In [305]:
transcript.show()

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|78afa995795e4d85b...|   0|[, 9b98b8c7a33c4b...|
|offer received|a03223e636434f42a...|   0|[, 0b1e1539f2cc45...|
|offer received|e2127556f4f64592b...|   0|[, 2906b810c7d441...|
|offer received|8ec6ce2a7e7949b1b...|   0|[, fafdcd668e3743...|
|offer received|68617ca6246f4fbc8...|   0|[, 4d5c57ea9a6940...|
|offer received|389bc3fa690240e79...|   0|[, f19421c1d4aa40...|
|offer received|c4863c7985cf408fa...|   0|[, 2298d6c36e964a...|
|offer received|2eeac8d8feae4a8ca...|   0|[, 3f207df678b143...|
|offer received|aa4862eba776480b8...|   0|[, 0b1e1539f2cc45...|
|offer received|31dda685af34476ca...|   0|[, 0b1e1539f2cc45...|
|offer received|744d603ef08c4f33a...|   0|[, 0b1e1539f2cc45...|
|offer received|3d02345581554e81b...|   0|[, 0b1e1539f2cc45...|
|offer received|4b0da7e80e5945209...|   

# Data Wrangling

In [306]:
# Removing duplicate rows for all the datasets
profile = profile.distinct()
portfolio = portfolio.distinct()
transcript = transcript.distinct()

### Profile :

In [307]:
# Checking Null values are present or not for all the columns
profile.select("age").filter("age is null").show(1)
profile.select("became_member_on").filter("became_member_on is null").show(1)
profile.select("gender").filter("gender is null").show(1)
profile.select("id").filter("id is null").show(1)
profile.select("income").filter("income is null").show(1)

+---+
|age|
+---+
+---+

+----------------+
|became_member_on|
+----------------+
+----------------+

+------+
|gender|
+------+
|  null|
+------+
only showing top 1 row

+---+
| id|
+---+
+---+

+------+
|income|
+------+
|  null|
+------+
only showing top 1 row



In [308]:
# working on null values

# filling null values of gender and became_member_on columns with NA
profile = profile.na.fill({'gender': 'NA','became_member_on':'NA'})

# filling income column's null values with the mean or average value of income column
from pyspark.sql.functions import mean
mean_val = profile.select(mean(profile.income)).collect()
mean_val
mean_income = mean_val[0][0]
#now using mean_income value to fill the nulls in income column
profile = profile.na.fill(mean_income,subset=['income'])

# filling age column's null values with the mean or average value of age column
mean_values = profile.select(mean(profile.age)).collect()
mean_values
mean_age = mean_values[0][0]
#now using mean_age value to fill the nulls in age column
profile = profile.na.fill(mean_age,subset=['age'])

# removing rows where null is present in id column (primary key)
profile = profile.filter("id is not null")
profile.show()


+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 58|        20161231|     F|73c157a4b8e342fcb...| 95000|
| 57|        20180429|     M|717a423a31284ce1b...| 41000|
|118|        20150823|    NA|59a12422f83746789...| 65404|
|118|        20151210|    NA|149f67791ca9431bb...| 65404|
| 23|        20140719|     M|0c0af42e1e804ded9...| 65000|
| 51|        20170501|     F|b87a63894b164eae8...| 91000|
| 55|        20160611|     F|992aed77ecda4f518...| 58000|
| 36|        20150305|     M|ed53076edc1d445ca...| 37000|
|100|        20170905|     F|28bbebb2b76f4057b...|118000|
| 68|        20170208|     F|77a05cdcda3e431eb...| 88000|
|118|        20150812|    NA|3f8b8ce1768d42758...| 65404|
| 75|        20160108|     M|ec71b22a0ce543188...| 80000|
| 72|        20171027|     M|13d9215510ae4f508...| 67000|
|118|        20170608|    NA|41aaf597e9524875a...| 65404|
| 66|        2

In [309]:
# dropping rows having age > 100
profile = profile.filter("age <= 75")
profile.show()

print("unique genders are :")
profile.select("gender").distinct().show()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 58|        20161231|     F|73c157a4b8e342fcb...| 95000|
| 57|        20180429|     M|717a423a31284ce1b...| 41000|
| 23|        20140719|     M|0c0af42e1e804ded9...| 65000|
| 51|        20170501|     F|b87a63894b164eae8...| 91000|
| 55|        20160611|     F|992aed77ecda4f518...| 58000|
| 36|        20150305|     M|ed53076edc1d445ca...| 37000|
| 68|        20170208|     F|77a05cdcda3e431eb...| 88000|
| 75|        20160108|     M|ec71b22a0ce543188...| 80000|
| 72|        20171027|     M|13d9215510ae4f508...| 67000|
| 66|        20171010|     M|09a6798f587a404c8...| 54000|
| 52|        20171003|     M|2bd6a1c9019240538...| 90000|
| 45|        20180314|     M|d21a703697804c059...| 49000|
| 74|        20171202|     M|4fbb451301b54298a...| 67000|
| 73|        20170609|     M|d310ea670c7149a1b...| 67000|
| 43|        2

In [310]:
# converting str to date format for "became_member_on" column
profile = profile.withColumn("became_member_on", coalesce(to_date("became_member_on", "yyyyMMdd")))
profile.show()
profile.printSchema()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 58|      2016-12-31|     F|73c157a4b8e342fcb...| 95000|
| 57|      2018-04-29|     M|717a423a31284ce1b...| 41000|
| 23|      2014-07-19|     M|0c0af42e1e804ded9...| 65000|
| 51|      2017-05-01|     F|b87a63894b164eae8...| 91000|
| 55|      2016-06-11|     F|992aed77ecda4f518...| 58000|
| 36|      2015-03-05|     M|ed53076edc1d445ca...| 37000|
| 68|      2017-02-08|     F|77a05cdcda3e431eb...| 88000|
| 75|      2016-01-08|     M|ec71b22a0ce543188...| 80000|
| 72|      2017-10-27|     M|13d9215510ae4f508...| 67000|
| 66|      2017-10-10|     M|09a6798f587a404c8...| 54000|
| 52|      2017-10-03|     M|2bd6a1c9019240538...| 90000|
| 45|      2018-03-14|     M|d21a703697804c059...| 49000|
| 74|      2017-12-02|     M|4fbb451301b54298a...| 67000|
| 73|      2017-06-09|     M|d310ea670c7149a1b...| 67000|
| 43|      201

In [311]:
print('Descriptive stats for age and income:')
profile.describe().show()

Descriptive stats for age and income:
+-------+------------------+------+--------------------+-----------------+
|summary|               age|gender|                  id|           income|
+-------+------------------+------+--------------------+-----------------+
|  count|             13175| 13175|               13175|            13175|
|   mean| 50.78998102466793|  null|2.565638242424101E31|64636.81214421253|
| stddev|14.804504804077265|  null|                 NaN|21384.17799749065|
|    min|                18|     F|0009655768c64bdeb...|            30000|
|    max|                75|     O|ffff82501cea40309...|           120000|
+-------+------------------+------+--------------------+-----------------+



In [312]:
profile.show()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 58|      2016-12-31|     F|73c157a4b8e342fcb...| 95000|
| 57|      2018-04-29|     M|717a423a31284ce1b...| 41000|
| 23|      2014-07-19|     M|0c0af42e1e804ded9...| 65000|
| 51|      2017-05-01|     F|b87a63894b164eae8...| 91000|
| 55|      2016-06-11|     F|992aed77ecda4f518...| 58000|
| 36|      2015-03-05|     M|ed53076edc1d445ca...| 37000|
| 68|      2017-02-08|     F|77a05cdcda3e431eb...| 88000|
| 75|      2016-01-08|     M|ec71b22a0ce543188...| 80000|
| 72|      2017-10-27|     M|13d9215510ae4f508...| 67000|
| 66|      2017-10-10|     M|09a6798f587a404c8...| 54000|
| 52|      2017-10-03|     M|2bd6a1c9019240538...| 90000|
| 45|      2018-03-14|     M|d21a703697804c059...| 49000|
| 74|      2017-12-02|     M|4fbb451301b54298a...| 67000|
| 73|      2017-06-09|     M|d310ea670c7149a1b...| 67000|
| 43|      201

In [313]:
# Checking again Null values are present or not for all the columns
profile.select("age").filter("age is null").show(1)
profile.select("became_member_on").filter("became_member_on is null").show(1)
profile.select("gender").filter("gender is null").show(1)
profile.select("id").filter("id is null").show(1)
profile.select("income").filter("income is null").show(1)

+---+
|age|
+---+
+---+

+----------------+
|became_member_on|
+----------------+
+----------------+

+------+
|gender|
+------+
+------+

+---+
| id|
+---+
+---+

+------+
|income|
+------+
+------+



### Transcript :

In [314]:
transcript.show()

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|bf2c086d4c4049289...|   0|[, 9b98b8c7a33c4b...|
|offer received|27aa749a6f5f448e9...|   0|[, 5a8bc65990b245...|
|offer received|b94c7601b17b41609...|   0|[, ae264e3637204a...|
|offer received|696493b9f616411a8...|   0|[, fafdcd668e3743...|
|offer received|56163abfe5a848deb...|   0|[, f19421c1d4aa40...|
|offer received|abf29a94ba3d46488...|   0|[, 5a8bc65990b245...|
|offer received|23264960b6724afea...|   0|[, 2298d6c36e964a...|
|offer received|7fc12dee8c9144f3b...|   0|[, fafdcd668e3743...|
|offer received|cf8dc5cf3dc84f648...|   0|[, 5a8bc65990b245...|
|offer received|269424f345f6478e8...|   0|[, 2906b810c7d441...|
|offer received|fcbf38029321416f9...|   0|[, fafdcd668e3743...|
|offer received|2f21db46b5f84be5b...|   0|[, ae264e3637204a...|
|offer received|e5c59811346840e2a...|   

In [315]:
# Checking null values are present or not for all the common columns
transcript.select("event").filter("event is null").show(1)
transcript.select("person").filter("person is null").show(1)
transcript.select("time").filter("time is null").show(1)

+-----+
|event|
+-----+
+-----+

+------+
|person|
+------+
+------+

+----+
|time|
+----+
+----+



In [316]:
# working on null values for all the common columns

# filling null values for event and time columns
transcript = transcript.na.fill({'event': 'NA','time': 0})

# removing rows where null is present in person column (primary key)
transcript = transcript.filter("person is not null")

transcript.show()

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|bf2c086d4c4049289...|   0|[, 9b98b8c7a33c4b...|
|offer received|27aa749a6f5f448e9...|   0|[, 5a8bc65990b245...|
|offer received|b94c7601b17b41609...|   0|[, ae264e3637204a...|
|offer received|696493b9f616411a8...|   0|[, fafdcd668e3743...|
|offer received|56163abfe5a848deb...|   0|[, f19421c1d4aa40...|
|offer received|abf29a94ba3d46488...|   0|[, 5a8bc65990b245...|
|offer received|23264960b6724afea...|   0|[, 2298d6c36e964a...|
|offer received|7fc12dee8c9144f3b...|   0|[, fafdcd668e3743...|
|offer received|cf8dc5cf3dc84f648...|   0|[, 5a8bc65990b245...|
|offer received|269424f345f6478e8...|   0|[, 2906b810c7d441...|
|offer received|fcbf38029321416f9...|   0|[, fafdcd668e3743...|
|offer received|2f21db46b5f84be5b...|   0|[, ae264e3637204a...|
|offer received|e5c59811346840e2a...|   

In [317]:
# Checking again null values are present or not for all the common columns
transcript.select("event").filter("event is null").show(1)
transcript.select("person").filter("person is null").show(1)
transcript.select("time").filter("time is null").show(1)

+-----+
|event|
+-----+
+-----+

+------+
|person|
+------+
+------+

+----+
|time|
+----+
+----+



In [318]:
print('Unique event types:')
transcript.select('event').distinct().show()

Unique event types:
+---------------+
|          event|
+---------------+
|    transaction|
| offer received|
|offer completed|
|   offer viewed|
+---------------+



### Splitting transactions and offers dataframes

In [319]:
# making transactions dataframe
transactions = transcript.filter("event == 'transaction' ")
transactions = transactions.withColumn("trans_amount",coalesce("value.amount"))
transactions = transactions.drop('value')
transactions.show()

# making offers dataframe
offers = transcript.filter("event != 'transaction' ")
offers = offers.withColumn("offer_id",coalesce("value.offer_id", "value.offer id"))
offers = offers.drop('value')
offers.show()


+-----------+--------------------+----+------------+
|      event|              person|time|trans_amount|
+-----------+--------------------+----+------------+
|transaction|7a1eda9ab57049068...|   0|        6.13|
|transaction|2fb4578848f34ce4b...|   0|        21.4|
|transaction|6851449a9192478d8...|   0|         2.3|
|transaction|d5e320154bed47159...|   0|        7.83|
|transaction|461e13a14a074077b...|   6|        7.94|
|transaction|6e0b0d6db74942b49...|   6|        8.12|
|transaction|8d59a5826cc547f58...|   6|       23.77|
|transaction|ae56c4d76ac84a639...|   6|       27.23|
|transaction|1575f73eb0274ab6b...|   6|        0.13|
|transaction|ea6cfa381e2c492e9...|   6|       13.84|
|transaction|26829118683847c8a...|  12|        7.41|
|transaction|9ca7ccb9ef3e44b4a...|  12|        3.13|
|transaction|3dbbfc8fc19d40df9...|  12|       10.98|
|transaction|3b3f484e876f475ea...|  12|        15.1|
|transaction|63b1186a539940508...|  12|        2.93|
|transaction|f422c808ac4d47d1a...|  12|       

In [320]:
# checking null values are present or not for transactions and offers dataframe
transactions.select("trans_amount").filter("trans_amount is null").show(1)
offers.select("offer_id").filter("offer_id is null").show(1)

+------------+
|trans_amount|
+------------+
+------------+

+--------+
|offer_id|
+--------+
+--------+



In [321]:
# working on null values for transactions and offers dataframe

# transactions dataframe :
##  filling trans_amount column's null values with the mean or average value of trans_amount column
mean_vals = transactions.select(mean(transactions.trans_amount)).collect()
mean_vals
mean_trans = mean_vals[0][0]
#now using mean_trans value to fill the nulls in trans_amount column
transactions = transactions.na.fill(mean_trans,subset=['trans_amount'])
transactions.show()


# offers dataframe
## removing rows where null is present in offer_id column (primary key)
offers = offers.filter("offer_id is not null")
offers.show()


+-----------+--------------------+----+------------+
|      event|              person|time|trans_amount|
+-----------+--------------------+----+------------+
|transaction|7a1eda9ab57049068...|   0|        6.13|
|transaction|2fb4578848f34ce4b...|   0|        21.4|
|transaction|6851449a9192478d8...|   0|         2.3|
|transaction|d5e320154bed47159...|   0|        7.83|
|transaction|461e13a14a074077b...|   6|        7.94|
|transaction|6e0b0d6db74942b49...|   6|        8.12|
|transaction|8d59a5826cc547f58...|   6|       23.77|
|transaction|ae56c4d76ac84a639...|   6|       27.23|
|transaction|1575f73eb0274ab6b...|   6|        0.13|
|transaction|ea6cfa381e2c492e9...|   6|       13.84|
|transaction|26829118683847c8a...|  12|        7.41|
|transaction|9ca7ccb9ef3e44b4a...|  12|        3.13|
|transaction|3dbbfc8fc19d40df9...|  12|       10.98|
|transaction|3b3f484e876f475ea...|  12|        15.1|
|transaction|63b1186a539940508...|  12|        2.93|
|transaction|f422c808ac4d47d1a...|  12|       

In [322]:
# checking again null values are present or not for transactions, offers and reward dataframe
transactions.select("trans_amount").filter("trans_amount is null").show(1)
offers.select("offer_id").filter("offer_id is null").show(1)

+------------+
|trans_amount|
+------------+
+------------+

+--------+
|offer_id|
+--------+
+--------+



### Portfolio

In [323]:
portfolio.show()

+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[web, email, mobi...|         7|     7.0|2298d6c36e964ae4a...|     discount|     3|
|[web, email, mobile]|        10|     7.0|2906b810c7d441179...|     discount|     2|
|[web, email, mobi...|        10|    10.0|fafdcd668e3743c1b...|     discount|     2|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|     discount|     5|
|[web, email, mobile]|         5|     7.0|9b98b8c7a33c4b65b...|         bogo|     5|
|[email, mobile, s...|         0|     3.0|5a8bc65990b245e5a...|informational|     0|
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|in

In [324]:
portfolio.printSchema()

root
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- difficulty: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- reward: long (nullable = true)



In [325]:
# Checking Null values are present or not for all the columns
portfolio.select("channels").filter("channels is null").show()
portfolio.select("difficulty").filter("difficulty is null").show()
portfolio.select("duration").filter("duration is null").show()
portfolio.select("id").filter("id is null").show()
portfolio.select("offer_type").filter("offer_type is null").show()
portfolio.select("reward").filter("reward is null").show()

+--------+
|channels|
+--------+
+--------+

+----------+
|difficulty|
+----------+
+----------+

+--------+
|duration|
+--------+
+--------+

+---+
| id|
+---+
+---+

+----------+
|offer_type|
+----------+
+----------+

+------+
|reward|
+------+
+------+



In [326]:
# working on null values

# filling null values of difficulty, duration, offer_type, reward
portfolio = portfolio.na.fill({'difficulty': 0,'duration':0.0, 'offer_type':'NA','reward':0})

# removing rows where null is present in id column (primary key)
portfolio = portfolio.filter("id is not null")
portfolio.show()


+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[web, email, mobi...|         7|     7.0|2298d6c36e964ae4a...|     discount|     3|
|[web, email, mobile]|        10|     7.0|2906b810c7d441179...|     discount|     2|
|[web, email, mobi...|        10|    10.0|fafdcd668e3743c1b...|     discount|     2|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|     discount|     5|
|[web, email, mobile]|         5|     7.0|9b98b8c7a33c4b65b...|         bogo|     5|
|[email, mobile, s...|         0|     3.0|5a8bc65990b245e5a...|informational|     0|
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|in

In [327]:
# Checking Again Null values are present or not for all the columns
portfolio.select("channels").filter("channels is null").show()
portfolio.select("difficulty").filter("difficulty is null").show()
portfolio.select("duration").filter("duration is null").show()
portfolio.select("id").filter("id is null").show()
portfolio.select("offer_type").filter("offer_type is null").show()
portfolio.select("reward").filter("reward is null").show()

+--------+
|channels|
+--------+
+--------+

+----------+
|difficulty|
+----------+
+----------+

+--------+
|duration|
+--------+
+--------+

+---+
| id|
+---+
+---+

+----------+
|offer_type|
+----------+
+----------+

+------+
|reward|
+------+
+------+



# Merging Dataframes

In [328]:
profile.show(2)

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 58|      2016-12-31|     F|73c157a4b8e342fcb...| 95000|
| 57|      2018-04-29|     M|717a423a31284ce1b...| 41000|
+---+----------------+------+--------------------+------+
only showing top 2 rows



In [329]:
portfolio.show(2)

+--------------------+----------+--------+--------------------+----------+------+
|            channels|difficulty|duration|                  id|offer_type|reward|
+--------------------+----------+--------+--------------------+----------+------+
|[web, email, mobi...|         7|     7.0|2298d6c36e964ae4a...|  discount|     3|
|[web, email, mobile]|        10|     7.0|2906b810c7d441179...|  discount|     2|
+--------------------+----------+--------+--------------------+----------+------+
only showing top 2 rows



#### transcript

In [330]:
transactions.show(2)

+-----------+--------------------+----+------------+
|      event|              person|time|trans_amount|
+-----------+--------------------+----+------------+
|transaction|7a1eda9ab57049068...|   0|        6.13|
|transaction|2fb4578848f34ce4b...|   0|        21.4|
+-----------+--------------------+----+------------+
only showing top 2 rows



In [331]:
offers.show(2)

+--------------+--------------------+----+--------------------+
|         event|              person|time|            offer_id|
+--------------+--------------------+----+--------------------+
|offer received|bf2c086d4c4049289...|   0|9b98b8c7a33c4b65b...|
|offer received|27aa749a6f5f448e9...|   0|5a8bc65990b245e5a...|
+--------------+--------------------+----+--------------------+
only showing top 2 rows



In [332]:
# joining profile, portfolio and offers for all the offers

df1 = profile.join(offers, profile.id == offers.person, 'inner').drop(profile.id)
df1.show(2)

allOffers = df1.join(portfolio, df1.offer_id == portfolio.id, 'inner').drop(portfolio.id)
allOffers.show(2)

+---+----------------+------+------+--------------+--------------------+----+--------------------+
|age|became_member_on|gender|income|         event|              person|time|            offer_id|
+---+----------------+------+------+--------------+--------------------+----+--------------------+
| 32|      2015-10-08|     M| 38000|offer received|27aa749a6f5f448e9...|   0|5a8bc65990b245e5a...|
| 62|      2016-05-02|     F| 98000|offer received|abf29a94ba3d46488...|   0|5a8bc65990b245e5a...|
+---+----------------+------+------+--------------+--------------------+----+--------------------+
only showing top 2 rows

+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
|age|became_member_on|gender|income|         event|              person|time|            offer_id|            channels|difficulty|duration|   offer_type|reward|
+---+----------------+------+------+--------------+--------

In [333]:
# joining profile and transactions for all the transactions

allTransactions = profile.join(transactions, profile.id == transactions.person, 'inner').drop(profile.id)
allTransactions.show(2)

+---+----------------+------+------+-----------+--------------------+----+------------+
|age|became_member_on|gender|income|      event|              person|time|trans_amount|
+---+----------------+------+------+-----------+--------------------+----+------------+
| 51|      2015-12-16|     M| 81000|transaction|2fb4578848f34ce4b...|   0|        21.4|
| 40|      2017-11-14|     M| 39000|transaction|6851449a9192478d8...|   0|         2.3|
+---+----------------+------+------+-----------+--------------------+----+------------+
only showing top 2 rows



In [334]:
allOffers.show(2)
allOffers.printSchema()

+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
|age|became_member_on|gender|income|         event|              person|time|            offer_id|            channels|difficulty|duration|   offer_type|reward|
+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
| 32|      2015-10-08|     M| 38000|offer received|27aa749a6f5f448e9...|   0|5a8bc65990b245e5a...|[email, mobile, s...|         0|     3.0|informational|     0|
| 62|      2016-05-02|     F| 98000|offer received|abf29a94ba3d46488...|   0|5a8bc65990b245e5a...|[email, mobile, s...|         0|     3.0|informational|     0|
+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
only showing top 2 rows

root
 |--

In [335]:
allTransactions.show(2)
allTransactions.printSchema()

+---+----------------+------+------+-----------+--------------------+----+------------+
|age|became_member_on|gender|income|      event|              person|time|trans_amount|
+---+----------------+------+------+-----------+--------------------+----+------------+
| 51|      2015-12-16|     M| 81000|transaction|2fb4578848f34ce4b...|   0|        21.4|
| 40|      2017-11-14|     M| 39000|transaction|6851449a9192478d8...|   0|         2.3|
+---+----------------+------+------+-----------+--------------------+----+------------+
only showing top 2 rows

root
 |-- age: long (nullable = true)
 |-- became_member_on: date (nullable = true)
 |-- gender: string (nullable = false)
 |-- income: long (nullable = true)
 |-- event: string (nullable = false)
 |-- person: string (nullable = true)
 |-- time: long (nullable = false)
 |-- trans_amount: double (nullable = false)



# Dataframes based on problem statement

In [336]:
# 1st problem statement dataframe
# based on gender, age, income analysis of offer_type

df1 = allOffers.select("age","gender","income","offer_type")
df1.show(5)

+---+------+------+-------------+
|age|gender|income|   offer_type|
+---+------+------+-------------+
| 32|     M| 38000|informational|
| 62|     F| 98000|informational|
| 67|     M| 97000|     discount|
| 24|     M| 38000|     discount|
| 51|     F| 90000|informational|
+---+------+------+-------------+
only showing top 5 rows



In [337]:
# 2nd problem statement dataframe
# based on offer utilized per channel

df2 = allOffers.filter("event == 'offer completed' ").select("channels","offer_type")
df2.show(5)

+--------------------+----------+
|            channels|offer_type|
+--------------------+----------+
|[web, email, mobi...|      bogo|
|[web, email, mobi...|      bogo|
|[web, email, mobi...|  discount|
|[email, mobile, s...|      bogo|
|[web, email, mobi...|  discount|
+--------------------+----------+
only showing top 5 rows



In [338]:
# 3rd problem statement dataframe
# based on trend w.r.t event Vs customer and offers

df3 = allOffers.select("event","age","gender","income","offer_type")
df3.show(5)

+--------------+---+------+------+-------------+
|         event|age|gender|income|   offer_type|
+--------------+---+------+------+-------------+
|offer received| 32|     M| 38000|informational|
|offer received| 62|     F| 98000|informational|
|offer received| 67|     M| 97000|     discount|
|offer received| 24|     M| 38000|     discount|
|offer received| 51|     F| 90000|informational|
+--------------+---+------+------+-------------+
only showing top 5 rows



In [339]:
profile = profile.toPandas()

# gender type dummies
gender_dummies = pd.get_dummies(profile['gender']).add_prefix('gender_')

profile = pd.concat([profile, gender_dummies], axis=1)

profile.head()

Unnamed: 0,age,became_member_on,gender,id,income,gender_F,gender_M,gender_O
0,58,2016-12-31,F,73c157a4b8e342fcb5d8479d8cb41e81,95000,1,0,0
1,57,2018-04-29,M,717a423a31284ce1b767cad1c471d968,41000,0,1,0
2,23,2014-07-19,M,0c0af42e1e804ded94eb5d73dfffbb6f,65000,0,1,0
3,51,2017-05-01,F,b87a63894b164eae86b2c2e421f966ea,91000,1,0,0
4,55,2016-06-11,F,992aed77ecda4f5180d5da87adaff710,58000,1,0,0


## Problem 1

## Import Required Library

In [340]:
# Decision Tree classification Algo
import pandas as pd
from sklearn.tree import DecisionTreeClassifier # Import Decision Tree Classifier
from sklearn.model_selection import train_test_split # Import train_test_split function
from sklearn import metrics         #Import scikit-learn metrics module for accuracy calculation
import numpy as np 


#                        Loading Data

In [341]:

col_names = ["age","gender","income","offer_type"]
col_names

df = df1.toPandas()  #spark dataframe  into panda
df







Unnamed: 0,age,gender,income,offer_type
0,32,M,38000,informational
1,62,F,98000,informational
2,67,M,97000,discount
3,24,M,38000,discount
4,51,F,90000,informational
...,...,...,...,...
131660,28,M,68000,discount
131661,56,M,65000,discount
131662,68,F,50000,discount
131663,58,F,68000,bogo


## Label Encoding 

In [342]:
from sklearn import preprocessing
# label_encoder object knows how to understand word labels. 
label_encoder = preprocessing.LabelEncoder()
# Encode labels in column 'Country'. 
df['Gender']= label_encoder.fit_transform(df['gender']) 
df.head()
df['Offer_type']= label_encoder.fit_transform(df['offer_type']) 
df.head()

Unnamed: 0,age,gender,income,offer_type,Gender,Offer_type
0,32,M,38000,informational,1,2
1,62,F,98000,informational,0,2
2,67,M,97000,discount,1,1
3,24,M,38000,discount,1,1
4,51,F,90000,informational,0,2


## Droping Unwanted Columns

In [343]:
c=df.drop(['gender','offer_type'],axis='columns')
c


Unnamed: 0,age,income,Gender,Offer_type
0,32,38000,1,2
1,62,98000,0,2
2,67,97000,1,1
3,24,38000,1,1
4,51,90000,0,2
...,...,...,...,...
131660,28,68000,1,1
131661,56,65000,1,1
131662,68,50000,0,1
131663,58,68000,0,0


In [344]:
c.head()

Unnamed: 0,age,income,Gender,Offer_type
0,32,38000,1,2
1,62,98000,0,2
2,67,97000,1,1
3,24,38000,1,1
4,51,90000,0,2


In [345]:
feature_cols = ["age","income","Gender","Offer_type"]
X = c[feature_cols] # Features
y = c.Offer_type # target Variable

## Spliting Data

In [372]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33,random_state=1)

print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

(88215, 5)
(88215,)
(43450, 5)
(43450,)


## Importing DecisionTreeClassifier Object

In [370]:
from sklearn.tree import DecisionTreeClassifier

## Applying gini Method

In [373]:
classifier = DecisionTreeClassifier(criterion='gini')
classifier.fit(X_train,y_train)

print("Score",classifier.score(X_test,y_test))

#print("Prediction",classifier.predict([[24,38000,1,1,1]]))

print("Pridiction",classifier.predict(X_test))


Score 1.0
Pridiction [0 1 0 ... 1 0 0]


## Applying Entropy Method

In [349]:
model_en = tree.DecisionTreeClassifier(criterion='entropy')
#model_en.fit(c,target)
#model_en.score(c,target)

print(model_en.fit(X_train,y_train))

print("Score",model_en.score(X_test,y_test))

pridict = model_en.predict(X_test)
print(pridict)


DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='entropy',
                       max_depth=None, max_features=None, max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, presort='deprecated',
                       random_state=None, splitter='best')
Score 1.0
[0 1 0 ... 1 2 1]


## Performing Standard Scalar

In [350]:
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()

In [351]:
sc.fit(X_train)

StandardScaler(copy=True, with_mean=True, with_std=True)

In [352]:
X_train_sc = sc.transform(X_train)
X_test_sc = sc.transform(X_test)

In [353]:
classifier_sc = DecisionTreeClassifier(criterion='gini')
classifier_sc.fit(X_train_sc,y_train)

print("score",classifier_sc.score(X_test_sc,y_test))

score 1.0


# Problem 2

In [354]:


P2 = df2.toPandas()  #spark dataframe  into panda
P2

col_names = ["channels","offer_type"]
col_names

P2



Unnamed: 0,channels,offer_type
0,"[web, email, mobile, social]",bogo
1,"[web, email, mobile, social]",bogo
2,"[web, email, mobile, social]",discount
3,"[email, mobile, social]",bogo
4,"[web, email, mobile, social]",discount
...,...,...
28239,"[web, email, mobile, social]",bogo
28240,"[web, email, mobile, social]",discount
28241,"[web, email]",discount
28242,"[web, email, mobile]",bogo


## Problem 3

In [355]:
#df3 = allOffers.select("event","age","gender","income","offer_type")
#df3.show(5)

P3 = df3.toPandas()
P3

Unnamed: 0,event,age,gender,income,offer_type
0,offer received,32,M,38000,informational
1,offer received,62,F,98000,informational
2,offer received,67,M,97000,discount
3,offer received,24,M,38000,discount
4,offer received,51,F,90000,informational
...,...,...,...,...,...
131660,offer viewed,28,M,68000,discount
131661,offer completed,56,M,65000,discount
131662,offer completed,68,F,50000,discount
131663,offer completed,58,F,68000,bogo


## Label Encoding


In [356]:
P3.dtypes

event         object
age            int64
gender        object
income         int64
offer_type    object
dtype: object

In [357]:
from sklearn import preprocessing
# label_encoder object knows how to understand word labels. 
label_encoder = preprocessing.LabelEncoder()
# Encode labels in column 'Country'. 
P3['Event']= label_encoder.fit_transform(P3['event']) 
P3.head()

P3['Gender']= label_encoder.fit_transform(P3['gender']) 
P3.head()

P3['Offer_type']= label_encoder.fit_transform(P3['offer_type']) 
P3.head()





Unnamed: 0,event,age,gender,income,offer_type,Event,Gender,Offer_type
0,offer received,32,M,38000,informational,1,1,2
1,offer received,62,F,98000,informational,1,0,2
2,offer received,67,M,97000,discount,1,1,1
3,offer received,24,M,38000,discount,1,1,1
4,offer received,51,F,90000,informational,1,0,2


In [358]:

d = P3.drop(['event','gender','offer_type'],axis='columns')
d.head()


Unnamed: 0,age,income,Event,Gender,Offer_type
0,32,38000,1,1,2
1,62,98000,1,0,2
2,67,97000,1,1,1
3,24,38000,1,1,1
4,51,90000,1,0,2


## Features Selection

In [359]:
feature_cols = ["age","income","Event","Gender","Offer_type"]
X = d[feature_cols] # Features
y = d.Offer_type # target Variable

In [360]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

## Spliting Data

In [361]:
# Create Decision Tree classifer object
clf = DecisionTreeClassifier()

# Train Decision Tree Classifer
clf = clf.fit(X_train,y_train)

#Predict the response for test dataset
y_pred = clf.predict(X_test)
print(y_pred)

[0 1 0 ... 2 1 0]


In [375]:
clf = DecisionTreeClassifier(criterion='gini', min_samples_split=50)
clf.fit(X_train, y_train)
print('Accuracy Score on train data: ', accuracy_score(y_true=y_train, y_pred=clf.predict(X_train)))
print('Accuracy Score on the test data: ', accuracy_score(y_true=y_test, y_pred=clf.predict(X_test)))

Accuracy Score on train data:  1.0
Accuracy Score on the test data:  1.0


In [363]:
clf = DecisionTreeClassifier(criterion='entropy', min_samples_split=50)
clf.fit(X_train, y_train)
print('Accuracy Score on train data: ', accuracy_score(y_true=y_train, y_pred=clf.predict(X_train)))
print('Accuracy Score on the test data: ', accuracy_score(y_true=y_test, y_pred=clf.predict(X_test)))

Accuracy Score on train data:  1.0
Accuracy Score on the test data:  1.0


## Performing StandardSclar

In [364]:
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()

In [365]:
sc.fit(X_train)

StandardScaler(copy=True, with_mean=True, with_std=True)

In [366]:
X_train_sc1 = sc.transform(X_train)
X_test_sc2 = sc.transform(X_test)

In [367]:
classifier_sc = DecisionTreeClassifier(criterion='gini')
classifier_sc.fit(X_train_sc1,y_train)

print("score",classifier_sc.score(X_test_sc2,y_test))

p = classifier_sc.predict(X_test)
print(p)

score 1.0
[1 1 1 ... 2 1 1]
