<a href="https://colab.research.google.com/github/zainali78690/IBM-Course/blob/main/Unit_test_improved.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import pandas as pd
import numpy as np
import unittest
from unittest.mock import MagicMock, patch

In [2]:
def entity_id_search(dataframe, identifier_list, dataframe_type):
    """Return the saved knowledge-base rows that match ``dataframe``.

    The saved table is read from CSV and, for every column named in
    ``identifier_list``, inner-merged with that single column of
    ``dataframe``. A saved row is returned when it matches on at least one
    identifier; fully identical result rows are reported only once.

    Args:
        dataframe: New data; must contain every column in ``identifier_list``.
            Missing values are treated as empty strings.
        identifier_list: Column names to match on (one merge per name).
        dataframe_type: ``'entity'`` reads ``kb_org_table.csv``; any other
            value reads ``kb_ins_table.csv``.

    Returns:
        DataFrame holding the saved table's columns except its first one,
        with every value converted to ``str`` and missing values as ``''``.
        The index is not reset. An empty ``identifier_list`` yields an empty
        DataFrame.
    """
    # Only the file differs between the two table types.
    if dataframe_type == 'entity':
        csv_path = "entity_instrument_management/files/kb_org_table.csv"
    else:
        csv_path = "entity_instrument_management/files/kb_ins_table.csv"

    # Load the saved table and pre-process it for the merges.
    imported_data = pd.read_csv(csv_path)
    # The first column is discarded (presumably the index column that
    # DataFrame.to_csv writes by default -- confirm against the writer).
    imported_data = imported_data.iloc[:, 1:]
    # Compare everything as text; 'nan' produced by astype(str) becomes ''.
    imported_data = imported_data.astype(str).replace('nan', np.nan)
    imported_data = imported_data.fillna('')

    dataframe = dataframe.fillna('')

    # NOTE(review): missing values are '' on both sides, so a blank identifier
    # in `dataframe` matches every saved row whose identifier is blank too.
    # Confirm that this is intended.
    search_results = [
        imported_data.merge(dataframe[[identifier]], on=[identifier])
        for identifier in identifier_list
    ]

    # pd.concat rejects an empty list; no identifiers means no matches.
    if not search_results:
        return pd.DataFrame()

    # Concatenate once instead of growing a frame inside the loop, then drop
    # rows that were found through more than one identifier.
    merged_df = pd.concat(search_results)
    return merged_df.drop_duplicates(keep='first')

In [7]:
class TestEntityIdSearch(unittest.TestCase):
    """Unit tests for ``entity_id_search`` with ``pd.read_csv`` stubbed out."""

    def setUp(self):
        # New data to look up; the 'bad' rows have no counterpart in the
        # stubbed saved tables and must not appear in the result.
        self.entity_df = pd.DataFrame({'id': ['1', 'bad', '3'], 'name': ['Company A', 'bad', 'Company C']})
        self.ins_df = pd.DataFrame({'id': ['101', '102', 'bad'], 'symbol': ['AAPL', 'GOOG', 'bad']})

    def _search(self, dataframe, identifier_list, dataframe_type, saved_table, expected_path):
        """Run ``entity_id_search`` with ``saved_table`` standing in for the CSV.

        ``pd.read_csv`` is patched only for the duration of the call so the
        stub cannot leak into other tests. The result index is reset because
        the function returns the labels produced by its individual merges.
        """
        with patch.object(pd, 'read_csv', return_value=saved_table) as mock_read_csv:
            result = entity_id_search(dataframe, identifier_list, dataframe_type)

        mock_read_csv.assert_called_once_with(expected_path)
        return result.reset_index(drop=True)

    def test_entity_id_search_with_entity_dataframe(self):
        # entity_id_search discards the first column of the table it reads,
        # so the stub carries a leading index-like column before the data.
        saved_table = pd.DataFrame({
            'Unnamed: 0': [0, 1, 2, 3],
            'uuid': ['', 'A123', 'B123', 'C123'],
            'id': ['', '1', '2', '3'],
            'name': ['', 'Company A', 'Company B', 'Company C'],
        })

        expected_output = pd.DataFrame({'uuid': ['A123', 'C123'], 'id': ['1', '3'], 'name': ['Company A', 'Company C']})

        result = self._search(
            self.entity_df,
            ['id', 'name'],
            'entity',
            saved_table,
            "entity_instrument_management/files/kb_org_table.csv",
        )

        pd.testing.assert_frame_equal(result, expected_output)

    def test_entity_id_search_with_instrument_dataframe(self):
        saved_table = pd.DataFrame({
            'Unnamed: 0': [0, 1, 2, 3],
            'uuid': ['', 'D123', 'E123', 'F123'],
            'id': ['', '101', '102', '103'],
            'symbol': ['', 'AAPL', 'GOOG', 'MSFT'],
        })

        expected_output = pd.DataFrame({'uuid': ['D123', 'E123'], 'id': ['101', '102'], 'symbol': ['AAPL', 'GOOG']})

        # The instrument frames are keyed by 'symbol'; they have no 'name' column.
        result = self._search(
            self.ins_df,
            ['id', 'symbol'],
            'instrument',
            saved_table,
            "entity_instrument_management/files/kb_ins_table.csv",
        )

        pd.testing.assert_frame_equal(result, expected_output)

