In [1]:
import os


In [2]:
# Display the kernel's current working directory; the relative paths used
# later in this notebook (e.g. "config/config.yaml") resolve against it.
%pwd

'c:\\telco churn project\\src\\research'

In [3]:
import os
from pathlib import Path

# Move to the project root so the relative paths used below
# (e.g. "config/config.yaml") resolve no matter where the kernel started.
# The root is discovered by walking up from the current directory until a
# folder containing config/config.yaml is found, instead of hard-coding a
# machine-specific absolute path. Re-running the cell is harmless: once the
# kernel is already at the root, the first candidate matches.
_start_dir = Path.cwd()
for _candidate in (_start_dir, *_start_dir.parents):
    if (_candidate / "config" / "config.yaml").is_file():
        os.chdir(_candidate)
        break
else:
    # No directory on the way up looked like the project root.
    raise FileNotFoundError(
        f"Could not find config/config.yaml in {_start_dir} or any parent directory"
    )


In [4]:
# Confirm the working directory after the os.chdir call in the previous cell.
%pwd

'c:\\telco churn project'

In [5]:
import yaml

# Load the project configuration. The encoding is passed explicitly because
# open() otherwise falls back to the platform's locale encoding, which is
# not UTF-8 on every system, so the same file could decode differently
# from one machine to the next.
with open("config/config.yaml", "r", encoding="utf-8") as config_file:
    config = yaml.safe_load(config_file)

# Show the parsed configuration as the cell output.
config


{'artifacts_root': 'artifacts',
 'data_ingestion': {'root_dir': 'artifacts/data_ingestion',
  'source_file': 'data/WA_Fn-UseC_-Telco-Customer-Churn.csv',
  'raw_data': 'artifacts/data_ingestion/raw.csv',
  'train_data': 'artifacts/data_ingestion/train.csv',
  'test_data': 'artifacts/data_ingestion/test.csv',
  'test_size': 0.2,
  'random_state': 42},
 'data_transformation': {'root_dir': 'artifacts/data_transformation',
  'train_data_path': 'artifacts/data_ingestion/train.csv',
  'test_data_path': 'artifacts/data_ingestion/test.csv',
  'transformed_train_path': 'artifacts/data_transformation/train_transformed.csv',
  'transformed_test_path': 'artifacts/data_transformation/test_transformed.csv',
  'transformer_object_path': 'artifacts/data_transformation/transformer.pkl'},
 'data_validation': {'root_dir': 'artifacts/data_validation',
  'status_file': 'artifacts/data_validation/status.txt',
  'schema_file': 'config/schema.yaml',
  'data_file': 'artifacts/data_ingestion/train.csv'}}

In [6]:
import pandas as pd
import yaml
import os


In [9]:
# Re-read the configuration so this cell works on its own. The encoding is
# explicit because open() otherwise uses the platform's locale encoding,
# which is not UTF-8 on every system.
with open("config/config.yaml", "r", encoding="utf-8") as config_file:
    config = yaml.safe_load(config_file)

# Settings for the data-validation stage.
validation_config = config["data_validation"]

# Paths consumed by the cells below.
data_path = validation_config["data_file"]      # CSV loaded for validation
schema_path = validation_config["schema_file"]  # YAML schema to check against
status_path = validation_config["status_file"]  # file the result is written to


In [11]:
# Load the dataset to be validated.
df = pd.read_csv(data_path)

# Load the schema. The encoding is explicit because open() otherwise uses
# the platform's locale encoding, which is not UTF-8 on every system.
with open(schema_path, "r", encoding="utf-8") as schema_file:
    schema = yaml.safe_load(schema_file)

# Mapping of column name -> expected dtype; later cells compare each value
# against str(df[column].dtype).
expected_columns = schema["columns"]


In [12]:
def _only_in(first, second):
    """Return the items of `first` that are absent from `second`, in order."""
    return [item for item in first if item not in second]


# Column names as found in the data and as declared by the schema.
actual_columns = df.columns.tolist()
expected_column_names = list(expected_columns)

# Declared by the schema but absent from the data, and the reverse.
missing_columns = _only_in(expected_column_names, actual_columns)
extra_columns = _only_in(actual_columns, expected_column_names)


In [13]:
# Collect (column, expected dtype, actual dtype) for every schema column
# whose dtype in the data differs from the schema's declaration.
mismatched_types = []
for column_name, schema_dtype in expected_columns.items():
    if column_name not in df.columns:
        # Absent columns cannot be dtype-checked; they are picked up by the
        # missing-column check instead.
        continue
    found_dtype = str(df[column_name].dtype)
    if found_dtype != schema_dtype:
        mismatched_types.append((column_name, schema_dtype, found_dtype))


In [14]:
# The schema names the target column and the dtype it is expected to have.
target_col = schema.get("target_column")
target_dtype = schema.get("target_dtype")

# Flag a mismatch when the target is absent from the data or, when present,
# its dtype string differs from the schema's. `or` short-circuits, so the
# dtype lookup only runs when the column exists.
target_mismatch = (
    target_col not in df.columns
    or str(df[target_col].dtype) != target_dtype
)


In [15]:
# Build the full report before touching the file, so a formatting error
# cannot leave a truncated status file behind.
validation_failed = bool(
    missing_columns or extra_columns or mismatched_types or target_mismatch
)

if validation_failed:
    report_lines = ["Validation Failed"]
    if missing_columns:
        report_lines.append(f"Missing columns: {missing_columns}")
    if extra_columns:
        report_lines.append(f"Extra columns: {extra_columns}")
    if mismatched_types:
        report_lines.append("Mismatched data types:")
        report_lines.extend(
            f"  - {col}: expected {expected}, got {actual}"
            for col, expected, actual in mismatched_types
        )
    if target_mismatch:
        report_lines.append(
            f"Target column '{target_col}' is missing or of wrong dtype"
        )
else:
    report_lines = ["Validation Successful"]

# os.path.dirname() returns "" for a bare file name and os.makedirs("")
# raises, so only create the directory when the path actually has one.
status_dir = os.path.dirname(status_path)
if status_dir:
    os.makedirs(status_dir, exist_ok=True)

# Explicit encoding keeps the file's bytes independent of the platform's
# locale (column names are written verbatim and need not be ASCII).
with open(status_path, "w", encoding="utf-8") as status_file:
    status_file.write("\n".join(report_lines) + "\n")
