In [13]:
%%configure -f
{
    "conf": {
        "spark.sql.shuffle.partitions": 64,
        "spark.sql.broadcastTimeout": 14400,
        "spark.port.maxRetries": 100,
        "spark.executor.allowSparkContext": "true"
    }
}

In [14]:
# Imports from the installed transparency_engine wheel (.whl)

from typing import Dict
from transparency_engine.containers import ContainerKeys, build_container
from transparency_engine.io.data_handler import DataHandler, DataHandlerModes
from transparency_engine.pipeline import TransparencyPipeline
from transparency_engine.typing import PipelineSteps


### Manually update `subfolderpath` for this run

In [None]:
# Dated input subfolder for this run; it is interpolated into every input
# path in the step configuration. Edit by hand before executing the notebook.
subfolderpath = "BeneficialOwnership/2023-06-12"


## Pipeline Configurations

In [15]:
# Suffix for the storage root, taken from the last component of the
# `subfolderpath` set earlier in this notebook (e.g. ".../2023-06-12").
# The original cell interpolated a `date` variable that is never defined in
# the notebook, so it raised NameError on a fresh kernel (Restart & Run All).
# Hyphens are replaced with underscores because unquoted Spark SQL identifiers
# only allow letters, digits and underscores.
# NOTE(review): the storage root is presumably used as the Hive database name;
# confirm this format matches the naming used by earlier runs.
run_date = subfolderpath.rstrip("/").rsplit("/", 1)[-1].replace("-", "_")

# Top-level pipeline settings: descriptive metadata, where results are stored,
# and the list of step names that the execution cell passes to the pipeline.
pipeline_config = {
    "name": "Transparency Engine Pipeline",
    "description": "Transparency Engine using open or customer data",
    "storage": {
        "type": "hive",
        "root": f"BeneficialOwnership_{run_date}"
    },
    "steps": [
        "prep",
        "individual_link_prediction",
        "individual_link_filtering",
        "macro_link_prediction",
        "macro_link_filtering",
        "scoring",
        "report"
    ]
}


In [None]:
# Base URL of this run's input folder (abfss scheme, "curated" container).
# `storageAccountName` looks like a placeholder: replace it here, once, with
# the real storage account name before running.
curated_root = f"abfss://curated@storageAccountName.dfs.core.windows.net/{subfolderpath}"

