From c76cb737eaebce379e777199a483bb2115bb925d Mon Sep 17 00:00:00 2001
From: Jeffrey S Rengifo Marin <97353049+Delacrobix@users.noreply.github.com>
Date: Wed, 9 Apr 2025 08:25:11 -0500
Subject: [PATCH] supporting blog content: onelake connector part II (#445)

---
 .../connectors/config.py          | 230 +++++++++++++
 .../connectors/sources/onelake.py | 314 ++++++++++++++++++
 .../requirements/framework.txt    |  48 +++
 3 files changed, 592 insertions(+)
 create mode 100644 supporting-blog-content/onelake-connector-part-ii/connectors/config.py
 create mode 100644 supporting-blog-content/onelake-connector-part-ii/connectors/sources/onelake.py
 create mode 100644 supporting-blog-content/onelake-connector-part-ii/requirements/framework.txt

diff --git a/supporting-blog-content/onelake-connector-part-ii/connectors/config.py b/supporting-blog-content/onelake-connector-part-ii/connectors/config.py
new file mode 100644
index 00000000..8af12c68
--- /dev/null
+++ b/supporting-blog-content/onelake-connector-part-ii/connectors/config.py
@@ -0,0 +1,230 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0;
+# you may not use this file except in compliance with the Elastic License 2.0.
+#
+
+import os
+
+from envyaml import EnvYAML
+
+from connectors.logger import logger
+
+DEFAULT_ELASTICSEARCH_MAX_RETRIES = 5
+DEFAULT_ELASTICSEARCH_RETRY_INTERVAL = 10
+
+DEFAULT_MAX_FILE_SIZE = 10485760  # 10MB
+
+
+def load_config(config_file):
+    logger.info(f"Loading config from {config_file}")
+    yaml_config = EnvYAML(config_file, flatten=False).export()
+    nested_yaml_config = {}
+    for key, value in yaml_config.items():
+        _nest_configs(nested_yaml_config, key, value)
+    configuration = dict(_merge_dicts(_default_config(), nested_yaml_config))
+    _ent_search_config(configuration)
+
+    return configuration
+
+
+def add_defaults(config, default_config=None):
+    if default_config is None:
+        default_config = _default_config()
+    configuration = dict(_merge_dicts(default_config, config))
+    return configuration
+
+
+# Left - in Enterprise Search; Right - in Connectors
+config_mappings = {
+    "elasticsearch.host": "elasticsearch.host",
+    "elasticsearch.username": "elasticsearch.username",
+    "elasticsearch.password": "elasticsearch.password",
+    "elasticsearch.headers": "elasticsearch.headers",
+    "log_level": "service.log_level",
+}
+
+# Enterprise Search uses Ruby and is in lower case always, so hacking it here for now
+# Ruby-supported log levels: 'debug', 'info', 'warn', 'error', 'fatal', 'unknown'
+# Python-supported log levels: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NOTSET'
+log_level_mappings = {
+    "debug": "DEBUG",
+    "info": "INFO",
+    "warn": "WARNING",
+    "error": "ERROR",
+    "fatal": "CRITICAL",
+    "unknown": "NOTSET",
+}
+
+
+def _default_config():
+    return {
+        "elasticsearch": {
+            "host": "http://localhost:9200",
+            "username": "elastic",
+            "password": "changeme",
+            "ssl": True,
+            "verify_certs": True,
+            "bulk": {
+                "queue_max_size": 1024,
+                "queue_max_mem_size": 25,
+                "queue_refresh_interval": 1,
+                "queue_refresh_timeout": 600,
+                "display_every": 100,
+                "chunk_size": 1000,
+                "max_concurrency": 5,
+                "chunk_max_mem_size": 5,
+                "max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
+                "retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
+                "concurrent_downloads": 10,
+                "enable_operations_logging": False,
+            },
+            "max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
+            "retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
"retry_on_timeout": True, + "request_timeout": 120, + "max_wait_duration": 120, + "initial_backoff_duration": 1, + "backoff_multiplier": 2, + "log_level": "info", + "feature_use_connectors_api": True, + }, + "service": { + "idling": 30, + "heartbeat": 300, + "preflight_max_attempts": 10, + "preflight_idle": 30, + "max_errors": 20, + "max_errors_span": 600, + "max_concurrent_content_syncs": 1, + "max_concurrent_access_control_syncs": 1, + "max_file_download_size": DEFAULT_MAX_FILE_SIZE, + "job_cleanup_interval": 300, + "log_level": "INFO", + }, + "sources": { + "onelake": "connectors.sources.onelake:OneLakeDataSource", + "azure_blob_storage": "connectors.sources.azure_blob_storage:AzureBlobStorageDataSource", + "box": "connectors.sources.box:BoxDataSource", + "confluence": "connectors.sources.confluence:ConfluenceDataSource", + "dir": "connectors.sources.directory:DirectoryDataSource", + "dropbox": "connectors.sources.dropbox:DropboxDataSource", + "github": "connectors.sources.github:GitHubDataSource", + "gmail": "connectors.sources.gmail:GMailDataSource", + "google_cloud_storage": "connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource", + "google_drive": "connectors.sources.google_drive:GoogleDriveDataSource", + "graphql": "connectors.sources.graphql:GraphQLDataSource", + "jira": "connectors.sources.jira:JiraDataSource", + "microsoft_teams": "connectors.sources.microsoft_teams:MicrosoftTeamsDataSource", + "mongodb": "connectors.sources.mongo:MongoDataSource", + "mssql": "connectors.sources.mssql:MSSQLDataSource", + "mysql": "connectors.sources.mysql:MySqlDataSource", + "network_drive": "connectors.sources.network_drive:NASDataSource", + "notion": "connectors.sources.notion:NotionDataSource", + "onedrive": "connectors.sources.onedrive:OneDriveDataSource", + "oracle": "connectors.sources.oracle:OracleDataSource", + "outlook": "connectors.sources.outlook:OutlookDataSource", + "postgresql": "connectors.sources.postgresql:PostgreSQLDataSource", + "redis": "connectors.sources.redis:RedisDataSource", + "s3": "connectors.sources.s3:S3DataSource", + "salesforce": "connectors.sources.salesforce:SalesforceDataSource", + "servicenow": "connectors.sources.servicenow:ServiceNowDataSource", + "sharepoint_online": "connectors.sources.sharepoint_online:SharepointOnlineDataSource", + "sharepoint_server": "connectors.sources.sharepoint_server:SharepointServerDataSource", + "slack": "connectors.sources.slack:SlackDataSource", + "zoom": "connectors.sources.zoom:ZoomDataSource", + }, + } + + +def _ent_search_config(configuration): + if "ENT_SEARCH_CONFIG_PATH" not in os.environ: + return + logger.info("Found ENT_SEARCH_CONFIG_PATH, loading ent-search config") + ent_search_config = EnvYAML(os.environ["ENT_SEARCH_CONFIG_PATH"]) + for es_field in config_mappings.keys(): + if es_field not in ent_search_config: + continue + + connector_field = config_mappings[es_field] + es_field_value = ent_search_config[es_field] + + if es_field == "log_level": + if es_field_value not in log_level_mappings: + msg = f"Unexpected log level: {es_field_value}. Allowed values: {', '.join(log_level_mappings.keys())}" + raise ValueError(msg) + es_field_value = log_level_mappings[es_field_value] + + _nest_configs(configuration, connector_field, es_field_value) + + logger.debug(f"Overridden {connector_field}") + + +def _nest_configs(configuration, field, value): + """ + Update configuration field value taking into account the nesting. + + Configuration is a hash of hashes, so we need to dive inside to do proper assignment. 
+
+    E.g. _nest_configs({}, "elasticsearch.bulk.queuesize", 20) will result in the following config:
+    {
+        "elasticsearch": {
+            "bulk": {
+                "queuesize": 20
+            }
+        }
+    }
+    """
+    subfields = field.split(".")
+    last_key = subfields[-1]
+
+    current_leaf = configuration
+    for subfield in subfields[:-1]:
+        if subfield not in current_leaf:
+            current_leaf[subfield] = {}
+        current_leaf = current_leaf[subfield]
+
+    if isinstance(current_leaf.get(last_key), dict):
+        current_leaf[last_key] = dict(_merge_dicts(current_leaf[last_key], value))
+    else:
+        current_leaf[last_key] = value
+
+
+def _merge_dicts(hsh1, hsh2):
+    for k in set(hsh1.keys()).union(hsh2.keys()):
+        if k in hsh1 and k in hsh2:
+            if isinstance(hsh1[k], dict) and isinstance(
+                hsh2[k], dict
+            ):  # only merge objects
+                yield (k, dict(_merge_dicts(hsh1[k], hsh2[k])))
+            else:
+                yield (k, hsh2[k])
+        elif k in hsh1:
+            yield (k, hsh1[k])
+        else:
+            yield (k, hsh2[k])
+
+
+class DataSourceFrameworkConfig:
+    """
+    The configs that will be exposed to DataSource instances.
+    This abstraction prevents DataSource instances from having access to all configuration, while also
+    preventing them from requiring substantial changes to access new configs that may be added.
+    """
+
+    def __init__(self, max_file_size):
+        """
+        Should not be called directly. Use the Builder.
+        """
+        self.max_file_size = max_file_size
+
+    class Builder:
+        def __init__(self):
+            self.max_file_size = DEFAULT_MAX_FILE_SIZE
+
+        def with_max_file_size(self, max_file_size):
+            self.max_file_size = max_file_size
+            return self
+
+        def build(self):
+            return DataSourceFrameworkConfig(self.max_file_size)
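
For orientation, load_config above boils down to two helpers: _nest_configs expands dotted YAML keys into nested dictionaries, and _merge_dicts layers the result over _default_config(), with the right-hand side winning on conflicts. A minimal sketch of that behavior, assuming the module is importable as connectors.config and using made-up override values:

    from connectors.config import _default_config, _merge_dicts, _nest_configs

    # Expand dotted keys into nested dicts, as load_config does for each YAML entry.
    overrides = {}
    _nest_configs(overrides, "elasticsearch.bulk.queue_max_size", 2048)
    _nest_configs(overrides, "service.log_level", "DEBUG")
    # overrides == {"elasticsearch": {"bulk": {"queue_max_size": 2048}},
    #               "service": {"log_level": "DEBUG"}}

    # Merge overrides on top of the built-in defaults; overrides win on conflicts.
    config = dict(_merge_dicts(_default_config(), overrides))
    assert config["elasticsearch"]["bulk"]["queue_max_size"] == 2048
    assert config["elasticsearch"]["bulk"]["chunk_size"] == 1000  # untouched default
    assert config["service"]["log_level"] == "DEBUG"
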
+ """ + super().__init__(configuration=configuration) + self.tentant_id = self.configuration["tentant_id"] + self.client_id = self.configuration["client_id"] + self.client_secret = self.configuration["client_secret"] + self.workspace_name = self.configuration["workspace_name"] + self.data_path = self.configuration["data_path"] + + @classmethod + def get_default_configuration(cls): + """Get the default configuration for OneLake + + Returns: + dictionary: Default configuration + """ + return { + "tentant_id": { + "label": "OneLake tenant id", + "order": 1, + "type": "str", + }, + "client_id": { + "label": "OneLake client id", + "order": 2, + "type": "str", + }, + "client_secret": { + "label": "OneLake client secret", + "order": 3, + "type": "str", + "sensitive": True, # Hide sensitive data like passwords or secrets + }, + "workspace_name": { + "label": "OneLake workspace name", + "order": 4, + "type": "str", + }, + "data_path": { + "label": "OneLake data path", + "tooltip": "Path in format .Lakehouse/files/", + "order": 5, + "type": "str", + }, + "account_name": { + "tooltip": "In the most cases is 'onelake'", + "default_value": ACCOUNT_NAME, + "label": "Account name", + "order": 6, + "type": "str", + }, + } + + async def ping(self): + """Verify the connection with OneLake""" + + self._logger.info("Generating file system client...") + + try: + await self._get_directory_paths(self.configuration["data_path"]) + self._logger.info("Connection to OneLake successful") + + except Exception: + self._logger.exception("Error while connecting to OneLake.") + raise + + def _get_account_url(self): + """Get the account URL for OneLake + + Returns: + str: Account URL + """ + + return f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com" + + def _get_token_credentials(self): + """Get the token credentials for OneLake + + Returns: + obj: Token credentials + """ + + tentant_id = self.configuration["tentant_id"] + client_id = self.configuration["client_id"] + client_secret = self.configuration["client_secret"] + + try: + return ClientSecretCredential(tentant_id, client_id, client_secret) + except Exception as e: + self._logger.error(f"Error while getting token credentials: {e}") + raise + + def _get_service_client(self): + """Get the service client for OneLake + + Returns: + obj: Service client + """ + + try: + return DataLakeServiceClient( + account_url=self._get_account_url(), + credential=self._get_token_credentials(), + ) + except Exception as e: + self._logger.error(f"Error while getting service client: {e}") + raise + + def _get_file_system_client(self): + """Get the file system client for OneLake + + Returns: + obj: File system client + """ + try: + return self._get_service_client().get_file_system_client( + self.configuration["workspace_name"] + ) + except Exception as e: + self._logger.error(f"Error while getting file system client: {e}") + raise + + def _get_directory_client(self): + """Get the directory client for OneLake + + Returns: + obj: Directory client + """ + + try: + return self._get_file_system_client().get_directory_client( + self.configuration["data_path"] + ) + except Exception as e: + self._logger.error(f"Error while getting directory client: {e}") + raise + + async def _get_file_client(self, file_name): + """Get file client from OneLake + + Args: + file_name (str): name of the file + + Returns: + obj: File client + """ + + try: + return self._get_directory_client().get_file_client(file_name) + except Exception as e: + self._logger.error(f"Error while getting file 
client: {e}") + raise + + async def _get_directory_paths(self, directory_path): + """List directory paths from data lake + + Args: + directory_path (str): Directory path + + Returns: + list: List of paths + """ + + try: + paths = self._get_file_system_client().get_paths(path=directory_path) + + return paths + except Exception as e: + self._logger.error(f"Error while getting directory paths: {e}") + raise + + async def format_file(self, file_client): + """Format file_client to be processed + + Args: + file_client (obj): File object + + Returns: + dict: Formatted file + """ + + try: + file_properties = file_client.get_file_properties() + + return { + "_id": f"{file_client.file_system_name}_{file_properties.name.split('/')[-1]}", + "name": file_properties.name.split("/")[-1], + "created_at": file_properties.creation_time.isoformat(), + "_timestamp": file_properties.last_modified.isoformat(), + "size": file_properties.size, + } + except Exception as e: + self._logger.error( + f"Error while formatting file or getting file properties: {e}" + ) + raise + + async def download_file(self, file_client): + """Download file from OneLake + + Args: + file_client (obj): File client + + Returns: + generator: File stream + """ + + try: + download = file_client.download_file() + stream = download.chunks() + + for chunk in stream: + yield chunk + except Exception as e: + self._logger.error(f"Error while downloading file: {e}") + raise + + async def get_content(self, file_name, doit=None, timestamp=None): + """Obtains the file content for the specified file in `file_name`. + + Args: + file_name (obj): The file name to process to obtain the content. + timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None. + doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None. + + Returns: + str: Content of the file or None if not applicable. + """ + + if not doit: + return + + file_client = await self._get_file_client(file_name) + file_properties = file_client.get_file_properties() + file_extension = self.get_file_extension(file_name) + + doc = { + "_id": f"{file_client.file_system_name}_{file_properties.name}", # workspacename_data_path + "name": file_properties.name.split("/")[-1], + "_timestamp": file_properties.last_modified, + "created_at": file_properties.creation_time, + } + + can_be_downloaded = self.can_file_be_downloaded( + file_extension=file_extension, + filename=file_properties.name, + file_size=file_properties.size, + ) + + if not can_be_downloaded: + return doc + + extracted_doc = await self.download_and_extract_file( + doc=doc, + source_filename=file_properties.name.split("/")[-1], + file_extension=file_extension, + download_func=partial(self.download_file, file_client), + ) + + return extracted_doc if extracted_doc is not None else doc + + async def prepare_files(self, doc_paths): + """Prepare files for processing + + Args: + doc_paths (list): List of paths extracted from OneLake + + Yields: + tuple: File document and partial function to get content + """ + + for path in doc_paths: + file_name = path.name.split("/")[-1] + field_client = await self._get_file_client(file_name) + + yield self.format_file(field_client) + + async def get_docs(self, filtering=None): + """Get documents from OneLake and index them + + Yields: + tuple: dictionary with meta-data of each file and a partial function to get the file content. 
+ """ + + directory_paths = await self._get_directory_paths( + self.configuration["data_path"] + ) + + async for file in self.prepare_files(directory_paths): + file_dict = await file + yield file_dict, partial(self.get_content, file_dict["name"]) diff --git a/supporting-blog-content/onelake-connector-part-ii/requirements/framework.txt b/supporting-blog-content/onelake-connector-part-ii/requirements/framework.txt new file mode 100644 index 00000000..13f558ae --- /dev/null +++ b/supporting-blog-content/onelake-connector-part-ii/requirements/framework.txt @@ -0,0 +1,48 @@ +aiohttp==3.10.4 +aiofiles==23.2.1 +aiomysql==0.1.1 +httpx==0.27.0 +httpx-ntlm==1.4.0 +elasticsearch[async]==8.15.0 +elastic-transport==8.15.0 +pyyaml==6.0.1 +cffi==1.16.0 +envyaml==1.10.211231 +ecs-logging==2.0.0 +pympler==1.0.1 +cron-schedule-triggers==0.0.11 +tzcron==1.0.0 +pytz==2019.3 +python-dateutil==2.8.2 +aiogoogle==5.3.0 +uvloop==0.20.0; sys_platform != 'win32' +fastjsonschema==2.16.2 +base64io==1.0.3 +azure-storage-blob==12.19.1 +SQLAlchemy==2.0.1 +oracledb==2.3.0 +asyncpg==0.29.0 +python-tds==1.12.0 +sqlalchemy-pytds==0.3.5 +pyOpenSSL==24.1.0 +dropbox==11.36.2 +beautifulsoup4==4.12.2 +gidgethub==5.2.1 +wcmatch==8.4.1 +msal==1.30.0 +exchangelib==5.4.0 +ldap3==2.9.1 +lxml==4.9.3 +pywinrm==0.4.3 +click==8.1.7 +colorama==0.4.6 +tabulate==0.9.0 +redis==5.0.1 +simple-term-menu==1.6.4 +graphql-core==3.2.3 +notion-client==2.2.1 +certifi==2024.7.4 +aioboto3==12.4.0 +pyasn1<0.6.1 +azure-identity==1.19.0 +azure-storage-file-datalake==12.17.0