# Load GLEIF Data from public repository on AWS (s3://gleif/)
Copyright (C) 2021 OS-Climate

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

In [None]:
# pip install boto3
# pip install python-dotenv
# pip install trino
# pip install pandas
# pip install anytree
# pip install sqlalchemy
# pip install sqlalchemy-trino
# pip install osc-ingest-tools

In [None]:
from dotenv import dotenv_values, load_dotenv
import os
import pathlib
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import pandas as pd
import trino
import io
import zipfile
import anytree
import osc_ingest_trino as osc

In [None]:
# The GLEIF bucket is a public S3 resource, so requests are sent unsigned
unsigned_config = Config(signature_version=UNSIGNED)
s3_resource = boto3.resource(service_name="s3", config=unsigned_config)
bucket = s3_resource.Bucket("gleif")

In [None]:
# Retrieve latest LEI data set
LEI_data_sets = [obj.key for obj in bucket.objects.filter(Prefix="data/csv/rr/")]
print(f"Fetched data sets: {len(LEI_data_sets)}")
if not LEI_data_sets:
    # Fail with a clear message instead of an IndexError on the lookup below
    raise RuntimeError("No data sets found in bucket 'gleif' under prefix 'data/csv/rr/'")

# The last listed key is taken as the current data set.
# NOTE(review): this relies on the listing order putting the newest key last - confirm.
current_rr_data_set = LEI_data_sets[-1]

# The first two "-"-separated parts of the file name are used as date and time strings
date_str_data, time_str_data = current_rr_data_set.split("/")[-1].split("-")[:2]

In [None]:
from botocore import UNSIGNED
from botocore.config import Config

# Low-level functional S3 API, also using unsigned requests
client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
obj = client.get_object(Bucket="gleif", Key=current_rr_data_set)

# Only the columns needed to build the issuer mappings and the hierarchy are read
list_hierarchy_relevant_columns = [
    "Relationship.RelationshipType",
    "Relationship.StartNode.NodeID",
    "Relationship.EndNode.NodeID",
    "Relationship.RelationshipStatus",
]

# Stream the gzip-compressed CSV straight from the S3 response body
pd_rr_data_set = pd.read_csv(
    obj["Body"],
    usecols=list_hierarchy_relevant_columns,
    compression="gzip",
)
pd_rr_data_set = pd_rr_data_set.convert_dtypes()

# Definition of necessary data processing functions

In [None]:
def _create_issuer_lei_mapping(
    pd_input: pd.DataFrame, relationship_type: str, end_node_key: str
) -> pd.DataFrame:
    """
    Build a two-column mapping from the relationship records.

    Keeps only rows whose status is "ACTIVE" and whose relationship type equals
    ``relationship_type``. The start node column is returned as "DIRECT_ISSUER_LEI"
    and the end node column under the name given by ``end_node_key``.

    :param pd_input: frame with the "Relationship.*" columns used below
    :param relationship_type: value to match in "Relationship.RelationshipType"
    :param end_node_key: output column name for "Relationship.EndNode.NodeID"
    :return: new frame with the two renamed columns; ``pd_input`` is not modified
    """
    dict_key_renaming = {
        "Relationship.StartNode.NodeID": "DIRECT_ISSUER_LEI",
        "Relationship.EndNode.NodeID": end_node_key,
    }
    df_mask = (pd_input["Relationship.RelationshipStatus"] == "ACTIVE") & (
        pd_input["Relationship.RelationshipType"] == relationship_type
    )

    # Filter rows and columns first, then rename: only the selected data is copied,
    # and the rename returns an independent frame that callers can safely add columns to.
    return pd_input.loc[df_mask, list(dict_key_renaming)].rename(
        columns=dict_key_renaming
    )


def create_ultimate_issuer_lei_mapping(pd_input: pd.DataFrame) -> pd.DataFrame:
    """
    Return the mapping for "IS_ULTIMATELY_CONSOLIDATED_BY" relationships.

    The end node of each relationship is reported in the column
    "ULTIMATE_PARENT_ISSUER_LEI".
    """
    relationship_type = "IS_ULTIMATELY_CONSOLIDATED_BY"
    end_node_key = "ULTIMATE_PARENT_ISSUER_LEI"
    return _create_issuer_lei_mapping(pd_input, relationship_type, end_node_key)


