Libraries


In [1]:
import os
import logging
from pathlib import Path
from typing import Any, List
from functools import wraps
import colorlog
import time
from kaggle.api.kaggle_api_extended import KaggleApi
import backoff
from joblib import Parallel, delayed
import requests


Dataset Processing


In [2]:
class Config:
    """Static settings for the dataset download / discovery script.

    Several values can be overridden through environment variables of the
    same name (``LOG_LEVEL``, ``DATA_DIRECTORY``, ``RETRY_LIMIT``, ``N_JOBS``,
    ``LOG_FILE``); the rest are fixed constants. All attributes are evaluated
    once, when the class body runs.
    """

    # --- Environment-overridable settings -------------------------------
    # Name of a ``logging`` level; upper-cased so e.g. "debug" is accepted.
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
    # Root directory under which each dataset gets its own sub-folder.
    DATA_DIRECTORY = Path(os.getenv('DATA_DIRECTORY', '../data'))
    # Maximum number of attempts for retried operations.
    RETRY_LIMIT = int(os.getenv('RETRY_LIMIT', 2))
    N_JOBS = int(os.getenv('N_JOBS', -1))  # Utilize all available CPUs if set to -1
    # Path of the plain-text log file.
    LOG_FILE = os.getenv('LOG_FILE', 'app.log')

    # --- Logging formats ------------------------------------------------
    # Console output: message only, wrapped in the level's color.
    LOG_FORMAT_CONSOLE = '%(log_color)s%(message)s%(reset)s'
    # File output: timestamp, level name and message.
    LOG_FORMAT_FILE = '%(asctime)s - %(levelname)s - %(message)s'
    DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
    # Level name -> color name used by the colored console formatter.
    LOG_COLORS = {
        'DEBUG': 'cyan',
        'INFO': 'white',
        'WARNING': 'green',
        'ERROR': 'purple',
        'CRITICAL': 'red',
    }

    # --- Datasets -------------------------------------------------------
    # Kaggle dataset identifiers in "<owner>/<dataset>" form.
    KAGGLE_DATASETS = [
        "shuyangli94/food-com-recipes-and-user-interactions",
        # Add more datasets here as needed
    ]

    # --- Retry behaviour ------------------------------------------------
    # Kept as an alias of RETRY_LIMIT so that the RETRY_LIMIT environment
    # variable actually controls the retry decorator's default (previously
    # this was a separate hard-coded 2 and the override had no effect).
    RETRY_ATTEMPTS = RETRY_LIMIT
    # Exception types that are considered retryable.
    RETRY_EXCEPTIONS = (requests.exceptions.RequestException, OSError)

    # Not referenced by the code visible in this file; kept so that any
    # external user of these names keeps working.
    LOG_FORMAT = '%(message)s'
    LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

    @classmethod
    def dataset_destination_path(cls, dataset_name: str) -> Path:
        """Return the local folder for ``dataset_name``.

        The folder is ``DATA_DIRECTORY`` joined with the second
        ``/``-separated component of the identifier (the part after the
        owner in "<owner>/<dataset>").

        Raises:
            IndexError: if ``dataset_name`` contains no ``/``.
        """
        return cls.DATA_DIRECTORY / dataset_name.split('/')[1]


def setup_logging():
    """Configure console (colored) and file logging on the default logger.

    The logger obtained from ``colorlog.getLogger()`` is set to
    ``Config.LOG_LEVEL`` and receives two handlers:

    * a ``colorlog.StreamHandler`` formatted with ``Config.LOG_FORMAT_CONSOLE``
      and ``Config.LOG_COLORS``;
    * a ``logging.FileHandler`` writing to ``Config.LOG_FILE`` with
      ``Config.LOG_FORMAT_FILE``.

    The function is safe to call repeatedly (e.g. when a notebook cell is
    re-run): handlers installed by an earlier call are removed and closed
    first, so each record is emitted once per destination instead of once per
    call. Handlers added by other code are left alone.

    Raises:
        AttributeError: if ``Config.LOG_LEVEL`` is not an attribute of the
            ``logging`` module.
    """
    logger = colorlog.getLogger()
    logger.setLevel(getattr(logging, Config.LOG_LEVEL))

    # Remove only the handlers this function installed previously; iterate
    # over a copy because removeHandler mutates logger.handlers.
    for stale in list(logger.handlers):
        if getattr(stale, '_installed_by_setup_logging', False):
            logger.removeHandler(stale)
            stale.close()  # releases the log file handle of the old FileHandler

    console_handler = colorlog.StreamHandler()
    console_handler.setFormatter(colorlog.ColoredFormatter(
        Config.LOG_FORMAT_CONSOLE,
        datefmt=Config.DATE_FORMAT,
        log_colors=Config.LOG_COLORS))

    file_handler = logging.FileHandler(Config.LOG_FILE)
    file_handler.setFormatter(logging.Formatter(
        Config.LOG_FORMAT_FILE,
        datefmt=Config.DATE_FORMAT))

    for handler in (console_handler, file_handler):
        # Marker used by the clean-up loop above on the next call.
        handler._installed_by_setup_logging = True
        logger.addHandler(handler)


