# Data Preparation for Model Training (Apache Spark)

This notebook prepares the XOR logic gate dataset for model training.

## 1. Import Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, count, when, isnan, isnull
import os

# Obtain the notebook's Spark session via getOrCreate() under a descriptive app name
spark = (
    SparkSession.builder
    .appName("Data Preparation for Model Training")
    .getOrCreate()
)

# Report which Spark version this session runs on
print(f"Spark version: {spark.version}")

## 2. Load Data

In [None]:
# Read the raw CSV: the first row provides column names and Spark infers the column types
df = spark.read.options(header=True, inferSchema=True).csv('../data/raw/input.csv')
# Report (row count, column count); df.count() triggers a Spark job
print(f"Dataset shape: ({df.count()}, {len(df.columns)})")
# Print the first rows as a table
df.show()

## 3. Explore Data

### Data Types (df.dtypes)

In [None]:
# List every column name together with its Spark type string
df.dtypes

### Data Info (df.printSchema())

In [None]:
# Print the DataFrame schema (column names, types, nullability) as a tree
df.printSchema()

### Check for Missing Values

In [None]:
# Count missing values per column in a single aggregation.
# A value is treated as missing when it is NULL or, for floating-point columns, NaN.
print("Missing Values:")
# isnan() only accepts float/double input; applying it to other inferred types
# (e.g. boolean, date, timestamp) makes the query fail at analysis time, so the
# NaN test is added only where the column type supports it.
missing_counts = df.select([
    count(when(
        (isnull(col(c)) | isnan(col(c))) if dtype in ('float', 'double') else isnull(col(c)),
        c,  # any non-NULL literal works here: count() tallies rows where the condition held
    )).alias(c)
    for c, dtype in df.dtypes
])
missing_counts.show()

# Sum the single aggregated row instead of launching one extra Spark job per column.
# `or ()` covers the case where no row comes back.
total_missing = sum(missing_counts.first() or ())
print(f"\nTotal missing values: {total_missing}")

### Statistical summary

In [None]:
# Compute and display summary statistics (count, mean, stddev, min, max) for the columns
df.describe().show()

## 4. Transform Data

### 4.1 Convert columns to DoubleType (Spark standard)

In [None]:
# Cast every column to DoubleType in one projection. Calling withColumn() in a loop adds
# a projection per column and can generate very large query plans; a single select()
# produces the same columns (same names, same order) without that overhead.
# NOTE(review): assumes every column holds numeric data — confirm how non-numeric values
# should be handled by the cast.
df = df.select([col(column).cast(DoubleType()).alias(column) for column in df.columns])

## 5. Data Summary

### Data Types

In [None]:
# Re-check the column types after the cast to DoubleType
df.dtypes

### Data Info

In [None]:
# Print the schema again to confirm the converted column types
df.printSchema()

### Data (df)

In [None]:
# Display the prepared DataFrame (show() prints at most the first 20 rows by default)
df.show()

## 6. Save Prepared Data

### Prepare Output Directory

In [None]:
# Directory (relative to the notebook's working directory) that receives the prepared data
output_dir = '../data/processed'
# Create the directory and any missing parents; exist_ok=True makes re-running the cell safe.
# NOTE(review): this creates the path on the local filesystem of the Python process —
# confirm Spark resolves the same path when not running in local mode.
os.makedirs(output_dir, exist_ok=True)

### Save as CSV

In [None]:
csv_path = os.path.join(output_dir, 'prepared_data.csv_spark')
# csv_path becomes a directory of part files; coalesce(1) collapses the data into a
# single partition so that directory holds just one data file. Existing output is replaced.
(
    df.coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'true')
    .csv(csv_path)
)
print(f"✓ Saved CSV: {csv_path}")

### Save as Parquet

In [None]:
parquet_path = os.path.join(output_dir, 'prepared_data.parquet_spark')
# Write the data as Parquet from a single partition, replacing any previous output
(
    df.coalesce(1)
    .write
    .mode('overwrite')
    .parquet(parquet_path)
)
print(f"✓ Saved Parquet: {parquet_path}")

### Verify saved files

In [None]:
# List everything in the output directory together with its size in bytes
print("Saved files/directories:")
for f in os.listdir(output_dir):
    entry_path = os.path.join(output_dir, f)
    if not os.path.isdir(entry_path):
        # Plain file: report its own size
        size = os.path.getsize(entry_path)
        print(f"  - {f} ({size} bytes)")
        continue
    # Directory (e.g. a Spark output folder): add up the sizes of its direct entries
    total_size = 0
    for child in os.listdir(entry_path):
        total_size += os.path.getsize(os.path.join(entry_path, child))
    print(f"  - {f}/ ({total_size} bytes)")