In [None]:
# import logging
# from pathlib import Path
# from soda.scan import Scan

# class DQSodaCheck:
#     """
#     Wrapper for running Soda Core data quality scans against a provided data source.
    
#     Args:
#         checks_path: Path to the SodaCL checks YAML file.
#         data_source_name: Name of the data source to scan.
#         scan_name: Logical name for the scan.
#         verbose: If True, enables verbose Soda logging.
#         return_json: If True, returns the scan results as a JSON object.
#     """

#     logging.basicConfig(level=logging.INFO)
#     logger = logging.getLogger("dq.soda")

#     ui_color = "#8fcf94"
#     ui_fgcolor = "#000000"

#     def __init__(
#         self,
#         checks_path: str,
#         data_source_name: str,
#         scan_name: str = "soda_scan",
#         verbose: bool = False,
#         return_json: bool = False,
#     ):
#         self.config_path = "/Users/josephparadis/Documents/GitHub/soda-core/configuration2.yml"
#         self.checks_path = checks_path
#         self.data_source_name = data_source_name
#         self.scan_name = scan_name
#         self.verbose = verbose
#         self.return_json = return_json

#         # Validate paths
#         # if not Path(self.config_path).exists():
#         #     raise FileNotFoundError(f"Config file not found: {self.config_path}")

#         if not Path(self.checks_path).exists():
#             raise FileNotFoundError(f"Checks file not found: {self.checks_path}")

#     def execute(self) -> int:
#         """
#         Execute the Soda Core scan.
        
#         Returns:
#             Exit code from Soda scan:
#             0 = Pass, 1 = Warning, 2 = Fail, 3 = Error
#         """

#         scan = Scan()
#         scan.set_verbose(self.verbose)
#         # scan.add_configuration_yaml_file(None)
#         scan.add_configuration_yaml_file(self.config_path)
#         scan.set_data_source_name(self.data_source_name)
#         scan.add_sodacl_yaml_files(self.checks_path)
#         scan.set_scan_definition_name(self.scan_name)

#         result = scan.execute()

#         if result == 0:
#             self.logger.info("‚úÖ Soda scan successful: all checks passed.")
#         elif result in (1, 2):
#             raise ValueError(
#                 f"‚ùå Soda scan completed with warnings or failures. Exit code: {result}"
#             )
#         else:
#             raise RuntimeError(
#                 f"‚ùå Soda scan encountered an execution error. Exit code: {result}"
#             )

#         if self.return_json:
#             return scan.get_scan_results()

In [None]:
import logging
from pathlib import Path
from soda.scan import Scan

class BaseSodaOperator:
    """
    Wrapper for running Soda Core data quality scans against a provided data source.

    Args:
        checks_path: Path to the SodaCL checks YAML file.
        data_source_name: Name of the data source to scan.
        scan_name: Logical name for the scan.
        verbose: If True, enables verbose Soda logging.
        return_json: If True, ``execute`` returns the scan results from
            ``Scan.get_scan_results()`` instead of the exit code.

    Raises:
        FileNotFoundError: If ``checks_path`` does not exist when the
            operator is constructed.
    """

    # Executed once, when the class body runs: requests INFO-level root logging
    # so the scan outcome messages below are visible without extra setup.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("dq.soda")

    def __init__(
        self,
        checks_path: str,
        data_source_name: str,
        scan_name: str = "soda_scan",
        verbose: bool = False,
        return_json: bool = False,
    ):
        self.checks_path = checks_path
        self.data_source_name = data_source_name
        self.scan_name = scan_name
        self.verbose = verbose
        self.return_json = return_json

        # Fail at construction time rather than in the middle of a scan.
        if not Path(self.checks_path).exists():
            raise FileNotFoundError(f"Checks file not found: {self.checks_path}")

    def execute(self) -> "int | dict":
        """
        Execute the Soda Core scan and log its outcome.

        Returns:
            The scan results from ``Scan.get_scan_results()`` when
            ``return_json`` is True; otherwise the exit code from the Soda scan:
            0 = Pass, 1 = Warning, 2 = Fail, 3 or 4 = Error
        """

        scan = Scan()
        scan.set_verbose(self.verbose)
        # NOTE(review): no configuration file path is supplied here; presumably
        # the Scan implementation in use resolves its own configuration -- confirm.
        scan.add_configuration_yaml_file(None)
        scan.set_data_source_name(self.data_source_name)
        scan.add_sodacl_yaml_files(self.checks_path)
        scan.set_scan_definition_name(self.scan_name)

        result = scan.execute()

        if result == 0:
            self.logger.info("‚úÖ Soda scan successful: all data quality checks passed with no runtime issues.")

        elif result == 1:
            self.logger.warning("‚ö†Ô∏è Soda scan completed with warnings: one or more checks raised non-critical issues.")

        elif result == 2:
            self.logger.warning("‚ùå Soda scan failed: one or more data quality checks did not meet requirements.")

        elif result in (3, 4):
            # f-string so the actual exit code is interpolated into the message.
            self.logger.error(f"‚ùå Soda runtime error: a technical issue occurred: {result}")

        else:
            self.logger.error(f"üö® Soda scan returned an unknown exit code: {result}")

        if self.return_json:
            return scan.get_scan_results()

        # Without return_json the caller gets the exit code, as documented.
        return result

