In [1]:
import os
import logging
import json
import json
import psycopg2
import sys
import io
import re
current_directory = os.getcwd()
target_directory = os.path.abspath(os.path.join(current_directory, "..", ".."))
sys.path.append(target_directory)

from Production.Update import GLEIF_Update_Helpers
from Production.Backfill import GLEIF_Backfill_Helpers

class GLEIFUpdateLevel1:
    def __init__(self , bool_log = True , str_db_name = "GLEIF_test_db" , bool_downloaded = True):
        self.obj_update_helpers = GLEIF_Update_Helpers.GLEIF_Update_Helpers(bool_Level_1 = True)
        self.obj_backfill_helpers = GLEIF_Backfill_Helpers.GLEIF_Backill_Helpers(bool_Level_1 = True)
        if bool_log:
            logging_folder = "../logging"  # Adjust the folder path as necessary
    
            if os.path.exists(logging_folder):
                if not os.path.isdir(logging_folder):
                    raise FileExistsError(f"'{logging_folder}' exists but is not a directory. Please remove or rename the file.")
            else:
                os.makedirs(logging_folder)
    
            logging.basicConfig(filename=f"{logging_folder}/GLEIF_Update_level_2.log", level=logging.DEBUG, format='%(levelname)s: %(message)s', filemode="w")

        if not bool_downloaded:
            if not os.path.exists("../file_lib"):
                os.makedirs("../file_lib")
                
            self.obj_update_helpers.download_on_machine()
            self.str_json_file_path = self.obj_update_helpers.unpacking_GLEIF_zip_files()
    
        self.str_json_file_path = '../file_lib/Level_1_update_unpacked\\20241130-0000-gleif-goldencopy-lei2-intra-day.json'
        self.conn = psycopg2.connect(dbname = str_db_name, user="Matthew_Pisinski", password="matt1", host="localhost", port="5432")    
        #self.conn.autocommit = True
        self.cursor = self.conn.cursor()
    
    def bulk_upsert_using_copy(self, table_name, columns, data):
        """
        Perform a bulk upsert using PostgreSQL COPY with a temporary table.
        """
        temp_table = f"{table_name}_temp"

        try:
            # Step 1: Create a temporary table
            create_temp_table_query = f"""
                CREATE TEMP TABLE {temp_table} (LIKE {table_name} INCLUDING ALL)
                ON COMMIT DROP;
            """
            self.cursor.execute(create_temp_table_query)

            # Step 2: Copy data into the temporary table
            buffer = io.StringIO()
            for row in data:
                buffer.write('\t'.join([str(item) if item is not None else '\\N' for item in row]) + "\n")
            buffer.seek(0)

            copy_query = f"""
                COPY {temp_table} ({', '.join(columns)})
                FROM STDIN WITH (FORMAT text, DELIMITER '\t', NULL '\\N')
            """
            self.cursor.copy_expert(copy_query, buffer)

            # Step 3: Perform upsert from temporary table to target table
            update_columns = [col for col in columns if col != "lei"]
            set_clause = ", ".join([f"{col}=EXCLUDED.{col}" for col in update_columns])

            upsert_query = f"""
                INSERT INTO {table_name} ({', '.join(columns)})
                SELECT {', '.join(columns)} FROM {temp_table}
                ON CONFLICT (lei) DO UPDATE SET
                    {set_clause};
            """
            self.cursor.execute(upsert_query)

        except Exception as e:
            self.conn.rollback()
            logging.error(f"Error during bulk upsert for table {table_name}: {e}")
            raise
        
        
    def process_entity_data(self , list_dict_records):
        list_entity_meta_data_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict = dict_flat)
            list_output = self.obj_backfill_helpers.get_target_values(dict_data = dict_clean , subset_string = "Entity" , target_keys = ["LegalName", "LegalJurisdiction", "EntityCategory", "EntitySubCategory", "LegalForm_EntityLegalFormCode", "LegalForm_OtherLegalForm", "EntityStatus", "EntityCreationDate", "RegistrationAuthority_RegistrationAuthorityID", "RegistrationAuthority_RegistrationAuthorityEntityID"])
            list_output.insert(0 , dict_clean["LEI"])
            list_entity_meta_data_tuples.append(tuple(list_output))
        
        self.bulk_upsert_using_copy(data = list_entity_meta_data_tuples , table_name = "GLEIF_entity_data" , 
                            columns = 
                            ["lei",
                                "LegalName",
                                "LegalJurisdiction",
                                "EntityCategory",
                                "EntitySubCategory",
                                "LegalForm_EntityLegalFormCode",
                                "LegalForm_OtherLegalForm",
                                "EntityStatus",
                                "EntityCreationDate",
                                "RegistrationAuthority_RegistrationAuthorityID",
                                "RegistrationAuthority_RegistrationAuthorityEntityID"])
        
        """self.bulk_insert_using_copy(data = list_entity_meta_data_tuples , table_name = "GLEIF_entity_data" , 
                            columns = 
                            ["lei",
                                "LegalName",
                                "LegalJurisdiction",
                                "EntityCategory",
                                "EntitySubCategory",
                                "LegalForm_EntityLegalFormCode",
                                "LegalForm_OtherLegalForm",
                                "EntityStatus",
                                "EntityCreationDate",
                                "RegistrationAuthority_RegistrationAuthorityID",
                                "RegistrationAuthority_RegistrationAuthorityEntityID"])"""
    
    def process_other_legal_names(self , list_dict_records):
        list_other_names_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict = dict_flat)
            dict_entity = (self.obj_backfill_helpers.organize_by_prefix(dict_clean))["Entity"]
            list_output = self.obj_backfill_helpers.extract_other_entity_names(data_dict = dict_entity, base_keyword="OtherEntityNames", exclude_keywords=["TranslatedOtherEntityNames"]) 
            for index, tup in enumerate(list_output):
                list_output[index] = (dict_clean["LEI"],) + tup         
            list_other_names_tuples.extend(list_output)

        #self.bulk_insert_using_copy(data = list_other_names_tuples , table_name = "GLEIF_other_legal_names" , columns = ["lei", "OtherEntityNames", "Type"])        
        self.bulk_upsert_using_copy(data = list_other_names_tuples , table_name = "GLEIF_other_legal_names" , columns = ["lei", "OtherEntityNames", "Type"])        

        
    def process_legal_address(self , list_dict_records):
        list_legal_address_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict = dict_flat)
            list_output = self.obj_backfill_helpers.get_target_values(dict_data = dict_clean , target_keys = ["Entity_LegalAddress_FirstAddressLine" , "Entity_LegalAddress_AdditionalAddressLine_1" , "Entity_LegalAddress_AdditionalAddressLine_2" , "Entity_LegalAddress_AdditionalAddressLine_3" , "Entity_LegalAddress_City" , "Entity_LegalAddress_Region" , "Entity_LegalAddress_Country" , "Entity_LegalAddress_PostalCode"])
            list_output.insert(0 , dict_clean["LEI"])
            list_legal_address_tuples.append(tuple(list_output))

        self.bulk_upsert_using_copy(data = list_legal_address_tuples , table_name = "GLEIF_LegalAddress" , 
                                columns = ["lei",
                                            "LegalAddress_FirstAddressLine",
                                            "LegalAddress_AdditionalAddressLine_1",
                                            "LegalAddress_AdditionalAddressLine_2",
                                            "LegalAddress_AdditionalAddressLine_3",
                                            "LegalAddress_City",
                                            "LegalAddress_Region",
                                            "LegalAddress_Country",
                                            "LegalAddress_PostalCode"]) 
        
        """self.bulk_insert_using_copy(data = list_legal_address_tuples , table_name = "GLEIF_LegalAddress" , 
                                columns = ["lei",
                                            "LegalAddress_FirstAddressLine",
                                            "LegalAddress_AdditionalAddressLine_1",
                                            "LegalAddress_AdditionalAddressLine_2",
                                            "LegalAddress_AdditionalAddressLine_3",
                                            "LegalAddress_City",
                                            "LegalAddress_Region",
                                            "LegalAddress_Country",
                                            "LegalAddress_PostalCode"])  """
    
    def process_headquarters_address(self , list_dict_records):
        list_headquarters_address_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict = dict_flat)
            dict_entity = (self.obj_backfill_helpers.organize_by_prefix(dict_clean))["Entity"]
            list_output = self.obj_backfill_helpers.get_target_values(dict_data = dict_entity , target_keys = ["HeadquartersAddress_FirstAddressLine" , "HeadquartersAddress_AdditionalAddressLine_1" , "HeadquartersAddress_AdditionalAddressLine_2" , "HeadquartersAddress_AdditionalAddressLine_3" , "HeadquartersAddress_City" , "HeadquartersAddress_Region" , "HeadquartersAddress_Country" , "HeadquartersAddress_PostalCode"])
            list_output.insert(0 , dict_clean["LEI"])
            list_headquarters_address_tuples.append(tuple(list_output))

        self.bulk_upsert_using_copy(data = list_headquarters_address_tuples , table_name = "GLEIF_HeadquartersAddress" , 
                                columns = ["lei",
                                            "HeadquartersAddress_FirstAddressLine",
                                            "HeadquartersAddress_AdditionalAddressLine_1",
                                            "HeadquartersAddress_AdditionalAddressLine_2",
                                            "HeadquartersAddress_AdditionalAddressLine_3",
                                            "HeadquartersAddress_City",
                                            "HeadquartersAddress_Region",
                                            "HeadquartersAddress_Country",
                                            "HeadquartersAddress_PostalCode"]) 
        
        """self.bulk_insert_using_copy(data = list_headquarters_address_tuples , table_name = "GLEIF_HeadquartersAddress" , 
                                columns = ["lei",
                                            "HeadquartersAddress_FirstAddressLine",
                                            "HeadquartersAddress_AdditionalAddressLine_1",
                                            "HeadquartersAddress_AdditionalAddressLine_2",
                                            "HeadquartersAddress_AdditionalAddressLine_3",
                                            "HeadquartersAddress_City",
                                            "HeadquartersAddress_Region",
                                            "HeadquartersAddress_Country",
                                            "HeadquartersAddress_PostalCode"]) """
    
    def process_legal_entity_events(self , list_dict_records):
        list_legal_entity_events_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict = dict_flat)
            dict_entity = (self.obj_backfill_helpers.organize_by_prefix(dict_clean))["Entity"]
            list_output = self.obj_backfill_helpers.extract_event_data(dict_data = dict_entity , base_keyword="LegalEntityEvents" , target_keys=["group_type", "event_status", "LegalEntityEventType", "LegalEntityEventEffectiveDate", "LegalEntityEventRecordedDate", "ValidationDocuments"])
            for index, tup in enumerate(list_output):
                list_output[index] = (dict_clean["LEI"],) + tup 
            list_legal_entity_events_tuples.extend(list_output)

        self.bulk_upsert_using_copy(data = list_legal_entity_events_tuples , table_name = "GLEIF_LegalEntityEvents" , 
                                columns = ["lei",
                                        "group_type",
                                        "event_status",
                                        "LegalEntityEventType",
                                        "LegalEntityEventEffectiveDate",
                                        "LegalEntityEventRecordedDate",
                                        "ValidationDocuments"])
        
        """self.bulk_insert_using_copy(data = list_legal_entity_events_tuples , table_name = "GLEIF_LegalEntityEvents" , 
                                columns = ["lei",
                                        "group_type",
                                        "event_status",
                                        "LegalEntityEventType",
                                        "LegalEntityEventEffectiveDate",
                                        "LegalEntityEventRecordedDate",
                                        "ValidationDocuments"])"""
        
    def process_registration_data(self , list_dict_records):
        list_registration_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict = dict_flat)
            dict_registration = (self.obj_backfill_helpers.organize_by_prefix(dict_clean))["Registration"]
            list_output = self.obj_backfill_helpers.get_target_values(dict_data = dict_registration , target_keys = ["InitialRegistrationDate" , "LastUpdateDate" , "RegistrationStatus" , "NextRenewalDate" , "ManagingLOU" , "ValidationSources" , "ValidationAuthority"])    
            list_output.insert(0 , dict_clean["LEI"])
            list_registration_tuples.append(tuple(list_output))

        self.bulk_upsert_using_copy(data = list_registration_tuples , table_name = "GLEIF_registration_data" , 
                                columns = [
                                            "lei",
                                            "InitialRegistrationDate",
                                            "LastUpdateDate",
                                            "RegistrationStatus",
                                            "NextRenewalDate",
                                            "ManagingLOU",
                                            "ValidationSources",
                                            "ValidationAuthority"]) 
        
        """self.bulk_insert_using_copy(data = list_registration_tuples , table_name = "GLEIF_registration_data" , 
                                columns = [
                                            "lei",
                                            "InitialRegistrationDate",
                                            "LastUpdateDate",
                                            "RegistrationStatus",
                                            "NextRenewalDate",
                                            "ManagingLOU",
                                            "ValidationSources",
                                            "ValidationAuthority"]) """
    
    def process_geoencoding_data(self , list_dict_records):
        list_extension_data_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict = dict_flat)
            dict_extension = (self.obj_backfill_helpers.organize_by_prefix(dict_clean))["Extension"]
            dict_mega_flat = self.obj_backfill_helpers.further_flatten_geocoding(dict_data = dict_extension)
            if any(re.search(r"_\d+_", key) for key in dict_mega_flat.keys()):
                list_dicts = self.obj_backfill_helpers.split_into_list_of_dictionaries(dict_data = dict_mega_flat)
                for dict_extension in list_dicts:
                    list_output = self.obj_backfill_helpers.get_target_values(dict_data = dict_extension , subset_string = True, target_keys = ["relevance" , "match_type" , "lat" , "lng" , "geocoding_date" , "TopLeft.Latitude" , "TopLeft.Longitude" , "BottomRight.Latitude" , "BottomRight.Longitude" , "match_level" , "mapped_street" , "mapped_housenumber" , "mapped_postalcode" , "mapped_city" , "mapped_district" , "mapped_state" , "mapped_country"])
                    list_output.insert(0 , dict_clean["LEI"])
                    list_extension_data_tuples.append(tuple(list_output))
            else:
                list_output = self.obj_backfill_helpers.get_target_values(dict_data = dict_mega_flat , subset_string = True, target_keys = ["relevance" , "match_type" , "lat" , "lng" , "geocoding_date" , "TopLeft.Latitude" , "TopLeft.Longitude" , "BottomRight.Latitude" , "BottomRight.Longitude" , "match_level" , "mapped_street" , "mapped_housenumber" , "mapped_postalcode" , "mapped_city" , "mapped_district" , "mapped_state" , "mapped_country"])
                list_output.insert(0 , dict_clean["LEI"])
                list_extension_data_tuples.append(tuple(list_output))

        self.bulk_upsert_using_copy(data = list_extension_data_tuples , table_name = "GLEIF_geocoding" , 
                                columns = ["lei",
                                            "relevance",
                                            "match_type",
                                            "lat",
                                            "lng",
                                            "geocoding_date",
                                            "TopLeft_Latitude",
                                            "TopLeft_Longitude",
                                            "BottomRight_Latitude",
                                            "BottomRight_longitude",
                                            "match_level",
                                            "mapped_street",
                                            "mapped_housenumber",
                                            "mapped_postalcode",
                                            "mapped_city",
                                            "mapped_district",
                                            "mapped_state",
                                            "mapped_country"]) 
        
        """self.bulk_insert_using_copy(data = list_extension_data_tuples , table_name = "GLEIF_geocoding" , 
                                columns = ["lei",
                                            "relevance",
                                            "match_type",
                                            "lat",
                                            "lng",
                                            "geocoding_date",
                                            "TopLeft_Latitude",
                                            "TopLeft_Longitude",
                                            "BottomRight_Latitude",
                                            "BottomRight_longitude",
                                            "match_level",
                                            "mapped_street",
                                            "mapped_housenumber",
                                            "mapped_postalcode",
                                            "mapped_city",
                                            "mapped_district",
                                            "mapped_state",
                                            "mapped_country"]) """
    
    def process_all_data(self , list_dict_records):
        self.process_entity_data(list_dict_records = list_dict_records)
        self.process_other_legal_names(list_dict_records = list_dict_records)
        self.process_legal_address(list_dict_records = list_dict_records)
        self.process_headquarters_address(list_dict_records = list_dict_records)
        self.process_legal_entity_events(list_dict_records = list_dict_records)
        self.process_registration_data(list_dict_records = list_dict_records)
        self.process_geoencoding_data(list_dict_records = list_dict_records)
    
    def storing_GLEIF_data_in_database(self):
        
        with open(self.str_json_file_path, 'r', encoding='utf-8') as file:
            dict_relationships = json.load(file)            
            self.process_all_data(list_dict_records = dict_relationships["records"])               
        self.conn.commit()
        
        self.conn.close()
    

