Giovanni Grotto - giovanni.grotto@studio.unibo.it <br>
Francesco Farneti - francesco.farneti7@studio.unibo.it <br>
Tancredi Bosi - tancredi.bosi@studio.unibo.it

## 1. Imports

In [2]:
!pip install colorama

Collecting colorama
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Downloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Installing collected packages: colorama
Successfully installed colorama-0.4.6


In [3]:
!pip install sdv

Collecting sdv
  Downloading sdv-1.19.0-py3-none-any.whl.metadata (14 kB)
Collecting boto3<2.0.0,>=1.28 (from sdv)
  Downloading boto3-1.37.28-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<2.0.0,>=1.31 (from sdv)
  Downloading botocore-1.37.28-py3-none-any.whl.metadata (5.7 kB)
Collecting copulas>=0.12.1 (from sdv)
  Downloading copulas-0.12.2-py3-none-any.whl.metadata (9.4 kB)
Collecting ctgan>=0.11.0 (from sdv)
  Downloading ctgan-0.11.0-py3-none-any.whl.metadata (10 kB)
Collecting deepecho>=0.7.0 (from sdv)
  Downloading deepecho-0.7.0-py3-none-any.whl.metadata (10 kB)
Collecting rdt>=1.14.0 (from sdv)
  Downloading rdt-1.15.1-py3-none-any.whl.metadata (10 kB)
Collecting sdmetrics>=0.19.0 (from sdv)
  Downloading sdmetrics-0.19.0-py3-none-any.whl.metadata (9.4 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3<2.0.0,>=1.28->sdv)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.12.0,>=0.11.0 (from boto3<2.0.0,>=1.28->sdv)
  Downloading s

In [4]:
import warnings
import pandas as pd
import numpy as np
import re
import time

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import AgglomerativeClustering
from collections import Counter

from colorama import Fore, Style

from sdv.metadata import Metadata
from sdv.single_table import GaussianCopulaSynthesizer
from sdv.sampling import Condition

# Suppress user warnings
warnings.filterwarnings("ignore", category=UserWarning)

## 2. Data Loading

In [5]:
# Source workbook; the relative path is resolved against the notebook's working directory
file_path = 'Dataset_2.0_Akkodis.xlsx'
# Raw, uncleaned dataset as read from the workbook
original_data = pd.read_excel(file_path)

## 3. Data Cleaning

##### FUNCTIONS

In [6]:
def organize_data(data):
    """
    Cleans and organizes the dataset.
    - Strips whitespace from column names and string values.
    - Removes duplicate rows.
    - Cleans the 'Overall' column and splits 'Residence' column into separate columns.
    - Converts 'Year of insertion' and 'Year of Recruitment' columns to integers.
    - Filters out rows with invalid 'Last Role' values.
    - Groups rows with the same ID by keeping the one with the most non-NaN values.
    - Drops unnecessary columns.
    """
    data.columns = data.columns.str.strip()  # Remove leading/trailing spaces from column names
    data = data.map(lambda x: x.strip() if isinstance(x, str) else x)  # Strip string values in all cells

    # Drop duplicate rows
    data = data.drop_duplicates()

    # Drop the tilde in the 'Overall' column
    data['Overall'] = data['Overall'].str.lstrip('~ ')

    # Extract 'City', 'Province' and 'Region' from the column 'Residence'
    data[['City', 'Province', 'Region']] = data['Residence'].str.split(' » | ~ ', expand=True)

    # Convert the columns 'Year of insertion' and 'Year of Recruitment' to integers
    data['Year of insertion'] = pd.to_numeric(data['Year of insertion'].str.strip('[]'), errors='coerce').astype(
        'Int64')
    data['Year of Recruitment'] = pd.to_numeric(data['Year of Recruitment'].str.strip('[]'), errors='coerce').astype(
        'Int64')

    # Remove invalid values in 'Last Role' column
    undesired_values = ['????', '-', '.', '/']
    data.loc[data['Last Role'].isin(undesired_values), 'Last Role'] = np.nan

    # Group rows by ID, keeping the one with the most non-NaN values
    def group_ids(df):
        """
        Groups the dataset by ID, keeping the row with the highest number of non-NaN values for each ID.
        """
        # Count non-NaN values in each row
        df['non_nan_count'] = df.notna().sum(axis=1)
        # Keep the row with the highest non-NaN count per ID
        df = df.loc[df.groupby('ID')['non_nan_count'].idxmax()]
        # Drop the helper column 'non_nan_count'
        df = df.drop(columns=['non_nan_count'])
        return df

    data = group_ids(data)

    # Drop columns that are not useful for the analysis
    data = data.drop(columns=['linked_search__key', 'Years Experience.1', 'Study Area.1', 'Residence', 'Recruitment Request'])

    return data

