In [1]:
# 4_etl_update_job.ipynb

import pandas as pd
import logging
import time
from datetime import datetime
from data201 import db_connection

# Configure logging: records at INFO and above are written to the
# 'etl_job.log' file and echoed to the console stream, using one shared format.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.FileHandler('etl_job.log'), logging.StreamHandler()],
)

# Module-level logger used by the ETL functions below.
logger = logging.getLogger(__name__)

def run_etl_job():
    """Run the ETL job to update analytical tables from operational data.

    Steps, all on a single connection opened from
    'premier_league_analytics.ini':

    1. Count rows in Matches that have no row in fact_MatchResult. When there
       are none, only the league snapshot is refreshed and the job ends.
    2. Insert any match dates missing from dim_Time and, if dates were added,
       fill in their Season from the Seasons table.
    3. Insert the missing matches into fact_MatchResult.
    4. Insert home-team and away-team rows into fact_TeamMatchStats for
       matches that have a MatchStatistics row and no stats row yet.
    5. Refresh the league snapshot and commit.

    Returns:
        bool: True when every step, including the league snapshot, succeeded.
        False when an exception was raised (the transaction is rolled back)
        or when update_league_snapshot() reported that no snapshot was
        written. In the latter case the fact-table inserts are still
        committed, because they are valid on their own.
    """
    start_time = time.time()
    conn = None
    cursor = None

    try:
        logger.info("Starting ETL job")
        conn = db_connection(config_file='premier_league_analytics.ini')
        cursor = conn.cursor()

        # 1. Check if there's new data in the operational database
        cursor.execute("""
        SELECT COUNT(*) FROM Matches m
        LEFT JOIN fact_MatchResult fmr ON m.MatchID = fmr.MatchID
        WHERE fmr.MatchID IS NULL
        """)
        new_matches = cursor.fetchone()[0]

        if new_matches == 0:
            logger.info("No new matches to process")

            # Still update the league snapshot with current data.
            # update_league_snapshot() commits its own INSERT, so no commit
            # is needed on this path.
            snapshot_ok = update_league_snapshot(conn, cursor)
            if not snapshot_ok:
                logger.warning("League snapshot was not updated")
            return snapshot_ok

        logger.info(f"Found {new_matches} new matches to process")

        # 2. Update dim_Time if needed
        # INSERT IGNORE plus the anti-join keeps this step safe to repeat.
        cursor.execute("""
        INSERT IGNORE INTO dim_Time (
            `Date`, `DayOfWeek`, `DayName`, `DayOfMonth`, 
            `DayOfYear`, `WeekOfYear`, `Month`, `MonthName`, 
            `Quarter`, `Year`, `IsWeekend`
        )
        SELECT DISTINCT 
            m.`MatchDate`,
            DAYOFWEEK(m.`MatchDate`),
            DAYNAME(m.`MatchDate`),
            DAY(m.`MatchDate`),
            DAYOFYEAR(m.`MatchDate`),
            WEEK(m.`MatchDate`),
            MONTH(m.`MatchDate`),
            MONTHNAME(m.`MatchDate`),
            QUARTER(m.`MatchDate`),
            YEAR(m.`MatchDate`),
            CASE WHEN DAYOFWEEK(m.`MatchDate`) IN (1, 7) THEN TRUE ELSE FALSE END
        FROM `Matches` m
        LEFT JOIN dim_Time dt ON m.MatchDate = dt.Date
        WHERE dt.TimeID IS NULL
        """)

        new_dates = cursor.rowcount
        if new_dates > 0:
            logger.info(f"Added {new_dates} new dates to Time dimension")

            # Update with season information
            cursor.execute("""
            UPDATE dim_Time t
            JOIN Seasons s ON t.Date BETWEEN s.StartDate AND s.EndDate
            SET t.Season = s.SeasonName
            WHERE t.Season IS NULL
            """)

        # 3. Update fact_MatchResult
        # The inner join on dim_Time relies on step 2 having added every
        # match date; the anti-join on fact_MatchResult skips loaded matches.
        cursor.execute("""
        INSERT INTO fact_MatchResult (
            MatchID, TimeID, HomeTeamID, AwayTeamID, RefereeID,
            SeasonID, DivisionID, HomeGoals, AwayGoals, Result,
            HalfTimeHomeGoals, HalfTimeAwayGoals, HalfTimeResult
        )
        SELECT 
            m.MatchID,
            dt.TimeID,
            m.HomeTeamID,
            m.AwayTeamID,
            m.RefereeID,
            m.SeasonID,
            m.DivisionID,
            m.FTHG,
            m.FTAG,
            m.FTR,
            m.HTHG,
            m.HTAG,
            m.HTR
        FROM Matches m
        JOIN dim_Time dt ON m.MatchDate = dt.Date
        LEFT JOIN fact_MatchResult fmr ON m.MatchID = fmr.MatchID
        WHERE fmr.MatchID IS NULL
        """)

        new_match_facts = cursor.rowcount
        logger.info(f"Added {new_match_facts} new match facts")

        # 4. Update fact_TeamMatchStats
        # Home team stats: FTR 'H' is a win for this row, 'D' a draw,
        # anything else a loss.
        cursor.execute("""
        INSERT INTO fact_TeamMatchStats (
            MatchID, TeamID, TimeID, SeasonID,
            IsHomeTeam, OpponentID, Goals, GoalsConceded,
            Shots, ShotsOnTarget, Corners, Fouls, 
            YellowCards, RedCards, Result, Points
        )
        SELECT 
            m.MatchID,
            m.HomeTeamID,
            dt.TimeID,
            m.SeasonID,
            TRUE,
            m.AwayTeamID,
            m.FTHG,
            m.FTAG,
            ms.HomeShots,
            ms.HomeShotsTarget,
            ms.HomeCorners,
            ms.HomeFouls,
            ms.HomeYellowCards,
            ms.HomeRedCards,
            CASE 
                WHEN m.FTR = 'H' THEN 'W'
                WHEN m.FTR = 'D' THEN 'D'
                ELSE 'L'
            END,
            CASE 
                WHEN m.FTR = 'H' THEN 3
                WHEN m.FTR = 'D' THEN 1
                ELSE 0
            END
        FROM Matches m
        JOIN dim_Time dt ON m.MatchDate = dt.Date
        JOIN MatchStatistics ms ON m.MatchID = ms.MatchID
        LEFT JOIN fact_TeamMatchStats tms 
            ON m.MatchID = tms.MatchID AND m.HomeTeamID = tms.TeamID
        WHERE tms.TeamStatsID IS NULL
        """)

        new_home_stats = cursor.rowcount

        # Away team stats: mirror image of the home insert (FTR 'A' is a win).
        cursor.execute("""
        INSERT INTO fact_TeamMatchStats (
            MatchID, TeamID, TimeID, SeasonID,
            IsHomeTeam, OpponentID, Goals, GoalsConceded,
            Shots, ShotsOnTarget, Corners, Fouls, 
            YellowCards, RedCards, Result, Points
        )
        SELECT 
            m.MatchID,
            m.AwayTeamID,
            dt.TimeID,
            m.SeasonID,
            FALSE,
            m.HomeTeamID,
            m.FTAG,
            m.FTHG,
            ms.AwayShots,
            ms.AwayShotsTarget,
            ms.AwayCorners,
            ms.AwayFouls,
            ms.AwayYellowCards,
            ms.AwayRedCards,
            CASE 
                WHEN m.FTR = 'A' THEN 'W'
                WHEN m.FTR = 'D' THEN 'D'
                ELSE 'L'
            END,
            CASE 
                WHEN m.FTR = 'A' THEN 3
                WHEN m.FTR = 'D' THEN 1
                ELSE 0
            END
        FROM Matches m
        JOIN dim_Time dt ON m.MatchDate = dt.Date
        JOIN MatchStatistics ms ON m.MatchID = ms.MatchID
        LEFT JOIN fact_TeamMatchStats tms 
            ON m.MatchID = tms.MatchID AND m.AwayTeamID = tms.TeamID
        WHERE tms.TeamStatsID IS NULL
        """)

        new_away_stats = cursor.rowcount
        logger.info(f"Added {new_home_stats + new_away_stats} new team match stats")

        # 5. Update league snapshot
        # The helper reports failure through its return value rather than by
        # raising, so the result has to be checked explicitly.
        snapshot_ok = update_league_snapshot(conn, cursor)

        # Commit all changes. The fact rows are committed even when the
        # snapshot failed: they are correct on their own, and the next run
        # will retry the snapshot.
        conn.commit()

        if not snapshot_ok:
            logger.warning("League snapshot was not updated")

        end_time = time.time()
        logger.info(f"ETL job completed in {end_time - start_time:.2f} seconds")
        return snapshot_ok

    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception(f"Error in ETL job: {e}")
        if conn:
            try:
                conn.rollback()
            except Exception:
                # A broken connection must not hide the original error.
                logger.exception("Rollback failed after ETL error")
        return False

    finally:
        # Close the connection even if closing the cursor raises.
        try:
            if cursor:
                cursor.close()
        finally:
            if conn:
                conn.close()