In [2]:
obj = GLEIFUpdateLevel1(bool_log = True)
obj.storing_GLEIF_data_in_database()


InvalidColumnReference: there is no unique or exclusion constraint matching the ON CONFLICT specification


Help from chatgpt

In [1]:
import os
import logging
import json
import json
import psycopg2
import sys
import io
import re
from psycopg2 import sql
current_directory = os.getcwd()
target_directory = os.path.abspath(os.path.join(current_directory, "..", ".."))
sys.path.append(target_directory)

from Production.Update import GLEIF_Update_Helpers
from Production.Backfill import GLEIF_Backfill_Helpers

class GLEIFUpdateLevel1:
    def __init__(self , bool_log = True , str_db_name = "GLEIF_test_db" , bool_downloaded = True):
        self.obj_update_helpers = GLEIF_Update_Helpers.GLEIF_Update_Helpers(bool_Level_1 = True)
        self.obj_backfill_helpers = GLEIF_Backfill_Helpers.GLEIF_Backill_Helpers(bool_Level_1 = True)
        if bool_log:
            logging_folder = "../logging"  # Adjust the folder path as necessary
    
            if os.path.exists(logging_folder):
                if not os.path.isdir(logging_folder):
                    raise FileExistsError(f"'{logging_folder}' exists but is not a directory. Please remove or rename the file.")
            else:
                os.makedirs(logging_folder)
    
            logging.basicConfig(filename=f"{logging_folder}/GLEIF_Update_level_2.log", level=logging.DEBUG, format='%(levelname)s: %(message)s', filemode="w")

        if not bool_downloaded:
            if not os.path.exists("../file_lib"):
                os.makedirs("../file_lib")
                
            self.obj_update_helpers.download_on_machine()
            self.str_json_file_path = self.obj_update_helpers.unpacking_GLEIF_zip_files()
    
        self.str_json_file_path = '../file_lib/Level_1_update_unpacked\\20241130-0000-gleif-goldencopy-lei2-intra-day.json'
        self.conn = psycopg2.connect(dbname = str_db_name, user="Matthew_Pisinski", password="matt1", host="localhost", port="5432")    
        #self.conn.autocommit = True
        self.cursor = self.conn.cursor()

    def bulk_upsert_using_copy(self, table_name, columns, data):
        """
        Perform a bulk upsert using PostgreSQL COPY with a temporary table.
        
        Args:
            table_name (str): Name of the target table.
            columns (list): List of column names.
            data (list): List of tuples containing the data to upsert.
        """
        temp_table = f"{table_name}_temp"

        try:
            # Step 1: Create a temporary table
            create_temp_table_query = f"""
                CREATE TEMP TABLE {temp_table} (LIKE {table_name} INCLUDING ALL)
                ON COMMIT DROP;
            """
            self.cursor.execute(create_temp_table_query)
            logging.info(f"Temporary table {temp_table} created.")

            # Step 2: Copy data into the temporary table
            buffer = io.StringIO()
            for row in data:
                # Escape backslashes, tabs, and newlines; replace None with \N
                row_converted = []
                for item in row:
                    if item is None:
                        row_converted.append('\\N')
                    elif isinstance(item, str):
                        item = item.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n')
                        row_converted.append(item)
                    else:
                        row_converted.append(str(item))
                buffer.write('\t'.join(row_converted) + '\n')
            buffer.seek(0)

            copy_query = f"""
                COPY {temp_table} ({', '.join(columns)})
                FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')
            """
            self.cursor.copy_expert(copy_query, buffer)
            logging.info(f"Data copied to temporary table {temp_table}.")

            # Step 3: Perform upsert from temporary table to target table
            update_columns = [col for col in columns if col != "lei"]
            set_clause = ", ".join([f"{col}=EXCLUDED.{col}" for col in update_columns])

            upsert_query = f"""
                INSERT INTO {table_name} ({', '.join(columns)})
                SELECT {', '.join(columns)} FROM {temp_table}
                ON CONFLICT (lei) DO UPDATE SET
                    {set_clause};
            """
            self.cursor.execute(upsert_query)
            logging.info(f"Upsert operation completed for table {table_name}.")

        except Exception as e:
            self.conn.rollback()
            logging.error(f"Error during bulk upsert for table {table_name}: {e}")
            raise

    def bulk_insert_using_copy(self, table_name, columns, data):
        """
        Perform a bulk insert using PostgreSQL COPY.
        
        Args:
            table_name (str): Name of the target table.
            columns (list): List of column names.
            data (list): List of tuples containing the data to insert.
        """
        try:
            buffer = io.StringIO()
            for row in data:
                # Escape backslashes, tabs, and newlines; replace None with \N
                row_converted = []
                for item in row:
                    if item is None:
                        row_converted.append('\\N')
                    elif isinstance(item, str):
                        item = item.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n')
                        row_converted.append(item)
                    else:
                        row_converted.append(str(item))
                buffer.write('\t'.join(row_converted) + '\n')
            buffer.seek(0)

            copy_query = f"""
                COPY {table_name} ({', '.join(columns)})
                FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')
            """
            self.cursor.copy_expert(copy_query, buffer)
            logging.info(f"Bulk insert completed for table {table_name}.")
        
        except Exception as e:
            self.conn.rollback()
            logging.error(f"Error during bulk insert for table {table_name}: {e}")
            raise

    def process_entity_data(self, list_dict_records):
        list_entity_meta_data_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict=dict_flat)
            list_output = self.obj_backfill_helpers.get_target_values(
                dict_data=dict_clean,
                subset_string="Entity",
                target_keys=[
                    "LegalName",
                    "LegalJurisdiction",
                    "EntityCategory",
                    "EntitySubCategory",
                    "LegalForm_EntityLegalFormCode",
                    "LegalForm_OtherLegalForm",
                    "EntityStatus",
                    "EntityCreationDate",
                    "RegistrationAuthority_RegistrationAuthorityID",
                    "RegistrationAuthority_RegistrationAuthorityEntityID"
                ]
            )
            list_output.insert(0, dict_clean.get("LEI"))
            list_entity_meta_data_tuples.append(tuple(list_output))
        
        self.bulk_upsert_using_copy(
            data=list_entity_meta_data_tuples,
            table_name="GLEIF_entity_data",
            columns=[
                "lei",
                "LegalName",
                "LegalJurisdiction",
                "EntityCategory",
                "EntitySubCategory",
                "LegalForm_EntityLegalFormCode",
                "LegalForm_OtherLegalForm",
                "EntityStatus",
                "EntityCreationDate",
                "RegistrationAuthority_RegistrationAuthorityID",
                "RegistrationAuthority_RegistrationAuthorityEntityID"
            ]
        )

    def process_other_legal_names(self, list_dict_records):
        list_other_names_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict=dict_flat)
            dict_entity = self.obj_backfill_helpers.organize_by_prefix(dict_clean).get("Entity", {})
            list_output = self.obj_backfill_helpers.extract_other_entity_names(
                data_dict=dict_entity,
                base_keyword="OtherEntityNames",
                exclude_keywords=["TranslatedOtherEntityNames"]
            )
            for tup in list_output:
                list_other_names_tuples.append((dict_clean.get("LEI"),) + tup)
        
        self.bulk_insert_using_copy(
            data=list_other_names_tuples,
            table_name="GLEIF_other_legal_names",
            columns=["lei", "OtherEntityNames", "Type"]
        )

    def process_legal_address(self, list_dict_records):
        list_legal_address_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict=dict_flat)
            list_output = self.obj_backfill_helpers.get_target_values(
                dict_data=dict_clean,
                target_keys=[
                    "Entity_LegalAddress_FirstAddressLine",
                    "Entity_LegalAddress_AdditionalAddressLine_1",
                    "Entity_LegalAddress_AdditionalAddressLine_2",
                    "Entity_LegalAddress_AdditionalAddressLine_3",
                    "Entity_LegalAddress_City",
                    "Entity_LegalAddress_Region",
                    "Entity_LegalAddress_Country",
                    "Entity_LegalAddress_PostalCode"
                ]
            )
            list_output.insert(0, dict_clean.get("LEI"))
            list_legal_address_tuples.append(tuple(list_output))
        
        self.bulk_upsert_using_copy(
            data=list_legal_address_tuples,
            table_name="GLEIF_LegalAddress",
            columns=[
                "lei",
                "LegalAddress_FirstAddressLine",
                "LegalAddress_AdditionalAddressLine_1",
                "LegalAddress_AdditionalAddressLine_2",
                "LegalAddress_AdditionalAddressLine_3",
                "LegalAddress_City",
                "LegalAddress_Region",
                "LegalAddress_Country",
                "LegalAddress_PostalCode"
            ]
        )

    def process_headquarters_address(self, list_dict_records):
        list_headquarters_address_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict=dict_flat)
            dict_entity = self.obj_backfill_helpers.organize_by_prefix(dict_clean).get("Entity", {})
            list_output = self.obj_backfill_helpers.get_target_values(
                dict_data=dict_entity,
                target_keys=[
                    "HeadquartersAddress_FirstAddressLine",
                    "HeadquartersAddress_AdditionalAddressLine_1",
                    "HeadquartersAddress_AdditionalAddressLine_2",
                    "HeadquartersAddress_AdditionalAddressLine_3",
                    "HeadquartersAddress_City",
                    "HeadquartersAddress_Region",
                    "HeadquartersAddress_Country",
                    "HeadquartersAddress_PostalCode"
                ]
            )
            list_output.insert(0, dict_clean.get("LEI"))
            list_headquarters_address_tuples.append(tuple(list_output))
        
        self.bulk_upsert_using_copy(
            data=list_headquarters_address_tuples,
            table_name="GLEIF_HeadquartersAddress",
            columns=[
                "lei",
                "HeadquartersAddress_FirstAddressLine",
                "HeadquartersAddress_AdditionalAddressLine_1",
                "HeadquartersAddress_AdditionalAddressLine_2",
                "HeadquartersAddress_AdditionalAddressLine_3",
                "HeadquartersAddress_City",
                "HeadquartersAddress_Region",
                "HeadquartersAddress_Country",
                "HeadquartersAddress_PostalCode"
            ]
        )

    def process_legal_entity_events(self, list_dict_records):
        list_legal_entity_events_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict=dict_flat)
            dict_entity = self.obj_backfill_helpers.organize_by_prefix(dict_clean).get("Entity", {})
            list_output = self.obj_backfill_helpers.extract_event_data(
                dict_data=dict_entity,
                base_keyword="LegalEntityEvents",
                target_keys=[
                    "group_type",
                    "event_status",
                    "LegalEntityEventType",
                    "LegalEntityEventEffectiveDate",
                    "LegalEntityEventRecordedDate",
                    "ValidationDocuments"
                ]
            )
            for tup in list_output:
                list_legal_entity_events_tuples.append((dict_clean.get("LEI"),) + tup)
        
        self.bulk_insert_using_copy(
            data=list_legal_entity_events_tuples,
            table_name="GLEIF_LegalEntityEvents",
            columns=[
                "lei",
                "group_type",
                "event_status",
                "LegalEntityEventType",
                "LegalEntityEventEffectiveDate",
                "LegalEntityEventRecordedDate",
                "ValidationDocuments"
            ]
        )

    def process_registration_data(self, list_dict_records):
        list_registration_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict=dict_flat)
            dict_registration = self.obj_backfill_helpers.organize_by_prefix(dict_clean).get("Registration", {})
            list_output = self.obj_backfill_helpers.get_target_values(
                dict_data=dict_registration,
                target_keys=[
                    "InitialRegistrationDate",
                    "LastUpdateDate",
                    "RegistrationStatus",
                    "NextRenewalDate",
                    "ManagingLOU",
                    "ValidationSources",
                    "ValidationAuthority_ValidationAuthorityID",
                    "ValidationAuthority_ValidationAuthorityEntityID"
                ]
            )
            list_output.insert(0, dict_clean.get("LEI"))
            list_registration_tuples.append(tuple(list_output))
        
        self.bulk_upsert_using_copy(
            data=list_registration_tuples,
            table_name="GLEIF_registration_data",
            columns=[
                "lei",
                "InitialRegistrationDate",
                "LastUpdateDate",
                "RegistrationStatus",
                "NextRenewalDate",
                "ManagingLOU",
                "ValidationSources",
                "ValidationAuthorityID",
                "ValidationAuthorityEntityID"
            ]
        )

    def process_geoencoding_data(self, list_dict_records):
        list_extension_data_tuples = []
        
        for dict_record in list_dict_records:
            dict_flat = self.obj_backfill_helpers.flatten_dict(dict_record)
            dict_clean = self.obj_backfill_helpers.clean_keys(input_dict=dict_flat)
            dict_extension = self.obj_backfill_helpers.organize_by_prefix(dict_clean).get("Extension", {})
            dict_mega_flat = self.obj_backfill_helpers.further_flatten_geocoding(dict_data=dict_extension)
            if any(re.search(r"_\d+_", key) for key in dict_mega_flat.keys()):
                list_dicts = self.obj_backfill_helpers.split_into_list_of_dictionaries(dict_data=dict_mega_flat)
                for dict_ext in list_dicts:
                    list_output = self.obj_backfill_helpers.get_target_values(
                        dict_data=dict_ext,
                        subset_string=True,
                        target_keys=[
                            "relevance",
                            "match_type",
                            "lat",
                            "lng",
                            "geocoding_date",
                            "TopLeft.Latitude",
                            "TopLeft.Longitude",
                            "BottomRight.Latitude",
                            "BottomRight.Longitude",
                            "match_level",
                            "mapped_street",
                            "mapped_housenumber",
                            "mapped_postalcode",
                            "mapped_city",
                            "mapped_district",
                            "mapped_state",
                            "mapped_country"
                        ]
                    )
                    list_output.insert(0, dict_clean.get("LEI"))
                    list_extension_data_tuples.append(tuple(list_output))
            else:
                list_output = self.obj_backfill_helpers.get_target_values(
                    dict_data=dict_mega_flat,
                    subset_string=True,
                    target_keys=[
                        "relevance",
                        "match_type",
                        "lat",
                        "lng",
                        "geocoding_date",
                        "TopLeft.Latitude",
                        "TopLeft.Longitude",
                        "BottomRight.Latitude",
                        "BottomRight.Longitude",
                        "match_level",
                        "mapped_street",
                        "mapped_housenumber",
                        "mapped_postalcode",
                        "mapped_city",
                        "mapped_district",
                        "mapped_state",
                        "mapped_country"
                    ]
                )
                list_output.insert(0, dict_clean.get("LEI"))
                list_extension_data_tuples.append(tuple(list_output))
        
        self.bulk_insert_using_copy(
            data=list_extension_data_tuples,
            table_name="GLEIF_geocoding",
            columns=[
                "lei",
                "relevance",
                "match_type",
                "lat",
                "lng",
                "geocoding_date",
                "TopLeft_Latitude",
                "TopLeft_Longitude",
                "BottomRight_Latitude",
                "BottomRight_longitude",
                "match_level",
                "mapped_street",
                "mapped_housenumber",
                "mapped_postalcode",
                "mapped_city",
                "mapped_district",
                "mapped_state",
                "mapped_country"
            ]
        )

    def process_all_data(self, list_dict_records):
        """
        Processes all types of data and performs upserts/inserts accordingly.
        
        Args:
            list_dict_records (list): List of record dictionaries.
        """
        self.process_entity_data(list_dict_records=list_dict_records)
        self.process_other_legal_names(list_dict_records=list_dict_records)
        self.process_legal_address(list_dict_records=list_dict_records)
        self.process_headquarters_address(list_dict_records=list_dict_records)
        self.process_legal_entity_events(list_dict_records=list_dict_records)
        self.process_registration_data(list_dict_records=list_dict_records)
        self.process_geoencoding_data(list_dict_records=list_dict_records)

    def remove_duplicates(self, table_name, unique_columns):
        """
        Removes duplicate rows from a PostgreSQL table based on specified unique columns.
        Keeps the row with the smallest id and deletes others.

        Args:
            table_name (str): Name of the table to clean.
            unique_columns (list of str): Columns that define a unique record.

        Returns:
            None
        """
        try:
            logging.info(f"Starting duplicate removal for table '{table_name}' based on columns {unique_columns}.")
            
            # Construct the PARTITION BY clause
            partition_by = sql.SQL(', ').join([sql.Identifier(col) for col in unique_columns])
            
            # Construct the DELETE query using ROW_NUMBER()
            delete_query = sql.SQL("""
                DELETE FROM {table} a
                USING (
                    SELECT id, ROW_NUMBER() OVER (
                        PARTITION BY {partition_by}
                        ORDER BY id
                    ) AS rnum
                    FROM {table}
                ) b
                WHERE a.id = b.id AND b.rnum > 1;
            """).format(
                table=sql.Identifier(table_name),
                partition_by=partition_by
            )
            
            # Execute the DELETE query
            self.cursor.execute(delete_query)
            logging.info(f"Duplicate removal completed for table '{table_name}'.")
            
        except Exception as e:
            logging.error(f"Error removing duplicates from table '{table_name}': {e}")
            raise

    def remove_duplicates(self, table_name, unique_columns):
        """
        Removes duplicate rows from a PostgreSQL table based on specified unique columns.
        Keeps the row with the smallest id and deletes others.

        Args:
            table_name (str): Name of the table to clean.
            unique_columns (list of str): Columns that define a unique record.

        Returns:
            None
        """
        try:
            logging.info(f"Starting duplicate removal for table '{table_name}' based on columns {unique_columns}.")

            # Construct the PARTITION BY clause
            partition_by = sql.SQL(', ').join([sql.Identifier(col) for col in unique_columns])

            # Construct the DELETE query using ROW_NUMBER()
            delete_query = sql.SQL("""
                DELETE FROM {table} a
                USING (
                    SELECT id, ROW_NUMBER() OVER (
                        PARTITION BY {partition_by}
                        ORDER BY id
                    ) AS rnum
                    FROM {table}
                ) b
                WHERE a.id = b.id AND b.rnum > 1;
            """).format(
                table=sql.Identifier(table_name),
                partition_by=partition_by
            )

            # Execute the DELETE query
            self.cursor.execute(delete_query)
            logging.info(f"Duplicate removal completed for table '{table_name}'.")

        except Exception as e:
            logging.error(f"Error removing duplicates from table '{table_name}': {e}")
            raise

    def clean_all_duplicates(self):
        """
        Cleans duplicates from all specified tables.
        Defines the tables and their unique columns.
        """
        tables_to_clean = {
            "gleif_other_legal_names": ["lei", "otherentitynames", "type"],
            "gleif_legalentityevents": [
                "lei",
                "group_type",
                "event_status",
                "legalentityeventtype",
                "legalentityeventeffectivedate",
                "legalentityeventrecordeddate",
                "validationdocuments"
            ],
            "gleif_geocoding": [
                "lei",
                "relevance",
                "match_type",
                "lat",
                "lng",
                "geocoding_date",
                "topleft_latitude",
                "topleft_longitude",
                "bottomright_latitude",
                "bottomright_longitude",
                "match_level",
                "mapped_street",
                "mapped_housenumber",
                "mapped_postalcode",
                "mapped_city",
                "mapped_district",
                "mapped_state",
                "mapped_country"
            ]
        }

        for table, unique_cols in tables_to_clean.items():
            self.remove_duplicates(table, unique_cols)
            logging.info(f"Duplicates cleaned for table '{table}'.")
    
    def storing_GLEIF_data_in_database(self):
        """
        Stores GLEIF update data in the PostgreSQL database.
        """
        with open(self.str_json_file_path, 'r', encoding='utf-8') as file:
            dict_relationships = json.load(file)
            records = dict_relationships["records"]
            
            # Process records in batches
            self.process_all_data(list_dict_records = records)
        
        # Commit the transaction
        self.conn.commit()
        self.clean_all_duplicates()