In [7]:
def filter_minor_workers(data):
    """
    Drops rows whose 'Age Range' is incompatible with their 'Years Experience'.

    A row is discarded when:
    - the age range is '< 20 years' and the experience is '[3-5]' or more, or
    - the age range is '20 - 25 years' and the experience is '[+10]'.

    Returns a copy of the remaining rows; the input frame is not modified.
    """
    age_range = data['Age Range']
    experience = data['Years Experience']

    # Experience brackets that are not plausible for someone younger than 20
    too_much_for_under_20 = ['[+10]', '[7-10]', '[5-7]', '[3-5]']

    under_20_conflict = (age_range == '< 20 years') & experience.isin(too_much_for_under_20)
    under_25_conflict = (age_range == '20 - 25 years') & (experience == '[+10]')

    is_consistent = ~(under_20_conflict | under_25_conflict)
    return data[is_consistent].copy()

In [8]:
def preprocess_text(text):
    """
    Normalizes a string for clustering: keeps only ASCII letters and spaces,
    then lowercases the result. Any non-string input yields an empty string.
    """
    if isinstance(text, str):
        # Everything except a-z, A-Z and the space character is removed
        letters_and_spaces = re.sub(r'[^a-zA-Z ]', '', text)
        return letters_and_spaces.lower()
    return ""

def cluster_and_map_roles(unique_values):
    """
    Clusters a list of unique values (e.g., job roles or tags) using hierarchical
    clustering on their TF-IDF representation.

    Args:
        unique_values (list): Distinct raw values to cluster.

    Returns:
        tuple: (clusters, cluster_names) where `clusters` is an array holding the
        cluster ID of each input value (same order as `unique_values`) and
        `cluster_names` maps each cluster ID to a name built from the two most
        common words of its members ("Unknown" when the members contain no words).
        An empty input returns an empty array and an empty dictionary.
    """
    n_values = len(unique_values)

    # Nothing to cluster: TfidfVectorizer cannot be fitted on an empty corpus
    if n_values == 0:
        return np.array([], dtype=int), {}

    if n_values == 1:
        # Agglomerative clustering needs at least two samples; a single value
        # trivially forms its own cluster.
        clusters = np.zeros(1, dtype=int)
    else:
        # Preprocess the unique values (lowercase, letters and spaces only)
        processed_values = [preprocess_text(value) for value in unique_values]

        # Create the TF-IDF matrix
        vectorizer = TfidfVectorizer()
        X = vectorizer.fit_transform(processed_values)

        # The number of clusters is not fixed: merging stops at the distance threshold
        clustering = AgglomerativeClustering(n_clusters=None, distance_threshold=1.5, linkage='ward')
        clusters = clustering.fit_predict(X.toarray())

    # Group the original values by the cluster they were assigned to
    value_clusters = {}
    for value, cluster_id in zip(unique_values, clusters):
        value_clusters.setdefault(cluster_id, []).append(value)

    # Name each cluster after the (up to) two most common words of its members
    cluster_names = {}
    for cluster_id, values in value_clusters.items():
        words = []
        for value in values:
            words.extend(preprocess_text(value).split())
        common_words = [word for word, _ in Counter(words).most_common(2)]
        cluster_names[cluster_id] = "-".join(common_words) if common_words else "Unknown"

    return clusters, cluster_names


def map_to_cluster_name(value, unique_values, clusters, cluster_names):
    """
    Translates a raw value into the name of the cluster it belongs to.

    Values that were not part of the clustering input, or whose cluster has no
    name, are returned unchanged.
    """
    # Guard clause: unknown values pass through untouched
    if value not in unique_values:
        return value

    # `clusters` is positionally aligned with `unique_values`
    position = unique_values.index(value)
    return cluster_names.get(clusters[position], value)


