In [3]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import requests
from bs4 import BeautifulSoup
import pandas as pd

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def extract_new_ipl_matches():
    """
    Extract new IPL matches from Cricbuzz that aren't already in our database
    Returns a list of dictionaries containing match information
    """
    base_url = "https://www.cricbuzz.com/cricket-series/7607/indian-premier-league-2024/matches"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    
    try:
        # Get existing match URLs from database
        pg_hook = PostgresHook(postgres_conn_id='postgres_default')
        connection = pg_hook.get_conn()
        cursor = connection.cursor()
        cursor.execute("SELECT match_url FROM ipl_matches")
        existing_urls = {row[0] for row in cursor.fetchall()}
        
        # Scrape new matches from Cricbuzz
        response = requests.get(base_url, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        
        match_cards = soup.find_all('div', class_='cb-col-75 cb-col')
        new_matches = []
        
        for card in match_cards:
            match_info = {}
            title_tag = card.find('a', class_='text-hvr-underline')
            if title_tag:
                match_url = "https://www.cricbuzz.com" + title_tag['href']
                # Only process if match not already in database
                if match_url not in existing_urls:
                    match_info['title'] = title_tag.text.strip()
                    match_info['match_url'] = match_url
                    
                    # Extract other match details
                    series_info = card.find('div', class_='text-gray')
                    if series_info:
                        match_info['series_info'] = series_info.text.strip()
                    
                    location_time = card.find('div', class_='text-gray cb-font-12')
                    if location_time:
                        parts = [part.strip() for part in location_time.text.split('•') if part.strip()]
                        if len(parts) >= 2:
                            match_info['venue'] = parts[0]
                            match_info['date_time'] = parts[1]
                    
                    result_tag = card.find('div', class_='cb-scr-wll-chvrn cb-lv-scrs-col')
                    if result_tag:
                        result = result_tag.text.strip()
                        # Only include completed matches
                        if 'won' in result.lower() or 'abandoned' in result.lower():
                            match_info['result'] = result
                            new_matches.append(match_info)
        
        return new_matches
    
    except Exception as e:
        print(f"Error in extraction: {str(e)}")
        raise

def transform_match_data(new_matches):
    """
    Transform raw match data into a structured format for database insertion
    """
    transformed = []
    
    for match in new_matches:
        # Parse date and time
        date_str = match.get('date_time', '')
        try:
            match_date = datetime.strptime(date_str, '%b %d, %Y, %I:%M %p')
            formatted_date = match_date.strftime('%Y-%m-%d')
            formatted_time = match_date.strftime('%H:%M')
        except ValueError:
            formatted_date = date_str
            formatted_time = ''
        
        # Extract teams from title
        teams = []
        title = match.get('title', '')
        if 'vs' in title:
            teams = [team.strip() for team in title.split('vs')]
        
        # Create transformed record
        record = {
            'match_title': title,
            'team1': teams[0] if len(teams) > 0 else '',
            'team2': teams[1] if len(teams) > 1 else '',
            'series': match.get('series_info', ''),
            'venue': match.get('venue', ''),
            'match_date': formatted_date,
            'match_time': formatted_time,
            'result': match.get('result', ''),
            'match_url': match.get('match_url', ''),
            'ingestion_timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        transformed.append(record)
    
    return transformed

def load_to_data_warehouse(transformed_data):
    """
    Load transformed data to PostgreSQL data warehouse
    """
    if not transformed_data:
        print("No new matches to load")
        return
    
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Create table if not exists
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS ipl_matches (
        id SERIAL PRIMARY KEY,
        match_title TEXT,
        team1 TEXT,
        team2 TEXT,
        series TEXT,
        venue TEXT,
        match_date DATE,
        match_time TIME,
        result TEXT,
        match_url TEXT UNIQUE,
        ingestion_timestamp TIMESTAMP
    );
    """
    pg_hook.run(create_table_sql)
    
    # Insert new matches
    for match in transformed_data:
        insert_sql = """
        INSERT INTO ipl_matches (
            match_title, team1, team2, series, venue, 
            match_date, match_time, result, match_url, ingestion_timestamp
        ) VALUES (
            %(match_title)s, %(team1)s, %(team2)s, %(series)s, %(venue)s,
            %(match_date)s, %(match_time)s, %(result)s, %(match_url)s, %(ingestion_timestamp)s
        ) ON CONFLICT (match_url) DO NOTHING;
        """
        pg_hook.run(insert_sql, parameters=match)
    
    print(f"Successfully loaded {len(transformed_data)} new matches")

def etl_ipl_matches():
    """
    Complete ETL process for IPL matches
    """
    new_matches = extract_new_ipl_matches()
    if not new_matches:
        print("No new completed matches found")
        return
    
    transformed_data = transform_match_data(new_matches)
    load_to_data_warehouse(transformed_data)

with DAG(
    'ipl_daily_ingestion',
    default_args=default_args,
    description='Daily ETL pipeline for IPL match data from Cricbuzz',
    schedule_interval='0 18 * * *',  # Runs daily at 6 PM UTC (11:30 PM IST)
    catchup=False,
    tags=['ipl', 'cricket', 'etl'],
) as dag:
    
    extract_transform_load = PythonOperator(
        task_id='etl_ipl_matches',
        python_callable=etl_ipl_matches,
    )

    extract_transform_load

OSError while attempting to symlink the latest log directory