def create_direct_parent_issuer_lei_mapping(pd_input: pd.DataFrame) -> pd.DataFrame:
    """
    Return the mapping for "IS_DIRECTLY_CONSOLIDATED_BY" relationships.

    The end node of each relationship is reported in the column
    "DIRECT_PARENT_ISSUER_LEI".
    """
    relationship_type = "IS_DIRECTLY_CONSOLIDATED_BY"
    end_node_key = "DIRECT_PARENT_ISSUER_LEI"
    return _create_issuer_lei_mapping(pd_input, relationship_type, end_node_key)


def add_hierarchy_level_to_nodes(
    list_unassigned_relationships: list, hierarchy_level: int, dict_nodes: dict
) -> bool:
    """
    Attach directly consolidated relationships to the tree, one pass per call.

    Each relationship is indexed as ``[0]`` = child LEI and ``[1]`` = direct parent LEI.
    A relationship can be attached once its parent already has a node in ``dict_nodes``:
      - if the child already has a node, that node is re-parented to the direct parent
      - otherwise a new node is created below the parent and registered in ``dict_nodes``
    Relationships that cannot be attached yet are collected and retried in a recursive
    call with ``hierarchy_level + 1``. The recursion stops as soon as a pass attaches
    nothing.

    :param list_unassigned_relationships: relationships still to be attached
    :param hierarchy_level: pass counter, only used for the progress output
    :param dict_nodes: LEI -> tree node lookup table; mutated in place
    :return: True once no further relationship can be attached
    """
    list_remaining_unassigned_relationships = []
    print(f"Hierarchy_level {hierarchy_level}")

    for unassigned_relationship_tuple in list_unassigned_relationships:
        child_lei = unassigned_relationship_tuple[0]
        parent_lei = unassigned_relationship_tuple[1]

        # Check if the direct parent issuer is already allocated in the tree structure
        # (self-references are never attached), otherwise try again in the next call
        if parent_lei not in dict_nodes or child_lei == parent_lei:
            list_remaining_unassigned_relationships.append(
                unassigned_relationship_tuple
            )
            continue

        parent_node = dict_nodes[parent_lei]
        if child_lei in dict_nodes:  # LEI was already assigned
            try:
                # Overwrite the previous parent attribute in the node with a new one
                dict_nodes[child_lei].parent = parent_node
            except anytree.TreeError:
                # anytree refuses invalid parents (e.g. a parent that would create a loop)
                print(f"Unable to change parent for {child_lei}")
                list_remaining_unassigned_relationships.append(
                    unassigned_relationship_tuple
                )
        else:
            # Assign new parent child relationship in the tree structure
            dict_nodes[child_lei] = anytree.Node(child_lei, parent=parent_node)

    # Terminate iff there was no mutation in the tree structure during this pass
    if len(list_unassigned_relationships) == len(
        list_remaining_unassigned_relationships
    ):
        print(
            f"Remaining {len(list_remaining_unassigned_relationships)} could not be assigned to the tree"
        )
        return True

    # Propagate the result of the deeper pass so every call returns a bool
    return add_hierarchy_level_to_nodes(
        list_unassigned_relationships=list_remaining_unassigned_relationships,
        hierarchy_level=hierarchy_level + 1,
        dict_nodes=dict_nodes,
    )


