In [1]:
import os
# Move the working directory up one level; the recorded output below shows this
# landing in the project folder.
# NOTE: the path is relative to the *current* directory, so re-running this cell
# climbs one more level each time. Run it once per kernel session.
os.chdir("../")
# Echo the resulting working directory as the cell output (IPython magic).
%pwd

'c:\\Users\\karthikeya\\Fraud_Detection'

In [3]:
import os
from dataclasses import dataclass
from sklearn.model_selection import train_test_split

@dataclass
class DataIngestionConfig:
    """Locations of the parquet files written during data ingestion.

    Each attribute carries a type annotation so that ``@dataclass`` registers it
    as a field. Un-annotated assignments are plain class attributes: they are
    ignored by the generated ``__init__`` and cannot be overridden per instance
    through the constructor.

    Attributes:
        raw_data_path: Destination of the unsplit input data.
        train_data_path: Destination of the training split.
        test_data_path: Destination of the test split.
    """

    raw_data_path: str = os.path.join("artifacts", "raw_data.parquet")
    train_data_path: str = os.path.join("artifacts", "train_data.parquet")
    test_data_path: str = os.path.join("artifacts", "test_data.parquet")



class DataIngestion:
    """Persist a raw dataset and its train/test split as parquet files."""

    def __init__(self):
        self.data_ingestion_config = DataIngestionConfig()

    @staticmethod
    def _ensure_parent_dir(file_path):
        """Create the directory that will contain ``file_path`` if it is missing."""
        parent_dir = os.path.dirname(file_path)
        # A bare file name has no parent component; nothing to create then.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    def initiate_data_ingestion(self, raw_data_from_database, test_size=None, random_state=None):
        """Save the raw data, split it, and save both splits.

        Args:
            raw_data_from_database: Object exposing ``to_parquet`` that
                ``train_test_split`` can split (presumably a pandas DataFrame;
                confirm against the caller).
            test_size: Forwarded to ``train_test_split``. ``None`` keeps
                scikit-learn's default, which is what the call used before this
                parameter existed.
            random_state: Forwarded to ``train_test_split``. Pass an int for a
                reproducible split; ``None`` keeps the previous unseeded
                behaviour.

        Returns:
            The ``(train_data, test_data)`` pair produced by the split.
        """
        config = self.data_ingestion_config

        self.raw_data = raw_data_from_database
        # The output directory is not guaranteed to exist on a fresh checkout.
        self._ensure_parent_dir(config.raw_data_path)
        self.raw_data.to_parquet(config.raw_data_path)

        train_data, test_data = train_test_split(
            self.raw_data, test_size=test_size, random_state=random_state
        )

        self._ensure_parent_dir(config.train_data_path)
        train_data.to_parquet(config.train_data_path)
        self._ensure_parent_dir(config.test_data_path)
        test_data.to_parquet(config.test_data_path)

        return train_data, test_data


In [4]:
import os
from dataclasses import dataclass
from abc import ABC, abstractmethod
from sklearn.model_selection import train_test_split
import pandas as pd


@dataclass
class DataIngestionConfig:
    """Paths of the parquet artifacts produced by the ingestion pipeline.

    The attributes are annotated so that ``@dataclass`` treats them as fields;
    without annotations the decorator generates an ``__init__`` that accepts no
    arguments and the paths cannot be customised per instance.

    Attributes:
        raw_data_path: File the unsplit data is written to.
        train_data_path: File the training split is written to.
        test_data_path: File the test split is written to.
    """

    # NOTE(review): a class with this name is also defined in an earlier cell;
    # whichever cell runs last wins. Consider keeping a single definition.
    raw_data_path: str = os.path.join("artifacts", "raw_data.parquet")
    train_data_path: str = os.path.join("artifacts", "train_data.parquet")
    test_data_path: str = os.path.join("artifacts", "test_data.parquet")


# Strategy Interface for Data Ingestion
class DataIngestionStrategy(ABC):
    """Abstract base class for interchangeable data-ingestion strategies.

    The class cannot be instantiated directly; concrete strategies must
    override :meth:`ingest`.
    """

    @abstractmethod
    def ingest(self):
        """Ingest data from the source this strategy represents.

        The base implementation does nothing and returns ``None``.
        """


