<html>
<font color = green size = 6>
<b>
Data Preparation to Predict player churn
</b>
</font>
</html>

<html>
<font color = Purple size = 4>
<b> Set up the Pyspark environment required for running the model</b>
</font>
</html>

In [5]:
#Pyspark setup
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import NumericType
from pyspark.sql.functions import *

In [6]:
sc = SparkContext.getOrCreate()
spark = SQLContext(sc)

<html>
<font color = Red size = 6>
<b> Feature Engineering</b>
</font>
</html>

<html>
<font color = blue>
A very extensive and detailed feature engineering is performed to derive features for Churn prediction.
</font>
</html>

<html>
<font color = blue>
Load cleaned up log files as a Spark dataframe.
</font>
</html>

In [7]:
newlogs_df = spark.read.csv(path = "newlogs.csv", header = True,inferSchema = True)

In [33]:
newlogs_df.show(5)

+---+------------------+--------+--------+-----+-----+----+--------+--------+
|_c0|         QueryTime|QuerySeq|AvatarID|Guild|Level|Race|   Class|    Zone|
+---+------------------+--------+--------+-----+-----+----+--------+--------+
|  0| 12/31/05 23:59:46|       1|       0|     |    5| Orc| Warrior| Durotar|
|  1| 12/31/05 23:59:46|       1|       1|     |    9| Orc|  Shaman| Durotar|
|  2| 12/31/05 23:59:52|       2|       2|     |   13| Orc|  Shaman| Durotar|
|  3| 12/31/05 23:59:52|       2|       3|    0|   14| Orc| Warrior| Durotar|
|  4| 12/31/05 23:59:52|       2|       4|     |   14| Orc|  Shaman| Durotar|
+---+------------------+--------+--------+-----+-----+----+--------+--------+
only showing top 5 rows



<html>
<font color = blue>
Data is in minutes for each AvatarID. So, first count AvatarIDs by grouping them into minutes.
</font>
</html>

In [8]:
df2 = newlogs_df.groupBy(['AvatarID','QueryTime']).count()

<html>
<font color = blue>
Strip time from Query Time.
</font>
</html>

In [9]:
df2 = df2.withColumn('Date', df2['QueryTime'].substr(1,9))

In [36]:
df2.show(5)

+--------+------------------+-----+---------+
|AvatarID|         QueryTime|count|     Date|
+--------+------------------+-----+---------+
|      41| 01/01/06 00:00:12|    2| 01/01/06|
|     161| 01/01/06 00:11:08|    2| 01/01/06|
|     311| 01/01/06 00:22:53|    2| 01/01/06|
|     348| 01/01/06 00:22:58|    2| 01/01/06|
|     382| 01/01/06 00:33:20|    2| 01/01/06|
+--------+------------------+-----+---------+
only showing top 5 rows



<html>
<font color = blue>
Now, take count of each player by grouping them by date.
</font>
</html>

In [10]:
PlayerbyDate = df2.groupBy(['Date','AvatarID']).agg({"AvatarID": "count"})

In [11]:
PlayerbyDate = PlayerbyDate.select('Date','AvatarID')

In [13]:
PlayerbyDate = PlayerbyDate.withColumn("Date_Unix", to_timestamp(PlayerbyDate['Date'],' MM/dd/yy'))

In [14]:
PlayerbyDate = PlayerbyDate.select('Date_Unix','AvatarID')

In [15]:
PlayerbyDate = PlayerbyDate.withColumnRenamed('Date_Unix','Date')

In [16]:
PlayerbyDays = PlayerbyDate.groupBy(['AvatarID']).agg({"AvatarID": "count"})

In [17]:
PlayerbyDays = PlayerbyDays.withColumnRenamed('count(AvatarID)','DaysPlayed')

In [18]:
wow_data = newlogs_df.select('QueryTime','AvatarID','Guild','Level','Race','Class','Zone')

In [19]:
wow_data = wow_data .withColumn('Date', df2['QueryTime'].substr(1,9))

In [49]:
wow_data.show(5)

+------------------+--------+-----+-----+----+--------+--------+---------+
|         QueryTime|AvatarID|Guild|Level|Race|   Class|    Zone|     Date|
+------------------+--------+-----+-----+----+--------+--------+---------+
| 12/31/05 23:59:46|       0|     |    5| Orc| Warrior| Durotar| 12/31/05|
| 12/31/05 23:59:46|       1|     |    9| Orc|  Shaman| Durotar| 12/31/05|
| 12/31/05 23:59:52|       2|     |   13| Orc|  Shaman| Durotar| 12/31/05|
| 12/31/05 23:59:52|       3|    0|   14| Orc| Warrior| Durotar| 12/31/05|
| 12/31/05 23:59:52|       4|     |   14| Orc|  Shaman| Durotar| 12/31/05|
+------------------+--------+-----+-----+----+--------+--------+---------+
only showing top 5 rows



