In [0]:
# Read the source table, keeping only the columns used by the cells below
selected_columns = [
    "trip_origin_city",
    "trip_destination_city",
    "flight_leg_departure_date",
    "flight_leg_total_seats",
]
df = spark.table("mc.amadeus2.data_jan26").select(*selected_columns)

# Preview the first 10 rows
display(df.limit(10))

In [0]:
from pyspark.sql.functions import concat, lit, dayofweek

# Derive both grouping keys in one chained expression:
#   route       -> "<origin>_to_<destination>"
#   day_of_week -> weekday number of the departure date (1=Sunday, 7=Saturday)
df_with_route = (
    df
    .withColumn("route", concat("trip_origin_city", lit("_to_"), "trip_destination_city"))
    .withColumn("day_of_week", dayofweek("flight_leg_departure_date"))
)

# Preview the first 10 rows
display(df_with_route.limit(10))

In [0]:
from pyspark.sql.functions import avg, round

# Mean seat count per (route, day_of_week), rounded to zero decimals.
# `round` here is the Spark column function imported at the top of this cell,
# not Python's builtin.
mean_seats = round(avg("flight_leg_total_seats"), 0).alias("avg_seats")
df_aggregated = df_with_route.groupBy("route", "day_of_week").agg(mean_seats)

# Preview the first 10 rows
display(df_aggregated.limit(10))

In [0]:
from sklearn.ensemble import IsolationForest
import pandas as pd

# Convert to pandas DataFrame
df_pandas_all = df_aggregated.toPandas()

# Remove route-day groups whose average is missing before fitting. The scoring
# cell applies the same dropna to the updates data, so training and inference
# see identically cleaned input.
df_pandas = df_pandas_all.dropna(subset=['avg_seats']).reset_index(drop=True)
dropped_rows = len(df_pandas_all) - len(df_pandas)
if dropped_rows:
    print(f"Dropped {dropped_rows} route-day combinations with missing avg_seats")

# Initialize Isolation Forest (fixed seed so repeated runs build the same trees)
iso_forest = IsolationForest(random_state=42)

# Fit on avg_seats only; the double brackets keep X as a one-column DataFrame
X = df_pandas[['avg_seats']]
iso_forest.fit(X)

print(f"Model trained on {len(df_pandas)} route-day combinations")
print(f"avg_seats range: {df_pandas['avg_seats'].min()} - {df_pandas['avg_seats'].max()}")
display(df_pandas.head(10))

In [0]:
# Label every training row with the model's verdict: -1 = anomaly, 1 = normal
df_pandas['anomaly'] = iso_forest.predict(X)

# Boolean masks for the two classes, reused for counting and for display
flagged = df_pandas['anomaly'] == -1
regular = df_pandas['anomaly'] == 1
anomalies_count = flagged.sum()
normal_count = regular.sum()

print(f"Total records: {len(df_pandas)}")
print(f"Anomalies detected: {anomalies_count} ({anomalies_count/len(df_pandas)*100:.2f}%)")
print(f"Normal records: {normal_count} ({normal_count/len(df_pandas)*100:.2f}%)")

# Show the first few rows the model flagged
print("\nSample of detected anomalies:")
display(df_pandas[flagged].head(10))

In [0]:
import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature

# Everything logged inside this context is attached to one MLflow run
with mlflow.start_run(run_name="flight_seat_anomaly_detection") as run:

    # Training configuration, logged one parameter at a time in this order
    run_params = {
        "model_type": "IsolationForest",
        "random_state": 42,
        "feature": "avg_seats",
        "total_records": len(df_pandas),
    }
    for param_name, param_value in run_params.items():
        mlflow.log_param(param_name, param_value)

    # Summary metrics from the training-data predictions
    run_metrics = {
        "anomalies_count": anomalies_count,
        "anomalies_percentage": anomalies_count/len(df_pandas)*100,
        "normal_count": normal_count,
        "avg_seats_min": df_pandas['avg_seats'].min(),
        "avg_seats_max": df_pandas['avg_seats'].max(),
    }
    for metric_name, metric_value in run_metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    # Infer the input/output signature from the training frame and its predictions
    predictions = iso_forest.predict(X)
    signature = infer_signature(X, predictions)

    # Store the fitted model, with its signature, under the run's artifacts
    mlflow.sklearn.log_model(iso_forest, "isolation_forest_model", signature=signature)

    print(f"MLflow Run ID: {run.info.run_id}")
    print(f"Model logged successfully with signature!")
    print(f"Experiment ID: {run.info.experiment_id}")
    print(f"Artifact URI: {run.info.artifact_uri}")

In [0]:
# Three-part name under which the model is registered in Unity Catalog
model_name = "mc.amadeus2.flight_seat_anomaly_detector"

# Address the artifact logged above through the ID of the run that produced it
model_uri = f"runs:/{run.info.run_id}/isolation_forest_model"

# Register the logged artifact; the returned object carries the new version
registered_model = mlflow.register_model(model_uri=model_uri, name=model_name)

for status_line in (
    f"Model registered successfully!",
    f"Model name: {model_name}",
    f"Model version: {registered_model.version}",
):
    print(status_line)

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ServedEntityInput, EndpointCoreConfigInput

# Workspace client used for all serving-endpoint calls below
w = WorkspaceClient()

# Endpoint and model to serve; the version comes from the registration cell
endpoint_name = "flight-seat-anomaly-detector"
model_name = "mc.amadeus2.flight_seat_anomaly_detector"
model_version = registered_model.version

# One served-entity definition shared by the create path and the update path
served_entities = [
    ServedEntityInput(
        entity_name=model_name,
        entity_version=str(model_version),
        scale_to_zero_enabled=True,
        workload_size="Small"
    )
]

# Try to create the endpoint; if it is reported as already existing, update
# its configuration instead. Any other failure is re-raised.
try:
    endpoint = w.serving_endpoints.create_and_wait(
        name=endpoint_name,
        config=EndpointCoreConfigInput(served_entities=served_entities)
    )
    print(f"Model serving endpoint created successfully!")
    print(f"Endpoint name: {endpoint_name}")
    print(f"Endpoint state: {endpoint.state.config_update}")
except Exception as e:
    # The "already exists" case is recognised by its error message text
    if "already exists" not in str(e):
        raise e
    print(f"Endpoint '{endpoint_name}' already exists. Updating...")
    endpoint = w.serving_endpoints.update_config_and_wait(
        name=endpoint_name,
        served_entities=served_entities
    )
    print(f"Endpoint updated successfully!")

In [0]:
from pyspark.sql.functions import concat, lit, dayofweek, avg, round

# Read the updates table, keeping only the columns needed for scoring
updates_columns = [
    "trip_origin_city",
    "trip_destination_city",
    "flight_leg_departure_date",
    "new_flight_leg_total_seats",
]
df_updates = spark.table("mc.amadeus2.anomaly_updates").select(*updates_columns)

# Same feature derivation as for the training data:
#   route       -> "<origin>_to_<destination>"
#   day_of_week -> weekday number of the departure date
df_updates_with_route = (
    df_updates
    .withColumn("route", concat("trip_origin_city", lit("_to_"), "trip_destination_city"))
    .withColumn("day_of_week", dayofweek("flight_leg_departure_date"))
)

# Mean of the updated seat counts per (route, day_of_week), rounded to zero decimals
mean_new_seats = round(avg("new_flight_leg_total_seats"), 0).alias("avg_seats")
df_updates_aggregated = df_updates_with_route.groupBy("route", "day_of_week").agg(mean_new_seats)

print(f"Total route-day combinations to score: {df_updates_aggregated.count()}")
display(df_updates_aggregated.limit(10))

In [0]:
# Persist the aggregated updates, replacing whatever the target table held before
(
    df_updates_aggregated.write
    .mode("overwrite")
    .saveAsTable("mc.amadeus2.anomaly_updates_aggregated")
)

In [0]:
import requests
import json
from databricks.sdk import WorkspaceClient

# Initialize client
w = WorkspaceClient()

# Endpoint to call; the API token and workspace URL are read from the
# notebook's execution context
endpoint_name = "flight-seat-anomaly-detector"
notebook_context = w.dbutils.notebook.entry_point.getDbutils().notebook().getContext()
token = notebook_context.apiToken().get()
host = notebook_context.apiUrl().get()

# Prepare data for inference - convert to pandas and format as required by the model
df_updates_pandas = df_updates_aggregated.toPandas()

# Handle NaN values - drop rows with NaN in avg_seats
print(f"Total records before cleaning: {len(df_updates_pandas)}")
df_updates_pandas = df_updates_pandas.dropna(subset=['avg_seats'])
print(f"Total records after removing NaN: {len(df_updates_pandas)}")

# The model expects a dataframe with 'avg_seats' column
input_data = df_updates_pandas[['avg_seats']].to_dict(orient='split')

# Call the endpoint
url = f"{host}/serving-endpoints/{endpoint_name}/invocations"
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

payload = {
    "dataframe_split": input_data
}

print(f"\nCalling endpoint: {endpoint_name}")
print(f"Scoring {len(df_updates_pandas)} records...")

# requests applies no timeout unless one is given, so a stalled endpoint would
# block this cell forever. The limit is generous because the endpoint is
# deployed with scale-to-zero enabled; adjust if needed.
REQUEST_TIMEOUT_SECONDS = 300
response = requests.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_SECONDS)

if response.status_code == 200:
    predictions = response.json()['predictions']
    # Positional assignment: one prediction per row, in the order the rows were sent
    df_updates_pandas['anomaly'] = predictions
    print(f"\n✓ Successfully scored {len(predictions)} records")
else:
    print(f"Error: {response.status_code}")
    print(response.text)
    raise Exception(f"Model endpoint call failed with status {response.status_code}")

In [0]:
# Boolean masks for the two classes, reused for counting and for display.
# NOTE(review): anomalies_count / normal_count reuse the names set by the
# training-data cell, so from here on they describe the scored updates.
is_anomaly = df_updates_pandas['anomaly'] == -1
is_normal = df_updates_pandas['anomaly'] == 1
anomalies_count = is_anomaly.sum()
normal_count = is_normal.sum()

print(f"\n=== ANOMALY DETECTION RESULTS ===")
print(f"Total records scored: {len(df_updates_pandas)}")
print(f"Anomalies detected: {anomalies_count} ({anomalies_count/len(df_updates_pandas)*100:.2f}%)")
print(f"Normal records: {normal_count} ({normal_count/len(df_updates_pandas)*100:.2f}%)")

# All anomalies, largest avg_seats first
print("\n=== DETECTED ANOMALIES (sorted by avg_seats) ===")
anomalies_df = df_updates_pandas[is_anomaly].sort_values('avg_seats', ascending=False)
display(anomalies_df)

# First 20 normal rows, largest avg_seats first
print("\n=== SAMPLE OF NORMAL RECORDS ===")
normal_df = df_updates_pandas[is_normal].sort_values('avg_seats', ascending=False)
display(normal_df.head(20))

In [0]:
import mlflow

import math

# Load the model from Unity Catalog to get anomaly scores (use version 1)
# NOTE(review): the serving endpoint is deployed with registered_model.version;
# confirm that version 1 is the same model that produced the 'anomaly' labels.
model_name = "mc.amadeus2.flight_seat_anomaly_detector"
loaded_model = mlflow.sklearn.load_model(f"models:/{model_name}/1")

# Get anomaly scores from the model (more negative = more anomalous)
anomalies_scores = loaded_model.score_samples(df_updates_pandas[['avg_seats']])
df_updates_pandas['anomaly_score'] = anomalies_scores

# Baseline statistics come from the rows the endpoint labelled as normal
normal_seats = df_updates_pandas.loc[df_updates_pandas['anomaly'] == 1, 'avg_seats']
normal_baseline = normal_seats.median()
normal_mean = normal_seats.mean()
normal_std = normal_seats.std()


def _format_multiplier(value):
    """Render a rounded relative deviation as a signed multiplier such as '+11x' or '-1x'."""
    # NaN/inf appear when there are no normal rows or the baseline is zero;
    # int() cannot convert them, so report the value as unavailable instead.
    if not math.isfinite(value):
        return "n/a"
    return f"+{int(value)}x" if value > 0 else f"{int(value)}x"


# Relative difference from the baseline, (avg_seats - baseline) / baseline,
# rounded to a whole number and shown with a sign and an "x" suffix
deviation_raw = ((df_updates_pandas['avg_seats'] - normal_baseline) / normal_baseline).round(0)
df_updates_pandas['deviation_multiplier'] = deviation_raw.apply(_format_multiplier)

# Calculate standard deviations from mean
df_updates_pandas['std_deviations'] = ((df_updates_pandas['avg_seats'] - normal_mean) / normal_std).round(2)

print(f"=== NORMAL BASELINE STATISTICS ===")
print(f"Median (baseline): {normal_baseline:.0f} seats")
print(f"Mean: {normal_mean:.2f} seats")
print(f"Std Dev: {normal_std:.2f} seats")
print(f"\n=== TOP 20 ANOMALIES WITH DEVIATION METRICS ===")

# Show top anomalies with all metrics (hide anomaly_score)
top_anomalies = df_updates_pandas[df_updates_pandas['anomaly'] == -1].sort_values('avg_seats', ascending=False).head(20)
display(top_anomalies[['route', 'day_of_week', 'avg_seats', 'deviation_multiplier', 'std_deviations']])

In [0]:
from pyspark.sql.functions import split, col

# Convert anomalies to Spark DataFrame
df_anomalies_spark = spark.createDataFrame(df_updates_pandas[df_updates_pandas['anomaly'] == -1])

# Split route once; element 0 is the origin code and element 1 the destination code
route_parts = split("route", "_to_")
df_anomalies_split = (
    df_anomalies_spark
    .withColumn("origin_city", route_parts[0])
    .withColumn("destination_city", route_parts[1])
)

# Load IATA lookup table
df_iata = spark.table("mc.amadeus2.iata")

# Join for origin city/country (left join: unmatched codes keep NULL names)
df_enriched = df_anomalies_split.alias("a").join(
    df_iata.alias("i_origin"),
    col("a.origin_city") == col("i_origin.iata"),
    "left"
).select(
    col("a.*"),
    col("i_origin.city").alias("origin_city_full"),
    col("i_origin.country").alias("origin_country_full")
)

# Join for destination city/country and fix the final column order
df_enriched = df_enriched.alias("a").join(
    df_iata.alias("i_dest"),
    col("a.destination_city") == col("i_dest.iata"),
    "left"
).select(
    col("a.route"),
    col("a.origin_city"),
    col("a.origin_city_full"),
    col("a.origin_country_full"),
    col("a.destination_city"),
    col("i_dest.city").alias("destination_city_full"),
    col("i_dest.country").alias("destination_country_full"),
    col("a.day_of_week"),
    col("a.avg_seats"),
    col("a.deviation_multiplier"),
    col("a.std_deviations")
)

print(f"✓ Enriched {df_enriched.count()} anomalies with city/country names")
print("\n=== Top 20 Enriched Anomalies ===")
display(df_enriched.orderBy("avg_seats", ascending=False).limit(20))

# Flight Seat Anomaly Detection - Model Inference Instructions

## Overview
This document provides instructions for autonomously performing anomaly detection inference on flight seat data using the deployed model endpoint.

---

## Data Sources

### Input Table
- **Full Path**: `mc.amadeus2.anomaly_updates`
- **Schema**: 
  - `trip_origin_city` (STRING)
  - `trip_destination_city` (STRING)
  - `flight_leg_departure_date` (DATE)
  - `flight_leg_origin_city` (STRING)
  - `flight_leg_destination_city` (STRING)
  - `new_flight_leg_total_seats` (INT)
  - `flight_leg_total_seats` (INT)
  - `multiplier` (DOUBLE)

### Lookup Table
- **Full Path**: `mc.amadeus2.iata`
- **Schema**:
  - `city` (STRING) - Full city name
  - `country` (STRING) - Full country name
  - `iata` (STRING) - IATA airport code

### Model Endpoint
- **Endpoint Name**: `flight-seat-anomaly-detector`
- **Model Name**: `mc.amadeus2.flight_seat_anomaly_detector`
- **Model Version**: 1
- **Input Feature**: `avg_seats` (single numeric column)
- **Output**: Anomaly prediction (-1 = anomaly, 1 = normal)

---

## Implementation Functions

### Function 1: Load and Prepare Data
```python
def load_and_prepare_anomaly_data(table_name: str = "mc.amadeus2.anomaly_updates") -> pd.DataFrame:
    """
    Load the anomaly updates table and aggregate it for model inference.

    Uses the notebook-provided `spark` session.

    Args:
        table_name: Full path to the anomaly updates table

    Returns:
        pandas DataFrame with route, day_of_week, and avg_seats columns;
        rows whose avg_seats is missing are removed
    """
    # Imported locally so that Spark's `round` shadows the builtin only here
    from pyspark.sql.functions import concat, lit, dayofweek, avg, round

    source_columns = [
        "trip_origin_city",
        "trip_destination_city",
        "flight_leg_departure_date",
        "new_flight_leg_total_seats",
    ]

    # Load -> derive route and weekday -> average the updated seat counts
    df_agg = (
        spark.table(table_name)
        .select(*source_columns)
        .withColumn("route", concat("trip_origin_city", lit("_to_"), "trip_destination_city"))
        .withColumn("day_of_week", dayofweek("flight_leg_departure_date"))
        .groupBy("route", "day_of_week")
        .agg(round(avg("new_flight_leg_total_seats"), 0).alias("avg_seats"))
    )

    # Convert to pandas and drop groups without a usable average
    df_pandas = df_agg.toPandas().dropna(subset=['avg_seats'])

    print(f"✓ Loaded and prepared {len(df_pandas)} route-day combinations")
    return df_pandas
```

### Function 2: Invoke Model Endpoint
```python
def invoke_anomaly_model(df: pd.DataFrame, endpoint_name: str = "flight-seat-anomaly-detector", timeout: float = 300.0) -> pd.DataFrame:
    """
    Call the deployed model endpoint to get anomaly predictions.

    Args:
        df: DataFrame with 'avg_seats' column
        endpoint_name: Name of the serving endpoint
        timeout: Seconds to wait for the HTTP call before giving up. requests
            applies no timeout unless one is given, so without it a stalled
            endpoint would block the caller indefinitely.

    Returns:
        The same DataFrame object, with an 'anomaly' column (-1 or 1) added
        in place

    Raises:
        Exception: if the endpoint responds with a status other than 200
    """
    import requests
    from databricks.sdk import WorkspaceClient

    # API token and workspace URL are read from the notebook's execution context
    w = WorkspaceClient()
    notebook_context = w.dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    token = notebook_context.apiToken().get()
    host = notebook_context.apiUrl().get()

    # Only the feature column is sent, in split orientation
    payload = {"dataframe_split": df[['avg_seats']].to_dict(orient='split')}

    url = f"{host}/serving-endpoints/{endpoint_name}/invocations"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    response = requests.post(url, headers=headers, json=payload, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Model endpoint call failed: {response.status_code} - {response.text}")

    predictions = response.json()['predictions']
    # Positional assignment: one prediction per row, in the order the rows were sent
    df['anomaly'] = predictions
    print(f"✓ Successfully scored {len(predictions)} records")
    return df

### Function 3: Calculate Deviation Metrics
```python
def calculate_deviation_metrics(df: pd.DataFrame, model_name: str = "mc.amadeus2.flight_seat_anomaly_detector", model_version: str = "1") -> pd.DataFrame:
    """
    Calculate anomaly scores and deviation metrics from the normal baseline.

    Args:
        df: DataFrame with 'avg_seats' and 'anomaly' columns
        model_name: Full model name in Unity Catalog
        model_version: Registered model version to load for scoring

    Returns:
        The same DataFrame object with 'anomaly_score',
        'deviation_multiplier' and 'std_deviations' columns added in place.
        'deviation_multiplier' is "n/a" where it cannot be computed (no
        normal rows, or a zero baseline).
    """
    import math
    import mlflow

    def _format_multiplier(value) -> str:
        """Render a rounded relative deviation as '+11x', '0x', '-1x', or 'n/a'."""
        # int() cannot convert NaN or infinity, so report those as unavailable
        if not math.isfinite(value):
            return "n/a"
        return f"+{int(value)}x" if value > 0 else f"{int(value)}x"

    # Load model to get anomaly scores
    loaded_model = mlflow.sklearn.load_model(f"models:/{model_name}/{model_version}")
    df['anomaly_score'] = loaded_model.score_samples(df[['avg_seats']])

    # Baseline statistics come from the rows labelled normal (anomaly == 1)
    normal_seats = df.loc[df['anomaly'] == 1, 'avg_seats']
    normal_baseline = normal_seats.median()
    normal_mean = normal_seats.mean()
    normal_std = normal_seats.std()

    # Relative difference from the baseline, rounded to a whole number
    deviation_raw = ((df['avg_seats'] - normal_baseline) / normal_baseline).round(0)
    df['deviation_multiplier'] = deviation_raw.apply(_format_multiplier)

    # Distance from the normal mean, in units of the normal standard deviation
    df['std_deviations'] = ((df['avg_seats'] - normal_mean) / normal_std).round(2)

    print(f"✓ Calculated deviation metrics")
    print(f"  Normal baseline (median): {normal_baseline:.0f} seats")
    print(f"  Normal mean: {normal_mean:.2f} seats")
    print(f"  Normal std dev: {normal_std:.2f} seats")

    return df

### Function 4: Enrich with IATA Data
```python
def enrich_with_iata_data(df: pd.DataFrame, iata_table: str = "mc.amadeus2.iata"):
    """
    Enrich anomaly rows with full city and country names from the IATA lookup.

    Uses the notebook-provided `spark` session.

    Args:
        df: DataFrame with 'route', 'day_of_week', 'avg_seats', 'anomaly',
            'deviation_multiplier' and 'std_deviations' columns
        iata_table: Full path to IATA lookup table

    Returns:
        Spark DataFrame with one row per anomaly joined to its origin and
        destination city/country names (NULL where the code has no match)
    """
    from pyspark.sql.functions import split, col

    # Only rows flagged as anomalies are enriched
    df_spark = spark.createDataFrame(df[df['anomaly'] == -1])

    # Split route once; element 0 is the origin code, element 1 the destination code
    route_parts = split("route", "_to_")
    df_split = (
        df_spark
        .withColumn("origin_city", route_parts[0])
        .withColumn("destination_city", route_parts[1])
    )

    # Load IATA table
    df_iata = spark.table(iata_table)

    # Join for origin (left join keeps anomalies whose code is not in the lookup)
    df_enriched = df_split.alias("a").join(
        df_iata.alias("i_origin"),
        col("a.origin_city") == col("i_origin.iata"),
        "left"
    ).select(
        col("a.*"),
        col("i_origin.city").alias("origin_city_full"),
        col("i_origin.country").alias("origin_country_full")
    )

    # Join for destination and fix the final column order
    df_enriched = df_enriched.alias("a").join(
        df_iata.alias("i_dest"),
        col("a.destination_city") == col("i_dest.iata"),
        "left"
    ).select(
        col("a.route"),
        col("a.origin_city"),
        col("a.origin_city_full"),
        col("a.origin_country_full"),
        col("a.destination_city"),
        col("i_dest.city").alias("destination_city_full"),
        col("i_dest.country").alias("destination_country_full"),
        col("a.day_of_week"),
        col("a.avg_seats"),
        col("a.deviation_multiplier"),
        col("a.std_deviations")
    )

    print(f"✓ Enriched {df_enriched.count()} anomalies with IATA data")
    return df_enriched
```

### Function 5: Display Results
```python
def display_anomaly_results(df: pd.DataFrame, df_enriched):
    """
    Display comprehensive anomaly detection results.

    Prints summary counts, then shows the 20 anomalies with the highest
    avg_seats from both the pandas frame and the enriched Spark frame, using
    the notebook-provided `display` function.

    Args:
        df: Full DataFrame with all predictions and metrics
        df_enriched: Spark DataFrame with enriched anomaly data
    """
    total = len(df)
    anomaly_rows = df[df['anomaly'] == -1]
    anomalies = len(anomaly_rows)
    normal = int((df['anomaly'] == 1).sum())

    def _share(count: int) -> float:
        """Percentage of all scored rows; 0.0 when nothing was scored."""
        return count / total * 100 if total else 0.0

    banner = "=" * 60
    print("\n" + banner)
    print("ANOMALY DETECTION RESULTS")
    print(banner)
    print(f"Total records scored: {total:,}")
    print(f"Anomalies detected: {anomalies:,} ({_share(anomalies):.2f}%)")
    print(f"Normal records: {normal:,} ({_share(normal):.2f}%)")
    print(banner)

    # Top anomalies with metrics
    print("\n📊 TOP 20 ANOMALIES (with deviation metrics)")
    top_anomalies = anomaly_rows.sort_values('avg_seats', ascending=False).head(20)
    display(top_anomalies[['route', 'day_of_week', 'avg_seats', 'deviation_multiplier', 'std_deviations']])

    # Enriched anomalies with full names
    print("\n🌍 TOP 20 ENRICHED ANOMALIES (with city/country names)")
    display(df_enriched.orderBy("avg_seats", ascending=False).limit(20))

---

## Complete Execution Pipeline

```python
def run_anomaly_detection_pipeline():
    """
    Execute the complete anomaly detection pipeline.

    Runs the five steps in order: load and aggregate the updates, score them
    through the serving endpoint, compute deviation metrics, enrich the
    anomalies with IATA names, and display the results.

    Returns:
        Tuple (df, df_enriched): the pandas DataFrame with predictions and
        metrics, and the enriched anomalies returned by enrich_with_iata_data
    """
    print("🚀 Starting Anomaly Detection Pipeline...\n")

    # Step 1: Load and prepare data
    print("[1/5] Loading and preparing data...")
    df = load_and_prepare_anomaly_data()

    # Step 2: Invoke model
    print("\n[2/5] Invoking model endpoint...")
    df = invoke_anomaly_model(df)

    # Step 3: Calculate metrics
    print("\n[3/5] Calculating deviation metrics...")
    df = calculate_deviation_metrics(df)

    # Step 4: Enrich with IATA
    print("\n[4/5] Enriching with IATA data...")
    df_enriched = enrich_with_iata_data(df)

    # Step 5: Display results
    print("\n[5/5] Displaying results...")
    display_anomaly_results(df, df_enriched)

    print("\n✅ Pipeline completed successfully!")

    return df, df_enriched

# Execute the pipeline and keep both returned frames: the scored pandas
# DataFrame and the enriched anomalies
df_results, df_enriched_results = run_anomaly_detection_pipeline()
```

---

## Usage for Angular Application

### API Integration Points

1. **Data Endpoint**: Query `mc.amadeus2.anomaly_updates` via Databricks SQL API
2. **Model Endpoint**: `https://<workspace-url>/serving-endpoints/flight-seat-anomaly-detector/invocations`
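3. **Results Format**: JSON with fields:
   - `route`: Origin_to_Destination format
   - `origin_city`: Origin code (the part of `route` before `_to_`)
   - `origin_city_full`: Full origin city name
   - `origin_country_full`: Full origin country
   - `destination_city`: Destination code (the part of `route` after `_to_`)
   - `destination_city_full`: Full destination city name
   - `destination_country_full`: Full destination country
   - `day_of_week`: Day of week of the departure date (1=Sunday, ..., 7=Saturday)
   - `avg_seats`: Average seat count
   - `deviation_multiplier`: Deviation from normal (e.g., "+11x")
   - `std_deviations`: Standard deviations from mean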

### Expected Output Schema
```json
{
  "route": "CPH_to_DXB",
  "origin_city": "CPH",
  "origin_city_full": "Copenhagen - Copenhagen Airport",
  "origin_country_full": "Denmark",
  "destination_city": "DXB",
  "destination_city_full": "Dubai - Dubai International Airport",
  "destination_country_full": "United Arab Emirates",
  "day_of_week": 5,
  "avg_seats": 1568,
  "deviation_multiplier": "+11x",
  "std_deviations": 29.39
}
```

---

## Notes

- **Normal Baseline**: Median of normal records (~127 seats)
- **Anomaly Threshold**: Automatically determined by Isolation Forest model
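- **Deviation Multiplier**: Relative difference from the normal baseline, `(avg_seats - baseline) / baseline`, rounded to a whole number and shown with a sign (e.g., "+11x" means about 11 baselines above the baseline)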
- **Standard Deviations**: Statistical distance from mean of normal records
- **Day of Week**: 1=Sunday, 2=Monday, ..., 7=Saturday (Spark dayofweek function)

---

## Error Handling

- Handle NaN values in `avg_seats` by dropping rows
- Verify model endpoint is running before calling
- Check for missing IATA codes in lookup table (will result in NULL city/country)
- Validate input data has required columns before processing