# 1. Setup stage, RUN IT BEFORE ALL


## 1.1 Import needed libs

In [9]:
import requests
import json
import concurrent.futures
from itertools import chain
import time
import threading
from queue import Queue
from threading import Semaphore
from itertools import product

In [None]:
## 1.2 Initialize spark session

In [18]:
from src.utils.data_configure.unified_data_utils import UnifiedDataUtils
from src.utils.spark_utils.spark_utils import create_spark_session

spark = create_spark_session()
# Initialize unified utilities
utils = UnifiedDataUtils(spark)

🚀 Creating Spark session...


## 2.1 Define common fetching function
- Declare root URL and other endpoints
- Define request's header
- Define `fetchAPI` function


In [10]:
# Construct the API URL
rootUrl = "https://v3.football.api-sports.io" # Configure to use variable later
leaguesEndpoint = f"{rootUrl}/leagues"
teamsEndpoint = f"{rootUrl}/teams"
fixturesEndpoint = f"{rootUrl}/fixtures"
# playersEndpoint = f"{rootUrl}/players"

# Retrieve the API key from Azure Key Vault-backed secret scope
api_key = "dc29305c858a5512b337edba1b7c63b5"

# Define headers
headers = {
    'x-rapidapi-host': 'v3.football.api-sports.io',
    'x-rapidapi-key': api_key  
}

# Define function to fetch API
def fetchAPI(url):
    try:
        # Make GET request to fetch data
        response = requests.get(url, headers=headers)
        # Check if request is success
        response.raise_for_status()
        if response.status_code == 200:
            print('Data fetched successfully')
            return response.json()
        else:
            raise Exception('Error from fetching API')
    except requests.exceptions.RequestException as e:
            raise Exception('Error from fetching API: ', e)


# 3. Fetch each endpoints and save raw data into bronze storage

## 3.1 Call to fetch and save 'Leagues' data
- Define file's path for output of 'League' data
- Save fetched 'League' data into Bronze storage layer

In [21]:
saveLeagueDataPath = 'api_football/leagues.json'

try:
    response = fetchAPI(leaguesEndpoint)
    resultCount = response.get('results')
    
    if not resultCount:
        print('Empty data from fetching Leagues endpoint')
    else:
        print('There is result!!')
        
        # Save the JSON data using the bronze layer path
        data = response.get('response', [])
        print(f"Data rows: {resultCount}")
        
        # Use the medallion architecture bronze layer
        file_path = "../" + utils.get_layer_path("bronze", saveLeagueDataPath)
        
        # Write JSON data using the unified utils method
        success = utils.write_json_file(file_path, data)
        
        if success:
            print(f"Data saved to {file_path}")
        else:
            print(f"Failed to save data to {file_path}")
            
except Exception as e:
    print('There is error from fetching Leagues data')
    print(e)

Data fetched successfully
There is result!!
Data rows: 1186
Data saved to ../data/bronze/api_football/leagues.json


## 3.2 Call to fetch and save 'Teams' data

### Define fetch 'Teams' data function
- Due to request's rate limit (10 request/minutes), handle it by defining delay time between requests
- Use multi workers
- When fetching 'Teams' data, we take notice into 2 params: `league_id` and `season`

In [19]:

# Configuration
MAX_WORKERS = 3  # Conservative number to stay under limit
REQUESTS_PER_MINUTE = 10
DELAY = 60 / REQUESTS_PER_MINUTE  # 6 seconds between requests

# Track last request time globally
last_request_time = 0
request_lock = threading.Lock()

def fetch_teams(league_id, season=2023):
    url = f"{teamsEndpoint}?league={league_id}&season={season}"
    global last_request_time
    try:
        with request_lock:
            elapsed = time.time() - last_request_time
            if elapsed < DELAY:
                time.sleep(DELAY - elapsed)
            last_request_time = time.time()

        # Fetch data
        response = fetchAPI(url)
        resultCount = response.get('results')
        if not resultCount:
            print(f'Empty data from fetching Teams endpoint for league {league_id} and season {season}')
            return []
        else:
            data = response.get('response', [])
            return data
    except requests.exceptions.RequestException as e:
            print(f"Error fetching data from API: {e}")


In [25]:
availableSeason = [2022, 2023]
saveTeamDataPath = 'api_football/teams.json'
file_path = f"bronze/{saveTeamDataPath}"