<html>
<font color = blue>
Create dummy variables and populate binary values for Race and Class
</font>
</html>

In [20]:
race_dummy = wow_data.select("Race").distinct().rdd.flatMap(lambda x: x).collect()

In [21]:
race_exprs = [when(col("Race") == category, 1).otherwise(0).alias(category)
         for category in race_dummy]

In [22]:
race_wow = wow_data.select("AvatarID", *race_exprs)

In [23]:
class_dummy = wow_data.select("Class").distinct().rdd.flatMap(lambda x: x).collect()

In [24]:
class_exprs = [when(col("Class") == category, 1).otherwise(0).alias(category)
         for category in class_dummy]

In [25]:
class_wow = wow_data.select("AvatarID", *class_exprs)

In [97]:
class_wow

+--------+--------+------+-------+-------------+--------+------+-----+-------+--------+-------+
|AvatarID| Warlock| Druid| Hunter| Death Knight| Paladin| Rogue| Mage| Priest| Warrior| Shaman|
+--------+--------+------+-------+-------------+--------+------+-----+-------+--------+-------+
|       0|       0|     0|      0|            0|       0|     0|    0|      0|       1|      0|
|       1|       0|     0|      0|            0|       0|     0|    0|      0|       0|      1|
|       2|       0|     0|      0|            0|       0|     0|    0|      0|       0|      1|
|       3|       0|     0|      0|            0|       0|     0|    0|      0|       1|      0|
|       4|       0|     0|      0|            0|       0|     0|    0|      0|       0|      1|
|       5|       0|     0|      1|            0|       0|     0|    0|      0|       0|      0|
|       6|       1|     0|      0|            0|       0|     0|    0|      0|       0|      0|
|       7|       0|     0|      1|      

<html>
<font color = blue>
Use df3 to calculate login dates
</font>
</html>

In [26]:
PlayerbyDate.createOrReplaceTempView("playerbydate")

In [27]:
PlayerbyDays.createOrReplaceTempView("playerbydays")

In [28]:
wow_data.createOrReplaceTempView("completedata")

In [29]:
dfx = spark.sql("select playerbydate.AvatarID, min(PlayerbyDate.Date) as FirstLoginDate, max(PlayerbyDate.Date) as LastLoginDate from playerbydate group by playerbydate.AvatarID")

In [30]:
timeformat = "yyyy-MM-dd HH:mm:ss"
tenure = unix_timestamp('LastLoginDate', format = timeformat) - unix_timestamp('FirstLoginDate', format = timeformat)
dfx = dfx.withColumn('PlayerTenure',tenure/(24*60*60))

In [67]:
dfx

+--------+-------------------+-------------------+------------+
|AvatarID|     FirstLoginDate|      LastLoginDate|PlayerTenure|
+--------+-------------------+-------------------+------------+
|   11033|2006-03-30 00:00:00|2006-09-06 00:00:00|       160.0|
|   18024|2006-07-19 00:00:00|2008-11-09 00:00:00|       844.0|
|    1342|2006-01-01 00:00:00|2009-01-07 00:00:00|      1102.0|
|   33412|2007-03-25 00:00:00|2008-04-02 00:00:00|       374.0|
|   30970|2007-02-11 00:00:00|2008-10-06 00:00:00|       603.0|
+--------+-------------------+-------------------+------------+
only showing top 5 rows



In [31]:
from pyspark.sql.functions import lit
lastlogcapture = '2009-01-10 00:00:00'
dfx = dfx.withColumn('LastLogCapture',lit(lastlogcapture))

In [32]:
nologin = unix_timestamp('LastLogCapture', format = timeformat) - unix_timestamp('LastLoginDate', format = timeformat)
dfx = dfx.withColumn('NotLoggedInFrom',nologin/(24*60*60))

In [78]:
dfx