In [2]:
obj = GLEIFUpdateLevel1(bool_log = True)
obj.storing_GLEIF_data_in_database()

In [5]:
def reset_id_values(self, table_name):
    """
    Resets the id column values of the specified table to be consecutive,
    starting from 1 up to the number of rows, and resets the sequence accordingly.

    Args:
        table_name (str): Name of the table.

    Returns:
        None
    """
    try:
        # Begin a transaction
        self.cursor.execute("BEGIN;")
        logging.info(f"Started transaction for resetting id values in table '{table_name}'.")
        print(f"Started transaction for resetting id values in table '{table_name}'.")

        # Create a temporary table with new id values
        temp_table = f"temp_{table_name}"
        self.cursor.execute(sql.SQL("""
            CREATE TEMP TABLE {temp_table} AS
            SELECT
                ROW_NUMBER() OVER (ORDER BY id) AS id,
                t.*
            FROM {table_name} t;
        """).format(
            temp_table=sql.Identifier(temp_table),
            table_name=sql.Identifier(table_name)
        ))
        logging.info(f"Temporary table '{temp_table}' created with new id values.")
        print(f"Temporary table '{temp_table}' created with new id values.")

        # Truncate the original table
        self.cursor.execute(sql.SQL("TRUNCATE TABLE {table_name} RESTART IDENTITY;").format(
            table_name=sql.Identifier(table_name)
        ))
        logging.info(f"Table '{table_name}' truncated.")
        print(f"Table '{table_name}' truncated.")

        # Insert data back into the original table
        columns = self.get_table_columns(table_name)
        columns_list = sql.SQL(', ').join([sql.Identifier(col) for col in columns])
        self.cursor.execute(sql.SQL("""
            INSERT INTO {table_name} ({columns})
            SELECT {columns} FROM {temp_table};
        """).format(
            table_name=sql.Identifier(table_name),
            columns=columns_list,
            temp_table=sql.Identifier(temp_table)
        ))
        logging.info(f"Data inserted back into table '{table_name}' with new id values.")
        print(f"Data inserted back into table '{table_name}' with new id values.")

        # Reset the sequence
        self.cursor.execute(sql.SQL("""
            SELECT setval(pg_get_serial_sequence(%s, 'id'), (SELECT MAX(id) FROM {table_name}), true);
        """).format(
            table_name=sql.Identifier(table_name)
        ), [table_name])
        logging.info(f"Sequence reset for table '{table_name}'.")
        print(f"Sequence reset for table '{table_name}'.")

        # Commit the transaction
        self.conn.commit()
        logging.info(f"Transaction committed for table '{table_name}'.")
        print(f"Transaction committed for table '{table_name}'.")

    except Exception as e:
        self.conn.rollback()
        logging.error(f"Error resetting id values for table '{table_name}': {e}")
        print(f"Error resetting id values for table '{table_name}': {e}")
        raise


