In [1]:
import os
import findspark
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import VectorAssembler, PCA

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import DoubleType, FloatType, IntegerType
from pyspark.sql.functions import col, count, when, isnan, avg, stddev, sum

from scipy.stats import f_oneway

In [2]:
# NOTE(review): findspark.init() normally has to run *before* pyspark is
# imported; the import cell above already imports pyspark, so confirm whether
# this call is still needed here.
findspark.init()

# Build (or reuse) a Spark session in local mode using all available cores,
# with its web UI pinned to port 4050.
spark = (
    SparkSession.builder
    .appName("Big Data")
    .config("spark.master", "local[*]")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

# Report the default parallelism and where the UI can be reached
spark_context = spark.sparkContext
print(f"Spark Session running on {spark_context.defaultParallelism} cores. UI is available at: {spark_context.uiWebUrl}")

Spark Session running on 12 cores. UI is available at: http://172.20.0.1:4050


### Defining functions

In [3]:
def load_and_clean_data(file_name: str):
    """Load a processed CSV and reduce it to numeric, null-free data.

    Steps (each prints a short report):
      1. Read ``<cwd>/data_processed/<file_name>`` with a header row and an
         inferred schema, and show the first two rows.
      2. Drop every column that holds a single distinct value.
      3. Count the NaN/null cells across the remaining columns.
      4. Keep only numeric columns, then drop any row containing a null.

    Args:
        file_name: Name of the CSV file inside the ``data_processed`` folder
            of the current working directory.

    Returns:
        A Spark DataFrame containing only the numeric columns, with every
        row that had a null removed.
    """
    # Local imports: `sum` in this notebook is pyspark's aggregate, so the
    # Python builtin is reached through `builtins`; the extra integral types
    # make sure e.g. LongType columns are not silently discarded.
    import builtins
    from pyspark.sql.types import ByteType, LongType, ShortType

    # Load the dataset and determine the shape
    file_path = os.path.join(os.getcwd(), "data_processed", file_name)
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    df.show(2)

    print(f"{'=' * 75}\n Basic information about the data\n{'=' * 75}\n")
    print(f"Shape of the dataset: ({df.count()}, {len(df.columns)})")

    # Identify and drop columns with only one unique value. distinct() treats
    # null as a value, so an all-null column also counts as constant.
    # This launches one Spark job per column.
    constant_columns = [c for c in df.columns if df.select(c).distinct().count() == 1]
    df_cleaned = df.drop(*constant_columns)
    print(f"Number of columns with one unique value: {len(constant_columns)}\nColumns with one unique value: {constant_columns}\nShape of the cleaned dataset: ({df_cleaned.count()}, {len(df_cleaned.columns)})")

    # Single-row DataFrame: per column, the number of cells that are NaN or null
    nan_counts = df_cleaned.select([count(when(isnan(col(c)) | col(c).isNull(), c)).alias(c) for c in df_cleaned.columns])
    # Total across ALL columns of that row. (Summing per column and reading
    # only index 0 would report just the first column's count.) An empty Row
    # (no columns left) sums to 0.
    total_nan_count = builtins.sum(nan_counts.first() or ())
    print(f"Total count of NaN values in the DataFrame: {total_nan_count}")

    # Keep only numeric columns (all integral widths plus float/double)
    numeric_types = (DoubleType, FloatType, IntegerType, LongType, ShortType, ByteType)
    numeric_columns = [field.name for field in df_cleaned.schema.fields if isinstance(field.dataType, numeric_types)]
    numeric_data = df_cleaned.select(numeric_columns)
    clean_data = numeric_data.na.drop()
    print(f"Number of numeric columns: {len(numeric_columns)}")

    print(f"\n{'=' * 75}\n Cleaned data containing only numerical columns\n{'=' * 75}\n")
    clean_data.show(2)

    return clean_data

In [4]:
def compute_anova_and_summary_stats(clean_data: DataFrame, classes=(0, 1), alpha: float = 0.05):
    """Print per-class summary statistics and a one-way ANOVA for every feature.

    Feature columns are all columns of ``clean_data`` except ``class`` and the
    identifier-like columns ``name``, ``provider`` and ``patient``.

    Args:
        clean_data: Spark DataFrame with a ``class`` column plus feature columns.
        classes: Class labels whose groups are compared by the ANOVA. The
            default ``(0, 1)`` is the binary comparison; pass ``None`` to
            compare every class label present in the data.
        alpha: p-value threshold below which a feature is counted as showing
            a significant difference between the classes.

    Returns:
        A pandas DataFrame with the ``class`` column and the feature columns
        (the data the ANOVA was computed on).

    Raises:
        ValueError: if fewer than two classes are to be compared.
    """
    excluded_columns = {"class", "name", "provider", "patient"}
    feature_columns = [c for c in clean_data.columns if c not in excluded_columns]

    # Interleaved mean / standard-deviation aggregations, one pair per feature
    aggregations = []
    for feature in feature_columns:
        aggregations.append(avg(col(feature)).alias(f"{feature}_mean"))
        aggregations.append(stddev(col(feature)).alias(f"{feature}_stddev"))

    # Group by class and show mean and std of every feature per class
    print(f"{'=' * 75}\n Mean and STD of each feature per class\n{'=' * 75}\n")
    stats_df = clean_data.groupBy("class").agg(*aggregations, count("*").alias("count"))
    stats_df.show(truncate=False)

    # Collect to the driver for scipy (assumes the data fits in driver memory)
    pandas_df = clean_data.select("class", *feature_columns).toPandas()

    if classes is None:
        classes = sorted(pandas_df["class"].unique())
    if len(classes) < 2:
        raise ValueError("ANOVA needs at least two classes to compare")

    # Row masks depend only on the class, so build them once, not per feature
    class_masks = [pandas_df["class"] == label for label in classes]

    # One-way ANOVA of each feature across the selected class groups
    anova_results = {}
    for feature in feature_columns:
        groups = [pandas_df.loc[mask, feature] for mask in class_masks]
        f_stat, p_value = f_oneway(*groups)
        anova_results[feature] = {"f_stat": f_stat, "p_value": p_value}

    # Explicit columns keep this valid (and sortable) even with no features
    anova_df = pd.DataFrame.from_dict(
        anova_results, orient="index", columns=["f_stat", "p_value"]
    ).sort_values(by="p_value")

    # Print each feature's result, most significant first
    print(f"{'=' * 75}\n ANOVA significance test\n{'=' * 75}\n")
    for index, row in anova_df.iterrows():
        print(f" P-value: {row['p_value']:.4f}, F-statistic: {row['f_stat']:.4f} | Feature: {index}")

    # Features whose class means differ significantly at level `alpha`
    significant_features = anova_df[anova_df["p_value"] < alpha]
    print(f"\nNumber of features with significant difference: {len(significant_features)}")

    return pandas_df

In [5]:
def plot_feature_distributions(pandas_df: pd.DataFrame, filename: str, save_folder: str = 'output_graphs', classes=(0, 1)):
    """Save one overlaid per-class histogram image for every feature.

    Args:
        pandas_df: Data with a ``class`` column; every other column is plotted.
        filename: Name of the sub-folder (inside ``save_folder``) that receives
            the images.
        save_folder: Parent output directory; created if it does not exist.
        classes: Class labels to draw, one histogram trace per label.
            Defaults to the binary labels ``(0, 1)``.

    Side effects:
        Writes ``<save_folder>/<filename>/<feature>_distribution.png`` for
        each feature.
    """
    # Create the directory named after the filename inside the save_folder
    folder_path = os.path.join(save_folder, filename)
    os.makedirs(folder_path, exist_ok=True)

    # Every column except the class label is a feature to plot
    feature_columns = [c for c in pandas_df.columns if c != 'class']

    # Row masks depend only on the class, so build them once
    class_masks = [(label, pandas_df["class"] == label) for label in classes]

    for feature in feature_columns:
        fig = go.Figure()
        for label, mask in class_masks:
            # A shared bingroup forces identical bin edges for all classes;
            # under barmode="overlay" plotly otherwise bins each trace on its
            # own, so the overlaid bars would not be comparable.
            fig.add_trace(go.Histogram(
                x=pandas_df.loc[mask, feature],
                name=f"Class {label}",
                opacity=0.7,
                bingroup="classes",
            ))

        fig.update_layout(
            title=f"Distribution of {feature}",
            xaxis_title=f"{feature} values",
            yaxis_title="Count",
            barmode="overlay"
        )

        # Save the figure to the specified folder
        save_path = os.path.join(folder_path, f"{feature}_distribution.png")
        fig.write_image(save_path)

In [6]:
def visualize_pca_in_3d(df: DataFrame, exclude_columns=("name", "provider", "patient")):
    """Project the features onto 3 principal components and plot them in 3D.

    Args:
        df: Spark DataFrame with a ``class`` column and numeric feature columns.
        exclude_columns: Non-feature columns to leave out of the PCA when they
            are present. The default matches the identifier columns that
            ``compute_anova_and_summary_stats`` excludes from its features.

    Displays an interactive Plotly 3D scatter, coloured and shaped by ``class``.
    """
    # Assemble every feature column (not the label or identifiers) into a vector
    excluded = {"class", *exclude_columns}
    feature_columns = [c for c in df.columns if c not in excluded]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
    data = assembler.transform(df)

    # Reduce the features to 3 components.
    # NOTE(review): features are not scaled first, so large-magnitude features
    # can dominate the components; add a StandardScaler if that is unintended.
    pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
    pca_model = pca.fit(data)
    pca_points = pca_model.transform(data).select("pcaFeatures", "class")

    # Split the PCA vector into columns feature[0..2] and collect to pandas
    projected = pca_points.withColumn("feature", vector_to_array("pcaFeatures")).select(
        [col("feature")[i] for i in range(3)] + ['class'])
    pd_df = projected.toPandas()

    # Plotting using Plotly
    fig = px.scatter_3d(
        data_frame=pd_df,
        x='feature[0]', y='feature[1]', z='feature[2]',
        color='class',
        symbol='class',
        symbol_sequence=['x', 'circle'],
        color_continuous_scale='jet'
    )
    fig.update_traces(marker_size=5)
    fig.show()

In [7]:
def run_analysis_pipeline(file_name: str) -> None:
    """Run every analysis step for one processed CSV file.

    Loads and cleans ``file_name``, prints per-class statistics and ANOVA
    results, saves per-feature histograms under ``output_graphs/<file_name>``,
    and shows a 3D scatter of the first three principal components.
    """
    banner = '=' * 75

    print(f"\n{banner}\n Data preview\n{banner}\n")
    numeric_data = load_and_clean_data(file_name)
    feature_frame = compute_anova_and_summary_stats(numeric_data)

    # Per-class histograms of every feature, written to disk
    print(f"\n{banner}\n Generating distribution graphs\n{banner}\n")
    plot_feature_distributions(feature_frame, file_name)
    print('Graphs successfully saved in the "output_graphs" folder')

    # Interactive 3D view of the PCA projection
    visualize_pca_in_3d(numeric_data)

### Performing analysis using different datasets

In [8]:
run_analysis_pipeline(file_name="features_128_full_mask.csv")


 Data preview

+--------------------------------+--------------------------+------------------------------+------------------------------+---------------------------+----------------------------------+-------------------------------------------+-------------------------------+-----------------------------------------+----------------------------------+-------------------------------+-------------------------------+----------------------------------+----------------------------------+------------------------------+---------------------------------+------------------------------+-------------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--------------------------------+--------------------------------+--------------------------+---------------------------+--------------------------------------+----------------------------+---------------------------+------

In [9]:
run_analysis_pipeline(file_name="features_128_lesion_mask.csv")


 Data preview

+--------------------------------+--------------------------+------------------------------+------------------------------+---------------------------+----------------------------------+-------------------------------------------+-------------------------------+-----------------------------------------+----------------------------------+-------------------------------+-------------------------------+----------------------------------+----------------------------------+------------------------------+---------------------------------+------------------------------+-------------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--------------------------------+--------------------------------+--------------------------+---------------------------+--------------------------------------+----------------------------+---------------------------+------

In [10]:
run_analysis_pipeline(file_name="features_256_full_mask.csv")


 Data preview

+--------------------------------+--------------------------+------------------------------+------------------------------+---------------------------+----------------------------------+-------------------------------------------+-------------------------------+-----------------------------------------+----------------------------------+-------------------------------+-------------------------------+----------------------------------+----------------------------------+------------------------------+---------------------------------+------------------------------+-------------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--------------------------------+--------------------------------+--------------------------+---------------------------+--------------------------------------+----------------------------+---------------------------+------

In [11]:
run_analysis_pipeline(file_name="features_256_lesion_mask.csv")


 Data preview

+--------------------------------+--------------------------+------------------------------+------------------------------+---------------------------+----------------------------------+-------------------------------------------+-------------------------------+-----------------------------------------+----------------------------------+-------------------------------+-------------------------------+----------------------------------+----------------------------------+------------------------------+---------------------------------+------------------------------+-------------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--------------------------------+--------------------------------+--------------------------+---------------------------+--------------------------------------+----------------------------+---------------------------+------

In [12]:
run_analysis_pipeline(file_name="features_512_full_mask.csv")


 Data preview

+--------------------------------+--------------------------+------------------------------+------------------------------+---------------------------+----------------------------------+-------------------------------------------+-------------------------------+-----------------------------------------+----------------------------------+-------------------------------+-------------------------------+----------------------------------+----------------------------------+------------------------------+---------------------------------+------------------------------+-------------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--------------------------------+--------------------------------+--------------------------+---------------------------+--------------------------------------+----------------------------+---------------------------+------

In [13]:
run_analysis_pipeline(file_name="features_512_lesion_mask.csv")


 Data preview

+--------------------------------+--------------------------+------------------------------+------------------------------+---------------------------+----------------------------------+-------------------------------------------+-------------------------------+-----------------------------------------+----------------------------------+-------------------------------+-------------------------------+----------------------------------+----------------------------------+------------------------------+---------------------------------+------------------------------+-------------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--------------------------------+--------------------------------+--------------------------+---------------------------+--------------------------------------+----------------------------+---------------------------+------

In [14]:
# Shut down the Spark session created in the setup cell now that every dataset has been analysed
spark.stop()