# Once fact_MatchResult has been loaded, the function below creates the first league snapshot.

def create_initial_league_snapshot(season_id=6, division_code='E0'):
    """Create an initial league table snapshot from match data.

    Aggregates fact_MatchResult for one season and division into a ranked
    table (points, then goal difference, then goals scored) and inserts one
    row per team into fact_LeagueSnapshot. The rows are stamped with the
    TimeID of the latest Date present in dim_Time. Form is stored as an
    empty string.

    Args:
        season_id: SeasonID to build the table for. Defaults to 6, the value
            that was previously hard-coded.
        division_code: Divisions.DivisionCode to build the table for.
            Defaults to 'E0'. If that code does not exist, the division with
            the lowest DivisionID is used instead.

    Returns:
        bool: True if the snapshot was inserted and committed; False if the
        season, a division or a TimeID could not be found, or if a database
        error occurred (the transaction is rolled back in that case).

    Progress and errors are reported with print(). Failure to open the
    connection or cursor is not caught and propagates to the caller.

    NOTE(review): every successful call inserts a new set of rows; nothing
    visible here prevents duplicates for the same season/division/TimeID.
    Confirm whether fact_LeagueSnapshot enforces uniqueness.
    """
    conn = db_connection(config_file='premier_league_analytics.ini')
    try:
        cursor = conn.cursor()
    except Exception:
        # Do not leak the connection when no cursor can be created.
        conn.close()
        raise

    try:
        # Look the season up so that a missing ID is reported clearly
        cursor.execute(
            "SELECT SeasonID, SeasonName FROM Seasons WHERE SeasonID = %s",
            (season_id,)
        )
        season_result = cursor.fetchone()

        if not season_result:
            print(f"Season ID {season_id} not found!")
            return False

        season_id = season_result[0]
        season_name = season_result[1]
        print(f"Creating league snapshot for {season_name} (ID: {season_id})")

        # Get the requested division
        cursor.execute(
            "SELECT DivisionID FROM Divisions WHERE DivisionCode = %s",
            (division_code,)
        )
        division_result = cursor.fetchone()

        if not division_result:
            # Use first division as fallback; ORDER BY makes "first"
            # deterministic instead of depending on storage order.
            cursor.execute(
                "SELECT DivisionID FROM Divisions ORDER BY DivisionID LIMIT 1"
            )
            division_result = cursor.fetchone()

        if not division_result:
            print("No divisions found in database")
            return False

        division_id = division_result[0]

        # Get most recent date in dim_Time. Order by the date itself:
        # TimeID values follow insertion order, not calendar order.
        cursor.execute("SELECT TimeID FROM dim_Time ORDER BY `Date` DESC LIMIT 1")
        time_result = cursor.fetchone()

        if not time_result or time_result[0] is None:
            print("No valid TimeID found in dim_Time table")
            return False

        time_id = time_result[0]

        # Create the league snapshot. Matches are restricted to the same
        # division the rows are stamped with, so other divisions cannot leak
        # into the table.
        cursor.execute("""
        INSERT INTO fact_LeagueSnapshot (
            SeasonID, DivisionID, TimeID, TeamID, Position,
            MatchesPlayed, Won, Drawn, Lost, GoalsFor, GoalsAgainst,
            Points, Form
        )
        WITH TeamStats AS (
            SELECT 
                t.TeamID,
                t.TeamName,
                COUNT(DISTINCT CASE WHEN mr.HomeTeamID = t.TeamID OR mr.AwayTeamID = t.TeamID 
                                  THEN mr.MatchID END) AS MatchesPlayed,
                SUM(CASE WHEN (mr.HomeTeamID = t.TeamID AND mr.Result = 'H') OR
                               (mr.AwayTeamID = t.TeamID AND mr.Result = 'A') 
                          THEN 1 ELSE 0 END) AS Won,
                SUM(CASE WHEN mr.Result = 'D' AND 
                               (mr.HomeTeamID = t.TeamID OR mr.AwayTeamID = t.TeamID)
                          THEN 1 ELSE 0 END) AS Drawn,
                SUM(CASE WHEN (mr.HomeTeamID = t.TeamID AND mr.Result = 'A') OR
                               (mr.AwayTeamID = t.TeamID AND mr.Result = 'H') 
                          THEN 1 ELSE 0 END) AS Lost,
                SUM(CASE WHEN mr.HomeTeamID = t.TeamID THEN mr.HomeGoals
                          WHEN mr.AwayTeamID = t.TeamID THEN mr.AwayGoals
                          ELSE 0 END) AS GoalsFor,
                SUM(CASE WHEN mr.HomeTeamID = t.TeamID THEN mr.AwayGoals
                          WHEN mr.AwayTeamID = t.TeamID THEN mr.HomeGoals
                          ELSE 0 END) AS GoalsAgainst,
                SUM(CASE WHEN (mr.HomeTeamID = t.TeamID AND mr.Result = 'H') THEN 3
                          WHEN (mr.AwayTeamID = t.TeamID AND mr.Result = 'A') THEN 3
                          WHEN mr.Result = 'D' AND (mr.HomeTeamID = t.TeamID OR mr.AwayTeamID = t.TeamID) THEN 1
                          ELSE 0 END) AS Points
            FROM Teams t
            JOIN fact_MatchResult mr ON t.TeamID = mr.HomeTeamID OR t.TeamID = mr.AwayTeamID
            WHERE mr.SeasonID = %s
              AND mr.DivisionID = %s
            GROUP BY t.TeamID, t.TeamName
            HAVING MatchesPlayed > 0
        ),
        RankedTeams AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    ORDER BY Points DESC, 
                            (GoalsFor - GoalsAgainst) DESC, 
                            GoalsFor DESC
                ) AS Position
            FROM TeamStats
        )
        SELECT 
            %s AS SeasonID,
            %s AS DivisionID,
            %s AS TimeID,
            TeamID,
            Position,
            MatchesPlayed,
            Won,
            Drawn,
            Lost,
            GoalsFor,
            GoalsAgainst,
            Points,
            '' AS Form -- Empty for initial load
        FROM RankedTeams
        """, (season_id, division_id, season_id, division_id, time_id))
        # Parameters are bound in textual order: the two filters inside the
        # CTE first, then the three constant columns of the final SELECT.

        rows_affected = cursor.rowcount
        print(f"Created league table snapshot with {rows_affected} team positions")

        # Commit changes
        conn.commit()
        return True

    except Exception as e:
        print(f"Error creating league snapshot: {e}")
        try:
            conn.rollback()
        except Exception as rollback_error:
            # Report, but do not let a failed rollback replace the result.
            print(f"Rollback failed: {rollback_error}")
        return False

    finally:
        # Close the connection even if closing the cursor raises.
        try:
            cursor.close()
        finally:
            conn.close()