def execution_time(func):
    """Decorator that logs how long each successful call to ``func`` took.

    After ``func`` returns, an INFO record of the form
    ``"<name> executed in <seconds> seconds."`` is emitted through the
    ``logging`` module, and the return value of ``func`` is passed through
    unchanged. If ``func`` raises, the exception propagates and nothing is
    logged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same signature and metadata as ``func``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock meant for
        # measuring intervals; time.time() can jump when the system clock is
        # adjusted and would then report a wrong (even negative) duration.
        started = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        logging.info(f"{func.__name__} executed in {elapsed:.2f} seconds.")
        return result
    return wrapper


def retry_on_failure(max_tries=Config.RETRY_ATTEMPTS, exceptions=Config.RETRY_EXCEPTIONS):
    """Build a decorator that retries a function via ``backoff.on_exception``.

    The decorated function is re-invoked when it raises one of
    ``exceptions``, using the ``backoff.expo`` wait generator and at most
    ``max_tries`` attempts. Each retry is reported as a WARNING and the final
    give-up as an ERROR through the ``logging`` module.

    Args:
        max_tries: Passed to ``backoff.on_exception`` as ``max_tries``.
        exceptions: Exception type (or tuple of types) that triggers a retry.

    Returns:
        A decorator; the function it produces carries the metadata of the
        function it wraps.
    """
    def decorator(func):
        def report_retry(details):
            # 'wait' may be absent from the details dict, hence the default.
            logging.warning(
                f"Retry {details['tries']}/{max_tries} for {func.__name__} due to error, waiting {details.get('wait', 0):0.1f} seconds.")

        def report_giveup(details):
            logging.error(
                f"Giving up {func.__name__} after {details['tries']} tries")

        def invoke(*args, **kwargs):
            return func(*args, **kwargs)

        with_backoff = backoff.on_exception(
            backoff.expo,
            exceptions,
            max_tries=max_tries,
            on_backoff=report_retry,
            on_giveup=report_giveup,
        )
        # Same layering as stacking @wraps(func) above @backoff.on_exception.
        return wraps(func)(with_backoff(invoke))
    return decorator


def kaggle_api_authenticate() -> KaggleApi:
    """Create a ``KaggleApi`` client, call ``authenticate()`` on it and return it."""
    client = KaggleApi()
    client.authenticate()
    return client


@execution_time
def download_and_extract_kaggle_dataset(dataset_name: str) -> None:
    """Download and unzip one Kaggle dataset unless it is already on disk.

    The target folder comes from ``Config.dataset_destination_path``. The
    dataset counts as present when that folder exists and contains at least
    one entry; in that case nothing is downloaded and no Kaggle client is
    created.

    Args:
        dataset_name: Kaggle dataset identifier in "<owner>/<dataset>" form.
    """
    destination_path = Config.dataset_destination_path(dataset_name)

    # Check the cache first so that authentication is only performed when a
    # download is really going to happen.
    if destination_path.exists() and any(destination_path.iterdir()):
        logging.info(f"Dataset '{dataset_name}' already present, skipping download.")
        return

    # The client is created inside the function rather than passed in;
    # presumably so each parallel worker builds its own - TODO confirm.
    # NOTE(review): retry_on_failure is defined in this file but is not
    # applied to this download - confirm whether that is intentional.
    api = kaggle_api_authenticate()
    api.dataset_download_files(dataset_name, path=destination_path, unzip=True)
    logging.info(f"Dataset '{dataset_name}' extracted to {destination_path}.")


@execution_time
def download_and_extract_kaggle_datasets(datasets: List[str]) -> None:
    """Run ``download_and_extract_kaggle_dataset`` for every entry of ``datasets``.

    The calls are dispatched through a joblib ``Parallel`` runner configured
    with ``Config.N_JOBS``; their results are discarded.

    Args:
        datasets: Kaggle dataset identifiers to process.
    """
    runner = Parallel(n_jobs=Config.N_JOBS)
    fetch_one = delayed(download_and_extract_kaggle_dataset)
    runner(fetch_one(name) for name in datasets)


@execution_time
def find_csv_files(datasets: List[str]) -> List[str]:
    """Collect the CSV file paths of all given datasets into one flat list.

    ``find_csv_file`` is run for each dataset through a joblib ``Parallel``
    runner configured with ``Config.N_JOBS``; the per-dataset lists are then
    concatenated in the order the results are returned. The total count is
    logged at INFO level.

    Args:
        datasets: Kaggle dataset identifiers whose folders are searched.

    Returns:
        All CSV paths found, as strings.
    """
    per_dataset = Parallel(n_jobs=Config.N_JOBS)(
        delayed(find_csv_file)(dataset_name) for dataset_name in datasets)
    # Flatten in a single pass; sum(lists, []) would build a new list for
    # every partial result and is quadratic in the number of paths.
    all_csv_files = [csv_path for group in per_dataset for csv_path in group]
    logging.info(f"Found {len(all_csv_files)} total CSV files in the datasets.")
    return all_csv_files


def find_csv_file(dataset_name: str) -> List[str]:
    """Return the paths of all ``*.csv`` files under one dataset's folder.

    The folder is resolved with ``Config.dataset_destination_path`` and
    searched recursively.

    Args:
        dataset_name: Kaggle dataset identifier in "<owner>/<dataset>" form.

    Returns:
        The matching paths converted to strings, in the order ``rglob``
        yields them.
    """
    dataset_root = Config.dataset_destination_path(dataset_name)
    return list(map(str, dataset_root.rglob('*.csv')))


@execution_time
def main():
    """Set up logging, fetch the configured datasets and report their CSV files.

    Steps:
      1. configure logging;
      2. download/extract every dataset in ``Config.KAGGLE_DATASETS``;
      3. collect the CSV files below the dataset folders;
      4. log the folder of the first CSV file together with the names of all
         CSV files, or a notice that none were found.
    """
    setup_logging()
    # Start/finish banners are logged at WARNING level, which
    # Config.LOG_COLORS maps to green on the console.
    logging.warning("Data setup initiated...")
    download_and_extract_kaggle_datasets(Config.KAGGLE_DATASETS)
    csv_files = find_csv_files(Config.KAGGLE_DATASETS)

    if csv_files:
        # Report the real location of the first file's folder. The previous
        # message hard-coded a '../data/' prefix, which was wrong whenever
        # DATA_DIRECTORY was overridden; with the default directory the
        # output is unchanged.
        dataset_folder = Path(csv_files[0]).parent.as_posix()
        # Extract just the file names
        csv_file_names = [Path(file).name for file in csv_files]

        # A newline separates the dataset folder path from the CSV files list
        logging.info(f"Dataset extracted to {dataset_folder}\nCSV files: {', '.join(csv_file_names)}")
    else:
        logging.info("No CSV files found.")

    logging.warning("Dataset setup completed.")


# Run the whole setup only when this module is executed as the main program.
if __name__ == "__main__":
    main()


Data setup initiated...
download_and_extract_kaggle_datasets executed in 0.56 seconds.
Found 7 total CSV files in the datasets.
find_csv_files executed in 0.08 seconds.
Dataset extracted to ../data/food-com-recipes-and-user-interactions
CSV files: interactions_test.csv, interactions_train.csv, interactions_validation.csv, PP_recipes.csv, PP_users.csv, RAW_interactions.csv, RAW_recipes.csv
Dataset setup completed.
main executed in 0.64 seconds.
