In [1]:
import datetime
import sys

from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField

In [2]:
spark = (SparkSession.builder
         .appName('ikhlebushkin_nosql_task1')
         .master('yarn')
         .config('spark.sql.catalog.cassandra', 'com.datastax.spark.connector.datasource.CassandraCatalog')
         .getOrCreate())

In [3]:
pubg_kills = spark.read.format('csv').option('header', 'true').load('/data/pubg')
pubg_kills = pubg_kills.fillna(0)

                                                                                

In [4]:
x_diff = pubg_kills.killer_position_x - pubg_kills.victim_position_x
y_diff = pubg_kills.killer_position_y - pubg_kills.victim_position_y
pubg_kills = pubg_kills.withColumn('distance', f.sqrt(f.pow(x_diff, 2) + f.pow(y_diff, 2)))

In [5]:
pubg_kills = pubg_kills.withColumn('placement_diff', pubg_kills.victim_placement - pubg_kills.killer_placement)

In [6]:
pubg_kills.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+------------------+--------------+
|   killed_by|     killer_name|killer_placement|killer_position_x|killer_position_y|    map|            match_id|time|    victim_name|victim_placement|victim_position_x|victim_position_y|          distance|placement_diff|
+------------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+------------------+--------------+
|     Grenade| KrazyPortuguese|             5.0|         657725.1|         146275.2|MIRAMAR|2U4GBNA0YmnLSqvEy...| 823|KrazyPortuguese|             5.0|         657725.1|         146275.2|               0.0|           0.0|
|      SCAR-L|nide2Bxiaojiejie|            31.0|         93091.37|         722236.4|MIRAMAR|2U4GBNA0YmnLSqvEy...

                                                                                

In [75]:
now = datetime.datetime.now()
write_timestamp = f'{now.year}-{now.month}-{now.day}'

In [77]:
avg_dist = pubg_kills.groupby(['killed_by', 'time']).agg(f.avg('distance').alias('distance'))
avg_dist = avg_dist.withColumn('write_timestamp', f.lit(write_timestamp))
(avg_dist.write.format('org.apache.spark.sql.cassandra')
               .mode('append')
               .options(table='dist_avg', keyspace='miptstudent2024_07')
               .save())

24/05/29 11:23:22 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.95 s
                                                                                

In [78]:
max_dist = pubg_kills.groupby(['killed_by', 'time']).agg(f.max('distance').alias('distance'))
max_dist = max_dist.withColumn('write_timestamp', f.lit(write_timestamp))
(max_dist.write.format('org.apache.spark.sql.cassandra')
               .mode('append')
               .options(table='dist_max', keyspace='miptstudent2024_07')
               .save())

                                                                                

In [45]:
placement_diff = pubg_kills.groupby(['killed_by', 'time']).agg(f.avg('placement_diff').alias('placement_diff'))
placement_diff = placement_diff.withColumn('write_timestamp', f.lit(write_timestamp))
(placement_diff.write.format('org.apache.spark.sql.cassandra')
               .mode('append')
               .options(table='placement_diff_avg', keyspace='miptstudent2024_07')
               .save())

                                                                                

In [89]:
old_map_kills = spark.sql('SELECT * FROM cassandra.miptstudent2024_07.map_kills_all')
map_kills_new = pubg_kills.groupby(['map', 'killed_by', 'time']).agg(f.count('victim_name').alias('kills_count'))
map_kills_new = map_kills_new.fillna({'map': 'unknown_map'})

map_kills_joined = old_map_kills.withColumnRenamed('kills_count', 'kills_count_left').join(
    map_kills_new, ['killed_by', 'map', 'time'], 'outer'
).fillna({'kills_count_left': 0})
map_kills_joined = map_kills_joined.fillna({'kills_count': 0})
map_kills_joined = map_kills_joined.withColumn(
    'kills_count', map_kills_joined['kills_count_left'] + map_kills_joined['kills_count']
).drop('kills_count_left')
(map_kills_joined.write
                 .format('org.apache.spark.sql.cassandra')
                 .mode('append')
                 .options(table='map_kills_all', keyspace='miptstudent2024_07')
                 .save())

                                                                                

