In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkml").getOrCreate()

In [3]:
spark

In [4]:
agg1 = spark.read.parquet('s3://502pubg/clean/aggeragate1.parquet')

In [12]:
agg0 = spark.read.parquet('s3://502pubg/clean/aggeragate0.parquet')

In [13]:
agg0.printSchema()

root
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: integer (nullable = true)
 |-- player_dist_walk: integer (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_survive_time: integer (nullable = true)
 |-- team_placement: integer (nullable = true)



In [5]:
# agg1 schema: the agg0 columns plus matchmode_index and label (both double).
agg1.printSchema()

root
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: integer (nullable = true)
 |-- player_dist_walk: integer (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_survive_time: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- matchmode_index: double (nullable = true)
 |-- label: double (nullable = true)



In [6]:
# Preview the first rows of agg1.
agg1.show()

+---------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-------------------+--------------+---------------+-----+
|game_size|match_mode|party_size|player_assists|player_dbno|player_dist_ride|player_dist_walk|player_dmg|player_kills|player_survive_time|team_placement|matchmode_index|label|
+---------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-------------------+--------------+---------------+-----+
|       45|       tpp|         2|             0|          1|               0|            1114|        80|           2|                905|            22|            0.0|  8.0|
|       45|       tpp|         2|             0|          2|             690|            2050|       161|           1|               1095|            20|            0.0| 10.0|
|       45|       tpp|         2|             0|          0|            2111|            1342|       100|           0|  

In [7]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler, StandardScaler

In [16]:
indexer1 = StringIndexer(inputCol='game_size', outputCol="gamesize_index")
indexer2 = StringIndexer(inputCol='party_size', outputCol="partysize_index")

In [17]:
agg2 = indexer2.fit(agg2).transform(agg2)

In [31]:
agg2.show()

+---------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-------------------+--------------+---------------+-----+--------------+---------------+
|game_size|match_mode|party_size|player_assists|player_dbno|player_dist_ride|player_dist_walk|player_dmg|player_kills|player_survive_time|team_placement|matchmode_index|label|gamesize_index|partysize_index|
+---------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-------------------+--------------+---------------+-----+--------------+---------------+
|       45|       tpp|         2|             0|          1|               0|            1114|        80|           2|                905|            22|            0.0|  8.0|          18.0|            1.0|
|       45|       tpp|         2|             0|          2|             690|            2050|       161|           1|               1095|            20|            0.0| 10

In [63]:
vectorAssembler_features = VectorAssembler(
    inputCols=['player_assists','player_dbno','player_dist_ride','player_dist_walk','player_dmg','player_kills','player_survive_time','matchmode_index',"gamesize_index","partysize_index"], 
    outputCol="features",handleInvalid="skip")

In [64]:
agg3 = vectorAssembler_features.transform(agg2)

In [65]:
agg3.select('features').take(15)

[Row(features=DenseVector([0.0, 1.0, 0.0, 1114.0, 80.0, 2.0, 905.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([0.0, 2.0, 690.0, 2050.0, 161.0, 1.0, 1095.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([0.0, 0.0, 2111.0, 1342.0, 100.0, 0.0, 616.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([0.0, 0.0, 1978.0, 1335.0, 25.0, 0.0, 616.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([0.0, 2.0, 402.0, 1169.0, 334.0, 2.0, 1333.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([0.0, 1.0, 3083.0, 1982.0, 113.0, 2.0, 849.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([1.0, 1.0, 3088.0, 1847.0, 132.0, 1.0, 849.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([0.0, 1.0, 0.0, 217.0, 122.0, 1.0, 213.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([1.0, 1.0, 0.0, 402.0, 97.0, 1.0, 264.0, 0.0, 18.0, 1.0])),
 Row(features=DenseVector([0.0, 2.0, 0.0, 111.0, 370.0, 2.0, 191.0, 0.0, 18.0, 1.0])),
 Row(features=SparseVector(10, {3: 128.0, 6: 230.0, 8: 18.0, 9: 1.0})),
 Row(features=DenseVector([0.0, 0.0,

In [66]:
scal = StandardScaler(inputCol='features',outputCol='stdfeatures',withStd=True, withMean=False)

In [67]:
agg4 = scal.fit(agg3).transform(agg3)

In [68]:
agg4.printSchema()

root
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: integer (nullable = true)
 |-- player_dist_walk: integer (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_survive_time: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- matchmode_index: double (nullable = true)
 |-- label: double (nullable = true)
 |-- gamesize_index: double (nullable = false)
 |-- partysize_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- stdfeatures: vector (nullable = true)



In [69]:
# Spot-check the scaled feature vectors for the first 15 rows.
agg4.select('stdfeatures').take(15)

[Row(stdfeatures=DenseVector([0.0, 0.8933, 0.0, 0.2645, 0.472, 1.2867, 0.0005, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([0.0, 1.7865, 0.3451, 0.4868, 0.9499, 0.6434, 0.0006, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([0.0, 0.0, 1.0557, 0.3187, 0.59, 0.0, 0.0004, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([0.0, 0.0, 0.9892, 0.317, 0.1475, 0.0, 0.0004, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([0.0, 1.7865, 0.201, 0.2776, 1.9706, 1.2867, 0.0008, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([0.0, 0.8933, 1.5418, 0.4707, 0.6667, 1.2867, 0.0005, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([1.7371, 0.8933, 1.5443, 0.4386, 0.7788, 0.6434, 0.0005, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([0.0, 0.8933, 0.0, 0.0515, 0.7198, 0.6434, 0.0001, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([1.7371, 0.8933, 0.0, 0.0955, 0.5723, 0.6434, 0.0002, 0.0, 1.6045, 1.28])),
 Row(stdfeatures=DenseVector([0.0, 1.7865, 0.0, 0.0264, 2.183, 1.2867, 0.00

In [70]:
# Confirm agg4's final columns before it is written out.
agg4.printSchema()

root
 |-- game_size: integer (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: integer (nullable = true)
 |-- player_dist_walk: integer (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_survive_time: integer (nullable = true)
 |-- team_placement: integer (nullable = true)
 |-- matchmode_index: double (nullable = true)
 |-- label: double (nullable = true)
 |-- gamesize_index: double (nullable = false)
 |-- partysize_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- stdfeatures: vector (nullable = true)



In [71]:
agg4.write.parquet('s3://502pubg/clean/aggeragate2.parquet')

In [73]:
kill = spark.read.parquet('s3://502pubg/clean/kill0.parquet')

In [74]:
kill.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: integer (nullable = true)
 |-- killer_position_y: integer (nullable = true)
 |-- map: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: integer (nullable = true)
 |-- victim_position_y: integer (nullable = true)



In [75]:
# Preview raw kill events; Bluezone deaths have null killer placement/position.
kill.show()

+---------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+
|killed_by|killer_placement|killer_position_x|killer_position_y|    map|time|victim_placement|victim_position_x|victim_position_y|
+---------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+
| Bluezone|            null|             null|             null|MIRAMAR| 821|            58.0|                0|                0|
|      AKM|            43.0|           266222|           388673|MIRAMAR| 623|            44.0|           271032|           392712|
|      AKM|             1.0|           471619|           555094|MIRAMAR|1727|             8.0|           465798|           551481|
| Bluezone|            null|             null|             null|MIRAMAR| 863|            56.0|                0|                0|
|     S686|            63.0|           371367|           420929|MIRAMAR| 186|      

In [77]:
kill0 = kill.na.drop()

In [78]:
kill0.show()

+---------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+
|killed_by|killer_placement|killer_position_x|killer_position_y|    map|time|victim_placement|victim_position_x|victim_position_y|
+---------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+
|      AKM|            43.0|           266222|           388673|MIRAMAR| 623|            44.0|           271032|           392712|
|      AKM|             1.0|           471619|           555094|MIRAMAR|1727|             8.0|           465798|           551481|
|     S686|            63.0|           371367|           420929|MIRAMAR| 186|            65.0|           370874|           420951|
|   Kar98k|             6.0|           367942|           355570|MIRAMAR| 759|            37.0|           371189|           366381|
|     M416|            47.0|           449795|           517387|MIRAMAR| 397|      

In [79]:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [80]:
kill0.createOrReplaceTempView('kill0')

In [83]:
x1=spark.sql('select Power((Power((killer_position_x-victim_position_x),2)+Power((killer_position_y-victim_position_y),2)),0.5) AS DIST from kill0').cache()

In [102]:
x1=x1.withColumn('id',monotonically_increasing_id())

In [104]:
kill0 = kill0.withColumn('id',monotonically_increasing_id())

In [99]:
from pyspark.sql.functions import col,monotonically_increasing_id

In [106]:
kill1 = kill0.join(x1,kill0.id==x1.id,'inner')

In [107]:
kill1.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: integer (nullable = true)
 |-- killer_position_y: integer (nullable = true)
 |-- map: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: integer (nullable = true)
 |-- victim_position_y: integer (nullable = true)
 |-- id: long (nullable = false)
 |-- DIST: double (nullable = true)
 |-- id: long (nullable = false)



In [111]:
# Remove the helper `id` columns used for the join. drop('id') removes every
# column with that name (both copies here) and is a no-op if none exist.
kill1 = kill1.drop('id')

In [112]:
kill1.show()

+-------------------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+------------------+
|          killed_by|killer_placement|killer_position_x|killer_position_y|    map|time|victim_placement|victim_position_x|victim_position_y|              DIST|
+-------------------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+------------------+
|               M416|             3.0|           458250|           524673|MIRAMAR|1473|            12.0|           456787|           531151|6641.1484699560815|
|death.WeapSawnoff_C|            23.0|           520625|           362615|MIRAMAR| 480|             2.0|           520702|           362534|111.75866856758807|
|               M416|             5.0|           484673|           147615|ERANGEL|1565|             6.0|           484817|           148669|1063.7913329220162|
|               UMP9|            10.0|  

In [114]:
ind1 = StringIndexer(inputCol='killed_by',outputCol='weaponindex')
ind2 = StringIndexer(inputCol='map',outputCol='mapindex')

In [115]:
kill2 = ind1.fit(kill1).transform(kill1)
kill2 = ind2.fit(kill2).transform(kill2)

In [116]:
vectorAssembler_features = VectorAssembler(
    inputCols=['killer_position_x','killer_position_y','victim_position_x','victim_position_y','DIST','weaponindex','mapindex'], 
    outputCol="features",handleInvalid="skip")

In [117]:
kill3 = vectorAssembler_features.transform(kill2)

In [118]:
kill3.show()

+-------------------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+------------------+-----------+--------+--------------------+
|          killed_by|killer_placement|killer_position_x|killer_position_y|    map|time|victim_placement|victim_position_x|victim_position_y|              DIST|weaponindex|mapindex|            features|
+-------------------+----------------+-----------------+-----------------+-------+----+----------------+-----------------+-----------------+------------------+-----------+--------+--------------------+
|               M416|             3.0|           458250|           524673|MIRAMAR|1473|            12.0|           456787|           531151|6641.1484699560815|        1.0|     1.0|[458250.0,524673....|
|death.WeapSawnoff_C|            23.0|           520625|           362615|MIRAMAR| 480|             2.0|           520702|           362534|111.75866856758807|       36.0|     1.0|[520625.0,36

In [119]:
std2 = StandardScaler(inputCol='features',outputCol='stdfeatures')

In [120]:
kill4 = std2.fit(kill3).transform(kill3)

In [123]:
kill4.select('stdfeatures').take(10)

[Row(stdfeatures=DenseVector([3.2147, 3.7514, 3.0268, 3.6018, 0.0666, 0.1394, 2.611])),
 Row(stdfeatures=DenseVector([3.6523, 2.5927, 3.4503, 2.4584, 0.0011, 5.0179, 2.611])),
 Row(stdfeatures=DenseVector([3.4001, 1.0554, 3.2125, 1.0081, 0.0107, 0.1394, 0.0])),
 Row(stdfeatures=DenseVector([3.0692, 3.2239, 2.9131, 3.0407, 0.0329, 0.6969, 0.0])),
 Row(stdfeatures=DenseVector([2.637, 2.3328, 2.4806, 2.2219, 0.0207, 0.1394, 0.0])),
 Row(stdfeatures=DenseVector([2.5726, 2.7797, 2.4299, 2.6439, 0.0112, 0.1394, 0.0])),
 Row(stdfeatures=DenseVector([2.4151, 3.9831, 2.2823, 3.7804, 0.0045, 1.812, 0.0])),
 Row(stdfeatures=DenseVector([1.3981, 2.0316, 1.3174, 1.928, 0.0052, 0.4182, 0.0])),
 Row(stdfeatures=DenseVector([2.5566, 2.8874, 2.4251, 2.7468, 0.0199, 0.9757, 0.0])),
 Row(stdfeatures=DenseVector([2.6533, 2.0376, 2.5061, 1.9344, 0.0029, 0.4182, 2.611]))]

In [124]:
# Confirm kill4's columns (raw fields, DIST, indices, features, stdfeatures) before saving.
kill4.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: integer (nullable = true)
 |-- killer_position_y: integer (nullable = true)
 |-- map: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: integer (nullable = true)
 |-- victim_position_y: integer (nullable = true)
 |-- DIST: double (nullable = true)
 |-- weaponindex: double (nullable = false)
 |-- mapindex: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- stdfeatures: vector (nullable = true)



In [125]:
kill4.write.parquet('s3://502pubg/clean/kill1.parquet')

In [126]:
kill0.write.parquet('s3://502pubg/clean/kill.parquet')