In [1]:
# Installation and Imports
%%capture
!pip install pyspark pytest pandas numpy scipy scikit-learn

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.stat import Correlation
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Union, Tuple
import unittest
import pytest

# Initialize Spark Session
# A parenthesised builder chain avoids fragile trailing backslashes.
spark = (
    SparkSession.builder
    .appName("Health Data Analysis")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

In [2]:
# Python Function Definitions For SAS Operations
class SASPySparkMigration:
    """ SAS to PySpark migration"""

    def __init__(self, df: pyspark.sql.DataFrame):
        self.df = df

    def sort_data(self,
                 columns: List[str],
                 ascending: Union[bool, List[bool]] = True) -> pyspark.sql.DataFrame:
        """
        SAS equivalent:
        PROC SORT DATA=dataset;
            BY col1 col2 ... coln;
        RUN;

        For descending sort:
        PROC SORT DATA=dataset;
            BY DESCENDING col1 DESCENDING col2;
        RUN;
        """
        if isinstance(ascending, bool):
            ascending = [ascending] * len(columns)

        order_cols = [F.col(c).asc() if asc else F.col(c).desc()
                     for c, asc in zip(columns, ascending)]
        return self.df.orderBy(*order_cols)

    def calculate_means(self,
                       group_cols: List[str],
                       analysis_cols: List[str]) -> pyspark.sql.DataFrame:
        """
        Per-group mean, standard deviation, min, max and non-null count.

        SAS equivalent:
        PROC MEANS DATA=dataset MEAN STD MIN MAX N;
            CLASS group_col1 group_col2;
            VAR analysis_col1 analysis_col2;
        RUN;

        Each analysis column ``c`` contributes the output columns ``c_mean``,
        ``c_stddev``, ``c_min``, ``c_max`` and ``c_count``, in that order.
        """
        # (output suffix, aggregate function) pairs, in output order.
        statistics = (
            ("mean", F.mean),
            ("stddev", F.stddev),
            ("min", F.min),
            ("max", F.max),
            ("count", F.count),
        )
        summary_exprs = [
            aggregate(name).alias(f"{name}_{suffix}")
            for name in analysis_cols
            for suffix, aggregate in statistics
        ]
        grouped = self.df.groupBy(*group_cols)
        return grouped.agg(*summary_exprs)

    def frequency_analysis(self,
                         columns: List[str],
                         include_pct: bool = True) -> pyspark.sql.DataFrame:
        """
        Count rows for every observed combination of ``columns``.

        The columns are grouped jointly, so the closest SAS form is a
        list-style multiway table:
        PROC FREQ DATA=dataset;
            TABLES col1*col2 / LIST NOCUM NOPERCENT;
        RUN;

        With percentages:
        PROC FREQ DATA=dataset;
            TABLES col1*col2 / LIST NOCUM;
        RUN;

        Args:
            columns: Columns whose value combinations are counted.
            include_pct: When True, add a ``percentage`` column holding each
                count as a share of all rows, rounded to two decimals.

        Returns:
            pyspark.sql.DataFrame: One row per combination, largest count first.
        """
        counts = self.df.groupBy(*columns).count()

        if include_pct:
            row_total = self.df.count()
            share = F.round(F.col("count") * 100 / row_total, 2)
            counts = counts.withColumn("percentage", share)

        return counts.orderBy("count", ascending=False)

    def univariate_analysis(self,
                          numeric_col: str,
                          relative_error: float = 0.01) -> pyspark.sql.DataFrame:
        """
        One-row summary of a numeric column: moments, extremes and quantiles.

        SAS equivalent:
        PROC UNIVARIATE DATA=dataset;
            VAR numeric_col;
        RUN;

        Args:
            numeric_col: Name of the numeric column to describe.
            relative_error: Relative error passed to ``approxQuantile``.
                The default keeps the previous hard-coded 0.01; pass 0.0 for
                exact quantiles.

        Returns:
            pyspark.sql.DataFrame: A single row with the double-typed columns
            ``mean, stddev, skewness, kurtosis, min, max`` followed by
            ``p1, p5, p10, p25, p50, p75, p90, p95, p99``. Statistics that
            cannot be computed (e.g. no non-null values) are null.
        """
        probabilities = [0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]
        quantile_names = ['p1', 'p5', 'p10', 'p25', 'p50',
                          'p75', 'p90', 'p95', 'p99']
        moment_names = ['mean', 'stddev', 'skewness', 'kurtosis', 'min', 'max']

        quantiles = self.df.approxQuantile(
            numeric_col, probabilities, relative_error
        )
        # approxQuantile yields an empty list when the column has no non-null
        # values; pad with nulls so the output keeps its fixed shape.
        if not quantiles:
            quantiles = [None] * len(probabilities)

        # A global aggregate always produces exactly one row.
        moments = self.df.select(
            F.mean(numeric_col).alias("mean"),
            F.stddev(numeric_col).alias("stddev"),
            F.skewness(numeric_col).alias("skewness"),
            F.kurtosis(numeric_col).alias("kurtosis"),
            F.min(numeric_col).alias("min"),
            F.max(numeric_col).alias("max")
        ).first()

        values = [None if v is None else float(v)
                  for v in list(moments) + list(quantiles)]

        # An explicit schema avoids type inference, which fails on all-null
        # values, and removes the pandas round-trip.
        schema = StructType([
            StructField(name, DoubleType(), True)
            for name in moment_names + quantile_names
        ])

        # Use the session that owns self.df rather than a notebook-global
        # `spark`, so the class also works when imported from a module.
        return self.df.sparkSession.createDataFrame([tuple(values)], schema)

    def cross_tabulation(self,
                        row_col: str,
                        col_col: str,
                        normalize: bool = False) -> pyspark.sql.DataFrame:
        """
        Creates a cross-tabulation (contingency table) of two columns.

        Args:
            row_col (str): Column name to use for rows
            col_col (str): Column name to use for columns
            normalize (bool): If True, adds a ``<value>_pct`` column for each
                value column, holding the cell count as a percentage of that
                column's total (rounded to two decimals)

        Returns:
            pyspark.sql.DataFrame: Cross-tabulation result

        Example:
            >>> analyzer = SASPySparkMigration(medical_df)
            >>> result = analyzer.cross_tabulation('diagnosis', 'gender', normalize=True)

        SAS equivalent:
        PROC TABULATE DATA=dataset;
            CLASS row_col col_col;
            TABLE row_col, col_col;
        RUN;

        With column percentages:
        PROC TABULATE DATA=dataset;
            CLASS row_col col_col;
            TABLE row_col, col_col*(N COLPCTN);
        RUN;
        """
        # Create basic crosstab
        crosstab = self.df.crosstab(row_col, col_col)
        if not normalize:
            return crosstab

        value_columns = crosstab.columns[1:]  # Skip the row label column
        if not value_columns:
            # Nothing to normalise (e.g. empty input): no value columns exist.
            return crosstab

        def quoted(name: str):
            # Value columns are named after data values, which may contain dots
            # or other characters that need backtick quoting to resolve.
            return F.col("`" + name.replace("`", "``") + "`")

        # One aggregate over the (small) crosstab gives every column total at
        # once, instead of an unpartitioned window evaluated per column.
        totals = crosstab.agg(*[
            F.sum(quoted(name)).alias(f"total_{idx}")
            for idx, name in enumerate(value_columns)
        ]).first()

        pct_columns = [
            F.round(quoted(name) * 100 / F.lit(totals[idx]), 2).alias(f"{name}_pct")
            for idx, name in enumerate(value_columns)
        ]
        return crosstab.select("*", *pct_columns)

    def rank_data(self,
                 rank_col: str,
                 partition_cols: Optional[List[str]] = None,
                 method: str = 'dense') -> pyspark.sql.DataFrame:
        """
        Add a ``<rank_col>_rank`` column ranking rows by ``rank_col`` ascending.

        SAS equivalent:
        PROC RANK DATA=dataset OUT=ranked_data TIES=DENSE;
            BY partition_col1 partition_col2;
            VAR rank_col;
            RANKS rank_col_rank;
        RUN;

        Args:
            rank_col: Column whose values determine the rank.
            partition_cols: Optional columns; ranking restarts within each
                distinct combination of their values.
            method: One of ``'dense'``, ``'row_number'`` or ``'percent'``.
        """
        partition_by = partition_cols if partition_cols else []
        base_window = Window.partitionBy(*partition_by)

        ranking_functions = {
            'dense': F.dense_rank(),
            'row_number': F.row_number(),
            'percent': F.percent_rank()
        }

        ranker = ranking_functions[method]
        ordered_window = base_window.orderBy(rank_col)
        return self.df.withColumn(f"{rank_col}_rank", ranker.over(ordered_window))

    def format_values(self,
                     column: str,
                     format_dict: Dict[str, str]) -> pyspark.sql.DataFrame:
        """
        SAS equivalent:
        PROC FORMAT;
            VALUE $formatname
                'old_value1' = 'new_value1'
                'old_value2' = 'new_value2';
        RUN;

        DATA new_dataset;
            SET dataset;
            formatted_col = PUT(column, $formatname.);
        RUN;
        """
        return self.df.replace(format_dict, subset=[column])

    def transpose_data(self,
                      id_cols: List[str],
                      pivot_col: str,
                      value_col: str) -> pyspark.sql.DataFrame:
        """
        Pivot long data to wide: one output column per value of ``pivot_col``.

        SAS equivalent:
        PROC TRANSPOSE DATA=dataset OUT=transposed_data;
            BY id_col1 id_col2;
            ID pivot_col;
            VAR value_col;
        RUN;

        Each cell holds the first ``value_col`` value found for that
        id/pivot combination.
        """
        grouped = self.df.groupBy(id_cols)
        pivoted = grouped.pivot(pivot_col)
        return pivoted.agg(F.first(value_col))

    def merge_datasets(self,
                      right_df: pyspark.sql.DataFrame,
                      on: List[str],
                      how: str = 'inner') -> pyspark.sql.DataFrame:
        """
        SAS equivalent:
        DATA merged_data;
            MERGE dataset1(IN=a) dataset2(IN=b);
            BY key1 key2;
            IF a AND b;  /* For inner join */
        RUN;
        """
        return self.df.join(right_df, on=on, how=how)

    def sql_query(self, query: str) -> pyspark.sql.DataFrame:
        """
        SAS equivalent:
        PROC SQL;
            SELECT *
            FROM dataset
            WHERE condition;
        QUIT;
        """
        self.df.createOrReplaceTempView("temp_table")
        return spark.sql(query)

    def correlation_analysis(self,
                           columns: List[str],
                           method: str = 'pearson') -> pyspark.sql.DataFrame:
        """
        Correlation matrix of the given columns.

        SAS equivalent:
        PROC CORR DATA=dataset PEARSON;
            VAR col1 col2 col3;
        RUN;

        The columns are assembled into a single vector column with
        ``handleInvalid="skip"`` and passed to ``Correlation.corr`` together
        with ``method``. The value returned by ``Correlation.corr`` is handed
        back unchanged.
        """
        features_col = "correlation_features"
        assembled = VectorAssembler(
            inputCols=columns,
            outputCol=features_col,
            handleInvalid="skip"
        ).transform(self.df)

        return Correlation.corr(assembled, features_col, method)

    def standardize_data(self,
                        numeric_cols: List[str],
                        method: str = 'zscore') -> pyspark.sql.DataFrame:
        """
        SAS equivalent:
        PROC STDIZE DATA=dataset METHOD=STD OUT=standardized_data;
            VAR col1 col2;
        RUN;

        For min-max scaling:
        PROC STDIZE DATA=dataset METHOD=RANGE OUT=standardized_data;
            VAR col1 col2;
        RUN;
        """
        result = self.df
        for col in numeric_cols:
            if method == 'zscore':
                mean = self.df.select(F.mean(col)).collect()[0][0]
                stddev = self.df.select(F.stddev(col)).collect()[0][0]
                result = result.withColumn(
                    f"{col}_standardized",
                    (F.col(col) - mean) / stddev
                )
            elif method == 'minmax':
                min_val = self.df.select(F.min(col)).collect()[0][0]
                max_val = self.df.select(F.max(col)).collect()[0][0]
                result = result.withColumn(
                    f"{col}_standardized",
                    (F.col(col) - min_val) / (max_val - min_val)
                )
        return result

    def handle_missing(self,
                      numeric_cols: List[str],
                      method: str = 'mean') -> pyspark.sql.DataFrame:
        """
        Impute missing values, writing the result to ``<col>_imputed`` columns.

        SAS equivalent:
        PROC STDIZE DATA=dataset METHOD=MEAN REPONLY;
            VAR col1 col2;
        RUN;

        Args:
            numeric_cols: Columns to impute; the originals are left in place.
            method: Value forwarded to the ``Imputer`` as its ``strategy``.
        """
        imputed_names = [f"{name}_imputed" for name in numeric_cols]
        imputer = Imputer(
            inputCols=numeric_cols,
            outputCols=imputed_names,
            strategy=method
        )
        fitted_model = imputer.fit(self.df)
        return fitted_model.transform(self.df)

    def rolling_window_analysis(self,
                               column: str,
                               window_size: int,
                               agg_func: str = 'mean') -> pyspark.sql.DataFrame:
        """
        SAS equivalent:
        PROC EXPAND DATA=dataset OUT=expanded_data;
            ID time_col;
            CONVERT column / TRANSFORMOUT=(MOVAVE window_size);
        RUN;
        """
        window_spec = Window.orderBy().rowsBetween(-window_size + 1, 0)
        agg_funcs = {
            'mean': F.mean,
            'sum': F.sum,
            'min': F.min,
            'max': F.max
        }

        if agg_func not in agg_funcs:
            raise ValueError(f"Unsupported aggregation function: {agg_func}")

        return self.df.withColumn(
            f"{column}_rolling_{agg_func}_{window_size}",
            agg_funcs[agg_func](F.col(column)).over(window_spec)
        )

    def conditional_column_creation(self,
                                    new_col: str,
                                    conditions: Dict[str, str]) -> pyspark.sql.DataFrame:
        """
        Add ``new_col`` whose value is chosen by the first matching condition.

        SAS equivalent:
        DATA new_dataset;
            SET dataset;
            IF condition1 THEN new_col = value1;
            ELSE IF condition2 THEN new_col = value2;
        RUN;

        Args:
            new_col: Name of the column to add.
            conditions: Maps a SQL boolean expression (parsed with ``F.expr``)
                to the value assigned when it holds. Conditions are tried in
                dictionary order; the value is passed to ``when`` as given,
                not parsed as SQL.
        """
        # Seed with a branch that never matches so each real condition can be
        # appended with .when().
        branches = F.when(F.lit(False), None)
        for predicate_sql, outcome in conditions.items():
            predicate = F.expr(predicate_sql)
            branches = branches.when(predicate, outcome)

        return self.df.withColumn(new_col, branches)

    def summarize_dataset(self) -> None:
        """
        Print an overview of the dataset, in the spirit of SAS's PROC CONTENTS.

        Output, in order: the schema, the first 5 rows, the row count, and for
        every column the first 5 rows of its value-frequency table.
        """
        frame = self.df

        print("Schema:")
        frame.printSchema()

        print("\nSample Data:")
        frame.show(5)

        print("\nRow Count:", frame.count())

        print("\nColumn Summary:")
        for name in frame.columns:
            values = frame.select(F.col(name).alias("column_value"))
            values.groupBy("column_value").count().show(5)

    def split_dataset(self,
                     ratio: float,
                     seed: int = 42) -> (pyspark.sql.DataFrame, pyspark.sql.DataFrame):
        """
        SAS equivalent:
        PROC SURVEYSELECT DATA=dataset OUT=sampled_data SAMPRATE=ratio;
        RUN;
        """
        return self.df.randomSplit([ratio, 1 - ratio], seed)

    def time_series_forecasting(self,
                               date_col: str,
                               value_col: str,
                               periods: int) -> None:
        """
        SAS equivalent:
        PROC ARIMA DATA=dataset;
            IDENTIFY VAR=value_col;
            ESTIMATE Q=1 P=2;
            FORECAST OUT=forecast_data LEAD=periods;
        RUN;
        """
        # Placeholder for implementing forecasting models (e.g., ARIMA with statsmodels)
        pass


In [3]:
# Sample Dataset Creation
def create_health_dataset() -> pyspark.sql.DataFrame:
    """
    Build the ten-row sample healthcare dataset used for tests and demos.

    Returns:
        pyspark.sql.DataFrame: One row per patient, with some null
        ``temperature`` and ``blood_pressure`` values.
    """
    # (column name, Spark type, nullable) in column order.
    column_specs = [
        ("patient_id", StringType(), False),
        ("diagnosis", StringType(), True),
        ("age", IntegerType(), True),
        ("gender", StringType(), True),
        ("province", StringType(), True),
        ("length_of_stay", DoubleType(), True),
        ("admission_date", StringType(), True),
        ("care_level", StringType(), True),
        ("temperature", DoubleType(), True),
        ("blood_pressure", DoubleType(), True),
        ("visits", IntegerType(), True),
    ]
    schema = StructType([
        StructField(name, dtype, nullable)
        for name, dtype, nullable in column_specs
    ])

    records = [
        ("P001", "Cancer", 45, "M", "ON", 10.5, "2024-01-01", "A", 98.6, 120.5, 15),
        ("P002", "Diabetes", 62, "F", "BC", 15.2, "2024-01-02", "B", 99.1, 135.2, 22),
        ("P003", "Heart Disease", 55, "M", "AB", 12.8, "2024-01-03", "A", 98.9, 142.8, 18),
        ("P004", "Cancer", 38, "F", "ON", 9.3, "2024-01-04", "C", None, 118.3, 12),
        ("P005", "Diabetes", 71, "M", "QC", 14.7, "2024-01-05", "B", 99.3, None, 20),
        ("P006", "Heart Disease", 48, "F", "ON", 11.2, "2024-01-06", "A", 98.8, 128.5, 16),
        ("P007", "Cancer", 59, "M", "BC", 13.5, "2024-01-07", "B", None, 138.9, 19),
        ("P008", "Diabetes", 42, "F", "AB", 8.9, "2024-01-08", "C", 99.0, 122.4, 14),
        ("P009", "Heart Disease", 65, "M", "QC", 16.4, "2024-01-09", "A", 99.2, None, 25),
        ("P010", "Cancer", 52, "F", "ON", 12.1, "2024-01-10", "B", 98.7, 130.6, 17),
    ]

    return spark.createDataFrame(records, schema)

def create_supplementary_health_dataset() -> pyspark.sql.DataFrame:
    """
    Build the supplementary patient dataset used to exercise merges.

    It shares ``patient_id`` values (P001-P010) with the main sample dataset
    and adds further per-patient attributes.

    Returns:
        pyspark.sql.DataFrame: A DataFrame containing supplementary health data
    """
    # (column name, Spark type, nullable) in column order.
    column_specs = [
        ("patient_id", StringType(), False),
        ("blood_type", StringType(), True),
        ("hemoglobin", DoubleType(), True),
        ("smoking_status", StringType(), True),
        ("insurance_type", StringType(), True),
        ("follow_ups_scheduled", IntegerType(), True),
    ]
    schema = StructType([
        StructField(name, dtype, nullable)
        for name, dtype, nullable in column_specs
    ])

    records = [
        ("P001", "A+", 14.2, "Non-smoker", "Private", 3),
        ("P002", "B-", 13.8, "Smoker", "Medicare", 2),
        ("P003", "O+", 15.1, "Non-smoker", "Medicaid", 4),
        ("P004", "AB+", 12.9, "Former smoker", "Private", 1),
        ("P005", "A-", 13.5, "Smoker", "Uninsured", 3),
        ("P006", "O-", 14.8, "Non-smoker", "Medicare", 2),
        ("P007", "B+", 13.2, "Smoker", "Private", 4),
        ("P008", "AB-", 14.5, "Non-smoker", "Medicaid", 1),
        ("P009", "O+", 13.7, "Former smoker", "Private", 3),
        ("P010", "A+", 14.9, "Non-smoker", "Medicare", 2),
    ]

    return spark.createDataFrame(records, schema)



In [4]:
# Define Unit Tests
class TestSASPySparkMigration(unittest.TestCase):
    """Smoke tests for SASPySparkMigration, run against the sample datasets.

    Specific assert methods (assertIn, assertGreater, ...) are used instead of
    assertTrue(<expression>) so a failure reports the offending values.
    """

    @classmethod
    def setUpClass(cls):
        cls.df = create_health_dataset()
        cls.migration = SASPySparkMigration(cls.df)
        # Create a second DataFrame for merge-testing
        cls.df2 = create_supplementary_health_dataset()

    def test_sort_data(self):
        """Test PROC SORT equivalent"""
        sorted_df = self.migration.sort_data(["age"])
        ages = [row[0] for row in sorted_df.select("age").collect()]
        self.assertEqual(ages, sorted(ages))

    def test_calculate_means(self):
        """Test PROC MEANS equivalent"""
        means_df = self.migration.calculate_means(
            ["diagnosis"],
            ["length_of_stay", "visits"]
        )
        self.assertGreater(means_df.count(), 0)
        self.assertIn("length_of_stay_mean", means_df.columns)
        self.assertIn("visits_mean", means_df.columns)

    def test_frequency_analysis(self):
        """Test PROC FREQ equivalent"""
        freq_df = self.migration.frequency_analysis(["diagnosis", "gender"])
        self.assertIn("percentage", freq_df.columns)
        self.assertGreater(freq_df.count(), 0)

    def test_univariate_analysis(self):
        """Test PROC UNIVARIATE equivalent"""
        univ_df = self.migration.univariate_analysis("age")
        self.assertEqual(univ_df.count(), 1)
        required_cols = ["mean", "stddev", "skewness", "kurtosis", "p25", "p50", "p75"]
        for col in required_cols:
            self.assertIn(col, univ_df.columns)

    def test_cross_tabulation(self):
        """Test PROC TABULATE equivalent"""
        cross_df = self.migration.cross_tabulation("diagnosis", "gender", normalize=True)
        self.assertGreater(cross_df.count(), 0)
        # The sample data contains both genders, so both percentage columns
        # must be present (the previous `or` accepted either one alone).
        self.assertIn("M_pct", cross_df.columns)
        self.assertIn("F_pct", cross_df.columns)

    def test_rank_data(self):
        """Test PROC RANK equivalent"""
        ranked_df = self.migration.rank_data("length_of_stay", ["diagnosis"])
        self.assertIn("length_of_stay_rank", ranked_df.columns)

    def test_format_values(self):
        """Test PROC FORMAT equivalent"""
        format_dict = {"A": "High", "B": "Medium", "C": "Low"}
        formatted_df = self.migration.format_values("care_level", format_dict)
        unique_values = formatted_df.select("care_level").distinct().collect()
        for row in unique_values:
            self.assertIn(row[0], ["High", "Medium", "Low"])

    def test_transpose_data(self):
        """Test PROC TRANSPOSE equivalent"""
        trans_df = self.migration.transpose_data(
            ["patient_id"],
            "diagnosis",
            "length_of_stay"
        )
        self.assertIn("Cancer", trans_df.columns)
        self.assertIn("Diabetes", trans_df.columns)

    def test_merge_datasets(self):
        """Test DATA MERGE equivalent"""
        merged_df = self.migration.merge_datasets(
            self.df2,
            on=["patient_id"],
            how="inner"
        )
        # Check if merged DataFrame has more columns than original
        self.assertGreater(len(merged_df.columns), len(self.df.columns))
        # Check if common keys are preserved
        self.assertIn("patient_id", merged_df.columns)

    def test_sql_query(self):
        """Test PROC SQL equivalent"""
        query = "SELECT COUNT(*) as count, diagnosis FROM temp_table GROUP BY diagnosis"
        sql_df = self.migration.sql_query(query)
        # Check if query executed successfully
        self.assertGreater(sql_df.count(), 0)
        self.assertIn("count", sql_df.columns)
        self.assertIn("diagnosis", sql_df.columns)

    def test_correlation_analysis(self):
        """Test PROC CORR equivalent"""
        corr_df = self.migration.correlation_analysis(
            ["age", "length_of_stay", "visits"]
        )
        self.assertGreater(corr_df.count(), 0)
        # Check if correlation matrix is symmetric
        corr_matrix = corr_df.collect()[0][0].toArray()
        self.assertTrue(np.allclose(corr_matrix, corr_matrix.T))

    def test_standardize_data(self):
        """Test PROC STDIZE equivalent"""
        std_df = self.migration.standardize_data(
            ["age", "length_of_stay"],
            method="zscore"
        )
        self.assertIn("age_standardized", std_df.columns)
        self.assertIn("length_of_stay_standardized", std_df.columns)

        # Test min-max scaling
        minmax_df = self.migration.standardize_data(
            ["age"],
            method="minmax"
        )
        minmax_values = minmax_df.select("age_standardized").collect()
        for row in minmax_values:
            if row[0] is not None:
                self.assertGreaterEqual(row[0], 0)
                self.assertLessEqual(row[0], 1)

    def test_handle_missing(self):
        """Test handling missing values"""
        imputed_df = self.migration.handle_missing(
            ["temperature", "blood_pressure"]
        )
        self.assertIn("temperature_imputed", imputed_df.columns)
        self.assertIn("blood_pressure_imputed", imputed_df.columns)

        # Check if no nulls remain in imputed columns
        null_counts = imputed_df.select(
            [F.count(F.when(F.col(f"{c}_imputed").isNull(), c))
             for c in ["temperature", "blood_pressure"]]
        ).collect()[0]
        for null_count in null_counts:
            self.assertEqual(null_count, 0)

In [9]:
# Now run the tests
if __name__ == '__main__':
    # An explicit argv stops unittest from parsing the host process's own
    # command-line arguments; exit=False stops unittest.main() from calling
    # sys.exit() once the run finishes, so execution continues afterwards.
    unittest.main(argv=[''], verbosity=2, exit=False)


test_calculate_means (__main__.TestSASPySparkMigration)
Test PROC MEANS equivalent ... ok
test_correlation_analysis (__main__.TestSASPySparkMigration)
Test PROC CORR equivalent ... ok
test_cross_tabulation (__main__.TestSASPySparkMigration)
Test PROC TABULATE equivalent ... ok
test_format_values (__main__.TestSASPySparkMigration)
Test PROC FORMAT equivalent ... ok
test_frequency_analysis (__main__.TestSASPySparkMigration)
Test PROC FREQ equivalent ... ok
test_handle_missing (__main__.TestSASPySparkMigration)
Test handling missing values ... ok
test_merge_datasets (__main__.TestSASPySparkMigration)
Test DATA MERGE equivalent ... ok
test_rank_data (__main__.TestSASPySparkMigration)
Test PROC RANK equivalent ... ok
test_sort_data (__main__.TestSASPySparkMigration)
Test PROC SORT equivalent ... ok
test_sql_query (__main__.TestSASPySparkMigration)
Test PROC SQL equivalent ... ok
test_standardize_data (__main__.TestSASPySparkMigration)
Test PROC STDIZE equivalent ... ok
test_transpose_data (

In [5]:
def run_examples():
    """Run every SASPySparkMigration helper once on the sample data and print the results."""
    # Create sample dataset
    health_df = create_health_dataset()
    migration = SASPySparkMigration(health_df)

    # Example 1: Sorting
    print("Sorted Data:")
    sorted_df = migration.sort_data(["diagnosis", "age"], ascending=[True, False])
    sorted_df.show()

    # Example 2: Basic Statistics by Diagnosis
    print("Basic Statistics for Length of Stay and Visits by Diagnosis:")
    migration.calculate_means(
        ["diagnosis"],
        ["length_of_stay", "visits"]
    ).show()

    # Example 3: Frequency Analysis with Cross Tabulation
    print("\nDiagnosis and Gender Distribution:")
    migration.cross_tabulation(
        "diagnosis",
        "gender",
        normalize=True
    ).show()

    # Example 4: Univariate Analysis
    print("\nUnivariate Analysis of Age:")
    migration.univariate_analysis("age").show()

    # Example 5: Rank Patients by Visits
    print("\nRanking Patients by Visits:")
    migration.rank_data(
        "visits",
        partition_cols=["diagnosis"],
        method="dense"
    ).show()

    # Example 6: Format Gender Values
    print("\nFormatted Gender Values:")
    format_dict = {"M": "Male", "F": "Female"}
    migration.format_values("gender", format_dict).show()

    # Example 7: Transpose Data
    print("\nTransposed Data (Diagnosis as Pivot):")
    migration.transpose_data(
        ["patient_id"],
        "diagnosis",
        "length_of_stay"
    ).show()

    # Example 8: Merge Datasets
    print("\nMerged Dataset:")
    other_df = create_supplementary_health_dataset()  # Shares patient_id keys with health_df
    migration.merge_datasets(
        other_df,
        on=["patient_id"],
        how="inner"
    ).show()

    # Example 9: SQL Analysis
    print("\nSQL Analysis of Average Stay by Diagnosis:")
    query = """
    SELECT diagnosis,
           COUNT(*) as patient_count,
           ROUND(AVG(length_of_stay), 2) as avg_stay,
           ROUND(AVG(visits), 2) as avg_visits
    FROM temp_table
    GROUP BY diagnosis
    ORDER BY avg_stay DESC
    """
    migration.sql_query(query).show()

    # Example 10: Correlation Analysis
    print("\nCorrelation Analysis:")
    migration.correlation_analysis(
        ["age", "length_of_stay", "visits"]
    ).show()

    # Example 11: Standardize Values
    print("\nStandardized Values:")
    migration.standardize_data(["age", "length_of_stay"], method="zscore").show()

    # Example 12: Handle Missing Values
    print("\nHandle Missing Values (Impute with Mean):")
    migration.handle_missing(["temperature", "blood_pressure"], method="mean").show()

    # Example 13: Rolling Window Analysis
    print("\nRolling Window Analysis (7-day moving average of temperature):")
    migration.rolling_window_analysis(
        "temperature",
        window_size=7,
        agg_func="mean"
    ).show()

    # Example 14: Conditional Column Creation
    print("\nConditional Column Creation:")
    # Keys are SQL conditions; values are stored as plain string literals, so
    # they must not carry SQL quotes (that would yield "'Minor'" in the data).
    conditions = {
        "age < 18": "Minor",
        "age >= 18 AND age < 65": "Adult",
        "age >= 65": "Senior"
    }
    migration.conditional_column_creation("age_group", conditions).show()

    # Example 15: Dataset Summary
    print("\nDataset Summary:")
    migration.summarize_dataset()

    # Example 16: Split Dataset
    print("\nSplit Dataset into Training and Testing:")
    train_df, test_df = migration.split_dataset(ratio=0.8)
    print("Training Data Sample:")
    train_df.show(5)
    print("Testing Data Sample:")
    test_df.show(5)

    # Example 17: Time Series Forecasting
    print("\nTime Series Forecasting (placeholder):")
    # The sample dataset's date column is "admission_date"; there is no "date".
    migration.time_series_forecasting("admission_date", "length_of_stay", periods=10)

if __name__ == '__main__':
    # Run examples
    run_examples()

    # Run unittest test suite.
    # unittest skips argv[0] (the program-name slot), hence the placeholder;
    # exit=False stops unittest.main() from calling sys.exit() afterwards.
    unittest.main(argv=['first-arg-is-ignored'], exit=False)


Sorted Data:
+----------+-------------+---+------+--------+--------------+--------------+----------+-----------+--------------+------+
|patient_id|    diagnosis|age|gender|province|length_of_stay|admission_date|care_level|temperature|blood_pressure|visits|
+----------+-------------+---+------+--------+--------------+--------------+----------+-----------+--------------+------+
|      P007|       Cancer| 59|     M|      BC|          13.5|    2024-01-07|         B|       NULL|         138.9|    19|
|      P010|       Cancer| 52|     F|      ON|          12.1|    2024-01-10|         B|       98.7|         130.6|    17|
|      P001|       Cancer| 45|     M|      ON|          10.5|    2024-01-01|         A|       98.6|         120.5|    15|
|      P004|       Cancer| 38|     F|      ON|           9.3|    2024-01-04|         C|       NULL|         118.3|    12|
|      P005|     Diabetes| 71|     M|      QC|          14.7|    2024-01-05|         B|       99.3|          NULL|    20|
|      P002

.............
----------------------------------------------------------------------
Ran 13 tests in 22.171s

OK
