# Initialize MongoDB client
See README.md for setup instructions.

In [5]:
import os
from urllib.parse import quote_plus
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)

username = quote_plus('common')
password = quote_plus(os.environ.get('MONGODB_PASSWORD'))
uri = f"mongodb+srv://{username}:{password}@playervaluations.v7jevdf.mongodb.net/?retryWrites=true&w=majority"
# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))
# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [6]:
"""import json

db = client['player_valuations']
collection = db['players']
player = collection.find_one({'player_id': 10})

# Print the result
if player:
    print("Player found:", json.dumps(player, indent=4, default=str))
else:
    print("No player found with player_id", 65)"""

'import json\n\ndb = client[\'player_valuations\']\ncollection = db[\'players\']\nplayer = collection.find_one({\'player_id\': 10})\n\n# Print the result\nif player:\n    print("Player found:", json.dumps(player, indent=4, default=str))\nelse:\n    print("No player found with player_id", 65)'

In [7]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, ArrayType, MapType, StringType, IntegerType, DoubleType

db = client['player_valuations']
collection = db['players']
res = collection.find()

df = pd.DataFrame(list(res))
df.drop("_id", axis=1, inplace=True)

In [8]:
schema = StructType([
    StructField("player_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("name", StringType(), True),
    StructField("last_season", IntegerType(), True),
    StructField("current_club_id", IntegerType(), True),
    StructField("player_code", StringType(), True),
    StructField("country_of_birth", StringType(), True),
    StructField("city_of_birth", StringType(), True),
    StructField("country_of_citizenship", StringType(), True),
    StructField("date_of_birth", StringType(), True),
    StructField("sub_position", StringType(), True),
    StructField("position", StringType(), True),
    StructField("foot", StringType(), True),
    StructField("height_in_cm", DoubleType(), True),
    StructField("contract_expiration_date", StringType(), True),
    StructField("agent_name", StringType(), True),
    StructField("image_url", StringType(), True),
    StructField("url", StringType(), True),
    StructField("current_club_domestic_competition_id", StringType(), True),
    StructField("current_club_name", StringType(), True),
    StructField("market_value_in_eur", DoubleType(), True),
    StructField("highest_market_value_in_eur", DoubleType(), True),
    StructField("valuations", ArrayType(StructType([
        StructField("player_id", IntegerType(), True),
        StructField("date", StringType(), True),
        StructField("datetime", StringType(), True),
        StructField("dateweek", StringType(), True),
        StructField("market_value_in_eur", IntegerType(), True),
        StructField("current_club_id", IntegerType(), True),
        StructField("player_club_domestic_competition_id", StringType(), True),
    ]), True), True),
])

# Starting a spark session and extracting the raw data into dataframes

In [9]:
from pyspark.sql import SparkSession

ss = SparkSession.builder.master("local[*]").getOrCreate()

player_valuation_df=ss.createDataFrame(df, schema = schema)
appearances_df = ss.read.csv("raw_data\\transfermarkt\\appearances.csv", header=True, inferSchema=True)
games_df = ss.read.option("multiline","true").json("raw_data\\transfermarkt\\games.json")
game_events_df = ss.read.csv("raw_data\\transfermarkt\\game_events.csv", header=True, inferSchema=True)

# Some cleaning operations

In [10]:
from pyspark.sql.functions import when

player_valuation_df = player_valuation_df.drop(*["image_url", "url", "name", "player_code"])
player_valuation_df = player_valuation_df.withColumns({
    "current_club_id": when(player_valuation_df["last_season"] != 2023, -1).otherwise(player_valuation_df["current_club_id"]),
    "current_club_domestic_competition_id": when(player_valuation_df["last_season"] != 2023, "-1").otherwise(player_valuation_df["current_club_domestic_competition_id"]),
    "current_club_name": when(player_valuation_df["last_season"] != 2023, "Retired").otherwise(player_valuation_df["current_club_name"]),
    "market_value_in_eur": when(player_valuation_df["last_season"] != 2023, 0).otherwise(player_valuation_df["market_value_in_eur"])
})

games_df = games_df.drop(*["url", "aggregate", "home_club_formation", "away_club_formation"])

# Max market value player in 2023. If there are many with the same max value take them all.

In [11]:
from pyspark.sql.functions import explode, col, expr

valuations = player_valuation_df.select("valuations")
flattened_valuations = valuations.select(explode("valuations").alias("valuation"))
valuations2023 = flattened_valuations.filter("substring(valuation.date, 1, 4) = '2023'")
max_market_value_players_2023 = valuations2023\
    .select("valuation.*").groupBy("player_id").max("market_value_in_eur")\
    .withColumnRenamed("max(market_value_in_eur)", "market_value_in_eur")\
    .join(player_valuation_df.select("player_id", "first_name", "last_name"), on="player_id", how="inner")\
    .orderBy('market_value_in_eur', ascending=False)\
    .select("first_name", "last_name", "market_value_in_eur")

max_value = max_market_value_players_2023.select("market_value_in_eur").first()["market_value_in_eur"]

max_value_players = max_market_value_players_2023.filter(col("market_value_in_eur") == max_value)

max_value_players.show()

+----------+----------+-------------------+
|first_name| last_name|market_value_in_eur|
+----------+----------+-------------------+
|    Kylian|    Mbappé|          180000000|
|    Erling|   Haaland|          180000000|
|      Jude|Bellingham|          180000000|
+----------+----------+-------------------+



# EPL bookings exaample

In [12]:
cards_Premier_League = games_df.join(game_events_df, on="game_id", how="inner")\
    .join(player_valuation_df, on="player_id", how="inner")\
    .filter((col("type") == "Cards") & (col("competition_id") == "GB1"))\
    .select(player_valuation_df.first_name, player_valuation_df.last_name, games_df.home_club_name, games_df.away_club_name, games_df.date, game_events_df.description, game_events_df.minute)  

cards_Premier_League.show(5)

+----------+----------+--------------------+--------------------+----------+--------------------+------+
|first_name| last_name|      home_club_name|      away_club_name|      date|         description|minute|
+----------+----------+--------------------+--------------------+----------+--------------------+------+
|      Juan|      Cala|Tottenham Hotspur...|        Cardiff City|2014-03-02|1. Yellow card  ,...|    90|
|       NaN|Diogo Jota|Tottenham Hotspur...|Liverpool Footbal...|2023-09-30|Second yellow  , ...|    69|
|       NaN|Diogo Jota|Tottenham Hotspur...|Liverpool Footbal...|2023-09-30| Yellow card  , Foul|    68|
|      Juan|      Cala|        Cardiff City|Liverpool Footbal...|2014-03-22|2. Yellow card  ,...|    33|
|      Juan|      Cala|      Sunderland AFC|        Cardiff City|2014-04-27|Red card  , Profe...|    45|
+----------+----------+--------------------+--------------------+----------+--------------------+------+
only showing top 5 rows



In [19]:
player_valuation_df.show(5)

+---------+----------+------------+-----------+---------------+------------------+-------------+----------------------+-------------+--------------+----------+-----+------------+------------------------+--------------------+------------------------------------+-----------------+-------------------+---------------------------+--------------------+
|player_id|first_name|   last_name|last_season|current_club_id|  country_of_birth|city_of_birth|country_of_citizenship|date_of_birth|  sub_position|  position| foot|height_in_cm|contract_expiration_date|          agent_name|current_club_domestic_competition_id|current_club_name|market_value_in_eur|highest_market_value_in_eur|          valuations|
+---------+----------+------------+-----------+---------------+------------------+-------------+----------------------+-------------+--------------+----------+-----+------------+------------------------+--------------------+------------------------------------+-----------------+-------------------+---

# The goals scored by each player, that still plays since the data is being collected

In [30]:
from pyspark.sql import functions as F

# Provide aliases to the DataFrames
player_valuation_df_alias = player_valuation_df.alias("pvd")
game_events_df_alias = game_events_df.alias("ged")

player_goals = (
    player_valuation_df_alias
    .join(game_events_df_alias, on=F.col("pvd.player_id") == F.col("ged.player_id"), how="inner")
    .filter((F.col("ged.type") == "Goals") & (F.col("pvd.last_season") == 2023))
    .groupBy("pvd.player_id")
    .agg(F.sum(F.when(F.col("ged.type") == "Goals", 1).otherwise(0)).alias("total_goals"))
    .orderBy("total_goals", ascending=False)
    .select("pvd.first_name", "pvd.last_name", "total_goals")
)

player_goals.show(5)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `pvd`.`first_name` cannot be resolved. Did you mean one of the following? [`pvd`.`player_id`, `total_goals`].;
'Project ['pvd.first_name, 'pvd.last_name, total_goals#1132L]
+- Sort [total_goals#1132L DESC NULLS LAST], true
   +- Aggregate [player_id#0], [player_id#0, sum(CASE WHEN (type#158 = Goals) THEN 1 ELSE 0 END) AS total_goals#1132L]
      +- Filter ((type#158 = Goals) AND (last_season#4 = 2023))
         +- Join Inner, (player_id#0 = player_id#160)
            :- SubqueryAlias pvd
            :  +- Project [player_id#0, first_name#1, last_name#2, last_season#4, CASE WHEN NOT (last_season#4 = 2023) THEN -1 ELSE current_club_id#5 END AS current_club_id#194, country_of_birth#7, city_of_birth#8, country_of_citizenship#9, date_of_birth#10, sub_position#11, position#12, foot#13, height_in_cm#14, contract_expiration_date#15, agent_name#16, CASE WHEN NOT (last_season#4 = 2023) THEN -1 ELSE current_club_domestic_competition_id#19 END AS current_club_domestic_competition_id#195, CASE WHEN NOT (last_season#4 = 2023) THEN Retired ELSE current_club_name#20 END AS current_club_name#196, CASE WHEN NOT (last_season#4 = 2023) THEN cast(0 as double) ELSE market_value_in_eur#21 END AS market_value_in_eur#197, highest_market_value_in_eur#22, valuations#23]
            :     +- Project [player_id#0, first_name#1, last_name#2, last_season#4, current_club_id#5, country_of_birth#7, city_of_birth#8, country_of_citizenship#9, date_of_birth#10, sub_position#11, position#12, foot#13, height_in_cm#14, contract_expiration_date#15, agent_name#16, current_club_domestic_competition_id#19, current_club_name#20, market_value_in_eur#21, highest_market_value_in_eur#22, valuations#23]
            :        +- LogicalRDD [player_id#0, first_name#1, last_name#2, name#3, last_season#4, current_club_id#5, player_code#6, country_of_birth#7, city_of_birth#8, country_of_citizenship#9, date_of_birth#10, sub_position#11, position#12, foot#13, height_in_cm#14, contract_expiration_date#15, agent_name#16, image_url#17, url#18, current_club_domestic_competition_id#19, current_club_name#20, market_value_in_eur#21, highest_market_value_in_eur#22, valuations#23], false
            +- SubqueryAlias ged
               +- Relation [game_event_id#154,date#155,game_id#156,minute#157,type#158,club_id#159,player_id#160,description#161,player_in_id#162,player_assist_id#163] csv


# Saving the bookings example into parquet format

In [21]:
cards_Premier_League.write.mode("overwrite").parquet("raw_data\\transfermarkt\\epl_bookings")

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "C:\Users\stoya\AppData\Roaming\Python\Python311\site-packages\IPython\core\interactiveshell.py", line 3526, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\stoya\AppData\Local\Temp\ipykernel_14288\1385738950.py", line 1, in <module>
    cards_Premier_League.write.mode("overwrite").parquet("raw_data\\transfermarkt\\epl_bookings")
  File "d:\Anaconda\Lib\site-packages\pyspark\sql\readwriter.py", line 1721, in parquet
    self._jwrite.parquet(path)
  File "d:\Anaconda\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "d:\Anaconda\Lib\site-packages\pyspark\errors\exceptions\captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "d:\Anaconda\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling

# Closing the MongoDB client and the spark session

In [None]:
"""client.close()
ss.stop()"""