In [1]:
from collections import namedtuple

In [2]:
DataIngestionConfig = namedtuple("DataIngestionConfig",[
    "root_dir",
    "source_URL",
    "local_data_file",
    "unzip_dir"
])

In [3]:
from DeepClassifier.constants import *
from DeepClassifier.utils import read_yaml,create_directories

In [4]:
from distutils.command.config import config


class ConfigurationManager:
    def __init__(self,
    config_filepath = CONFIG_FILE_PATH,
    params_filepath = PARAMS_FILE_PATH
    ):
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(config_filepath)
        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        config = self.config.data_ingestion 

        create_directories([config.root_dir])

        data_ingestion_config = DataIngestionConfig(
            root_dir= config.root_dir,
            source_URL=config.source_URL,
            local_data_file= config.local_data_file,
            unzip_dir=config.unzip_dir
        )
        return data_ingestion_config

        

In [6]:
import os
import urllib.request as request   ## for downloading the data its the kibrary
from zipfile import ZipFile         ## for zipping  and unzipping the file data

class DataIngestion:
    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def download_file(self):
        if not os.path.exists(self.config.local_data_file):  ## To check if file is present or not
            filename, headers = request.urlretrieve(        
                url = self.config.source_URL,
                filename = self.config.local_data_file
            )

    def _get_updated_list_of_files(self, list_of_files):
        return [f for f in list_of_files if f.endswith(".jpg") and ("Cat" in f or "Dog" in f)]

    def _preprocess(self, zf: ZipFile, f: str, working_dir: str):
        target_filepath = os.path.join(working_dir, f)
        if not os.path.exists(target_filepath):
            zf.extract(f, working_dir)
        
        if os.path.getsize(target_filepath) == 0:
            os.remove(target_filepath)

    def unzip_and_clean(self):    ## To unzip the file that we want to open
        with ZipFile(file=self.config.local_data_file, mode="r") as zf:
            list_of_files = zf.namelist()
            updated_list_of_files = self._get_updated_list_of_files(list_of_files)
            for f in updated_list_of_files:
                self._preprocess(zf, f, self.config.unzip_dir)