In [None]:
# Principais bibliotecas
import requests
import json
import pandas as pd
from datetime import datetime, date, timedelta
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

In [None]:
# Autenticação no Google Colab
from google.colab import auth
auth.authenticate_user()


In [None]:
# Define as variáveis de ambiente
API_KEY = '66fa5192ea29b6e667661468f424eb01' # Chave da API da API-Sports
API_URL = 'https://v3.football.api-sports.io' # URL base da API da API-Sports
PROJECT_ID = 'jopecasports' # ID do projeto no BigQuery
DATASET_NAME = 'soccer_analysis' # Nome do dataset no BigQuery
FULL_LOAD_DATE = '2023-01-01' # Data para carga completa inicial
LEAGUE = 2 # ID da liga na API-Sports
SEASON = 2023 # Ano da temporada

# Define o cabeçalho da requisição para a API
HEADERS = {
    'x-rapidapi-key': API_KEY
}

In [None]:
# Variáveis de ambiente
API_KEY = 'Aqui deve ser colocada a chave da API'
API_URL = 'https://v3.football.api-sports.io'
PROJECT_ID = 'jopecasports'
DATASET_NAME = 'soccer_analysis'
FULL_LOAD_DATE = '2023-01-01'
LEAGUE = 2
SEASON = 2023

# Cabeçalho da requisição para a API
HEADERS = {
    'x-rapidapi-key': API_KEY
}

In [None]:
# Define os endpoints da API a serem consumidos
endpoints = [
    {
        "table": "past_fixtures", # Nome da tabela no BigQuery
        "write_disposition": "WRITE_APPEND", # Define o método de escrita no BigQuery (append)
        "path": "fixtures", # Caminho do endpoint na API-Sports
        "quality_control": False, # Flag para controle de qualidade (não utilizado neste caso)
        "params": { # Parâmetros da requisição para a API-Sports
            "league": LEAGUE,
            "season": SEASON
        },
        "incremental_load_params": { # Parâmetros para a carga incremental
            "from": "YYYY-MM-DD", # Data de início para a carga incremental
            "to": "YYYY-MM-DD" # Data de fim para a carga incremental
        },
        "fields": [], # Campos a serem selecionados na resposta da API (não utilizado neste caso)
        "nested_fields": [ # Campos aninhados a serem extraídos da resposta da API
            "fixture.id",
            "fixture.referee",
            "fixture.timezone",
            "fixture.date",
            "fixture.timestamp",
            "fixture.periods.first",
            "fixture.periods.second",
            "fixture.venue.id",
            "fixture.venue.name",
            "fixture.venue.city",
            "fixture.status.long",
            "fixture.status.short",
            "fixture.status.elapsed",
            "league.id",
            "league.name",
            "league.country",
            "league.logo",
            "league.flag",
            "league.season",
            "league.round",
            "teams.home.id",
            "teams.home.name",
            "teams.home.logo",
            "teams.home.winner",
            "teams.away.id",
            "teams.away.name",
            "teams.away.logo",
            "teams.away.winner",
            "goals.home",
            "goals.away",
            "score.halftime.home",
            "score.halftime.away",
            "score.fulltime.home",
            "score.fulltime.away",
            "score.extratime.home",
            "score.extratime.away",
            "score.penalty.home",
            "score.penalty.away"
        ],
        "repeatable_fields": [] # Campos que se repetem na resposta da API (não utilizado neste caso)
    },
    # Próximos endpoints seguem a mesma estrutura do anterior
    {
        "table": "future_fixtures",
        "write_disposition": "WRITE_TRUNCATE", # Define o método de escrita no BigQuery (truncate)
        "path": "fixtures",
        "quality_control": False,
        "params": {
            "league": LEAGUE,
            "season": SEASON,
            "to": "2099-12-31"
        },
        "incremental_load_params": {
            "from": "YYYY-MM-DD",
        },
        "fields": [],
        "nested_fields": [
            "fixture.id",
            "fixture.timezone",
            "fixture.date",
            "fixture.timestamp",
            "fixture.venue.id",
            "fixture.venue.name",
            "fixture.venue.city",
            "fixture.status.long",
            "fixture.status.short",
            "league.id",
            "league.name",
            "league.country",
            "league.logo",
            "league.flag",
            "league.season",
            "league.round",
            "teams.home.id",
            "teams.home.name",
            "teams.home.logo",
            "teams.away.id",
            "teams.away.name",
            "teams.away.logo"
        ],
        "repeatable_fields": []
    },
    {
        "table": "players",
        "write_disposition": "WRITE_TRUNCATE", # Define o método de escrita no BigQuery (truncate)
        "path": "players",
        "quality_control": True, # Flag para controle de qualidade (não utilizado neste caso)
        "params": {
            "league": LEAGUE,
            "season": SEASON,
            "page": 1 # Número da página de resultados da API
        },
        "fields": [],
        "nested_fields": [
            "player.id",
            "player.name",
            "player.firstname",
            "player.lastname",
            "player.age",
            "player.birth.date",
            "player.birth.place",
            "player.nationality",
            "player.height",
            "player.weight",
            "player.injured",
            "player.photo"
        ],
        "repeatable_fields": []
    },
]

