### ONLY FOR TESTING PURPOSES, CANNOT BE USED IN PRODUCTION
##### This is a Kaggle Jupyter Notebook, so some restrictions apply when running Airflow in this environment. For example, scheduling is restricted, so we tested a single execution of the DAG instead of scheduling it.

##### This script connects to the Football Data API to fetch the Premier League standings as JSON (Extract). It then arranges the useful fields into a list of tuples (Transform). Finally, it connects to Azure Database for MySQL and upserts the standings into a table (Load).
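The Transform step can be sketched as a pure function over the decoded JSON, which makes it testable without calling the API. This is a minimal sketch, not the notebook's code: the payload is hypothetical and keeps only the fields that `transform_data()` in the DAG file reads, `to_rows()` is an illustrative name, and unlike the DAG code it uses `club.get("form")` so a missing `form` field becomes `None` (stored as `NULL`).

```python
# Hypothetical payload: only the fields transform_data() reads are included.
sample = {
    "standings": [
        {
            "table": [
                {"position": 1, "team": {"id": 101, "name": "Team A"},
                 "playedGames": 4, "won": 3, "draw": 1, "lost": 0,
                 "goalsFor": 9, "goalsAgainst": 2, "goalDifference": 7,
                 "points": 10, "form": None},
                {"position": 2, "team": {"id": 102, "name": "Team B"},
                 "playedGames": 4, "won": 2, "draw": 2, "lost": 0,
                 "goalsFor": 6, "goalsAgainst": 3, "goalDifference": 3,
                 "points": 8},  # no "form" key at all
            ]
        }
    ]
}

SEASON = 2025  # hard-coded, as in the DAG file


def to_rows(payload, season=SEASON):
    """Flatten the first standings table into tuples ordered like the MySQL columns."""
    table = payload["standings"][0]["table"]
    return [
        (
            season, club["position"], club["team"]["id"], club["team"]["name"],
            club["playedGames"], club["won"], club["draw"], club["lost"],
            club["goalsFor"], club["goalsAgainst"], club["goalDifference"],
            club["points"], club.get("form"),  # None when the API omits form
        )
        for club in table
    ]


rows = to_rows(sample)
print(rows[0])  # (2025, 1, 101, 'Team A', 4, 3, 1, 0, 9, 2, 7, 10, None)
```

Keeping parsing separate from fetching also means the extract task could hand its JSON to the transform step instead of the transform calling the API again.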


In [25]:
# install mysql-connector-python package to connect with MySQL
!pip install mysql-connector-python 



Collecting mysql-connector-python
  Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (7.3 kB)
Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl (33.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m47.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.4.0


In [11]:
# Orchestration - install Apache Airflow

!pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-3.0.6-py3-none-any.whl.metadata (32 kB)
Collecting apache-airflow-core==3.0.6 (from apache-airflow)
  Downloading apache_airflow_core-3.0.6-py3-none-any.whl.metadata (7.4 kB)
Collecting apache-airflow-task-sdk<1.1.0,>=1.0.6 (from apache-airflow)
  Downloading apache_airflow_task_sdk-1.0.6-py3-none-any.whl.metadata (3.8 kB)
Collecting a2wsgi>=1.10.8 (from apache-airflow-core==3.0.6->apache-airflow)
  Downloading a2wsgi-1.10.10-py3-none-any.whl.metadata (4.0 kB)
Collecting apache-airflow-providers-common-compat>=1.6.0 (from apache-airflow-core==3.0.6->apache-airflow)
  Downloading apache_airflow_providers_common_compat-1.7.3-py3-none-any.whl.metadata (5.3 kB)
Collecting apache-airflow-providers-common-io>=1.5.3 (from apache-airflow-core==3.0.6->apache-airflow)
  Downloading apache_airflow_providers_common_io-1.6.2-py3-none-any.whl.metadata (5.3 kB)
Collecting apache-airflow-providers-common-sql>=1.26.0 (from apache-airflow-core==3.0.

In [12]:
# point Airflow to its home folder (config, metadata DB, dags/) before running any airflow command
import os
os.environ["AIRFLOW_HOME"] = "/kaggle/working/airflow"
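Setting `os.environ` in the kernel is enough for the later `!airflow` commands because each `!` command runs in a child process that inherits the kernel's environment. A stdlib sketch of that inheritance, using `subprocess` in place of the `!` syntax and a hypothetical `/tmp/airflow_demo` path:

```python
import os
import subprocess
import sys

# Set the variable in the current (parent) process, as the notebook cell does.
os.environ["AIRFLOW_HOME"] = "/tmp/airflow_demo"  # hypothetical path for this demo

# Start a child process; it receives a copy of the parent's environment.
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ.get('AIRFLOW_HOME'))"],
    capture_output=True,
    text=True,
    check=True,
)
print(child.stdout.strip())  # /tmp/airflow_demo
```

The variable therefore has to be set before any `airflow` command that should use it.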

In [13]:
# create or upgrade the Airflow metadata database (SQLite by default)
!airflow db migrate

[[34m2025-09-17T01:55:20.533+0000[0m] {[34mproviders_manager.py:[0m953} INFO[0m - The hook_class '[1mairflow.providers.standard.hooks.filesystem.FSHook[22m' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work[0m
[[34m2025-09-17T01:55:20.535+0000[0m] {[34mproviders_manager.py:[0m953} INFO[0m - The hook_class '[1mairflow.providers.standard.hooks.package_index.PackageIndexHook[22m' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work[0m
DB: sqlite:////kaggle/working/airflow/airflow.db
Performing upgrade to the metadata database sqlite:////kaggle/working/airflow/airflow.db
[[34m2025-09-17T01:55:20.759+0000[0m] {[34mmigration.py:[0m211} INFO[0m - Context impl [1mSQLiteImpl[22m.[0m
[[34m2025-09-17T01:55:20.760+0000[0m] {[34mmigration.

**Create standings_dag.py, the DAG file Airflow needs for orchestration**
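The DAG file in the next cell reads every credential through Kaggle Secrets, which exists only on Kaggle. To run the same file elsewhere, one option (an assumption, not part of the original notebook) is a hypothetical helper, `get_secret()`, that falls back to environment variables, playing the role of the `.env` file that Kaggle Secrets replaces here:

```python
import os


def get_secret(name):
    """Hypothetical helper: read a secret from Kaggle Secrets when running on Kaggle,
    otherwise from an environment variable with the same name."""
    try:
        from kaggle_secrets import UserSecretsClient  # only available on Kaggle
    except ImportError:
        value = os.environ.get(name)
        if value is None:
            raise KeyError(f"secret {name!r} not found in the environment")
        return value
    return UserSecretsClient().get_secret(name)
```

Inside the DAG file, `MYSQL_HOST = get_secret("MYSQL_HOST")` would then replace the corresponding `get_secret(...)` call on the Kaggle client, and the same pattern applies to the other credentials.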

In [79]:
%%writefile standings_dag.py
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import mysql.connector
from kaggle_secrets import UserSecretsClient # enable kaggle secrets for this notebook to replace .env file


# create one Kaggle Secrets client and reuse it for every lookup
secrets_client = UserSecretsClient()

# retrieve credentials to connect to the API
API_KEY = secrets_client.get_secret("API_KEY")
API_HOST = secrets_client.get_secret("API_HOST")  # full URL of the standings endpoint
url = API_HOST
headers = {"X-Auth-Token": API_KEY}

# retrieve credentials to connect to MySQL Server
MYSQL_HOST = secrets_client.get_secret("MYSQL_HOST")
MYSQL_PORT = int(secrets_client.get_secret("MYSQL_PORT"))  # secrets are returned as strings
MYSQL_USER = secrets_client.get_secret("MYSQL_USER")
MYSQL_PWD = secrets_client.get_secret("MYSQL_PWD")
MYSQL_DB = secrets_client.get_secret("MYSQL_DB")

# define extract_data() function
def extract_data():
    print("Fetching data from Football Data API")
    try:
        # without a timeout, requests can wait indefinitely for an unresponsive server
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # raises an exception for HTTP errors
        print("API response received successfully")
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        raise

# define transform_data() function
# parse clubs records into a list of tuples
def transform_data():
    data = extract_data()
    standings_list = data["standings"][0]["table"] # filter data down to the standings table list
    rows = [] 
    for club in standings_list:
        season = 2025
        position = club["position"]
        team_id = club["team"]["id"]
        team = club["team"]["name"]
        played = club["playedGames"]
        won = club["won"]
        draw = club["draw"]
        lost = club["lost"]
        goals_for = club["goalsFor"]
        goals_against = club["goalsAgainst"]
        goal_diff = club["goalDifference"]
        points = club["points"]
        form = club["form"]
        club_records = (season, position, team_id, team, played, won, draw, lost, goals_for, goals_against, goal_diff, points, form)
        rows.append(club_records)
    return rows

# define connect_to_db() function
def connect_to_db():
    print("Connecting to MySQL database")
    try:
        conn = mysql.connector.connect(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            user=MYSQL_USER,
            password=MYSQL_PWD,
            database=MYSQL_DB,
            ssl_ca="/kaggle/input/ca-cert/DigiCertGlobalRootG2.crt.pem",  # CA certificate used to verify the server
            ssl_disabled=False,
            connection_timeout=10,  # give up if the server does not respond within 10 seconds
            autocommit=False,  # changes are written to the database ONLY when commit() is called
            raise_on_warnings=True,  # turns MySQL warnings into Python exceptions
        )
        if conn.is_connected():
            print("Connection to database successful!")
        return conn
    except mysql.connector.Error as e:  # Error lives in mysql.connector; a bare Error is undefined
        print(f"Database connection failed: {e}")
        raise

# define create_table() function
def create_table():
    conn = connect_to_db()
    print("Creating table if it does not exist")
    cursor = conn.cursor()
    try:
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS standings (
        season INT NOT NULL,
        position INT NOT NULL,
        team_id INT NOT NULL,
        team VARCHAR(100) NOT NULL,
        played INT NOT NULL,
        won INT NOT NULL,
        draw INT NOT NULL,
        lost INT NOT NULL,
        goals_for INT NOT NULL,
        goals_against INT NOT NULL,
        goal_diff INT NOT NULL,
        points INT NOT NULL,
        form VARCHAR(5),
        PRIMARY KEY (season, team_id),
        UNIQUE KEY unique_season_pos(season, position)
        );
        """)
        conn.commit()
        print("Table is ready")
    except Exception as e:
        print(f"Failed to create table: {e}")
    finally:
        # release the connection even when the statement fails
        cursor.close()
        conn.close()

# define upsert_records() function to update the standings table in MySQL database
def upsert_records():
    UPSERT_SQL = """INSERT INTO standings (season, position, team_id, team, played, won, draw, lost, goals_for, goals_against, goal_diff, points, form)
    VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) AS src
    ON DUPLICATE KEY UPDATE
    position = src.position,
    team = src.team,
    played = src.played,
    won = src.won,
    draw = src.draw,
    lost = src.lost,
    goals_for = src.goals_for,
    goals_against = src.goals_against,
    goal_diff = src.goal_diff,
    points = src.points,
    form = src.form"""
    
    data = transform_data()
    conn = connect_to_db()
    print("Inserting and updating standings results in the database")
    no_rows = len(data)  # number of rows to upsert
    cursor = conn.cursor()
    try:
        cursor.executemany(UPSERT_SQL, data)
        conn.commit()
        print(f"[SUCCESS] - Upsert completed for {no_rows} rows")
    except Exception as e:
        conn.rollback()
        print(f"[ERROR] - Rolled back due to the following error: {e}")
        raise  # re-raise so Airflow marks the task as failed and applies its retries
    finally:
        cursor.close()
        conn.close()
        print("\nAll database connections closed. \nClean up completed.")

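# DAG settings: default arguments applied to every task
default_args = {
    "owner": "jnh",
    "start_date": datetime(2025, 9, 16),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),  # wait 5 minutes before retrying a failed task
}

with DAG(
    "standings_dag",
    default_args=default_args,
    schedule=None,  # no schedule: the DAG only runs when triggered, e.g. by airflow dags test
    catchup=False,
) as dag:
    # create the standings table first (no-op if it already exists)
    new_table = PythonOperator(
        task_id="new_table",
        python_callable=create_table,
    )
    # then extract, transform and upsert the latest standings
    load = PythonOperator(
        task_id="load",
        python_callable=upsert_records,
    )
    new_table >> load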

Overwriting standings_dag.py
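`upsert_records()` relies on MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`: a row whose primary key `(season, team_id)` already exists is updated instead of duplicated, so re-running the DAG does not add rows. The sketch below illustrates the same idea with SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (a different SQL dialect, used only because `sqlite3` ships with Python) on a trimmed-down, hypothetical table. The real table also carries `UNIQUE KEY (season, position)`, and the MySQL manual advises avoiding `ON DUPLICATE KEY UPDATE` on tables with more than one unique index, so it is worth checking how the upsert behaves when two clubs swap positions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE standings (
        season INTEGER NOT NULL,
        team_id INTEGER NOT NULL,
        points INTEGER NOT NULL,
        PRIMARY KEY (season, team_id)
    )"""
)

# SQLite counterpart of ON DUPLICATE KEY UPDATE; "excluded" holds the incoming row.
UPSERT_SQL = """INSERT INTO standings (season, team_id, points) VALUES (?, ?, ?)
ON CONFLICT(season, team_id) DO UPDATE SET points = excluded.points"""

# First load, then a later load with updated points for the same clubs.
conn.executemany(UPSERT_SQL, [(2025, 1, 10), (2025, 2, 7)])
conn.executemany(UPSERT_SQL, [(2025, 1, 13), (2025, 2, 7)])
conn.commit()

rows = conn.execute("SELECT season, team_id, points FROM standings ORDER BY team_id").fetchall()
print(rows)  # [(2025, 1, 13), (2025, 2, 7)]
```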


In [19]:
# create dags folder to host standings_dag.py
folder = "dags"
folder_path = os.path.join("/kaggle/working/airflow", folder)
os.makedirs(folder_path, exist_ok = True)

In [80]:
import shutil
# delete standings_dag.py from the dags folder if it already exists
if os.path.exists("/kaggle/working/airflow/dags/standings_dag.py"):
    os.remove("/kaggle/working/airflow/dags/standings_dag.py")
    print("file deleted")

# move standings_dag.py into the dags folder
shutil.move("/kaggle/working/standings_dag.py", "/kaggle/working/airflow/dags/standings_dag.py")

file deleted


'/kaggle/working/airflow/dags/standings_dag.py'
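Deleting the old file and then moving the new one can be collapsed into a single call: `pathlib.Path.replace()` (built on `os.replace()`) overwrites the destination if it already exists. It only works when source and destination are on the same filesystem, which the two `/kaggle/working` paths presumably are. A sketch using a temporary directory in place of the Kaggle paths:

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "standings_dag.py"
    dags = Path(tmp) / "airflow" / "dags"
    dags.mkdir(parents=True, exist_ok=True)  # same role as os.makedirs(..., exist_ok=True)

    (dags / "standings_dag.py").write_text("# old version\n")  # stale DAG file
    src.write_text("# new version\n")  # freshly written DAG file

    # Overwrites the stale file in one step; no separate os.remove() needed.
    target = src.replace(dags / "standings_dag.py")
    content = target.read_text()
    src_exists = src.exists()

print(repr(content))  # '# new version\n'
print(src_exists)     # False
```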

In [81]:
# test the DAG: run it once locally, without the scheduler
!airflow dags test standings_dag

[[34m2025-09-17T04:11:18.228+0000[0m] {[34mproviders_manager.py:[0m953} INFO[0m - The hook_class '[1mairflow.providers.standard.hooks.filesystem.FSHook[22m' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work[0m
[[34m2025-09-17T04:11:18.229+0000[0m] {[34mproviders_manager.py:[0m953} INFO[0m - The hook_class '[1mairflow.providers.standard.hooks.package_index.PackageIndexHook[22m' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work[0m
[[34m2025-09-17T04:11:18.289+0000[0m] {[34mmanager.py:[0m122} INFO[0m - DAG bundles loaded: [1mdags-folder, example_dags[22m[0m
[[34m2025-09-17T04:11:18.290+0000[0m] {[34mdagbag.py:[0m585} INFO[0m - Filling up the DagBag from [1m/kaggle/working/airflow/dags[22m[0m
[[34m2025-09-17T04:11:18.529+