In [0]:
import requests
import json
import os
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
from pyspark.sql.types import StringType, StructType, StructField, ArrayType, DoubleType, LongType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql import functions as f
from delta.tables import DeltaTable

# The PUBG API key is read from the PUBG_API_KEY environment variable so the
# secret never has to be pasted into (and saved with) the notebook. When the
# variable is not set this falls back to the previous empty string.
API_KEY = os.environ.get("PUBG_API_KEY", "")

# HTTP headers sent with every API request made in this notebook.
header = {
    "Authorization": f"Bearer {API_KEY}",
    "Accept": "application/vnd.api+json",
}

In [0]:
# Delta table locations used by the helpers in this notebook; free to change.
# TEST_PATH: default table for write_data / append_data / read_data / update_data,
#            and the table get_total_time reads.
# MATCH_PATH: table read by get_match_recap.
# TEL_DATA_PATH: table read by get_tel_data.
TEST_PATH = "dbfs:/mnt/delta/test5"
MATCH_PATH = "dbfs:/mnt/delta/matches_test1"
TEL_DATA_PATH = "dbfs:/mnt/delta/mini_tel_data1"


In [0]:
# Module-level Spark session used by every helper defined in this notebook.
spark = SparkSession.builder.getOrCreate()

def write_data(data, pth=TEST_PATH):
    """Overwrite the Delta table at ``pth`` with a single row built from ``data``.

    Args:
        data: Mapping of column name -> value; it is expanded into one ``Row``.
        pth: Location of the Delta table (defaults to ``TEST_PATH``).

    A ``created_at`` column holding the current timestamp is added before the
    row is saved with mode ``overwrite``.
    """
    single_row_df = spark.createDataFrame([Row(**data)])
    stamped_df = single_row_df.withColumn("created_at", f.current_timestamp())

    delta_writer = stamped_df.write.format("delta").mode("overwrite")
    delta_writer.save(pth)

def append_data(data, pth=TEST_PATH):
    """Append a single row built from ``data`` to the Delta table at ``pth``.

    Args:
        data: Mapping of column name -> value; it is expanded into one ``Row``.
        pth: Location of the Delta table (defaults to ``TEST_PATH``).

    A ``created_at`` column holding the current timestamp is added before the
    row is saved with mode ``append``.
    """
    stamped_df = spark.createDataFrame([Row(**data)]).withColumn(
        "created_at", f.current_timestamp()
    )
    stamped_df.write.format("delta").mode("append").save(pth)

def read_data(pth=TEST_PATH):
    """Load the Delta table at ``pth`` and show it via the DataFrame's ``display()``.

    Args:
        pth: Location of the Delta table (defaults to ``TEST_PATH``).

    Nothing is returned; the function exists for its rendering side effect.
    """
    spark.read.format("delta").load(pth).display()

