In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

## Spark Session Initialization

In [None]:
from pyspark.sql import SparkSession

Spark = SparkSession.builder \
    .appName("Aircraft Compliance Exploration") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.cores", "2") \
    .config("master", "local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.port", "4040") \
    .getOrCreate()

In [None]:
from IPython.display import Markdown, display

display(Markdown("""
Key configurations:

- **Memory Allocation**:
  - Driver memory: `2g`
  - Executor memory: `2g`
- **CPU Cores**:
  - Driver cores: `2`
  - Executor cores: `2`
- **Execution Mode**:
  - Runs in local mode using all available cores: `local[*]`
- **Network Settings**:
  - Driver bind address: `127.0.0.1`
  - Driver host: `localhost`
  - Driver port: `4040`

These settings are suitable for developing and testing Spark applications on a single machine, while working with moderate-sized datasets.
"""))


Key configurations:

- **Memory Allocation**:
  - Driver memory: `2g`
  - Executor memory: `2g`
- **CPU Cores**:
  - Driver cores: `2`
  - Executor cores: `2`
- **Execution Mode**:
  - Runs in local mode using all available cores: `local[*]`
- **Network Settings**:
  - Driver bind address: `127.0.0.1`
  - Driver host: `localhost`
  - Driver port: `4040`

These settings are suitable for developing and testing Spark applications on a single machine, while working with moderate-sized datasets.


## Loading the data as a Spark Dataframe

In [None]:
file_path = '/content/drive/MyDrive/UA_SA_ML/case_study.csv'

In [None]:
df = Spark.read.csv(file_path, header=True, inferSchema=True)

## Examining the schema

In [None]:
df.printSchema()

root
 |-- date: date (nullable = true)
 |-- airport: string (nullable = true)
 |-- reference_number: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- departures: integer (nullable = true)
 |-- count_of_compliant_audit: integer (nullable = true)
 |-- count_of_audit: integer (nullable = true)



## Descriptive Statistics

In [2]:
# Get all numeric columns of type int or float
numeric_columns = [field.name for field in df.schema.fields
                   if isinstance(field.dataType, (T.StringType, T.IntegerType, T.FloatType, T.DoubleType))]

# Describe only those numeric columns
summary_stats = df.describe(numeric_columns)
summary_stats.show(truncate=False)

NameError: name 'df' is not defined

## Time Series Analysis

In [None]:
import plotly.express as px
import pandas as pd

In [None]:
df.select("airline").distinct().show()

+---------------+
|        airline|
+---------------+
|United Airlines|
|            UAX|
|Partner Airline|
|       Mainline|
|           NULL|
+---------------+



In [None]:
def calculate_compliance_rate(df, metric_name='compliance_rate', numerator_col='count_of_compliant_audit', denominator_col='count_of_audit'):
    df = df.withColumn(metric_name, F.expr(f"({numerator_col} / {denominator_col}) * 100"))
    return df

In [None]:
df = calculate_compliance_rate(df, metric_name='compliance_rate', numerator_col='count_of_compliant_audit', denominator_col='count_of_audit')

In [None]:
import plotly.express as px
import pandas as pd
from datetime import datetime
from pyspark.sql import functions as F