# Define os endpoints da API a serem consumidos de forma iterativa,
# ou seja, para cada item do endpoint principal, este endpoint será chamado
iterable_endpoints = {
    "past_fixtures": [ # Define os endpoints iteráveis para o endpoint "past_fixtures"
        {
            "table": "fixturesStatistics", # Nome da tabela no BigQuery
            "write_disposition": "WRITE_APPEND", # Define o método de escrita no BigQuery (append)
            "path": "fixtures/statistics", # Caminho do endpoint na API-Sports
            "query_param": { # Define o parâmetro da requisição para a API-Sports que será usado para iterar sobre os dados do endpoint principal
                "fixture": "fixture.id" # O valor do campo "fixture.id" do endpoint principal será usado como valor para o parâmetro "fixture" da requisição
            },
            "fixed_params": {}, # Parâmetros fixos da requisição para a API-Sports
            "fields": ["fixture"], # Campos a serem selecionados na resposta da API
            "nested_fields": [ # Campos aninhados a serem extraídos da resposta da API
                "team.id",
                "team.name",
                "team.logo"
            ],
            "repeatable_fields": [ # Campos que se repetem na resposta da API
                "statistics"
            ]
        },
        {
            "table": "fixturesLineups", # Nome da tabela no BigQuery
            "write_disposition": "WRITE_APPEND", # Define o método de escrita no BigQuery (append)
            "path": "fixtures/lineups", # Caminho do endpoint na API-Sports
            "query_param": { # Define o parâmetro da requisição para a API-Sports que será usado para iterar sobre os dados do endpoint principal
                "fixture": "fixture.id" # O valor do campo "fixture.id" do endpoint principal será usado como valor para o parâmetro "fixture" da requisição
            },
            "fixed_params": {}, # Parâmetros fixos da requisição para a API-Sports
            "fields": [ # Campos a serem selecionados na resposta da API
                "fixture",
                "formation"
            ],
            "nested_fields": [ # Campos aninhados a serem extraídos da resposta da API
                "team.id"
            ],
            "repeatable_fields": [ # Campos que se repetem na resposta da API
                "startXI"
            ]
        }
    ]
}

In [None]:
# Função para buscar os dados da API
def fetch_data(path, params, headers):
    """
    Busca dados da API em várias páginas.

    Args:
        path (str): O caminho do endpoint da API.
        params (dict): Os parâmetros da consulta da API.
        headers (dict): Os cabeçalhos da requisição da API.

    Returns:
        list: Uma lista de dicionários contendo os dados da API.
    """
    all_data = []
    while True:
        response = requests.get(f"{API_URL}/{path}", headers=headers, params=params)
        data = response.json()
        print(response.text)
        if 'response' in data and data['response']:
            all_data.extend(data['response'])
            if 'page' in params:
                params['page'] += 1
            else:
                break
        else:
            break
    return all_data


