In [7]:
!pip3 install numpy pandas matplotlib seaborn scikit-learn datetime nbformat

Collecting numpy
  Obtaining dependency information for numpy from https://files.pythonhosted.org/packages/be/f8/034752c5131c46e10364e4db241974f2eb6bb31bbfc4335344c19e17d909/numpy-1.26.0-cp310-cp310-macosx_10_9_x86_64.whl.metadata
  Using cached numpy-1.26.0-cp310-cp310-macosx_10_9_x86_64.whl.metadata (53 kB)
Collecting pandas
  Obtaining dependency information for pandas from https://files.pythonhosted.org/packages/f3/e6/7021570b1152ae8efc2dc99f4aef2c0b91c1f098a18cb8671d5b06ebdf53/pandas-2.1.1-cp310-cp310-macosx_10_9_x86_64.whl.metadata
  Using cached pandas-2.1.1-cp310-cp310-macosx_10_9_x86_64.whl.metadata (18 kB)
Collecting matplotlib
  Obtaining dependency information for matplotlib from https://files.pythonhosted.org/packages/30/5b/a6214caaa5adf07b52aecba98fdace32cc51e63a1fcc1f98d60ec128a6c0/matplotlib-3.8.0-cp310-cp310-macosx_10_12_x86_64.whl.metadata
  Using cached matplotlib-3.8.0-cp310-cp310-macosx_10_12_x86_64.whl.metadata (5.8 kB)
Collecting seaborn
  Obtaining dependency in

In [8]:
# Imports
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

import math

from sklearn.linear_model import LinearRegression

from datetime import datetime, timedelta

import sqlite3
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

import nbformat

import warnings
warnings.filterwarnings('ignore')

# Create a requirements.txt file with the necessary packages
!pip freeze > airflow/dags/requirements.txt

## Data Cleaning and Preprocessing

In [11]:
# =========================== Random Musings =========================== #
# Squads used to restrict the player table to Spanish clubs.
spanish_squads = ['Sevilla', 'Sporting Huelva', 'Athletic Club', 'Levante Planas',
       'UDG Tenerife', 'Villarreal', 'Madrid CFF', 'Barcelona',
       'Atlético Madrid', 'Real Madrid', 'Alhama', 'Alavés',
       'Real Sociedad', 'Levante', 'Real Betis', 'Valencia']

# Columns kept for the exploratory player table.
explanable_cols = ['Player','Nation','Pos','Squad','Age','Born','Starts','Min','Gls','Total_Att','Blocks_Blocks','Blocks_Sh','Blocks_Pass','Clr','Err','Touches_Touches','Touches_DefPen','Dribbles_Succ','Dribbles_Att','Dribbles_Mis','AerialDuels_Won','AerialDuels_Lost']

# Year that Age is measured against (Age = REFERENCE_YEAR - Born).
REFERENCE_YEAR = 2023

spanish_players = pd.read_csv('assets/all_players.csv')
# Filter rows and columns in one step. The .copy() makes the column
# assignments below act on an independent frame, not a view of the raw CSV.
spanish_players = spanish_players.loc[
    spanish_players['Squad'].isin(spanish_squads), explanable_cols
].copy()

# Recompute Age from the birth year. The previous per-row guard
# `x != np.nan` was always True (NaN never compares equal), so it never
# skipped anything. Vectorised subtraction already propagates NaN for
# unknown birth years.
spanish_players['Age'] = REFERENCE_YEAR - spanish_players['Born']

# A missing Nation is assumed to be Spain.
spanish_players['Nation'] = spanish_players['Nation'].fillna('es ESP')

# Convert float columns to integers. Plain int64 cannot hold NaN (astype
# raises), so columns with missing values use pandas' nullable Int64 instead.
for col in spanish_players.select_dtypes(include='float64').columns:
    has_missing = spanish_players[col].isna().any()
    spanish_players[col] = spanish_players[col].astype('Int64' if has_missing else 'int64')

cols = spanish_players.columns.to_list()
display(spanish_players.sample(5), spanish_players.shape)