+--------+-------------------+-------------------+------------+---------------+-------------------+
|AvatarID|     FirstLoginDate|      LastLoginDate|PlayerTenure|NotLoggedInFrom|     LastLogCapture|
+--------+-------------------+-------------------+------------+---------------+-------------------+
|   11033|2006-03-30 00:00:00|2006-09-06 00:00:00|       160.0|          857.0|2009-01-10 00:00:00|
|   18024|2006-07-19 00:00:00|2008-11-09 00:00:00|       844.0|           62.0|2009-01-10 00:00:00|
|    1342|2006-01-01 00:00:00|2009-01-07 00:00:00|      1102.0|            3.0|2009-01-10 00:00:00|
|   33412|2007-03-25 00:00:00|2008-04-02 00:00:00|       374.0|          283.0|2009-01-10 00:00:00|
|   30970|2007-02-11 00:00:00|2008-10-06 00:00:00|       603.0|           96.0|2009-01-10 00:00:00|
|    2866|2006-01-07 00:00:00|2009-01-09 00:00:00|      1098.0|            1.0|2009-01-10 00:00:00|
|   51393|2007-07-16 00:00:00|2008-10-25 00:00:00|       467.0|           77.0|2009-01-10 00:00:00|


In [33]:
dfx.createOrReplaceTempView("datainprog")

In [34]:
dip = spark.sql("select datainprog.AvatarID, datainprog.PlayerTenure,case when datainprog.NotLoggedInFrom > 60  then 1 else 0 end as Churn \
                 from datainprog")

In [88]:
dip

+--------+------------+-----+
|AvatarID|PlayerTenure|Churn|
+--------+------------+-----+
|   11033|       160.0|    1|
|   18024|       844.0|    1|
|    1342|      1102.0|    0|
|   33412|       374.0|    1|
|   30970|       603.0|    1|
|    2866|      1098.0|    0|
|   51393|       467.0|    1|
|   46465|       520.0|    1|
|   76110|        21.0|    1|
|    8638|        98.0|    1|
|    1645|      1043.0|    1|
|   36131|       643.0|    0|
|   22373|       678.0|    1|
|   71527|        20.0|    1|
|   34759|       624.0|    0|
|   60769|       443.0|    0|
|     463|       805.0|    1|
|     148|      1074.0|    0|
|   32414|       674.0|    0|
|    8389|         7.0|    1|
+--------+------------+-----+
only showing top 20 rows



In [35]:
guildlevel = spark.sql("select max(AvatarID) as AvatarID, case when max(Level) <= 40 Then 1 else 2 end as LevelFlag,\
                    case when max(Guild) >= 0 Then 1 else 0 end as GuildFlag\
                       from completedata group by AvatarID order by AvatarID")               

In [138]:
guildlevel

70369

In [36]:
zone_wow = spark.sql("select max(AvatarID) as AvatarID, count(Zone) as ZonesPlayed\
                       from completedata group by AvatarID order by AvatarID") 

In [37]:
zone_wow = zone_wow.drop_duplicates()

In [221]:
zone_wow

+--------+-----------+
|AvatarID|ZonesPlayed|
+--------+-----------+
|     260|        580|
|     443|      22488|
|     717|         34|
|    1495|        950|
|    1509|         61|
|    1902|          2|
|    1989|         12|
|    2011|        128|
|    2366|        401|
|    2533|          4|
|    2702|          2|
|    2985|        160|
|    3317|          2|
|    3577|          4|
|    3619|          4|
|    3764|         86|
|    4371|       5555|
|    5453|         64|
|    5541|        251|
|    5668|          2|
+--------+-----------+
only showing top 20 rows



In [38]:
class_wow = class_wow.drop_duplicates()

In [222]:
class_wow

+--------+-------------+-----------+------------+-----------------+-------------+-----------+----------+------------+-------------+------------+
|AvatarID|Class_Warlock|Class_Druid|Class_Hunter|Class_DeathKnight|Class_Paladin|Class_Rogue|Class_Mage|Class_Priest|Class_Warrior|Class_Shaman|
+--------+-------------+-----------+------------+-----------------+-------------+-----------+----------+------------+-------------+------------+
|      28|            0|          0|           1|                0|            0|          0|         0|           0|            0|           0|
|     156|            0|          0|           0|                0|            0|          0|         0|           0|            1|           0|
|     361|            0|          0|           0|                0|            0|          0|         0|           1|            0|           0|
|     406|            0|          0|           0|                0|            0|          0|         0|           0|            1

In [39]:
race_wow = race_wow.drop_duplicates()

In [None]:
race_wow

In [40]:
#Churn data
dip

DataFrame[AvatarID: int, PlayerTenure: double, Churn: int]

In [41]:
#Guild and Level data
guildlevel

DataFrame[AvatarID: int, LevelFlag: int, GuildFlag: int]

In [44]:
#Class data
class_wow

DataFrame[AvatarID: int, Class_Warlock: int, Class_Druid: int, Class_Hunter: int, Class_DeathKnight: int, Class_Paladin: int, Class_Rogue: int, Class_Mage: int, Class_Priest: int, Class_Warrior: int, Class_Shaman: int]

