# Data Discovery Automation

## Imports

In [30]:
# Manipulation
import pandas as pd
import os
import datetime
import re
import time
import bson
import gzip

# Dev
from ddpa_report import htmlGenerator as hg

In [21]:
import os
import pandas as pd
import pyarrow.parquet as pq

def read_bson_file(path):
    """Load a gzip-compressed BSON file into a pandas DataFrame.

    The file at ``path`` is read in full, decoded with ``bson.decode_all``
    and the decoded result is handed to the ``pd.DataFrame`` constructor.
    """
    with gzip.open(path, 'rb') as compressed:
        raw_bytes = compressed.read()

    documents = bson.decode_all(raw_bytes)
    return pd.DataFrame(documents)

class FileHandler:
    """Locate files in a folder and load them into one combined DataFrame.

    ``self.readers`` maps a file-type key (``'csv'``, ``'excel'``,
    ``'parquet'``) to the method that loads a list of files of that type.
    """

    def __init__(self):
        self.readers = {
            'csv': self.read_csv_files_dataframe,
            'excel': self.read_excel_files_dataframe,
            'parquet': self.read_parquet_files_dataframe,
        }

    # Scans the files inside a folder_path
    def get_files(self, folder_path):
        """Return the full path of every entry that ``os.listdir`` reports for ``folder_path``."""
        return [os.path.join(folder_path, name) for name in os.listdir(folder_path)]

    @staticmethod
    def _read_and_concat(files, reader):
        """Load each path in ``files`` with ``reader`` and stack the results.

        The combined frame gets a fresh 0..n-1 index (``ignore_index=True``).
        """
        return pd.concat([reader(path) for path in files], ignore_index=True)

    # Using pandas function to read parquet files
    def read_parquet_files_dataframe(self, files):
        """Read every parquet file in ``files`` into a single DataFrame."""
        return self._read_and_concat(files, pd.read_parquet)

    # Using pandas function to read csv files
    def read_csv_files_dataframe(self, files):
        """Read every CSV file in ``files`` into a single DataFrame."""
        return self._read_and_concat(files, pd.read_csv)

    # Using pandas function to read excel files
    def read_excel_files_dataframe(self, files):
        """Read every Excel file in ``files`` into a single DataFrame."""
        # The previous implementation passed an undefined name (``file_path``)
        # to pd.read_excel instead of the loop variable, raising NameError.
        return self._read_and_concat(files, pd.read_excel)

    # Maps the requested file type to the appropriate pandas load function
    def read_list_files(self, files, file_type):
        """Load ``files`` with the reader registered for ``file_type``.

        Raises:
            ValueError: if ``file_type`` is not a key of ``self.readers``.
        """
        if file_type not in self.readers:
            raise ValueError(f"Unsupported file type '{file_type}'. Supported types are: {', '.join(self.readers.keys())}")
        return self.readers[file_type](files)

