05-observation-based-mechanism.ipynb
======================

**Things to do**
* Find how to get the total size of a dataframe.
* Find how to combine unique killers in the killing network and unique cheaters in the observation network (as they could overlap).

## Load packages and read tables.

In [None]:
from pyspark.sql.functions import col, lit, when
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.types import StructType, StructField, LongType
import pubg_analysis as pubg

In [3]:
# Read a table that contains edges.
td = spark.read.parquet("s3://jinny-capstone-data-test/data_for_obs_mech.parquet")
td.registerTempTable("td")

# Read a table that contains cheater data.
cheaters = spark.read.parquet("s3://jinny-capstone-data-test/cheater_info.parquet")
cheaters.registerTempTable("cheaters")

# Read a table that contains nodes.
nodes = spark.read.parquet("s3://jinny-capstone-data-test/nodes.parquet")
nodes.registerTempTable("nodes")

## 1. Count the number of motifs on the empirical network.

In [4]:
# First, add information of killers.
src_info = spark.sql("""SELECT mid, src, start_date AS src_sd, ban_date AS src_bd, src_flag, 
                        dst, dst_bd, dst_flag, time, m_date 
                        FROM td t JOIN nodes n ON t.src = n.id""")
src_info.registerTempTable("src_info")

# Add information of victims.
full_info = spark.sql("""SELECT mid, src, src_sd, src_bd, src_flag, 
                         dst, start_date AS dst_sd, dst_bd, dst_flag, time, m_date 
                         FROM src_info s JOIN nodes n ON s.dst = n.id ORDER BY src_flag""")
full_info.registerTempTable("full_info")

# Add information of cheaters.
add_flags = spark.sql("""SELECT mid, src, src_sd, src_bd, src_flag,
                         CASE WHEN src_bd >= m_date AND src_sd <= m_date 
                         AND src_flag == 1 THEN 1 ELSE 0 END AS src_curr_flag, 
                         dst, dst_sd, dst_bd, dst_flag,
                         CASE WHEN dst_bd >= m_date AND dst_sd <= m_date 
                         AND dst_flag == 1 THEN 1 ELSE 0 END AS dst_curr_flag, time, m_date 
                         FROM full_info ORDER BY mid, time""")
add_flags.registerTempTable("add_flags")

legit_matches = spark.sql("""SELECT mid FROM (SELECT mid, SUM(src_curr_flag) AS c_kills FROM add_flags GROUP BY mid) 
                             WHERE c_kills > 0""")
legit_matches.registerTempTable("legit_matches")

records = spark.sql("""SELECT r.mid, src, src_sd, src_bd, src_flag, src_curr_flag, 
                       dst, dst_sd, dst_bd, dst_flag, dst_curr_flag, time, m_date 
                       FROM add_cheating_flag r JOIN legit_matches l ON r.mid = l.mid""")
records.registerTempTable("records")

records = spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY mid ORDER BY time) AS aid FROM records")
records.registerTempTable("records")

In [4]:
# Get a list of killings done by cheaters.
kills_done_by_cheaters = spark.sql("""SELECT mid, src AS killer, time, aid FROM records 
                                      WHERE src_curr_flag = 1""")
kills_done_by_cheaters.registerTempTable("kills_done_by_cheaters")

# Get a table of players (both killers and victims) who observed killings done by cheaters when they were alive.
observers_tab = spark.sql("""SELECT id, TO_DATE(CAST(UNIX_TIMESTAMP(start_date, 'yyyy-MM-dd') AS TIMESTAMP)) AS start_date, 
                             TO_DATE(CAST(UNIX_TIMESTAMP(m_date, 'yyyy-MM-dd') AS TIMESTAMP)) AS m_date, 
                             CAST(DATEDIFF(start_date, m_date) AS INT) AS period, killer, COUNT(*) AS obs 
                             FROM (SELECT s.mid, s.src AS id, s.src_sd AS start_date, s.src_bd, k.time, s.m_date, k.killer, k.aid 
                             FROM records s LEFT JOIN kills_done_by_cheaters k ON s.mid = k.mid AND s.aid < k.aid 
                             WHERE src_flag == 1 AND src_sd != 'NA' AND src != killer AND src_curr_flag == 0
                             UNION
                             SELECT s.mid, s.src AS id, s.src_sd, s.src_bd, k.time, s.m_date, k.killer, k.aid 
                             FROM records s LEFT JOIN kills_done_by_cheaters k ON s.mid = k.mid AND s.aid > k.aid 
                             WHERE src_flag == 1 AND src_sd != 'NA' AND src != killer AND src_curr_flag == 0
                             UNION
                             SELECT s.mid, s.dst AS id, s.dst_sd, s.dst_bd, k.time, s.m_date, k.killer, k.aid 
                             FROM records s LEFT JOIN kills_done_by_cheaters k ON s.mid = k.mid AND s.aid > k.aid 
                             WHERE dst_flag == 1 AND dst_sd != 'NA' AND dst != killer AND dst_curr_flag == 0) 
                             GROUP BY id, start_date, m_date, killer""")