def create_company_hierarchy_tree_structure(pd_input: pd.DataFrame):
    """
    Build the company hierarchy as an anytree tree below an artificial "root" node.

    Ultimate parents become the children of the root. Every entity is first placed
    directly below its ultimate parent and is then moved below its direct parent
    wherever a directly consolidated relationship is available.

    :param pd_input: relationship records as expected by the issuer mapping helpers
    :return: the root node of the tree
    """
    list_tuple_ultimate_issuer_relationship = create_ultimate_issuer_lei_mapping(
        pd_input
    ).to_records(index=False)

    list_tuple_direct_parent_issuer_relationship = (
        create_direct_parent_issuer_lei_mapping(pd_input).to_records(index=False)
    )

    # Distinct ultimate parents; also works when there are no relationships at all
    set_ultimate_issuer_LEI = {
        relationship_tuple[1]
        for relationship_tuple in list_tuple_ultimate_issuer_relationship
    }

    root = anytree.Node("root")

    # This dict is used as a performant hashmap lookup table to the nodes to avoid
    # using the native search function of anytree which loops through the entire tree
    tmp_dict_nodes = {}

    for ultimate_issuer_lei in set_ultimate_issuer_LEI:
        tmp_dict_nodes[ultimate_issuer_lei] = anytree.Node(
            ultimate_issuer_lei, parent=root
        )

    # Some relationships in the GLEIF data are exclusively entered as an ultimate issuer relationship and not also
    # as a directly consolidated relationship. To build a complete dataset and also include those we take the following approach:
    # initially we assume all ultimate issuer relationships also as directly consolidated relationships. In a subsequent
    # process step this assumption is checked:
    # we overwrite the initially assumed direct relationship to the ultimate issuer with the more specific data from
    # the direct issuer relationship dataset:
    #   - Case1: There is no entry in the directly consolidated issuer relationship,
    #            meaning the assumed relationship was correct
    #   - Case2: There is an entry in the directly consolidated issuer relationship: We delete the initially assumed
    #            entry and replace it with the more specific one
    for relationship_tuple in list_tuple_ultimate_issuer_relationship:
        parent_node = tmp_dict_nodes.get(relationship_tuple[1])
        tmp_dict_nodes[relationship_tuple[0]] = anytree.Node(
            relationship_tuple[0], parent=parent_node
        )

    add_hierarchy_level_to_nodes(
        list_unassigned_relationships=list_tuple_direct_parent_issuer_relationship,
        hierarchy_level=1,
        dict_nodes=tmp_dict_nodes,
    )
    return root

In [None]:
def rename_recursive_key_dicts(
    dict_to_rename: dict, dict_of_key_old_to_new, new_name_dict_holder_key: str
) -> None:
    """
    Rename keys of a nested dict structure in place.

    Every key of ``dict_to_rename`` that appears in ``dict_of_key_old_to_new`` is
    replaced by its new name. The same renaming is then applied to each dict in the
    list stored under ``new_name_dict_holder_key`` (looked up after renaming).
    """
    for old_key, new_key in dict_of_key_old_to_new.items():
        if old_key not in dict_to_rename:
            continue
        dict_to_rename[new_key] = dict_to_rename.pop(old_key)

    # Descend into the nested dicts, which are found under the already renamed key
    for nested_dict in dict_to_rename.get(new_name_dict_holder_key, ()):
        rename_recursive_key_dicts(
            nested_dict, dict_of_key_old_to_new, new_name_dict_holder_key
        )


def add_recursive_key_to_dict(
    dict_to_add_to: dict, key_to_add: str,
    value_to_add: int, name_dict_holder_key: str
):
    """
    Store a depth-dependent value under ``key_to_add`` in a nested dict structure.

    ``dict_to_add_to`` receives ``value_to_add``; each dict in the list stored under
    ``name_dict_holder_key`` receives ``value_to_add + 1``, their nested dicts
    ``value_to_add + 2``, and so on. The dicts are modified in place.
    """
    dict_to_add_to[key_to_add] = value_to_add

    # One level deeper means a value that is larger by one
    next_level_value = value_to_add + 1
    for nested_dict in dict_to_add_to.get(name_dict_holder_key, ()):
        add_recursive_key_to_dict(
            nested_dict, key_to_add, next_level_value, name_dict_holder_key
        )

# Creating data set for ultimate issuer LEI mapping

In [None]:
df_ultimate_issuer_relationship_LEI = create_ultimate_issuer_lei_mapping(pd_rr_data_set)

# Build an hour-resolution UTC timestamp from the date/time strings of the data set file name
# NOTE(review): assumes date_str_data is "YYYYMMDD" and time_str_data starts with "HH" - confirm
str_timestamp = f'{date_str_data[:4]}-{date_str_data[4:6]}-{date_str_data[6:]}T{time_str_data[:2]}'
data_time_stamp = pd.Timestamp(str_timestamp, tz='UTC')

