In [None]:
import os

# Move the working directory up one level. NOTE: the path is relative, so
# re-running this cell moves up one more level each time it is executed.
# Presumably this puts the project root on the import path for the `src.jigsaw`
# imports used in later cells -- confirm against the repository layout.
os.chdir("..")

In [None]:
# IPython magic: display the current working directory (sanity check after the
# chdir in the previous cell).
%pwd

In [None]:
from pydantic import BaseModel
from src.jigsaw.entity.common import Directory
from src.jigsaw.entity.common import FilePath
from typing import TypeVar
from pathlib import Path


class DataSchema(BaseModel):
    """Expected columns and file listing for one named dataset."""

    # Dataset name; also used as the sub-directory name under the validation
    # input directory when file paths are built.
    name: str
    # Expected columns: the keys are column names that are checked against each
    # CSV's header. The string values are presumably dtype labels from the
    # schema YAML -- TODO confirm.
    # NOTE(review): the field name `schema` shadows an attribute of pydantic's
    # `BaseModel`; pydantic may emit a warning for this -- confirm.
    schema: dict[str, str]
    # Training files, joined onto `<indir>/<name>/` when validated.
    train: list[FilePath]
    # Test files, joined onto `<indir>/<name>/` when validated.
    test: list[FilePath]
    # Name of the target column; it is not expected to be present in test files.
    target: str


class DataValidationConfig(BaseModel):
    """Settings consumed by the data-validation stage."""

    # Directory in which the validation report (`status.json`) is written.
    outdir: Directory
    # Directory holding the data to validate, one sub-directory per dataset.
    indir: Directory
    # When true, the pipeline also invokes the statistics step after validation.
    statistics: bool
    # One entry per dataset to validate.
    schemas: list[DataSchema]

In [None]:
from box import ConfigBox
from src.jigsaw.utils.common import load_yaml
from src.jigsaw.constants import *
from src.jigsaw.entity.common import FilePath
from src.jigsaw import logger


class ConfigurationManager:
    """Load the project YAML files and build typed stage configurations.

    The config, params and schema files are read once, at construction time,
    through ``load_yaml``. ``artifact_root`` is the ``Directory`` under which
    every stage's input and output directories are resolved.
    """

    def __init__(
        self,
        config_path: FilePath = CONFIG_FILE_PATH,
        params_path: FilePath = PARAMS_FILE_PATH,
        schema_path: FilePath = SCHEMA_FILE_PATH,
    ):
        """Read the three YAML files and resolve the artifact root.

        Args:
            config_path: Location of the main configuration YAML.
            params_path: Location of the parameters YAML.
            schema_path: Location of the dataset-schema YAML.
        """
        self.config = load_yaml(config_path)
        self.params = load_yaml(params_path)
        self.schema = load_yaml(schema_path)
        self.artifact_root = Directory(path=self.config.artifact_root)

    def get_data_validation_config(self) -> "DataValidationConfig":
        """Build the configuration for the data-validation stage.

        Only datasets that are declared in the schema file *and* have an entry
        of the same name in the data-ingestion output directory are included;
        a declared dataset with no such entry is logged as an error and
        skipped.

        Returns:
            The populated ``DataValidationConfig``.

        Raises:
            FileNotFoundError: If the data-ingestion output directory does not
                exist.
        """
        stage_cfg = self.config.data_validation
        input_dir = self.artifact_root.path / self.config.data_ingestion.outdir

        # Check for the input directory *before* listing it, so that a missing
        # directory produces the intended log entry and error message instead
        # of an unexplained failure from os.listdir.
        if not input_dir.exists():
            message = "Data Validation : Data can't be found"
            logger.error(message)
            raise FileNotFoundError(message)

        # List the directory once; the result does not change between schemas.
        available = set(os.listdir(input_dir))

        schemas = []
        for name, schema in self.schema.items():
            if name not in available:
                logger.error(f'Data "{name}" doesn\'t exist')
                continue
            schemas.append(
                DataSchema(
                    name=name,
                    schema=schema.columns.to_dict(),
                    train=schema.train,
                    test=schema.test,
                    target=schema.target,
                )
            )

        return DataValidationConfig(
            outdir=Directory(path=self.artifact_root.path / stage_cfg.outdir),
            indir=Directory(path=input_dir),
            statistics=stage_cfg.statistics,
            schemas=schemas,
        )

