In [1]:
%load_ext autoreload
%autoreload 2
import os
import logging
from datetime import datetime
import dask.dataframe as dd

#log.py
def configure_logging(log_dir: str = 'log', log_level: str = 'INFO', log_format: str = None) -> logging.Logger:
    """
    Configures file logging on the root logger and returns the root logger.

    A new timestamped log file (log_src_<YYYY-MM-DD_HH-MM-SS>.log) is created
    inside log_dir on every call. Calling the function again (for example when
    a notebook cell is re-run) replaces the file handler installed by the
    previous call, so messages always go to the most recent log file.

    Parameters:
    log_dir (str): The directory where the log file will be saved. It is created if missing.
    log_level (str): The logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL), case-insensitive.
        Unknown values fall back to INFO.
    log_format (str): The logging format string. Defaults to
        '%(asctime)s - %(levelname)s - %(message)s' followed by a newline.

    Returns:
    logging.Logger: The root logger instance.
    """
    if log_format is None:
        log_format = '%(asctime)s - %(levelname)s - %(message)s' + '\n'

    log_levels = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }
    level = log_levels.get(log_level.upper(), logging.INFO)

    # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
    os.makedirs(log_dir, exist_ok=True)

    log_file_name = f'log_src_{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.log'
    log_file_path = os.path.join(log_dir, log_file_name)

    logger = logging.getLogger()

    # logging.basicConfig() is a no-op once the root logger has any handler,
    # which made repeated calls silently keep the old file and level. Attach
    # the handler explicitly instead, and drop only the handlers that an
    # earlier call of this function installed (handlers added by anyone else
    # are left untouched).
    for handler in list(logger.handlers):
        if getattr(handler, '_from_configure_logging', False):
            logger.removeHandler(handler)
            handler.close()

    # Explicit UTF-8 so non-ASCII messages are written the same way on every platform.
    file_handler = logging.FileHandler(log_file_path, mode='w', encoding='utf-8')
    file_handler.setFormatter(logging.Formatter(log_format))
    file_handler._from_configure_logging = True
    logger.addHandler(file_handler)
    logger.setLevel(level)

    return logger

def log_message(logger: logging.Logger, level: str, message: str) -> None:
    """
    Sends a message to the logger method that matches the requested level.

    Parameters:
    logger (logging.Logger): The logger instance.
    level (str): Case-insensitive level name: 'INFO', 'WARNING', 'ERROR' or 'DEBUG'.
        Any other value is logged at INFO level.
    message (str): The log message.

    Returns:
    None
    """
    # Level name -> name of the logger method that handles it.
    method_by_level = {
        'INFO': 'info',
        'WARNING': 'warning',
        'ERROR': 'error',
        'DEBUG': 'debug',
    }
    method_name = method_by_level.get(level.upper(), 'info')
    getattr(logger, method_name)(message)

# prints.py
from datetime import datetime
import logging

def custom_print(logger: logging.Logger, message: str, skip_lines: bool = True, space_lines:bool=False) -> None:
    """
    Prints a message to the console and records it in the log at INFO level.

    Parameters:
    logger (logging.Logger): The logger instance.
    message (str): The message to print.
    skip_lines (bool): If True, wraps the printed message in a leading and a trailing newline.
    space_lines (bool): If True, a separator line of 150 underscores is placed before the
        message. The separator is only applied when skip_lines is also True.

    Returns:
    None
    """
    separator = '_' * 150 + "\n" if space_lines else ''
    text = "\n" + separator + message + "\n" if skip_lines else message

    print(text)
    # The log entry is stored without the surrounding whitespace.
    logger.info(text.strip())
    
def print_error(logger: logging.Logger, message: str) -> None:
    """
    Reports an error on the console and in the log.

    The text is prefixed with "ERROR: ", printed, and then logged at ERROR level.

    Parameters:
    logger (logging.Logger): The logger instance.
    message (str): The error message to print.

    Returns:
    None
    """
    text = "ERROR: {}".format(message)
    print(text)
    logger.error(text)