df_ultimate_issuer_relationship_LEI["time_stamp"] = data_time_stamp
df_ultimate_issuer_relationship_LEI = df_ultimate_issuer_relationship_LEI.convert_dtypes()
# The ingest helpers are imported as `osc`; the name `osc_ingest_trino` is not bound
df_ultimate_issuer_relationship_LEI = osc.enforce_sql_column_names(
    df_ultimate_issuer_relationship_LEI
)

# Creating data set for ISIN to LEI mapping
The ISIN-to-LEI mapping is not yet available via the GLEIF AWS bucket, so it is downloaded from the GLEIF ISIN mapping API instead. This will hopefully change in the future.

In [None]:
# NOTE(review): the download id in the URL and the file name below are hard-coded -
# confirm they still refer to the intended mapping file
url_gleif_isin_LEI = "https://isinmapping.gleif.org/api/v2/isin-lei/4012/download"
tmp_file_name = "isin-lei-20220409T070350.zip"

df_mapping_LEI_ISIN = pd.read_csv(url_gleif_isin_LEI, compression="zip")

# NOTE(review): the timestamp is derived from the relationship data set's file name,
# not from the ISIN mapping download - confirm this is intended
str_timestamp = f'{date_str_data[:4]}-{date_str_data[4:6]}-{date_str_data[6:]}T{time_str_data[:2]}'
data_time_stamp = pd.Timestamp(str_timestamp, tz='UTC')

df_mapping_LEI_ISIN["time_stamp"] = data_time_stamp
df_mapping_LEI_ISIN = df_mapping_LEI_ISIN.convert_dtypes()
# The ingest helpers are imported as `osc`; the name `osc_ingest_trino` is not bound
df_mapping_LEI_ISIN = osc.enforce_sql_column_names(df_mapping_LEI_ISIN)

# Creating data set for company hierarchy tree

In [None]:
import json

from anytree.exporter import DictExporter

anytree_root = create_company_hierarchy_tree_structure(pd_rr_data_set)

# Export the tree into nested plain dicts
exporter = DictExporter()
dict_anytree_formating = exporter.export(anytree_root)

# Renaming for better readability
dict_key_renaming = {"name": "entity_LEI", "children": "entity_children"}
rename_recursive_key_dicts(
    dict_anytree_formating, dict_key_renaming, "entity_children"
)

# The artificial root gets level -1, so the entities directly below it get level 0
add_recursive_key_to_dict(
    dict_anytree_formating, "entity_hierarchy_level", -1, "entity_children"
)

# Only the children of the root are written out; the root itself is dropped
list_hierarchy_items = dict_anytree_formating.get("entity_children")

json_hierarchy_file = f"{date_str_data}_{time_str_data}_company_hierarchy_tree.json"
json_hierarchy_path = f"/tmp/{json_hierarchy_file}"

with open(json_hierarchy_path, "w") as file:
    json.dump(list_hierarchy_items, file)

# Create an S3 client to upload the data to a S3 bucket

In [None]:
# Directory holding credentials.env: CREDENTIAL_DOTENV_DIR if set, otherwise PWD,
# otherwise a fixed default location
fallback_dotenv_dir = os.environ.get("PWD", "/opt/app-root/src")
dotenv_dir = os.environ.get("CREDENTIAL_DOTENV_DIR", fallback_dotenv_dir)

dotenv_path = pathlib.Path(dotenv_dir) / "credentials.env"
# A missing file is not an error; the environment is then used as it is
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)

In [None]:
# Create an S3 client to upload the files; endpoint and credentials come from the environment
s3_dev_settings = {
    "endpoint_url": os.environ["S3_DEV_ENDPOINT"],
    "aws_access_key_id": os.environ["S3_DEV_ACCESS_KEY"],
    "aws_secret_access_key": os.environ["S3_DEV_SECRET_KEY"],
}
s3 = boto3.client(service_name="s3", **s3_dev_settings)

# This affects both upload location and SQL schema/catalog information
gleif_schema = "sandbox"

# Upload data set ultimate issuer LEI mapping to S3

In [None]:
df_ultimate_issuer_relationship_LEI.info(verbose=True)

