In [None]:
!pip install pandas==1.5.3

In [None]:
!pip install fuzzywuzzy

In [None]:
!pip install python-Levenshtein

In [None]:
!pip install gensim

In [None]:
!pip install spacy

In [None]:
!python -m spacy download fr_core_news_lg

In [105]:
import os
import pandas as pd
import numpy as np
import spacy
from fuzzywuzzy import fuzz
import logging
import warnings
from concurrent.futures import ThreadPoolExecutor
import re
import gc
from typing import Dict
import time

In [106]:
warnings.filterwarnings("ignore")

In [107]:
nlp = spacy.load('fr_core_news_lg')
# Initialize logging
logging.basicConfig(level=logging.INFO)

# Define the folder path
folder_path = '/content/excelwork/'

# List all files in the folder
all_files = os.listdir(folder_path)

# Filter out Excel files
excel_files = [f for f in all_files if f.endswith('.xlsx') or f.endswith('.xls')]

# Full paths to Excel files
excel_file_paths = [os.path.join(folder_path, f) for f in excel_files]

In [108]:
standard_columns = np.load('/content/arrays/standard_columns.npy')
Type_TP = np.load('/content/arrays/Type_TP.npy')
INTERVENANT_OPTIONS = np.load('/content/arrays/INTERVENANT_OPTIONS.npy')
Typologie = np.load('/content/arrays/Typologie.npy')
Causes_Echecs = np.load('/content/arrays/Causes_Echecs.npy')
ADMIN_NAMES = np.load('/content/arrays/ADMIN_NAMES.npy')
OPTIONS_MAP_Stringheader = np.load('/content/arrays/OPTIONS_MAP_Stringheader.npy', allow_pickle=True)

In [109]:
output_xlsx_file = "/content/combined_data.xlsx"

In [110]:
# Initialize logging with timestamp and file handler
logging.basicConfig(filename='excel_processing.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')

In [111]:
outputpath = "/content/resultexcelwork"

#Transforming from xlsx to csv:

In [112]:
# Initialize logging
logging.basicConfig(level=logging.INFO)

In [113]:

def extended_special_char_replacements() -> Dict[str, str]:
    """
    Returns a dictionary mapping special characters to their replacements.
    """
    special_chars = {'é': 'e', 'è': 'e', 'ê': 'e', 'ë': 'e', 'à': 'a', 'â': 'a', 'ô': 'o', 'û': 'u',
                     'ù': 'u', 'î': 'i', 'ï': 'i', 'ç': 'c', '/': '-', ';': ',', 'á': 'a', 'ó': 'o',
                     'í': 'i', 'ú': 'u', 'ñ': 'n', 'ý': 'y', 'ø': 'o', 'þ': 'th', 'ð': 'd', 'ß': 'ss',
                     'ÿ': 'y', 'ä': 'a', 'ö': 'o', 'ü': 'u', 'õ': 'o', 'ã': 'a', 'å': 'a', 'æ': 'ae',
                     'œ': 'oe', 'ł': 'l', '€': 'EUR', '£': 'GBP', '$': 'USD', '%': 'percent', '#': 'number',
                     '@': 'at', '&': 'and', '(': '', ')': '', '{': '', '}': '', '[': '', ']': '', '"': '',
                     "'": '', '?': '', '!': '', ':': '', '|': ''}
    escaped_chars = {re.escape(k): v for k, v in special_chars.items()}
    return escaped_chars