observers_tab.registerTempTable("observers_tab")
observers_tab.show(20)

+--------------------+----------+----------+------+--------------------+---+
|                  id|start_date|    m_date|period|              killer|obs|
+--------------------+----------+----------+------+--------------------+---+
|account.27478c512...|2019-03-02|2019-03-01|     1|account.f1bf619de...|  8|
|account.216ad15bd...|2019-03-02|2019-03-01|     1|account.05e7638fb...|  2|
|account.6dccf1b38...|2019-03-02|2019-03-01|     1|account.0a54b3498...|  1|
|account.49a8333dc...|2019-03-02|2019-03-01|     1|account.b5b88ac71...|  2|
|account.8c52a3348...|2019-03-02|2019-03-01|     1|account.caeaf4f68...|  9|
|account.8c52a3348...|2019-03-02|2019-03-01|     1|account.f450cac80...|  2|
|account.47ff35eca...|2019-03-02|2019-03-01|     1|account.1cb8b6fcd...|  3|
|account.d87ddfa9b...|2019-03-03|2019-03-02|     1|account.9619606b3...|  2|
|account.903b83e20...|2019-03-03|2019-03-01|     2|account.289e5d7fb...|  1|
|account.27478c512...|2019-03-02|2019-03-01|     1|account.299b4920e...| 12|

In [None]:
# Get the table that contains the total number of victimisation experiences and the number of unique cheaters.
obs_info = spark.sql("""SELECT id, start_date, m_date, SUM(obs) AS total_obs, COUNT(DISTINCT killer) AS uniq_cheaters 
                        FROM observers_tab GROUP BY id, start_date, m_date""")
obs_info.registerTempTable("obs_info")
obs_info.show(20)

In [None]:
# Store the summary table in the S3 bucket for the later use.
obs_info.write.parquet("s3://jinny-capstone-data-test/summary-tables/emp-net/obs_tab.parquet")

In [None]:
# Plot the distribution of the total number of observations.
# Show the distribution of the number of times the player observed cheating before the transition happened.
# In this case, we allow duplicate pairs of cheater and observer. 
# as there are some players who have observed the same cheaters more than once. 

obs_info_df = obs_info.toPandas()

bins = np.arange(0, obs_info_df['total_obs'].max() + 1.5) - 0.5
fig = obs_info_df.hist(column = 'total_obs', histtype='step', bins = bins, 
                       weights=np.zeros_like(obs_info_df['total_obs'])+1./len(obs_info_df['total_obs']))
plt.xlim(xmin=0.5)
plt.xlabel("Number of observations")
plt.ylabel("Proportion")
plt.title("")
plt.show() 

In [None]:
# Plot the distribution of the number of unique cheaters.
# In this case, we consider only unique pairs of cheater and observer.

bins = np.arange(0, obs_info_df['uniq_cheaters'].max() + 1.5) - 0.5
fig = obs_info_df.hist(column = 'uniq_cheaters', histtype='step', bins = bins, 
                       weights=np.zeros_like(obs_info_df['uniq_cheaters'])+1./len(obs_info_df['uniq_cheaters']))
plt.xlim(xmin=0.5)
plt.xlabel("Number of unique cheaters")
plt.ylabel("Proportion")
plt.title("")
plt.show()

## 2. Reuse the mapping table in the S3 bucket to create randomised networks.

In [13]:
# Read a table that contains team membership data.
team_info = spark.read.parquet("s3://jinny-capstone-data-test/team_data.parquet")
team_info.registerTempTable("team_ids")

In [None]:
# Read the mapping table.
map_tab = spark.read.parquet("s3://jinny-capstone-data-test/mapping-tables/map_tab_1.parquet")
map_tab.registerTempTable("map_tab")