In [None]:
from pydantic import validate_call
import pandas as pd
from src.jigsaw import logger
from src.jigsaw.entity.common import FilePath
from pathlib import Path
from src.jigsaw.utils.common import read_csv, save_json
from collections import defaultdict


class DataValidationComponent:
    """Check ingested CSV files against their declared column schemas.

    For every train and test file of every configured dataset, the component
    verifies that each expected column is present and records the outcome in
    ``self.status``, which ``validate_all`` writes to ``status.json`` in the
    output directory.
    """

    # Total width, in characters, of the banner printed on construction.
    _BANNER_WIDTH = 100

    def __init__(self, config: DataValidationConfig):
        """Store the configuration and print the list of available datasets.

        Args:
            config: Validation settings, including the dataset schemas and the
                input/output directories.
        """
        self.config = config
        self.names = [schema.name for schema in self.config.schemas]

        # Arbitrarily nested mapping: missing keys are created on first access,
        # so results can be stored as status[path][check] without set-up.
        def tree():
            return defaultdict(tree)

        self.status = tree()

        self.outdir = self.config.outdir.path
        self.indir = self.config.indir.path

        self._print_banner()

    def _print_banner(self):
        """Print a boxed, centred list of the dataset names."""
        rule = "=" * self._BANNER_WIDTH
        inner_width = self._BANNER_WIDTH - 2  # space between the "|" borders

        print(rule)
        print("|{}|".format("Datasets Available".center(inner_width)))
        print(rule)
        for name in self.names:
            print("|{}|".format(name.center(inner_width)))
        print(rule)

    def validate_all(self):
        """Validate every train and test file, then save ``status.json``."""
        for schema in self.config.schemas:
            for is_train, files in ((True, schema.train), (False, schema.test)):
                for file in files:
                    self.find_missing_columns(
                        self.indir / schema.name / file, schema, train=is_train
                    )

        save_json(self.status, self.outdir / "status.json")

    @validate_call
    def find_missing_columns(self, data_path: Path, schema: DataSchema, train=True):
        """Record whether ``data_path`` lacks any column required by ``schema``.

        Each missing column is logged as an error. The result is stored under
        ``self.status[str(data_path)]["missing_values"]``: ``True`` when at
        least one expected column is absent, ``False`` otherwise.

        Args:
            data_path: CSV file to inspect.
            schema: Dataset schema whose keys are the expected column names.
            train: When falsy, the target column is not required (test files
                are not expected to carry it).
        """
        data = read_csv(data_path)
        present = data.columns

        # Build the expected list without touching ``schema.schema``: the same
        # schema object is reused for every file of the dataset, so removing
        # the target in place would break every later call.
        expected = [
            col for col in schema.schema if train or col != schema.target
        ]
        missing = [col for col in expected if col not in present]

        for col in missing:
            logger.error(f"Missing column {col} in file {data_path.as_posix()}")

        # The key is kept as "missing_values" for compatibility with existing
        # readers of status.json, although it reports missing *columns*.
        self.status[str(data_path)]["missing_values"] = bool(missing)
        if not missing:
            logger.info(f"{data_path} : No Missing Values")

    def get_statistics(self, path: FilePath):
        """Placeholder for per-file statistics; currently does nothing."""
        pass


In [None]:
class DataValidationPipeline:
    """Wire the configuration and the validation component into one stage."""

    def __init__(self):
        """Load the validation config and build the component from it."""
        self.config = ConfigurationManager().get_data_validation_config()
        self.comp = DataValidationComponent(self.config)

    def kickoff(self):
        """Run column validation and, if enabled, the statistics step.

        Statistics are requested once per train and test file of every
        dataset; ``get_statistics`` requires the path of the file to process.
        """
        self.comp.validate_all()
        if not self.config.statistics:
            return

        # Paths are built the same way validate_all builds them:
        # <indir>/<dataset name>/<file>.
        for schema in self.config.schemas:
            for file in (*schema.train, *schema.test):
                self.comp.get_statistics(
                    self.config.indir.path / schema.name / file
                )

In [None]:
# Run the data-validation stage end to end, using the default configuration
# file locations.
DataValidationPipeline().kickoff()

In [None]:
from pathlib import Path

# Scratch check: the directory portion of a notebook path, as a string.
str(Path("research/input/research.ipynb").parent)