Unnamed: 0,Player,Nation,Pos,Squad,Age,Born,Starts,Min,Gls,Total_Att,...,Blocks_Pass,Clr,Err,Touches_Touches,Touches_DefPen,Dribbles_Succ,Dribbles_Att,Dribbles_Mis,AerialDuels_Won,AerialDuels_Lost
113,Zaira Flores,es ESP,"DF,MF",Alhama,30,1993,4,299,0,91,...,1,9,0,129,7,0,0,2,2,6
104,Garazi Fácila,es ESP,"DF,MF",Alavés,24,1999,8,720,0,319,...,5,27,0,422,31,4,12,13,4,2
134,Geyse,br BRA,"FW,MF",Barcelona,25,1998,7,606,5,136,...,8,3,0,228,1,3,21,21,0,3
342,Irina Uribe,es ESP,"FW,MF",Levante Planas,25,1998,8,793,4,170,...,4,0,0,268,1,14,34,34,0,3
81,Yulema Corres,es ESP,MF,Athletic Club,31,1992,1,45,0,12,...,0,0,0,15,0,0,0,2,1,2


(358, 22)

In [12]:
# DAG arguments
default_args = {
    'owner': 'Lagom-QB',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ETL_DAG',
    description='Load data into SQLite database using Airflow',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
    catchup=False
)


In [13]:
# Extract data
file_loc       = 'assets/matches-checkpoint.csv'
useless_ids    = ['Away_id','Home_id','Match_id','League_id']
spanish_squads = ['Sevilla', 'Sporting Huelva', 'Athletic Club', 'Levante Planas',
                  'UDG Tenerife', 'Villarreal', 'Madrid CFF', 'Barcelona',
                  'Atlético Madrid', 'Real Madrid', 'Alhama', 'Alavés',
                  'Real Sociedad', 'Levante', 'Real Betis', 'Valencia']


def extract_data(file_location=file_loc, spanish_squads=spanish_squads, useless_ids=useless_ids):
    """Read the raw match CSV and keep only fixtures involving a Spanish squad.

    Parameters
    ----------
    file_location : str
        Path of the match CSV to read.
    spanish_squads : list of str
        A row is kept when its ``Home`` or its ``Away`` value is in this list.
    useless_ids : list of str
        Identifier columns removed from the result.

    Returns
    -------
    pandas.DataFrame
        The filtered matches, re-indexed 0..n-1, with ``Date`` parsed to datetime.
    """
    raw = pd.read_csv(file_location)
    involves_spanish = raw['Home'].isin(spanish_squads) | raw['Away'].isin(spanish_squads)
    return (
        raw.loc[involves_spanish]
        .drop(columns=useless_ids)
        .reset_index(drop=True)
        .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
    )

# matches = extract_data()
# display(matches.sample(5), matches.shape)

## Data Transformation

**Feature Engineering** : Calculate derived metrics

**Data Aggregation** : Aggregate the data to a higher level of granularity.

**Data Filtering** : Filter the dataframe based on specific conditions or criteria. 

**Data Transformation** : Apply mathematical or statistical transformations to the data. 

**Feature Scaling** : Scale the numeric features to a common range to avoid bias in the analysis.

**Data Encoding** : Encode categorical variables into numerical representations. 