def create_cross_comparison_graph(df, date_col='date', filter_column='airline', filter_value='United Airlines', metric_cols='compliance_rate', start_date=None, end_date=None):
    """
    Creates a spline graph for cross-comparison of average metric values
    across audit groups (reference_number) and monthly time periods, with optional filtering by column and date range.
    Supports multiple metrics and automatically detects formatting for hover (percentage, int, float, etc.).

    Parameters:
    - df: PySpark DataFrame with the schema including 'date', 'reference_number', and the metric columns.
    - date_col: Column name for date (default: 'date').
    - filter_column: Column to filter on (e.g., 'airline'). If None, no filtering is applied.
    - filter_value: Value(s) to filter by (e.g., 'United Airlines' or ['United Airlines', 'Delta']). If None, no filtering is applied.
    - metric_cols: Column name(s) for the metrics to analyze (e.g., 'compliance_rate' or ['compliance_rate', 'another_metric']). Required.
    - start_date: Start date for filtering (format: 'YYYY-MM-DD' or None). If None, no start date filter is applied.
    - end_date: End date for filtering (format: 'YYYY-MM-DD' or None). If None, no end date filter is applied.
    """
    # Handle metric_cols as string or list
    if isinstance(metric_cols, str):
        metric_cols = [metric_cols]

    # Validate that reference_number, metric_cols, and date_col exist in DataFrame
    if 'reference_number' not in df.columns:
        raise ValueError("Column 'reference_number' not found in DataFrame")
    for metric in metric_cols:
        if metric not in df.columns:
            raise ValueError(f"Metric column '{metric}' not found in DataFrame")
    if date_col not in df.columns:
        raise ValueError(f"Date column '{date_col}' not found in DataFrame")

    # Apply filtering for filter_column and filter_value
    df_filtered = df
    if filter_column is not None and filter_value is not None:
        if filter_column in df.columns:
            if isinstance(filter_value, list):
                df_filtered = df_filtered.filter(F.col(filter_column).isin(filter_value))
            else:
                df_filtered = df_filtered.filter(F.col(filter_column) == filter_value)

    # Apply date range filtering if start_date or end_date is provided
    if start_date is not None:
        try:
            datetime.strptime(start_date, '%Y-%m-%d')  # Validate format
            df_filtered = df_filtered.filter(F.col(date_col) >= start_date)
        except ValueError:
            raise ValueError("start_date must be in 'YYYY-MM-DD' format or None")
    if end_date is not None:
        try:
            datetime.strptime(end_date, '%Y-%m-%d')  # Validate format
            df_filtered = df_filtered.filter(F.col(date_col) <= end_date)
        except ValueError:
            raise ValueError("end_date must be in 'YYYY-MM-DD' format or None")

    # Drop rows where any metric_col is null to ensure clean aggregation
    if metric_cols:
        for metric in metric_cols:
            df_filtered = df_filtered.filter(F.col(metric).isNotNull())

    # Aggregate average metric values by month and reference_number
    aggregated = df_filtered.groupBy(
        F.date_format(F.col(date_col), "yyyy-MM").alias("month"),
        "reference_number"
    ).agg(
        *(F.avg(F.col(metric)).alias(f"avg_{metric}") for metric in metric_cols)
    ).orderBy("month", "reference_number")

    # Convert to Pandas for Plotly
    pd_agg = aggregated.toPandas()

    # Melt the DataFrame for plotting multiple metrics
    pd_melt = pd.melt(
        pd_agg,
        id_vars=['month', 'reference_number'],
        value_vars=[f'avg_{metric}' for metric in metric_cols],
        var_name='avg_metric',
        value_name='avg_value'
    )
    pd_melt['metric'] = pd_melt['avg_metric'].str.replace('avg_', '')

    # Define title based on filtering
    title = "Cross Comparison of Average Metrics Across Audit Groups Over Time"
    if filter_column is not None and filter_value is not None:
        if isinstance(filter_value, list):
            title += f" ({', '.join(filter_value)})"
        else:
            title += f" ({filter_value})"
    if start_date is not None or end_date is not None:
        date_range = f"[{start_date or 'Start'} to {end_date or 'End'}]"
        title += f" {date_range}"

    # Create line plot with splines for time series comparison across audit groups
    fig = px.line(
        pd_melt,
        x="month",
        y="avg_value",
        color="reference_number",
        facet_row="metric",
        line_shape='spline',  # Use spline interpolation for smooth lines
        title=title,
        labels={
            "month": "Month (YYYY-MM)",
            "avg_value": "Average Value",
            "reference_number": "Audit Group",
            "metric": "Metric"
        },
        category_orders={"metric": metric_cols},  # Preserve metric order
        color_discrete_sequence=px.colors.sequential.Jet,
    )

    # Apply dark theme with black background
    fig.update_layout(
        template="plotly_dark",
        paper_bgcolor="black",
        plot_bgcolor="black",
        font_color="white",
        title_font_size=20,
        legend_title_font_color="white",
        legend_font_color="white"
    )

    # Rotate x-axis labels for better readability
    fig.update_xaxes(tickangle=45)

    # Automatically detect formats and update y-axes and hover templates per metric
    for i, metric in enumerate(metric_cols):
        col = f"avg_{metric}"
        vals = pd_agg[col].dropna()
        if len(vals) == 0:
            continue
        min_v = vals.min()
        max_v = vals.max()

        # Determine if percentage based on name or range
        name_lower = metric.lower()
        is_percentage = any(word in name_lower for word in ['rate', 'percent', 'pct', 'compliance', 'ratio', 'proportion']) or (min_v >= 0 and max_v <= 100)

        if is_percentage:
            if max_v <= 1.0001:
                hover_format = '.2%'
                y_label = f"Average {metric.replace('_', ' ').title()} (%)"
            else:
                hover_format = '.2f%'
                y_label = f"Average {metric.replace('_', ' ').title()} (%)"
        else:
            if all(abs(v - round(v)) < 1e-6 for v in vals):
                hover_format = '.0f'
                y_label = f"Average {metric.replace('_', ' ').title()}"
            else:
                hover_format = '.2f'
                y_label = f"Average {metric.replace('_', ' ').title()}"

        # Update y-axis label and hover template for this subplot row
        row_num = i + 1
        fig.update_yaxes(title_text=y_label, row=row_num, col=1)
        fig.update_traces(
            hovertemplate="Month: %{x}<br>Avg: %{y:" + hover_format + "}<br>Audit Group: %{trace.name}",
            row=row_num, col=1
        )

    # Show the figure
    fig.show()