def update_data(data, pth=TEST_PATH, idd=False):
    """Upsert one row built from ``data`` into the Delta table at ``pth``.

    Args:
        data: Mapping of column name -> value; it is expanded into one ``Row``.
        pth: Location of the Delta table (defaults to ``TEST_PATH``).
        idd: When true, rows are matched on both ``name`` and ``id``; otherwise
            they are matched on ``name`` only and the incoming row also gets a
            fresh ``created_at`` timestamp.

    Matching target rows are fully updated from the source row; when nothing
    matches, the source row is inserted.
    """
    target_table = DeltaTable.forPath(spark, pth)
    source_df = spark.createDataFrame([Row(**data)])

    if not idd:
        merge_condition = "target.name = source.name"
        # Only the name-keyed variant stamps the incoming row.
        source_df = source_df.withColumn("created_at", f.current_timestamp())
    else:
        merge_condition = "target.name = source.name AND target.id = source.id"

    merge_builder = target_table.alias("target").merge(
        source_df.alias("source"),
        merge_condition,
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

def get_lifetime_data(name: str):
    """Download a player's lifetime season data from the PUBG API (steam shard).

    Two requests are made: the first resolves ``name`` to a player id, the
    second fetches the lifetime season for that id with ``filter[gamepad]=false``.

    Args:
        name: Player name to look up.

    Returns:
        The decoded JSON body of the lifetime request, or ``None`` when either
        request answers with a status other than 200. In that case the failing
        URL and the status code are printed.
    """
    lookup_url = f"https://api.pubg.com/shards/steam/players?filter[playerNames]={name}"
    lookup_response = requests.get(lookup_url, headers=header)
    if lookup_response.status_code != 200:
        print(lookup_url)
        print(f"Error code: {lookup_response.status_code}")
        return None

    # The first entry of the lookup result carries the player id.
    account_id = lookup_response.json()["data"][0]["id"]

    lifetime_url = f"https://api.pubg.com/shards/steam/players/{account_id}/seasons/lifetime?filter[gamepad]=false"
    lifetime_response = requests.get(lifetime_url, headers=header)
    if lifetime_response.status_code != 200:
        print(lifetime_url)
        print(f"Error code: {lifetime_response.status_code}")
        return None

    return lifetime_response.json()


# One entry per aggregated column, in the order the columns appear in the
# result row: (result key, API field, converter, keep_max).
# keep_max=False sums the converted value over all game modes,
# keep_max=True keeps the largest converted value seen in any mode.
_LIFETIME_COLUMNS = (
    ("total_time", "timeSurvived", float, False),
    ("total_matches", "roundsPlayed", int, False),
    ("total_top10", "top10s", int, False),
    ("total_wins", "wins", int, False),
    ("total_kills", "kills", int, False),
    ("total_assists", "assists", int, False),
    ("total_damage", "damageDealt", int, False),
    ("total_headshots", "headshotKills", int, False),
    ("longest_kill", "longestKill", int, True),
    ("most_kills", "roundMostKills", int, True),
    ("total_heals", "heals", int, False),
    ("total_boosts", "boosts", int, False),
    ("total_revives", "revives", int, False),
    ("total_walk", "walkDistance", int, False),
    ("total_swim", "swimDistance", int, False),
    ("total_ride", "rideDistance", int, False),
)


def _summarise_lifetime_stats(name, mode_stats):
    """Fold the per-mode ``gameModeStats`` mapping into one flat summary row."""
    summary = {"name": name}
    running = {key: 0 for key, _field, _convert, _keep_max in _LIFETIME_COLUMNS}

    for mode, stats in mode_stats.items():
        for key, field, convert, keep_max in _LIFETIME_COLUMNS:
            value = convert(stats[field])
            running[key] = max(running[key], value) if keep_max else running[key] + value

        # Per-mode columns. The "_mathes" spelling is kept on purpose: it is the
        # key that is handed to append_data/update_data and therefore the
        # column name of rows already stored.
        summary[f"{mode}_mathes"] = stats["roundsPlayed"]
        summary[f"{mode}_time"] = round(stats["timeSurvived"] / 3600, 2)

    # timeSurvived is divided by 3600 before it is reported.
    running["total_time"] = round(running["total_time"] / 3600, 2)
    summary.update(running)
    return summary


def get_total_time(name: str, refresh: str = "refresh"):
    """Return the lifetime summary row for ``name``, refreshing it if requested.

    The stored table at ``TEST_PATH`` is consulted first. When ``refresh`` is
    anything other than ``"refresh"`` and a row for ``name`` already exists,
    that stored row is returned as a dict and no API request is made.
    Otherwise the lifetime data is downloaded with ``get_lifetime_data``,
    aggregated over all game modes, and persisted: appended when the player is
    new, merged via ``update_data`` when a row already exists.

    Args:
        name: Player name.
        refresh: Pass ``"refresh"`` (default) to force a new download.

    Returns:
        dict: The stored row, or the freshly computed summary with the keys
        ``name``, ``<mode>_mathes`` / ``<mode>_time`` per game mode, and the
        aggregated columns listed in ``_LIFETIME_COLUMNS``.
        ``None`` when ``get_lifetime_data`` returns ``None``.
    """
    stored = spark.read.format("delta").load(TEST_PATH)
    player_rows = stored.filter(stored["name"] == name)
    new_entry = player_rows.rdd.isEmpty()

    if refresh != "refresh" and not new_entry:
        return player_rows.collect()[0].asDict()

    lifetime = get_lifetime_data(name)
    if lifetime is None:
        return None

    result = _summarise_lifetime_stats(
        name, lifetime["data"]["attributes"]["gameModeStats"]
    )

    if new_entry:
        append_data(result)
    else:
        # Reaching this branch with an existing row implies refresh == "refresh";
        # the cached case returned above.
        update_data(result)
    return result



In [0]:
def get_player_matches(name: str):
    """List the match ids attached to a player's lifetime data, grouped by relationship.

    Args:
        name: Player name, forwarded to ``get_lifetime_data``.

    Returns:
        ``None`` when ``get_lifetime_data`` returns ``None`` (failed lookup,
        same convention as ``get_total_time``). Otherwise a tuple
        ``(matches, res)``:

        * ``matches`` - dict with ``"name"`` plus one list per relationship
          key. Relationships ``"player"`` and ``"season"``, and relationships
          without any entries, hold the single placeholder string ``"None"``;
          every other relationship holds the ``id`` of each entry.
        * ``res`` - human-readable text: a header line followed by one
          ``"<relationship>: <count>"`` line per relationship that has ids.
    """
    lifetime = get_lifetime_data(name)
    if lifetime is None:
        return None

    relationships = lifetime["data"]["relationships"]
    matches = {"name": name}
    summary_lines = ["Matches about which you can get more detailed information:"]

    for mode, relation in relationships.items():
        entries = relation["data"]
        # "player"/"season" are not match lists; empty or null data has no ids.
        if mode in ("player", "season") or not entries:
            matches[mode] = ["None"]
            continue
        matches[mode] = [match["id"] for match in entries]
        summary_lines.append(f"{mode}: {len(matches[mode])}")

    return matches, "\n".join(summary_lines)

def get_tel_data(match_id: str, name: str):
    """Return a condensed view of one player's telemetry for one match.

    The table at ``TEL_DATA_PATH`` is checked first; a stored row matching both
    ``match_id`` and ``name`` is returned as a dict without any API request.
    Otherwise the match is fetched from the PUBG API, the telemetry file linked
    by its ``asset`` entry is downloaded, and the events are reduced.

    Args:
        match_id: Id of the match.
        name: Player name whose events are extracted.

    Returns:
        dict: For a freshly downloaded match the keys are

        * ``name``, ``id``, ``map`` - always present.
        * ``mode`` - 6th dot-separated part of ``MatchId``; only set when a
          ``LogMatchDefinition`` event is present.
        * ``landing`` - ``(x, y)`` of the player's first ``LogParachuteLanding``;
          only set when such an event exists.
        * ``death_coords`` - ``(x, y)`` of the kill event whose victim is the
          player; only set when such an event exists.
        * ``kills`` - list of ``(x, y)`` victim locations for the player's
          kills; always starts with the placeholder ``(0, 0)``.
        * ``blue_zones`` - list of ``(x, y, radius)``, one per change of the
          warning radius; always starts with the placeholder ``(0, 0, 0)``.
        * ``trace`` - list of ``(x, y)`` player positions with ``isGame > 0``.

        All coordinates and radii are truncated to ``int``.

    Raises:
        ValueError: If the match response contains no ``asset`` entry, so no
            telemetry URL is available.
    """
    cached = spark.read.format("delta").load(TEL_DATA_PATH)
    cached_rows = cached.filter(cached["id"] == match_id)
    cached_rows = cached_rows.filter(cached["name"] == name)

    if not cached_rows.rdd.isEmpty():
        return cached_rows.collect()[0].asDict()

    match_url = f"https://api.pubg.com/shards/steam/matches/{match_id}"
    match_info = requests.get(match_url, headers=header).json()

    # The telemetry file is referenced by the "asset" entry; if several are
    # present the last one is used.
    tel_url = None
    for incl in match_info["included"]:
        if incl["type"] == "asset":
            tel_url = incl["attributes"]["URL"]
    if tel_url is None:
        raise ValueError(f"No telemetry asset found for match {match_id}")
    tel_data = requests.get(tel_url, headers=header).json()

    # The leading placeholder entries are kept so both lists are never empty.
    kills_coords = [(0, 0)]
    blue_zones = [(0, 0, 0)]
    trace = []
    landed = False
    map_name = match_info["data"]["attributes"]["mapName"]
    mini_tel_data = {"name": name, "id": match_id, "map": map_name}
    print(mini_tel_data)

    for event in tel_data:
        kind = event["_T"]

        if kind == "LogMatchDefinition":
            mini_tel_data["mode"] = event["MatchId"].split(".")[5]

        elif kind == "LogGameStatePeriodic":
            state = event["gameState"]
            # Compare the truncated radius: the stored radius is an int, so
            # comparing the raw value would re-append a zone with a fractional
            # radius on every periodic event.
            radius = int(state["poisonGasWarningRadius"])
            if radius != blue_zones[-1][-1]:
                centre = state["poisonGasWarningPosition"]
                blue_zones.append((int(centre["x"]), int(centre["y"]), radius))

        elif kind == "LogPlayerKillV2":
            # The killer is checked for truthiness before its name is read.
            if event["killer"] and event["killer"]["name"] == name:
                kill_location = event["victim"]["location"]
                kills_coords.append((int(kill_location["x"]), int(kill_location["y"])))
            if event["victim"]["name"] == name:
                death_location = event["victim"]["location"]
                mini_tel_data["death_coords"] = (int(death_location["x"]), int(death_location["y"]))

        elif kind == "LogParachuteLanding":
            # Only the first landing of the player is recorded.
            if not landed and event["character"]["name"] == name:
                landed = True
                loc = event["character"]["location"]
                mini_tel_data["landing"] = (int(loc["x"]), int(loc["y"]))

        elif kind == "LogPlayerPosition":
            if event["character"]["name"] == name and event["common"]["isGame"] > 0:
                loc = event["character"]["location"]
                trace.append((int(loc["x"]), int(loc["y"])))

    mini_tel_data["kills"] = kills_coords
    mini_tel_data["blue_zones"] = blue_zones
    mini_tel_data["trace"] = trace
    return mini_tel_data


def get_match_recap(name: str, mode: str, n: int):
    """Return the telemetry summary of the player's ``n``-th stored match in ``mode``.

    Args:
        name: Player name used to filter the table at ``MATCH_PATH``.
        mode: Column of that table holding the list of match ids to pick from.
        n: 1-based position inside the flattened id list.
            NOTE(review): ``n < 1`` is not rejected; the index ``n - 1`` then
            counts from the end of the list. Confirm whether this is intended.

    Returns:
        Whatever ``get_tel_data`` returns for the selected match id and ``name``.
    """
    stored_matches = spark.read.format("delta").load(MATCH_PATH)
    player_mode_column = stored_matches.filter(stored_matches["name"] == name).select(mode)
    # Each row holds a list of ids; flatten them into a single Python list.
    match_ids = player_mode_column.rdd.flatMap(lambda row: row[mode]).collect()
    return get_tel_data(match_ids[n - 1], name)