In [14]:
# Transform task
def transform_data(matches):
    """Clean the extracted matches and add derived, home-perspective metrics.

    Parameters
    ----------
    matches : pandas.DataFrame
        Output of ``extract_data``. Must contain ``Date``, ``Time`` (``HH:MM``),
        ``Home``, ``ScoreHome``, ``ScoreAway``, ``xGHome`` and ``xGAway``.

    Returns
    -------
    pandas.DataFrame
        A new frame; the input is not modified. Rows containing any NaN are
        dropped, and these columns are added: GoalDifference, Result,
        ExpectedGoalDifference, Points, ExpectedPoints, WinPercentage,
        TotalGoals, xGRatio.
    """
    # Drop incomplete rows. .copy() so the assignments below never write
    # through to (or warn about) a view of the caller's frame.
    matches = matches.dropna().copy()

    # Convert data types
    matches['Date'] = pd.to_datetime(matches['Date'])
    matches['Time'] = pd.to_datetime(matches['Time'], format='%H:%M').dt.time

    # Goal difference from the home side's perspective.
    matches['GoalDifference'] = matches['ScoreHome'] - matches['ScoreAway']

    # Result (home perspective), taken from the numeric score columns.
    # Comparing characters of the 'Score' string (x[0] vs x[2]) gives wrong
    # answers for double-digit scores such as '10–0'.
    home_won = matches['GoalDifference'] > 0
    drawn = matches['GoalDifference'] == 0
    matches['Result'] = np.select([home_won, drawn], ['Win', 'Draw'], default='Loss')

    # Expected goals difference (home xG minus away xG).
    matches['ExpectedGoalDifference'] = matches['xGHome'] - matches['xGAway']

    # 3 / 1 / 0 points for a home win / draw / loss.
    matches['Points'] = np.select([home_won, drawn], [3, 1], default=0)

    # Expected points use the same 3/1/0 scheme but compare xG, not the real
    # score. Reusing the Score formula made this column a copy of Points.
    xg_won = matches['xGHome'] > matches['xGAway']
    xg_drawn = matches['xGHome'] == matches['xGAway']
    matches['ExpectedPoints'] = np.select([xg_won, xg_drawn], [3, 1], default=0)

    # Home win percentage: share of each home team's home matches (in this
    # frame) that it won. The mean of a boolean gives 0 for teams with no
    # home wins; dividing two size() Series left those teams as NaN.
    home_win_pct = matches['Result'].eq('Win').groupby(matches['Home']).mean() * 100
    matches['WinPercentage'] = matches['Home'].map(home_win_pct)

    matches['TotalGoals'] = matches['ScoreHome'] + matches['ScoreAway']

    # Home share of the match's total xG. This is NaN when both xG values
    # are 0 (0 / 0).
    matches['xGRatio'] = matches['xGHome'] / (matches['xGHome'] + matches['xGAway'])

    return matches

# matches = transform_data(matches)
# display(matches.sample(5), matches.shape)

### ETL Process and Data Integration
_Loading the data._  
__Apache Airflow__ can load data into many databases through its provider packages and hooks, for example:
- SQLite _Lightweight file-based database suitable for small-scale deployments and testing_
- PostgreSQL _Relational database widely used in production environments_
- MySQL _Popular, widely used open-source relational database_
- Microsoft SQL Server _Commercial relational database widely used in enterprises_
- Oracle _Commercial relational database widely used in enterprises_
- Amazon Redshift _Cloud-based data warehouse optimized for analytics workloads_
- Google BigQuery _Cloud-based data warehouse optimized for analytics workloads_
- Apache Cassandra _Distributed NoSQL database optimized for high scalability and availability_
- Apache Hive _Data warehouse infrastructure for data summarization, querying and analytics_

I'm using SQLite because this is a small-scale dataset.

In [15]:
# Load task
def load_data(matches, db_path='assets/spanish_matches.db'):
    """Append the transformed matches to the ``matches`` table in SQLite.

    The table is created if it does not exist. Rows are always appended, so
    loading the same frame twice stores duplicate rows.

    Parameters
    ----------
    matches : pandas.DataFrame
        Output of ``transform_data``. Must contain every column listed in
        ``columns`` below.
    db_path : str, optional
        Path of the SQLite database file; it is created if missing.
    """
    import datetime as _dt
    from contextlib import closing

    # (column name, declared SQLite type), in table order.
    columns = [
        ('Wk', 'INTEGER'),
        ('Day', 'TEXT'),
        ('Date', 'DATE'),
        ('Time', 'TIME'),
        ('Home', 'TEXT'),
        ('xGHome', 'FLOAT'),
        ('Score', 'TEXT'),
        ('xGAway', 'FLOAT'),
        ('Away', 'TEXT'),
        ('xPHome', 'FLOAT'),
        ('xPAway', 'FLOAT'),
        ('ScoreHome', 'INTEGER'),
        ('ScoreAway', 'INTEGER'),
        ('GoalDifference', 'INTEGER'),
        ('Result', 'TEXT'),
        ('ExpectedGoalDifference', 'FLOAT'),
        ('Points', 'INTEGER'),
        ('ExpectedPoints', 'INTEGER'),
        ('WinPercentage', 'FLOAT'),
        ('TotalGoals', 'INTEGER'),
        ('xGRatio', 'FLOAT'),
    ]
    names = [name for name, _ in columns]

    create_sql = 'CREATE TABLE IF NOT EXISTS matches ({})'.format(
        ', '.join(f'{name} {sql_type}' for name, sql_type in columns))
    # Both the column list and the placeholders are derived from `columns`,
    # so the placeholder count always matches the number of values bound.
    # The hand-written statement had 20 placeholders for 21 values.
    insert_sql = 'INSERT INTO matches ({}) VALUES ({})'.format(
        ', '.join(names), ', '.join(['?'] * len(names)))

    def _to_sqlite(value):
        """Convert one DataFrame cell into a value sqlite3 can bind."""
        # NaN / NaT / None become SQL NULL.
        if value is None or (not isinstance(value, str) and pd.isna(value)):
            return None
        # sqlite3 has no built-in adapter for datetime.time, and its datetime
        # adapters are deprecated since Python 3.12, so store dates and times
        # as text. pandas.Timestamp is a datetime subclass and is covered here.
        if isinstance(value, (_dt.date, _dt.time)):
            return str(value)
        # Unwrap numpy scalars (e.g. np.int64) into plain Python values.
        if isinstance(value, np.generic):
            return value.item()
        return value

    rows = [
        tuple(_to_sqlite(value) for value in record)
        for record in matches[names].itertuples(index=False, name=None)
    ]

    # closing() guarantees the connection is released even if an insert fails.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(create_sql)
            conn.executemany(insert_sql, rows)