In [114]:
def convert_columns_to_string(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts non-numeric and non-string DataFrame columns to string type.
    """
    try:
        print("[Info] Starting the conversion of columns to string types.")

        for col in df.columns:
            col_type = df[col].dtype

            # If the column is neither numeric nor a string, try converting it to a string
            if not pd.api.types.is_numeric_dtype(col_type) and not pd.api.types.is_string_dtype(col_type):
                try:
                    df[col] = df[col].astype(str)
                    print(f"[Info] Successfully converted column '{col}' to string.")
                except Exception as e_inner:
                    print(f"[Error] Failed to convert column '{col}' of type '{col_type}' to string. Error: {e_inner}")

        print("[Info] Finished converting all applicable columns to string types.")
        return df

    except Exception as e:
        print(f"[Error] The convert_columns_to_string function encountered an issue: {e}")
        return df

In [115]:
def fill_empty_cells_with_nan(df: pd.DataFrame) -> None:
    """
    Fills empty cells in the DataFrame with 'nan'.
    """
    try:
      print(f"[Info] processing the fill_empty_cells_with_nan function")
      for col in df.columns:
            col_type = df[col].dtype
            if pd.api.types.is_numeric_dtype(col_type):
                df[col].fillna(0, inplace=True)  # Fill NaN with 0 for numeric columns
            elif pd.api.types.is_string_dtype(col_type):
                df[col].fillna('nan', inplace=True)  # Fill NaN with 'nan' for string columns
            else:
                df[col].fillna(np.nan, inplace=True)  # Use np.nan for other types
      print(f"[Info] the fill_empty_cells_with_nan function finished!")
      return df
    except Exception as e :
      print(f"[ERROR] the fill_empty_cells_with_nan function didn't work as expected: {e}")
      return df

In [116]:
def reading_excel_files(excel_file_path):
  print(f"[Info] Processing {excel_file_path}")
  num_columns = 0
  try:
      print("Attempting to read 50 columns.")
      df = pd.read_excel(excel_file_path, usecols=range(50))
      print(f"Reading excel file {excel_file_path} is success")
      return df
  except Exception as e:
      print(f"Error while trying to read 50 columns: {e}")

      # Fallback: Try to identify the number of non-empty columns
      try:
          print("Attempting to identify the actual number of columns.")
          sample_df = pd.read_excel(excel_file_path, nrows=5)
          sample_df.dropna(axis=1, how='all', inplace=True)
          num_columns = sample_df.shape[1]

          print(f"Identified {num_columns} non-empty columns. Trying to read them.")
          df = pd.read_excel(excel_file_path, usecols=range(num_columns))
          print(f"Reading excel file {excel_file_path} is success")
          return df
      except Exception as e_inner:
          print(f"Error while trying to read {num_columns} columns: {e_inner}")
          return None


In [117]:

def excel_to_csv_with_cleaning_and_logging_v2(excel_file_path: str, csv_file_path: str) -> None:
    """
    Converts an Excel file to a CSV file with preprocessing and logging.
    """
    df = None
    try:
        print(f"[Info] Processing {excel_file_path}")
        df = reading_excel_files(excel_file_path)
        print(f"[Info] {excel_file_path} is correctly readed")
        df = df.loc[:, ~df.columns.str.startswith('Unnamed')]
        df.dropna(how='all', inplace=True)
        df = fill_empty_cells_with_nan(df)
        df = convert_columns_to_string(df)
        replacements = extended_special_char_replacements()
        start_time = time.time()
        for col in df.columns:
            if pd.api.types.is_string_dtype(df[col]):
                print(f"[Info] Processing column {col}...")
                df[col].replace(replacements, regex=True, inplace=True)
                print(f"[Info] Processing column {col} finished!")
        # Measure the time after string replacement
        end_time = time.time()

        # Calculate and print the duration
        duration = end_time - start_time
        print(f"Time taken for string replacement: {duration} seconds")
         # Measure the time before string replacement
        start_time = time.time()
        # Save to CSV
        df.to_csv(csv_file_path, encoding='utf-8-sig', sep=';', index=False)
        # Measure the time after string replacement
        end_time = time.time()
        # Calculate and print the duration
        duration = end_time - start_time
        print(f"Time taken for converting to csv file is : {duration} seconds")

    except Exception as e:
        print(f"[ERROR] An error occurred: {e}")

    del df
    gc.collect()


In [118]:

def import_xlsx_to_csv(directory_xlsx: str, directory_csv: str) -> None:
    """
    Converts all Excel files in a directory to CSV files.
    """
    if not os.path.exists(directory_xlsx):
        print(f"[ERROR] The source directory does not exist. ")
        return

    if not os.path.exists(directory_csv):
        print(f"[Info] Creating destination directory.")
        os.makedirs(directory_csv)

    try:
        for filename in os.listdir(directory_xlsx):
            if filename.endswith('.xlsx'):
                csv_filename = filename.replace('.xlsx', '.csv')
                full_csv_path = os.path.join(directory_csv, csv_filename)
                excel_to_csv_with_cleaning_and_logging_v2(os.path.join(directory_xlsx, filename), full_csv_path)
                print(f"[Info] process for {filename} is finished")
    except Exception as e:
        print(f"[Error] importing files: {e}")

In [119]:
directory_xlsx = "/content/excelwork"
directory_csv = "/content/csvwork"

In [None]:
import_xlsx_to_csv(directory_xlsx,directory_csv)

#Checking the saved CSV files :

In [121]:
def get_csv_dimensions_df(directory_path: str) -> pd.DataFrame:
    # Initialize an empty list to store results
    file_dimensions = []

    # Check if the directory exists
    if os.path.exists(directory_path):
        # Loop through each file in the directory
        for filename in os.listdir(directory_path):
            # Check if the file is a CSV file
            if filename.endswith(".csv"):
                file_path = os.path.join(directory_path, filename)

                try:
                    # Read the CSV file into a DataFrame
                    df = pd.read_csv(file_path,encoding='utf-8-sig', sep=';')

                    # Get the dimensions of the DataFrame
                    rows, cols = df.shape

                    # Append the file name and dimensions to the list
                    file_dimensions.append([filename, rows, cols])
                except Exception as e:
                    print(f"An error occurred while reading {filename}: {e}")
    else:
        print("The specified directory does not exist.")

    # Convert the list to a DataFrame
    result_df = pd.DataFrame(file_dimensions, columns=['File Name', 'Rows', 'Columns'])

    return result_df

In [None]:
result_df = get_csv_dimensions_df(directory_csv)
result_df.head(11)

#Cleaning CSV Part:

In [123]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.spatial.distance import jaccard

In [124]:
def calculate_combined_similarity(col, standard):
  print(f"[Info]working on {standard}")
  col_token = nlp(col)
  standard_token = nlp(standard)
  print(f"[Info] Attempting semantic_similarity")
  semantic_similarity = col_token.similarity(standard_token)
  print(f"[Info] Attempting string_similarity")
  string_similarity = fuzz.ratio(col, standard) / 100.0
  print(f"[Info] Attempting combined_similarity")
  # Jaccard similarity using TfidfVectorizer
  print(f"[Info] Attempting Jaccard similarity using TfidfVectorizer")
  vec = TfidfVectorizer()
  tfidf_matrix = vec.fit_transform([col, standard])
  jaccard_similarity = 1 - jaccard(tfidf_matrix[0].toarray()[0], tfidf_matrix[1].toarray()[0])
  print(f"[Info] Calculate the combined similarity ")
  # Combine the similarities into a single array
  similarities = np.array([semantic_similarity, string_similarity, jaccard_similarity])
  # Normalize the similarities using Min-Max normalization
  scaler = MinMaxScaler()
  normalized_similarities = scaler.fit_transform(similarities.reshape(-1, 1)).flatten()
  # Calculate the combined similarity as the average of the normalized similarities
  combined_similarity = np.mean(normalized_similarities)
  print(f"[Info] calculate_combined_similarity is finished!")
  return combined_similarity


In [125]:
#Function to import CSV files
def import_csv_files(directory):
    csv_files = {}
    for filename in os.listdir(directory):
        try:
            if filename.endswith('.csv'):
                file_path = os.path.join(directory, filename)
                print(f"[Info] Attempting to load csv files the file is {filename}")
                csv_files[filename] =  df = pd.read_csv(file_path,encoding='utf-8-sig', sep=';')
                print(f"[Info]Successfully imported {filename}")
        except Exception as e:
            print(f"[Error] importing {filename}: {e}")
    return csv_files

In [126]:
def clean_dataframe(df):
    try:
        # Convert DataFrame to string for consistent comparison, handling NaN by converting it to a string 'nan'
        df_str = df.fillna('nan').astype(str)
        # Create a mask to identify rows that should be removed
        mask = df_str.apply(lambda row: all(val in ['nan', '0', '0.0', '00:00:00'] for val in row), axis=1)
        # Remove rows based on the mask
        df_cleaned = df.loc[~mask]
        return df_cleaned
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

In [127]:
# Function to match and replace DataFrame column headers based on a standard list
def match_and_replace_headers(df, standard_headers):
    for col in df.columns:
        best_match = ""
        highest_similarity = 0
        for standard in standard_headers:
          combined_similarity = calculate_combined_similarity(col, standard)
          if combined_similarity > highest_similarity:
              highest_similarity = combined_similarity
              best_match = standard
          if (best_match !=0):
            print(f"[Info] best match found {best_match}")
        if highest_similarity > 0.7:
            df.rename(columns={col: best_match}, inplace=True)
            print(f"the column name of {col} is replaced with {best_match}")
    return df

In [128]:
def convert_to_text(df):
    """
    Convert_to_text function description...
    """
    for col in df.columns:
        df[col] = df[col].astype(str)
    print(f"[Info] Converted all columns to text type")

In [129]:
def numpy_array_to_string_list(numpy_array):
    """
    Convert a NumPy array to a list of strings.

    Parameters:
    - numpy_array (np.ndarray): The NumPy array to be converted

    Returns:
    - list: A list of strings
    """
    try:
        # Converting the numpy array to a list of strings
        string_list = numpy_array.astype(str).tolist()

        # If the array is multi-dimensional, flattening it to a 1D list
        if isinstance(string_list[0], list):
            string_list = [str(item) for sublist in string_list for item in sublist]

        return string_list
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

In [130]:
def main(directory, output_directory, standard_headers):
    """
    Main function description...
    """
    # Convert numpy.str_ to native Python str
    standard_headers_str =numpy_array_to_string_list(standard_headers)
    csv_files = import_csv_files(directory)
    for filename, df in csv_files.items():
        try:
            print(f"[Info] Attempting cleaned_df")
            cleaned_df = clean_dataframe(df)
            print(f"[Info] cleaned_df finished!")
            print(f"[Info] Attempting matched_df")
            matched_df = match_and_replace_headers(cleaned_df, standard_headers_str)
            print(f"[Info] matched_df finished!")
            print(f"[Info] Attempting convert_to_text")
            convert_to_text(matched_df)
            print(f"[Info] convert_to_text finished!")
            print(f"[Info] Attempting matched_df")
            matched_df.to_csv(os.path.join(output_directory, f'Cleaned_{filename}'), encoding='utf-8-sig', sep=';', index=False)
            print(f"[Info] matched_df finished!")
            print(f"[Info]Successfully processed and saved Cleaned_{filename}")
        except Exception as e:
            print(f"[Error] processing {filename}: {e}")

In [131]:
outputpath = "/content/results"

In [None]:
main(directory_csv, outputpath, standard_columns)

In [None]:
result_df = get_csv_dimensions_df(outputpath)
result_df.head(13)

#Combine the csv files:

In [134]:
def combine_csv_files(directory, output_file, standard_headers):
    try:
        filenames = [f for f in os.listdir(directory) if f.endswith('.csv')]
        if not filenames:
            return "No CSV files found in the directory."

        dfs = []

        for filename in filenames:
            df = pd.read_csv(os.path.join(directory, filename), encoding='utf-8-sig', sep=';')

            # Standardize the headers using match_and_replace_headers
            standardized_df = match_and_replace_headers(df, standard_headers)

            # Sort the columns to match the order in standard_headers
            sorted_columns = [col for col in standard_headers if col in standardized_df.columns]
            standardized_df = standardized_df[sorted_columns]

            dfs.append(standardized_df)
            print(f"[BIGInfo] the {filename} is successfuly added")
        # Combine all DataFrames
        combined_df = pd.concat(dfs, ignore_index=True)
        combined_df=clean_dataframe(combined_df)
        # Write to a global CSV file
        combined_df.to_csv(os.path.join(directory, output_file), encoding='utf-8-sig', sep=';', index=False)

        return f"Successfully combined {len(filenames)} CSV files into {output_file}."

    except Exception as e:
        return f"An error occurred: {e}"

In [None]:
message = combine_csv_files("/content/results", "/content/results/combined.csv",numpy_array_to_string_list(standard_columns))

In [None]:
print(message)

In [137]:
combined_df = pd.read_csv('/content/results/combined.csv',encoding='utf-8-sig', sep=';')

In [None]:
combined_df.shape

In [139]:
combined_df=clean_dataframe(combined_df)

In [None]:
combined_df.shape

In [141]:
combined_df.to_csv("/content/final_combined_csv.csv",encoding='utf-8-sig', sep=';', index=False)

#Reorganizing the final csv file

In [142]:
from typing import List, Any, Union
from fuzzywuzzy import fuzz
import os
from datetime import datetime

In [143]:
class DataFrameOperations:
    def __init__(self):
        pass

    @staticmethod
    def rearrange_columns(df, col_to_move, ref_col):
        columns_list = df.columns.tolist()
        if col_to_move not in columns_list or ref_col not in columns_list:
            return "One or both of the specified columns do not exist."
        index_column_to_move = columns_list.index(col_to_move)
        index_reference_column = columns_list.index(ref_col)
        moved_column = columns_list.pop(index_column_to_move)
        columns_list.insert(index_reference_column, moved_column)
        return df[columns_list]

    @staticmethod
    def combine_common_columns(dfs):
        return pd.concat(dfs, ignore_index=True, join='inner')

    @staticmethod
    def add_columns(df, new_columns, positions):
        for col, values in new_columns.items():
            df[col] = values
        cols = df.columns.tolist()
        for col, pos in positions.items():
            if col in cols:
                cols.remove(col)
                cols.insert(pos, col)
        return df[cols]

    @staticmethod
    def remove_missing_values(df, method='drop'):
        if method == 'drop':
            return df.dropna()
        elif method == 'mean':
            return df.fillna(df.mean())
        elif method == 'median':
            return df.fillna(df.median())
        elif method == 'mode':
            return df.fillna(df.mode().iloc[0])
        else:
            return "Invalid method. Choose among 'drop', 'mean', 'median', or 'mode'."

    @staticmethod
    def remove_duplicates(df):
        return df.drop_duplicates()

    @staticmethod
    def convert_data_types(df, conversions):
        return df.astype(conversions)

In [160]:
def main_reorginize(csv_path,classdataoperation):
  df = pd.read_csv(csv_path, encoding='utf-8-sig', sep=';')
  df = classdataoperation.rearrange_columns(df,"Typologie","Périmètre")
  df = classdataoperation.rearrange_columns(df,"Nbr intervenants","STIT")
  df = classdataoperation.rearrange_columns(df,"Nbr sites distants","Nbr intervenants")
  df = classdataoperation.rearrange_columns(df,"Intervenant Terrain","Nbr sites distants")
  df = classdataoperation.rearrange_columns(df,"STIT","Intervenant Terrain")
  df = classdataoperation.add_columns(df,{"Prestation":'',"Problème":''},{"Prestation":7,"Problème":37})
  return df

In [161]:
df = main_reorginize('/content/final_combined_csv.csv',DataFrameOperations)

In [162]:
df.to_csv("/content/result.csv",encoding='utf-8-sig', sep=';', index=False)

In [46]:
def read_csv_to_df(file_path, encoding='utf-8-sig', sep=';'):
    print("[Debug] Reading CSV file:", file_path)
    df = pd.read_csv(file_path, encoding=encoding, sep=sep)
    print("[Debug] Successfully read CSV file:", file_path)
    return df

In [47]:
def change_column_headers(df: pd.DataFrame, new_headers: List[str], similarity_threshold: int = 80) -> pd.DataFrame:
    """
    Change and reorder the column headers of a DataFrame based on semantic similarity.

    Args:
        df (pd.DataFrame): The DataFrame to modify.
        new_headers (List[str]): List of new column headers.
        similarity_threshold (int): Minimum similarity score to consider a column match.

    Returns:
        pd.DataFrame: The DataFrame with updated and reordered column headers.
    """
    print("[Info] Started changing column headers...")

    # Check if new_headers is a list
    if not isinstance(new_headers, list):
        raise TypeError("new_headers should be a list.")

    print("[Info] Validated new_headers as a list.")

    # Check if all new_headers are strings
    if not all(isinstance(item, str) for item in new_headers):
        raise TypeError("All items in new_headers should be strings.")

    print("[Info] Validated new_headers items as strings.")

    reordered_cols = []
    for new_header in new_headers:
        found = False
        for existing_header in df.columns:
            if fuzz.ratio(new_header.lower(), existing_header.lower()) >= similarity_threshold:
                reordered_cols.append(df[existing_header])
                found = True
                break
        if not found:
            reordered_cols.append(pd.Series(name=new_header))

    print("[Info] Completed semantic similarity matching.")

    # Concatenate Series to form a new DataFrame
    new_df = pd.concat(reordered_cols, axis=1)

    print("[Info] Column header changes complete.")
    return new_df

In [48]:
def reorder_columns(df: pd.DataFrame, current_col_name: str, target_col_name: str, after: bool = True) -> pd.DataFrame:
    print("[Info] Starting column reordering...")

    # Check the existence of column names
    if current_col_name not in df.columns or target_col_name not in df.columns:
        raise ValueError("Either the current or target column name doesn't exist in the DataFrame.")

    print("[Info] Column names are valid.")

    # Get current column order as a list
    cols = list(df.columns)
    print(f"[Info] Current column order: {cols}")

    # Remove current column from list and save its position
    cols.remove(current_col_name)

    # Calculate target index
    target_index = cols.index(target_col_name)
    target_index = target_index + 1 if after else target_index

    # Insert the column at the target index
    cols.insert(target_index, current_col_name)
    print(f"[Info] New column order: {cols}")

    # Reorder DataFrame columns without creating a new DataFrame
    df.columns = cols

    print("[Info] Column reordering is complete.")
    return df

In [49]:
def add_new_column(df: pd.DataFrame, new_column_name: str, position: int, value: Union[str, int, float] = ''):
    """
    Add a new column at a specific position in the DataFrame, and shift existing columns to the right.

    Args:
        df (pd.DataFrame): DataFrame to be modified.
        new_column_name (str): Name of the new column to add.
        position (int): Position at which the new column should be added.
        value: Default value to populate the new column with.
    """
    # Check for name collisions
    if new_column_name in df.columns:
        print(f"[Error] A column with the name {new_column_name} already exists.")
        return

    # Check if the position is within the range of existing columns
    if position < 0 or position > len(df.columns):
        print("[Error] Invalid position.")
        return

    print(f"[Info] Adding a new column '{new_column_name}' at position {position}.")

    # Initialize the new column with the given value
    df.insert(position, new_column_name, value)

    print("[Info] New column added and existing columns shifted.")
    return df

In [50]:
def save_df_to_csv(df: pd.DataFrame, file_path: str = None, encoding: str = 'utf-8-sig', sep: str = ';', index: bool = False, verbose: bool = True):
    """
    Save the DataFrame to a CSV file.

    Args:
        df (pd.DataFrame): The DataFrame to save.
        file_path (str): The path to save the DataFrame to.
        encoding (str): The encoding to use for the CSV file.
        sep (str): The field separator for the CSV file.
        index (bool): Whether to write row (index) names.
        verbose (bool): Whether to print log messages.
    """

    if verbose:
        print("[Info] Initiating DataFrame save process.")

    # Check if DataFrame is None
    if df is None:
        if verbose:
            print("[Error] DataFrame is None.")
        return

    # Check if it's actually a DataFrame
    if not isinstance(df, pd.DataFrame):
        if verbose:
            print(f"[Error] Provided object is not a DataFrame. It is: {type(df)}")
        return

    # If file_path is not provided, save it in the current directory with a timestamp
    if file_path is None:
        file_path = f"DataFrame_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

    # Check if the file_path has the .csv extension
    if not file_path.lower().endswith('.csv'):
        if verbose:
            print("[Error] The provided file path does not end with '.csv'.")
        return

    try:
        df.to_csv(file_path, encoding=encoding, sep=sep, index=index)
        if verbose:
            print(f"[Info] DataFrame saved successfully at {file_path}.")
    except Exception as e:
        if verbose:
            print(f"[Error] An error occurred while saving the DataFrame: {e}")
    return df if file_path else None

In [51]:
def make_columns_unique(df):
    counts = {}
    new_cols = []
    for col in df.columns:
        if df.columns.tolist().count(col) > 1:  # duplicate column
            if col not in counts:
                counts[col] = 1
            else:
                counts[col] += 1
            new_cols.append(f"{col}_{counts[col]}")
        else:
            new_cols.append(col)
    df.columns = new_cols
    return df

In [52]:
def modify_type_tp(df):
    print("[Info] Modifying 'Type TP' column...")

    # Create an empty DataFrame to hold the new rows
    new_rows = []
    df =  make_columns_unique(df)
    for index, row in df.iterrows():
        type_tp_value = row['Type TP']

        if pd.isna(type_tp_value):
            print("[Info] 'Type TP' value is NaN.")
            continue  # Skip this row

        # Handle '+' in 'Type TP' column
        if '+' in type_tp_value:
            print(f"[Info] Found '+' in '{type_tp_value}'")
            before_plus, after_plus = type_tp_value.split('+', 1)

            # Duplicate the row and modify 'Type TP'
            new_row_before = row.copy()
            new_row_after = row.copy()

            new_row_before['Type TP'] = before_plus.strip()
            new_row_after['Type TP'] = after_plus.strip()

            new_rows.append(new_row_before)
            new_rows.append(new_row_after)

        # Handle 'sans' in 'Type TP' column
        elif 'sans' in type_tp_value:
            print(f"[Info] Found 'sans' in '{type_tp_value}'")
            before_sans = type_tp_value.split('sans', 1)[0]

            # Modify 'Type TP' and keep the row
            row['Type TP'] = before_sans.strip()
            new_rows.append(row)

        else:
            # Keep the row as is
            new_rows.append(row)

    # Create a new DataFrame from the modified rows
    new_df = pd.DataFrame(new_rows).reset_index(drop=True)

    print("[Info] 'Type TP' column modified.")
    return new_df  # Return the new modified DataFrame

In [53]:
def modify_csv(file_path, standard_columns, output_file):
    """
    Modify a CSV file by changing headers, reordering columns, and adding new columns.

    Args:
        file_path (str): Path to the input CSV file.
        standard_columns (list): List of new header names.
        output_file (str): Path to the output CSV file.

    Returns:
        None
    """
    print("[Info] Starting the CSV modification process...")

    # Read the CSV file
    print("[Info] Reading the CSV file:", file_path)
    df = read_csv_to_df(file_path)

    # Change the header names
    print("[Info] Changing column headers...")
    df = change_column_headers(df, standard_columns)

    # Reorder existing columns
    print("[Info] Reordering columns...")
    df = reorder_columns(df, 'Nbr intervenants', 'STIT', after=True)
    print("[Info] Columns reordered: Nbr intervenants after STIT.")
    df = reorder_columns(df, 'Nbr sites distants', 'Nbr intervenants', after=True)
    print("[Info] Columns reordered: Nbr sites distants after Nbr intervenants.")
    df = reorder_columns(df, 'Intervenant Terrain', 'Nbr sites distants', after=True)
    print("[Info] Columns reordered: Intervenant Terrain after Nbr sites distants.")
    df = reorder_columns(df, 'Typologie', 'Périmètre', after=True)
    df = reorder_columns(df,'Typologie','Type TP', after = False)
    print("[Info] Columns reordered: Typologie after Périmètre.")

    # Add new columns at specific positions
    print("[Info] Adding new columns...")
    df = add_new_column(df, 'Prestation', df.columns.get_loc('Typologie'))
    print("[Info] New column 'Prestation' added after 'Typologie'.")
    df = add_new_column(df, 'Problème', df.columns.get_loc('Responsable echec') + 1)
    print("[Info] New column 'Problème' added after 'Responsable echec'.")

    # Modify 'Type TP' column
    df = modify_type_tp(df)

    # Save the modified DataFrame back to a new CSV file
    print("[Info] Saving modified DataFrame to a new CSV file:", output_file)
    save_df_to_csv(df, output_file)

    print("[Info] CSV modification completed. Modified CSV saved to:", output_file)

In [None]:
modify_csv('/content/final_combined_csv.csv',numpy_array_to_string_list(standard_columns),'/content/final.csv')

#Mapping and Replacing:

In [163]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

In [164]:
vectorizer = TfidfVectorizer()

In [165]:
def remove_selected_numbers(text):
    print("[Info] Removing selected numbers from the text...")
    # Remove numbers that are surrounded by space, arithmetic operators or other numbers
    clean_text = re.sub(r'((?<=\s|\+|-|\*|/|\d)|(?<=^))\d+((?=\s|\+|-|\*|/|\d)|(?=$))', '', text)
    return clean_text

In [166]:
def preprocess(text):
    print("[Info] Starting text preprocessing...")

    # Use FuzzyWuzzy to clean the string
    clean_text = fuzz._process_and_sort(text, force_ascii=False)

    # Remove selected numbers from the text
    clean_text = remove_selected_numbers(clean_text)

    print("[Info] Text preprocessing completed.")
    return clean_text

In [167]:
def semantic_similarity(text1, text2):
    print("[Info] Calculating semantic similarity...")

    doc1 = nlp(text1)
    doc2 = nlp(text2)

    similarity_score = doc1.similarity(doc2)
    print("[Info] Semantic similarity calculated:", similarity_score)

    return similarity_score

In [168]:
def jaccard_similarity(str1, str2):
    print("[Info] Calculating Jaccard similarity...")

    a = set(str1.split())
    b = set(str2.split())
    c = a.intersection(b)

    similarity_score = float(len(c)) / (len(a) + len(b) - len(c))
    print("[Info] Jaccard similarity calculated:", similarity_score)

    return similarity_score

In [169]:
def cosine_similarity(text1, text2):
    print("[Info] Calculating cosine similarity...")

    tfidf = vectorizer.fit_transform([text1, text2])
    similarity_score = linear_kernel(tfidf, tfidf)[0, 1]

    print("[Info] Cosine similarity calculated:", similarity_score)
    return similarity_score

In [170]:
def best_match(text, array):
    print("[Info] Finding the best match...")

    best_score = 0
    best_match = ""

    preprocessed_text = preprocess(text)

    for item in array:
        preprocessed_item = preprocess(item)

        # Fuzzy match score
        fuzz_score = fuzz.ratio(preprocessed_text, preprocessed_item)
        print("[Info] Fuzzy match score:", fuzz_score)

        # If a perfect match is found, return immediately
        if fuzz_score == 100:
            print("[Info] Perfect match found:", item)
            return item, 100

        # Otherwise, continue with the other similarity calculations
        sem_score = semantic_similarity(preprocessed_text, preprocessed_item)
        print("[Info] Semantic similarity score:", sem_score)

        jac_score = jaccard_similarity(preprocessed_text, preprocessed_item)
        print("[Info] Jaccard similarity score:", jac_score)

        cos_score = cosine_similarity(preprocessed_text, preprocessed_item)
        print("[Info] Cosine similarity score:", cos_score)

        final_score = (0.7 * sem_score) + (0.15 * jac_score) + (0.15 * cos_score)
        print("[Info] Weighted final score:", final_score)

        if final_score > best_score:
            best_score = final_score
            best_match = item

    print("[Info] Best match found:", best_match)
    return best_match, best_score

In [171]:
def match_and_replace(df, column_name, array):
    print("[Info] Matching and replacing values in DataFrame...")

    changes = []

    for index, row in df.iterrows():
        original_value = row[column_name]

        if pd.notna(original_value):
            print("[Info] Processing row at index", index)
            best_replacement, score = best_match(original_value, array)
            df.at[index, column_name] = best_replacement

            changes.append({
                'Index': index,
                'Original_Value': original_value,
                'Replacement': best_replacement,
                'Score': score
            })

    changes_df = pd.DataFrame(changes)

    print("[Info] Matching and replacing completed.")
    return df, changes_df

In [172]:
final_df = pd.read_csv('/content/result.csv',  encoding='utf-8-sig', sep=';')
perimètre = OPTIONS_MAP_Stringheader[1]

In [None]:
perimètre

In [None]:
df, changes_df  = match_and_replace(final_df, 'Périmètre', perimètre)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[Info] Jaccard similarity calculated: 0.0
[Info] Jaccard similarity score: 0.0
[Info] Calculating cosine similarity...
[Info] Cosine similarity calculated: 0.0
[Info] Cosine similarity score: 0.0
[Info] Weighted final score: 0.12729981270888624
[Info] Best match found: ZMD SFR
[Info] Processing row at index 18214
[Info] Finding the best match...
[Info] Starting text preprocessing...
[Info] Removing selected numbers from the text...
[Info] Text preprocessing completed.
[Info] Starting text preprocessing...
[Info] Removing selected numbers from the text...
[Info] Text preprocessing completed.
[Info] Fuzzy match score: 15
[Info] Calculating semantic similarity...
[Info] Semantic similarity calculated: 0.0
[Info] Semantic similarity score: 0.0
[Info] Calculating Jaccard similarity...
[Info] Jaccard similarity calculated: 0.0
[Info] Jaccard similarity score: 0.0
[Info] Calculating cosine similarity...
[Info] Cosine similarity 

In [None]:
df.head()

Unnamed: 0,UO,G2R,Nom du Site,Hostname,Priorité,Périmètre,Typologie,Prestation,Type TP,Nombre PE/Liens/ Cartes,...,Problème,Causes Echecs,Livraison CR,Cloture AP Axis,Statut Attachement STIT,Date Attachement STIT,Statut PV SFR,Date PV SFR,Objet FDR,Description des tâches
0,U368486,10983,NRA FERNEY VOLTAIRE,01FER1-PEAG-1,P1,DSP,DSP SIEA,,Annexe B,1.0,...,,,Livre,OK,A facturer,Fevrier-23,A facturer,,U368486 Feuille de route - Annexe B - S08 - 2...,
1,U368486,10983,NRA FERNEY VOLTAIRE,01FER1-PEAG-1,P1,DSP,DSP SIEA,,Travaux Baie-NRJ,1.0,...,,,Livre,OK,A facturer,Avril-23,A facturer,Avril-23,U368486 Feuille de route - Travaux Baie-NRJ -...,
2,U201781,8110000174,NRO ANGLES,81ang2-peag-1,P00,DSP,DSP 81,,Travaux Baie-NRJ,1.0,...,,,Livre,OK,,,,,U201781 Feuille de route - Travaux Baie-NRJ -...,
3,U368488,10983,NRA FERNEY VOLTAIRE,01FER1-GP-01,P1,DSP,DSP SIEA,,Install OLT 2T4,4.0,...,,,Livre,OK,A facturer,Avril-23,A facturer,Avril-23,U368488 Feuille de route - Install OLT 2T4 - ...,
4,U368486,10983,NRA FERNEY VOLTAIRE,01FER1-PEAG-1,P1,DSP,DSP SIEA,,Install PEAG1 sans DCN,1.0,...,,,Livre,OK,A facturer,Avril-23,A facturer,Avril-23,U368486 Feuille de route - Install PEAG1 sans...,


In [None]:
changes_df.Replacement

0            DSP
1            DSP
2            DSP
3            DSP
4            DSP
          ...   
18063    ZMD SFR
18064           
18065    ZMD SFR
18066    ZMD SFR
18067           
Name: Replacement, Length: 18068, dtype: object

In [None]:
save_df_to_csv(df, '/content/befor.csv')

NameError: ignored

In [None]:
save_df_to_csv(changes_df, '/content/adfter.csv')

In [None]:
!pip install fuzzywuzzy
!pip install python-Levenshtein
!pip install dask[dataframe]
!pip install spacy
!pip install transformers
!pip install -U scikit-learn

In [None]:
!python -m spacy download fr_core_news_sm

In [None]:
import dask.dataframe as dd
import pandas as pd
from fuzzywuzzy import fuzz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForMaskedLM
import torch
import spacy

In [None]:
nlp = spacy.load('fr_core_news_sm')
vectorizer = TfidfVectorizer()
tokenizer = AutoTokenizer.from_pretrained("camembert-base")
model = AutoModelForMaskedLM.from_pretrained("camembert-base")

In [None]:
# Initialize a dictionary to act as a cache for preprocessed texts
preprocess_cache = {}

def preprocess(text):
    global preprocess_cache  # Declare cache as a global variable

    # Check if text is None or empty
    if not text:
        print("[Warning] The input text is empty or None.")
        return ""

    print("[Info] Preprocessing text...")

    # Cache lookup
    if text in preprocess_cache:
        print(f"[Info] Cache hit for {text}")
        return preprocess_cache[text]

    try:
        # Converting to lowercase and then processing
        doc = nlp(text.lower())

        # Lemmatization and removing stop words
        preprocessed_text = ' '.join([token.lemma_ for token in doc if not token.is_stop])

        # Updating cache
        preprocess_cache[text] = preprocessed_text

        print("[Info] Text preprocessing completed.")
        return preprocessed_text

    except Exception as e:
        print(f"[Error] Something went wrong while preprocessing text: {e}")
        return None

In [None]:
# Initialize a dictionary to act as a cache for fuzzy similarities
fuzzy_cache = {}

def fuzzy_match(text1, text2):
    global fuzzy_cache  # Declare cache as a global variable

    print("[Info] Calculating fuzzy match score...")

    # Cache lookup
    pair = (str(text1), str(text2))
    if pair in fuzzy_cache:
        print(f"[Info] Cache hit for {pair}")
        return fuzzy_cache[pair]

    # Type checks and debugging
    if not isinstance(text1, str) or not isinstance(text2, str):
        print("[Debug] Types:", type(text1), type(text2))  # Debug logging
        print("[Debug] Values:", text1, text2)  # Debug logging
        fuzzy_cache[pair] = 0  # Cache the 'no match' result
        return 0  # you could return 0 or some other 'no match' score

    # Actual computation
    score = fuzz.ratio(str(text1), str(text2))

    # Update cache
    fuzzy_cache[pair] = score

    print("[Info] Fuzzy match score calculated:", score)
    return score

In [None]:
# Initialize a dictionary to act as a cache for cos_jaccard similarities
cos_jaccard_cache = {}

# Check if the vectorizer is fitted
def check_is_fitted(vectorizer):
    try:
        vectorizer.transform(["test"])
        return True
    except:
        return False

# Updated cos_jaccard function
def cos_jaccard(text1, text2):
    global cos_jaccard_cache  # Declare cache as a global variable

    print("[Info] Calculating combined similarity score...")

    # Input validation
    text1 = str(text1).lower()
    text2 = str(text2).lower()

    # Check cache
    pair = (text1, text2)
    if pair in cos_jaccard_cache:
        print(f"[Info] Cache hit for {pair}")
        return cos_jaccard_cache[pair]

    # Check if the vectorizer is fitted, if not fit it
    if not check_is_fitted(vectorizer):
        vectorizer.fit([text1, text2])

    tfidf_matrix = vectorizer.transform([text1, text2])

    # Cosine Similarity
    cos_sim = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
    print("[Info] Cosine similarity calculated:", cos_sim)

    # Jaccard Similarity
    jaccard_sim = len(set(text1.split()).intersection(set(text2.split()))) / \
                  len(set(text1.split()).union(set(text2.split())))
    print("[Info] Jaccard similarity calculated:", jaccard_sim)

    # Combined Similarity
    combined_sim = 0.5 * cos_sim + 0.5 * jaccard_sim

    # Update cache
    cos_jaccard_cache[pair] = combined_sim

    print("[Info] Combined similarity score calculated:", combined_sim)
    return combined_sim

In [None]:
# Initialize a dictionary to act as a cache for BERT-based similarities
bert_cache = {}

# Function for BERT inference
def bert_inference(text, model, tokenizer):
    tokens = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**tokens)
        output = outputs.last_hidden_state if hasattr(outputs, 'last_hidden_state') else outputs[0]
        output_mean = output.mean(dim=1)
    return output_mean

# Updated BERT similarity function
def bert_sim(text1, text2):
    global bert_cache  # Declare cache as a global variable

    print("[Info] Calculating BERT-based similarity score...")

    # Input validation
    if not isinstance(text1, str) or not isinstance(text2, str):
        print("[Error] Only string inputs are supported.")
        return -1

    # Check cache
    pair = (text1, text2)
    if pair in bert_cache:
        print(f"[Info] Cache hit for {pair}")
        return bert_cache[pair]

    try:
        output1_mean = bert_inference(text1, model, tokenizer)
        output2_mean = bert_inference(text2, model, tokenizer)

        similarity_score = cosine_similarity(output1_mean, output2_mean)
        similarity_score = similarity_score[0][0].item()

        # Update cache
        bert_cache[pair] = similarity_score

        print("[Info] BERT-based similarity score calculated:", similarity_score)
        return similarity_score

    except Exception as e:
        print(f"[Error] Failed to calculate BERT-based similarity score. Error: {e}")
        return -1

In [None]:
# Initialize dictionaries to act as caches
cache = {}  # Cache for the final result of each row
calculated_similarities = {}  # Cache for already calculated similarities

def match_replace(row, array, column_name):
    global cache, calculated_similarities  # Declare both caches as global variables

    print("[Info] Matching and replacing values in DataFrame row...")
    text = row[column_name]

    # Check if the text is already in the cache, return the cached result
    if text in cache:
        print(f"[Info] Cache hit for {text}")
        return cache[text]

    max_score = 0
    best_match = ""
    preprocessed_text = preprocess(text) if isinstance(text, str) else str(text)

    for candidate in array:
        preprocessed_candidate = preprocess(candidate) if isinstance(candidate, str) else str(candidate)

        print("[Info] Processing candidate:", candidate)

        # Check if this pair's similarity score was calculated before
        pair = (preprocessed_text, preprocessed_candidate)
        if pair in calculated_similarities:
            final_score = calculated_similarities[pair]
        else:
            score1 = fuzzy_match(preprocessed_text, preprocessed_candidate)
            print("[Info] Fuzzy match score:", score1)

            score2 = cos_jaccard(preprocessed_text, preprocessed_candidate)
            print("[Info] Combined similarity score (Cosine + Jaccard):", score2)

            if score1 > 75 and score2 > 0.75:
                final_score = max(score1, score2)
            else:
                final_score = bert_sim(preprocessed_text, preprocessed_candidate)
                print("[Info] BERT-based similarity score:", final_score)

            # Cache the calculated similarity score
            calculated_similarities[pair] = final_score

        print("[Info] Final score:", final_score)

        if final_score > max_score:
            max_score = final_score
            best_match = candidate

    if max_score > 0.75:
        result = pd.Series([row.name, text, best_match, max_score], index=['Index', 'Value_Before', 'Value_After', 'Score'])
        cache[text] = result  # Cache the result
        print("[Info] Value replaced:", best_match)
        return result

    # If no suitable replacement is found
    result = pd.Series([row.name, text, text, 0], index=['Index', 'Value_Before', 'Value_After', 'Score'])
    cache[text] = result  # Cache the result
    print("[Info] No suitable replacement found.")
    return result

In [None]:
dask_df = dd.from_pandas(pd.read_csv("/content/final.csv",encoding='utf-8-sig', sep=';'), npartitions=4)

In [None]:
column_to_match = 'Périmètre'
array_of_comparison = perimètre

In [None]:
result = dask_df.map_partitions(
    lambda df: df.apply(
        match_replace,
        axis=1,
        args=(array_of_comparison, column_to_match)
    ),
    meta=pd.DataFrame(
        {
            'Index': pd.Series(dtype='int'),
            'Value_Before': pd.Series(dtype='object'),
            'Value_After': pd.Series(dtype='object'),
            'Score': pd.Series(dtype='float')
        }
    )
).compute()

In [None]:
result.dropna().to_csv('/content/output.csv',encoding='utf-8-sig', sep=';', index=False)

In [None]:
import json

In [None]:
def process_csv_with_json_mapping(csv_file_path, json_file_path):
    """
    Function to process a CSV based on JSON mapping.
    :param csv_file_path: Path to the CSV file.
    :param json_file_path: Path to the JSON mapping file.
    """

    # Read the JSON file
    with open(json_file_path, 'r') as f:
        json_dict = json.load(f)

    # Read the CSV file
    try:
        dask_df = dd.from_pandas(pd.read_csv(csv_file_path, encoding='utf-8-sig', sep=';'), npartitions=4)
    except FileNotFoundError:
        logging.error("CSV or JSON file not found.")
        return

    # Validate if all columns from JSON exist in the DataFrame
    for column in json_dict.keys():
        if column not in dask_df.columns:
            logging.warning(f"Column '{column}' from JSON doesn't exist in the DataFrame.")
            return

    # Loop through each column specified in the JSON
    for column_name, array_of_comparison in json_dict.items():
        logging.info(f"[Info] Processing column: {column_name}")

        # Use Dask to apply the match_replace function in parallel to each row for this column
        result = dask_df.map_partitions(
            lambda df: df.apply(
                match_replace,
                axis=1,
                args=(array_of_comparison, column_name)
            ),
            meta=pd.DataFrame(
                {
                    'Index': pd.Series(dtype='int'),
                    'Value_Before': pd.Series(dtype='object'),
                    'Value_After': pd.Series(dtype='object'),
                    'Score': pd.Series(dtype='float')
                }
            )
        ).compute()
        output_file_name = f"{column_name}_processed.csv"
        logging.info(f"Saving results to {output_file_name}")
        result.to_csv(output_file_name, index=False)


In [None]:
process_csv_with_json_mapping("/content/final.csv", "/content/option_maps.json")

#Class For Advenced Mapping and Replacing:

In [None]:
import json
# Read the content of the text file
with open('/content/text.txt', 'r') as file:
    content = file.read()
# Split the content into lines
lines = content.split('\n')

# Initialize an empty dictionary to store the transformed data
transformed_data = {}

# Iterate through each line and process it
for line in lines:
    if line:
        key_value = line.split(' : ')
        if len(key_value) == 2:
            transformed_data[key_value[0].strip()] = key_value[1].strip() + ';'

# Save the transformed data as JSON
output_file_path = 'formatted_output.json'
with open(output_file_path, 'w') as output_file:
    json.dump(transformed_data, output_file, indent=4)

print(f"Formatted output saved to '{output_file_path}'")

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import spacy
from fuzzywuzzy import fuzz
import logging
from transformers import BertTokenizer, BertModel
import torch
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Any, Tuple, Dict
import logging
import pickle
import os
from dask import compute
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class SimilarityCalculator:

    def __init__(self):
            self.tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')
            self.bert_model = BertModel.from_pretrained('bert-base-multilingual-cased')
            self.nlp = spacy.load('fr_core_news_md')
            self.cache = {}
            logging.basicConfig(level=logging.INFO)

    def jaccard_similarity(self, str1, str2):
        try:
            logging.info("Calculating jaccard similarity...")
            if self._is_cached('jaccard', str1, str2):
                return self._get_cached('jaccard', str1, str2)

            a, b = set(str1.split()), set(str2.split())
            c = a.intersection(b)
            sim = float(len(c)) / (len(a) + len(b) - len(c))

            self._cache_result('jaccard', str1, str2, sim)
            return sim
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            return 0.0

    def fuzzy_similarity(self, str1, str2):
        try:
            logging.info("Calculating fuzzy similarity...")
            if self._is_cached('fuzzy', str1, str2):
                return self._get_cached('fuzzy', str1, str2)

            sim = fuzz.ratio(str1, str2) / 100.0

            self._cache_result('fuzzy', str1, str2, sim)
            return sim
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            return 0.0

    def cosine_similarity(self, str1, str2):
        try:
            logging.info("Calculating cosine similarity...")
            if self._is_cached('cosine', str1, str2):
                return self._get_cached('cosine', str1, str2)

            vectorizer = TfidfVectorizer().fit_transform([str1, str2])
            vectors = vectorizer.toarray()
            sim = cosine_similarity(vectors[0:1], vectors[1:2])[0,0]

            self._cache_result('cosine', str1, str2, sim)
            return sim
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            return 0.0

    def bert(self, str1, str2):
        try:
            logging.info("Calculating bert similarity...")
            if self._is_cached('bert', str1, str2):
                return self._get_cached('bert', str1, str2)

            tokens1 = self.tokenizer(str1, return_tensors='pt')
            tokens2 = self.tokenizer(str2, return_tensors='pt')

            with torch.no_grad():
                outputs1 = self.bert_model(**tokens1)
                outputs2 = self.bert_model(**tokens2)

            embeddings1 = outputs1.last_hidden_state[:, 0, :]
            embeddings2 = outputs2.last_hidden_state[:, 0, :]
            sim = cosine_similarity(embeddings1, embeddings2)[0,0]

            self._cache_result('bert', str1, str2, sim)
            return sim
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            return 0.0

    def _is_cached(self, method, str1, str2):
        return f"{method}_{str1}_{str2}" in self.cache

    def _get_cached(self, method, str1, str2):
        return self.cache[f"{method}_{str1}_{str2}"]

    def _cache_result(self, method, str1, str2, result):
        self.cache[f"{method}_{str1}_{str2}"] = result
    def calculate_normalized_score_from_similarity(self, expanded_val, secondary_vals):
        try:
            print("Calculating normalized score from similarity...")
            expanded_val, secondary_vals = str(expanded_val), str(secondary_vals)
            print("Expanded value:", expanded_val)
            expanded_vec = self.nlp(expanded_val).vector
            print("Expanded vector:", expanded_vec)
            secondary_vecs = np.array([self.nlp(val).vector for val in secondary_vals])
            print("Secondary vectors:", secondary_vecs)
            dot_products = np.dot(secondary_vecs, expanded_vec)
            print("Dot products:", dot_products)
            norm_expanded = np.linalg.norm(expanded_vec)
            print("Norm expanded:", norm_expanded)
            norms_secondary = np.linalg.norm(secondary_vecs, axis=1)
            print("Norms secondary:", norms_secondary)
            similarities = dot_products / (norm_expanded * norms_secondary) * 100
            print("Similarities:", similarities)
            norm_score = np.mean(similarities)
            print("Norm score:", norm_score)
        except Exception as e:
            print(f"An error occurred while calculating normalized score from similarity: {e}")
            return 0
        return norm_score

    def abbreviation_expansion_from_similarity(self, text,cache_level_2,abbreviations):
        try:
            print("Abbreviation expansion from similarity...")
            text = str(text)
            print("Text:", text)
            if text in cache_level_2:
                print("Text found in cache_level_2.")
                return cache_level_2[text]
            print("Text not found in cache_level_2.")
            expanded_text = ' '.join([abbreviations.get(w, w) for w in text.split()])
            print("Expanded text:", expanded_text)
            self.cache_level_2[text] = expanded_text
            print("Cache level 2 updated.")
        except Exception as e:
            print(f"An error occurred while expanding abbreviations from similarity: {e}")
            return text
        return expanded_text

In [None]:
class DataPreparer:
    @staticmethod
    def prepare_data(json_data):
        list_1 = [key for key, value in json_data.items() if isinstance(value, list)]
        list_2 = [key for key, value in json_data.items() if isinstance(value, dict)]
        list_3 = {
            f"{outer_key}.{inner_key}.{nested_key}": nested_key
            for outer_key, inner_dict in json_data.items() if isinstance(inner_dict, dict)
            for inner_key, nested_dict in inner_dict.items() if isinstance(nested_dict, dict)
            for nested_key in nested_dict.keys() if nested_key in list_1
        }
        return list_1, list_2, list_3

In [None]:
class FindMatch:
    def __init__(self,
                 semantic_matching_func: Callable[[Any, Any], float],
                 calculate_normalized_score_func: Callable[[Any, List[Any]], List[float]]):
        self.semantic_matching_func = semantic_matching_func
        self.calculate_normalized_score_func = calculate_normalized_score_func
        self.cache_level_1 = {}
        self.cache_level_3 = {}
        logging.basicConfig(level=logging.INFO)

    def find_primary_match(self, expanded_val: Any, list_1: List[Any]) -> Tuple[Any, float]:
        logging.info("Finding primary match...")
        expanded_val = str(expanded_val)

        cache_file_path = f"./cache/{expanded_val}.pkl"

        # Check if cached file exists
        if os.path.exists(cache_file_path):
            logging.info("Match found in file cache.")
            with open(cache_file_path, 'rb') as f:
                return pickle.load(f)

        try:
            sem_scores = {}
            with ThreadPoolExecutor() as executor:
                future_to_key = {executor.submit(self.semantic_matching_func, expanded_val, key): key for key in list_1}
                for future in as_completed(future_to_key):
                    key = future_to_key[future]
                    try:
                        sem_scores[key] = future.result()
                    except Exception as e:
                        logging.error(f"{key} generated an exception: {e}")

            high_score_keys = {key: score for key, score in sem_scores.items() if score >= 80}
            if high_score_keys:
                key_max_score = max(high_score_keys, key=high_score_keys.get)
                result = (key_max_score, high_score_keys[key_max_score])

                # Save the result to a file
                with open(cache_file_path, 'wb') as f:
                    pickle.dump(result, f)

                return result

        except Exception as e:
            logging.error(f"An error occurred: {e}")

        return None, None

    def find_secondary_match(self, key: Any, mapped_key: Any, json_data: Dict, expanded_val: Any) -> List[float]:
        print("Finding secondary match...")
        try:
            print("Key:", key)
            cache_key = f"{key}_{mapped_key}_{expanded_val}"
            print("Cache key:", cache_key)
            if cache_key in self.cache_level_3:
                print("Cache key found in cache_level_3.")
                return self.cache_level_3[cache_key]

            if mapped_key and key in json_data.get(mapped_key, {}):
                print("Mapped key and key found in json_data.")
                secondary_vals = json_data[mapped_key][key]
                print("Secondary values:", secondary_vals)
                if isinstance(secondary_vals, list):
                    print("Secondary values are a list.")
                    normalized_scores = self.calculate_normalized_score_func(expanded_val, secondary_vals)
                    print("Normalized scores:", normalized_scores)
                    self.cache_level_3[cache_key] = normalized_scores
                    print("Cache level 3 updated.")
                    return normalized_scores

        except Exception as e:
            print(f"An error occurred while finding the secondary match: {e}")

        return []
    def perform_matching(self, val: Any, expanded_val: Any, list_1: List[Any], list_3: Dict[Any, Any], json_data: Dict) -> Tuple[Any, List[float]]:
        try:
            print("Performing matching...")
            # Attempt to find a primary match
            key, sem_score = self.find_primary_match(expanded_val, list_1)
            print("Key:", key)
            if sem_score and sem_score >= 80:
                print("Semantic score is greater than or equal to 80.")
                return [val, key, key, sem_score], []
            print("Semantic score is less than 80.")
            # If primary match is not satisfying, look for a secondary match
            mapped_key = list_3.get(key, None)
            print("Mapped key:", mapped_key)
            if not mapped_key:
                return None, []
            print("Mapped key found.")
            normalized_scores = self.find_secondary_match(key, mapped_key, json_data, expanded_val)
            print("Normalized scores:", normalized_scores)
            if normalized_scores:
                print("Normalized scores found.")
                best_score = max(normalized_scores)
                print("Best score:", best_score)
                return [val, key, key, best_score], normalized_scores

        except Exception as e:
            print(f"An error occurred while performing the matching: {e}")

        return None, []

In [None]:
class TextProcessor:
    def __init__(self):
        self.nlp = spacy.load('fr_core_news_md')
        self.cache_level_1 = {}
        self.cache_level_2 = {}
        self.cache_level_3 = {}
        self.data_preparer = DataPreparer()
        self.ddf = dd.from_pandas(pd.read_csv('/content/final.csv',encoding='utf-8-sig', sep=';'), npartitions=4)
        self.similarity_calculator = SimilarityCalculator()
        self.find_match = FindMatch(self.similarity_calculator.bert, self.similarity_calculator.calculate_normalized_score_from_similarity)
        with open('/content/formatted_output.json', 'r') as f:
            self.abbreviations = json.load(f)
    def average_similarity(self, val, array_of_strings):
      scores = []

      # Weights for each type of score (sum of weights should be 1)
      weights = {'fuzzy': 0.4, 'jaccard': 0.3, 'cosine': 0.3}

      for string_item in array_of_strings:
          fuzzy_score = self.similarity_calculator.fuzzy_similarity(val, string_item)  # assuming this is between 0 and 100
          jaccard_score = self.similarity_calculator.jaccard_similarity(val, string_item)  # assuming this is between 0 and 1
          cosine_score = self.similarity_calculator.cosine_similarity(val, string_item)  # assuming this is between 0 and 1

          # Normalize the scores to be between 0 and 1
          fuzzy_score /= 100.0

          # Calculate weighted average
          avg_score = (weights['fuzzy'] * fuzzy_score) + (weights['jaccard'] * jaccard_score) + (weights['cosine'] * cosine_score)

          scores.append(avg_score)

      return np.mean(scores)
    def process_column(self, json_data):
        logging.info("Starting to process column...")
        result_df = []
        list_1, _, list_3 = self.data_preparer.prepare_data(json_data)
        delayed_results = []

        def inner_loop(val, array_values):
            logging.info(f"Processing value: {val}")
            if val in self.cache_level_1:
                logging.info("Value found in cache_level_1.")
                return self.cache_level_1[val]

            best_match, best_score = self._process_value(val, array_values, list_1, list_3, json_data)
            self.cache_level_1[val] = [best_match, "Best Match", best_score]
            logging.info("Value added to cache_level_1.")
            return val, best_match, "Best Match", best_score

        for column_name in self.ddf.columns:
            if column_name in list_1:
                df_column = self.ddf[column_name].compute()
                array_values = json_data.get(column_name, [])
                delayed_results.extend([delayed(inner_loop)(val, array_values) for val in df_column])

        logging.info("Computing all delayed results...")
        computed_results = compute(*delayed_results)
        result_df = [result for result in computed_results if result is not None]

        result_ddf = dd.from_pandas(pd.DataFrame(result_df, columns=['Value Before', 'Value After', 'Match Type', 'Score']), npartitions=5)
        logging.info("Processing complete.")

        # Clear the cache and collect garbage
        self.cache_level_1.clear()
        gc.collect()

        return result_ddf

    def _process_value(self, val, array_values, list_1, list_3, json_data):
      logging.info(f"Processing individual value: {val}")
      best_match = None
      best_score = 0

      # Calculate average similarity score
      for array_val in array_values:
          logging.info(f"Calculating average score for: {array_val}")
          avg_score = self.average_similarity(val, array_val)

          if avg_score >= 0.75:
              best_match = array_val
              best_score = avg_score
              logging.info("Average score is greater than 75.")
              break

      if best_score < 0.75:
          logging.info("Average score is less than 75. Proceeding to Primary Match...")
          key, bert_score = self.find_primary_match(val, list_1)

          if bert_score and bert_score >=0.80:
              best_match = key
              best_score = bert_score
              logging.info("BERT score is greater than 80.")
          else:
              logging.info("BERT score is less than 80. Proceeding to Secondary Match...")
              mapped_key = list_3.get(key, None)

              if mapped_key and key in json_data.get(mapped_key, {}):
                  secondary_vals = json_data[mapped_key][key]
                  normalized_scores = self.find_secondary_match(key, mapped_key, json_data, secondary_vals)

                  if normalized_scores:
                      final_score = max(normalized_scores)
                      if final_score > best_score:
                          best_match = key
                          best_score = final_score

      return best_match, best_score

    def process_dict_columns(self, json_data):
        result_df = []
        _, list_2, list_3 = self.data_preparer.prepare_data(json_data)
        delayed_results = []
        def inner_loop(deeper_key, related_key, val):
            cache_key = f"{deeper_key}_{related_key}_{val}"
            if cache_key in self.cache_level_1:
                return self.cache_level_1[cache_key]
            # Calculate the average score
            avg_score = self.average_similarity(val, related_key)
            if avg_score > 0.75:
                self.cache_level_1[cache_key] = [val, related_key, deeper_key, avg_score]
                return [val, related_key, deeper_key, avg_score]
            # If score is below 75%, proceed with abbreviation expansion and BERT
            expanded_val = self.similarity_calculator.abbreviation_expansion_from_similarity(val, self.cache_level_2, self.abbreviations)
            sem_score = self.similarity_calculator.bert(expanded_val, related_key)
            self.cache_level_1[cache_key] = [val, related_key, deeper_key, sem_score]
            return [val, related_key, deeper_key, sem_score]
        for deeper_key in list_2:
            related_keys = [key for key, value in list_3.items() if value == deeper_key]
            for related_key in related_keys:
                if related_key in self.ddf.columns:
                    df_column = self.ddf[related_key].compute()
                    delayed_results.extend([delayed(inner_loop)(deeper_key, related_key, val) for val in df_column])
        computed_results = dd.compute(*delayed_results)
        result_df = [result for result in computed_results if result is not None]
        result_ddf = dd.from_pandas(pd.DataFrame(result_df, columns=['Value Before', 'Value After', 'Column_Name', 'Score']), npartitions=5)
        return result_ddf

    def find_primary_match(self, expanded_val: Any, list_1: list) -> Any:
          try:
              key, sem_score = self.find_match.find_primary_match(expanded_val, list_1)
              if key and sem_score:
                  return key, sem_score
          except Exception as e:
              print(f"An error occurred while finding the primary match in TextProcessor: {e}")
          return None, None
    def find_secondary_match(self, key: Any, mapped_key: Any, json_data: dict, expanded_val: Any) -> list:
        try:
            return self.find_match.find_secondary_match(key, mapped_key, json_data, expanded_val)
        except Exception as e:
            print(f"An error occurred while finding the secondary match in TextProcessor: {e}")
        return []
    def perform_matching(self, val: Any, expanded_val: Any, list_1: list, list_3: dict, json_data: dict) -> Any:
        try:
            return self.find_match.perform_matching(val, expanded_val, list_1, list_3, json_data)
        except Exception as e:
            print(f"An error occurred while performing the matching in TextProcessor: {e}")
        return None

In [None]:
!python -m spacy download fr_core_news_md

In [None]:
import json
import dask.dataframe as dd
from transformers import BertTokenizer, BertModel
import torch

In [None]:
text_processor = TextProcessor()

In [None]:
data_preparer = DataPreparer()
similarity_calculator = SimilarityCalculator()
# Pass the necessary functions to FindMatch
find_match = FindMatch(similarity_calculator.bert, similarity_calculator.calculate_normalized_score_from_similarity)

In [None]:
# Specify the path to your JSON file
json_file_path = '/content/json_data.json'

# Read the JSON file and store its data in the json_data variable
with open(json_file_path, 'r') as f:
    json_data = json.load(f)

In [None]:
result_ddf = text_processor.process_column(json_data)

In [None]:
result_ddf.head()