# Instead looping through 'leagues' list, we will choose to loop through pre-defined list of league's id since there is limitation for API request per day (due to FREE subscription)
# The pre-defined list of league/competitions will included most of major competitions in football, so we can have more other information later
pre_defined_league_ids = [1, 2, 3, 4, 5, 15, 140, 39, 45, 143, 135, 78, 61]
test_leagues = [140, 39, 45, 2, 3, 143, 135]

# Define function for fetching teams with combinations of league and season
def process_combination_fetch_teams():
    # Generate all league_id/year combinations
    combinations = list(product(test_leagues, availableSeason))
    results = []

    # Process with controlled parallelism
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Map combinations to fetch_teams function
        future_to_combo = {
            executor.submit(fetch_teams, league_id, year): (league_id, year)
            for league_id, year in combinations
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_combo):
            league_id, year = future_to_combo[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Processing failed for {league_id}/{year}: {str(e)}")
    return results


all_data = process_combination_fetch_teams()

# Save raw data into bronze storage
if not all_data:
    print('Empty data')
else:
    flattenedList = list(chain.from_iterable(all_data))
    print(f"Data rows: ", len(flattenedList))
    json_data = json.dumps(flattenedList, indent=4)
    # Use the medallion architecture bronze layer
    file_path = "../" + utils.get_layer_path("bronze", saveTeamDataPath)
    # Write JSON data using the unified utils method
    success = utils.write_json_file(file_path, flattenedList)   
    if success:
        print(f"Data saved to {file_path}")
    else:
        print(f"Failed to save data to {file_path}")


Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data rows:  2102
Data saved to ../data/bronze/api_football/teams.json


## 3.3 Call to fetch 'Fixtures' data

### Define fetch 'Fixture' data function

In [23]:
# Configuration
MAX_WORKERS = 3  # Conservative number to stay under limit
REQUESTS_PER_MINUTE = 10
DELAY = 60 / REQUESTS_PER_MINUTE  # 6 seconds between requests

# Thread-safe request tracker
request_times = Queue()

# Global semaphore to control total requests
request_semaphore = Semaphore(REQUESTS_PER_MINUTE)

def fetch_fixtures(league_id, season=2023):
    url = f"{fixturesEndpoint}?league={league_id}&season={season}"
    try:
        with request_semaphore:
            # Start timing the request
            start_time = time.time()

            # Fetch data
            response = fetchAPI(url)

            # Calculate remaining delay time
            request_duration = time.time() - start_time
            remaining_delay = max(0, DELAY - request_duration)
            time.sleep(remaining_delay)

            resultCount = response.get('results')
            if not resultCount:
                print(f'Empty data from fetching Fixtures endpoint for league {league_id} and season {season}')
                return []
            else:
                data = response.get('response', [])
                return data
    except requests.exceptions.RequestException as e:
            print(f"Error fetching data from API: {e}")


In [24]:
vailableSeason = [2023]
saveFixtureDataPath = 'api_football/fixtures.json'
file_path = "../" + utils.get_layer_path("bronze", saveFixtureDataPath)


# Instead looping through 'leagues' list, we will choose to loop through pre-defined list of league's id since there is limitation for API request per day (due to FREE subscription)
# The pre-defined list of league/competitions will included most of major competitions in football, so we can have more other information later
pre_defined_league_ids = [1, 2, 3, 4, 5, 15, 140, 39, 45, 143, 135, 78, 61]
test_leagues = [140, 39, 45, 2, 3]

# Define function for fetching teams with combinations of league and season
def process_combination_fetch_fixtures():
    # Generate all league_id/year combinations
    combinations = list(product(test_leagues, availableSeason))
    results = []

    # Process with controlled parallelism
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Map combinations to fetch_fixtures function
        future_to_combo = {
            executor.submit(fetch_fixtures, league_id, year): (league_id, year)
            for league_id, year in combinations
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_combo):
            league_id, year = future_to_combo[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Processing failed for {league_id}/{year}: {str(e)}")
    return results


all_data = process_combination_fetch_fixtures()

# Save raw data into bronze storage
if not all_data:
    print('Empty data')
else:
    flattenedList = list(chain.from_iterable(all_data))
    print(f"Data rows: ", len(flattenedList))
    json_data = json.dumps(flattenedList, indent=4)
    # Write JSON data using the unified utils method
    success = utils.write_json_file(file_path, flattenedList)   
    if success:
        print(f"Data saved to {file_path}")
    else:
        print(f"Failed to save data to {file_path}")


Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data fetched successfully
Data rows:  4066
Data saved to ../data/bronze/api_football/fixtures.json