In [None]:
class DQTrinoSodaCheck(BaseSodaOperator):
    """
    Soda Core check preconfigured for the ``trino`` data source.

    Args:
        checks_path: Path to the SodaCL checks YAML file.
        scan_name: Logical name for the scan.
        verbose: If True, enables verbose Soda logging.
        return_json: Forwarded to ``BaseSodaOperator``, where a true value
            makes ``execute`` return the scan results.
    """

    # Display colours; presumably read by an orchestration UI -- nothing in
    # this notebook uses them.
    ui_color = "#d925c1"
    ui_fgcolor = "#000000"

    def __init__(
        self,
        checks_path: str,
        scan_name: str = "soda_scan",
        verbose: bool = False,
        return_json: bool = False,
    ):
        # The data source is fixed; everything else is passed straight through.
        super().__init__(
            checks_path=checks_path,
            scan_name=scan_name,
            data_source_name="trino",
            verbose=verbose,
            return_json=return_json,
        )

    # execute() is inherited from BaseSodaOperator; an override that only
    # delegates to super() adds nothing.

class DQSnowflakeSodaCheck(BaseSodaOperator):
    """
    Soda Core check preconfigured for the ``snowflake`` data source.

    Args:
        checks_path: Path to the SodaCL checks YAML file.
        scan_name: Logical name for the scan.
        verbose: If True, enables verbose Soda logging.
        return_json: Forwarded to ``BaseSodaOperator``, where a true value
            makes ``execute`` return the scan results.
    """

    # Display colours; presumably read by an orchestration UI -- nothing in
    # this notebook uses them.
    ui_color = "#328fa8"
    ui_fgcolor = "#000000"

    def __init__(
        self,
        checks_path: str,
        scan_name: str = "soda_scan",
        verbose: bool = False,
        return_json: bool = False,
    ):
        # The data source is fixed; everything else is passed straight through.
        super().__init__(
            checks_path=checks_path,
            scan_name=scan_name,
            data_source_name="snowflake",
            verbose=verbose,
            return_json=return_json,
        )

    # execute() is inherited from BaseSodaOperator; an override that only
    # delegates to super() adds nothing.

In [None]:
import logging
from pathlib import Path
from typing import Any, Dict, Optional, TypedDict

from soda.scan import Scan  # TODO: Get this from our GitHub repo