In [None]:
create_cross_comparison_graph(df, date_col='date', filter_column='reference_number', filter_value=['Audit 3'], metric_col='compliance_rate', start_date=None, end_date=None)

TypeError: create_cross_comparison_graph() got an unexpected keyword argument 'metric_col'

In [None]:
create_cross_comparison_graph(df, date_col='date', filter_column='reference_number', filter_value=['Audit 3'], metric_cols=['count_of_audit', 'compliance_rate'], start_date=None, end_date=None)

In [None]:
create_cross_comparison_graph(df, date_col='date', filter_column='reference_number', filter_value=['Audit 3'], metric_col='count_of_compliant_audit', start_date=None, end_date=None)

In [None]:
from pyspark.sql import functions as F
import plotly.express as px
import pandas as pd
from datetime import datetime

def create_outlier_box_plot(df, date_col='date', filter_column='airline', filter_value='United Airlines', metric_col='compliance_rate', start_date=None, end_date=None):
    """
    Creates a box plot to visualize the distribution and outliers of a metric (e.g., compliance_rate)
    across audit groups (reference_number), with optional filtering by column and date range.

    Parameters:
    - df: PySpark DataFrame with the schema including 'date', 'reference_number', and the metric column.
    - date_col: Column name for date (default: 'date').
    - filter_column: Column to filter on (e.g., 'airline'). If None, no filtering is applied.
    - filter_value: Value to filter by (e.g., 'United Airlines'). If None, no filtering is applied.
    - metric_col: Column name for the metric to analyze (e.g., 'compliance_rate'). Required.
    - start_date: Start date for filtering (format: 'YYYY-MM-DD' or None). If None, no start date filter is applied.
    - end_date: End date for filtering (format: 'YYYY-MM-DD' or None). If None, no end date filter is applied.

    The plot uses a dark theme with black background and a United Airlines-inspired blue color palette.
    Outliers are shown as points beyond the whiskers.
    """
    # Validate that metric_col and date_col exist in DataFrame
    if metric_col not in df.columns:
        raise ValueError(f"Metric column '{metric_col}' not found in DataFrame")
    if date_col not in df.columns:
        raise ValueError(f"Date column '{date_col}' not found in DataFrame")

    # Apply filtering for filter_column and filter_value
    df_filtered = df
    if filter_column is not None and filter_value is not None:
        if filter_column in df.columns:
            df_filtered = df_filtered.filter(F.col(filter_column) == filter_value)
        else:
            raise ValueError(f"Filter column '{filter_column}' not found in DataFrame")

    # Apply date range filtering if start_date or end_date is provided
    if start_date is not None:
        try:
            datetime.strptime(start_date, '%Y-%m-%d')  # Validate format
            df_filtered = df_filtered.filter(F.col(date_col) >= start_date)
        except ValueError:
            raise ValueError("start_date must be in 'YYYY-MM-DD' format or None")
    if end_date is not None:
        try:
            datetime.strptime(end_date, '%Y-%m-%d')  # Validate format
            df_filtered = df_filtered.filter(F.col(date_col) <= end_date)
        except ValueError:
            raise ValueError("end_date must be in 'YYYY-MM-DD' format or None")

    # Drop nulls in metric_col to ensure clean data for plotting
    df_filtered = df_filtered.filter(F.col(metric_col).isNotNull())

    # Select relevant columns for the box plot
    df_for_plot = df_filtered.select("reference_number", metric_col)

    # Convert to Pandas for Plotly
    pd_df = df_for_plot.toPandas()

    # Define title based on filtering
    title = f"{metric_col.replace('_', ' ').title()} Across Audit Groups (Outliers Highlighted)"
    if filter_column is not None and filter_value is not None:
        title += f" ({filter_value})"
    if start_date is not None or end_date is not None:
        date_range = f"[{start_date or 'Start'} to {end_date or 'End'}]"
        title += f" {date_range}"

    # Create box plot to show distribution and outliers across audit groups
    fig = px.box(
        pd_df,
        x="reference_number",
        y=metric_col,
        color="reference_number",
        title=title,
        labels={
            "reference_number": "Audit Group",
            metric_col: f"{metric_col.replace('_', ' ').title()} (%)"
        },
        color_discrete_sequence=px.colors.sequential.Jet  # Jet
    )

    # Show outliers as points
    fig.update_traces(boxpoints='outliers', jitter=0.3)  # Jitter for better visibility of outliers

    # Apply dark theme with black background
    fig.update_layout(
        template="plotly_dark",
        paper_bgcolor="black",
        plot_bgcolor="black",
        font_color="white",
        title_font_size=20,
        legend_title_font_color="white",
        legend_font_color="white"
    )

    # Rotate x-axis labels for better readability
    fig.update_xaxes(tickangle=45)

    # Show the figure
    fig.show()