In [None]:
# Get randomised gameplay logs.
temp_rand_logs = spark.sql("""SELECT mid, src, randomised AS new_src, dst, time, m_date 
                              FROM td t JOIN map_tab m ON t.src = m.original AND t.mid = m.match_id""")
temp_rand_logs.registerTempTable("temp_rand_logs")
randomised_logs = spark.sql("""SELECT mid, new_src AS src, randomised AS dst, time, m_date 
                               FROM temp_rand_logs t JOIN map_tab m ON t.dst = m.original AND t.mid = m.match_id""")
randomised_logs.registerTempTable("randomised_logs")

## 3. Count the number of motifs on the randomised networks.

In [18]:
# Add information of killers.
src_info = spark.sql("""SELECT mid, src, start_date AS src_sd, ban_date AS src_bd, cheating_flag AS src_flag, 
                        dst, dst_bd, dst_flag, time, m_date 
                        FROM randomised_logs t JOIN nodes n ON t.src = n.id""")
src_info.registerTempTable("src_info")

# Add information of victims.
full_info = spark.sql("""SELECT mid, src, src_sd, src_bd, src_flag, 
                         dst, start_date AS dst_sd, ban_date AS dst_bd, cheating_flag AS dst_flag, 
                         time, m_date FROM src_info s JOIN nodes n ON s.dst = n.id 
                         ORDER BY src_flag""")
full_info.registerTempTable("full_info")

# Add information of cheaters.
add_cheating_flag = spark.sql("""SELECT mid, src, src_sd, src_bd, src_flag,
                                 CASE WHEN src_bd >= m_date AND src_sd <= m_date AND src_flag == 1 THEN 1 ELSE 0 END AS src_curr_flag, 
                                 dst, dst_sd, dst_bd, dst_flag,
                                 CASE WHEN dst_bd >= m_date AND dst_sd <= m_date AND dst_flag == 1 THEN 1 ELSE 0 END AS dst_curr_flag, 
                                 time, m_date FROM full_info ORDER BY mid, time""")
add_cheating_flag.registerTempTable("add_cheating_flag")

legit_matches = spark.sql("SELECT mid FROM (SELECT mid, SUM(src_curr_flag) AS c_kills FROM add_cheating_flag GROUP BY mid) WHERE c_kills > 0")
legit_matches.registerTempTable("legit_matches")
records = spark.sql("""SELECT r.mid, src, src_sd, src_bd, src_flag, src_curr_flag, dst, dst_sd, dst_bd, dst_flag, dst_curr_flag, time, m_date 
                       FROM add_cheating_flag r JOIN legit_matches l ON r.mid = l.mid""")
records.registerTempTable("records")
records = spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY mid ORDER BY time) AS aid FROM records")
records.registerTempTable("records")