In [36]:
class DataDiscovery:
    """Profile a DataFrame: per-column metrics, format checks and value distributions."""

    # Formatting Functions
    # Formats the return of pandas' min() and max() functions into string for better readability
    @staticmethod
    def format_float(value):
        """Return ``value`` converted with ``str``."""
        return str(value)

    # Replaces values under column with empty strings into null/none
    @staticmethod
    def replace_empty_strings_with_null(data_frame):
        """Return the result of ``data_frame.replace('', None)``.

        Used so that empty-string cells are counted as nulls by the metrics.
        """
        return data_frame.replace('', None)

    # Export Function
    def create_excel_sheet(self, dataframe_dict, distrib_dict, output_file):
        """Write all report tables onto a single sheet of an Excel workbook.

        Layout on ``Sheet1``:
          * dataset overview at the top left, metrics table below it;
          * date/SMAC *format* tables stacked to the right of the metrics;
          * date/SMAC *report* tables to the right of the format tables;
          * distribution tables side by side below the metrics table.

        Args:
            dataframe_dict: dict holding the keys ``dataset_overview``,
                ``metrics_df``, ``date_format_df``, ``date_report_df``,
                ``smac_format_df`` and ``smac_report_df``.
            distrib_dict: dict of distribution DataFrames; only the values
                are written, in iteration order.
            output_file: path of the workbook to create.
        """
        overview_df = dataframe_dict['dataset_overview']
        metrics_df = dataframe_dict['metrics_df']
        date_format_df = dataframe_dict['date_format_df']
        date_report_df = dataframe_dict['date_report_df']
        smac_format_df = dataframe_dict['smac_format_df']
        smac_report_df = dataframe_dict['smac_report_df']

        sheet_name = 'Sheet1'

        # Vertical positions.
        metrics_start_row = len(overview_df) + 2
        lower_tables_row = len(date_format_df) + 2
        distrib_start_row = (len(metrics_df) + 2) + len(overview_df) + 2

        # Horizontal positions. The format tables start two columns past the
        # metrics columns. The report tables must be offset by the *width*
        # (column count) of the format tables; the previous code used the row
        # count of date_format_df, which made the tables overlap whenever that
        # frame had fewer rows than columns.
        format_col = len(metrics_df.columns) + 2
        format_width = max(len(date_format_df.columns), len(smac_format_df.columns))
        report_col = format_col + format_width + 1

        # The context manager closes (and saves) the workbook even if one of
        # the writes below raises.
        with pd.ExcelWriter(output_file, engine='xlsxwriter') as excel_writer:
            # Write the basic dataset_overview to the Excel sheet
            overview_df.to_excel(excel_writer, sheet_name=sheet_name,
                                 startrow=0, startcol=1, index=False)

            # Write the metrics_df below dataset_overview
            metrics_df.to_excel(excel_writer, sheet_name=sheet_name,
                                startrow=metrics_start_row, index=True)

            # Write the date_format_df next to the metrics_df
            date_format_df.to_excel(excel_writer, sheet_name=sheet_name,
                                    startcol=format_col, index=False)

            # Write the date_report_df next to the date_format_df
            date_report_df.to_excel(excel_writer, sheet_name=sheet_name,
                                    startcol=report_col, index=False)

            # Write the smac_format_df below date_format_df
            smac_format_df.to_excel(excel_writer, sheet_name=sheet_name,
                                    startrow=lower_tables_row,
                                    startcol=format_col, index=False)

            # Write the smac_report_df next to smac_format_df
            smac_report_df.to_excel(excel_writer, sheet_name=sheet_name,
                                    startrow=lower_tables_row,
                                    startcol=report_col, index=False)

            # Distribution tables, left to right with one blank column between
            distrib_col = 0
            for distrib_df in distrib_dict.values():
                distrib_df.to_excel(excel_writer, sheet_name=sheet_name,
                                    startrow=distrib_start_row,
                                    startcol=distrib_col, index=False)
                distrib_col += len(distrib_df.columns) + 1

            # Get the xlsxwriter workbook and worksheet objects
            workbook = excel_writer.book
            worksheet = excel_writer.sheets[sheet_name]

            # Bold, wider first column
            format_bold = workbook.add_format({'bold': True})
            worksheet.set_column('A:A', 20, format_bold)

    # Main Function
    def discovery_summary(self, dataframe, smac_column, phone_no_column, date_columns, distrib_columns, output_file):
        """Compute the discovery metrics for ``dataframe`` and generate the report.

        Args:
            dataframe: DataFrame to profile.
            smac_column: name of the SMAC number column, or ``None`` to skip
                the SMAC checks.
            phone_no_column: name of the phone number column, or ``None`` to
                skip the phone checks.
            date_columns: names of the columns passed to
                ``FormatChecker.check_date_format``.
            distrib_columns: names of the columns to build value
                distributions for.
            output_file: Excel output path. Currently unused because the
                Excel export call below is disabled.

        Returns:
            The per-column metrics DataFrame.
        """
        discovery_data_frame = self.replace_empty_strings_with_null(dataframe)

        # Attribute metrics
        num_rows, num_columns = discovery_data_frame.shape
        data_types = discovery_data_frame.dtypes
        unique_values = discovery_data_frame.nunique()
        nullity_summary = discovery_data_frame.isnull().sum()

        # Percentage of nullity per column, formatted as e.g. '12.50%'
        null_percentage = (nullity_summary / num_rows * 100).apply(lambda x: f'{x:.2f}%')

        # Min and max values of the numeric columns, as strings
        min_values = discovery_data_frame.min(numeric_only=True).apply(self.format_float)
        max_values = discovery_data_frame.max(numeric_only=True).apply(self.format_float)

        # Most frequent value in each column. mode() has no rows when the
        # frame has no non-null data, in which case iloc[0] would raise
        # IndexError, so fall back to an all-null Series.
        mode_df = discovery_data_frame.mode()
        if len(mode_df) > 0:
            most_frequent_values = mode_df.iloc[0]
        else:
            most_frequent_values = pd.Series(index=discovery_data_frame.columns, dtype=object)

        metrics_df = pd.DataFrame({
            'Data Type': data_types,
            'Unique Values': unique_values,
            'Null/Empty Counts': nullity_summary,
            'Null Percentage': null_percentage,
            'Min Values': min_values,
            'Max Values': max_values,
            'Most Frequent Values': most_frequent_values
        })

        # Dataset Overview
        num_duplicates = discovery_data_frame.duplicated().sum()

        overview = {
            "Number of Columns": [num_columns],
            "Number of Rows": [num_rows],
            "Duplicates": [num_duplicates]
        }

        # Checking Formats
        fc = FormatChecker()

        # Calls the SMAC validation functions
        if smac_column is not None:
            smac_format_df = fc.get_smac_formats(discovery_data_frame, smac_column)
            smac_report_df = fc.check_smac_format(discovery_data_frame, smac_column)
        else:
            smac_format_df = pd.DataFrame()  # Empty df
            smac_report_df = pd.DataFrame()

        # Calls the phone number validation functions
        if phone_no_column is not None:
            phone_no_format_df = fc.get_phone_no_formats(discovery_data_frame, phone_no_column)
        else:
            phone_no_format_df = pd.DataFrame()  # Empty df

        # Calls the date column validation functions
        date_report_df, date_format_df = fc.check_date_format(discovery_data_frame, date_columns)

        dataset_overview_df = pd.DataFrame(overview)

        # Create a dictionary of DataFrames
        dataframes_dict = {
            'dataset_overview': dataset_overview_df,
            'metrics_df': metrics_df,
            'smac_report_df': smac_report_df,
            'smac_format_df': smac_format_df,
            'phone_no_format_df': phone_no_format_df,
            'date_report_df': date_report_df,
            'date_format_df': date_format_df
        }

        # Distribution DataFrames keyed by column name
        dict_of_distrib_df = {
            column: self.column_distribution(discovery_data_frame, column)
            for column in distrib_columns
        }

        # Excel export is currently disabled; only the HTML report is generated.
        # self.create_excel_sheet(dataframes_dict, dict_of_distrib_df, output_file)
        hg.generate_report(overview)
        return metrics_df

    def column_distribution(self, df, column_name):
        """Return value counts of ``df[column_name]`` with a percentage column.

        The result has the columns ``column_name``, ``count`` and
        ``percentage`` (formatted like ``'75.00%'``), sorted by ``count`` in
        descending order. Percentages are relative to the sum of the counts.
        """
        # Using groupby to count the distribution of values under a column
        distrib_df = df.groupby(column_name).size().reset_index(name='count')

        # Descending sort
        distrib_df = distrib_df.sort_values(by='count', ascending=False)

        # Getting total_count of values for percentage calculation
        total_entries = distrib_df['count'].sum()

        # Add a new column with the percentage values
        distrib_df['percentage'] = (distrib_df['count'] / total_entries * 100).map('{:.2f}%'.format)

        return distrib_df