In [None]:
def reset_id_values(table_name):
    """
    Resets the id column values of the specified table to be consecutive,
    starting from 1 up to the number of rows, and resets the sequence accordingly.

    Args:
        table_name (str): Name of the table.

    Returns:
        None
    """
    try:
        # Begin a transaction
        obj.cursor.execute("BEGIN;")
        logging.info(f"Started transaction for resetting id values in table '{table_name}'.")
        print(f"Started transaction for resetting id values in table '{table_name}'.")

        # Create a temporary table with new id values
        temp_table = f"temp_{table_name}"
        obj.cursor.execute(sql.SQL("""
            CREATE TEMP TABLE {temp_table} AS
            SELECT
                ROW_NUMBER() OVER (ORDER BY id) AS id,
                t.*
            FROM {table_name} t;
        """).format(
            temp_table=sql.Identifier(temp_table),
            table_name=sql.Identifier(table_name)
        ))
        logging.info(f"Temporary table '{temp_table}' created with new id values.")
        print(f"Temporary table '{temp_table}' created with new id values.")

        # Truncate the original table
        obj.cursor.execute(sql.SQL("TRUNCATE TABLE {table_name} RESTART IDENTITY;").format(
            table_name=sql.Identifier(table_name)
        ))
        logging.info(f"Table '{table_name}' truncated.")
        print(f"Table '{table_name}' truncated.")

        # Insert data back into the original table
        columns = obj.get_table_columns(table_name)
        columns_list = sql.SQL(', ').join([sql.Identifier(col) for col in columns])
        obj.cursor.execute(sql.SQL("""
            INSERT INTO {table_name} ({columns})
            SELECT {columns} FROM {temp_table};
        """).format(
            table_name=sql.Identifier(table_name),
            columns=columns_list,
            temp_table=sql.Identifier(temp_table)
        ))
        logging.info(f"Data inserted back into table '{table_name}' with new id values.")
        print(f"Data inserted back into table '{table_name}' with new id values.")

        # Reset the sequence
        obj.cursor.execute(sql.SQL("""
            SELECT setval(pg_get_serial_sequence(%s, 'id'), (SELECT MAX(id) FROM {table_name}), true);
        """).format(
            table_name=sql.Identifier(table_name)
        ), [table_name])
        logging.info(f"Sequence reset for table '{table_name}'.")
        print(f"Sequence reset for table '{table_name}'.")

        # Commit the transaction
        obj.conn.commit()
        logging.info(f"Transaction committed for table '{table_name}'.")
        print(f"Transaction committed for table '{table_name}'.")

    except Exception as e:
        obj.conn.rollback()
        logging.error(f"Error resetting id values for table '{table_name}': {e}")
        print(f"Error resetting id values for table '{table_name}': {e}")
        raise


