In [1]:
import matplotlib.pyplot as plt
import os
os.chdir("/home/jovyan/work/")
os.getcwd()

import pandas as pd
pd.set_option('display.float_format', lambda x: '%.2f' % x)

In [2]:
import pandas as pd
import ydata_profiling
import datetime
import os
import logging

def profile_dataset(df: pd.DataFrame, dataset_name: str, target_cols: list = None, config_file: str = 'config.txt', output_dir: str = 'reports') -> pd.DataFrame:
    # Save the report as an HTML file with the dataset name as the file name
    profile.to_file(output_file=output_path)
    
    # Append the dataset name and target columns to the configuration file
    with open(os.path.join(output_dir, config_file), 'a') as f:
        f.write(f'{dataset_name}: {",".join(sorted(target_cols))}\n')
    """
    Generate a Pandas profiling report for a DataFrame, allowing for one or more columns to be selected as target variables.
    Append dataset name and columns used to write the report in a configuration file, and include a timestamp in the report name.
    
    Parameters:
    df (pandas.DataFrame): Input DataFrame
    dataset_name (str): Name of the dataset.
    target_cols (list of str): A list of column names to use as target variables. If None, all columns are included.
    config_file (str): Name of the configuration file to append.
    output_dir (str): Directory to save the report file.
    
    Returns:
    None
    """
    
    # Set up logging
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(os.path.join(output_dir,'profile_dataset.log'))
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    
    # Check that the input parameters are valid
    if not isinstance(df, pd.DataFrame):
        raise ValueError('Input parameter `df` must be a pandas DataFrame.')
    if not isinstance(dataset_name, str):
        raise ValueError('Input parameter `dataset_name` must be a string.')
    if target_cols is not None and not isinstance(target_cols, list):
        raise ValueError('Input parameter `target_cols` must be a list of strings.')
    if not isinstance(config_file, str):
        raise ValueError('Input parameter `config_file` must be a string.')
    if not isinstance(output_dir, str):
        raise ValueError('Input parameter `output_dir` must be a string.')
    
    # If target_cols is None, use all columns
    if target_cols is None:
        target_cols = df.columns
    
    # Select only the target columns
    df_target = df[target_cols]

    # If the config file does not exist, create it
    if not os.path.isfile(os.path.join(output_dir, config_file)):
        with open(os.path.join(output_dir, config_file), 'w') as f:
            f.write('# Dataset name and target columns used for each report\n')
    
    # Check if the same set of target columns has already been used for another dataset, or if the report file already exists
    skip_report_generation = False

    with open(os.path.join(output_dir, config_file), 'r') as f:
        for line in f:
            line = line.strip()
            if not line.startswith('#') and ':' in line:
                other_dataset, other_cols = line.split(':')
                other_cols = set(other_cols.strip().split(','))
                if dataset_name == other_dataset and set(target_cols) == other_cols:
                    logger.warning(f'Same dataset name and target columns already used. Skipping report generation.')
                    skip_report_generation = True
                    break

    if skip_report_generation:
        return df_target

    

    # Generate a Pandas profiling report for the target columns
    profile = df_target.profile_report(title=f'{dataset_name} Analysis Report')
    
    # Save the report as an HTML file with the dataset name as the file name
    report_name = f'{dataset_name}_report.html'
    output_path = os.path.join(output_dir, report_name)
    
    
        
    # Log information about the function's execution
    logger.info(f'Generated profiling report for dataset {dataset_name} with target columns {target_cols}.')
    return df_target


In [3]:
import pandas as pd
import numpy as np
from scipy.stats import pearsonr
from sklearn.feature_selection import mutual_info_regression
from dython.nominal import theils_u

