In [None]:
"""
=============================================================================
CAPSTONE BIG DATA PROJECT
02 - SPARK DATA AUGMENTATION (CORRECTED VERSION)
=============================================================================
Loads cleaned data with enhanced features from Google Drive
Generates 10M records while preserving all engineered features
=============================================================================
"""

# =============================================================================
# CELL 1: INSTALLATION & SETUP
# =============================================================================
print("=" * 80)
print("üöÄ INSTALLING PYSPARK")
print("=" * 80)

!pip install pyspark==3.4.1 -q

print("‚úÖ Installation complete!")


üöÄ INSTALLING PYSPARK
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m200.5/200.5 kB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 1.0.1 requires pyspark[connect]~=4.0.0, but you have pyspark 3.4.1 which is incompatible.[0m[31m
[0m‚úÖ Installation complete!


In [None]:
# =============================================================================
# CELL 2: IMPORTS
# =============================================================================
print("\n" + "=" * 80)
print("üì¶ IMPORTING LIBRARIES")
print("=" * 80)

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, rand, randn, when, lit, monotonically_increasing_id,
    date_add, date_sub, expr, round as spark_round,
    concat, lpad, row_number
)
from pyspark.sql.window import Window
import time
from datetime import datetime

print("‚úÖ All libraries imported!")


üì¶ IMPORTING LIBRARIES
‚úÖ All libraries imported!


In [None]:
# =============================================================================
# CELL 3: CREATE SPARK SESSION
# =============================================================================
print("\n" + "=" * 80)
print("‚ö° CREATING SPARK SESSION")
print("=" * 80)

spark = SparkSession.builder \
    .appName("E-commerce_Data_Augmentation") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.sql.adaptive.enabled", "true") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print(f"‚úÖ Spark {spark.version} initialized")
print(f"‚úÖ Cores: {spark.sparkContext.defaultParallelism}")



‚ö° CREATING SPARK SESSION
‚úÖ Spark 3.4.1 initialized
‚úÖ Cores: 2


In [None]:
# =============================================================================
# CELL 4: MOUNT GOOGLE DRIVE
# =============================================================================
print("\n" + "=" * 80)
print("‚òÅÔ∏è MOUNTING GOOGLE DRIVE")
print("=" * 80)

from google.colab import drive
drive.mount('/content/drive')

print("‚úÖ Google Drive mounted!")


‚òÅÔ∏è MOUNTING GOOGLE DRIVE
Mounted at /content/drive
‚úÖ Google Drive mounted!


In [None]:
# =============================================================================
# CELL 5: LOAD CLEANED DATA WITH FEATURES FROM GOOGLE DRIVE
# =============================================================================
print("\n" + "=" * 80)
print("üì• LOADING CLEANED DATA WITH FEATURES FROM GOOGLE DRIVE")
print("=" * 80)

# Path to your cleaned data with all features
CLEANED_DATA_PATH = "/content/drive/MyDrive/BigDataProject/cleaned_data_with_features.parquet"

print(f"üìÇ Loading from: {CLEANED_DATA_PATH}")

start_time = time.time()

# Read parquet
df_base = spark.read.parquet(CLEANED_DATA_PATH)

load_time = time.time() - start_time

base_count = df_base.count()
base_cols = len(df_base.columns)

print(f"\n‚úÖ Loaded in {load_time:.2f} seconds")
print(f"üìä Rows: {base_count:,}")
print(f"üìä Columns: {base_cols}")

# Show new features first
print("\nüéØ NEW FEATURES CREATED IN CLEANING PHASE:")
print("-" * 80)

new_features = [
    # Shipping Delay Features
    'delay_severity', 'order_to_ship_days',

    # RFM Features
    'total_orders_count', 'total_lifetime_value', 'average_order_value',
    'days_since_last_order', 'order_frequency_rate',

    # Financial Features
    'profit_margin', 'profit_category', 'discount_impact',

    # Discount Features
    'has_discount', 'discount_percentage', 'discount_category',

    # Customer Value Features
    'customer_value_segment',

    # Product & Order Features
    'price_category', 'order_size_category', 'is_large_order',

    # Original derived features
    'actual_shipping_days', 'shipping_delay', 'price_per_unit'
]

existing_new_features = [f for f in new_features if f in df_base.columns]
print(f"‚úÖ Found {len(existing_new_features)} new features:")
for i, feature in enumerate(existing_new_features, 1):
    print(f"   {i:2d}. {feature}")

# Show ALL columns
print(f"\nüìã ALL COLUMNS in the dataset ({base_cols} total):")
print("-" * 80)
cols_per_line = 4
for i in range(0, len(df_base.columns), cols_per_line):
    chunk = df_base.columns[i:i+cols_per_line]
    line_parts = []
    for j, col_name in enumerate(chunk):
        line_parts.append(f"{i+j+1:3d}. {col_name}")
    print(" | ".join(line_parts))
print("-" * 80)

# Show data types distribution
print("\nüìù DATA TYPE DISTRIBUTION:")
type_counts = {}
for field in df_base.schema.fields:
    dtype = str(field.dataType)
    type_counts[dtype] = type_counts.get(dtype, 0) + 1

for dtype, count in sorted(type_counts.items()):
    print(f"  ‚Ä¢ {dtype}: {count} columns")

# Show key columns for augmentation
print("\nüîë KEY COLUMNS FOR AUGMENTATION:")
key_columns = [
    'Customer Id', 'Order Id', 'Sales', 'order_date_timestamp',
    'shipping_date_timestamp', 'Delivery Status', 'Late_delivery_risk',
    'total_orders_count', 'total_lifetime_value', 'average_order_value',
    'days_since_last_order', 'has_discount', 'discount_category',
    'profit_category', 'customer_value_segment'
]

for col_name in key_columns:
    if col_name in df_base.columns:
        dtype = [str(f.dataType) for f in df_base.schema.fields if f.name == col_name][0]
        print(f"  ‚úì {col_name:25} ({dtype})")
    else:
        print(f"  ‚úó {col_name:25} (MISSING)")

# Show sample with new features - FIXED VERSION (no pandas conversion)
print("\nüëÄ SAMPLE OF DATA WITH NEW FEATURES (first 3 rows):")
print("=" * 80)

# Get first 3 rows using Spark collect()
sample_rows = df_base.limit(3).collect()

for idx, row in enumerate(sample_rows):
    print(f"\nüì¶ Row {idx + 1}:")
    print("-" * 60)

    # Show original columns (first 5)
    print("Original columns (sample):")
    original_cols = [c for c in df_base.columns if c not in new_features]
    for col in original_cols[:5]:  # Show first 5 original columns
        value = row[col]
        if isinstance(value, (float, int)) and not isinstance(value, bool):
            print(f"  {col:25} = {value:.4f}" if isinstance(value, float) else f"  {col:25} = {value}")
        else:
            # Handle date/timestamp columns
            if 'date' in col.lower() or 'timestamp' in col.lower():
                print(f"  {col:25} = {str(value)[:19] if value else 'None'}")
            else:
                print(f"  {col:25} = {str(value)[:50] + '...' if value and len(str(value)) > 50 else value}")

    # Show new features
    print("\n*NEW FEATURES*:")
    for col in existing_new_features:
        if hasattr(row, col):  # Check if column exists in row
            value = row[col]
            if isinstance(value, float):
                print(f"  *{col:25} = {value:.4f}")
            elif isinstance(value, int):
                print(f"  *{col:25} = {value}")
            else:
                print(f"  *{col:25} = {value}")

print("\n" + "=" * 80)
print("üìä BASIC STATISTICS OF NEW FEATURES:")
print("=" * 80)

# Show statistics for numeric new features
numeric_new_features = []
for f in existing_new_features:
    dtype = str(df_base.schema[f].dataType)
    if any(num_type in dtype for num_type in ['Integer', 'Double', 'Float', 'Long']):
        numeric_new_features.append(f)

if numeric_new_features:
    print("\nüìà Statistical Summary of New Numeric Features:")
    df_base.select(numeric_new_features).describe().show()

# Show value counts for categorical new features
categorical_new_features = []
for f in existing_new_features:
    if str(df_base.schema[f].dataType) == 'StringType':
        categorical_new_features.append(f)

if categorical_new_features:
    print("\nüìä Value Distribution of New Categorical Features (first 3):")
    for cat_col in categorical_new_features[:3]:  # Limit to first 3
        print(f"\n  {cat_col}:")
        df_base.groupBy(cat_col).count().orderBy(col('count').desc()).show(5, truncate=False)

# ALTERNATIVE: Show sample using Spark's show() method (safer)
print("\n" + "=" * 80)
print("üìÑ ALTERNATIVE SAMPLE VIEW (using Spark show()):")
print("=" * 80)

# Show a mix of original and new features
sample_columns = [
    'Order Id', 'Customer Id', 'Sales',
    'order_date_timestamp', 'Delivery Status',
    'total_orders_count', 'average_order_value',
    'has_discount', 'profit_category', 'customer_value_segment'
]

# Filter to existing columns
existing_sample_cols = [c for c in sample_columns if c in df_base.columns]

if existing_sample_cols:
    print(f"\nShowing {len(existing_sample_cols)} key columns:")
    df_base.select(existing_sample_cols).show(3, truncate=25)

# Cache for faster operations
df_base.cache()
print("\n‚úÖ Data with all features cached and ready for augmentation!")
print("=" * 80)

# Final verification
print("\nüîç FINAL VERIFICATION:")
print(f"‚Ä¢ Total columns loaded: {len(df_base.columns)}")
print(f"‚Ä¢ New features loaded: {len(existing_new_features)}")
print(f"‚Ä¢ Data is cached: {df_base.is_cached}")
print("=" * 80)


üì• LOADING CLEANED DATA WITH FEATURES FROM GOOGLE DRIVE
üìÇ Loading from: /content/drive/MyDrive/BigDataProject/cleaned_data_with_features.parquet

‚úÖ Loaded in 0.17 seconds
üìä Rows: 180,519
üìä Columns: 82

üéØ NEW FEATURES CREATED IN CLEANING PHASE:
--------------------------------------------------------------------------------
‚úÖ Found 20 new features:
    1. delay_severity
    2. order_to_ship_days
    3. total_orders_count
    4. total_lifetime_value
    5. average_order_value
    6. days_since_last_order
    7. order_frequency_rate
    8. profit_margin
    9. profit_category
   10. discount_impact
   11. has_discount
   12. discount_percentage
   13. discount_category
   14. customer_value_segment
   15. price_category
   16. order_size_category
   17. is_large_order
   18. actual_shipping_days
   19. shipping_delay
   20. price_per_unit

üìã ALL COLUMNS in the dataset (82 total):
--------------------------------------------------------------------------------
  1. Ty

In [None]:
# =============================================================================
# CELL 6: CONFIGURATION
# =============================================================================
print("\n" + "=" * 80)
print("‚öôÔ∏è COMPREHENSIVE AUGMENTATION CONFIGURATION")
print("=" * 80)

TARGET_ROWS = 10_000_000  # 10 million
BATCH_SIZE = 500_000      # 500K per batch
VARIATION_FACTOR = 0.15   # ¬±15% variation for numeric

print(f"üéØ Target: {TARGET_ROWS:,} records")
print(f"üì¶ Batch size: {BATCH_SIZE:,}")
print(f"üé≤ Numeric variation: ¬±{VARIATION_FACTOR*100}%")

# Calculate strategy
replication_factor = (TARGET_ROWS // base_count) + 1
total_batches = (TARGET_ROWS // BATCH_SIZE) + 1

print(f"\nüìä Strategy:")
print(f"  ‚Ä¢ Replication factor: {replication_factor}x")
print(f"  ‚Ä¢ Total batches: {total_batches}")
print(f"  ‚Ä¢ Columns to augment: ALL {base_cols} columns")


‚öôÔ∏è COMPREHENSIVE AUGMENTATION CONFIGURATION
üéØ Target: 10,000,000 records
üì¶ Batch size: 500,000
üé≤ Numeric variation: ¬±15.0%

üìä Strategy:
  ‚Ä¢ Replication factor: 56x
  ‚Ä¢ Total batches: 21
  ‚Ä¢ Columns to augment: ALL 82 columns


In [None]:
# =============================================================================
# CELL 7: ANALYZE ALL COLUMNS FOR AUGMENTATION
# =============================================================================
print("\n" + "=" * 80)
print("üîç ANALYZING ALL COLUMNS FOR AUGMENTATION")
print("=" * 80)

from pyspark.sql.types import IntegerType, DoubleType, FloatType, LongType, StringType, DateType, TimestampType, BooleanType

# Categorize ALL columns
all_columns_by_type = {
    'numeric': [],
    'string': [],
    'date': [],
    'timestamp': [],
    'boolean': [],
    'id': [],
    'other': []
}

# Analyze each column
for field in df_base.schema.fields:
    col_name = field.name
    dtype = field.dataType

    # Check for ID columns
    if 'id' in col_name.lower() and not ('city' in col_name.lower() or 'country' in col_name.lower()):
        all_columns_by_type['id'].append(col_name)

    # Categorize by data type
    elif isinstance(dtype, (IntegerType, DoubleType, FloatType, LongType)):
        all_columns_by_type['numeric'].append(col_name)
    elif isinstance(dtype, StringType):
        all_columns_by_type['string'].append(col_name)
    elif isinstance(dtype, DateType):
        all_columns_by_type['date'].append(col_name)
    elif isinstance(dtype, TimestampType):
        all_columns_by_type['timestamp'].append(col_name)
    elif isinstance(dtype, BooleanType):
        all_columns_by_type['boolean'].append(col_name)
    else:
        all_columns_by_type['other'].append(col_name)

# Print summary
print(f"\nüìä COLUMN ANALYSIS ({base_cols} total columns):")
for col_type, columns in all_columns_by_type.items():
    if columns:
        print(f"  ‚Ä¢ {col_type.upper()}: {len(columns)} columns")

# Show examples of each type
print(f"\nüîπ EXAMPLES OF EACH COLUMN TYPE:")
for col_type, columns in all_columns_by_type.items():
    if columns:
        print(f"\n  {col_type.upper()}:")
        for col in columns[:3]:
            dtype = str([f.dataType for f in df_base.schema.fields if f.name == col][0])
            print(f"    - {col:30} ({dtype})")
        if len(columns) > 3:
            print(f"      ... and {len(columns) - 3} more")


üîç ANALYZING ALL COLUMNS FOR AUGMENTATION

üìä COLUMN ANALYSIS (82 total columns):
  ‚Ä¢ NUMERIC: 40 columns
  ‚Ä¢ STRING: 28 columns
  ‚Ä¢ DATE: 2 columns
  ‚Ä¢ TIMESTAMP: 3 columns
  ‚Ä¢ ID: 9 columns

üîπ EXAMPLES OF EACH COLUMN TYPE:

  NUMERIC:
    - Days for shipping (real)       (IntegerType())
    - Days for shipment (scheduled)  (IntegerType())
    - Benefit per order              (DoubleType())
      ... and 37 more

  STRING:
    - Type                           (StringType())
    - Delivery Status                (StringType())
    - Category Name                  (StringType())
      ... and 25 more

  DATE:
    - order_date_date                (DateType())
    - shipping_date_date             (DateType())

  TIMESTAMP:
    - order_date_timestamp           (TimestampType())
    - shipping_date_timestamp        (TimestampType())
    - cleaned_timestamp              (TimestampType())

  ID:
    - Category Id                    (IntegerType())
    - Customer Id           

In [None]:
# =============================================================================
# CELL 8: DEFINE COLUMN-SPECIFIC AUGMENTATION STRATEGIES
# =============================================================================
print("\n" + "=" * 80)
print("üîß DEFINING COLUMN-SPECIFIC AUGMENTATION STRATEGIES")
print("=" * 80)

# Get unique values for categorical columns (for intelligent augmentation)
categorical_columns = []
unique_values_cache = {}

# Identify categorical string columns (those with limited distinct values)
for col_name in all_columns_by_type['string']:
    try:
        distinct_count = df_base.select(col_name).distinct().count()
        if 2 <= distinct_count <= 50:  # Reasonable range for categorical
            categorical_columns.append(col_name)
            # Cache unique values
            unique_vals = df_base.select(col_name).distinct().rdd.flatMap(lambda x: x).collect()
            unique_values_cache[col_name] = unique_vals

            print(f"  üìä {col_name}: {distinct_count} unique values")
    except:
        pass

print(f"\n‚úÖ Identified {len(categorical_columns)} categorical columns")

# Define augmentation strategies
augmentation_strategies = {
    'numeric': 'Random variation (¬±15%)',
    'string_categorical': 'Random selection from existing values',
    'string_free': 'String pattern variation',
    'date': 'Random offset (¬±365 days)',
    'timestamp': 'Random offset (¬±30 days)',
    'id': 'Synthetic ID generation',
    'boolean': 'Random flip (10% chance)'
}

print("\nüéØ AUGMENTATION STRATEGIES:")
for strategy, description in augmentation_strategies.items():
    print(f"  ‚Ä¢ {strategy:20}: {description}")


üîß DEFINING COLUMN-SPECIFIC AUGMENTATION STRATEGIES
  üìä Type: 4 unique values
  üìä Delivery Status: 4 unique values
  üìä Category Name: 50 unique values
  üìä Customer Country: 2 unique values
  üìä Customer Segment: 3 unique values
  üìä Customer State: 46 unique values
  üìä Department Name: 11 unique values
  üìä Market: 5 unique values
  üìä Order Region: 23 unique values
  üìä Order Status: 9 unique values
  üìä Shipping Mode: 4 unique values
  üìä delay_severity: 3 unique values
  üìä profit_category: 4 unique values
  üìä discount_category: 4 unique values
  üìä customer_value_segment: 4 unique values
  üìä price_category: 3 unique values
  üìä order_size_category: 3 unique values

‚úÖ Identified 17 categorical columns

üéØ AUGMENTATION STRATEGIES:
  ‚Ä¢ numeric             : Random variation (¬±15%)
  ‚Ä¢ string_categorical  : Random selection from existing values
  ‚Ä¢ string_free         : String pattern variation
  ‚Ä¢ date                : Random of

In [None]:
# =============================================================================
# CELL 7: DATA AUGMENTATION FUNCTION
# =============================================================================
print("\n" + "=" * 80)
print("üîß DEFINING AUGMENTATION FUNCTIONS")
print("=" * 80)

def augment_numeric_columns(df, numeric_cols, variation=0.15):
    """
    Add random variation to numeric columns (¬±variation%)
    """
    df_result = df

    for col_name in numeric_cols:
        if col_name in df.columns:
            # Skip ID columns
            if 'id' in col_name.lower() or col_name.endswith('_year'):
                continue

            # Add random noise: value * (1 + random(-variation, +variation))
            df_result = df_result.withColumn(
                col_name,
                col(col_name) * (1 + (rand() * 2 * variation - variation))
            )

    return df_result

def augment_dates(df, date_cols, days_range=730):
    """
    Add random variation to date columns (¬±days_range days)
    """
    df_result = df

    for col_name in date_cols:
        if col_name in df.columns and 'timestamp' in col_name:
            # Add random days
            random_days = (rand() * days_range * 2 - days_range).cast("int")
            df_result = df_result.withColumn(
                col_name,
                expr(f"date_add({col_name}, {random_days})")
            )

    return df_result

def add_unique_ids(df, batch_number):
    """
    Generate unique IDs for the augmented data
    """
    # Create unique ID based on batch and row number
    window = Window.orderBy(monotonically_increasing_id())

    df = df.withColumn("row_num", row_number().over(window))
    df = df.withColumn(
        "synthetic_id",
        concat(
            lit(batch_number),
            lit("_"),
            lpad(col("row_num"), 10, "0")
        )
    )

    return df.drop("row_num")

print("‚úÖ Augmentation functions defined!")


üîß DEFINING AUGMENTATION FUNCTIONS
‚úÖ Augmentation functions defined!


In [None]:
# =============================================================================
# CELL 8: IDENTIFY COLUMN TYPES
# =============================================================================
print("\n" + "=" * 80)
print("üîç IDENTIFYING COLUMN TYPES FOR AUGMENTATION")
print("=" * 80)

from pyspark.sql.types import IntegerType, DoubleType, FloatType, LongType

# Get numeric columns (excluding IDs)
numeric_cols = []
for field in df_base.schema.fields:
    if isinstance(field.dataType, (IntegerType, DoubleType, FloatType, LongType)):
        if 'id' not in field.name.lower() and not field.name.endswith('_year'):
            numeric_cols.append(field.name)

print(f"\nüìä Numeric columns to augment ({len(numeric_cols)}):")
for col in numeric_cols[:10]:
    print(f"  ‚Ä¢ {col}")
if len(numeric_cols) > 10:
    print(f"  ... and {len(numeric_cols) - 10} more")

# Get date columns
date_cols = [c for c in df_base.columns if 'timestamp' in c or '_date' in c]
print(f"\nüìÖ Date columns to augment ({len(date_cols)}):")
for col in date_cols:
    print(f"  ‚Ä¢ {col}")


üîç IDENTIFYING COLUMN TYPES FOR AUGMENTATION

üìä Numeric columns to augment (38):
  ‚Ä¢ Days for shipping (real)
  ‚Ä¢ Days for shipment (scheduled)
  ‚Ä¢ Benefit per order
  ‚Ä¢ Sales per customer
  ‚Ä¢ Late_delivery_risk
  ‚Ä¢ Customer Zipcode
  ‚Ä¢ Latitude
  ‚Ä¢ Longitude
  ‚Ä¢ Order Item Discount
  ‚Ä¢ Order Item Discount Rate
  ... and 28 more

üìÖ Date columns to augment (13):
  ‚Ä¢ order_date_timestamp
  ‚Ä¢ order_date_date
  ‚Ä¢ order_date_year
  ‚Ä¢ order_date_month
  ‚Ä¢ order_date_day
  ‚Ä¢ order_date_dayofweek
  ‚Ä¢ shipping_date_timestamp
  ‚Ä¢ shipping_date_date
  ‚Ä¢ shipping_date_year
  ‚Ä¢ shipping_date_month
  ‚Ä¢ shipping_date_day
  ‚Ä¢ shipping_date_dayofweek
  ‚Ä¢ cleaned_timestamp


In [None]:
# =============================================================================
# FUNCTION: AUGMENT NUMERIC COLUMNS (FIXED)
# =============================================================================

from pyspark.sql import functions as F

def augment_numeric_columns(df, numeric_cols, variation):
    """
    Apply random numeric variation to numeric columns.
    """
    df_result = df

    for col_name in numeric_cols:
        df_result = df_result.withColumn(
            col_name,
            F.col(col_name) * (1 + (F.rand() * 2 * variation - variation))
        )

    return df_result


In [None]:
# =============================================================================
# FUNCTION: AUGMENT DATE COLUMNS (FIXED)
# =============================================================================

from pyspark.sql import functions as F

def augment_dates(df, date_cols, days_range=730):
    """
    Apply random +/- date shift to date or timestamp columns.
    """
    df_result = df

    for col_name in date_cols:
        # Random integer in [-days_range, +days_range]
        random_offset = (
            (F.rand() * 2 * days_range) - days_range
        ).cast("int")

        df_result = df_result.withColumn(
            col_name,
            F.date_add(F.col(col_name), random_offset)
        )

    return df_result


In [None]:
numeric_cols = [
    "Benefit per order",
    "Sales per customer",
    "Order Item Discount",
    "Order Item Discount Rate",
    "Order Item Product Price",
    "Order Item Profit Ratio",
    "Sales",
    "Order Item Total",
    "Order Profit Per Order",
    "Product Price",
    "Latitude",
    "Longitude",
    "price_per_unit"
]


In [None]:
int_cols = [
    "Days for shipping (real)",
    "Days for shipment (scheduled)",
    "Late_delivery_risk",
    "Order Item Quantity",
    "actual_shipping_days",
    "shipping_delay"
]


In [None]:
date_cols = [
    "order_date_date",
    "shipping_date_date"
]

timestamp_cols = [
    "order_date_timestamp",
    "shipping_date_timestamp"
]


In [None]:
from pyspark.sql import functions as F

def augment_numeric_columns(df, numeric_cols, variation=0.1):
    df_out = df
    for c in numeric_cols:
        df_out = df_out.withColumn(
            c,
            F.col(c) * (1 + (F.rand() * 2 * variation - variation))
        )
    return df_out


In [None]:
from pyspark.sql.types import DateType

def augment_dates(df, date_cols, days_range=730):
    df_out = df
    for c in date_cols:
        offset = ((F.rand() * 2 * days_range) - days_range).cast("int")
        df_out = df_out.withColumn(c, F.date_add(F.col(c), offset))
    return df_out


In [None]:
def augment_timestamps(df, ts_cols, days_range=730):
    df_out = df
    seconds_range = days_range * 86400

    for c in ts_cols:
        offset = ((F.rand() * 2 * seconds_range) - seconds_range).cast("long")
        df_out = df_out.withColumn(
            c,
            F.col(c) + F.expr("INTERVAL 1 SECOND") * offset
        )
    return df_out


In [None]:
def rebuild_date_parts(df):
    return (
        df
        .withColumn("order_date_year", F.year("order_date_date"))
        .withColumn("order_date_month", F.month("order_date_date"))
        .withColumn("order_date_day", F.dayofmonth("order_date_date"))
        .withColumn("order_date_dayofweek", F.dayofweek("order_date_date"))
        .withColumn("shipping_date_year", F.year("shipping_date_date"))
        .withColumn("shipping_date_month", F.month("shipping_date_date"))
        .withColumn("shipping_date_day", F.dayofmonth("shipping_date_date"))
        .withColumn("shipping_date_dayofweek", F.dayofweek("shipping_date_date"))
    )


In [None]:
from pyspark.sql.window import Window

def add_unique_ids(df, batch_num):
    w = Window.orderBy(F.monotonically_increasing_id())
    return df.withColumn(
        "global_row_id",
        F.row_number().over(w) + batch_num * 1_000_000
    )


In [None]:
# =============================================================================
# CELL: GENERATE ~10 MILLION ROWS
# =============================================================================
from pyspark.sql import functions as F
import time

TARGET_ROWS = 10_000_000
VARIATION_FACTOR = 0.1
BATCH_SIZE = 1_000_000

base_count = df_base.count()
replication_factor = (TARGET_ROWS // base_count) + 1
total_batches = (TARGET_ROWS // BATCH_SIZE) + 1

print(f"Base rows: {base_count:,}")
print(f"Replication factor: {replication_factor}")
print(f"Total batches: {total_batches}")

generated_total = 0
start_time = time.time()

for batch_num in range(1, total_batches + 1):
    print(f"\nüì¶ Batch {batch_num}/{total_batches}")

    # Fast replication
    df_batch = (
        df_base
        .withColumn("rep", F.explode(F.array([F.lit(i) for i in range(replication_factor)])))
        .drop("rep")
        .limit(BATCH_SIZE)
    )

    # Augment
    df_batch = augment_numeric_columns(df_batch, numeric_cols, VARIATION_FACTOR)
    df_batch = augment_dates(df_batch, date_cols)
    df_batch = augment_timestamps(df_batch, timestamp_cols)
    df_batch = rebuild_date_parts(df_batch)
    df_batch = add_unique_ids(df_batch, batch_num)

    # Metadata
    df_batch = (
        df_batch
        .withColumn("batch_number", F.lit(batch_num))
        .withColumn("generation_timestamp", F.current_timestamp())
    )

    # Save
    out_path = f"/content/drive/MyDrive/BigDataProject/augmented/batch_{batch_num:02d}"
    df_batch.write.mode("overwrite").parquet(out_path)

    batch_count = df_batch.count()
    generated_total += batch_count

    print(f"  ‚úÖ Rows generated: {batch_count:,}")
    print(f"  üìä Total so far: {generated_total:,}")

    if generated_total >= TARGET_ROWS:
        break

print(f"\nüéâ DONE ‚Äî Generated {generated_total:,} rows")
print(f"‚è±Ô∏è Total time: {(time.time() - start_time)/60:.2f} minutes")


Base rows: 180,519
Replication factor: 56
Total batches: 11

üì¶ Batch 1/11


AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "date_add(order_date_year, CAST((((rand(9081673550657482041) * 2) * 730) - 730) AS INT))" due to data type mismatch: Parameter 1 requires the "DATE" type, however "order_date_year" has the type "INT".;
'Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
+- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
   +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
      +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
         +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
            +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
               +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                  +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                     +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                        +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                           +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                              +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                 +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                    +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                       +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                          +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                             +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                   +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                      +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                         +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                            +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                               +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                  +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                     +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                        +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                           +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                              +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                                 +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                                    +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                                       +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                                          +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                                             +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#121180, (Longitude#5326 * ((((rand(-7434364398281245726) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Longitude#121263, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                   +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#121097, Department Id#5323, Department Name#5324, (Latitude#5325 * ((((rand(-1157108701397579152) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Latitude#121180, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                      +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, (cast(Customer Zipcode#5322 as double) * ((((rand(7345310289686739361) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Customer Zipcode#121097, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                         +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, Sales per customer#120931, Delivery Status#5310, (cast(Late_delivery_risk#5311 as double) * ((((rand(-7166691072334075753) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Late_delivery_risk#121014, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#5322, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                            +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, Benefit per order#120848, (Sales per customer#5309 * ((((rand(6602042589660231820) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Sales per customer#120931, Delivery Status#5310, Late_delivery_risk#5311, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#5322, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                               +- Project [Type#5305, Days for shipping (real)#120682, Days for shipment (scheduled)#120765, (Benefit per order#5308 * ((((rand(-4285924019671688210) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Benefit per order#120848, Sales per customer#5309, Delivery Status#5310, Late_delivery_risk#5311, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#5322, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                                  +- Project [Type#5305, Days for shipping (real)#120682, (cast(Days for shipment (scheduled)#5307 as double) * ((((rand(1739542816260732246) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Days for shipment (scheduled)#120765, Benefit per order#5308, Sales per customer#5309, Delivery Status#5310, Late_delivery_risk#5311, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#5322, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                                     +- Project [Type#5305, (cast(Days for shipping (real)#5306 as double) * ((((rand(-7442665788789900230) * cast(2 as double)) * 0.1) - 0.1) + cast(1 as double))) AS Days for shipping (real)#120682, Days for shipment (scheduled)#5307, Benefit per order#5308, Sales per customer#5309, Delivery Status#5310, Late_delivery_risk#5311, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#5322, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                                        +- GlobalLimit 1000000
                                                                                                                           +- LocalLimit 1000000
                                                                                                                              +- Project [Type#5305, Days for shipping (real)#5306, Days for shipment (scheduled)#5307, Benefit per order#5308, Sales per customer#5309, Delivery Status#5310, Late_delivery_risk#5311, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#5322, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 58 more fields]
                                                                                                                                 +- Project [Type#5305, Days for shipping (real)#5306, Days for shipment (scheduled)#5307, Benefit per order#5308, Sales per customer#5309, Delivery Status#5310, Late_delivery_risk#5311, Category Id#5312, Category Name#5313, Customer City#5314, Customer Country#5315, Customer Fname#5316, Customer Id#5317, Customer Lname#5318, Customer Segment#5319, Customer State#5320, Customer Street#5321, Customer Zipcode#5322, Department Id#5323, Department Name#5324, Latitude#5325, Longitude#5326, Market#5327, Order City#5328, ... 59 more fields]
                                                                                                                                    +- Generate explode(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, ... 32 more fields)), false, [rep#120516]
                                                                                                                                       +- Relation [Type#5305,Days for shipping (real)#5306,Days for shipment (scheduled)#5307,Benefit per order#5308,Sales per customer#5309,Delivery Status#5310,Late_delivery_risk#5311,Category Id#5312,Category Name#5313,Customer City#5314,Customer Country#5315,Customer Fname#5316,Customer Id#5317,Customer Lname#5318,Customer Segment#5319,Customer State#5320,Customer Street#5321,Customer Zipcode#5322,Department Id#5323,Department Name#5324,Latitude#5325,Longitude#5326,Market#5327,Order City#5328,... 58 more fields] parquet


In [None]:
# =============================================================================
# CELL 11: COMBINE ALL BATCHES (FROM GOOGLE DRIVE)
# =============================================================================
print("\n" + "=" * 80)
print("üîó COMBINING ALL BATCHES")
print("=" * 80)

from google.colab import drive
from pyspark.sql import functions as F
import os

# 1. Mount Drive (safe if already mounted)
drive.mount('/content/drive')

BASE_BATCH_DIR = "/content/drive/MyDrive/BigDataProject/augmented"

# 2. List all batch folders
batch_dirs = sorted([
    os.path.join(BASE_BATCH_DIR, d)
    for d in os.listdir(BASE_BATCH_DIR)
    if d.startswith("batch_")
])

print(f"üì¶ Found {len(batch_dirs)} batches")

# 3. Read and combine all batches
df_augmented = spark.read.parquet(*batch_dirs)

# 4. Verify final count
final_count = df_augmented.count()
print(f"\n‚úÖ Combined dataset size: {final_count:,} rows")

# 5. Repartition for performance
df_augmented = df_augmented.repartition(200)
print("‚úÖ Repartitioned into 200 partitions")

print("=" * 80)



üîó COMBINING ALL BATCHES
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
üì¶ Found 10 batches


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# =============================================================================
# CELL 12: FINAL STATISTICS
# =============================================================================
print("\n" + "=" * 80)
print("üìä FINAL AUGMENTED DATASET STATISTICS")
print("=" * 80)

import time

# If start_time not defined earlier, set it safely
try:
    total_time = time.time() - start_time
except NameError:
    total_time = None

print(f"\nüìà Generation Summary:")
print(f"  ‚Ä¢ Target rows: {TARGET_ROWS:,}")
print(f"  ‚Ä¢ Generated rows: {final_count:,}")
print(f"  ‚Ä¢ Achievement: {(final_count / TARGET_ROWS) * 100:.1f}%")
print(f"  ‚Ä¢ Total columns: {len(df_augmented.columns)}")

# Number of batches = distinct batch_number
num_batches = df_augmented.select("batch_number").distinct().count()
print(f"  ‚Ä¢ Total batches: {num_batches}")

if total_time:
    print(f"\n‚è±Ô∏è Performance:")
    print(f"  ‚Ä¢ Total time: {total_time / 60:.2f} minutes")
    print(f"  ‚Ä¢ Rows per second: {final_count / total_time:,.0f}")

print("\nüëÄ Sample of augmented data:")
df_augmented.show(10, truncate=True)

print("=" * 80)



üìä FINAL AUGMENTED DATASET STATISTICS

üìà Generation Summary:
  ‚Ä¢ Target rows: 10,000,000
  ‚Ä¢ Generated rows: 10,000,000
  ‚Ä¢ Achievement: 100.0%
  ‚Ä¢ Total columns: 63
  ‚Ä¢ Total batches: 10

‚è±Ô∏è Performance:
  ‚Ä¢ Total time: 37.50 minutes
  ‚Ä¢ Rows per second: 4,444

üëÄ Sample of augmented data:
+--------+------------------------+-----------------------------+-------------------+------------------+----------------+------------------+-----------+--------------------+---------------+--------------+-----------+--------------+----------------+----------------+-------------+---------------+------------------+-------------------+------------+--------------------+--------------------+-----------------+-----------------------+--------+----------------------+-------------------+------------------------+-------------+------------------------+-----------------------+-------------------+------------------+------------------+----------------------+---------------+--------------

In [None]:
# =============================================================================
# CELL 13: DATA QUALITY VALIDATION
# =============================================================================
print("\n" + "=" * 80)
print("‚úÖ DATA QUALITY VALIDATION")
print("=" * 80)

from pyspark.sql.functions import min as spark_min, max as spark_max

# ------------------------------------------------------------
# Check 1: Uniqueness of GLOBAL synthetic IDs
# ------------------------------------------------------------
if "global_row_id" in df_augmented.columns:
    unique_ids = df_augmented.select("global_row_id").distinct().count()
    print(f"‚úì Unique global_row_id: {unique_ids:,} / {final_count:,}")
    print(f"  Uniqueness rate: {(unique_ids / final_count) * 100:.2f}%")
else:
    print("‚ö†Ô∏è global_row_id column not found")

# ------------------------------------------------------------
# Check 2: Batch distribution
# ------------------------------------------------------------
print(f"\n‚úì Batch distribution:")
df_augmented.groupBy("batch_number") \
    .count() \
    .orderBy("batch_number") \
    .show()

# ------------------------------------------------------------
# Check 3: Numeric ranges (example: Sales)
# ------------------------------------------------------------
if "Sales" in df_augmented.columns:
    print(f"\n‚úì Sales statistics:")
    df_augmented.select("Sales").describe().show()

# ------------------------------------------------------------
# Check 4: Date range
# ------------------------------------------------------------
if "order_date_timestamp" in df_augmented.columns:
    print(f"\n‚úì Order date range:")
    df_augmented.select(
        spark_min("order_date_timestamp").alias("min_date"),
        spark_max("order_date_timestamp").alias("max_date")
    ).show()

print("=" * 80)



‚úÖ DATA QUALITY VALIDATION
‚úì Unique global_row_id: 10,000,000 / 10,000,000
  Uniqueness rate: 100.00%

‚úì Batch distribution:
+------------+-------+
|batch_number|  count|
+------------+-------+
|           1|1000000|
|           2|1000000|
|           3|1000000|
|           4|1000000|
|           5|1000000|
|           6|1000000|
|           7|1000000|
|           8|1000000|
|           9|1000000|
|          10|1000000|
+------------+-------+


‚úì Sales statistics:
+-------+------------------+
|summary|             Sales|
+-------+------------------+
|  count|          10000000|
|   mean|203.55479498353094|
| stddev| 136.0737924636744|
|    min| 8.991419818509405|
|    max| 2199.918320497723|
+-------+------------------+


‚úì Order date range:
+-------------------+-------------------+
|           min_date|           max_date|
+-------------------+-------------------+
|2013-01-01 02:51:42|2020-01-30 22:42:09|
+-------------------+-------------------+



In [None]:
# =============================================================================
# CELL 14: SAVE FINAL AUGMENTED DATASET
# =============================================================================
print("\n" + "=" * 80)
print("üíæ SAVING FINAL AUGMENTED DATASET")
print("=" * 80)

from google.colab import drive

# Mount Drive (safe if already mounted)
drive.mount('/content/drive')

FINAL_PARQUET_PATH = "/content/drive/MyDrive/BigDataProject/final_10M_parquet"

# Save full dataset (Parquet, compressed)
df_augmented.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(FINAL_PARQUET_PATH)

print(f"‚úÖ Final Parquet saved to:\n{FINAL_PARQUET_PATH}")




üíæ SAVING FINAL AUGMENTED DATASET
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
‚úÖ Final Parquet saved to:
/content/drive/MyDrive/BigDataProject/final_10M_parquet


In [None]:
# =============================================================================
# CELL 15: DOWNLOAD SAMPLE
# =============================================================================
print("\n" + "=" * 80)
print("üì• DOWNLOAD SAMPLE FOR INSPECTION")
print("=" * 80)

from google.colab import files
files.download(sample_csv)

print("\n" + "=" * 80)
print("üéâ DATA AUGMENTATION COMPLETE!")
print("=" * 80)

print(f"\nüìã SUMMARY:")
print(f"  ‚úÖ Generated: {final_count:,} records")
print(f"  ‚úÖ Time taken: {total_time/60:.2f} minutes")
print(f"  ‚úÖ Data saved as: {final_parquet}")
print(f"  ‚úÖ Sample downloaded: {sample_csv}")

print(f"\nüìã NEXT STEPS:")
print(f"  1. Verify the sample CSV")
print(f"  2. Load data into MongoDB or HDFS")
print(f"  3. Proceed to Feature Engineering")
print(f"  4. Build ML models")

print("=" * 80)

# Stop Spark
spark.stop()
print("\nüõë Spark session stopped")