# Big Data Project - PUBG Big Data Analysis and Modeling

### Part3: Hacker Identify - cluster and Graph theory - recommendation

Jacey Hung, Ying Huang, Kai Li

In [1]:
#!pip install pyspark

In [None]:
#spark sql imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
#spark ML imports
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer, CountVectorizer, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import time

In [3]:
spark = SparkSession.builder.appName('PUBG').getOrCreate()

#change configuration settings on Spark 
#conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '`16g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])
# the 4g is the total mun of the memory 4g * 4 executor = 16g


#print spark configuration settings
#spark.sparkContext.getConf().getAll()

In [4]:
spark.sparkContext

## Load Data

In aggregate, each match's meta information and player statistics are summarized (as provided by pubg). It includes various aggregate statistics such as player kills, damage, distance walked, etc as well as metadata on the match itself such as queue size, fpp/tpp, date, etc.

In [5]:
#match = spark.read.csv("C:/Users/kailf/Python_code/UChicago/Big_Data/Project/match_stats_0.csv", inferSchema=True,header=True)
agg0 = spark.read.csv('gs://data-pubg/data/agg_match_stats_0.csv', inferSchema=True, header=True)
agg1 = spark.read.csv('gs://data-pubg/data/agg_match_stats_1.csv', inferSchema=True, header=True)
agg2 = spark.read.csv('gs://data-pubg/data/agg_match_stats_2.csv', inferSchema=True, header=True)
agg3 = spark.read.csv('gs://data-pubg/data/agg_match_stats_3.csv', inferSchema=True, header=True)
agg4 = spark.read.csv('gs://data-pubg/data/agg_match_stats_4.csv', inferSchema=True, header=True)
match = agg0.union(agg1)
match = match.union(agg2)
match = match.union(agg3)
match = match.union(agg4)


In [6]:
match.printSchema()