In [4]:
df = spark.sql("SELECT killed_by, map, SUM(kills_count) FROM cassandra.miptstudent2024_07.map_kills_all WHERE killed_by = 'Mini 14' AND map = 'ERANGEL' AND time >= 10 AND time <= 1000 GROUP BY killed_by, map;")


In [5]:
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+---------+-------+----------------+
|killed_by|    map|sum(kills_count)|
+---------+-------+----------------+
|  Mini 14|ERANGEL|          366743|
+---------+-------+----------------+



                                                                                

In [6]:
spark.conf.set('fs.s3a.path.style.access', 'true')
df.write.csv("s3a://miptstudent2024-07/pubg/map_kills.csv", header=True, mode="overwrite")

24/05/29 12:20:21 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [13]:
df_read = spark.read.format('csv').load("s3a://miptstudent2024-07/pubg/map_kills.csv", header=True, inferSchema=True)

In [14]:
df_read.show()

+---------+-------+-----------+
|killed_by|    map|total_kills|
+---------+-------+-----------+
|      AKM|ERANGEL|    1860336|
+---------+-------+-----------+



In [68]:
df = spark.sql("SELECT * FROM cassandra.miptstudent2024_07.dist_avg WHERE write_timestamp = '2024-5' AND killed_by = 'AKM' AND time >= 10 AND time <= 100")
df.show()

+---------------+---------+----+----------+
|write_timestamp|killed_by|time|  distance|
+---------------+---------+----+----------+
|         2024-5|      AKM|  57|1008.94104|
|         2024-5|      AKM|  59| 2098.7473|
|         2024-5|      AKM|  60| 555.57574|
|         2024-5|      AKM|  61| 388.14728|
|         2024-5|      AKM|  62| 1090.8011|
|         2024-5|      AKM|  63| 875.25635|
|         2024-5|      AKM|  64|  615.4914|
|         2024-5|      AKM|  65| 961.95917|
|         2024-5|      AKM|  66|  825.7436|
|         2024-5|      AKM|  67| 1033.0652|
|         2024-5|      AKM|  68| 816.87103|
|         2024-5|      AKM|  69| 1019.5503|
|         2024-5|      AKM|  70| 916.36597|
|         2024-5|      AKM|  71|  12717.27|
|         2024-5|      AKM|  72| 1174.7329|
|         2024-5|      AKM|  73| 6650.1553|
|         2024-5|      AKM|  74| 1024.4889|
|         2024-5|      AKM|  75| 817.50397|
|         2024-5|      AKM|  76|  6541.723|
|         2024-5|      AKM|  77|

24/05/29 10:53:08 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.97 s
24/05/29 10:53:36 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.95 s


In [84]:
timestamp = '2024-5-29'
weapon = 'AKM'
_from = 100
to = 1000

In [85]:
dist_avg = spark.sql(
    "SELECT * FROM cassandra.miptstudent2024_07.dist_avg "
    f"WHERE write_timestamp = '{timestamp}' AND killed_by = '{weapon}' AND time >= {_from} AND time <= {to}"
).withColumnRenamed('distance', 'distance_avg')
dist_max = spark.sql(
    "SELECT * FROM cassandra.miptstudent2024_07.dist_max "
    f"WHERE write_timestamp = '{timestamp}' AND killed_by = '{weapon}' AND time >= {_from} AND time <= {to}"
).withColumnRenamed('distance', 'distance_max')
placement_diff = spark.sql(
    "SELECT * FROM cassandra.miptstudent2024_07.placement_diff_avg "
    f"WHERE write_timestamp = '{timestamp}' AND killed_by = '{weapon}' AND time >= {_from} AND time <= {to}"
)
joined_df = dist_avg.join(dist_max, on=['write_timestamp', 'killed_by', 'time'])
joined_df = joined_df.join(placement_diff, on=['write_timestamp', 'killed_by', 'time'])
for row in joined_df.collect():
    print(f'timestamp: {row.write_timestamp}\t|\tweapon: {row.killed_by}\t|\t'
          f'time: {row.time}\t|\tdistance_avg: {row.distance_avg}\t|\t'
          f'distance_max: {row.distance_max}\t|\tplacement_diff: {row.placement_diff}')


