Datalake improvements, JSON sampling, Tags ingestion #27401
Conversation
Pull request overview
This PR improves Datalake ingestion behavior by making schema inference more efficient (sampling fewer rows), adding optional cloud-object tag ingestion for Datalake files, and tightening some credential/session plumbing for cloud readers.
Changes:
- Add a schema-inference mode to DataFrame readers so `read_first_chunk()` samples a small number of records instead of default chunk sizes.
- Add opt-in Datalake table tag ingestion by reading provider object tags/metadata (S3/GCS/Azure) and mapping them to OpenMetadata classifications/tags.
- Pass the boto3 session through sampling/reader paths and adjust cloud utilities/tests accordingly.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| ingestion/src/metadata/readers/dataframe/base.py | Forces schema_inference=True for read_first_chunk() to enable sampling behavior in readers. |
| ingestion/src/metadata/readers/dataframe/json.py | Adds schema-inference sampling for JSON/JSONL streaming and threads file_size through. |
| ingestion/src/metadata/readers/dataframe/dsv.py | Adds schema-inference sampling for CSV/TSV reading (pandas chunk size reduction). |
| ingestion/src/metadata/readers/dataframe/parquet.py | Adds schema-inference batch sizing and avoids redundant size lookups when provided. |
| ingestion/src/metadata/readers/dataframe/avro.py | Adds schema-inference batch sizing and changes S3 Avro reading strategy. |
| ingestion/src/metadata/readers/file/adls.py | Returns only populated Azure storage options (enables DefaultAzureCredential fallback). |
| ingestion/src/metadata/utils/datalake/datalake_utils.py | Changes error behavior for first-chunk fetching (now re-raises). |
| ingestion/src/metadata/utils/s3_utils.py | Re-raises exceptions from S3 pagination helper after logging. |
| ingestion/src/metadata/utils/credentials.py | Adds temp credential file creation for GCP external account credentials. |
| ingestion/src/metadata/mixins/pandas/pandas_mixin.py | Extends get_dataframes to accept an optional boto3 session. |
| ingestion/src/metadata/sampler/pandas/sampler.py | Passes session from Datalake client into dataframe reader path. |
| ingestion/src/metadata/ingestion/source/database/datalake/metadata.py | Implements opt-in per-file tag ingestion for Datalake via provider object tags. |
| ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py | Adds optional get_object_tags() API to DL clients. |
| ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py | Implements S3 object tag retrieval via get_object_tagging. |
| ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py | Implements tag retrieval from blob metadata; modifies test list-buckets function behavior. |
| ingestion/src/metadata/ingestion/source/database/datalake/clients/azure_blob.py | Implements Azure blob tag retrieval via get_blob_tags. |
| ingestion/tests/unit/readers/test_json_reader.py | Adds unit tests validating schema inference reads only a small sample. |
| ingestion/tests/unit/readers/test_s3_reader_credentials.py | Adds S3 JSONL sampling test for schema inference. |
| ingestion/tests/unit/readers/test_avro_reader.py | Updates Avro S3 test expectation to a single get_object call. |
| ingestion/tests/unit/readers/test_parquet_reader.py | Switches GCS mocks to MagicMock for filesystem mocking. |
```python
credentials_dict = gcp_credentials.gcpConfig.model_dump()
credentials_dict["type"] = "external_account"
```
For GcpExternalAccount, building the credentials JSON via model_dump() produces field names like externalType, tokenURL, and subjectTokenType that do not match the expected Google external account credential file format (snake_case keys). This also differs from build_google_credentials_dict() (and the existing unit test expectations). Use the same mapping helper for external accounts to generate the correct JSON structure before writing the temp credentials file.
```diff
-credentials_dict = gcp_credentials.gcpConfig.model_dump()
-credentials_dict["type"] = "external_account"
+credentials_dict = build_google_credentials_dict(
+    gcp_credentials.gcpConfig, single_project
+)
```
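The camelCase/snake_case mismatch can be sketched as a small key-mapping helper — a hypothetical illustration, not the actual `build_google_credentials_dict()` implementation (the field names come from the comment above):

```python
# Hypothetical sketch: mapping pydantic-style camelCase fields to the
# snake_case keys a Google external-account credential file expects.
FIELD_MAP = {
    "externalType": "type",
    "subjectTokenType": "subject_token_type",
    "tokenURL": "token_url",
}

def to_external_account_json(dumped: dict) -> dict:
    """Rename known camelCase keys; pass everything else through."""
    return {FIELD_MAP.get(key, key): value for key, value in dumped.items()}

dumped = {
    "externalType": "external_account",
    "tokenURL": "https://sts.googleapis.com/v1/token",
}
print(to_external_account_json(dumped))
```

Writing `model_dump()` output directly would leave `externalType`/`tokenURL` keys that Google's credential loader does not recognize.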
```python
logger.error(
    f"Error fetching first chunk of file [{bucket_name}/{key}] using "
    f"[{config_source.__class__.__name__}] due to: [{err}]"
)
raise err
```
`raise err` here discards the original traceback context and makes debugging harder (it also leads to duplicated logging due to the outer `except`). Use a bare `raise` to preserve the original stack trace when rethrowing inside an `except` block.
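A minimal sketch of the suggested pattern — a bare `raise` keeps the frame where the error originated visible in the formatted traceback:

```python
import traceback

def fetch_first_chunk():
    raise RuntimeError("boom")  # stands in for the failing fetch

def log_and_rethrow():
    try:
        fetch_first_chunk()
    except Exception:
        # log here, then rethrow without naming the exception:
        raise  # bare raise preserves the original traceback

try:
    log_and_rethrow()
except RuntimeError:
    tb = traceback.format_exc()

# the originating frame is still present in the traceback text
assert "fetch_first_chunk" in tb
```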
```diff
 def get_test_list_buckets_fn(self, bucket_name: Optional[str]) -> Callable:
-    if bucket_name:
-        fn = partial(self._client.get_bucket, bucket_name)
-    else:
-        fn = self._client.list_buckets
-
-    os.environ.pop("GOOGLE_CLOUD_PROJECT", "")
-    if GOOGLE_CREDENTIALS in os.environ:
-        os.remove(os.environ[GOOGLE_CREDENTIALS])
-        del os.environ[GOOGLE_CREDENTIALS]
-
-    return fn
+    if bucket_name:
+        return partial(self._client.get_bucket, bucket_name)
+    return self._client.list_buckets
```
This method no longer cleans up GOOGLE_APPLICATION_CREDENTIALS / temp credential files after running the test callable. Since set_google_credentials() writes temp files and close() does not reliably remove them for all credential types/config wrappers, this can leak temp files and leave env vars set for the rest of the process (impacting other tests/connectors). Consider restoring cleanup here or ensuring close() unsets the env var and deletes tracked temp files for all GCP credential modes.
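One way to restore the cleanup — a sketch only, with the env-var names and `GOOGLE_CREDENTIALS` constant assumed from the removed lines above:

```python
import os

GOOGLE_CREDENTIALS = "GOOGLE_APPLICATION_CREDENTIALS"  # assumed constant name

def run_with_credential_cleanup(test_fn):
    """Run the connection-test callable, then drop env vars and temp files."""
    try:
        return test_fn()
    finally:
        os.environ.pop("GOOGLE_CLOUD_PROJECT", None)
        path = os.environ.pop(GOOGLE_CREDENTIALS, None)
        if path and os.path.exists(path):
            os.remove(path)  # delete the temp credentials file

os.environ["GOOGLE_CLOUD_PROJECT"] = "demo-project"
assert run_with_credential_cleanup(lambda: "ok") == "ok"
assert "GOOGLE_CLOUD_PROJECT" not in os.environ
```

Wrapping the callable this way keeps cleanup tied to the test run even when the callable raises.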
| """Stream Avro from S3 without loading entire file into memory.""" | ||
| schema_response = self.client.get_object(Bucket=bucket_name, Key=key) | ||
| import io | ||
|
|
||
| response = self.client.get_object(Bucket=bucket_name, Key=key) | ||
| try: | ||
| columns = self._get_avro_columns(schema_response["Body"]) | ||
| avro_bytes = io.BytesIO(response["Body"].read()) | ||
| finally: | ||
| schema_response["Body"].close() | ||
| response["Body"].close() |
In the S3 Avro reader, response["Body"].read() loads the entire object into memory. This defeats the stated goal of streaming Avro and makes read_first_chunk still download/read the full file, which can cause OOM or large latency for big Avro objects. Consider reverting to streaming from the S3 body (even if that requires a second get_object call), or using a bounded/ranged read just for schema inference.
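The bounded-read option could use an S3 ranged GET (`Range` is a standard `get_object` parameter); the 64 KiB cap here is an assumption, since real Avro headers vary in size:

```python
import io

SCHEMA_SAMPLE_BYTES = 64 * 1024  # assumed cap for schema inference

def read_schema_sample(client, bucket: str, key: str) -> io.BytesIO:
    """Fetch only the first bytes of the object instead of the full body."""
    response = client.get_object(
        Bucket=bucket, Key=key, Range=f"bytes=0-{SCHEMA_SAMPLE_BYTES - 1}"
    )
    try:
        return io.BytesIO(response["Body"].read())
    finally:
        response["Body"].close()
```

This keeps `read_first_chunk` memory-bounded regardless of object size, at the cost of a second `get_object` call for full reads.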
```diff
+    batch_size = (
+        SCHEMA_INFERENCE_SAMPLE_SIZE
+        if getattr(self, "_schema_inference", False)
+        else CHUNKSIZE
+    )
-    def chunk_generator():
-        response = self.client.get_object(Bucket=bucket_name, Key=key)
-        try:
-            yield from self._stream_avro_records(response["Body"])
-        finally:
-            response["Body"].close()
+    yield from self._stream_avro_records(avro_bytes, batch_size=batch_size)
```
chunk_generator() streams from a shared BytesIO that is created once and advanced as it is consumed. If wrapper.dataframes() is invoked more than once (a common pattern for generator factories in this codebase), subsequent iterations will return no data unless the buffer is rewound. Ensure each generator invocation starts from position 0 (or constructs a fresh stream) so dataframes remains repeatable.
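A sketch of the repeatability fix — rewinding the shared buffer at the start of every invocation (names are illustrative, not the actual reader API):

```python
import io

def make_dataframes(avro_bytes: io.BytesIO):
    """Return a generator factory that rewinds the buffer on every call."""
    def dataframes():
        avro_bytes.seek(0)  # start from position 0 each invocation
        while chunk := avro_bytes.read(4):
            yield chunk
    return dataframes

buf = io.BytesIO(b"abcdefgh")
dataframes = make_dataframes(buf)
assert list(dataframes()) == [b"abcd", b"efgh"]
assert list(dataframes()) == [b"abcd", b"efgh"]  # second call is not empty
```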
That is a valid comment. The initial implementation ensured that consecutive calls would pull from a fresh generator.
```python
except Exception as exc:
    logger.debug(traceback.format_exc())
    logger.warning(f"Unexpected exception to yield s3 object: {exc}")
    raise
```
⚠️ Bug: Re-raising in list_s3_objects breaks callers that expect silent failure
Adding raise at the end of the except block in list_s3_objects changes a long-standing contract: callers previously relied on this function to log-and-swallow exceptions, yielding nothing on failure. Multiple callers (s3.py:get_table_names, file_client.py:get_pbit_files, s3/metadata.py:_generate_structured_containers_by_depth, s3/metadata.py:_yield_nested_unstructured_containers) do not wrap calls in try/except and will now propagate unhandled exceptions, potentially crashing entire ingestion workflows on transient S3 errors (e.g., access denied on a single prefix).
Suggested fix:
If the intent is to surface errors, add exception handling in each caller so a single prefix failure doesn't abort the whole ingestion. Alternatively, remove the re-raise and keep the original swallow-and-continue behavior, adding the re-raise only in the specific code paths that need it.
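The first option could look like this at a call site — a hedged sketch, not the actual connector code:

```python
import logging

logger = logging.getLogger(__name__)

def iter_prefixes_safely(prefix_iterators):
    """Yield objects per prefix; one failing prefix logs instead of aborting."""
    for prefix, iterator in prefix_iterators:
        try:
            yield from iterator
        except Exception as exc:
            logger.warning(f"Skipping prefix [{prefix}] due to: {exc}")

def denied():
    yield "obj-a"
    raise PermissionError("access denied")  # transient per-prefix failure

objects = list(iter_prefixes_safely([("p1", denied()), ("p2", iter(["obj-b"]))]))
assert objects == ["obj-a", "obj-b"]
```

This keeps the re-raise in `list_s3_objects` while ensuring a single bad prefix does not abort the whole ingestion.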
```python
options = {}
if connection_args.tenantId:
    options["tenant_id"] = connection_args.tenantId
if connection_args.clientId:
    options["client_id"] = connection_args.clientId
if connection_args.clientSecret:
    options["client_secret"] = connection_args.clientSecret.get_secret_value()
return options
```
💡 Edge Case: return_azure_storage_options may return empty dict
The refactored return_azure_storage_options now conditionally adds each field. If all three fields (tenantId, clientId, clientSecret) are None/empty, it returns {}. This is intentional ("allowing DefaultAzureCredential fallback" per the docstring), but downstream code that unpacks these options with **storage_options into adlfs.AzureBlobFileSystem(...) should be verified to work correctly with no auth options. This is likely fine if Azure's DefaultCredential chain is configured, but worth a note in case it causes auth failures in environments where the old explicit credentials were expected.
```python
batch_size = (
    SCHEMA_INFERENCE_SAMPLE_SIZE
    if getattr(self, "_schema_inference", False)
    else CHUNKSIZE
)
```
💡 Quality: Schema inference flag uses instance attr instead of parameter
All readers communicate the schema_inference flag by setting self._schema_inference in _read() and reading it back via getattr(self, '_schema_inference', False) in dispatch methods. This mutable-state-on-self pattern is fragile — if a reader instance is reused across calls (first read_first_chunk then read), the _schema_inference flag from the previous call leaks. It works today because _read always sets it, but the pattern is error-prone. Consider passing schema_inference as a parameter through the dispatch chain instead, or at minimum resetting it in read() as well.
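The parameter-passing alternative can be sketched as follows (constants and method names are illustrative, not the real reader API):

```python
CHUNKSIZE = 10_000
SCHEMA_INFERENCE_SAMPLE_SIZE = 100

class Reader:
    """Threads schema_inference through the call chain; no state on self."""

    def read(self, schema_inference: bool = False) -> int:
        return self._dispatch(schema_inference=schema_inference)

    def read_first_chunk(self) -> int:
        return self.read(schema_inference=True)

    def _dispatch(self, schema_inference: bool) -> int:
        # the flag arrives as an argument, so nothing leaks between calls
        return SCHEMA_INFERENCE_SAMPLE_SIZE if schema_inference else CHUNKSIZE

reader = Reader()
assert reader.read_first_chunk() == 100
assert reader.read() == 10_000  # no leaked flag from the previous call
```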
🔴 Playwright Results — 1 failure, 26 flaky
✅ 3638 passed · ❌ 1 failed · 🟡 26 flaky · ⏭️ 84 skipped
Genuine failures (failed on all attempts): ❌ 1