class BaseSodaOperator:
    """
    Base operator, shaped like an Airflow operator, that runs a Soda Core scan.

    Constructing an instance validates that the checks file exists;
    ``execute`` runs the scan and returns the Soda exit code merged with the
    scan result fields.

    Args:
        checks_path: Location of the SodaCL checks YAML file.
        data_source_name: Soda data source the scan runs against.
        scan_name: Scan definition name handed to Soda.
        verbose: Turns on Soda's verbose logging when True.

    Raises:
        FileNotFoundError: When ``checks_path`` does not exist at construction.
    """
    # Requested as soon as the class body runs, so log output is immediate.
    logging.basicConfig(level=logging.INFO)

    ui_color = "#ededed"
    ui_fgcolor = "#000000"

    logger = logging.getLogger("dq.soda")

    def __init__(
        self,
        *,
        checks_path: str,
        data_source_name: str,
        scan_name: str = "soda_scan",
        verbose: bool = False,
    ):
        self.checks_path, self.data_source_name = checks_path, data_source_name
        self.scan_name, self.verbose = scan_name, verbose

        checks_file_present = Path(self.checks_path).exists()
        if not checks_file_present:
            raise FileNotFoundError(f"Checks file not found: {self.checks_path}")

    def execute(self, context: Optional[dict] = None) -> dict:
        """
        Run the scan; the signature mirrors Airflow's ``execute(context)``.

        Args:
            context: Accepted for Airflow compatibility; never read.

        Returns:
            A dictionary whose first key, ``resultCode``, is the Soda exit
            code, followed by every field of the Soda scan results.
        """
        exit_code, payload = self._run_scan()

        merged = {"resultCode": exit_code}
        merged.update(payload)
        return merged

    def _run_scan(self) -> tuple[int, dict]:
        """
        Build a ``Scan``, configure it from this operator, run it and log the outcome.

        Returns:
            A ``(exit code, scan results)`` pair.
        """
        soda_scan = Scan()

        # Applied in order. The configuration file argument is None;
        # presumably the Scan implementation in use resolves its own
        # configuration -- TODO confirm.
        setup_steps = (
            (soda_scan.set_verbose, self.verbose),
            (soda_scan.add_configuration_yaml_file, None),
            (soda_scan.set_data_source_name, self.data_source_name),
            (soda_scan.add_sodacl_yaml_files, self.checks_path),
            (soda_scan.set_scan_definition_name, self.scan_name),
        )
        for configure, argument in setup_steps:
            configure(argument)

        exit_code = soda_scan.execute()
        payload = soda_scan.get_scan_results()

        self._log_result(exit_code)

        return exit_code, payload

    def _log_result(self, result_code: int) -> None:
        """
        Emit one log line describing the Soda exit code.

        Code 0 is logged at INFO, 1 and 2 at WARNING, and 3, 4 or any other
        value at ERROR.
        """
        if result_code == 0:
            level = logging.INFO
            message = f"‚úÖ Result {result_code}: All data quality checks passed."
        elif result_code == 1:
            level = logging.WARNING
            message = f"‚ö†Ô∏è Result {result_code}: Scan completed with warnings."
        elif result_code == 2:
            level = logging.WARNING
            message = f"‚ùå Result {result_code}: Data quality check failures detected."
        elif result_code in (3, 4):
            level = logging.ERROR
            message = f"‚ùå Result {result_code}: Soda runtime or configuration error."
        else:
            level = logging.ERROR
            message = f"üö® Result {result_code}: Unknown Soda exit code."

        self.logger.log(level, message)


class DQTrinoSodaCheck(BaseSodaOperator):
    """
    Soda scan operator with the data source fixed to ``trino``.

    Args:
        checks_path: Location of the SodaCL checks YAML file.
        scan_name: Scan definition name handed to Soda.
        verbose: Turns on Soda's verbose logging when True.
    """

    ui_fgcolor = "#000000"
    ui_color = "#d925c1"

    def __init__(self, *, checks_path: str, scan_name: str = "soda_scan", verbose: bool = False):
        # Caller-supplied options are passed through; only the data source is fixed.
        passthrough = dict(checks_path=checks_path, scan_name=scan_name, verbose=verbose)
        super().__init__(data_source_name="trino", **passthrough)


class DQSnowflakeSodaCheck(BaseSodaOperator):
    """
    Soda scan operator with the data source fixed to ``snowflake``.

    Args:
        checks_path: Location of the SodaCL checks YAML file.
        scan_name: Scan definition name handed to Soda.
        verbose: Turns on Soda's verbose logging when True.
    """

    ui_fgcolor = "#000000"
    ui_color = "#328fa8"

    def __init__(self, *, checks_path: str, scan_name: str = "soda_scan", verbose: bool = False):
        # Caller-supplied options are passed through; only the data source is fixed.
        passthrough = dict(checks_path=checks_path, scan_name=scan_name, verbose=verbose)
        super().__init__(data_source_name="snowflake", **passthrough)

In [2]:
import os

# Directory that holds the checks YAML files used below. Set the
# SODA_PROJECT_ROOT environment variable to point at another checkout; when it
# is unset or empty the original local path is used.
PROJECT_ROOT = os.environ.get("SODA_PROJECT_ROOT") or "/Users/josephparadis/Documents/GitHub/soda-core"

# scan = DQSodaCheck(
#     checks_path=f"{PROJECT_ROOT}/checks_trino.yml",
#     data_source_name="trino",
#     scan_name="trino_daily_check",
#     verbose=False,
#     return_json=True,
# )

