# Loading the match data notebook

**Purpose of the notebook:** This notebook loads and unifies the raw match data so that it can be aggregated later. It is the required first step of the data loading.

**Input of the notebook:** The input data are raw match data.

**Output of the notebook:** The output of this notebook is a pair of `delta` tables, one for the player data and one for the ball data.

**Some notes:**
* A `spark.DataFrame` variable always has the suffix `_df` at the end of its name
* A `pandas.DataFrame` variable always has the suffix `_pd` at the end of its name

## Set the environment

In this part, the environment is set up. The setup consists of:

* Loading the necessary Python modules and helper functions
* Setting the paths to the data and metadata
* Initializing the Spark session

Other settings, such as the `spark` application name and the paths where the final `delta` tables are saved, are defined in the `config.yaml` file.
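The later cells read three keys from the config: `spark_application.spark_app_batch_name`, `batch.delta_player_dir` and `batch.delta_ball_dir`. As a rough sketch, the parsed config could look like the dictionary below. Only the key names come from this notebook; all values are placeholders, and `check_config` is a hypothetical helper that is not part of `utils`. The actual `read_config` implementation is not shown here.

```python
# Hypothetical shape of the parsed config.yaml. Only the key names are taken
# from this notebook; every value is a placeholder.
example_config = {
    "spark_application": {"spark_app_batch_name": "match-data-batch"},
    "batch": {
        "delta_player_dir": "./delta/players",
        "delta_ball_dir": "./delta/ball",
    },
}

REQUIRED_KEYS = [
    ("spark_application", "spark_app_batch_name"),
    ("batch", "delta_player_dir"),
    ("batch", "delta_ball_dir"),
]


def check_config(config):
    """Return the missing 'section.key' entries (empty list if complete)."""
    missing = []
    for section, key in REQUIRED_KEYS:
        if key not in config.get(section, {}):
            missing.append(f"{section}.{key}")
    return missing


print(check_config(example_config))
```

Checking the keys right after loading makes a missing entry fail early instead of in the middle of the write step.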

#### Import modules

In [1]:
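# Import the modules
import os  # used later to check whether the Delta table directories exist

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from delta import DeltaTable, configure_spark_with_delta_pip
from utils import plot_pitch, ball_inside_box, read_config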

#### Read config

In [2]:
config = read_config()

#### Initialize spark session

In [3]:
app_name = config['spark_application']['spark_app_batch_name']

builder = (
    SparkSession.builder.appName(app_name) 
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

22/07/18 15:33:21 WARN Utils: Your hostname, tomas-Yoga-Slim-7-Pro-14ACH5-O resolves to a loopback address: 127.0.1.1; using 192.168.0.53 instead (on interface wlp1s0)
22/07/18 15:33:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/tomas/.ivy2/cache
The jars for the packages stored in: /home/tomas/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b8749fc8-aae3-4cf7-9f96-c8bed02a3d4b;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.2.1 in central
	found io.delta#delta-storage;1.2.1 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 306ms :: artifacts dl 16ms
	:: modules in use:
	io.delta#delta-core_2.12;1.2.1 from central in [default]
	io.delta#delta-storage;1.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evic

#### Set the remaining "env" variables

In [4]:
raw_data_path = "g1059778_Data.jsonl"
meta_data_path = "/home/tomas/Personal_projects/Aston_Villa/data/g1059778_Metadata.xml"
delta_player_path = config['batch']['delta_player_dir']
delta_ball_path = config['batch']['delta_ball_dir']

## Read the raw match data

In [5]:
raw_match_data_df = (
    spark
    .read
    .json(raw_data_path)
)

                                                                                

## Read the metadata

In [6]:
from bs4 import BeautifulSoup

with open(meta_data_path, 'r') as f:
    metadata = f.read()

match_metadata = BeautifulSoup(metadata,'xml')

metadata_match_data = match_metadata.find('match').get('dtDate').split(' ')[0]
field_x = float(match_metadata.find('match').get('fPitchXSizeMeters'))
field_y = float(match_metadata.find('match').get('fPitchYSizeMeters'))
metadata_field_dim = (field_x,field_y)

print(f"Match date: {metadata_match_data}")
print(f"Field dimension: {metadata_field_dim}")

Match date: 2019-10-05
Field dimension: (104.85, 67.97)
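The same attributes can also be read with the standard library only. The sketch below is an alternative to `BeautifulSoup`, not the method used in this notebook. It parses a miniature stand-in for the metadata file: the root element name and the time part of `dtDate` are made up, while the attribute names and the date and pitch values are the ones used and printed above.

```python
import xml.etree.ElementTree as ET

# Hypothetical miniature of the metadata file. The <root> element and the
# time part of dtDate are invented for illustration.
sample_xml = """
<root>
  <match dtDate="2019-10-05 00:00:00"
         fPitchXSizeMeters="104.85"
         fPitchYSizeMeters="67.97" />
</root>
"""

match = ET.fromstring(sample_xml.strip()).find(".//match")

# Keep only the date part of "date time"
match_date = match.get("dtDate").split(" ")[0]
field_dim = (
    float(match.get("fPitchXSizeMeters")),
    float(match.get("fPitchYSizeMeters")),
)

print(match_date, field_dim)
```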


## Unifying the data

In this part, the unified dataset is created. The values have to be extracted from each nested column (`homePlayers`, `awayPlayers`, `ball`).

**Note: This approach produces duplicated rows. However, for just storing the ingested raw data, this does not matter.**

In the cell below:
* `wallClock` (in milliseconds) is converted to seconds and the date of the match is extracted from it. This date serves as the unique key: it identifies each match, and the tables can be partitioned by this column.
* The timestamp is also derived from the `wallClock` column.

In [7]:

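match_date_df = (
    raw_match_data_df
    # wallClock is in milliseconds; from_unixtime expects seconds
    .withColumn('match_date', F.from_unixtime(F.col('wallClock')/1000, 'yyyy-MM-dd')) 
    # Note: from_unixtime works on whole seconds, so the trailing 'S' fraction is always 0
    .withColumn('match_timestamp',F.from_unixtime(F.col('wallClock')/1000, 'yyyy-MM-dd HH:mm:ss:S'))
)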

In [8]:
(
    match_date_df
    .select('wallClock','match_date','match_timestamp')
).show(5,truncate=False)

+-------------+----------+---------------------+
|wallClock    |match_date|match_timestamp      |
+-------------+----------+---------------------+
|1570284007331|2019-10-05|2019-10-05 16:00:07:0|
|1570284007371|2019-10-05|2019-10-05 16:00:07:0|
|1570284007411|2019-10-05|2019-10-05 16:00:07:0|
|1570284007451|2019-10-05|2019-10-05 16:00:07:0|
|1570284007491|2019-10-05|2019-10-05 16:00:07:0|
+-------------+----------+---------------------+
only showing top 5 rows
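In the output above, every `match_timestamp` ends in `:0` although the `wallClock` values are 40 ms apart, so the millisecond part appears to be lost in the conversion. As a plain-Python sketch (not Spark code), the conversion can keep the milliseconds by splitting the value with `divmod`. The helper name `wallclock_to_utc` is hypothetical. The result is in UTC, whereas Spark renders timestamps in the session time zone, which would explain the two-hour difference to the output above.

```python
from datetime import datetime, timezone


def wallclock_to_utc(wall_clock_ms):
    """Convert epoch milliseconds to (date, timestamp with milliseconds) in UTC."""
    seconds, millis = divmod(wall_clock_ms, 1000)
    ts = datetime.fromtimestamp(seconds, tz=timezone.utc)
    date_str = ts.strftime("%Y-%m-%d")
    timestamp_str = ts.strftime("%Y-%m-%d %H:%M:%S") + f".{millis:03d}"
    return date_str, timestamp_str


# First wallClock value from the output above
print(wallclock_to_utc(1570284007331))
# ('2019-10-05', '2019-10-05 14:00:07.331')
```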



In the cell below:
* Both the `homePlayers` and `awayPlayers` columns are exploded, so each element of these arrays becomes its own row.
* Unfortunately, this creates duplicates: because the two explodes are applied one after the other, every home player is paired with every away player of the same frame, so the values of each player are repeated across several rows.
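The row multiplication can be illustrated without Spark. The sketch below uses one made-up frame with two home and three away players. Pairing both lists, as two successive explodes do, gives 2 × 3 rows, while handling each side separately and stacking the results gives one row per player. All player ids here are hypothetical.

```python
from itertools import product

# One hypothetical tracking frame with 2 home and 3 away players
frame = {
    "frameIdx": 0,
    "homePlayers": [{"playerId": "h1"}, {"playerId": "h2"}],
    "awayPlayers": [{"playerId": "a1"}, {"playerId": "a2"}, {"playerId": "a3"}],
}

# Two successive explodes pair every home player with every away player
cross_rows = [
    (frame["frameIdx"], home["playerId"], away["playerId"])
    for home, away in product(frame["homePlayers"], frame["awayPlayers"])
]

# Exploding each side on its own and stacking the results: one row per player
long_rows = (
    [(frame["frameIdx"], "home", p["playerId"]) for p in frame["homePlayers"]]
    + [(frame["frameIdx"], "away", p["playerId"]) for p in frame["awayPlayers"]]
)

print(len(cross_rows), len(long_rows))
# 6 5
```

The stacked layout avoids the duplicates at the cost of a separate team identifier column.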


In [9]:
base_columns = ['period','frameIdx','gameClock','wallClock','live','lastTouch','match_date']

unified_players_df = (
    match_date_df
    .withColumn('home_players_exploded',F.explode('homePlayers')) 
    .withColumn('away_players_exploded',F.explode('awayPlayers'))
    .select(
        F.col('home_players_exploded.playerId').alias('homePlayer_playerId'),
        F.col('home_players_exploded.speed').alias('homePlayer_speed'),
        F.col('home_players_exploded.xyz').alias('homePlayer_3d_position'),
        F.col('away_players_exploded.playerId').alias('awayPlayer_playerId'),
        F.col('away_players_exploded.speed').alias('awayPlayer_speed'),
        F.col('away_players_exploded.xyz').alias('awayPlayer_3d_position'),
        *base_columns
    )
    .withColumn("home_player_3d_position_x", F.col('homePlayer_3d_position').getItem(0))
    .withColumn("home_player_3d_position_y", F.col('homePlayer_3d_position').getItem(1))
    .withColumn("home_player_3d_position_z", F.col('homePlayer_3d_position').getItem(2))
)

The ball data are unified in the same way.

In [12]:
unified_ball_df = (
    match_date_df
    .select(
        F.col("ball.xyz").alias("ballPosition"),
        F.col("ball.speed").alias("ballSpeed"),
        *base_columns
    )
)

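Here, we first check whether the path to the delta table already exists. If it does, the new data are merged into the existing table on `match_date`; otherwise the table is created. Note that with only `whenNotMatchedInsertAll()`, rows are inserted only for match dates that are not in the table yet; rows of an existing partition are not updated. Updating them may be needed when the data change or a new column is added (TODO: I still need to verify this).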
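The insert-only behaviour can be modelled in a few lines of plain Python. This is a simplified model of a merge that has only a "when not matched, insert" clause, not the Delta Lake implementation; the function `insert_only_merge` and the sample rows are hypothetical.

```python
def insert_only_merge(target_rows, source_rows, key="match_date"):
    """Model of a merge with only a 'not matched -> insert' clause."""
    existing_keys = {row[key] for row in target_rows}
    # Source rows whose key already exists in the target are ignored, not updated
    new_rows = [row for row in source_rows if row[key] not in existing_keys]
    return target_rows + new_rows


target = [{"match_date": "2019-10-05", "speed": 1.0}]
source = [
    {"match_date": "2019-10-05", "speed": 9.9},  # same match, changed value
    {"match_date": "2019-10-12", "speed": 2.0},  # hypothetical new match
]

merged = insert_only_merge(target, source)
print(merged)
```

The changed value for the existing match date is dropped, so reloading a corrected file for the same match would need an additional "when matched" clause or an overwrite of that partition.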

In [None]:
if os.path.isdir(delta_player_path):

    deltaTable = DeltaTable.forPath(spark, delta_player_path)

    (
        deltaTable.alias('oldData')
        .merge(
            unified_players_df.alias('newData'),
            "oldData.match_date = newData.match_date"
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
else:

    (
        unified_players_df
        .write
        .format('delta')
        .mode('overwrite')
        .partitionBy('match_date')
        .save(delta_player_path)
    )


In [None]:
if os.path.isdir(delta_ball_path):

    deltaTable = DeltaTable.forPath(spark, delta_ball_path)

    (
        deltaTable.alias('oldData')
        .merge(
            unified_ball_df.alias('newData'),
            "oldData.match_date = newData.match_date"
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
else:

    (
        unified_ball_df
        .write
        .format('delta')
        .mode('overwrite')
        .partitionBy('match_date')
        # Save to the ball path (not the player path) so the player table is not overwritten
        .save(delta_ball_path)
    )


TODO Grouping:

Let's do this as simply as possible:
* Create two dataframes (every match always has an away team and a home team), group both of them, and combine them into one dataframe with an identifier that says whether a row belongs to the home or the away team.

This is where the statistics and the performance of the individual players would be computed, but from the already loaded dataset. In my opinion, there should be another table for this, so that the results are kept historically.

I would keep the data somewhere and then compute these aggregations and store them together with the date, so some kind of feature store?

In [9]:
from pyspark.sql.window import Window

windowTop = Window.partitionBy("away_home_team").orderBy(F.col("player_avg_speed").desc())

columns = unified_df.columns

home_columns = [col for col in columns if col.startswith('home')] + base_columns
home_df = (
    unified_df
    .select(home_columns)
)

home_df_grouped = (
    home_df
    .groupBy('homePlayer_playerId')
    .agg(
        F.avg('homePlayer_speed').alias('player_avg_speed'),
        F.max('homePlayer_speed').alias('player_max_speed')
    )
    .withColumn('away_home_team',F.lit('home'))
    .withColumnRenamed('homePlayer_playerId','playerId')
)

away_columns = [col for col in columns if col.startswith('away')] + base_columns

away_df = (
    unified_df
    .select(away_columns)
)

away_df_grouped = (
    away_df
    .groupBy('awayPlayer_playerId')
    .agg(
        F.avg('awayPlayer_speed').alias('player_avg_speed'),
        F.max('awayPlayer_speed').alias('player_max_speed')
    )
    .withColumn('away_home_team',F.lit('away'))
    .withColumnRenamed('awayPlayer_playerId','playerId')
)


In [23]:
players_performance_df = (
    home_df_grouped
    .union(away_df_grouped)
)

In [24]:
(
    players_performance_df
    .orderBy('playerId')
    .withColumn('rank',F.row_number().over(windowTop))
    .filter(F.col('rank') <= 3)
).show(10,truncate=False)

                                                                                

+--------+------------------+----------------+--------------+----+
|playerId|player_avg_speed  |player_max_speed|away_home_team|rank|
+--------+------------------+----------------+--------------+----+
|230046  |2.0587558851714207|7.43            |away          |1   |
|193488  |1.949944498725782 |9.09            |away          |2   |
|85242   |1.9467171484365127|9.03            |away          |3   |
|193111  |2.0908897298961997|8.31            |home          |1   |
|87396   |2.0776437014192446|8.5             |home          |2   |
|124165  |2.0078769219424277|7.66            |home          |3   |
+--------+------------------+----------------+--------------+----+



In [54]:
# insert new mapping


deltaTable = DeltaTable.forPath(spark, './test/match_one')

(
    deltaTable.alias('oldData')
    .merge(
        newData.alias('newData'),
        "oldData.match_date = newData.match_date"
    )
    .whenNotMatchedInsertAll()
    .execute()
)

22/07/15 23:45:13 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
22/07/15 23:45:13 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization after Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
22/07/15 23:45:18 WARN MergeIntoCommand: Merge source has SQLMetric(id: 1338, name: Some(number of source rows), value: 153972) rows in initial scan but SQLMetric(id: 1339, name: Some(number of source rows (during repeated scan)), value: 0) rows in second scan


In [12]:
ball_perf = (
    temp_df
    .withColumn('ball_seconds',F.when(F.col('ball_inside_box') == True, 0.04).otherwise(0))
)

* Okay, I will leave this as it is for now; the next thing to think through is the speed of a player along the x-axis.

In [14]:
ball_perf.agg(F.sum('ball_seconds')/60).show(5,truncate=False)

+------------------------+
|(sum(ball_seconds) / 60)|
+------------------------+
|4.341999999999955       |
+------------------------+
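The result above is `4.341999999999955` rather than `4.342` because many `0.04` floats are summed. A sketch of an equivalent calculation counts the frames and multiplies once. The frame duration of 0.04 s is the value used in the cell above; the frame count of 6513 is back-calculated from the 4.342 minutes shown and is not read from the data, and `minutes_in_box` is a hypothetical helper.

```python
FRAME_SECONDS = 0.04  # per-frame value used in the cell above


def minutes_in_box(frames_inside_box, frame_seconds=FRAME_SECONDS):
    """Time with the ball inside the box, in minutes, from a frame count."""
    return frames_inside_box * frame_seconds / 60


# 6513 is derived from 4.342 min * 60 s / 0.04 s, an assumption for illustration
print(minutes_in_box(6513))
```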



                                                                                

In [29]:
unified_df.show(5,False)

+-------------------+----------------+----------------------+-------------------+----------------+----------------------+------+--------+---------+-------------+-----+---------+----------+---------------+-------------------------+-------------------------+-------------------------+
|homePlayer_playerId|homePlayer_speed|homePlayer_3d_position|awayPlayer_playerId|awayPlayer_speed|awayPlayer_3d_position|period|frameIdx|gameClock|wallClock    |live |lastTouch|match_date|ball_inside_box|home_player_3d_position_x|home_player_3d_position_y|home_player_3d_position_z|
+-------------------+----------------+----------------------+-------------------+----------------+----------------------+------+--------+---------+-------------+-----+---------+----------+---------------+-------------------------+-------------------------+-------------------------+
|198826             |0.0             |[14.98, -4.01, 0.0]   |21205              |0.0             |[-52.29, 0.09, 0.0]   |1     |0       |0.0      |1570

                                                                                

In [16]:
from pyspark.sql.window import Window
# Order by period and gameClock so that lag() always returns the previous frame;
# ordering by period alone leaves the row order within a period undefined.
windowDelta = Window.partitionBy("homePlayer_playerId").orderBy("period","gameClock")

(
    home_df
    .dropDuplicates()
    .withColumn('x_pos_lag',F.lag("home_player_3d_position_x",1).over(windowDelta))
    .withColumn('y_pos_lag',F.lag("home_player_3d_position_y",1).over(windowDelta))
    .withColumn("time_lag",F.lag("gameClock").over(windowDelta))
    .fillna(0.0)
    .withColumn("delta_x",F.abs(F.col("home_player_3d_position_x") - F.col("x_pos_lag")))
    .withColumn("delta_y",F.abs(F.col("home_player_3d_position_y") - F.col("y_pos_lag")))
    .withColumn("delta_time",F.col("gameClock") - F.col("time_lag"))
    .withColumn("speed_x", F.col("delta_x")/F.col("delta_time"))
    .withColumn("speed_y", F.col("delta_y")/F.col("delta_time"))
    .groupBy("homePlayer_playerId")
    .agg(F.max("speed_x").alias("maximum_speed_x"))
).show(5,truncate=False)

                                                                                

+-------------------+-----------------+
|homePlayer_playerId|maximum_speed_x  |
+-------------------+-----------------+
|232980             |8.750000000007994|
|195546             |8.250000000007505|
|87396              |7.500000000006839|
|71738              |8.500000000007732|
|78607              |9.00000000000826 |
+-------------------+-----------------+
only showing top 5 rows
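The per-axis speed is the absolute change in position divided by the elapsed time between two consecutive frames. The plain-Python sketch below shows the same idea for a single player and a single period. Unlike the cell above, it skips the first frame instead of filling the missing lag with 0. The function `axis_speeds` and the sample positions are hypothetical.

```python
def axis_speeds(samples):
    """Absolute speed along one axis between consecutive (game_clock, position) samples.

    Pairs with a zero time step are skipped to avoid division by zero.
    """
    speeds = []
    ordered = sorted(samples)  # order by game clock
    for (t_prev, x_prev), (t_curr, x_curr) in zip(ordered, ordered[1:]):
        dt = t_curr - t_prev
        if dt > 0:
            speeds.append(abs(x_curr - x_prev) / dt)
    return speeds


# Hypothetical x positions (metres) of one player at 25 frames per second
samples = [(0.00, 10.00), (0.04, 10.35), (0.08, 10.55)]
print(max(axis_speeds(samples)))
```

A step of 0.35 m in 0.04 s gives roughly 8.75 m/s, the same order of magnitude as the values in the table above.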



### Task 2