def print_warning(logger: logging.Logger, message: str) -> None:
    """
    Reports a warning on the console and in the log.

    The text is prefixed with "WARNING: ", printed, and then logged at WARNING level.

    Parameters:
    logger (logging.Logger): The logger instance.
    message (str): The warning message to print.

    Returns:
    None
    """
    text = "WARNING: {}".format(message)
    print(text)
    logger.warning(text)

def print_success(logger: logging.Logger, message: str) -> None:
    """
    Reports a success message on the console and in the log.

    The text is prefixed with "SUCCESS: ", printed, and then logged at INFO level.

    Parameters:
    logger (logging.Logger): The logger instance.
    message (str): The success message to print.

    Returns:
    None
    """
    text = "SUCCESS: {}".format(message)
    print(text)
    logger.info(text)


def print_info(logger: logging.Logger, message: str, additional_info: str = None) -> None:
    """
    Reports an informational message on the console and in the log.

    The text is prefixed with "INFO: ". When additional_info is truthy it is
    appended after " | Additional Info: ". The result is printed and then
    logged at INFO level.

    Parameters:
    logger (logging.Logger): The logger instance.
    message (str): The informational message to print.
    additional_info (str): Additional info to print, if provided.

    Returns:
    None
    """
    suffix = f" | Additional Info: {additional_info}" if additional_info else ""
    text = f"INFO: {message}{suffix}"

    print(text)
    logger.info(text)


def register_time_spent(logger: logging.Logger, start_time: datetime, process_name: str) -> None:
    """
    Logs, at INFO level, how many seconds have elapsed since start_time.

    The elapsed time is measured against datetime.now() at the moment of the
    call and written with two decimal places. Nothing is printed to the console.

    Parameters:
    logger (logging.Logger): The logger instance.
    start_time (datetime): The start time of the process.
    process_name (str): A string indicating the name of the process.

    Returns:
    None
    """
    elapsed_seconds = (datetime.now() - start_time).total_seconds()
    logger.info("Time spent on {}: {:.2f} seconds".format(process_name, elapsed_seconds))
    
#read_data.py
def read_parquet_file(file_path: str, logger: logging.Logger) -> dd.DataFrame:
    """
    Opens a Parquet file as a Dask DataFrame and logs how long the call took.

    The file is opened with dd.read_parquet(..., split_row_groups=True). Any
    exception raised along the way is reported through print_error and is not
    propagated to the caller.

    Parameters:
    file_path (str): The full path to the Parquet file.
    logger (logging.Logger): The logger instance for logging operations.

    Returns:
    dd.DataFrame: The loaded Dask DataFrame or None if the read fails.
    """
    try:
        custom_print(logger, f"Reading file: {file_path}", space_lines=True)
        read_started_at = datetime.now()

        loaded = dd.read_parquet(file_path, split_row_groups=True)

        register_time_spent(logger, read_started_at, f"Reading {file_path}")
    except Exception as exc:
        # Failure is signalled to the caller by returning None.
        print_error(logger, f"Failed to read file: {file_path}. Error: {exc!s}")
        return None
    else:
        return loaded