In [None]:
reset_id_values()

Sequence for 'gleif_entity_data.id' reset to 906289.


Break

In [None]:
def bulk_insert_using_copy(self , table_name , columns, data):
        """Perform a bulk insert using PostgreSQL COPY with an in-memory buffer

        Args:
            table_name (_type_): Name of the table to insert into
            columns (_type_): List of column names for the table
            data (_type_): List of tuples with the data to be inserted
        """
        
        buffer = io.StringIO()
        
        #write data to the buffer
        
        for row in data:
            '''row_converted = [
            x.replace('\\', '\\\\') if isinstance(x, str) else x 
            for x in row]'''
        # Replace None with \N for PostgreSQL NULL representation
        #row_converted = [str(x) if x is not None else '\\N' for x in row_converted]
            #buffer.write('\t'.join(row_converted) + "\n")
            #buffer.write('\t'.join(map(str , row_converted)) + "\n")
            #buffer.write('\t'.join(row_converted) + "\n")
            buffer.write('\t'.join(map(str , row)) + "\n")
        buffer.seek(0) #reset buffer position to the beginning
        
        #Construct the copy query
        #copy_query = f"COPY {table_name} ({', '.join(columns)}) FROM STDIN WITH DELIMITER '\t', NULL '\\N'"
        
        #copy_query = f"""COPY {table_name} ({', '.join(columns)}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')"""
        copy_query = f"COPY {table_name} ({', '.join(columns)}) FROM STDIN WITH DELIMITER '\t'"
        self.cursor.copy_expert(copy_query , buffer)
        self.conn.commit()