# The upload is switched off by the constant guard below
if False:
    tmp_file_name = f"{date_str_data}_{time_str_data}_ultimate_issuer_LEI_mapping.parquet"
    local_parquet_path = f"/tmp/{tmp_file_name}"
    s3_key = f"trino/{gleif_schema}/date={date_str_data}/time={time_str_data}/ultimateissuer/{tmp_file_name}"

    # Write the parquet file locally, then upload it
    df_ultimate_issuer_relationship_LEI.to_parquet(local_parquet_path, index=False)
    s3.upload_file(
        Filename=local_parquet_path,
        Bucket=os.environ["S3_DEV_BUCKET"],
        Key=s3_key,
    )

# Upload data set ISIN LEI mapping to S3

In [None]:
df_mapping_LEI_ISIN.info(verbose=True)

# The upload is switched off by the constant guard below
if False:
    tmp_file_name = f"{date_str_data}_{time_str_data}_ISIN_LEI_mapping.parquet"
    local_parquet_path = f"/tmp/{tmp_file_name}"
    s3_key = f"trino/{gleif_schema}/date={date_str_data}/time={time_str_data}/isinlei/{tmp_file_name}"

    # Write the parquet file locally, then upload it
    df_mapping_LEI_ISIN.to_parquet(local_parquet_path, index=False)
    s3.upload_file(
        Filename=local_parquet_path,
        Bucket=os.environ["S3_DEV_BUCKET"],
        Key=s3_key,
    )

# Upload data set company hierarchy tree to S3

In [None]:
# The upload is switched off by the constant guard below
if False:
    s3_key = f"trino/{gleif_schema}/date={date_str_data}/time={time_str_data}/companyhierarchy/{json_hierarchy_file}"
    s3.upload_file(
        Filename=f"/tmp/{json_hierarchy_file}",
        Bucket=os.environ["S3_DEV_BUCKET"],
        Key=s3_key,
    )

# Establish SQLAlchemy connection to Trino

In [None]:
import trino
from sqlalchemy.engine import create_engine

ingest_catalog = "osc_datacommons_dev"
gleif_schema = "sandbox"

# All connection settings are read from environment variables sharing this prefix
env_var_prefix = "TRINO"
trino_user = os.environ[f"{env_var_prefix}_USER"]
trino_host = os.environ[f"{env_var_prefix}_HOST"]
trino_port = os.environ[f"{env_var_prefix}_PORT"]

sqlstring = f"trino://{trino_user}@{trino_host}:{trino_port}/"
# The value of the *_PASSWD variable is passed to Trino as a JWT
sqlargs = {
    "auth": trino.auth.JWTAuthentication(os.environ[f"{env_var_prefix}_PASSWD"]),
    "http_scheme": "https",
    "catalog": ingest_catalog,
    "schema": gleif_schema,
}
engine = create_engine(sqlstring, connect_args=sqlargs)
connection = engine.connect()

# Define tablenames to use
tablename_isin_lei = "gleif_isin_lei"
tablename_direct_issuer_ultimate_issuer = "gleif_direct_issuer_ultimate_issuer"

In [None]:
# Show available schemas to ensure trino connection is set correctly
qres = engine.execute("show schemas")
available_schemas = qres.fetchall()
available_schemas

In [None]:
# Dropping tables for ISIN/LEI mapping and Direct Issuer/Ultimate Issuer in case they exist already
for table_to_drop in (tablename_isin_lei, tablename_direct_issuer_ultimate_issuer):
    drop_table = engine.execute(f"drop table if exists {gleif_schema}.{table_to_drop}")
    print(drop_table.fetchall())

Create table for ISIN LEI mapping

In [None]:
# Derive the SQL column definitions from the data frame.
# The ingest helpers are imported as `osc`; the name `osc_ingest_trino` is not bound.
column_schema = osc.create_table_schema_pairs(
    df_mapping_LEI_ISIN, typemap={"datetime64[ns, UTC]": "timestamp(6)"}
)

tabledef = f"""create table if not exists {gleif_schema}.{tablename_isin_lei}(
{column_schema}
) with (
    partitioning = array['bucket(lei,20)'],
    format = 'ORC'
)"""