def analyze_dask_dataframe(dask_df: dd.DataFrame, name: str, logger: logging.Logger, n: int = 10) -> None:
    """
    Prints and logs a short report about a Dask DataFrame.

    The report contains, in order: the first n rows rendered as a markdown
    table, the column dtypes, the output of describe(), and the number of
    null values per column. The total time is logged when every step
    succeeds. Any exception is reported through print_error and is not
    propagated to the caller.

    Parameters:
    dask_df (dd.DataFrame): The Dask DataFrame to analyze.
    name (str): The name of the DataFrame for logging.
    logger (logging.Logger): The logger instance.
    n (int): The number of rows to print during the analysis.

    Returns:
    None
    """
    analysis_started_at = datetime.now()
    custom_print(logger, f"--- Analyzing {name} DataFrame ---")

    try:
        custom_print(logger, f"Displaying data sample {n} rows:")
        sample_table = dask_df.head(n).to_markdown(index=False)
        custom_print(logger, f"\n{sample_table}\n", skip_lines=True)

        # Each entry is (heading, callable producing the section body). The
        # callables run only when their section is reached, so every
        # computation still happens right after its heading is printed.
        sections = (
            (f"\nDataFrame info for {name}:", lambda: dask_df.dtypes),
            ("\nSummary statistics:", lambda: dask_df.describe().compute()),
            ("\nMissing values per column:", lambda: dask_df.isnull().sum().compute()),
        )
        for heading, build_body in sections:
            custom_print(logger, heading)
            custom_print(logger, str(build_body()))

        register_time_spent(logger, analysis_started_at, f"Analyzing {name} DataFrame")

    except Exception as exc:
        print_error(logger, f"Error analyzing {name}: {exc!s}")


def read_and_analyze_files(data_dir: str, file_names: list, logger: logging.Logger) -> dict:
    """
    Loads each listed Parquet file from data_dir, analyzes it and collects the results.

    A file that cannot be loaded is reported through print_error and skipped.
    A file that loads but contains zero rows is reported and aborts the whole
    run with a ValueError.

    Parameters:
    data_dir (str): The directory where the Parquet files are located.
    file_names (list): A list of Parquet file names to read and analyze.
    logger (logging.Logger): The logger instance.

    Returns:
    dict: A dictionary containing the file names as keys and their corresponding Dask DataFrames as values.

    Raises:
    ValueError: If a file was loaded but has no rows.
    """
    loaded_frames = {}

    for file_name in file_names:
        full_path = os.path.join(data_dir, file_name)
        frame = read_parquet_file(full_path, logger)

        # Load failure: report it and move on to the next file.
        if frame is None:
            load_failure = f"Failed to load {file_name}. No DataFrame returned."
            print_error(logger, load_failure)
            continue

        # Empty file: report it and stop processing.
        if frame.shape[0].compute() == 0:
            empty_failure = f"File {file_name} is empty."
            print_error(logger, empty_failure)
            raise ValueError(empty_failure)

        analyze_dask_dataframe(frame, file_name, logger)
        loaded_frames[file_name] = frame

    return loaded_frames




# Notebook driver: set up logging, then read and analyze the raw Parquet files.
# `logger` is bound to None first, so the name exists even if configure_logging raises.
logger=None
logger = configure_logging(log_dir='log')

# Input files are looked up in <current working directory>/data/raw.
data_directory = os.path.join(os.getcwd(), 'data', 'raw')

file_names = ['members.parquet', 'transactions.parquet', 'user_logs.parquet']
custom_print(logger, '1.Read and Analyze Data', space_lines=True)
# Maps each successfully loaded file name to its Dask DataFrame.
dict_df = read_and_analyze_files(data_directory, file_names, logger)


______________________________________________________________________________________________________________________________________________________
1.Read and Analyze Data


______________________________________________________________________________________________________________________________________________________
Reading file: c:\Users\Lucas\OneDrive\case data master\data\raw\members.parquet


--- Analyzing members.parquet DataFrame ---


Displaying data sample 10 rows:



| msno                                         |   safra |   registration_init_time |   city |   bd | gender   |   registered_via |   is_ativo |
|:---------------------------------------------|--------:|-------------------------:|-------:|-----:|:---------|-----------------:|-----------:|
| +++snpr7pmobhLKUgSHTv/mpkqgBT0tQJ0zQj6qKrqc= |  201612 |                 20140927 |      1 |    0 | <NA>     |                7 |          1 |
| ++/AwGzubug3gT6J+0STBGMdWKxaM+UFZTI8Tcmq4To= |  201607 |               