# Função para buscar dados de endpoints iteráveis
def fetch_iterable_data(main_data, iterable_endpoint):
    """
    Busca dados de um endpoint iterável para cada item em um DataFrame principal.

    Args:
        main_data (pd.DataFrame): O DataFrame principal contendo dados de um endpoint anterior.
        iterable_endpoint (dict): As configurações do endpoint iterável.

    Returns:
        list: Uma lista de dicionários contendo os dados do endpoint iterável para cada item no DataFrame principal.
    """
    all_data = []
    for _, item in main_data.iterrows():
        query_params = {key: item[value.replace('.', '__')] for key, value in iterable_endpoint["query_param"].items()}
        params = query_params.copy()
        params.update(iterable_endpoint['fixed_params'])
        data = fetch_data(iterable_endpoint['path'], params, HEADERS)

        iterated_data = [{**query_params, **item} for item in data]
        all_data.extend(iterated_data)
    return all_data


# Função para preparar o DataFrame
def prepare_dataframe(data, fields, nested_fields, repeatable_fields):
    """
    Prepara um DataFrame a partir de uma lista de dicionários, extraindo campos aninhados e repetíveis.

    Args:
        data (list): A lista de dicionários contendo os dados.
        fields (list): Uma lista de nomes de campos a serem incluídos no DataFrame.
        nested_fields (list): Uma lista de nomes de campos aninhados a serem extraídos.
        repeatable_fields (list): Uma lista de nomes de campos repetíveis a serem extraídos.

    Returns:
        pd.DataFrame: O DataFrame preparado.
    """
    for item in data:
        for field in repeatable_fields:
            if field in item:
                for sub_item in item[field]:
                    for key in sub_item.keys():
                        sub_item[key] = str(sub_item[key])

    meta_fields = fields + repeatable_fields

    df = pd.json_normalize(data, sep='__', meta=meta_fields)

    all_fields = fields + nested_fields + repeatable_fields
    column_names = [col.replace('.', '__') for col in all_fields]

    existing_columns = [col for col in column_names if col in df.columns]

    df = df[existing_columns]
    return df


# Função para criar um conjunto de dados no BigQuery caso ele não exista
def create_dataset_if_not_exists(client, dataset_name):
    """
    Cria um conjunto de dados no BigQuery se ele não existir.

    Args:
        client: O cliente do BigQuery.
        dataset_name (str): O nome do conjunto de dados.
    """
    try:
        client.get_dataset(dataset_name)
    except NotFound:
        print(f"Dataset {dataset_name} not found. Creating...")
        client.create_dataset(dataset_name)


# Função para obter o esquema de uma tabela no BigQuery
def get_table_schema(client, dataset_name, table_name):
    """
    Obtém o esquema de uma tabela no BigQuery.

    Args:
        client: O cliente do BigQuery.
        dataset_name (str): O nome do conjunto de dados.
        table_name (str): O nome da tabela.

    Returns:
        google.cloud.bigquery.schema.SchemaField: O esquema da tabela.
    """
    table_ref = client.dataset(dataset_name).table(table_name)
    table = client.get_table(table_ref)
    return table.schema


# Função para carregar dados no BigQuery
def load_data_to_bigquery (client, dataset_name, table_name, data, write_disposition, partition_column=None, clustering_fields=None):
   
    table_ref = client.dataset(dataset_name).table(table_name)

    try:
        table = client.get_table(table_ref)
        schema = table.schema
    except NotFound:
        schema = None
        print(f"Schema para a tabela {DATASET_NAME}.{table_name} não encontrada.")

    job_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition,
    )
    if schema:
        job_config.schema = schema
    else:
        job_config.autodetect = True

    json_str = data.to_json(orient='records', date_format='iso')
    job = client.load_table_from_json(json.loads(json_str), table_ref, job_config=job_config)
    job.result()
    print(f"Foram carregadas {len(data)} linhas em {dataset_name}.{table_name}")