def compute_correlations(df: pd.DataFrame, target_col: str, missing_values: str = 'drop') -> dict:
    """
    Compute Pearson correlation coefficient, Theil's U and mutual information between a target column and all other columns in a DataFrame.
    
    Parameters:
    df (pandas.DataFrame): Input DataFrame
    target_col (str): Name of the target column
    missing_values (str): How to handle missing values. 'drop' to remove missing values or 'fill' to replace missing values with column mean.
    
    Returns:
    dict: Dictionary of column names and correlation measures
    """
    # Drop columns with more than 90% missing values
    df.dropna(thresh=len(df)*0.1, axis=1, inplace=True)
    
    # Drop constant columns
    df = df.loc[:, (df != df.iloc[0]).any()]
    
    # Select only the numeric columns
    df_numeric = df.select_dtypes(include=['float64', 'int64'])
    
    if missing_values == 'drop':
        # Drop rows with missing values
        df_numeric = df_numeric.dropna()
    elif missing_values == 'fill':
        # Fill missing values with column mean
        df_numeric = df_numeric.fillna(df_numeric.mean())
    
    # Compute Pearson correlation coefficient between target column and all other numeric columns
    pearson_corr = {}
    for col in df_numeric.columns:
        if col != target_col:
            corr, _ = pearsonr(df_numeric[target_col], df_numeric[col])
            pearson_corr[col] = corr
    
    # Compute mutual information between target column and all other numeric columns
    mi = mutual_info_regression(df_numeric.drop(columns=[target_col]), df_numeric[target_col])
    mi_dict = dict(zip(df_numeric.drop(columns=[target_col]).columns, mi))
    
    # Compute Theil's U between target column and all other non-numeric columns
    theils_u_dict = {}
    for col in df.columns:
        if col != target_col and col not in df_numeric.columns:
            theils_u_val = theils_u(df[target_col], df[col])
            theils_u_dict[col] = theils_u_val
    
    # Combine the dictionaries of correlation measures
    corr_dict = {}
    for col in df.columns:
        if col in pearson_corr.keys():
            corr_dict[col] = {'pearson_corr': pearson_corr[col], 'mutual_info': mi_dict[col]}
        elif col in theils_u_dict.keys():
            corr_dict[col] = {'theils_u': theils_u_dict[col]}
    
    return corr_dict


In [4]:
import csv
from collections import defaultdict

def csv_to_dict(filepath):
    """
    Reads a csv file with ";" as separators and uses the first row as the index.
    For each row, outputs a dictionary where each entry is one of the possible values in the row,
    and the value is a list of columns where we find that value.

    Args:
    - filepath (str): the path to the csv file

    Returns:
    - data_list (list): a list of dictionaries where each dictionary represents a row in the csv file.
    """

    import csv
    
    data_list = []

    with open(filepath, newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';')
        headers = next(reader) # Store headers as list and move to next line

        for row in reader:
            row_dict = {}
            for index, value in enumerate(row):
                if value not in row_dict:
                    row_dict[value] = [headers[index]]
                else:
                    row_dict[value].append(headers[index])
            data_list.append(row_dict)

    return data_list



In [5]:
data_list = csv_to_dict("data/external/relevant_features.csv")
data_list[1]["1"] +  data_list[1]['Target (first)']