In [16]:
# Assign tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    op_kwargs={'matches': '{{ ti.xcom_pull(task_ids="extract_data") }}'},
    dag=dag
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    op_kwargs={'matches': '{{ ti.xcom_pull(task_ids="transform_data") }}'},
    dag=dag
)

In [17]:
!~/airflow_env/bin/airflow scheduler -D
!~/airflow_env/bin/airflow webserver -D

!~/airflow_env/bin/airflow dags list

!~/airflow_env/bin/airflow cheat-sheet

zsh:1: no such file or directory: /Users/iffiness/airflow_env/bin/airflow
zsh:1: no such file or directory: /Users/iffiness/airflow_env/bin/airflow
zsh:1: no such file or directory: /Users/iffiness/airflow_env/bin/airflow
zsh:1: no such file or directory: /Users/iffiness/airflow_env/bin/airflow


## Validation and Quality Assurance

To validate the quality of the data, I connect to the database and check the `Wk` column of the `matches` table.  
Specifically, I check the data type, the range and the completeness of the data.

In [18]:
# Validate data in the database and ensure the proper quality
def validate_data(db_path='assets/spanish_matches.db', min_week=1, max_week=10,
                  raise_on_failure=False):
    """Run basic quality checks on the ``Wk`` column of the ``matches`` table.

    Each check prints a pass/fail line:

    * type: non-NULL ``Wk`` values whose SQLite storage class is not integer;
    * range: ``Wk`` values outside ``[min_week, max_week]``;
    * completeness: NULL ``Wk`` values.

    Parameters
    ----------
    db_path : str, optional
        Path of the SQLite database file.
    min_week, max_week : int, optional
        Inclusive bounds for ``Wk``. The upper bound of 10 is kept from the
        original check. NOTE(review): presumably only the first 10 matchweeks
        are loaded — TODO confirm.
    raise_on_failure : bool, optional
        If true, raise once all checks have printed and any of them failed.

    Returns
    -------
    dict
        Failure count per check, keyed ``'type'``, ``'range'`` and ``'completeness'``.

    Raises
    ------
    ValueError
        If ``raise_on_failure`` is true and at least one check failed.
    """
    from contextlib import closing

    with closing(sqlite3.connect(db_path)) as conn:
        def count(sql, params=()):
            return conn.execute(sql, params).fetchone()[0]

        counts = {
            # CAST(Wk AS INTEGER) is NULL only when Wk itself is NULL, so the
            # old "CAST ... IS NULL" test was just a second null check.
            # typeof() inspects the value's actual storage class.
            'type': count("SELECT COUNT(*) FROM matches "
                          "WHERE Wk IS NOT NULL AND typeof(Wk) != 'integer'"),
            'range': count("SELECT COUNT(*) FROM matches WHERE Wk < ? OR Wk > ?",
                           (min_week, max_week)),
            'completeness': count("SELECT COUNT(*) FROM matches WHERE Wk IS NULL"),
        }

    # Data type validation
    if counts['type'] == 0:
        print('Data type validation passed.')
    else:
        print(f"Data type validation failed with {counts['type']} non-integer values.")

    # Data range validation
    if counts['range'] == 0:
        print('Data range validation passed.')
    else:
        print(f"Data range validation failed with {counts['range']} values out of range.")

    # Data completeness validation
    if counts['completeness'] == 0:
        print('Data completeness validation passed.')
    else:
        print(f"Data completeness validation failed with {counts['completeness']} null values.")

    if raise_on_failure and any(counts.values()):
        failed = [name for name, n in counts.items() if n]
        raise ValueError(f'Data validation failed for: {", ".join(failed)}')

    return counts