In [4]:
def generate_id_new(dataframe, identifier_list, dataframe_type):
    """Give every row of ``dataframe`` a KPMG id and overwrite the saved table.

    Saved rows returned by ``entity_id_search`` keep their stored id. Rows of
    ``dataframe`` that do not match those rows on all merge keys get a new id
    from ``generate_uuid()``. The combined table is written to the CSV file
    for the given type and returned with the id column renamed.

    Args:
        dataframe: New data containing the merge-key columns. Missing values
            are treated as empty strings.
        identifier_list: Column names identifying a row. For entities the
            columns ``'bw_entity'`` and ``'sector'`` are added to the merge
            keys.
        dataframe_type: ``'entity'`` selects the entity id column and
            ``kb_org_table.csv``; any other value is handled as an instrument
            (``kb_ins_table.csv``), the same fallback ``entity_id_search``
            uses when choosing the table to read.

    Returns:
        DataFrame with the matched saved rows first, then the newly
        identified rows, missing values as ``''`` and the id column named
        ``'kpmg_uuid'``.
    """
    # Saved rows that already correspond to the new data.
    matched = entity_id_search(dataframe, identifier_list, dataframe_type)

    # Resolve everything that depends on the table type in one place so the
    # id column, the merge keys and the output file can never disagree.
    if dataframe_type == 'entity':
        dataframe_id = 'kpmg_entity_id'
        csv_path = "entity_instrument_management/files/kb_org_table.csv"
        merge_keys = list(identifier_list) + ['bw_entity', 'sector']
    else:
        dataframe_id = 'kpmg_instrument_id'
        csv_path = "entity_instrument_management/files/kb_ins_table.csv"
        merge_keys = list(identifier_list)

    # Make both tables mergeable: empty strings instead of NaN, and no id
    # column on the saved side so it is not carried into the new rows.
    matched_no_id = matched.fillna('').drop([dataframe_id], axis=1)
    dataframe = dataframe.fillna('')

    # Rows present only in the new data are the ones that still need an id.
    df_left = dataframe.merge(matched_no_id, on=merge_keys, how='left', indicator=True)
    df_outer = df_left[df_left['_merge'] == 'left_only'].drop('_merge', axis=1)
    # NOTE(review): generate_uuid is not defined in the visible part of this
    # file; it must be available in the module namespace.
    df_outer[dataframe_id] = [generate_uuid() for _ in range(len(df_outer))]
    # Put the id column first, matching the layout of the matched rows.
    df_outer = df_outer[[dataframe_id] + [col for col in df_outer.columns if col != dataframe_id]]

    # Combine both tables for the final output.
    combined_df = pd.concat([matched, df_outer], ignore_index=True, sort=False)
    combined_df = combined_df.fillna('')

    # NOTE(review): the file is overwritten with the matched and new rows
    # only; saved rows that matched nothing are not written back. Confirm
    # that this is intended.
    combined_df.to_csv(csv_path, mode='w+')

    return combined_df.rename(columns={dataframe_id: 'kpmg_uuid'})

In [10]:
class TestGenerateIdNew(unittest.TestCase):
    """Unit test for ``generate_id_new`` with the lookup and CSV write stubbed."""

    # entity_id_search is patched in this module's namespace because
    # generate_id_new is defined in the same file and resolves the name there.
    # DataFrame.to_csv is stubbed so the test never touches the real table.
    @patch.object(pd.DataFrame, 'to_csv')
    @patch(f'{__name__}.entity_id_search')
    def test_generate_id_new(self, mock_entity_id_search, mock_to_csv):
        # The lookup reports one saved row that matches the first input row.
        mock_entity_id_search.return_value = pd.DataFrame({'kpmg_instrument_id': ['kpmg_id_1'], 'col1': ['val1'], 'col2': ['val2']})

        # Two input rows: one already saved, one new.
        input_data = pd.DataFrame({'col1': ['val1', 'val3'], 'col2': ['val2', 'val4']})

        output = generate_id_new(input_data, ['col1', 'col2'], 'instrument')

        # One matched row plus one new row, each with id, col1 and col2.
        self.assertEqual(output.shape, (2, 3))

        # The id column is exposed under its generic name.
        self.assertIn('kpmg_uuid', output.columns)

        # The matched row keeps its stored id and comes first.
        self.assertEqual(output.loc[0, 'kpmg_uuid'], 'kpmg_id_1')

        # Every row has its own id.
        self.assertEqual(output['kpmg_uuid'].nunique(), output.shape[0])

        # The lookup received the caller's arguments unchanged. The dataframe
        # is checked by identity because comparing DataFrames with == does not
        # produce a single boolean.
        mock_entity_id_search.assert_called_once()
        call_args = mock_entity_id_search.call_args[0]
        self.assertIs(call_args[0], input_data)
        self.assertEqual(call_args[1], ['col1', 'col2'])
        self.assertEqual(call_args[2], 'instrument')

        # The combined table is written to the instrument file exactly once.
        mock_to_csv.assert_called_once_with("entity_instrument_management/files/kb_ins_table.csv", mode='w+')

        
