# W205 Team2 Project 3 Report
Team members: Brendan Mattina, Mohamed Gesalla, Jared Dec, and Oliver Chang

# Introduction and Project Goals

This project takes the perspective of consultants retained by a mobile gaming company. Our task was to build a data pipeline that captures event data from the game, then analyze that data to help our client make business decisions. The work therefore divides into three major steps: generating data, constructing the data pipeline, and analyzing the data that comes out the other end. The primary business questions we set out to answer were:

1) What policies could be enacted to help prevent a player from closing their account and cutting themselves off as a potential revenue stream?

2) What policies could be enacted to cause more players to refer more of their friends to the game? 

3) What policies could be enacted to increase the likelihood that a player becomes a paid subscriber to our game and generates additional revenue?

In the following sections we break down how the game data was generated, how the data was funnelled to a PySpark notebook, and what we found in answer to these questions.

# Data Generation

Our sample data represents 10 monthly summaries for each of 10,000 players of our fictitious mobile game. Each row includes a player ID, the month, monthly hours played, number of friends referred, player subscription (Boolean), monthly money spent, whether the player account is open (Boolean), whether the player purchased a sword (Boolean), and whether the player joined a guild (Boolean). Ultimately, monthly hours played drives nearly every other column. We initialize the dataset by randomly assigning subscriptions, swords, and guilds, and by drawing hours played and money paid from normal distributions. Purchasing a sword or joining a guild increases monthly play time (based on a logistic formula). Otherwise, we draw monthly play time from normal distributions whose means shift based on the previous month's play time. Increasing play time raises the probability that a player subscribes and leads to more friend referrals (referrals are drawn from two different normal distributions). Once a player subscribes, we calculate their monthly spend with a logistic equation; otherwise their monthly spend is drawn from a normal distribution. Finally, low play time increases the probability that a player will close their account, and once a player’s account closes it does not reopen.
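The generation logic described above can be illustrated with a minimal single-player sketch. Every numeric parameter below (means, standard deviations, logistic coefficients) is a hypothetical placeholder rather than a value from the project's generator. Sword and guild effects and monthly spend are omitted for brevity, and treating a subscription as permanent while the account is open is an added assumption.

```python
import math
import random


def logistic(x):
    """Standard logistic function, mapping any real number into (0, 1)."""
    return 1.0 / (1.0 + math.exp(-x))


def simulate_player(player_id, months=10, seed=None):
    """Simulate monthly summaries for one player (all parameters hypothetical)."""
    rng = random.Random(seed)
    rows = []
    hours = max(0.0, rng.gauss(12.0, 3.0))  # initial monthly play time
    subscribed = False
    acct_open = True
    for month in range(months):
        if acct_open:
            if month > 0:
                # The mean shifts with the previous month's play time.
                hours = max(0.0, rng.gauss(hours + 0.5, 1.5))
            if not subscribed:
                # More play time -> higher chance of subscribing.
                subscribed = rng.random() < logistic(0.5 * (hours - 14.0))
            # More play time -> more referrals on average.
            referrals = max(0, round(rng.gauss(hours / 6.0, 1.0)))
        else:
            hours, referrals = 0.0, 0
        rows.append({
            "id": player_id,
            "month": month,
            "mon_hrs_played": hours,
            "mon_fr_ref": referrals,
            "sub_sub": int(subscribed and acct_open),
            "acct_open": int(acct_open),
        })
        # Low play time -> higher chance of closing; closed accounts never reopen.
        if acct_open and rng.random() < logistic(0.8 * (8.0 - hours)):
            acct_open = False
    return rows


for row in simulate_player(100002, seed=42):
    print(row)
```

Seeding the generator makes a run reproducible, which helps when comparing pipeline output against the generated input.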

# Description of Pipeline

The data pipeline is composed of the following components:

1) Main notebook (W205_Team2_Project3_Report.ipynb)

The main notebook contains several steps of the pipeline. First, we launched the Docker containers specified in the docker_compose.yml file. Once the containers started, we loaded the simulated user behavior data into the notebook using pandas. We used Apache Bench to load the data into Kafka by calling a user-defined API, with a different event type and event value for each recorded user behavior. Two terminal windows, one for Kafka and one for the web service API, were kept open during the Apache Bench run to monitor whether the data was successfully consumed by the pipeline.

The main notebook also contains the data query section, where we used SparkSQL to read data from the Parquet files stored in HDFS. The data was retrieved from HDFS, organized as tables, and joined into our final working dataframe, which was then saved as a CSV file for further analysis. For demonstration purposes, we loaded only a portion of the full dataset into the pipeline.

2) game_api2.py

In the game_api2.py file, we used Flask as our primary tool to provide the web API. Once an event was generated through Apache Bench, the Flask app captured the event contents and the data producer ID and sent them to Kafka. In the API, we defined several @app.route functions to record the different event types and contents separately instead of piping all features together.
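To illustrate the per-event routing described above, the sketch below shows how one handler might assemble the message it sends to Kafka. It is plain Python so that it runs without Flask or a broker; in the real app the headers would come from the incoming request and the encoded bytes would be handed to a Kafka producer. The helper name `build_event` is hypothetical, and the field names follow the columns visible in the Spark query output in this report.

```python
import json


def build_event(event_type, value_field, value, headers):
    """Combine the event description with the request headers and encode it."""
    event = {"event_type": event_type, value_field: value}
    event.update(headers)  # adds Host, User-Agent, Accept, ...
    return json.dumps(event).encode("utf-8")


# Example: the request Apache Bench makes for /play_hrs/10.69179855
headers = {"Host": "100002 0", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"}
message = build_event("play_hrs", "total_hrs", "10.69179855", headers)
print(message.decode("utf-8"))
```

Keeping one value field per event type mirrors the choice to define a separate route for each behavior rather than sending all features in a single event.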

3) filtered_writes2.py

This file writes events from Kafka to HDFS. In our implementation, we handled each event type separately: we separated the raw events by type, then processed and extracted them into Parquet files in HDFS. We used @udf for each event type. These are user-defined functions that run repeatedly in the pipeline, and the udf decorator lets the Spark executor know about the function we are running. We took the raw events and cast their values as strings for easier data processing. Finally, because we could not control the structure of the incoming data, we loaded the events as JSON, which makes it easier for Spark to interpret them.
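The filtering step can be sketched without Spark as the plain predicate that a `@udf`-decorated function might wrap. The function names are hypothetical, and the assumption that each raw Kafka value is a JSON string carrying an `event_type` field is inferred from the API description above.

```python
import json


def is_event_type(event_as_json, wanted_type):
    """Return True when the raw JSON string describes an event of the wanted type."""
    event = json.loads(event_as_json)
    return event.get("event_type") == wanted_type


def extract_events(raw_values, wanted_type):
    """Keep only the matching events and parse them into dictionaries."""
    return [json.loads(v) for v in raw_values if is_event_type(v, wanted_type)]


raw = [
    '{"event_type": "play_hrs", "Host": "100002 0", "total_hrs": "10.69179855"}',
    '{"event_type": "ref_count", "Host": "100002 0", "total_ref_count": "0"}',
]
play_hrs_events = extract_events(raw, "play_hrs")
print(play_hrs_events)
```

In the Spark job, a predicate like this would typically be wrapped as a Boolean UDF and applied in a `filter` before the matching rows are written out as Parquet.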

# Results and Analysis

The first business query asked whether the mean number of hours played in a month differs between players overall and players in the month before they close their account. The answer appears to be yes: players averaged 10.55 hours of play in the month before closing their account, while the average among all active players was 14.17 hours. Based on these findings, we would advise the company to design player incentives that keep players above 11 hours per month, with a target play time of around 14 hours. Examples include in-game items based on hours played, more content, or streak incentives that reward players who play for more than a set number of days in a row. The idea is that players who play less in a given month are more likely to lose interest and close their accounts, decreasing expected revenue for our client.

The second business query examined the relationship between hours played in a given month and the number of friends referred in the same month. To do this, we first isolated the months in which users’ accounts were not closed. We then summed the number of friends referred and the number of hours played in those months. The totals were approximately 13,938 hours played over the survey period by active accounts and 2,324 friends referred in the same period. This gives a ratio of roughly 6 hours of play time per friend referral (13,938 / 2,324 ≈ 5.997). Based on this query, we would recommend that our client further pursue plans that increase user engagement and play time, as there appears to be a relationship between hours played and friends referred, which in turn grows the player base and potential revenue streams.

The third query asked what policies could cause more players to become paid subscribers. The mean hours played in the month prior to subscribing was 12.01. Since the average across all active players was 14.17 hours, this is good news: it implies that subscribers play considerably more than players who have not yet subscribed. We have also now found that more hours played is associated with a lower likelihood of closing one’s account and is possibly conducive to referring more friends. Taken together, these findings suggest placing players into three categories: Near-Close, Near-Sub, and Average Sub Revenue Generator. The goal is to move players into the third category, as these players pay subscription fees, refer friends, and overall generate the most revenue for the firm. Players in the Near-Close and Near-Sub categories should be flagged as priorities for targeted incentives to increase their play time and engagement. The rough boundaries lie between the averages for the three groups, roughly 10, 12, and 14 hours per month respectively. Overall, most of the business’s profit potential appears to center on increasing players’ time in game and encouraging them to subscribe or refer friends.
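As an illustration of the segmentation above, a hypothetical helper could bucket active players by monthly hours. The cut-offs of 11 and 13 hours are assumptions (midpoints between the rough group averages of 10, 12, and 14), not values derived in the analysis.

```python
def engagement_segment(monthly_hours, near_close_max=11.0, near_sub_max=13.0):
    """Assign a player-month to one of the three segments by hours played."""
    if monthly_hours < near_close_max:
        return "Near-Close"
    if monthly_hours < near_sub_max:
        return "Near-Sub"
    return "Average Sub Revenue Generator"


for hours in (10.4, 12.0, 14.2):
    print(hours, engagement_segment(hours))
```

Exposing the cut-offs as parameters lets the client tune them once the group averages are re-estimated on the full dataset.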



# Docker Compose

In [17]:
! docker-compose up -d

full-stack_mids_1 is up-to-date
full-stack_cloudera_1 is up-to-date
full-stack_zookeeper_1 is up-to-date
full-stack_spark_1 is up-to-date
full-stack_kafka_1 is up-to-date


# Create Kafka Topic

In [18]:
! docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

Created topic events.


# Call Game API

In [5]:
# run this in a new bash terminal to call game_api

docker-compose exec mids env FLASK_APP=/w205/full-stack/game_api2.py flask run --host 0.0.0.0

# Launch Kafka Activity Monitor

In [33]:
# run this in a second new bash terminal to call kafka 

docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning

^C


# Pipe User Activity Data to Kafka Topic Through Apache Bench

In [19]:
import pandas as pd

file = r'sample_data_v7.csv'
df = pd.read_csv(file)

# for i in range(0, df.shape[0]):  # for the entire data set
for i in range(0, 1):  # for testing

    # look up each field by column name so extra index columns cannot shift the values
    player_id = str(df.loc[i, 'id'])
    month = str(df.loc[i, 'month'])
    hrs = str(df.loc[i, 'mon_hrs_played'])
    ref = str(df.loc[i, 'mon_fr_ref'])
    sub_sub = str(df.loc[i, 'sub_sub'])
    money = str(df.loc[i, 'money_paid'])
    acctopen = str(df.loc[i, 'acct_open'])
    sword = str(df.loc[i, 'sword'])
    guild = str(df.loc[i, 'guild'])
    swag = str(df.loc[i, 'sw_a_g'])

    # the Host header is composed of the player id and the month
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/play_hrs/"$hrs"
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/ref_count/"$ref"
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/sub_count/"$sub_sub"
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/money_paid/"$money"
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/account_open/"$acctopen"
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/purchase_sword/"$sword"
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/join_guild/"$guild"
    ! docker-compose exec mids ab -n 1 -H "Host: $player_id $month" http://localhost:5000/sw_a_g/"$swag"
    
    

This is ApacheBench, Version 2.3 <$Revision: 1706008 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient).....done


Server Software:        Werkzeug/0.14.1
Server Hostname:        localhost
Server Port:            5000

Document Path:          /play_hrs/10.69179855
Document Length:        14 bytes

Concurrency Level:      1
Time taken for tests:   0.004 seconds
Complete requests:      1
Failed requests:        0
Total transferred:      169 bytes
HTML transferred:       14 bytes
Requests per second:    252.33 [#/sec] (mean)
Time per request:       3.963 [ms] (mean)
Time per request:       3.963 [ms] (mean, across all concurrent requests)
Transfer rate:          41.64 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.0      0       0
Processing:     4    4   0.0      4       4
Waiting:        3   

# Filter Events From Kafka and Write to HDFS as Parquet Files

In [None]:
! docker-compose exec spark spark-submit /w205/full-stack/filtered_writes2.py

# Check Saved Parquet Files

In [8]:
! docker-compose exec cloudera hadoop fs -ls /tmp/play_hrs/

Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-10 20:27 /tmp/play_hrs/_SUCCESS
-rw-r--r--   1 root supergroup       1572 2021-04-10 20:27 /tmp/play_hrs/part-00000-1c1e7729-92b7-4bc3-b504-34d2b97ac3da-c000.snappy.parquet


In [33]:
! docker-compose exec cloudera hadoop fs -ls /tmp/ref_count/

Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-06 03:42 /tmp/ref_count/_SUCCESS
-rw-r--r--   1 root supergroup       1746 2021-04-06 03:42 /tmp/ref_count/part-00000-52638758-a648-4c27-aec6-7717bc255806-c000.snappy.parquet


In [34]:
! docker-compose exec cloudera hadoop fs -ls /tmp/purchase_sword/

Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-06 03:42 /tmp/purchase_sword/_SUCCESS
-rw-r--r--   1 root supergroup       1723 2021-04-06 03:42 /tmp/purchase_sword/part-00000-cef46c4a-7b40-4213-b3d3-8a663b32edb5-c000.snappy.parquet


In [35]:
! docker-compose exec cloudera hadoop fs -ls /tmp/sw_a_g/

Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-06 03:43 /tmp/sw_a_g/_SUCCESS
-rw-r--r--   1 root supergroup       1704 2021-04-06 03:43 /tmp/sw_a_g/part-00000-ef28ceec-aedf-4a40-94af-ad5f52b003b0-c000.snappy.parquet


In [36]:
! docker-compose exec cloudera hadoop fs -ls /tmp/join_guild/

Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-06 03:42 /tmp/join_guild/_SUCCESS
-rw-r--r--   1 root supergroup       1718 2021-04-06 03:42 /tmp/join_guild/part-00000-6ca368c9-4e2c-4c58-896e-26dad9ba2522-c000.snappy.parquet


In [37]:
! docker-compose exec cloudera hadoop fs -ls /tmp/account_open/

Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-06 03:42 /tmp/account_open/_SUCCESS
-rw-r--r--   1 root supergroup       1729 2021-04-06 03:42 /tmp/account_open/part-00000-2dba2735-2798-49ea-afb9-247a6e4b71f9-c000.snappy.parquet


In [9]:
! docker-compose exec cloudera hadoop fs -ls /tmp/money_paid/

Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-10 20:28 /tmp/money_paid/_SUCCESS
-rw-r--r--   1 root supergroup       1608 2021-04-10 20:28 /tmp/money_paid/part-00000-ce9c3ce9-50b8-4b5d-949b-595c39c767af-c000.snappy.parquet


# Query Data From Spark

In [11]:
# Find the machine's public IP address (as reported by api.ipify.org)

from requests import get
ip = get('https://api.ipify.org').text
print('IP is:', ip)

IP is: 35.230.20.101


In [12]:
#instantiate PySpark in the terminal

! docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root --notebook-dir=/w205/' pyspark

[I 20:31:10.032 NotebookApp] The port 8888 is already in use, trying another port.
[I 20:31:10.041 NotebookApp] Serving notebooks from local directory: /w205
[I 20:31:10.041 NotebookApp] 0 active kernels
[I 20:31:10.041 NotebookApp] The Jupyter Notebook is running at: http://0.0.0.0:8889/?token=80e89b04740dca18f7bf86b7426c3e2e51574e8ef6b39efd
[I 20:31:10.041 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 20:31:10.042 NotebookApp] 
    
    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://0.0.0.0:8889/?token=80e89b04740dca18f7bf86b7426c3e2e51574e8ef6b39efd
^C
[I 20:32:14.991 NotebookApp] interrupted
Serving notebooks from local directory: /w205
0 active kernels 
The Jupyter Notebook is running at: http://0.0.0.0:8889/?token=80e89b04740dca18f7bf86b7426c3e2e51574e8ef6b39efd
Shutdown this notebook s

# Query 1

In [87]:
# Query 1: we need hours played and account_open

# create temp views for both variables
hours = spark.read.parquet('/tmp/play_hrs')
hours.createOrReplaceTempView('hours')
hours.show(truncate=False)

acct_open = spark.read.parquet('/tmp/account_open')
acct_open.createOrReplaceTempView('acct_open')
acct_open.show(truncate=False)

# join the tables on Host (player ID and month)
results = hours.join(acct_open, 'Host')
results.show()

# export to a pandas dataframe for analysis
df_q1 = results.toPandas()


+------+--------+---------------+----------+-----------------------+-----------+
|Accept|Host    |User-Agent     |event_type|timestamp              |total_hrs  |
+------+--------+---------------+----------+-----------------------+-----------+
|*/*   |100002 0|ApacheBench/2.3|play_hrs  |2021-04-10 21:18:16.709|10.69179855|
+------+--------+---------------+----------+-----------------------+-----------+

+------+--------+---------------+------------+------------+-----------------------+
|Accept|Host    |User-Agent     |account_open|event_type  |timestamp              |
+------+--------+---------------+------------+------------+-----------------------+
|*/*   |100002 0|ApacheBench/2.3|1           |account_open|2021-04-10 21:18:19.087|
+------+--------+---------------+------------+------------+-----------------------+

+--------+------+---------------+----------+--------------------+-----------+------+---------------+------------+------------+--------------------+
|    Host|Accept|     Use

# Query 2

In [88]:
# Query 2: we need friend referrals, joined with play time

# create temp view
ref = spark.read.parquet('/tmp/ref_count')
ref.createOrReplaceTempView('ref')

# inner join onto the Query 1 results
results = results.join(ref, 'Host')
results.show()

# export to a pandas dataframe for analysis
df_q2 = results.toPandas()

+--------+------+---------------+----------+--------------------+-----------+------+---------------+------------+------------+--------------------+------+---------------+----------+--------------------+---------------+
|    Host|Accept|     User-Agent|event_type|           timestamp|  total_hrs|Accept|     User-Agent|account_open|  event_type|           timestamp|Accept|     User-Agent|event_type|           timestamp|total_ref_count|
+--------+------+---------------+----------+--------------------+-----------+------+---------------+------------+------------+--------------------+------+---------------+----------+--------------------+---------------+
|100002 0|   */*|ApacheBench/2.3|  play_hrs|2021-04-10 21:18:...|10.69179855|   */*|ApacheBench/2.3|           1|account_open|2021-04-10 21:18:...|   */*|ApacheBench/2.3| ref_count|2021-04-10 21:18:...|              0|
+--------+------+---------------+----------+--------------------+-----------+------+---------------+------------+-----------

# Query 3

In [89]:
# Query 3: we need money paid and subscriptions, joined onto the running results table

# create temp views
sub_sub = spark.read.parquet('/tmp/sub_count')
sub_sub.createOrReplaceTempView('sub_sub')
money = spark.read.parquet('/tmp/money_paid')
money.createOrReplaceTempView('money')

# join both tables onto the results
results = results.join(sub_sub, 'Host')
results = results.join(money, 'Host')
results.show()

# keep only the columns needed for analysis
results = results.select('Host', 'account_open', 'money_paid_amount', 'total_sub', 'total_hrs', 'total_ref_count')

# export to a pandas dataframe, then to csv for analysis
df_final = results.toPandas()
df_final.to_csv('sample_data_v9.csv')

+--------+------+---------------+----------+--------------------+-----------+------+---------------+------------+------------+--------------------+------+---------------+----------+--------------------+---------------+------+---------------+----------+--------------------+---------+------+---------------+----------+-----------------+--------------------+
|    Host|Accept|     User-Agent|event_type|           timestamp|  total_hrs|Accept|     User-Agent|account_open|  event_type|           timestamp|Accept|     User-Agent|event_type|           timestamp|total_ref_count|Accept|     User-Agent|event_type|           timestamp|total_sub|Accept|     User-Agent|event_type|money_paid_amount|           timestamp|
+--------+------+---------------+----------+--------------------+-----------+------+---------------+------------+------------+--------------------+------+---------------+----------+--------------------+---------------+------+---------------+----------+--------------------+---------+---

# Query using Pandas

In [93]:
import pandas as pd

df = pd.read_csv('sample_data_v7.csv')
df.head(20)

Unnamed: 0.1,Unnamed: 0,id,month,mon_hrs_played,mon_fr_ref,sub_sub,money_paid,acct_open,sword,guild,sw_a_g
0,1,100002,0,10.691799,0,0,9.242057,1,0,0,0
1,2,100002,1,12.035393,2,0,3.104142,1,0,0,0
2,3,100002,2,11.999392,1,1,14.83143,1,0,0,0
3,4,100002,3,14.795416,2,1,28.941018,1,1,0,0
4,5,100002,4,15.191799,2,1,28.874064,1,1,1,1
5,6,100002,5,17.271326,4,1,30.341962,1,1,1,1
6,7,100002,6,18.618972,4,1,29.633803,1,1,1,1
7,8,100002,7,19.264966,4,1,29.008141,1,1,1,1
8,9,100002,8,19.529923,5,1,29.563691,1,1,1,1
9,10,100002,9,19.631563,5,1,30.611133,1,1,1,1


In [21]:
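# Query 1

# keep only the months in which the account is open
temp_df = df[df.acct_open == 1]

# for each player, take the last month in which the account was open
temp_df_2 = temp_df.loc[temp_df.groupby('id').month.idxmax()]

# exclude players whose last open month is month 10, treated here as the end of the survey
final_df = temp_df_2[temp_df_2.month != 10]

hours_played_prior_to_close = final_df["mon_hrs_played"].mean()

print(hours_played_prior_to_close)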

10.392797831717095
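
The filter above hard-codes month 10 as the final survey month, while the sample rows displayed earlier number the months 0 through 9. A variant that derives the final month from the data avoids depending on either convention. The sketch below uses plain Python over row dictionaries (for example from `df.to_dict('records')`) and toy data. The function name is hypothetical, and this is not the code that produced the figure above.

```python
def mean_hours_before_close(rows):
    """Mean hours in each closing player's last open month.

    Players whose account is still open in the final month are excluded.
    """
    final_month = max(r["month"] for r in rows)
    last_open = {}  # player id -> row for that player's last open month
    for r in rows:
        if r["acct_open"] == 1:
            best = last_open.get(r["id"])
            if best is None or r["month"] > best["month"]:
                last_open[r["id"]] = r
    closers = [r for r in last_open.values() if r["month"] != final_month]
    if not closers:
        return None
    return sum(r["mon_hrs_played"] for r in closers) / len(closers)


# Toy data: player 1 stays open; player 2 closes after month 1.
toy = [
    {"id": 1, "month": 0, "mon_hrs_played": 15.0, "acct_open": 1},
    {"id": 1, "month": 1, "mon_hrs_played": 16.0, "acct_open": 1},
    {"id": 1, "month": 2, "mon_hrs_played": 17.0, "acct_open": 1},
    {"id": 2, "month": 0, "mon_hrs_played": 11.0, "acct_open": 1},
    {"id": 2, "month": 1, "mon_hrs_played": 9.0, "acct_open": 1},
    {"id": 2, "month": 2, "mon_hrs_played": 0.0, "acct_open": 0},
]
print(mean_hours_before_close(toy))  # 9.0
```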


In [22]:
# Query 2

# total hours played and friends referred across all open-account months
x = temp_df.mon_hrs_played.sum()
final_x = round(x, 2)
print("Aggregate number of hours played in survey period ", final_x)
y = temp_df.mon_fr_ref.sum()
final_y = round(y, 2)
print("Aggregate number of friends referred in survey period ", final_y)
z = final_x / final_y
final_z = round(z, 2)
print("Average number of hours played per friend referral ", final_z)


Aggregate number of hours played in survey period  12889318.55
Aggregate number of friends referred in survey period  2135835
Average number of hours played per friend referral  6.03


In [97]:
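# Query 3

# keep only the months in which the account is open
temp_df = df[df.acct_open == 1]

# keep only the months in which the player is not subscribed
temp_df_3 = temp_df[temp_df.sub_sub == 0]

# for each player, take the last unsubscribed month
temp_df_4 = temp_df_3.loc[temp_df_3.groupby('id').month.idxmax()]

# exclude players whose last unsubscribed month is month 10, treated here as the end of the survey
final_df = temp_df_4[temp_df_4.month != 10]

hours_played_prior_to_sub = final_df["mon_hrs_played"].mean()

print(hours_played_prior_to_sub)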

12.014211312971515


# Shut Down Docker Container

In [14]:
! docker-compose down

Removing network full-stack_default