# validate_data reads straight from SQLite and has no `matches` parameter, so
# no XCom value is passed. The former op_kwargs template was never used by
# the callable.
validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

# Define task dependencies
extract_task >> transform_task >> load_task >> validate_task

<Task(PythonOperator): validate_data>

## Reporting and Analysis

Generate meaningful insights and reports.
- Trend analysis
- Team Performance analysis
- Team comparisons

In [19]:
# Trend analysis
def trend_analysis(db_path='assets/spanish_matches.db'):
    """Plot total goals over time and print xG-vs-goals correlations.

    Reads the whole ``matches`` table from the SQLite database at ``db_path``.

    Parameters
    ----------
    db_path : str, optional
        Path of the SQLite database file.

    Returns
    -------
    dict
        Pearson correlations keyed ``'home'`` (xGHome vs ScoreHome) and
        ``'away'`` (xGAway vs ScoreAway).
    """
    from contextlib import closing

    # read_sql_query keeps the column names. Building a DataFrame from
    # cursor.fetchall() produced integer labels (0, 1, ...), so every lookup
    # by name below raised KeyError.
    with closing(sqlite3.connect(db_path)) as conn:
        matches = pd.read_sql_query('SELECT * FROM matches', conn)
    # Dates come back from SQLite as text; parse them so the x-axis is temporal.
    matches['Date'] = pd.to_datetime(matches['Date'])

    def plot_data():
        """Line plot of TotalGoals against Date."""
        fig, ax = plt.subplots()
        sns.lineplot(x='Date', y='TotalGoals', data=matches, ax=ax)
        ax.set(title='Total Goals Scored', xlabel='Date', ylabel='Total Goals')
        plt.show()
        plt.close(fig)  # release the figure; this may run repeatedly in a worker

    def calculate_correlation():
        """Print and return the correlation between expected and actual goals."""
        corr_home = matches['xGHome'].corr(matches['ScoreHome'])
        print(f'Correlation between expected Goals for the Home and actual goals Home: {corr_home}')
        corr_away = matches['xGAway'].corr(matches['ScoreAway'])
        print(f'Correlation between expected Goals for the Away and actual goals Away: {corr_away}')
        return {'home': corr_home, 'away': corr_away}

    plot_data()
    return calculate_correlation()

# trend_analysis reads straight from SQLite and has no `matches` parameter,
# so no XCom value is passed to it.
trend_analysis_task = PythonOperator(
    task_id='trend_analysis',
    python_callable=trend_analysis,
    dag=dag
)

# Run the trend analysis after validation. The extract >> transform >> load
# >> validate chain is already declared where validate_task is created;
# repeating it only makes Airflow log "dependency already registered" warnings.
validate_task >> trend_analysis_task

<Task(PythonOperator): trend_analysis>

In [20]:
# Exectute the DAG workflow and view the results in the Airflow UI from scripts/DataPipelining.py
!~/airflow_env/bin/airflow trigger_dag 

zsh:1: no such file or directory: /Users/iffiness/airflow_env/bin/airflow


In [22]:
# Convert notebook to python script
!jupyter nbconvert --to script DataPipelining.ipynb --output-dir='airflow/dags/'

[NbConvertApp] Converting notebook DataPipelining.ipynb to script
[NbConvertApp] Writing 15499 bytes to airflow/dags/DataPipelining.py