In [59]:
class_wow = class_wow.withColumnRenamed(' Warlock','Class_Warlock')\
.withColumnRenamed(' Druid','Class_Druid')\
.withColumnRenamed(' Hunter','Class_Hunter')\
.withColumnRenamed(' Death Knight','Class_DeathKnight')\
.withColumnRenamed(' Paladin','Class_Paladin')\
.withColumnRenamed(' Rogue','Class_Rogue')\
.withColumnRenamed(' Mage','Class_Mage')\
.withColumnRenamed(' Priest','Class_Priest')\
.withColumnRenamed(' Warrior','Class_Warrior')\
.withColumnRenamed(' Shaman','Class_Shaman')\
.withColumnRenamed('AvatarID','Class_AvatarID')

In [45]:
#Race data
race_wow

DataFrame[AvatarID: int,  Orc: int,  Tauren: int,  Undead: int,  Blood Elf: int,  Troll: int]

In [60]:
race_wow = race_wow.withColumnRenamed(' Orc','Race_Orc')\
.withColumnRenamed(' Tauren','Race_Tauren')\
.withColumnRenamed(' Undead','Race_Undead')\
.withColumnRenamed(' Blood Elf','Race_BloodElf')\
.withColumnRenamed(' Troll','Race_Troll')\
.withColumnRenamed('AvatarID','Race_AvatarID')

In [61]:
#Zone data
zone_wow

DataFrame[AvatarID: int, ZonesPlayed: bigint]

In [62]:
#No. of days played
PlayerbyDays

DataFrame[AvatarID: int, DaysPlayed: bigint]

In [63]:
dip.createOrReplaceTempView("churndata")
guildlevel.createOrReplaceTempView("guilddata")
class_wow.createOrReplaceTempView("classdata")
race_wow.createOrReplaceTempView("racedata")
zone_wow.createOrReplaceTempView("zonedata")
PlayerbyDays.createOrReplaceTempView("playersdata")

In [64]:
class_wow

DataFrame[Class_AvatarID: int, Class_Warlock: int, Class_Druid: int, Class_Hunter: int, Class_DeathKnight: int, Class_Paladin: int, Class_Rogue: int, Class_Mage: int, Class_Priest: int, Class_Warrior: int, Class_Shaman: int]

In [66]:
ClassRaceDF = spark.sql("select classdata.*, racedata.* from classdata, racedata where classdata.Class_AvatarID = racedata.Race_AvatarID")

In [70]:
ClassRaceDF = ClassRaceDF.drop('Race_AvatarID')

In [72]:
ClassRaceDF.createOrReplaceTempView("crdata")

In [74]:
CRZoneDF = spark.sql("select * from crdata, zonedata where crdata.Class_AvatarID = zonedata.AvatarID")

In [76]:
CRZoneDF = CRZoneDF.drop('AvatarID')

In [77]:
CRZoneDF.createOrReplaceTempView("crzdata")

In [78]:
CRZGuildDF = spark.sql("select * from crzdata, guilddata where crzdata.Class_AvatarID = guilddata.AvatarID")

In [80]:
CRZGuildDF = CRZGuildDF.drop("AvatarID")

In [82]:
CRZGuildDF.createOrReplaceTempView("crzgdata")

In [83]:
CRZGPDF = spark.sql("select * from crzgdata, playersdata where crzgdata.Class_AvatarID = playersdata.AvatarID")

In [85]:
CRZGPDF = CRZGPDF.drop("AvatarID")

In [86]:
CRZGPDF.createOrReplaceTempView("crzgpdata")

In [87]:
FinalChurnDF = spark.sql("select * from crzgpdata, churndata where crzgpdata.Class_AvatarID = churndata.AvatarID")

In [90]:
FinalChurnDF = FinalChurnDF.drop('AvatarID')

In [91]:
FinalChurnDF = FinalChurnDF.withColumnRenamed('Class_AvatarID','AvatarID')

<html>
<font color = blue>
Now, we have the following 22 features to proceed with Churn prediction.
</font>
</html>

In [94]:
FinalChurnDF

DataFrame[AvatarID: int, Class_Warlock: int, Class_Druid: int, Class_Hunter: int, Class_DeathKnight: int, Class_Paladin: int, Class_Rogue: int, Class_Mage: int, Class_Priest: int, Class_Warrior: int, Class_Shaman: int, Race_Orc: int, Race_Tauren: int, Race_Undead: int, Race_BloodElf: int, Race_Troll: int, ZonesPlayed: bigint, LevelFlag: int, GuildFlag: int, DaysPlayed: bigint, PlayerTenure: double, Churn: int]

In [93]:
#FinalChurnDF.repartition(1).write.csv('churn_data.csv')