In [1]:
import ray
import ray.data
import pandas as pd

In [2]:
from hdfs import Config

# Load the HdfsCLI configuration file and build the client for its "dev" alias.
hdfs_config = Config(path="./config/.hdfscli.cfg")
client = hdfs_config.get_client("dev")

In [9]:
# CSV files to push to HDFS (list taken from populate_hdfs).
files_to_upload = [
    "transfers.csv",
    "competitions.csv",
    "appearances.csv",
    "clubs.csv",
    "games.csv",
    "players.csv",
]

# HDFS directory that receives the files.
remote_path = "/data/"

In [10]:
# Make sure the remote directory exists before checking or uploading anything.
client.makedirs(remote_path)

# Upload every entry of files_to_upload that is not on HDFS yet
# (logic taken from populate_hdfs).
# Strip the trailing slash once so the remote file path is well-formed whether
# or not remote_path ends with "/".
remote_dir = remote_path.rstrip("/")

for file in files_to_upload:
    local_path = f"./data/{file}"
    remote_file = f"{remote_dir}/{file}"

    print(f"Checking if {file} exists in {remote_path}...")
    # With strict=False, status() returns None for a missing path instead of raising.
    if client.status(remote_file, strict=False) is not None:
        print(f"{file} exists in {remote_path}!")
        continue

    print(f"{file} does not exist in {remote_path}!")
    print(f"Uploading {file} to {remote_path}...")
    # remote_path is an existing directory, so the file is placed inside it.
    client.upload(remote_path, local_path)

# List the same directory we uploaded to rather than a hard-coded path.
print(f"contents in {remote_path}: ", client.list(remote_path))

Checking if transfers.csv exists in /data/...
transfers.csv exists in /data/!
Checking if competitions.csv exists in /data/...
competitions.csv exists in /data/!
Checking if appearances.csv exists in /data/...
appearances.csv exists in /data/!
Checking if clubs.csv exists in /data/...
clubs.csv exists in /data/!
Checking if games.csv exists in /data/...
games.csv exists in /data/!
Checking if players.csv exists in /data/...
players.csv exists in /data/!
contents in /data/:  ['appearances.csv', 'clubs.csv', 'competitions.csv', 'games.csv', 'players.csv', 'transfers.csv']


In [5]:
# Initialize Ray (the log output below reports that a local instance is started).
# dashboard_host="0.0.0.0" is passed so the dashboard is not bound to localhost only.
# NOTE(review): presumably needed to reach the dashboard from outside the
# container -- confirm that exposing it on all interfaces is intended.
ray.init(dashboard_host="0.0.0.0")