In [5]:
with open(obj.str_json_file_path, 'r', encoding='utf-8') as file:
        dict_relationships = json.load(file)   

In [6]:
list_dicts = dict_relationships["records"]

In [11]:
display((list_dicts[:10]))

[{'LEI': {'$': '0VNOQKIRP7AG4HT7MV54'},
  'Entity': {'LegalName': {'@xml:lang': 'en', '$': 'ENABLE PRODUCTS, LLC'},
   'LegalAddress': {'@xml:lang': 'en',
    'FirstAddressLine': {'$': 'C/O CORPORATION SERVICE COMPANY'},
    'City': {'$': 'OKLAHOMA CITY'},
    'Region': {'$': 'US-OK'},
    'Country': {'$': 'US'},
    'PostalCode': {'$': '73159'}},
   'HeadquartersAddress': {'@xml:lang': 'en',
    'FirstAddressLine': {'$': 'Bok Park Plaza'},
    'City': {'$': 'Oklahoma City'},
    'Region': {'$': 'US-OK'},
    'Country': {'$': 'US'},
    'PostalCode': {'$': '73102'}},
   'RegistrationAuthority': {'RegistrationAuthorityID': {'$': 'RA000630'},
    'RegistrationAuthorityEntityID': {'$': '3512174322'}},
   'LegalJurisdiction': {'$': 'US-OK'},
   'EntityCategory': {'$': 'GENERAL'},
   'LegalForm': {'EntityLegalFormCode': {'$': 'B8XC'}},
   'EntityStatus': {'$': 'INACTIVE'},
   'EntityCreationDate': {'$': '2008-03-31T00:00:00.000Z'},
   'LegalEntityEvents': {'LegalEntityEvent': [{'@group_type