def cluster_tag(data):
    """
    Replaces every 'Last Role' and 'TAG' value with the name of its cluster.

    Each column is clustered independently on its distinct non-null values.
    The columns are overwritten on the DataFrame that was passed in, and that
    same DataFrame is returned.
    """
    # First compute the clustering of both columns...
    clusterings = {}
    for column in ('Last Role', 'TAG'):
        distinct_values = data[column].dropna().unique().tolist()
        labels, names = cluster_and_map_roles(distinct_values)
        clusterings[column] = (distinct_values, labels, names)

    # ...then rewrite the columns, so nothing is modified if clustering fails
    for column, mapping_args in clusterings.items():
        data[column] = data[column].apply(map_to_cluster_name, args=mapping_args)

    return data

##### CODE

In [9]:
# Cleaning pipeline, applied in order:
# 1) structural clean-up and de-duplication of the raw dataset
data = organize_data(original_data)
# 2) removal of rows with implausible age / experience combinations
data = filter_minor_workers(data)
# 3) replacement of 'Last Role' and 'TAG' values with their cluster names
data = cluster_tag(data)

## 4. Check for inconsistencies in the data

##### FUNCTIONS

In [10]:
def extract_number(value, choose_second=False):
    """
    Extracts the first or second number from a string.

    Args:
        value (str): The string containing numbers. Non-string input
            (e.g. NaN or None for a missing cell) is treated as containing no numbers.
        choose_second (bool): When True, return the second number if there is one;
            otherwise fall back to the first.

    Returns:
        int or None: The extracted number or None if no numbers are found.
    """
    # Missing cells arrive as float NaN / None; re.findall would raise on them
    if not isinstance(value, str):
        return None

    # Find all runs of digits in the string
    matches = re.findall(r'\d+', value)
    if not matches:
        return None

    # The second number is only used when requested AND present
    use_second = choose_second and len(matches) > 1
    return int(matches[1] if use_second else matches[0])

def check_constraint(row):
    """
    Tells whether a row breaks at least one of the consistency constraints.

    Args:
        row (pd.Series): A row of data to check for constraint violations.

    Returns:
        bool: True if any constraint violation is found, False otherwise.
    """
    # Both checks are always evaluated, one after the other
    minor_violation = minor_worker_check(row)  # age vs. years of experience
    hired_violation = hired_check(row)  # candidate state vs. recruitment year
    return bool(minor_violation or hired_violation)


def minor_worker_check(row):
    """
    Checks if the worker was a minor when starting to work.

    The age is taken as the second number of 'Age Range' (the first one when the
    range contains a single number) and the experience as the first number of
    'Years Experience'.

    Args:
        row (pd.Series): A row of data containing 'Age Range' and 'Years Experience'.

    Returns:
        bool: True if (age - experience) is below 18, False otherwise. Rows where
        either value is missing or contains no number are not flagged.
    """
    age_range = row['Age Range']
    experience = row['Years Experience']

    # Missing cells (NaN / None) cannot be evaluated: do not flag them
    if not isinstance(age_range, str) or not isinstance(experience, str):
        return False

    age = extract_number(age_range, choose_second=True)
    work_exp = extract_number(experience)

    # extract_number returns None when the text holds no digits
    if age is None or work_exp is None:
        return False

    # The worker is a minor if their age at the start of work is less than 18
    return (age - work_exp) < 18


def hired_check(row):
    """
    Validates the recruitment year against the candidate state.

    - Candidate state 'Hired': a violation occurs when both years are known and
      'Year of insertion' is greater than 'Year of Recruitment'.
    - Any other state: a violation occurs when 'Year of Recruitment' is set,
      because only hired candidates should have a recruitment year.

    Args:
        row (pd.Series): A row of data with candidate state and year information.

    Returns:
        bool: True if the constraint is violated, False otherwise.
    """
    state = row['Candidate State']
    inserted = row['Year of insertion']
    recruited = row['Year of Recruitment']

    # Not hired: the mere presence of a recruitment year is the violation
    if state != 'Hired':
        return True if pd.notna(recruited) else False

    # Hired: the years can only be compared when both are available
    both_known = pd.notna(inserted) and pd.notna(recruited)
    if both_known and inserted > recruited:
        return True
    return False