# Per-step settings, keyed by the step names listed in pipeline_config["steps"].
step_config = {
    "steps": {
        # One entry per input dataset: its name, type, source CSV and the
        # preparation sub-steps to apply to it.
        "prep": [
            {
                "name": "activity",
                "type": "dynamic",
                "path": f"{curated_root}/activity.csv",
                "steps": ["load", "preprocess"],
                "config": {},
            },
            {
                "name": "contact",
                "type": "static",
                "path": f"{curated_root}/contact.csv",
                "steps": ["load", "fuzzy_match", "preprocess"],
                "fuzzy_match_on": [
                    {"name": "name", "config": {"min_similarity": 0.9}},
                    {"name": "address", "config": {"min_similarity": 0.9}},
                ],
                "config": {},
            },
            {
                "name": "ownership",
                "type": "static",
                "path": f"{curated_root}/ownership.csv",
                "steps": ["load", "preprocess"],
                "config": {},
            },
            {
                "name": "entity",
                "type": "entity",
                "path": f"{curated_root}/entityweight.csv",
                "steps": ["load"],
            },
            {
                "name": "entityReviewFlag",
                "type": "reviewflag",
                "path": f"{curated_root}/entityredflag.csv",
                "metadata": {
                    "type": "reviewflagmetadata",
                    "path": f"{curated_root}/redflagdefinition.csv",
                },
                "steps": ["load", "preprocess"],
            },
            {
                "name": "attributeDefinition",
                "type": "metadata",
                "path": f"{curated_root}/attributedefinition.csv",
                "steps": ["load"],
            },
        ],
        # Thresholds for link prediction on each static / dynamic dataset.
        "individual_link_prediction": {
            "static": [
                {
                    "name": "contact",
                    "config": {
                        "min_weight": 0.01,
                        "min_similarity": 0.01,
                        "direct_link_min_weight": 0.01,
                    },
                },
                {
                    "name": "ownership",
                    "config": {
                        "min_weight": 0.01,
                        "min_similarity": 0.01,
                        "direct_link_min_weight": 0.01,
                    },
                },
            ],
            "dynamic": [
                {
                    "name": "activity",
                    "config": {
                        "min_weight": 1.0,
                        "sync_min_similarity": 0.5,
                        "async_min_similarity": 0.5,
                        "n_connected_components": 100,
                        "min_component_size": 5,
                    },
                }
            ],
        },
        # Filtering of the predicted dynamic (activity) links.
        "individual_link_filtering": {
            "dynamic": [
                {
                    "name": "activity",
                    "config": {
                        "min_overall_similarity": 0.0,
                        "min_sync_similarity": 0.8,
                        "min_async_similarity": 0.8,
                        "sync_attributes": ["tender", "buyer", "item"],
                        "async_attributes": ["buyer", "item"],
                    },
                }
            ]
        },
        # Macro-level prediction over the named link tables.
        "macro_link_prediction": {
            "name": "macro",
            "inputs": [
                "activity_filtered_links",
                "contact_links",
                "ownership_links",
            ],
            "config": {
                "min_weight": 0.1,
                "min_similarity": 0.1,
                "direct_link_min_weight": 0.1,
            },
        },
        "macro_link_filtering": {
            "name": "macro",
            "static": [
                {"name": "contact", "config": {"include_fuzzy_match": True}},
                {"name": "ownership", "config": {"include_fuzzy_match": False}},
            ],
            "dynamic": [
                {"name": "activity", "config": {"include_fuzzy_match": False}},
            ],
            "config": {"max_path_length": 5},
        },
        # The values below refer to dataset / output names used in the
        # earlier steps of this config.
        "scoring": {
            "entity": "entity",
            "entity_flag": "entityReviewFlag",
            "flag_metadata": "entityReviewFlag_metadata",
            "predicted_links": "macro",
            "config": {},
        },
        "report": {
            "entity": "entity",
            "static": [
                {"name": "contact", "config": {}},
                {"name": "ownership", "config": {}},
            ],
            "dynamic": [
                {"name": "activity", "config": {}},
            ],
            "other": [],
            "entity_flag": "entityReviewFlag",
            "network_score": "network_scoring",
            "predicted_links": "macro",
            "flag_metadata": "entityReviewFlag_metadata",
            "attribute_metadata": "attributeDefinition",
            "config": {
                "sync_attributes": ["tender", "buyer", "item"],
                "async_attributes": ["buyer", "item"],
                "entity_name_attribute": "name",
                "base_url": "http://localhost:3000/report/",
            },
        },
    }
}

## Pipeline Execution

In [17]:
# Create the pipeline object first, then register both configs and the data
# handler with the container (same statement order as before).
pipeline = TransparencyPipeline()

# Storage settings come from pipeline_config; a missing "storage" section
# falls back to an empty dict, and missing keys fall back to "".
storage_config: Dict[str, str] = pipeline_config.get("storage", {})
storage_mode = DataHandlerModes.from_string(storage_config.get("type", ""))
storage_root = storage_config.get("root", "")

container_bindings = {
    ContainerKeys.STEP_CONFIG: step_config,
    ContainerKeys.PIPELINE_CONFIG: pipeline_config,
    ContainerKeys.DATA_HANDLER: (DataHandler, storage_mode, storage_root),
}

build_container(
    container_bindings,
    modules=["transparency_engine.pipeline"],
    packages=[],
)

In [None]:
# Convert the configured step names into PipelineSteps values and run them;
# a missing "steps" key yields an empty list.
step_names = pipeline_config.get("steps", [])
steps = PipelineSteps.from_string_list(step_names)
pipeline.run(steps=steps)