In [None]:
create_outlier_box_plot(df, date_col='date', filter_column=None, filter_value=None, metric_col='count_of_compliant_audit', start_date='2025-01-01', end_date='2025-05-01')

In [None]:
create_outlier_box_plot(df, date_col='date', filter_column='reference_number', filter_value=['Audit 1', 'Audit 2'], metric_col='compliance_rate', start_date=None, end_date=None)

SparkRuntimeException: [UNSUPPORTED_FEATURE.LITERAL_TYPE] The feature is not supported: Literal for '[Audit 1, Audit 2]' of class java.util.ArrayList.

=============================================================================================

# Ad Hoc

## 50th Percentile

In [None]:
# df.createOrReplaceTempView("your_table")

# Spark.sql("""
#     SELECT
#       percentile_approx(departures, 0.5) AS median_departures,
#       percentile_approx(count_of_compliant_audit, 0.5) AS median_compliant,
#       percentile_approx(count_of_audit, 0.5) AS median_audit
#     FROM your_table
# """).show(truncate=False)

+-----------------+----------------+------------+
|median_departures|median_compliant|median_audit|
+-----------------+----------------+------------+
|888              |1               |1           |
+-----------------+----------------+------------+



In [None]:
from pyspark.sql.functions import percentile_approx, mode

# Custom stats including median (50th percentile) and mode
df.agg(
    percentile_approx(col("departures"), 0.5).alias("median_departures"),
    mode(col("departures")).alias("mode_departures"),
    percentile_approx(col("count_of_compliant_audit"), 0.5).alias("median_compliant"),
    mode(col("count_of_compliant_audit")).alias("mode_compliant"),
    percentile_approx(col("count_of_audit"), 0.5).alias("median_audit"),
    mode(col("count_of_audit")).alias("mode_audit")
).show(truncate=False)

+-----------------+---------------+----------------+--------------+------------+----------+
|median_departures|mode_departures|median_compliant|mode_compliant|median_audit|mode_audit|
+-----------------+---------------+----------------+--------------+------------+----------+
|888              |0              |1               |1             |1           |1         |
+-----------------+---------------+----------------+--------------+------------+----------+



## Statistics for Categorical Columns

In [None]:
from pyspark.sql.functions import count, countDistinct

# Unique counts per categorical column
unique_counts = df.agg(
    countDistinct(col("airport")).alias("unique_airports"),
    countDistinct(col("reference_number")).alias("unique_reference_numbers"),
    countDistinct(col("airline")).alias("unique_airlines")
)
unique_counts.show(1, truncate=False)

# # Frequency distribution (top N for each)
# df.groupBy("airport").agg(count("*").alias("frequency")).orderBy("frequency", ascending=False).show(10)
# df.groupBy("reference_number").agg(count("*").alias("frequency")).orderBy("frequency", ascending=False).show(10)
# df.groupBy("airline").agg(count("*").alias("frequency")).orderBy("frequency", ascending=False).show(10)