# Create the initial league snapshot.
# NOTE(review): this is unguarded module-level code, so it runs (and attempts
# a new INSERT into fact_LeagueSnapshot) every time this cell/module is
# executed; the returned success flag is discarded. Confirm that repeated
# runs are intended.
create_initial_league_snapshot()


def update_league_snapshot(conn, cursor):
    """Insert a fresh league table snapshot into fact_LeagueSnapshot.

    The snapshot covers the season with the latest Seasons.EndDate and the
    division with DivisionCode 'E0' (falling back to the lowest DivisionID
    if that code is missing). Rows are stamped with today's TimeID when
    today exists in dim_Time, otherwise with the TimeID of the latest Date
    in dim_Time. Teams are ranked by points, then goal difference, then
    goals scored. Form is stored as an empty placeholder string.

    Args:
        conn: Open database connection; used only to commit.
        cursor: Cursor on ``conn`` used for every statement.

    Returns:
        bool: True if snapshot rows were inserted and committed. False if no
        TimeID, season or division could be found, if the season/division
        has no rows in fact_MatchResult, or if any statement raised.

    Notes:
        * Exceptions are logged and turned into a False return; none
          propagate to the caller.
        * On success this function commits ``conn``, which also commits any
          work the caller has pending on the same connection. On failure it
          does not roll back, so the caller's pending work is left intact.
        * NOTE(review): each successful call inserts a new set of rows;
          nothing visible here prevents duplicates for the same
          season/division/TimeID. Confirm whether fact_LeagueSnapshot
          enforces uniqueness.
    """
    try:
        # Get the time ID for today
        cursor.execute("SELECT TimeID FROM dim_Time WHERE Date = CURRENT_DATE()")
        today_time_id = cursor.fetchone()

        if not today_time_id:
            # Get the most recent date. Order by the date itself: TimeID
            # values follow insertion order, not calendar order.
            cursor.execute("SELECT TimeID FROM dim_Time ORDER BY `Date` DESC LIMIT 1")
            today_time_id = cursor.fetchone()

        if not today_time_id or today_time_id[0] is None:
            logger.error("No valid TimeID found in dim_Time table")
            return False

        today_time_id = today_time_id[0]
        logger.info(f"Using TimeID: {today_time_id} for snapshot")

        # Get most recent season
        cursor.execute("SELECT SeasonID FROM Seasons ORDER BY EndDate DESC LIMIT 1")
        season_result = cursor.fetchone()
        if not season_result:
            logger.error("No seasons found in database")
            return False

        season_id = season_result[0]
        logger.info(f"Using SeasonID: {season_id} for snapshot")

        # Get division (Premier League)
        cursor.execute("SELECT DivisionID FROM Divisions WHERE DivisionCode = 'E0'")
        division_result = cursor.fetchone()
        if not division_result:
            # Just get the first division as fallback; ORDER BY makes
            # "first" deterministic instead of depending on storage order.
            cursor.execute(
                "SELECT DivisionID FROM Divisions ORDER BY DivisionID LIMIT 1"
            )
            division_result = cursor.fetchone()

        if not division_result:
            logger.error("No divisions found in database")
            return False

        division_id = division_result[0]
        logger.info(f"Using DivisionID: {division_id} for snapshot")

        # Check if we have matches for this season and division
        cursor.execute("""
        SELECT COUNT(*) 
        FROM fact_MatchResult 
        WHERE SeasonID = %s
          AND DivisionID = %s
        """, (season_id, division_id))

        match_count = cursor.fetchone()[0]
        if match_count == 0:
            logger.warning(
                f"No matches found for SeasonID {season_id} "
                f"and DivisionID {division_id}"
            )
            return False

        # Build and insert the table in one statement. Matches are restricted
        # to the same division the rows are stamped with, so other divisions
        # cannot leak into the table.
        cursor.execute("""
        INSERT INTO fact_LeagueSnapshot (
            SeasonID, DivisionID, TimeID, TeamID, Position,
            MatchesPlayed, Won, Drawn, Lost, GoalsFor, GoalsAgainst,
            Points, Form
        )
        WITH TeamStats AS (
            SELECT 
                t.TeamID,
                t.TeamName,
                COUNT(DISTINCT CASE WHEN mr.HomeTeamID = t.TeamID OR mr.AwayTeamID = t.TeamID 
                                  THEN mr.MatchID END) AS MatchesPlayed,
                SUM(CASE WHEN (mr.HomeTeamID = t.TeamID AND mr.Result = 'H') OR
                               (mr.AwayTeamID = t.TeamID AND mr.Result = 'A') 
                          THEN 1 ELSE 0 END) AS Won,
                SUM(CASE WHEN mr.Result = 'D' AND 
                               (mr.HomeTeamID = t.TeamID OR mr.AwayTeamID = t.TeamID)
                          THEN 1 ELSE 0 END) AS Drawn,
                SUM(CASE WHEN (mr.HomeTeamID = t.TeamID AND mr.Result = 'A') OR
                               (mr.AwayTeamID = t.TeamID AND mr.Result = 'H') 
                          THEN 1 ELSE 0 END) AS Lost,
                SUM(CASE WHEN mr.HomeTeamID = t.TeamID THEN mr.HomeGoals
                          WHEN mr.AwayTeamID = t.TeamID THEN mr.AwayGoals
                          ELSE 0 END) AS GoalsFor,
                SUM(CASE WHEN mr.HomeTeamID = t.TeamID THEN mr.AwayGoals
                          WHEN mr.AwayTeamID = t.TeamID THEN mr.HomeGoals
                          ELSE 0 END) AS GoalsAgainst,
                SUM(CASE WHEN (mr.HomeTeamID = t.TeamID AND mr.Result = 'H') THEN 3
                          WHEN (mr.AwayTeamID = t.TeamID AND mr.Result = 'A') THEN 3
                          WHEN mr.Result = 'D' AND (mr.HomeTeamID = t.TeamID OR mr.AwayTeamID = t.TeamID) THEN 1
                          ELSE 0 END) AS Points,
                '' AS Form -- Placeholder for form
            FROM Teams t
            JOIN fact_MatchResult mr ON t.TeamID = mr.HomeTeamID OR t.TeamID = mr.AwayTeamID
            WHERE mr.SeasonID = %s
              AND mr.DivisionID = %s
            GROUP BY t.TeamID, t.TeamName
            HAVING MatchesPlayed > 0
        ),
        RankedTeams AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    ORDER BY Points DESC, 
                            (GoalsFor - GoalsAgainst) DESC, 
                            GoalsFor DESC
                ) AS Position
            FROM TeamStats
        )
        SELECT 
            %s AS SeasonID,
            %s AS DivisionID,
            %s AS TimeID,
            TeamID,
            Position,
            MatchesPlayed,
            Won,
            Drawn,
            Lost,
            GoalsFor,
            GoalsAgainst,
            Points,
            Form
        FROM RankedTeams
        """, (season_id, division_id, season_id, division_id, today_time_id))
        # Parameters are bound in textual order: the two filters inside the
        # CTE first, then the three constant columns of the final SELECT.

        rows_affected = cursor.rowcount
        logger.info(f"Created league table snapshot with {rows_affected} team positions")

        # Commit the changes. Callers that only refresh the snapshot rely on
        # this commit, so it stays inside the helper.
        conn.commit()
        return True

    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception(f"Error creating league snapshot: {e}")
        return False
# Run the ETL job when this file is executed directly (not when imported).
# The boolean returned by run_etl_job() is not used here.
if __name__ == "__main__":
    run_etl_job()

2025-05-08 17:11:23,185 - INFO - package: mysql.connector.plugins
2025-05-08 17:11:23,186 - INFO - plugin_name: caching_sha2_password
2025-05-08 17:11:23,186 - INFO - AUTHENTICATION_PLUGIN_CLASS: MySQLCachingSHA2PasswordAuthPlugin
2025-05-08 17:11:23,197 - INFO - Starting ETL job
2025-05-08 17:11:23,218 - INFO - No new matches to process
2025-05-08 17:11:23,219 - INFO - Using TimeID: 117 for snapshot
2025-05-08 17:11:23,220 - INFO - Using SeasonID: 5 for snapshot
2025-05-08 17:11:23,220 - INFO - Using DivisionID: 1 for snapshot


Creating league snapshot for 2223 (ID: 6)
Created league table snapshot with 20 team positions