# Função para obter a última data de atualização da tabela de controle
def get_last_update(client, now):
    """
    Obtém a última data de atualização da tabela de controle.

    Args:
        client: O cliente do BigQuery.
        now (datetime): A data e hora atuais.

    Returns:
        datetime: A última data de atualização.
    """
    table_name = f'{PROJECT_ID}.{DATASET_NAME}.updates'

    try:
        client.get_table(table_name)
    except NotFound:
        print(f"Table {table_name} not found. Initializing with FULL_LOAD_DATE: {FULL_LOAD_DATE}")
        full_load_date = datetime.strptime(FULL_LOAD_DATE, '%Y-%m-%d')
        initial_data = [{'updated_at': full_load_date}]
        load_data_to_bigquery(client, DATASET_NAME, 'updates', pd.DataFrame(initial_data), 'WRITE_TRUNCATE')
        return full_load_date

    query = f'SELECT MAX(updated_at) AS last_update FROM `{table_name}`'

    try:
        query_job = client.query(query)
        results = query_job.result()

        for row in results:
            print(f"Ultima data de atualização: {row.last_update}")
            return row.last_update if row.last_update else now - timedelta(days=1)
    except Exception as e:
        print(f"An error occurred while executing the query: {e}")
        return None


# Função para registrar a data e hora da atualização
def log_update(client, now):
    """
    Registra a data e hora da atualização na tabela de controle.

    Args:
        client: O cliente do BigQuery.
        now (datetime): A data e hora atuais.
    """
    table_name = f'{PROJECT_ID}.{DATASET_NAME}.updates'
    updated_at = [{'updated_at': now}]
    load_data_to_bigquery(client, DATASET_NAME, 'updates', pd.DataFrame(updated_at), 'WRITE_APPEND')


# Função para atualizar os parâmetros de data para a carga incremental
def incremental_params_update(table, incremental_load_params, params, last_update, now, start_date=None, end_date=None):
    """
    Atualiza os parâmetros de data para a carga incremental.

    Args:
        table (str): O nome da tabela.
        incremental_load_params (dict): O dicionário de parâmetros de carga incremental.
        params (dict): O dicionário de parâmetros da API.
        last_update (datetime): A última data de atualização.
        now (datetime): A data e hora atuais.
        start_date (str, optional): A data inicial da carga incremental.
        end_date (str, optional): A data final da carga incremental.

    Returns:
        dict: O dicionário de parâmetros da API atualizado.
    """
    if start_date:
        start_date = datetime.strptime(start_date, '%Y-%m-%d')
    if end_date:
        end_date = datetime.strptime(end_date, '%Y-%m-%d')

    if table == "past_fixtures":
        params.update({
            'from': (start_date if start_date else last_update).strftime('%Y-%m-%d'),
            'to': (end_date if end_date else (now - timedelta(days=1))).strftime('%Y-%m-%d')
        })
    elif table == "future_fixtures":
        params.update({
            'from': (start_date if start_date else now).strftime('%Y-%m-%d')
        })
    print(f"Updated params for {table}: {params}")
    return params