+---------------+------------------------+---------------+
|unique_airports|unique_reference_numbers|unique_airlines|
+---------------+------------------------+---------------+
|313            |3                       |4              |
+---------------+------------------------+---------------+



## Group by date or derived time periods (e.g., month/year) for trends.

In [None]:
from pyspark.sql.functions import year, month, sum, avg

# Monthly averages for numerical columns
df.groupBy(year("date").alias("year"), month("date").alias("month")).agg(
    avg("departures").alias("avg_departures"),
    avg("count_of_compliant_audit").alias("avg_compliant"),
    avg("count_of_audit").alias("avg_audit"),
    sum("departures").alias("total_departures")
).orderBy("year", "month").show(truncate=False)

+----+-----+------------------+------------------+------------------+----------------+
|year|month|avg_departures    |avg_compliant     |avg_audit         |total_departures|
+----+-----+------------------+------------------+------------------+----------------+
|2022|1    |2429.2628205128203|0.5               |1.0               |1136895         |
|2022|2    |1936.7315315315316|0.8               |1.0036036036036036|1074886         |
|2022|3    |2047.6406779661017|0.6111111111111112|1.0               |1208108         |
|2022|4    |1983.7766798418972|0.6666666666666666|1.0               |1003791         |
|2022|5    |2039.8386363636364|0.5               |1.0               |897529          |
|2022|6    |1826.0669456066946|1.0               |1.00418410041841  |872860          |
|2022|7    |1966.679389312977 |0.3333333333333333|1.0025445292620865|772905          |
|2022|8    |2163.273607748184 |0.8               |1.0048426150121066|893432          |
|2022|9    |2038.6694677871149|0.5         

## Compliance Rate

In [None]:
from pyspark.sql.functions import expr

# Add compliance rate column and get stats
df_with_rate = df.withColumn("compliance_rate", expr("(count_of_compliant_audit / count_of_audit) * 100"))
df_with_rate.describe("compliance_rate").show(truncate=False)

## Null Count and Percentage

In [None]:
from pyspark.sql.functions import col, sum, lit

total_rows = df.count()

# Collect null stats
null_stats = [
    (c,
     df.filter(col(c).isNull()).count(),
     (df.filter(col(c).isNull()).count() / total_rows) * 100
    )
    for c in df.columns
]

# Convert back to Spark DataFrame
null_stats_df = Spark.createDataFrame(null_stats, ["column_name", "null_count", "null_pct"])

null_stats_df.show(truncate=False)

+------------------------+----------+-----------------+
|column_name             |null_count|null_pct         |
+------------------------+----------+-----------------+
|date                    |0         |0.0              |
|airport                 |0         |0.0              |
|reference_number        |0         |0.0              |
|airline                 |3116      |1.592843450479233|
|departures              |0         |0.0              |
|count_of_compliant_audit|192063    |98.17916932907347|
|count_of_audit          |0         |0.0              |
+------------------------+----------+-----------------+



## Unique Values

In [None]:
from pyspark.sql.functions import col, countDistinct

# Get total rows (optional, for context or further use)
total_rows = df.count()

# Collect unique value counts
unique_stats = [
    (c, df.select(countDistinct(col(c)).alias('unique_count')).collect()[0]['unique_count'])
    for c in df.columns
]

# Convert to Spark DataFrame
unique_stats_df = Spark.createDataFrame(unique_stats, ["column_name", "unique_count"])

# Display results
unique_stats_df.show(truncate=False)

+------------------------+------------+
|column_name             |unique_count|
+------------------------+------------+
|date                    |1300        |
|airport                 |313         |
|reference_number        |3           |
|airline                 |4           |
|departures              |8078        |
|count_of_compliant_audit|8           |
|count_of_audit          |7           |
+------------------------+------------+



In [None]:
from pyspark.sql.types import NumericType

selected_column = 'airport'

is_numeric = isinstance(df.schema[selected_column].dataType, NumericType)

In [None]:
print(is_numeric)

False


In [None]:
selected_column = 'hazard_type'
df = hazards_df


non_numbers_only = [x[selected_column] for x in df.select(selected_column).collect() if isinstance(x[selected_column], int)]

NameError: name 'hazards_df' is not defined

In [None]:
hazards_df.count()