# Bronze Layer: Raw Data Collection
The Bronze Layer collects raw data directly from the source, which in this case is the StatsBomb open data, served as JSON files from GitHub. The data is kept unprocessed and includes every field the source provides. The goal of this layer is to gather comprehensive data that can be cleaned and transformed in subsequent layers.


In [0]:
pip install requests pyspark

Python interpreter will be restarted.
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488511 sha256=31705870cfce7ad17aacb1c018e47ac362505140503dd43c2765b2b6ff6839db
  Stored in directory: /root/.cache/pip/wheels/92/09/11/aa01d01a7f005fda8a66ad71d2be7f8aa341bddafb27eee3c7
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.1
Python interpreter will be restarted.


In [0]:
# Import project dependencies
import json
import requests
import pandas as pd

from pandas import json_normalize
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType



We will use the data for every action by Messi in every La Liga match. The events endpoint we will query returns data **for one match at a time**, so we first need to identify the matches (**match_id**). Match lists, in turn, are organized by competition and season, so to find them we also need the **competition_id** and the **season_id**.
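
The lookup chain described above (competition and season, then matches, then events) can be sketched as three URL builders. This is only an illustration: the base path is taken from the URLs used in this notebook, while the helper names and the example `match_id` are hypothetical.

```python
# Base path of the StatsBomb open-data files (taken from the URLs used in this notebook).
BASE_URL = "https://raw.githubusercontent.com/statsbomb/open-data/master/data"


def competitions_url() -> str:
    """URL of the file that lists every competition/season pair."""
    return f"{BASE_URL}/competitions.json"


def matches_url(competition_id: int, season_id: int) -> str:
    """URL of the match list for one competition and one season."""
    return f"{BASE_URL}/matches/{competition_id}/{season_id}.json"


def events_url(match_id: int) -> str:
    """URL of the event list for one match."""
    return f"{BASE_URL}/events/{match_id}.json"


# La Liga is competition_id 11 and season 2020/2021 is season_id 90 in this dataset.
print(matches_url(11, 90))
# match_id 12345 is a made-up placeholder, not a real match.
print(events_url(12345))
```

Each level needs an identifier obtained from the level above it, which is why the notebook queries competitions first, then matches, then events.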

To streamline the development, I start by storing some information about Messi. Next, I make the first query to the StatsBomb API to get a list of all competitions that the company freely provides.

In [0]:
#Lionel Messi Basic Information
messi_fullname = 'Lionel Andrés Messi Cuccittini'
messi_competition = 'La Liga'
messi_competition_gender = 'male'
messi_team = 'Barcelona'
messi_seasons = ['2020/2021', '2019/2020', '2018/2019', '2017/2018', '2016/2017', '2015/2016', '2014/2015',
                 '2013/2014', '2012/2013', '2011/2012', '2010/2011', '2009/2010', '2008/2009', '2007/2008',
                 '2006/2007', '2005/2006', '2004/2005']

In [0]:
# Load the competitions file from the StatsBomb open-data repository on GitHub
response = requests.get('https://raw.githubusercontent.com/statsbomb/open-data/master/data/competitions.json')
response.raise_for_status()  # fail early on an HTTP error instead of parsing an error page
competitions = response.json()

To understand the data better, I print every entry related to La Liga and notice that data is available for more seasons than we need, including seasons in which Messi did not play.

In [0]:
#Print all the competitions with competition_name: La Liga
for competition in competitions:
    if competition['competition_name'] == messi_competition:
        print(competition)

{'competition_id': 11, 'season_id': 90, 'country_name': 'Spain', 'competition_name': 'La Liga', 'competition_gender': 'male', 'competition_youth': False, 'competition_international': False, 'season_name': '2020/2021', 'match_updated': '2023-11-15T10:25:53.245065', 'match_updated_360': '2023-11-15T10:29:14.475865', 'match_available_360': '2023-11-15T10:29:14.475865', 'match_available': '2023-11-15T10:25:53.245065'}
{'competition_id': 11, 'season_id': 42, 'country_name': 'Spain', 'competition_name': 'La Liga', 'competition_gender': 'male', 'competition_youth': False, 'competition_international': False, 'season_name': '2019/2020', 'match_updated': '2024-03-10T11:29:39.412759', 'match_updated_360': '2021-06-13T16:17:31.694', 'match_available_360': None, 'match_available': '2024-03-10T11:29:39.412759'}
{'competition_id': 11, 'season_id': 4, 'country_name': 'Spain', 'competition_name': 'La Liga', 'competition_gender': 'male', 'competition_youth': False, 'competition_international': False, 's

So I narrow the search to return only the entries that will be useful for the analysis. The output above shows that 11 is the **competition_id** for La Liga in this dataset. Next, I build a list of the **season_id** values for the seasons in which Messi played.


In [0]:
messi_competition_id = 11

#Creating a list of all Messi's season_id
messi_seasons_idList = []

for competition in competitions:
    if competition['competition_name'] == messi_competition and competition['competition_gender'] == messi_competition_gender and competition['season_name'] in messi_seasons:
        messi_seasons_idList.append(competition['season_id'])
        print(competition)

{'competition_id': 11, 'season_id': 90, 'country_name': 'Spain', 'competition_name': 'La Liga', 'competition_gender': 'male', 'competition_youth': False, 'competition_international': False, 'season_name': '2020/2021', 'match_updated': '2023-11-15T10:25:53.245065', 'match_updated_360': '2023-11-15T10:29:14.475865', 'match_available_360': '2023-11-15T10:29:14.475865', 'match_available': '2023-11-15T10:25:53.245065'}
{'competition_id': 11, 'season_id': 42, 'country_name': 'Spain', 'competition_name': 'La Liga', 'competition_gender': 'male', 'competition_youth': False, 'competition_international': False, 'season_name': '2019/2020', 'match_updated': '2024-03-10T11:29:39.412759', 'match_updated_360': '2021-06-13T16:17:31.694', 'match_available_360': None, 'match_available': '2024-03-10T11:29:39.412759'}
{'competition_id': 11, 'season_id': 4, 'country_name': 'Spain', 'competition_name': 'La Liga', 'competition_gender': 'male', 'competition_youth': False, 'competition_international': False, 's
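
As a complement to the filter above, a name-to-ID mapping makes it easy to spot requested seasons that have no entry in the open data. This is a minimal sketch on a trimmed sample: the first two records mirror the output above, the third record and the season `1999/2000` are hypothetical, and `season_ids_by_name` is an illustrative helper rather than part of the notebook.

```python
# Trimmed sample of the competitions list. The first two records mirror the
# notebook output; the third is a hypothetical entry for another competition.
sample_competitions = [
    {"competition_id": 11, "season_id": 90, "competition_name": "La Liga",
     "competition_gender": "male", "season_name": "2020/2021"},
    {"competition_id": 11, "season_id": 42, "competition_name": "La Liga",
     "competition_gender": "male", "season_name": "2019/2020"},
    {"competition_id": 999, "season_id": 1, "competition_name": "Hypothetical Cup",
     "competition_gender": "male", "season_name": "2020/2021"},
]

# The last season is deliberately absent from the sample.
wanted_seasons = ["2020/2021", "2019/2020", "1999/2000"]


def season_ids_by_name(competitions, competition_name, gender, seasons):
    """Map season_name -> season_id for the requested competition and seasons."""
    return {
        c["season_name"]: c["season_id"]
        for c in competitions
        if c["competition_name"] == competition_name
        and c["competition_gender"] == gender
        and c["season_name"] in seasons
    }


season_ids = season_ids_by_name(sample_competitions, "La Liga", "male", wanted_seasons)
missing = [s for s in wanted_seasons if s not in season_ids]

print(season_ids)  # {'2020/2021': 90, '2019/2020': 42}
print(missing)     # ['1999/2000']
```

An empty `missing` list would confirm that every season in `messi_seasons` has a matching entry.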

With the **season_id** and **competition_id** in hand, it's possible to retrieve the **match_id** of every match played in this competition in each of those seasons. I store the data of every match in a list.

To do this, I create a function that fetches the matches of one season. Then, to ensure that only the data of interest is kept, I build a list containing only the **match_id** of the matches in which Barcelona played, either as the home or away team.

In [0]:
# Request function to get all matches of one season of a competition
def fetch_matches(season_id, competition_id):
    url = f'https://raw.githubusercontent.com/statsbomb/open-data/master/data/matches/{competition_id}/{season_id}.json'
    response = requests.get(url)
    response.raise_for_status()  # fail early on an HTTP error
    return response.json()

# Store the results here
messi_all_matches = []

# Iterate over all seasons to get the array filled
for season_id in messi_seasons_idList:
    matches = fetch_matches(season_id, messi_competition_id)
    messi_all_matches.extend(matches)

In [0]:
messi_matchesId_list = []
for match in messi_all_matches:
    if match['home_team']['home_team_name'] == messi_team or match['away_team']['away_team_name'] == messi_team:
        messi_matchesId_list.append(match['match_id'])
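
The home-or-away check above relies on the nested shape of a match record. A small sketch of that shape follows, using hypothetical, heavily trimmed records (the IDs and opponent names are made up) and an illustrative helper `involves_team`.

```python
def involves_team(match: dict, team_name: str) -> bool:
    """Return True if team_name played the match, either at home or away."""
    return team_name in (
        match["home_team"]["home_team_name"],
        match["away_team"]["away_team_name"],
    )


# Hypothetical, heavily trimmed match records (IDs and opponents are made up).
sample_matches = [
    {"match_id": 1, "home_team": {"home_team_name": "Barcelona"},
     "away_team": {"away_team_name": "Team A"}},
    {"match_id": 2, "home_team": {"home_team_name": "Team B"},
     "away_team": {"away_team_name": "Team C"}},
    {"match_id": 3, "home_team": {"home_team_name": "Team D"},
     "away_team": {"away_team_name": "Barcelona"}},
]

match_ids = [m["match_id"] for m in sample_matches if involves_team(m, "Barcelona")]
print(match_ids)  # [1, 3]
```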

With this metadata, it's now possible to retrieve all the events from all of these matches. To do this, I create a function that fetches the events of one match, and I accumulate the results in a list.

In [0]:

# Starting Spark Session
spark = SparkSession.builder.appName("MessiEventsToParquet").getOrCreate()


This was undoubtedly the biggest challenge of the project. I tried at least four different approaches before the method below solved it. I developed the code in Google Colab (my experimental lab), and it worked there. However, when I ran it in a Databricks notebook and tried to transform the complete JSON array into a flattened DataFrame using *pd.json_normalize*, the operation consumed too much memory and could not complete.
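
To make the flattening step concrete, here is a pure-Python sketch of roughly what *pd.json_normalize* with `sep='_'` does to one event. It is not pandas' implementation: the `flatten` helper and the sample event are hypothetical. The point is that nested dictionaries become prefixed columns while lists are left untouched, which is how list-valued columns such as `shot_freeze_frame` and `related_events` end up in the flattened DataFrame.

```python
def flatten(record: dict, sep: str = "_", prefix: str = "") -> dict:
    """Flatten nested dicts into one level, joining keys with sep.

    Lists are kept as-is, which is also how pd.json_normalize treats them
    when no record_path is given.
    """
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, sep=sep, prefix=name))
        else:
            flat[name] = value
    return flat


# Hypothetical, heavily trimmed event record (values are made up).
event = {
    "id": "abc",
    "type": {"id": 16, "name": "Shot"},
    "shot": {"outcome": {"name": "Goal"}, "freeze_frame": [{"teammate": False}]},
    "related_events": ["def"],
}

flat_event = flatten(event)
print(flat_event)
```

Here `shot_freeze_frame` and `related_events` still hold Python lists after flattening, while `type` and `shot.outcome` have been expanded into scalar columns.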

As a Plan B, I went back to Colab and exported the DataFrame to a CSV file, which ended up being 98 MB in size. I saved it to Google Drive (since GitHub limits browser uploads to 25 MB), then read the file, converted it to a Spark DataFrame, and saved it in Parquet format.

But as a last attempt to resolve everything here without resorting to Plan B, I approached it as follows: I split the work into batches of approximately 20,000 records and logged each step to monitor the execution, and it worked. The downside of this approach is that, as far as I know, pandas handles nested data more gracefully than Spark does. Because of the nested data structure, Spark raised errors for 3 columns, 'tactics_lineup', 'shot_freeze_frame', and 'related_events', which I had to drop to make it work. Since these columns are not relevant to the objective of the analysis, I decided to leave it that way.
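
The batching idea can be isolated from the download and Spark steps. The sketch below is an illustrative generator, not the notebook's code: `batch_events` is a hypothetical name, and the toy input stands in for the per-match event lists. It follows the same rule as the loop further down: events are accumulated match by match and a batch is emitted once it holds at least `batch_size` events, so a match is never split.

```python
from typing import Iterable, Iterator, List


def batch_events(per_match_events: Iterable[List[dict]], batch_size: int) -> Iterator[List[dict]]:
    """Group per-match event lists into batches of at least batch_size events.

    A match is never split, so batches can overshoot batch_size; the final
    batch holds whatever is left and may be smaller.
    """
    batch: List[dict] = []
    for events in per_match_events:
        batch.extend(events)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


# Toy input: five "matches" with 3 events each, batched at 5 events.
toy_matches = [[{"match": m, "event": e} for e in range(3)] for m in range(5)]
sizes = [len(b) for b in batch_events(toy_matches, batch_size=5)]
print(sizes)  # [6, 6, 3]
```

This overshoot is why the logged batch sizes are all somewhat above 20,000 rather than exactly 20,000.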

A point for improvement would be to further study and explore Spark to find a better way to handle complex data structures, avoiding the need to ignore columns during the process.
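
One possible direction for that improvement, offered as an untested assumption rather than a verified fix for this Databricks setup, is to serialize the nested values to JSON text instead of dropping the columns, so that they reach Spark as plain strings. The sketch below works on a single flattened record; `stringify_nested` and the sample event are hypothetical, and the column names are the three dropped in this notebook.

```python
import json

# Column names taken from the list of columns dropped in this notebook.
COMPLEX_COLS = ["tactics_lineup", "shot_freeze_frame", "related_events"]


def stringify_nested(record: dict, cols=COMPLEX_COLS) -> dict:
    """Return a copy of record with list/dict values in cols encoded as JSON text.

    Missing columns and non-nested values are left untouched.
    """
    out = dict(record)
    for col in cols:
        value = out.get(col)
        if isinstance(value, (list, dict)):
            out[col] = json.dumps(value)
    return out


# Hypothetical flattened event (values are made up).
flat_event = {"id": "abc", "type_name": "Shot", "related_events": ["def", "ghi"]}
encoded = stringify_nested(flat_event)

print(encoded["related_events"])              # ["def", "ghi"]
print(json.loads(encoded["related_events"]))  # ['def', 'ghi']
```

The original structure can be recovered later with `json.loads`, at the cost of parsing the text whenever the nested data is needed.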

In [0]:
# Request function to get all events of one match
def fetch_messi_events(match_id):
    url = f'https://raw.githubusercontent.com/statsbomb/open-data/master/data/events/{match_id}.json'
    response = requests.get(url)
    response.raise_for_status()  # fail early on an HTTP error
    return response.json()

# Function to drop the complex columns and append the rest of the data to a Spark table
def save_to_spark(df, spark_table_name):
    complex_cols = ['tactics_lineup', 'shot_freeze_frame', 'related_events']
    # errors='ignore' skips any of these columns that a batch does not contain
    df = df.drop(columns=complex_cols, errors='ignore')

    spark_df = spark.createDataFrame(df)
    spark_df.write.option("mergeSchema", "true").mode('append').saveAsTable(spark_table_name)

In [0]:
batch_size = 20000
batch_data = []
batch = 0
total_events = 0  # running count, so the matches are not downloaded again at the end

spark_table_name = "messi_data"

# Iterate over all matches
for match_id in messi_matchesId_list:
    events = fetch_messi_events(match_id)
    batch_data.extend(events)
    total_events += len(events)

    # Check if the batch has reached the desired size
    if len(batch_data) >= batch_size:
        batch += 1
        print(f"Batch {batch} processed. Size: {len(batch_data)}")

        # Flatten the data using pd.json_normalize
        df_flattened = pd.json_normalize(batch_data, sep='_')
        print(f"Batch {batch} flattened.")

        # Save to Spark table
        save_to_spark(df_flattened, spark_table_name)
        print(f"Batch {batch} Saved on Spark table.")

        # Reset the batch
        batch_data = []

# Save any remaining data, i.e. the last batch, which usually does not reach the batch size
if batch_data:
    batch += 1
    print(f"Batch {batch} processed. Size: {len(batch_data)}")

    # Flatten the data using pd.json_normalize
    df_flattened = pd.json_normalize(batch_data, sep='_')
    print(f"Batch {batch} flattened.")

    # Save to Spark table
    save_to_spark(df_flattened, spark_table_name)
    print(f"Batch {batch} Saved on Spark table.")

print(f"Total events processed: {total_events}")

Batch 1 processed. Size: 20063
Batch 1 flattened.
Batch 1 Saved on Spark table.
Batch 2 processed. Size: 24178
Batch 2 flattened.
Batch 2 Saved on Spark table.
Batch 3 processed. Size: 23063
Batch 3 flattened.
Batch 3 Saved on Spark table.
Batch 4 processed. Size: 20641
Batch 4 flattened.
Batch 4 Saved on Spark table.
Batch 5 processed. Size: 23430
Batch 5 flattened.
Batch 5 Saved on Spark table.
Batch 6 processed. Size: 23875
Batch 6 flattened.
Batch 6 Saved on Spark table.
Batch 7 processed. Size: 23561
Batch 7 flattened.
Batch 7 Saved on Spark table.
Batch 8 processed. Size: 20117
Batch 8 flattened.
Batch 8 Saved on Spark table.
Batch 9 processed. Size: 23440
Batch 9 flattened.
Batch 9 Saved on Spark table.
Batch 10 processed. Size: 23360
Batch 10 flattened.
Batch 10 Saved on Spark table.
Batch 11 processed. Size: 22766
Batch 11 flattened.
Batch 11 Saved on Spark table.
Batch 12 processed. Size: 23499
Batch 12 flattened.
Batch 12 Saved on Spark table.
Batch 13 processed

In [0]:
# Load the saved Spark table as a Spark DataFrame
df_spark_messi = spark.table(spark_table_name)

In [0]:
# Save the DataFrame in Parquet format to DBFS.
df_spark_messi.write.mode('overwrite').parquet("/FileStore/tables/messi_raw_data_parquet")