# scan = DQSodaCheck(
#     checks_path=f"{PROJECT_ROOT}/checks.yml",
#     data_source_name="snowflake",
#     scan_name="snowflake_daily_check",
#     verbose=False,
#     return_json=True,
# )

# scan = DQTrinoSodaCheck(
#     checks_path=f"{PROJECT_ROOT}/checks_trino.yml",
#     scan_name="trino_daily_check",
#     verbose=False,
#     # return_json=True,
# )

# scan = DQSnowflakeSodaCheck(
#     checks_path=f"{PROJECT_ROOT}/checks.yml",
#     scan_name="snowflake_daily_check",
#     verbose=False,
#     return_json=True,
# )

# results = scan.execute()

# Run the Trino check file under the scan name "trino_daily_check" and keep
# the returned payload in `results` for the cells below.
trino_checks_file = f"{PROJECT_ROOT}/checks_trino.yml"
scan = DQTrinoSodaCheck(checks_path=trino_checks_file, scan_name="trino_daily_check", verbose=False)
results = scan.execute()


INFO:soda.scan:[15:46:10] Soda Core 3.5.6
INFO:soda.scan:[15:46:10] Running Soda scan: `trino_daily_check` on data source: `trino`
INFO:soda.scan:[15:46:11] Heimdall Job URL: https://heimdall.aws.pattern.com/jobs/46c84218-78b2-4668-b684-f76b610c2b77
INFO:soda.scan:[15:46:11] Waiting for Heimdall Job to complete...
INFO:soda.scan:[15:46:17] Scan summary:
INFO:soda.scan:[15:46:17] 1/1 check PASSED: 
INFO:soda.scan:[15:46:17]     ADHOC.CATALOG_LISTINGS_PDP in trino
INFO:soda.scan:[15:46:17]       missing_count(id) = 0 [PASSED]
INFO:dq.soda:‚úÖ Result 0: All data quality checks passed.


In [3]:
# Show the payload returned by scan.execute() in the previous cell.
results

{'resultCode': 0,
 'definitionName': 'trino_daily_check',
 'defaultDataSource': 'trino',
 'dataTimestamp': '2025-12-19T22:46:10+00:00',
 'scanStartTimestamp': '2025-12-19T22:46:10+00:00',
 'scanEndTimestamp': '2025-12-19T22:46:17+00:00',
 'hasErrors': False,
 'hasFailures': False,
 'metrics': [{'identity': 'metric-trino_daily_check-trino-ADHOC.CATALOG_LISTINGS_PDP-id-missing_count',
   'metricName': 'missing_count',
   'value': 0,
   'dataSourceName': 'trino'}],
 'checks': [{'identity': '677c2f2d',
   'name': 'missing_count(id) = 0',
   'type': 'generic',
   'definition': 'checks for ADHOC.CATALOG_LISTINGS_PDP:\n  missing_count(id) = 0',
   'resourceAttributes': [],
   'location': {'filePath': '/Users/josephparadis/Documents/GitHub/soda-core/checks_trino.yml',
    'line': 3,
    'col': 5},
   'dataSource': 'trino',
   'table': 'ADHOC.CATALOG_LISTINGS_PDP',
   'filter': None,
   'column': 'id',
   'metrics': ['metric-trino_daily_check-trino-ADHOC.CATALOG_LISTINGS_PDP-id-missing_count'],

In [6]:
# Condensed view of the scan payload: each summary label is read from the
# matching key of `results`; keys missing from the payload come back as None.
summary_sources = (
    ("definitionName", "definitionName"),
    ("dataSource", "defaultDataSource"),
    ("start", "scanStartTimestamp"),
    ("end", "scanEndTimestamp"),
    ("hasFailures", "hasFailures"),
    ("hasWarnings", "hasWarnings"),
    ("hasErrors", "hasErrors"),
)
summary = {label: results.get(source_key) for label, source_key in summary_sources}
summary["metrics"] = []  # left empty; no metrics are copied from the payload here

summary

{'definitionName': 'trino_daily_check',
 'dataSource': 'trino',
 'start': '2025-12-19T22:39:26+00:00',
 'end': '2025-12-19T22:39:34+00:00',
 'hasFailures': False,
 'hasErrors': False,
 'metrics': []}