In [6]:
class FormatChecker:
    """Format profiling and validation for SMAC numbers, phone numbers and dates."""

    # An optional leading '+' followed by one or more ASCII digits.
    _PHONE_PATTERN = re.compile(r'\+?[0-9]+')
    # Layout a date string must follow to be counted as correct.
    _DATE_FORMAT = '%Y-%m-%d'
    # Placeholder recorded when a date string has no non-alphanumeric character.
    _NO_DELIMITER = "No delimiter found"

    def get_smac_formats(self, dataframe, column_name):
        """Summarise the SMAC values by their first three characters and length.

        Null values are skipped; every other value is converted with ``str``.

        Returns:
            DataFrame with the columns ``smac_first_three_idx``,
            ``smac_no_length`` and ``frequency``.
        """
        smac_strings = [str(smac) for smac in dataframe[column_name] if pd.notna(smac)]

        smac_format_df = pd.DataFrame({
            'smac_first_three_idx': [smac_str[:3] for smac_str in smac_strings],
            'smac_no_length': [len(smac_str) for smac_str in smac_strings]
        })

        # Groupby in accordance to length and the first_three_idx found in smac_no
        return (
            smac_format_df
            .groupby(['smac_first_three_idx', 'smac_no_length'])
            .size()
            .reset_index(name='frequency')
        )

    def all_digits(self, phone_no_str):
        """Return True if ``phone_no_str`` is digits only, optionally prefixed by '+'.

        The previous pattern made the '+' mandatory, so plain digit strings
        were reported as not all-digit.
        """
        return self._PHONE_PATTERN.fullmatch(phone_no_str) is not None

    def get_phone_no_formats(self, dataframe, column_name):
        """Summarise phone numbers by first character, length and digit-only flag.

        Null values are skipped; every other value is converted with ``str``.

        Returns:
            DataFrame with the columns ``phone_first_idx``,
            ``phone_no_length``, ``is_all_digit`` (1 or 0) and ``frequency``.
        """
        phone_strings = [
            str(phone_number)
            for phone_number in dataframe[column_name]
            if pd.notna(phone_number)
        ]

        phone_format_df = pd.DataFrame({
            # Slicing instead of indexing so an empty string cannot raise IndexError
            'phone_first_idx': [phone_str[:1] for phone_str in phone_strings],
            'phone_no_length': [len(phone_str) for phone_str in phone_strings],
            # Uses self rather than a module-level FormatChecker instance
            'is_all_digit': [1 if self.all_digits(phone_str) else 0 for phone_str in phone_strings]
        })

        # A groupby for all_digit flag, phone_no_length, and the phone_no's first index
        return (
            phone_format_df
            .groupby(['phone_first_idx', 'phone_no_length', 'is_all_digit'])
            .size()
            .reset_index(name='frequency')
        )

    # SMAC_NO Format Checker
    def check_smac_format(self, dataframe, column_name):
        """Count SMAC values that are 16 characters long and start with '8'.

        Null values are excluded from both counts.

        Returns:
            One-row DataFrame with ``smac_correct_count``,
            ``correct_percentage``, ``smac_incorrect_count`` and
            ``incorrect_percentage``.
        """
        total_count = dataframe[column_name].notnull().sum()

        # NOTE(review): only the length and first character are checked; the
        # remaining characters are not verified to be digits -- confirm
        # whether that is intended.
        correct_count = 0
        for number in dataframe[column_name]:
            if pd.isna(number):
                continue
            number_str = str(number)
            if len(number_str) == 16 and number_str[0] == '8':
                correct_count += 1

        incorrect_count = total_count - correct_count
        correct_percentage = ((correct_count / total_count) * 100)
        incorrect_percentage = 100 - correct_percentage

        return pd.DataFrame([{
            'smac_correct_count': correct_count,
            'correct_percentage': correct_percentage,
            'smac_incorrect_count': incorrect_count,
            'incorrect_percentage': incorrect_percentage
        }])

    # Date Format Checker
    def has_valid_delimiter(self, date_str):
        """Return True if ``date_str`` contains a '-' character."""
        return '-' in date_str

    def _matches_date_format(self, date_str):
        """Return True if ``pd.to_datetime`` parses ``date_str`` as ``%Y-%m-%d``."""
        try:
            pd.to_datetime(date_str, format=self._DATE_FORMAT)
        except ValueError:
            return False
        return True

    def check_date_format(self, dataframe, column_names):
        """Validate date columns against ``%Y-%m-%d`` and tally their delimiters.

        For every non-null value the first non-alphanumeric character of its
        string form is recorded as the delimiter. A value counts as correct
        when it contains '-' and parses with the expected format.

        Args:
            dataframe: DataFrame holding the date columns.
            column_names: names of the columns to check; ``None`` or an empty
                sequence yields two empty DataFrames.

        Returns:
            Tuple ``(date_reports, date_formats)``: one report row per column
            with correct/incorrect counts and percentages, and one row per
            (column, delimiter) pair with its ``frequency``.
        """
        all_date_reports = []
        all_final_date_formats = []

        # Iterates through the columns where there are dates
        for column_name in column_names or []:
            total_count = dataframe[column_name].notnull().sum()
            correct_count = 0
            date_delimiters = []

            # Iterates through date values under the date column
            for date in dataframe[column_name]:
                # Skips null values
                if pd.isna(date):
                    continue

                date_str = str(date)

                # The first non-alphanumeric character is recorded for every
                # value, whether or not the value turns out to be valid.
                date_delimiters.append(
                    next((char for char in date_str if not char.isalnum()), self._NO_DELIMITER)
                )

                if self.has_valid_delimiter(date_str) and self._matches_date_format(date_str):
                    correct_count += 1

            incorrect_count = total_count - correct_count
            correct_percentage = ((correct_count / total_count) * 100)
            incorrect_percentage = 100 - correct_percentage

            date_report_df = pd.DataFrame([{
                'column_name': column_name,
                'correct_date_format_count': correct_count,
                'correct_percentage': correct_percentage,
                'incorrect_date_format_count': incorrect_count,
                'incorrect_percentage': incorrect_percentage
            }])

            final_date_format_df = (
                pd.DataFrame({'date_delimiters': date_delimiters})
                .groupby(['date_delimiters'])
                .size()
                .reset_index(name='frequency')
            )
            # Every row of this summary belongs to the column being checked
            final_date_format_df['column_name'] = column_name

            all_date_reports.append(date_report_df)
            all_final_date_formats.append(final_date_format_df)

        # pd.concat raises ValueError on an empty list, so handle "no columns" here
        if not all_date_reports:
            return pd.DataFrame(), pd.DataFrame()

        # Combine the results for all columns
        combined_date_reports = pd.concat(all_date_reports, ignore_index=True)
        combined_final_date_formats = pd.concat(all_final_date_formats, ignore_index=True)

        return combined_date_reports, combined_final_date_formats
    # Example usage (check_date_format is a method, so call it on an instance):
    # date_report_df, final_date_format_df = FormatChecker().check_date_format(your_dataframe, ['column1', 'column2'])


In [37]:
dg = DataDiscovery()
fh = FileHandler()
fc = FormatChecker()
from ddpa_report import htmlGenerator as hg

# output_file must be defined: it is passed to discovery_summary below, and
# leaving this assignment commented out raises NameError on a fresh kernel.
output_file = 'LOYALTIES_TABLE_METRICS_OUTPUT.xlsx'

# Machine-specific input path; adjust when running elsewhere.
loyalties = r"C:\Users\gil.cruzada\Desktop\Aquisition Development\DDPA; HTML Output\loyalties.bson.gz"
loyalties_df = read_bson_file(loyalties)

date_columns = ['expiry', 'dateAdded']
smac_column = 'cardNumber'
phone_no_column = None
distrib_columns = ['seqNo', 'type', 'isExpired']

# perf_counter is a monotonic clock intended for measuring elapsed time
start = time.perf_counter()
dg.discovery_summary(loyalties_df, smac_column, phone_no_column, date_columns, distrib_columns, output_file)
end = time.perf_counter()

print(f'Runtime: {end - start}')

Runtime: 7.556441783905029


## Tests