AnalysisException: "cannot resolve '`src_flag`' given input columns: [n.start_date, t.m_date, t.time, t.mid, t.src, n.id, n.ban_date, n.cheating_flag, t.dst, n.pname]; line 1 pos 59;\n'Project [mid#122, src#343, start_date#184 AS src_sd#350, ban_date#185 AS src_bd#351, 'src_flag, dst#344, 'dst_bd, 'dst_flag, time#129, m_date#130]\n+- Join Inner, (src#343 = id#181)\n   :- SubqueryAlias `t`\n   :  +- SubqueryAlias `rand_logs`\n   :     +- Project [mid#122, new_src#336 AS src#343, randomised#328 AS dst#344, time#129, m_date#130]\n   :        +- Join Inner, ((dst#126 = original#325) && (mid#122 = match_id#324))\n   :           :- SubqueryAlias `t`\n   :           :  +- SubqueryAlias `temp_rand_logs`\n   :           :     +- Project [mid#122, src#123, randomised#328 AS new_src#336, dst#126, time#129, m_date#130]\n   :           :        +- Join Inner, ((src#123 = original#325) && (mid#122 = match_id#324))\n   :           :           :- SubqueryAlias `t`\n   :           :           :  +- SubqueryAlias `td`\n   :           :           :     +- Relation[mid#122,src#123,src_bd#124,src_flag#125,dst#126,dst_bd#127,dst_flag#128,time#129,m_date#130] parquet\n   :           :           +- SubqueryAlias `m`\n   :           :              +- SubqueryAlias `mapping`\n   :           :                 +- Project [match_id#324, original#325, orig_flag#326, orig_tid#327, randomised#328, rand_flag#274, rand_tid#275]\n   :           :                    +- Sort [mid#251 ASC NULLS FIRST], true\n   :           :                       +- Project [mid#251 AS match_id#324, id#253 AS original#325, flag#254 AS orig_flag#326, tid#255 AS orig_tid#327, rand#273 AS randomised#328, rand_flag#274, rand_tid#275, mid#251]\n   :           :                          +- SubqueryAlias `join_on_index`\n   :           :                             +- Project [mid#251, m_date#252, id#253, flag#254, tid#255, match_id#272, rand#273, rand_flag#274, rand_tid#275]\n   :           :                                +- Join Inner, (ColumnIndex#256L = ColumnIndex#276L)\n   :           :                                   :- LogicalRDD [mid#251, m_date#252, id#253, flag#254, tid#255, ColumnIndex#256L], false\n   :           :                                   +- LogicalRDD [match_id#272, rand#273, rand_flag#274, rand_tid#275, ColumnIndex#276L], false\n   :           +- SubqueryAlias `m`\n   :              +- SubqueryAlias `mapping`\n   :                 +- Project [match_id#324, original#325, orig_flag#326, orig_tid#327, randomised#328, rand_flag#274, rand_tid#275]\n   :                    +- Sort [mid#251 ASC NULLS FIRST], true\n   :                       +- Project [mid#251 AS match_id#324, id#253 AS original#325, flag#254 AS orig_flag#326, tid#255 AS orig_tid#327, rand#273 AS randomised#328, rand_flag#274, rand_tid#275, mid#251]\n   :                          +- SubqueryAlias `join_on_index`\n   :                             +- Project [mid#251, m_date#252, id#253, flag#254, tid#255, match_id#272, rand#273, rand_flag#274, rand_tid#275]\n   :                                +- Join Inner, (ColumnIndex#256L = ColumnIndex#276L)\n   :                                   :- LogicalRDD [mid#251, m_date#252, id#253, flag#254, tid#255, ColumnIndex#256L], false\n   :                                   +- LogicalRDD [match_id#272, rand#273, rand_flag#274, rand_tid#275, ColumnIndex#276L], false\n   +- SubqueryAlias `n`\n      +- SubqueryAlias `nodes`\n         +- Relation[id#181,pname#182,cheating_flag#183,start_date#184,ban_date#185] parquet\n"

In [None]:
# Get a list of killings done by cheaters.
kills_done_by_cheaters = spark.sql("SELECT mid, src AS killer, time, aid FROM records WHERE src_curr_flag = 1")
kills_done_by_cheaters.registerTempTable("kills_done_by_cheaters")

# Get a table of players (both killers and victims) who observed killings done by cheaters when they were alive.
observers_tab = spark.sql("""SELECT id, TO_DATE(CAST(UNIX_TIMESTAMP(start_date, 'yyyy-MM-dd') AS TIMESTAMP)) AS start_date, 
                             TO_DATE(CAST(UNIX_TIMESTAMP(m_date, 'yyyy-MM-dd') AS TIMESTAMP)) AS m_date, 
                             CAST(DATEDIFF(start_date, m_date) AS INT) AS period, killer, COUNT(*) AS obs 
                             FROM (SELECT s.mid, s.src AS id, s.src_sd AS start_date, s.src_bd, k.time, s.m_date, k.killer, k.aid 
                             FROM records s LEFT JOIN kills_done_by_cheaters k ON s.mid = k.mid AND s.aid < k.aid 
                             WHERE src_flag == 1 AND src_sd != 'NA' AND src != killer AND src_curr_flag == 0
                             UNION
                             SELECT s.mid, s.src AS id, s.src_sd, s.src_bd, k.time, s.m_date, k.killer, k.aid 
                             FROM records s LEFT JOIN kills_done_by_cheaters k ON s.mid = k.mid AND s.aid > k.aid 
                             WHERE src_flag == 1 AND src_sd != 'NA' AND src != killer AND src_curr_flag == 0
                             UNION
                             SELECT s.mid, s.dst AS id, s.dst_sd, s.dst_bd, k.time, s.m_date, k.killer, k.aid 
                             FROM records s LEFT JOIN kills_done_by_cheaters k ON s.mid = k.mid AND s.aid > k.aid 
                             WHERE dst_flag == 1 AND dst_sd != 'NA' AND dst != killer AND dst_curr_flag == 0) 
                             GROUP BY id, start_date, m_date, killer""")
observers_tab.show(20)