# Concrete Strategy for Database Data Ingestion
class DatabaseDataIngestion(DataIngestionStrategy):
    """Ingestion strategy for data that was already fetched from a database.

    Args:
        raw_data_from_db: Object exposing ``to_parquet`` (presumably a pandas
            DataFrame; confirm against the caller).
        config: Supplies ``raw_data_path``, the file the data is written to.
    """

    def __init__(self, raw_data_from_db, config: DataIngestionConfig):
        self.raw_data = raw_data_from_db
        self.config = config

    def ingest(self):
        """Write the raw data to ``config.raw_data_path`` and return it unchanged."""
        raw_path = self.config.raw_data_path

        # The output directory is not guaranteed to exist on a fresh checkout;
        # a bare file name has no parent component and needs no directory.
        parent_dir = os.path.dirname(raw_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        self.raw_data.to_parquet(raw_path)
        return self.raw_data


# Concrete Strategy for File Data Ingestion
class FileDataIngestion(DataIngestionStrategy):
    """Ingestion strategy that loads the raw data from a parquet file.

    Args:
        file_path: Path of the parquet file to read.
        config: Supplies ``raw_data_path``, the file the loaded data is
            copied to.
    """

    def __init__(self, file_path: str, config: DataIngestionConfig):
        self.file_path = file_path
        self.config = config

    def ingest(self):
        """Read ``file_path``, store a copy at ``config.raw_data_path``, return the data."""
        raw_data = pd.read_parquet(self.file_path)

        raw_path = self.config.raw_data_path
        # The output directory is not guaranteed to exist on a fresh checkout;
        # a bare file name has no parent component and needs no directory.
        parent_dir = os.path.dirname(raw_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        raw_data.to_parquet(raw_path)
        return raw_data


# Data splitting class (Single Responsibility Principle)
class DataSplitter:
    """Hold a dataset and split it into train and test portions on request."""

    def __init__(self, raw_data):
        self.raw_data = raw_data

    def split(self, test_data_size, random_state:int = None):
        """Split the stored data with ``train_test_split``.

        Args:
            test_data_size: Forwarded as ``test_size``.
            random_state: Forwarded as ``random_state``.

        Returns:
            Whatever ``train_test_split`` returns for the stored data.
        """
        split_options = {"test_size": test_data_size, "random_state": random_state}
        partitions = train_test_split(self.raw_data, **split_options)
        return partitions
         


# Context Class that uses Strategy Pattern
class DataPipeline:
    """Context class: run an ingestion strategy, split the result, save both splits.

    Args:
        ingestion_strategy: Strategy whose ``ingest()`` returns the raw data.
        data_source: Stored on the instance as-is; ``execute`` does not read it.
        test_size: Passed to ``DataSplitter.split`` as the test-set size.
            Defaults to 0.25, the fraction scikit-learn uses when no size is
            given.
        random_state: Seed passed to ``DataSplitter.split``; ``None`` leaves
            the split unseeded.
        config: Object providing ``train_data_path`` and ``test_data_path``.
            Defaults to a fresh ``DataIngestionConfig()``.
    """

    def __init__(self, ingestion_strategy: DataIngestionStrategy, data_source=None,
                 test_size=0.25, random_state=None, config=None):
        self.ingestion_strategy = ingestion_strategy
        self.data_source = data_source
        self.test_size = test_size
        self.random_state = random_state
        self.config = config if config is not None else DataIngestionConfig()
        self.data_splitter = None

    def execute(self):
        """Ingest, split and persist the data.

        Returns:
            The ``(train_data, test_data)`` pair produced by the splitter.
        """
        # Ingest the data using the selected strategy
        raw_data = self.ingestion_strategy.ingest()

        # Split the data. ``split`` requires the test size; calling it with no
        # arguments raises TypeError.
        self.data_splitter = DataSplitter(raw_data)
        train_data, test_data = self.data_splitter.split(
            self.test_size, random_state=self.random_state
        )

        # Save the split data, creating the output directory when it is missing.
        outputs = (
            (train_data, self.config.train_data_path),
            (test_data, self.config.test_data_path),
        )
        for frame, target_path in outputs:
            parent_dir = os.path.dirname(target_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            frame.to_parquet(target_path)

        return train_data, test_data


# Usage Example

# Stand-in for the data pulled from the database. The assignment previously had
# no right-hand side, which is a SyntaxError that stops the whole cell from
# running.
# TODO: replace this placeholder frame with the real query result.
raw_data_from_db = pd.DataFrame(
    {
        "feature": [0.10, 0.40, 0.35, 0.80, 0.65, 0.20, 0.90, 0.50],
        "label": [0, 0, 0, 1, 1, 0, 1, 0],
    }
)

# Create a data pipeline using the Database Data Ingestion strategy
db_strategy = DatabaseDataIngestion(raw_data_from_db, DataIngestionConfig())
pipeline = DataPipeline(ingestion_strategy=db_strategy, data_source=raw_data_from_db)

# Execute the pipeline
train_data, test_data = pipeline.execute()

# If you want to switch to file-based ingestion, just change the strategy.
# The raw parquet file written by the database strategy above is reused here so
# the example runs end to end; point file_path at your own parquet file instead.
file_strategy = FileDataIngestion(
    file_path=DataIngestionConfig().raw_data_path, config=DataIngestionConfig()
)
file_pipeline = DataPipeline(ingestion_strategy=file_strategy, data_source=None)

# Execute the new pipeline with the file ingestion strategy
train_data, test_data = file_pipeline.execute()