root
 |-- date: string (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_id: string (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)



In [7]:
#kill = spark.read.csv("C:/Users/kailf/Python_code/UChicago/Big_Data/Project/kill_stats_final_0.csv",inferSchema=True, header=True)
kil0 = spark.read.csv('gs://data-pubg/data/kill_match_stats_final_0.csv', inferSchema=True, header=True)
kil1 = spark.read.csv('gs://data-pubg/data/kill_match_stats_final_1.csv', inferSchema=True, header=True)
kil2 = spark.read.csv('gs://data-pubg/data/kill_match_stats_final_2.csv', inferSchema=True, header=True)
kil3 = spark.read.csv('gs://data-pubg/data/kill_match_stats_final_3.csv', inferSchema=True, header=True)
kil4 = spark.read.csv('gs://data-pubg/data/kill_match_stats_final_4.csv', inferSchema=True, header=True)
kill = kil0.union(kil1)
kill = kill.union(kil2)
kill = kill.union(kil3)
kill = kill.union(kil4)


In [8]:
kill.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_name: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: double (nullable = true)
 |-- killer_position_y: double (nullable = true)
 |-- map: string (nullable = true)
 |-- match_id: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_name: string (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: double (nullable = true)
 |-- victim_position_y: double (nullable = true)



In [9]:
match = match.withColumn("date", F.unix_timestamp(match["date"], "yyyy-MM-dd'T'HH:mm:ss'+0000'").cast('timestamp'))

In [10]:
from pyspark.sql.functions import *

match = match.withColumn('month', month(match['date']))
match = match.withColumn('year', year(match['date']))
match = match.withColumn('hour', hour(match['date']))
match = match.withColumn('dayOfWeek', F.date_format(F.col('Date'),'E'))

In [11]:
#replace match_id with numeric id
match_id = match.select('match_id').distinct()
match_id_kill = kill.select('match_id').distinct()
ids = match_id.union(match_id_kill).drop_duplicates(subset=['match_id'])
ids = ids.withColumn('id',F.monotonically_increasing_id())
match = match.join(ids, on=['match_id'],how = 'left').drop('match_id')
kill = kill.join(ids, on=['match_id'],how = 'left').drop('match_id')

In [45]:
kill = kill.withColumn('kill_distance',F.sqrt((kill.killer_position_x/100 - kill.victim_position_x/100)**2+
                                      (kill.killer_position_y/100 - kill.victim_position_y/100)**2))
#kill.show(5)

## Model - Identify Hacker by Clustering


Identify hackers by clustering abnormal data points in three features:
player_kills
player_dist_walk
player_dist_ride

Inner join all results and find the intersection considered as an hacker list.

Future Work:
1. Need more information to have more accurate identification 
2. Try clustering data with more features
3. Try different clustering method

In [47]:
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler

In [13]:
kill.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_name: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: double (nullable = true)
 |-- killer_position_y: double (nullable = true)
 |-- map: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_name: string (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: double (nullable = true)
 |-- victim_position_y: double (nullable = true)
 |-- id: long (nullable = true)
 |-- kill_distance: double (nullable = true)



In [14]:
match.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- id: long (nullable = true)



In [44]:
kill.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_name: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: double (nullable = true)
 |-- killer_position_y: double (nullable = true)
 |-- map: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_name: string (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: double (nullable = true)
 |-- victim_position_y: double (nullable = true)
 |-- id: long (nullable = true)
 |-- kill_distance: double (nullable = true)



#### Try to identify super sniper hackers but it couldn't work

In [61]:
#from pyspark.sql import functions as F

kill_cluster = kill.select(
  "kill_distance", 'killer_name',
  F.col("kill_distance").cast("int").isNotNull().alias("Value"))

In [62]:
kill_cluster = kill_cluster.select('killer_name', 'kill_distance', F.col('Value') == 'true')

In [63]:
kill_cluster.printSchema()

root
 |-- killer_name: string (nullable = true)
 |-- kill_distance: double (nullable = true)
 |-- (Value = true): boolean (nullable = true)



In [64]:
%%time
#match_cluster = match.select('player_name','player_kills')

vecAssembler = VectorAssembler(inputCols=['kill_distance'], outputCol="features")
new_df = vecAssembler.transform(kill_cluster.select('killer_name','kill_distance'))
#new_df.show(5)
#new_df = new_df.sample(False, 0.01, seed = 1)
new_df = new_df.dropna()

CPU times: user 1.65 ms, sys: 8.01 ms, total: 9.66 ms
Wall time: 32.9 ms


In [None]:
%%time

# Trains a k-means model.
kmeans = KMeans(k = 2, seed = 1)
model = kmeans.fit(new_df.select('features'))

In [None]:
%%time

# Make predictions
predictions = model.transform(new_df)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

### Abnormal Number of kills   

It will take control of  players aiming and target on opponents automaticaly with high kills and headshot rate.

In [15]:
match = match.withColumn('speed_avg', (F.col('player_dist_ride')+F.col('player_dist_walk'))/F.col('player_survive_time'))


In [20]:
kill = kill.withColumn('shoot range', F.sqrt((F.col('killer_position_x') - \
                                              F.col('victim_position_x'))**2 +\
                                            (F.col('killer_position_y') - \
                                              F.col('victim_position_y'))**2))

In [47]:
match2 = match
match2 = match2.select('id', 'speed_avg', 'player_name', 'player_kills')
match2 = match2.selectExpr('id as id2','speed_avg', 'player_name', 'player_kills')

In [48]:
kill_cluster = kill.join(match2.select('id2', 'speed_avg', 'player_name', 'player_kills'), (match2.id2 == kill.id) & (match.player_name == kill.killer_name), how = 'left').drop('id2')
kill_cluster.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_name: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: double (nullable = true)
 |-- killer_position_y: double (nullable = true)
 |-- map: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_name: string (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: double (nullable = true)
 |-- victim_position_y: double (nullable = true)
 |-- id: long (nullable = true)
 |-- kill_distance: double (nullable = true)
 |-- speed_avg: double (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_kills: integer (nullable = true)



In [100]:
match.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- id: long (nullable = true)
 |-- speed_avg: double (nullable = true)



In [39]:
#kill_cluster = kill.join(match.select('id', 'speed_avg', 'player_name', 'player_kills'), (match.id == kill.id) & (match.player_name == kill.killer_name), how = 'left')

In [17]:
match.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- id: long (nullable = true)
 |-- speed_avg: double (nullable = true)



#### Identify hackers by abnormal player_kills

#### Hacker list for abnormal player_kills

In [53]:
%%time
#match_cluster = match.select('player_name','player_kills')

vecAssembler = VectorAssembler(inputCols=['player_kills'], outputCol="features")
new_df = vecAssembler.transform(match.select('player_name','player_kills'))
#new_df.show(5)
#new_df = new_df.sample(False, 0.01, seed = 1)
new_df = new_df.dropna()

CPU times: user 8.97 ms, sys: 726 µs, total: 9.69 ms
Wall time: 63.5 ms


In [None]:
%%time

# Trains a k-means model.
kmeans = KMeans(k = 2, seed = 1)
model = kmeans.fit(new_df.select('features'))

CPU times: user 72.4 ms, sys: 17 ms, total: 89.4 ms
Wall time: 6min 36s


In [None]:
%%time

# Make predictions
predictions = model.transform(new_df)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.78192048541292
Cluster Centers: 
[0.26388968]
[3.21559913]
CPU times: user 57 ms, sys: 26.1 ms, total: 83 ms
Wall time: 6min 25s


In [None]:
predictions.filter(predictions['prediction'] == '1').count()

14208220

In [None]:
predictions.filter(predictions['prediction'] == '0').count()

53063358

In [65]:
hacker1 = predictions.filter(predictions['prediction'] == '1')

In [66]:
hacker1.printSchema()

root
 |-- player_name: string (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



In [77]:
hacker1.select('player_name').distinct().count()

3845093

In [95]:
match.select('player_name').distinct().count()

7669689

#### Hacker list of abnormal player_dist_ride

In [132]:
%%time
#match_cluster = match.select('player_name','player_kills')

vecAssembler = VectorAssembler(inputCols=['player_dist_ride'], outputCol="features")
new_df2 = vecAssembler.transform(match.select('player_name','player_dist_ride'))
#new_df.show(5)
#new_df = new_df.sample(False, 0.01, seed = 1)
new_df2 = new_df2.dropna()

CPU times: user 8.39 ms, sys: 4.37 ms, total: 12.8 ms
Wall time: 68 ms


In [None]:
%%time

# Trains a k-means model.
kmeans2 = KMeans(k = 2, seed = 1)
model2 = kmeans2.fit(new_df2.select('features'))

CPU times: user 48 ms, sys: 15.9 ms, total: 63.9 ms
Wall time: 7min 21s


In [None]:
%%time

# Make predictions
predictions = model2.transform(new_df2)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.9998024478106392
Cluster Centers: 
[0.26388927]
[3.21559963]
CPU times: user 41.6 ms, sys: 17.3 ms, total: 58.9 ms
Wall time: 5min 40s


In [None]:
%%time
predictions.filter(predictions['prediction'] == '1').count()

CPU times: user 16.3 ms, sys: 4.75 ms, total: 21.1 ms
Wall time: 2min 37s


733

In [136]:
%%time
predictions.filter(predictions['prediction'] == '0').count()

CPU times: user 15.1 ms, sys: 5.76 ms, total: 20.9 ms
Wall time: 2min 37s


67270845

In [137]:
%%time
hacker2 = predictions.filter(predictions['prediction'] == '1')

CPU times: user 2.29 ms, sys: 351 µs, total: 2.64 ms
Wall time: 49.9 ms


In [None]:
hacker2.select('player_name').distinct().count()

#### Hacker list for abnormal player_dist_walk

In [139]:
%%time
#match_cluster = match.select('player_name','player_kills')

vecAssembler = VectorAssembler(inputCols=['player_dist_walk'], outputCol="features")
new_df2 = vecAssembler.transform(match.select('player_name','player_dist_walk'))
#new_df.show(5)
#new_df = new_df.sample(False, 0.01, seed = 1)
new_df2 = new_df2.dropna()

CPU times: user 10.5 ms, sys: 1.25 ms, total: 11.8 ms
Wall time: 30.5 ms


In [None]:
%%time

# Trains a k-means model.
kmeans2 = KMeans(k = 2, seed = 1)
model2 = kmeans2.fit(new_df2.select('features'))

CPU times: user 29 ms, sys: 33.5 ms, total: 62.5 ms
Wall time: 6min 55s


In [None]:
%%time

# Make predictions
predictions = model2.transform(new_df2)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.9999742396154305
Cluster Centers: 
[0.26388927]
[3.21559963]
CPU times: user 48.7 ms, sys: 7.23 ms, total: 55.9 ms
Wall time: 5min 38s


In [None]:
%%time
predictions.filter(predictions['prediction'] == '1').count()

CPU times: user 12.7 ms, sys: 8.83 ms, total: 21.5 ms
Wall time: 2min 38s


2483

In [None]:
%%time
predictions.filter(predictions['prediction'] == '0').count()

CPU times: user 13.4 ms, sys: 7.35 ms, total: 20.8 ms
Wall time: 2min 38s


67269095

In [144]:
%%time
hacker3 = predictions.filter(predictions['prediction'] == '1')

CPU times: user 0 ns, sys: 2.95 ms, total: 2.95 ms
Wall time: 39.3 ms


In [150]:
hacker1.printSchema()

root
 |-- player_name: string (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



In [151]:
hacker2.printSchema()

root
 |-- player_name: string (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



In [152]:
hacker3.printSchema()

root
 |-- player_name: string (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



#### Find Intersection among all three hacker lists

In [161]:
hacker_name2 = hacker2.selectExpr("player_name as player_name2")
hacker_name3 = hacker3.selectExpr("player_name as player_name3")

In [173]:
hacker_name = hacker1.join(hacker_name2.select('player_name2'), \
                           hacker1.player_name == hacker_name2.player_name2, how = 'inner').drop('player_name2')

hacker_name = hacker_name.join(hacker_name3.select('player_name3'), \
                           hacker_name.player_name == hacker_name3.player_name3, how = 'inner').drop('player_name3')

In [174]:
hacker_name.printSchema()

root
 |-- player_name: string (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



In [175]:
hacker_name.select('player_name').distinct().count()

574

In [176]:
distinct_hacker = hacker_name.select('player_name').distinct()

In [None]:
%%time
distinct_hacker.show(10)

+-------------+
|  player_name|
+-------------+
|     NIcemust|
|     WyoRebel|
|HuYaTV_SenSen|
|Lao_Yin_Bi_Ma|
|       VP7777|
|     lingaggg|
|      ytpbaba|
|  Flashin1912|
|     Shanklen|
|    SudoPanda|
+-------------+
only showing top 10 rows

CPU times: user 30.6 ms, sys: 7.84 ms, total: 38.5 ms
Wall time: 5min 12s


In [179]:
print('down')

down


### Graph

In [26]:
#!pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Installing collected packages: graphframes
Successfully installed graphframes-0.6


In [13]:
from graphframes import *

In [30]:
#!wget http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.7.0-spark2.4-s_2.11/graphframes-0.7.0-spark2.4-s_2.11.jar

In [31]:
#!cp /graphframes-0.7.0-spark2.4-s_2.11.jar /usr/lib/spark/jars

In [43]:
#!ls /usr/lib/spark/jars

In [14]:
match2 = match
match2 = match2.select('player_name', 'id', 'team_id')
match2 = match2.selectExpr("player_name as teammate_name", "id as id2", 'team_id as team_id2')

In [15]:
match2.printSchema()

root
 |-- teammate_name: string (nullable = true)
 |-- id2: long (nullable = true)
 |-- team_id2: integer (nullable = true)



In [16]:
graph_data = match.join(match2.select('teammate_name', 'team_id2', 'id2'), \
                  (match.team_id == match2.team_id2)\
                  & (match.id == match2.id2), how = 'left').drop('id2', 'team_id2')

In [17]:
graph_data.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- id: long (nullable = true)
 |-- teammate_name: string (nullable = true)



In [187]:
graph_data= graph_data.dropna()

In [188]:
graph_data = graph_data.filter(graph_data.player_name != graph_data.teammate_name)

In [20]:
#temp.summary().show()

In [190]:
player_name = graph_data2.withColumn("id",graph_data2["player_name"])
teammates = graph_data2.withColumn("src", graph_data2["player_name"])
teammates = teammates.withColumn("dst", graph_data2["teammate_name"])

In [23]:
graph = GraphFrame(player_name, teammates)

In [23]:
#graph = graph.dropna()

### show vetices and degree of the graph

In [None]:
%%time
graph.vertices.show(10)

+------------+-------------+---------+--------------+-----------+----------+------------+-------------------+--------------+------------+
| player_name|teammate_name|game_size|player_assists|player_dbno|player_dmg|player_kills|player_survive_time|team_placement|          id|
+------------+-------------+---------+--------------+-----------+----------+------------+-------------------+--------------+------------+
|     DylanBu| Sloth-Keeper|       29|             0|          1|       100|           1|           1163.402|            18|     DylanBu|
|     DylanBu|     kakuiyou|       29|             0|          1|       100|           1|           1163.402|            18|     DylanBu|
|     DylanBu|   Fish-Alive|       29|             0|          1|       100|           1|           1163.402|            18|     DylanBu|
|Sloth-Keeper|      DylanBu|       29|             0|          1|       200|           1|            473.912|            18|Sloth-Keeper|
|Sloth-Keeper|     kakuiyou|      

In [None]:
%%time
graph.edges.show(10)

+-------------+-------------+---------+--------------+-----------+----------+------------+-------------------+--------------+-------------+-------------+
|  player_name|teammate_name|game_size|player_assists|player_dbno|player_dmg|player_kills|player_survive_time|team_placement|          src|          dst|
+-------------+-------------+---------+--------------+-----------+----------+------------+-------------------+--------------+-------------+-------------+
|      Kobe_24|        modea|       29|             0|          1|         6|           0|            368.224|            22|      Kobe_24|        modea|
|      Kobe_24|    JorDan023|       29|             0|          1|         6|           0|            368.224|            22|      Kobe_24|    JorDan023|
|        modea|      Kobe_24|       29|             0|          1|       257|           1| 472.67400000000004|            22|        modea|      Kobe_24|
|        modea|    JorDan023|       29|             0|          1|       257

In [None]:
%%time
graph.vertices.count()

104041986

In [None]:
print('down')

down


In [None]:
%%time
graph.edges.count()

103641909

In [30]:
print('down')

down


In [None]:
%%time

graph.degrees.orderBy("degree", ascending=False).show(10)

+------------+------+
|          id|degree|
+------------+------+
|feitengdedan|  5460|
|   hzxiaobin|  5346|
|   Slh_Bunny|  5314|
|      JZalan|  5132|
|   yangyl123|  4210|
|    Sterealz|  4206|
|   jimmy0572|  4124|
|    yirui887|  4106|
|BlueChick666|  3966|
|Matthew_wang|  3940|
+------------+------+
only showing top 10 rows

CPU times: user 198 ms, sys: 52.5 ms, total: 250 ms
Wall time: 18min 4s


In [None]:
%%time

graph.edges.groupBy("src", "dst").max("player_survive_time").orderBy("max(player_survive_time)", ascending =False).show()

+--------------+--------------+------------------------+
|           src|           dst|max(player_survive_time)|
+--------------+--------------+------------------------+
|       cocoses|        icheee|            6.3648215E10|
|        icheee|       cocoses|            6.3648215E10|
|        icheee|          AVKU|            6.3648215E10|
|       cocoses|          AVKU|            6.3648215E10|
|          SPGC|Bluehairsorare|            6.3648215E10|
|          AVKU|       cocoses|            6.3648215E10|
|     caonima6b|        yzstar|            6.3648215E10|
|  20Yuwuyanzhu|  GiorgioChang|            6.3648215E10|
|          AVKU|        icheee|            6.3648215E10|
|best-spectator|  beastinpants|            6.3648215E10|
|   Unl3eatable|      ThuCandy|             6.364642E10|
|   Unl3eatable|          ISTD|             6.364642E10|
|assho_detected|      SACancel|             6.364642E10|
|Douglas_Bolden|      SACancel|             6.364642E10|
|   SONGYEJIUWO|       TTTTONE|

In [None]:
%%time

frequentPlayer = graph.edges.groupBy("src", "dst").count().orderBy("count", ascending = False)

frequentPlayer.show(10)

+--------------+--------------+-----+
|           src|           dst|count|
+--------------+--------------+-----+
|     jimmy0572|     hzxiaobin|  767|
|     hzxiaobin|     jimmy0572|  767|
|       AKA8881|     ZDN-chiji|  714|
|     ZDN-chiji|       AKA8881|  714|
|     hzxiaobin| woerziyuchong|  712|
| woerziyuchong|     hzxiaobin|  712|
|    dawsonTsui|  BlueChick666|  664|
|  BlueChick666|    dawsonTsui|  664|
|   FkinAwesome|GoAheadTry2Run|  640|
|GoAheadTry2Run|   FkinAwesome|  640|
+--------------+--------------+-----+
only showing top 10 rows

CPU times: user 197 ms, sys: 54.9 ms, total: 252 ms
Wall time: 17min 59s


#### Page Rank - small sample

In [31]:
match2 = match
match2 = match2.select('player_name', 'id', 'team_id')
match2 = match2.selectExpr("player_name as teammate_name", "id as id2", 'team_id as team_id2')

#match2.printSchema()

graph_data = match.join(match2.select('teammate_name', 'team_id2', 'id2'), \
                  (match.team_id == match2.team_id2)\
                  & (match.id == match2.id2), how = 'left').drop('id2', 'team_id2')

graph_data.printSchema()

graph_data= graph_data.dropna()

graph_data = graph_data.filter(graph_data.player_name != graph_data.teammate_name)

root
 |-- date: timestamp (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- id: long (nullable = true)
 |-- teammate_name: string (nullable = true)



In [33]:
graph_data2 = graph_data.sample(False, 0.0001, seed = 1)
graph_data2.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- id: long (nullable = true)
 |-- teammate_name: string (nullable = true)



In [34]:
#page_rank_sample = graph_data2.sample(False, 0.0001, seed = 1)
pr_player_name = graph_data2.withColumn("id",graph_data2["player_name"])
pr_teammates =  graph_data2.withColumn("src", graph_data2["player_name"])
pr_teammates = pr_teammates.withColumn("dst", graph_data2["teammate_name"])

pr_graph = GraphFrame(pr_player_name, pr_teammates)

In [28]:
# %%time
# pr_results = pr_graph.pageRank(resetProbability=0.15, tol= 0.01)
# pr_results.vertices.orderBy('pagerank',ascending=False).show()

#### Triangle counting - small sample

In [None]:
%%time
pr_graph.triangleCount().orderBy("count", ascending=False).show()

+-----+-------------------+---------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+----------------+-------------------+-------+--------------+-----+----+----+---------+----------------+-------------+
|count|               date|game_size|match_mode|party_size|player_assists|player_dbno|  player_dist_ride|  player_dist_walk|player_dmg|player_kills|     player_name|player_survive_time|team_id|team_placement|month|year|hour|dayOfWeek|              id|teammate_name|
+-----+-------------------+---------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+----------------+-------------------+-------+--------------+-----+----+----+---------+----------------+-------------+
|    0|2017-10-28 10:22:38|       28|       tpp|         4|             2|          1|          693.7033|        3939.50854|        83|           0|    327382933Bzc|           1689.721|      5|         

In [None]:
print('down')

down


#### Motif sub graph finding - small sample

In [None]:
play_num_count = pr_graph.edges.groupBy("src", "dst").count().orderBy("count", ascending = False)

#What are the flight routes with no direct connection?
subGraph = GraphFrame(pr_graph.vertices.select("id", "teammate_name"), play_num_count)
res = subGraph.find("(a)-[]->(b); (b)-[]->(c); !(a)-[]->(c)").filter("c.id !=a.id")
res.show(10, truncate=False)

+-------------------------+--------------------------+-----------------------+
|a                        |b                         |c                      |
+-------------------------+--------------------------+-----------------------+
|[Bizzy-Bin, LeLocle0512] |[LeLocle0512, LeLocle0202]|[LeLocle0202, Cannvaro]|
|[SMer_, babyt2]          |[babyt2, 333yinuo]        |[333yinuo, babyt2]     |
|[saoqi-NQR, DouYu_GuLang]|[DouYu_GuLang, CNJianChen]|[CNJianChen, fdgerggs] |
+-------------------------+--------------------------+-----------------------+



In [42]:
print('down')

down