print(tabledef)
# tables created externally may not show up immediately in cloud-beaver
create_table = engine.execute(tabledef)
create_table.fetchall()

In [None]:
# Append the ISIN/LEI mapping to its table in batches of 10000 rows.
# The ingest helpers are imported as `osc`; the name `osc_ingest_trino` is not bound.
df_mapping_LEI_ISIN.to_sql(
    tablename_isin_lei,
    con=engine,
    schema=gleif_schema,
    if_exists="append",
    index=False,
    method=osc.TrinoBatchInsert(batch_size=10000, verbose=False),
)

Create table for issuer/ultimate issuer mapping

In [None]:
# Derive the SQL column definitions from the data frame.
# The ingest helpers are imported as `osc`; the name `osc_ingest_trino` is not bound.
column_schema = osc.create_table_schema_pairs(
    df_ultimate_issuer_relationship_LEI,
    typemap={"datetime64[ns, UTC]": "timestamp(6)"},
)

tabledef = f"""create table if not exists {gleif_schema}.{tablename_direct_issuer_ultimate_issuer}(
{column_schema}
) with (
    partitioning = array['bucket(direct_issuer_lei,20)'],
    format = 'ORC'
)"""

print(tabledef)
# tables created externally may not show up immediately in cloud-beaver
create_table = engine.execute(tabledef)
create_table.fetchall()

In [None]:
# Append the direct issuer / ultimate issuer mapping to its table in batches of 10000 rows.
# The ingest helpers are imported as `osc`; the name `osc_ingest_trino` is not bound.
df_ultimate_issuer_relationship_LEI.to_sql(
    tablename_direct_issuer_ultimate_issuer,
    con=engine,
    schema=gleif_schema,
    if_exists="append",
    index=False,
    method=osc.TrinoBatchInsert(batch_size=10000, verbose=False),
)

Testing connection

In [None]:
import trino
from sqlalchemy.engine import create_engine

# Re-create the engine, this time without a default schema
trino_user = os.environ["TRINO_USER"]
trino_host = os.environ["TRINO_HOST"]
trino_port = os.environ["TRINO_PORT"]
sqlstring = f"trino://{trino_user}@{trino_host}:{trino_port}/"

sqlargs = {
    "auth": trino.auth.JWTAuthentication(os.environ["TRINO_PASSWD"]),
    "http_scheme": "https",
    "catalog": "osc_datacommons_dev",
}
engine = create_engine(sqlstring, connect_args=sqlargs)
print(f"connecting with engine {engine}")
connection = engine.connect()

# Listing the catalogs confirms that queries can be executed
df = pd.read_sql("show catalogs", engine)
df = df.convert_dtypes()
df

In [None]:
# List the tables in the GLEIF schema
show_tables_query = f"show tables in {gleif_schema}"
engine.execute(show_tables_query).fetchall()

In [None]:
# Count the rows of the direct issuer / ultimate issuer table; the table name is taken
# from the shared variable so it cannot drift from the name used when creating the table
engine.execute(
    f"select count (*) from {gleif_schema}.{tablename_direct_issuer_ultimate_issuer}"
).fetchall()

In [None]:
# Read the complete direct issuer / ultimate issuer table back into a data frame
sqlquery = f"select * from {gleif_schema}.{tablename_direct_issuer_ultimate_issuer}"

df = pd.read_sql(sqlquery, engine)
df = df.convert_dtypes()
df

In [None]:
# Spot check: look up the rows of a single direct issuer LEI
lei_lookup_query = f"select * from {gleif_schema}.{tablename_direct_issuer_ultimate_issuer} where direct_issuer_lei='1HNPXZSMMB7HMBMVBS46'"
engine.execute(lei_lookup_query).fetchall()

In [None]:
# Spot check: look up the rows of a second direct issuer LEI
lei_lookup_query = f"select * from {gleif_schema}.{tablename_direct_issuer_ultimate_issuer} where direct_issuer_lei='8YQ2GSDWYZXO2EDN3511'"
engine.execute(lei_lookup_query).fetchall()

In [None]:
# Number of rows in the ISIN/LEI mapping
df_mapping_LEI_ISIN.shape[0]