In [None]:
# Função principal que orquestra a extração, transformação e carregamento dos dados
def main(request=None, start_date=None, end_date=None):
    """
    Função principal que orquestra a extração, transformação e carregamento dos dados.

    Args:
        request: A requisição HTTP.
        start_date (str, optional): A data inicial da carga incremental.
        end_date (str, optional): A data final da carga incremental.

    Returns:
        tuple: Uma tupla contendo a mensagem de sucesso e o código de status HTTP.
    """
    client = bigquery.Client(project=PROJECT_ID) # Cria um cliente do BigQuery
    create_dataset_if_not_exists(client, DATASET_NAME) # Cria o conjunto de dados se ele não existir
    now = datetime.now() # Obtém a data e hora atuais
    # now = datetime(2024, 6, 1, 0, 0) # Para testes, defina uma data específica
    last_update = get_last_update(client, now) # Obtém a última data de atualização
    main_endpoints = endpoints.copy() # Faz uma cópia da lista de endpoints principais
    main_iterable_endpoints = iterable_endpoints.copy() # Faz uma cópia do dicionário de endpoints iteráveis

    for endpoint in main_endpoints: # Itera sobre cada endpoint principal
        params = endpoint.get('params', {}) # Obtém os parâmetros do endpoint
        table = endpoint.get('table') # Obtém o nome da tabela
        incremental_load_params = endpoint.get('incremental_load_params') # Obtém os parâmetros de carga incremental
        path = endpoint.get('path') # Obtém o caminho do endpoint
        fields = endpoint.get('fields') # Obtém os campos a serem selecionados
        nested_fields = endpoint.get('nested_fields') # Obtém os campos aninhados
        repeatable_fields = endpoint.get('repeatable_fields') # Obtém os campos repetíveis
        write_disposition = endpoint.get('write_disposition') # Obtém a disposição de gravação
        quality_control = endpoint.get('quality_control', False) # Obtém a flag de controle de qualidade

        if incremental_load_params: # Verifica se há parâmetros de carga incremental
            params = incremental_params_update(table, incremental_load_params, params, last_update, now, start_date, end_date) # Atualiza os parâmetros da API com as datas de início e fim da carga incremental

        raw_data = fetch_data(path, params, HEADERS) # Busca os dados brutos da API

        if raw_data: # Verifica se há dados brutos
            prepared_data = prepare_dataframe(raw_data, fields, nested_fields, repeatable_fields) # Prepara o DataFrame
            load_data_to_bigquery(client, DATASET_NAME, table, prepared_data, write_disposition) # Carrega os dados no BigQuery

            if table in main_iterable_endpoints: # Verifica se há endpoints iteráveis para o endpoint principal atual
                for iterable_endpoint in main_iterable_endpoints[table]: # Itera sobre cada endpoint iterável
                    iterable_table = iterable_endpoint.get('table') # Obtém o nome da tabela do endpoint iterável
                    iterable_fields = iterable_endpoint.get('fields') # Obtém os campos a serem selecionados do endpoint iterável
                    iterable_nested_fields = iterable_endpoint.get('nested_fields') # Obtém os campos aninhados do endpoint iterável
                    iterable_repeatable_fields = iterable_endpoint.get('repeatable_fields') # Obtém os campos repetíveis do endpoint iterável
                    iterable_write_disposition = iterable_endpoint.get('write_disposition') # Obtém a disposição de gravação do endpoint iterável

                    print(f"Buscando os dados iteráveis para o endpoint: {iterable_table}") # Imprime uma mensagem informando que está buscando dados iteráveis
                    detailed_data = fetch_iterable_data(prepared_data, iterable_endpoint) # Busca os dados do endpoint iterável

                    if detailed_data: # Verifica se há dados do endpoint iterável
                        prepared_iterable_data = prepare_dataframe(detailed_data, iterable_fields, iterable_nested_fields, iterable_repeatable_fields) # Prepara o DataFrame do endpoint iterável
                        load_data_to_bigquery(client, DATASET_NAME, iterable_table, prepared_iterable_data, iterable_write_disposition) # Carrega os dados do endpoint iterável no BigQuery
                    else:
                        print(f"Não foram encontrados dados para o endpoint: {iterable_table}") # Imprime uma mensagem se não houver dados do endpoint iterável
        else:
            print(f"Não foram encontrados dados para o endpoint: {table}") # Imprime uma mensagem se não houver dados brutos

    log_update(client, now) # Registra a data e hora da atualização

    return "success", 200 # Retorna uma mensagem de sucesso e o código de status HTTP 200

main()
#main(start_date="2023-01-01", end_date="2024-07-31")
