In [None]:
from pydantic import BaseModel
from pathlib import Path
from typing import Optional

class DataSource(BaseModel):
    """Description of one external data source as declared in the configuration."""

    # Identifier of the source (the configuration key it was declared under).
    id: str
    # Provider the data is fetched from, e.g. "kaggle".
    source: str
    # Optional provider-specific category, e.g. "datasets"; None when omitted.
    type: Optional[str] = None
    # Provider-specific locator handed to the download call.
    link: str

class DataIngestionConfig(BaseModel):
    """Settings consumed by the data ingestion step."""

    # Data sources to ingest, keyed by source name.
    data_sources: dict[str, DataSource]
    # Directory under which each source gets its own sub-directory.
    outdir: Path

class DataIngestionArtifact(BaseModel):
    """Result record returned by the data ingestion step."""

    # Names of the data sources covered by the ingestion run.
    names: list[str]
    # Directory that holds the ingested data.
    path: Path

In [60]:
import os
# Run every later cell from the project root so that relative paths such as
# CONFIG_PATH resolve the same way regardless of where the kernel was started.
# The root can be overridden with the CRASH_DETECTION_ROOT environment
# variable; the original checkout location is kept as the fallback so existing
# setups keep working unchanged.
PROJECT_ROOT = os.environ.get(
    "CRASH_DETECTION_ROOT",
    "/Users/morizin/Documents/Code/crash-detection-project",
)
os.chdir(PROJECT_ROOT)

# Location of the main YAML configuration file, relative to the project root.
CONFIG_PATH = 'config/config.yaml'

In [61]:
from typeguard import typechecked
import yaml
from pathlib import Path
from box import ConfigBox
from typing import Any

# @typechecked
def load_yaml(path: Path | str, boxed: bool = True) -> ConfigBox | dict[str, Any]:
    """Read a YAML file and return its parsed content.

    Args:
        path: Location of the YAML file, given as a ``Path`` or a string.
        boxed: When true (the default) the parsed content is wrapped in a
            ``ConfigBox``; otherwise the plain parsed object is returned.

    Returns:
        The parsed document. An empty document yields an empty mapping
        rather than ``None`` so the result always matches the annotation.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Path() accepts both strings and existing Path objects.
    path = Path(path)

    # The context manager closes the handle even when parsing fails.
    with path.open("r", encoding="utf-8") as stream:
        config = yaml.safe_load(stream)

    # safe_load returns None for an empty document.
    if config is None:
        config = {}

    if boxed:
        config = ConfigBox(config)
    return config

# Parse the project configuration once for interactive use in this notebook.
config = load_yaml(path=CONFIG_PATH)

In [62]:
class ConfigManager:
    """Builds typed configuration objects from the project YAML configuration."""

    def __init__(self, config_path=CONFIG_PATH):
        """Load the configuration and make sure the artifact directory exists.

        Args:
            config_path: Path of the YAML configuration file to load;
                defaults to the module-level ``CONFIG_PATH``.
        """
        # Load the file the caller asked for. The argument used to be ignored
        # in favour of the module-level CONFIG_PATH.
        self.config = load_yaml(config_path)
        self.artifact_path = Path(self.config.artifact_path)
        self.schema_path = self.config.schema_path

        self.artifact_path.mkdir(parents=True, exist_ok=True)

    def get_data_sources(self) -> dict[str, DataSource]:
        """Convert the ``data_sources`` configuration section into models.

        Returns:
            A mapping from source name to its ``DataSource``; the name is
            also stored as the model's ``id``.
        """
        return {
            name: DataSource(
                id=name,
                source=entry.source,
                # "type" is optional in the configuration.
                type=getattr(entry, "type", None),
                link=entry.link,
            )
            for name, entry in self.config.data_sources.items()
        }

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Assemble the data ingestion settings.

        The output directory ``<artifact_path>/data`` is created if it does
        not exist yet.

        Returns:
            A ``DataIngestionConfig`` holding every configured data source
            and the output directory.
        """
        outdir = self.artifact_path / "data"
        outdir.mkdir(parents=True, exist_ok=True)

        return DataIngestionConfig(
            data_sources=self.get_data_sources(),
            outdir=outdir,
        )

In [72]:
import kaggle
from src.crash_detection import logger

class DataIngestionComponent:
    def __init__(self, cfg : DataIngestionConfig):
        try:
            self.cfg : DataIngestionConfig = cfg
            self.kaggle_api = kaggle.KaggleApi()
        except Exception as e:
            logger.error(f"Error at Data Ingestion Component {e}")
    
    def __call__(self):
        for name, source in self.cfg.data_sources.items():
            if (self.cfg.outdir / name).exists() and len(os.listdir(self.cfg.outdir / name)):
                continue
            match source.source:
                case "kaggle":
                    match source.type:
                        case "datasets":
                            self.kaggle_api.dataset_download_cli(
                                source.link, 
                                path=self.cfg.outdir / name,
                                unzip=True,
                            )
                        case _:
                            print(f"Source having type {source.type} not found")
                case _:
                    print(f"Source {source.source} not found")
        return DataIngestionArtifact(
            names = list(self.cfg.data_sources.keys()),
            path = self.cfg.outdir
        )

In [73]:
# Build the configuration manager from the default configuration file, then
# create the ingestion component from the settings it derives.
cfg = ConfigManager()
dic = DataIngestionComponent(cfg=cfg.get_data_ingestion_config())

In [74]:
# Run the ingestion step; the returned artifact is shown as the cell output.
dic()

DataIngestionArtifact(names=['gta-crash'], path=PosixPath('artifacts/data'))

In [None]:
import sys

# NOTE(review): a relative sys.path entry is resolved against the current
# working directory, which an earlier cell changes with os.chdir — confirm
# that "../src" still points at the intended package directory.
sys.path.append("../src")