timestamp: 2024-5-29	|	weapon: AKM	|	time: 100	|	distance_avg: 7737.93408203125	|	distance_max: 854984.5	|	placement_diff: 20.174610137939453
timestamp: 2024-5-29	|	weapon: AKM	|	time: 101	|	distance_avg: 6590.12646484375	|	distance_max: 827534.125	|	placement_diff: 19.894384384155273
timestamp: 2024-5-29	|	weapon: AKM	|	time: 102	|	distance_avg: 5474.49462890625	|	distance_max: 793381.8125	|	placement_diff: 19.066612243652344
timestamp: 2024-5-29	|	weapon: AKM	|	time: 103	|	distance_avg: 7729.44775390625	|	distance_max: 880057.25	|	placement_diff: 19.34817886352539
timestamp: 2024-5-29	|	weapon: AKM	|	time: 104	|	distance_avg: 9291.3154296875	|	distance_max: 828047.25	|	placement_diff: 19.153335571289062
timestamp: 2024-5-29	|	weapon: AKM	|	time: 105	|	distance_avg: 6105.76953125	|	distance_max: 828061.125	|	placement_diff: 18.656978607177734
timestamp: 2024-5-29	|	weapon: AKM	|	time: 106	|	distance_avg: 5499.3955078125	|	distance_max: 826097.1875	|	placement_diff: 19.709413528442383


24/05/29 11:28:17 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.89 s
24/05/29 11:28:45 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.92 s
24/05/29 11:32:55 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.84 s
24/05/29 11:33:05 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.90 s
24/05/29 11:33:23 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.80 s
24/05/29 11:34:00 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.83 s
24/05/29 11:34:44 WARN SchemaAgreementChecker: [s1] Schema agreement not reached after 10.89 s


In [69]:
df_collected = df.collect()
for row in df_collected:
    print(f'timestamp: {row.write_timestamp}\t|\tweapon: {row.killed_by}\t|\ttime: {row.time}\t|\tdistance: {row.distance}')

In [73]:
for row in df_collected:
    print(f'timestamp: {row.write_timestamp}\t|\tweapon: {row.killed_by}\t|\ttime: {row.time}\t|\tdistance: {row.distance}')

timestamp: 2024-5	|	weapon: AKM	|	time: 57	|	distance: 1008.9410400390625
timestamp: 2024-5	|	weapon: AKM	|	time: 59	|	distance: 2098.747314453125
timestamp: 2024-5	|	weapon: AKM	|	time: 60	|	distance: 555.5757446289062
timestamp: 2024-5	|	weapon: AKM	|	time: 61	|	distance: 388.14727783203125
timestamp: 2024-5	|	weapon: AKM	|	time: 62	|	distance: 1090.8011474609375
timestamp: 2024-5	|	weapon: AKM	|	time: 63	|	distance: 875.25634765625
timestamp: 2024-5	|	weapon: AKM	|	time: 64	|	distance: 615.4913940429688
timestamp: 2024-5	|	weapon: AKM	|	time: 65	|	distance: 961.9591674804688
timestamp: 2024-5	|	weapon: AKM	|	time: 66	|	distance: 825.7435913085938
timestamp: 2024-5	|	weapon: AKM	|	time: 67	|	distance: 1033.065185546875
timestamp: 2024-5	|	weapon: AKM	|	time: 68	|	distance: 816.8710327148438
timestamp: 2024-5	|	weapon: AKM	|	time: 69	|	distance: 1019.55029296875
timestamp: 2024-5	|	weapon: AKM	|	time: 70	|	distance: 916.365966796875
timestamp: 2024-5	|	weapon: AKM	|	time: 71	|	distanc

In [13]:
spark.stop()