##### CODE

In [11]:
# Evaluate the consistency constraints row by row; True marks a violating row
inconsistencies_flag = data.apply(check_constraint, axis=1)
print(f"{Fore.GREEN}Found {inconsistencies_flag.sum()} inconsistencies in the original data{Style.RESET_ALL}")
time.sleep(1)

# Keep the violating rows for inspection (this cell does not display them)
violating_rows = data[inconsistencies_flag]

[32mFound 0 inconsistencies in the original data[0m


## 5. Synthesizer

##### FUNCTIONS

In [12]:
# Build the SDV metadata describing the cleaned dataset
def get_metadata(data):
    """
    Auto-detects the metadata of `data` and forces a fixed set of columns
    to the 'categorical' sdtype before validating the result.

    Returns the validated metadata object.
    """
    detected = Metadata.detect_from_dataframe(data=data)

    # Columns whose auto-detected sdtype is overridden with 'categorical'
    for column_name in ('Candidate State', 'Last Role', 'City', 'Province', 'Region'):
        detected.update_column(column_name=column_name, sdtype='categorical')

    # Validate only after all overrides have been applied
    detected.validate()
    return detected


##### CODE

In [13]:
# Describe the cleaned dataset, then create the synthesizer from that description
metadata = get_metadata(data)
# NOTE(review): locales='it_IT' presumably selects Italian formats for generated values — confirm against the SDV docs
synthesizer = GaussianCopulaSynthesizer(metadata, locales='it_IT')

In [14]:
# Let the synthesizer choose its column transformers from the data, then fit it
synthesizer.auto_assign_transformers(data)
synthesizer.fit(data)

## 6. Synthetic Data without Constraints

In [23]:
# Draw 1000 synthetic rows; in top-to-bottom order no constraints have been added to the synthesizer yet
synthetic_data = synthesizer.sample(num_rows=1000)

Sampling rows: 100%|██████████| 1000/1000 [00:04<00:00, 247.45it/s]


Inconsistencies check

In [16]:
# Apply the same row-level checks used on the original data to the synthetic sample
inconsistencies_flag = synthetic_data.apply(check_constraint, axis=1)
print(f"{Fore.GREEN}Found {inconsistencies_flag.sum()} inconsistencies in the synthetic data{Style.RESET_ALL}")
time.sleep(1)

# Keep the violating rows for inspection (this cell does not display them)
violating_rows = synthetic_data[inconsistencies_flag]

[32mFound 68 inconsistencies in the synthetic data[0m


## 7. Synthetic Data with Constraints

##### FUNCTIONS

In [17]:
# Register the custom constraint class and attach every constraint to the synthesizer
def set_constraints(synthesizer):
    """
    Loads the custom 'CustomYearsHired' constraint class from
    'custom_constraint_years.py' and adds it, together with four
    'FixedCombinations' constraints, to the given synthesizer.

    Returns the same synthesizer object that was passed in.
    """
    synthesizer.load_custom_constraint_classes(
        filepath='custom_constraint_years.py',
        class_names=['CustomYearsHired'],
    )

    # Custom constraint relating the candidate state to the two year columns
    years_hired = {
        'constraint_class': 'CustomYearsHired',
        'constraint_parameters': {
            'column_names': ['Candidate State', 'Year of insertion', 'Year of Recruitment']
        },
    }

    # Column groups that each get their own FixedCombinations constraint
    fixed_column_groups = [
        ['Candidate State', 'Year of Recruitment'],
        ['Age Range', 'Years Experience'],
        ['City', 'Province', 'Region'],
        ['event_type__val', 'event_feedback'],
    ]
    fixed_combinations = [
        {
            'constraint_class': 'FixedCombinations',
            'constraint_parameters': {'column_names': column_group},
        }
        for column_group in fixed_column_groups
    ]

    # The custom constraint comes first, followed by the fixed combinations
    synthesizer.add_constraints(constraints=[years_hired, *fixed_combinations])
    return synthesizer

##### CODE

In [18]:
# Apply constraints to the synthesizer
# NOTE(review): re-running this cell calls add_constraints again on the same synthesizer object
synthesizer = set_constraints(synthesizer)

# Re-fit after adding the constraints, then generate synthetic data with constraints
synthesizer.fit(data)
synthetic_data_with_constraints = synthesizer.sample(num_rows=1000)

# Check for inconsistencies in the synthetic data with constraints
inconsistencies_flag = synthetic_data_with_constraints.apply(check_constraint, axis=1)
print(f"{Fore.GREEN}Found {inconsistencies_flag.sum()} inconsistencies in synthetic data with constraints{Style.RESET_ALL}")
time.sleep(1)

# Keep the violating rows for inspection (this cell does not display them)
violating_rows = synthetic_data_with_constraints[inconsistencies_flag]

Sampling rows: 100%|██████████| 1000/1000 [00:00<00:00, 1192.25it/s]


[32mFound 0 inconsistencies in synthetic data with constraints[0m


## 8. Polarized Data Generation

##### FUNCTIONS

In [19]:
def exclude_matching_rows(df, exclusion_conditions):
    """
    Drops every row that satisfies at least one group of conditions.

    A row satisfies a group when it matches ALL the conditions of that group.

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        exclusion_conditions (list of lists): Each inner list contains dictionaries with 'Field' and 'Value' keys,
                                             specifying conditions to exclude.

    Returns:
        pd.DataFrame: Filtered DataFrame with matching rows removed.
    """

    def rows_matching_group(group):
        # AND together the equality tests of one group, starting from "all rows match"
        matches = pd.Series(True, index=df.index)
        for cond in group:
            matches = matches & (df[cond["Field"]] == cond["Value"])
        return matches

    # OR together the per-group masks, starting from "no row is removed"
    rows_to_drop = pd.Series(False, index=df.index)
    for group in exclusion_conditions:
        rows_to_drop = rows_to_drop | rows_matching_group(group)

    return df.loc[~rows_to_drop]


def filter_dataframe_by_constraints(df, constraints_list, total_rows):
    """
    Selects, for each constraint group, the required number of rows that match
    that group and no constraint of any other group.

    For every group:
    - rows matching ANY single constraint of another group are excluded,
    - rows must match ALL constraints of the current group,
    - rows already selected for a previous group are skipped,
    - the first `total_rows * Percentage // 100` remaining rows are kept, where
      'Percentage' is read from the first constraint of the group (all
      constraints of a group are expected to share the same percentage).

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        constraints_list (list of lists): Each inner list contains dictionaries with 'Field', 'Value', and 'Percentage' keys.
        total_rows (int): Total number of rows to extract based on percentages.

    Returns:
        pd.DataFrame: The selected rows of all groups, concatenated in group order,
        with a fresh index. An empty `constraints_list` yields an empty frame
        with the columns of `df`.

    Raises:
        ValueError: If a group is empty or fewer matching rows than required are available.
    """
    selected_parts = []  # One frame per group, concatenated once at the end
    used_indices = set()  # Track already used rows to avoid duplication

    for i, constraint_group in enumerate(constraints_list):
        if not constraint_group:
            raise ValueError("Each constraint group must contain at least one constraint")

        # Cast to int so a float percentage (e.g. 25.0) still gives a valid row count
        num_required = int((total_rows * constraint_group[0]['Percentage']) // 100)

        candidates = df

        # Exclude rows matching constraints from other groups
        for j, other_group in enumerate(constraints_list):
            if i == j:
                continue
            for constraint in other_group:
                candidates = candidates[candidates[constraint["Field"]] != constraint["Value"]]

        # Apply current group constraints
        for constraint in constraint_group:
            candidates = candidates[candidates[constraint["Field"]] == constraint["Value"]]

        # Remove already used rows
        candidates = candidates.loc[~candidates.index.isin(used_indices)]

        # Ensure the required number of elements are available
        if len(candidates) < num_required:
            raise ValueError(
                f"Not enough rows for constraints {constraint_group}, missing {num_required - len(candidates)} elements")

        chosen = candidates.head(num_required)
        used_indices.update(chosen.index)
        selected_parts.append(chosen)

    if not selected_parts:
        return df.head(0).reset_index(drop=True)

    return pd.concat(selected_parts).reset_index(drop=True)


def polarized_generation_from_conditions(synthesizer, polarization_list, num_rows=1000, scaling_factor=2,
                                         max_retries=3):
    """
    Generates synthetic data in which each group of conditions in `polarization_list`
    covers its requested share of rows, and the remaining rows match none of the groups.

    The generation has two phases, each retried with a doubled scaling factor on failure:
    1. Conditional sampling: for every group, `scaling_factor` times the needed rows are
       sampled with the group's values fixed, then filtered down to the exact amount.
    2. Fill: unconditioned rows are sampled, rows matching any group are removed, and
       the number of rows still missing is drawn from what is left.

    Parameters:
        synthesizer: The data synthesizer used to generate synthetic data.
        polarization_list (list): List of groups; each group is a list of dictionaries with
            'Field', 'Value' and 'Percentage' keys. All entries of a group must share the
            same 'Percentage'.
        num_rows (int): The number of rows to generate (default is 1000).
        scaling_factor (int): Oversampling factor of the first phase, doubled on retries (default is 2).
        max_retries (int): Maximum number of attempts per phase (default is 3).

    Returns:
        pd.DataFrame: Generated synthetic data with the required constraints applied.

    Raises:
        ValueError: If `max_retries` is below 1, a group is empty, or a group mixes
            different percentages.
        RuntimeError: If a phase still fails after `max_retries` attempts.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    # Validate the groups once, up front: these errors are deterministic, so
    # retrying with more samples could never fix them.
    for sublist in polarization_list:
        if not sublist:
            raise ValueError("Error: Empty polarization group")
        if len({el['Percentage'] for el in sublist}) > 1:
            raise ValueError("Error: Mismatched percentages")

    # Samples are accumulated across attempts: rows drawn by a failed attempt
    # stay in the pool used by the following ones.
    synthetic_data_list = []

    retries = 0
    while retries < max_retries:
        try:
            tot_rows = 0
            for sublist in polarization_list:
                # Cast to int so a float percentage still gives a valid row count
                n_elem = int((num_rows * sublist[0]['Percentage']) // 100)
                tot_rows += n_elem
                col_values = {el['Field']: el['Value'] for el in sublist}

                # Oversample so enough rows survive the cross-group filtering below
                condition = Condition(
                    num_rows=n_elem * scaling_factor,
                    column_values=col_values
                )
                polarized_synthetic_data = synthesizer.sample_from_conditions(
                    conditions=[condition],
                )
                synthetic_data_list.append(polarized_synthetic_data)

            # Combine all generated synthetic data
            all_polarized_data = pd.concat(synthetic_data_list).reset_index(drop=True)
            filtered_polarized_data = filter_dataframe_by_constraints(all_polarized_data, polarization_list, num_rows)
            break
        except Exception as e:
            retries += 1
            scaling_factor *= 2  # Increase scaling factor on retry
            print(
                f"{Fore.YELLOW}Retry {retries}/{max_retries}: Increasing scaling factor to {scaling_factor} due to error: {e}{Style.RESET_ALL}")
            time.sleep(1)
            if retries == max_retries:
                raise RuntimeError("Max retries reached. Unable to generate valid polarized synthetic data.") from e

    # The fill phase starts again from its own, fixed oversampling factor
    scaling_factor = 3
    retries = 0
    while retries < max_retries:
        try:
            synthetic_data = synthesizer.sample(num_rows=num_rows * scaling_factor)
            # Fill rows must not match any polarization group, otherwise they would
            # alter the shares established in the first phase
            filtered_synthetic_data = exclude_matching_rows(synthetic_data, polarization_list)
            fill_values = filtered_synthetic_data.sample(n=num_rows - tot_rows)
            break
        except Exception as e:
            retries += 1
            scaling_factor *= 2  # Increase scaling factor on retry
            print(
                f"{Fore.YELLOW}Retry {retries}/{max_retries}: Increasing scaling factor to {scaling_factor} due to error: {e}{Style.RESET_ALL}")
            time.sleep(1)
            if retries == max_retries:
                raise RuntimeError("Max retries reached. Unable to generate valid polarized synthetic data.") from e

    final_data = pd.concat([filtered_polarized_data, fill_values]).reset_index(drop=True)
    return final_data

def check_distribution_constraints(df, constraints_list):
    """
    Verifies that each constraint group covers exactly its expected share of rows.

    For every group, the rows matching all of its conditions are counted and their
    percentage (rounded to two decimals) is compared with the 'Percentage' of the
    group's first condition. The first mismatch is reported and stops the check.

    Args:
        df (pd.DataFrame): The dataframe to check.
        constraints_list (list): A list of constraints (field-value pairs) to validate.

    Returns:
        bool: True if all distribution constraints are satisfied, False if any are
        violated or the dataframe is empty.
    """
    total_rows = len(df)
    if not total_rows:
        return False  # An empty dataframe cannot satisfy any distribution

    for group in constraints_list:
        # Narrow the frame down condition by condition
        matching_rows = df.copy()
        for cond in group:
            field, value = cond["Field"], cond["Value"]
            matching_rows = matching_rows[matching_rows[field] == value]

        actual_count = len(matching_rows)
        actual_percentage = (actual_count / total_rows) * 100
        expected_percentage = group[0]["Percentage"]
        expected_count = round((expected_percentage / 100) * total_rows)

        shares_agree = round(actual_percentage, 2) == round(expected_percentage, 2)
        if shares_agree:
            continue

        # Report the first group whose share is off and stop
        print(f"{Fore.RED}Distribution of polarization not respected{Style.RESET_ALL}")
        print(f"Condition: {group}")
        print(f"Actual Count: {actual_count}, Expected Count: {expected_count}")
        print(f"Actual Percentage: {actual_percentage}%, Expected Percentage: {expected_percentage}%")
        return False

    print(f"{Fore.GREEN}Distribution of polarization respected{Style.RESET_ALL}")
    time.sleep(1)  # Add delay for better visualization of the result
    return True

##### CODE

Simple polarization conditions

In [20]:
# Define polarization conditions: two independent single-condition groups,
# each expected to cover 25% of the generated rows
polarization_list = [
    [{"Field": "Sex", "Value": "Female", "Percentage": 25}],
    [{"Field": "Candidate State", "Value": "Hired", "Percentage": 25}],
]

# Generate data with polarization
final_data = polarized_generation_from_conditions(synthesizer, polarization_list, num_rows=1000)

# Check the distribution constraints of the polarized data
# (the trailing ';' suppresses the display of the returned boolean)
check_distribution_constraints(final_data, polarization_list);

Sampling conditions: 100%|██████████| 500/500 [00:01<00:00, 494.36it/s] 
Sampling conditions: 100%|██████████| 500/500 [00:08<00:00, 59.79it/s]
Sampling rows: 100%|██████████| 3000/3000 [00:01<00:00, 1635.77it/s]


[32mDistribution of polarization respected[0m


Complex polarization conditions

In [21]:
# Define another set of polarization conditions: each group combines several
# conditions that must hold together, and all conditions of a group share one percentage
polarization_list = [
    [{"Field": "Sex", "Value": "Female", "Percentage": 25},
        {"Field": "Candidate State", "Value": "Hired", "Percentage": 25}],

    [{"Field": "Study Title", "Value": "Five-year degree", "Percentage": 10},
        {"Field": "Assumption Headquarters", "Value": "Milan", "Percentage": 10},
        {"Field": "English", "Value": 3, "Percentage": 10}]
]

# Generate data with the second set of polarization conditions
final_data = polarized_generation_from_conditions(synthesizer, polarization_list, num_rows=1000)

# Check the distribution constraints of the new polarized data
# (the trailing ';' suppresses the display of the returned boolean)
check_distribution_constraints(final_data, polarization_list);

Sampling conditions: 100%|██████████| 500/500 [00:09<00:00, 52.26it/s]
Sampling conditions: 100%|██████████| 200/200 [00:00<00:00, 249.84it/s]


[33mRetry 1/3: Increasing scaling factor to 4 due to error: Not enough rows for constraints [{'Field': 'Sex', 'Value': 'Female', 'Percentage': 25}, {'Field': 'Candidate State', 'Value': 'Hired', 'Percentage': 25}], missing 135 elements[0m


Sampling conditions: 100%|██████████| 1000/1000 [00:17<00:00, 57.23it/s]
Sampling conditions: 100%|██████████| 400/400 [00:00<00:00, 528.96it/s]
Sampling rows: 100%|██████████| 3000/3000 [00:01<00:00, 1900.09it/s]


[32mDistribution of polarization respected[0m