['Ref_publication_date',
 'Cell_area_measured',
 'Cell_architecture',
 'Cell_flexible',
 'Module',
 'Module_area_total',
 'Substrate_stack_sequence',
 'ETL_stack_sequence',
 'ETL_thickness',
 'ETL_additives_compounds',
 'ETL_deposition_procedure',
 'ETL_surface_treatment_before_next_deposition_step',
 'Perovskite_dimension_2D',
 'Perovskite_dimension_3D',
 'Perovskite_dimension_3D_with_2D_capping_layer',
 'Perovskite_composition_perovskite_ABC3_structure',
 'Perovskite_composition_a_ions',
 'Perovskite_composition_a_ions_coefficients',
 'Perovskite_composition_b_ions',
 'Perovskite_composition_b_ions_coefficients',
 'Perovskite_composition_c_ions',
 'Perovskite_composition_c_ions_coefficients',
 'Perovskite_composition_none_stoichiometry_components_in_excess',
 'Perovskite_additives_compounds',
 'Perovskite_additives_concentrations',
 'Perovskite_thickness',
 'Perovskite_band_gap',
 'Perovskite_band_gap_graded',
 'Perovskite_pl_max',
 'Perovskite_deposition_number_of_deposition_steps',

In [6]:
dataset = pd.read_csv("data/raw/Perovskite_database_all_data.csv")

  dataset = pd.read_csv("data/raw/Perovskite_database_all_data.csv")


In [7]:
dataset.head()

Unnamed: 0,Ref_ID,Ref_ID_temp,Ref_name_of_person_entering_the_data,Ref_data_entered_by_author,Ref_DOI_number,Ref_lead_author,Ref_publication_date,Ref_journal,Ref_part_of_initial_dataset,Ref_original_filename_data_upload,...,Outdoor_PCE_Tse80,Outdoor_PCE_after_1000_h,Outdoor_power_generated,Outdoor_link_raw_data_for_outdoor_trace,Outdoor_detaild_weather_data_available,Outdoor_link_detailed_weather_data,Outdoor_spectral_data_available,Outdoor_link_spectral_data,Outdoor_irradiance_measured,Outdoor_link_irradiance_data
0,1,1,Adam Hultqvist,False,10.1021/jp5126624,Sabba et al.,2015-01-06,The Journal of Physical Chemistry C,True,Historic dataset on 2020 11 22_v7.xlsx,...,,,,,False,,False,,False,
1,2,2,Adam Hultqvist,False,10.1021/jp5126624,Sabba et al.,2015-01-06,The Journal of Physical Chemistry C,True,Historic dataset on 2020 11 22_v7.xlsx,...,,,,,False,,False,,False,
2,3,3,Adam Hultqvist,False,10.1021/jp5126624,Sabba et al.,2015-01-06,The Journal of Physical Chemistry C,True,Historic dataset on 2020 11 22_v7.xlsx,...,,,,,False,,False,,False,
3,4,4,Adam Hultqvist,False,10.1021/jp5126624,Sabba et al.,2015-01-06,The Journal of Physical Chemistry C,True,Historic dataset on 2020 11 22_v7.xlsx,...,,,,,False,,False,,False,
4,5,5,Adam Hultqvist,False,10.1021/jp5126624,Sabba et al.,2015-01-06,The Journal of Physical Chemistry C,True,Historic dataset on 2020 11 22_v7.xlsx,...,,,,,False,,False,,False,


In [8]:
df=profile_dataset(df=dataset, dataset_name="First_Iteration-Relevant_Columns",config_file="config.txt", target_cols=data_list[1]["1"] +  data_list[1]['Target (first)'])

UnboundLocalError: local variable 'profile' referenced before assignment

In [None]:
df.columns

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns

def inspect_data(df):
    """
    Inspects the input DataFrame for constant features, missing values, and outliers.
    
    Parameters:
    df (pandas.DataFrame): Input DataFrame
    
    Returns:
    out_df (pandas.DataFrame): A DataFrame with the same shape as the input DataFrame, where True indicates that the
    corresponding feature value is an outlier, and False indicates that it is not an outlier.
    """
    # Check for constant features
    constant_cols = [col for col in df.columns if df[col].nunique() == 1]
    if len(constant_cols) > 0:
        print(f'Found {len(constant_cols)} constant features: {", ".join(constant_cols)}')
    else:
        print('No constant features found.')
    
    # Check for missing values
    if df.isnull().sum().sum() > 0:
        print(f'Found {df.isnull().sum().sum()} missing values.')
    else:
        print('No missing values found.')
    
    # Detect outliers using the interquartile range (IQR) method
    # For each column, values outside the range [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are considered outliers
    Q1 = df.quantile(0.25)
    Q3 = df.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    is_outlier = ((df < lower_bound) | (df > upper_bound))
    num_outliers = is_outlier.sum().sum()
    if num_outliers > 0:
        print(f'Found {num_outliers} outliers.')
    else:
        print('No outliers found.')
    
    # Create boolean mask for outliers
    out_df = is_outlier.copy()
    out_df.replace({True: 1, False: 0}, inplace=True)
    
    return out_df


In [None]:
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

def _infer_datatypes(df: pd.DataFrame) -> (list, list):
    """
    Helper function to infer numeric and categorical columns from a DataFrame.

    Parameters:
    df (pd.DataFrame): Input DataFrame

    Returns:
    tuple: Lists of numeric and categorical column names.
    """
    numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object']).columns.tolist()
    return numeric_cols, categorical_cols

def group_rare_categories(series: pd.Series, threshold: float = 0.01) -> pd.Series:
    """Group rare categories in a pandas Series into an 'other' category."""
    category_counts = series.value_counts(normalize=True)
    rare_categories = category_counts[category_counts < threshold].index
    return series.apply(lambda x: 'other' if x in rare_categories else x)

def preprocess_data(df: pd.DataFrame, target_feature: str,
                    num_imputer_strategy: str = 'median', 
                    cat_imputer_strategy: str = 'most_frequent', 
                    rare_category_threshold: float = 0.01) -> (pd.DataFrame, pd.Series):
    """
    Preprocess the input dataset by handling missing values, outliers, and transforming categorical variables.

    Parameters:
    df (pd.DataFrame): Input DataFrame to preprocess
    target_feature (str): Name of the target feature
    num_imputer_strategy (str): Imputer strategy for numeric columns (default: 'median')
    cat_imputer_strategy (str): Imputer strategy for categorical columns (default: 'most_frequent')
    rare_category_threshold (float): Threshold to group rare categories (default: 0.01)

    Returns:
    tuple: Preprocessed dataset (excluding the target feature) and the target Series.
    """
    # Separate input features and target
    X = df.drop(target_feature, axis=1)
    y = df[target_feature]

    # Infer numeric and categorical columns
    numeric_cols, categorical_cols = _infer_datatypes(X)

    # Define transformers for numerical and categorical features
    numerical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy=num_imputer_strategy)),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline(steps=[
        ('to_string', FunctionTransformer(func=lambda x: x.astype(str), check_inverse=False)),
        ('group_rare', FunctionTransformer(func=lambda x: x.apply(group_rare_categories, threshold=rare_category_threshold), check_inverse=False)),
        ('imputer', SimpleImputer(strategy=cat_imputer_strategy)),
        ('encoder', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Define a column transformer that applies the appropriate transformer to each feature
    preprocessor = ColumnTransformer(transformers=[
        ('num', numerical_transformer, numeric_cols),
        ('cat', categorical_transformer, categorical_cols)
    ])

    # Apply the preprocessing steps to the dataset
    X_cleaned = preprocessor.fit_transform(X)

    # Get the fitted OneHotEncoder instance
    group_rare_encoder = categorical_transformer.named_steps['group_rare'].fit(X[categorical_cols].astype(str))
    one_hot_encoder = categorical_transformer.named_steps['encoder'].fit(group_rare_encoder.transform(X[categorical_cols].astype(str)))

    # Get the transformed categorical columns after the group_rare step
    group_rare_transformed_columns = group_rare_encoder.transform(X[categorical_cols]).columns

    # Get the feature names after the one_hot_encoder step
    encoded_feature_names = one_hot_encoder.get_feature_names_out(group_rare_transformed_columns).tolist()

    # Convert the result back to a DataFrame with appropriate column names
    X_cleaned = pd.DataFrame(X_cleaned.toarray(), columns=numeric_cols + encoded_feature_names)

    return X_cleaned, y

In [None]:
target_feature = "JV_default_PCE"
X_cleaned, y = preprocess_data(df=df, target_feature=target_feature)

In [None]:
X_cleaned.info()

In [None]:
X_cleaned.describe()

In [None]:
y.describe()

In [None]:
from sklearn.model_selection import KFold, cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, make_scorer
from sklearn.dummy import DummyRegressor

def cross_val_regression(X, y, model, scoring, n_splits=10):
    """
    Perform k-fold cross validation for the given model and return the scores.

    Parameters:
    ----------
    X: pd.DataFrame or np.array
        The feature matrix.
    y: pd.Series or np.array
        The target variable.
    model: scikit-learn estimator
        The regression model to be used for cross-validation.
    scoring: str
        The scoring method to use. For example, 'neg_mean_squared_error' for regression tasks.
    n_splits: int, optional (default=10)
        The number of splits for the k-fold cross-validation.

    Returns:
    -------
    scores: np.array
        The cross-validation scores for the given model.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    scores = cross_val_score(model, X, y, cv=kf, scoring=scoring)
    return scores

def naive_model_regression(y, strategy='mean'):
    """
    Create a naive model for regression.

    Parameters:
    ----------
    y: pd.Series or np.array
        The target variable.
    strategy: str, optional (default='mean')
        The strategy to use for the naive model. Possible values are 'mean', 'median', or 'constant'.

    Returns:
    -------
    naive_model: DummyRegressor
        A scikit-learn DummyRegressor instance with the specified strategy.
    """
    naive_model = DummyRegressor(strategy=strategy)
    return naive_model

def compare_models(X, y, baseline_model, naive_model, scoring, greater_is_better=True):
    """
    Compare the performance of baseline and naive models.

    Parameters:
    ----------
    X: pd.DataFrame or np.array
        The feature matrix.
    y: pd.Series or np.array
        The target variable.
    baseline_model: scikit-learn estimator
        The baseline regression model.
    naive_model: scikit-learn estimator
        The naive regression model.
    scoring: str
        The scoring method to use. For example, 'neg_mean_squared_error' for regression tasks.
    greater_is_better: bool, optional
        Whether a higher score is better or not. Default is True.

    Returns:
    -------
    None. Prints the mean and standard deviation of the cross-validation scores for both models,
    and indicates which model performs better.
    """
    baseline_scores = cross_val_regression(X, y, baseline_model, scoring)
    naive_scores = cross_val_regression(X, y, naive_model, scoring)
    
    if greater_is_better:
        print(f"Baseline {scoring} score: {baseline_scores.mean():.4f} ± {baseline_scores.std():.4f}")
        print(f"Naive {scoring} score: {naive_scores.mean():.4f} ± {naive_scores.std():.4f}")
        
        if baseline_scores.mean() < naive_scores.mean():
            print("Naive model performs better than the baseline model.")
        else:
            print("Baseline model performs better than the naive model.")
    else:
        baseline_scores = -baseline_scores
        naive_scores = -naive_scores
        
        print(f"Baseline model performance: {baseline_scores.mean():.4f} ± {baseline_scores.std():.4f}")
        print(f"Naive model performance: {naive_scores.mean():.4f} ± {naive_scores.std():.4f}")
        
        if baseline_scores.mean() > naive_scores.mean():
            print("Naive model performs better than the baseline model.")
        else:
            print("Baseline model performs better than the naive model.")
    
    return baseline_scores, naive_scores


        
def handle_missing_target(y, strategy='mean', constant_value=None):
    """
    Handle missing values in the target variable (y) using the specified strategy.

    Parameters:
    ----------
    y: pd.Series or np.array
        The target variable containing missing values.
    strategy: str, optional (default='mean')
        The strategy to handle missing values. Possible values are 'drop', 'mean', 'median', or 'constant'.
    constant_value: any, optional (default=None)
        The constant value to use when the strategy is set to 'constant'. If the strategy is 'constant' and
        constant_value is not provided, a ValueError will be raised.

    Returns:
    -------
    tuple: (imputed_y, mask)
        imputed_y: pd.Series or np.array
            The target variable with missing values handled according to the chosen strategy.
        mask: np.array
            A boolean array that can be used to filter rows in both the target variable and the feature matrix.
            This is particularly useful when the chosen strategy is 'drop', as it allows for the removal of
            corresponding rows in both the target variable and feature matrix.
    """
    mask = np.ones(y.shape, dtype=bool)
    
    if strategy == 'drop':
        mask = ~np.isnan(y)
        return y[mask], mask
    elif strategy == 'mean':
        imputed_y = y.copy()
        imputed_y[np.isnan(y)] = y.mean()
        return imputed_y, mask
    elif strategy == 'median':
        imputed_y = y.copy()
        imputed_y[np.isnan(y)] = y.median()
        return imputed_y, mask
    elif strategy == 'constant' and constant_value is not None:
        imputed_y = y.copy()
        imputed_y[np.isnan(y)] = constant_value
        return imputed_y, mask
    else:
        raise ValueError("Invalid strategy or constant_value not provided.")


# Choose how to handle missing values in y
y_cleaned, mask = handle_missing_target(y, strategy='drop')

# Update X_cleaned to remove corresponding rows
X_cleaned = X_cleaned[mask]

# Linear regression as a baseline model
baseline_model = LinearRegression()

# Naive model for regression
naive_model = naive_model_regression(y_cleaned)

# Scoring metric
greater_is_better=False
mse_scorer = make_scorer(mean_squared_error, greater_is_better=greater_is_better)

# Compare the models using the cleaned target variable and updated X_cleaned
baseline, naive = compare_models(X_cleaned, y_cleaned, baseline_model, naive_model, mse_scorer, greater_is_better=greater_is_better)