2024-10-18 10:44:37,277	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at [1m[32m172.18.0.2:8265 [39m[22m


0,1
Python version:,3.9.20
Ray version:,2.37.0
Dashboard:,http://172.18.0.2:8265


In [13]:
from io import StringIO


def read_hdfs_csv(hdfs_path):
    """Read a UTF-8 encoded CSV file from HDFS into a pandas DataFrame.

    Args:
        hdfs_path: Path of the CSV file on HDFS.

    Returns:
        The parsed file as a pandas DataFrame.
    """
    with client.read(hdfs_path) as reader:
        csv_text = reader.read().decode('utf-8')
    return pd.read_csv(StringIO(csv_text))


def attach_lookup(frame, lookup, left_key, right_key, renames):
    """Left-join selected columns of a lookup table onto ``frame``.

    The lookup key is renamed to ``left_key`` before merging, so the result
    contains no duplicated key column and needs no ``_x``/``_y`` clean-up.

    Args:
        frame: DataFrame to enrich; must contain ``left_key``.
        lookup: DataFrame containing ``right_key`` and every key of ``renames``.
        left_key: Column of ``frame`` that references the lookup table.
        right_key: Key column of ``lookup``.
        renames: Mapping from lookup column name to its name in the result.

    Returns:
        A new DataFrame with all rows of ``frame`` and the renamed lookup
        columns appended (NaN where no lookup row matches).

    Raises:
        pandas.errors.MergeError: If ``right_key`` is not unique in ``lookup``.
    """
    side = lookup[[right_key, *renames]].rename(
        columns={right_key: left_key, **renames}
    )
    # validate='m:1' actually checks that the lookup key is unique; the former
    # 'm:m' setting performed no check, so duplicated lookup rows would have
    # silently multiplied transfer rows.
    return frame.merge(side, on=left_key, how='left', validate='m:1')


# Load the raw tables from HDFS.
transfers_raw = read_hdfs_csv("/data/transfers.csv")
clubs_df = read_hdfs_csv("/data/clubs.csv")
competitions_df = read_hdfs_csv("/data/competitions.csv")

# transfer_date is not used afterwards.
transfers_df = transfers_raw.drop(columns=['transfer_date'])

# Resolve the domestic competition of the selling and of the buying club.
for side in ('from', 'to'):
    transfers_df = attach_lookup(
        transfers_df,
        clubs_df,
        left_key=f'{side}_club_id',
        right_key='club_id',
        renames={'domestic_competition_id': f'{side}_competition_id'},
    )

# Keep only transfers where both clubs could be mapped to a competition.
transfers_df = transfers_df.dropna(subset=['from_competition_id', 'to_competition_id'])

# Add country and competition sub-type for both sides of the transfer.
for side in ('from', 'to'):
    transfers_df = attach_lookup(
        transfers_df,
        competitions_df,
        left_key=f'{side}_competition_id',
        right_key='competition_id',
        renames={
            'country_name': f'{side}_country_name',
            'sub_type': f'{side}_sub_type',
        },
    )

print(transfers_df.head())

# Convert the Pandas DataFrame into a Ray Dataset
dataset = ray.data.from_pandas(transfers_df)

   player_id transfer_season  from_club_id  to_club_id  from_club_name  \
0     195778           25/26            79          27   VfB Stuttgart   
1     569033           25/26            39          27  1.FSV Mainz 05   
2     626913           25/26           398         380           Lazio   
3     278343           25/26           167         114     FC Augsburg   
4     301238           25/26          2919         506           Monza   

    to_club_name  transfer_fee  market_value_in_eur          player_name  \
0  Bayern Munich           0.0           12000000.0      Alexander Nübel   
1  Bayern Munich           0.0            4000000.0         Armindo Sieb   
2    Salernitana           0.0           10000000.0          Boulaye Dia   
3       Besiktas     5000000.0            7000000.0       Felix Uduokhai   
4       Juventus    14300000.0           18000000.0  Michele Di Gregorio   

  from_competition_id to_competition_id from_country_name from_sub_type  \
0                  L1  

## Pre-processing

1. Remove entries where `transfer_fee` is `NaN`, since these entries are usually internal transfers (or transfers from lower-league youth teams).
2. Remove entries where `market_value_in_eur` is `NaN`, since we assume it is hard to find any information about these players.

For now, we apply the filtering right away and materialize the result. In the future, we will define all the processing steps first and then train our model on the batches, (hopefully) never calling `take_all`.

In [14]:
import time

# Define a simple filter function
def filter_func(batch):
    """Keep only the rows that have both a transfer fee and a market value.

    Args:
        batch: pandas DataFrame with the columns ``transfer_fee`` and
            ``market_value_in_eur``.

    Returns:
        The rows of ``batch`` where neither of the two columns is NaN, with
        their original index labels.
    """
    # notna() and ~isna() are the same mask, so one check per column suffices.
    has_fee = batch['transfer_fee'].notna()
    has_market_value = batch['market_value_in_eur'].notna()
    return batch[has_fee & has_market_value]

# Apply the filter batch by batch with map_batches.
# perf_counter() is a monotonic clock meant for measuring elapsed time, unlike
# time.time(), which follows the (adjustable) wall clock.
start_time = time.perf_counter()
filtered_ds = dataset.map_batches(
    filter_func,
    batch_format="pandas",
    # CPUs reserved for each map task. This does not set the number of
    # parallel tasks; that depends on the CPUs available to Ray.
    num_cpus=1,
)

# Materialize the results. Execution happens here, so the measured time
# covers the filtering itself plus collecting all rows on the driver.
result = filtered_ds.take_all()
end_time = time.perf_counter()

print(f"Filtering time: {end_time - start_time} seconds")
print(f"Original dataset size: {dataset.count()}")
print(f"Filtered dataset size: {len(result)}")

2024-10-18 12:09:57,649	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-18_10-44-35_213749_9/logs/ray-data
2024-10-18 12:09:57,651	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(filter_func)]


Running 0: 0.00 row [00:00, ? row/s]

- MapBatches(filter_func) 1: 0.00 row [00:00, ? row/s]

Filtering time: 39.26508975028992 seconds
Original dataset size: 19872
Filtered dataset size: 18959


(Potential additional steps)

3. Remove retired players
4. Drop the `transfer_date` column (already done when the data is loaded), as we don't need it for anything (the `transfer_season` should be enough for everything time-related).
5. Drop one of `from_club_name` or `from_club_id` (and the same for `to_club_...`).

In [None]:
# From this table
# player_id, transfer_season, from_club_id, to_club_id, market_value_in_eur, transfer_fee

# Other useful tables and their attributes

# appearances.csv - minutes played, goals, assists
# (Would be hard to map to individual players playing, e.g. how do we know who was on the pitch when a goal was scored or conceded?) 
# club_games.csv - own_position, opponent_goals, opponent_position
# clubs.csv - domestic_competition_id, squad_size, average_age, foreigners_percentage, national_team_players, net_transfer_record, (maybe to filter outdated clubs) last_season
# (IMO useless) competitions.csv
# game_events.csv - player_id, type (goal, assist, card)
# (To know no. of games started) game_lineups.csv - player_id, position, type (substitute, starter)
# (IMO useless) games.csv
# (Useful for training, to know the valuation at the time of transfer, maybe 1 year prior?) player_valuations.csv - date, market_value_in_eur, current_club_id, player_id
# players.csv - last_season (filter retired players), country_of_birth, country_of_citizenship, position, sub_position, foot, height_in_cm, contract_expiration_date, agent_name, market_value_in_eur, highest_market_value_in_eur

## Variables

Independent - player information (from other tables), `market_value_in_eur`, `from_club_name`/`from_club_id`

Dependent - `to_club_id`/`to_club_name`, `transfer_fee`

Because we have multiple dependent variables, there would be two models: a regression model predicting the transfer fee and another one (most likely a classifier) predicting the club ID/name.

When used as a service, it would be nice if `player_id` and `to_club_name` were the only necessary inputs and the rest were read from HDFS/other data storage.
Let's presume that in these scenarios, the `transfer_season` would be the current one (24/25).

Representing club names/ids the best way possible:
- initially as IDs, but that could be interpreted as ordinality by the model
- ideally as embeddings - either of the club name or combinations such as "club country + league + club name"

In [None]:
# Shutdown Ray
# ray.shutdown()