The code produces an approximately 1% sample of the original dataset that preserves, for each date, the distribution of the 'label' column. It proceeds in the following steps:

Initialization: It sets up a PySpark session and defines the columns in the data that are considered features (excluding 'date' and 'id').

Outlier Detection:
1. For each unique date in the dataset, it filters the data for that date.
2. For each feature column, it calculates the first and third quartiles (Q1 and Q3) and then determines the Interquartile Range (IQR). Using these, it identifies the lower and upper bounds for outliers.
3. A new column is created for each feature to indicate whether a value is an outlier (1 if yes, 0 if no).
4. A combined 'label' column is derived which is set to 1 if any of the features for a given row is an outlier, otherwise 0.
5. The intermediate outlier columns for individual features are then dropped.

Stratified Sampling:
1. For each date, the code calculates the fraction of rows with each label (0 or 1).
2. It then determines the number of samples required for each label: the overall target of 1% of the entire dataset is divided evenly across the dates, and each date's share is split between the labels in the same proportion as in that date's original data.
3. The data for each date is then sampled based on these calculated fractions.

Aggregation: All the sampled dataframes for each date are unioned together to form a final sampled dataframe.
Summary Statistics: The code calculates and provides summary statistics for both the entire dataset (population) and the sampled dataset.

In [None]:
from pyspark.sql import SparkSession, functions as F
from functools import reduce

# Create the Spark session (or reuse an already running one) for this job.
spark = (
    SparkSession.builder
    .appName("OutlierDetection")
    .getOrCreate()
)

# Function to check if a column exists in a DataFrame
def column_exists(df, column_name: str) -> bool:
    """Return True when *column_name* is one of the names in ``df.columns``."""
    return any(existing == column_name for existing in df.columns)

# Assuming df is your dataframe
# df = spark.read.... # Read your dataframe here

# Share of the full dataset that the final sample should contain.
SAMPLE_FRACTION = 0.01

# Every column except the date and id columns is treated as a feature.
feature_columns = [c for c in df.columns if c not in ['date', 'id']]

# Distinct non-null dates; rows whose date is null are never sampled.
all_dates = [row.date for row in df.select("date").distinct().collect() if row.date is not None]
if not all_dates:
    # Fail with a clear message instead of a ZeroDivisionError further down.
    raise ValueError("No non-null 'date' values found in df; cannot build a per-date sample.")

# Collects one sampled DataFrame per date.
sampled_dfs = []

# Calculate the desired total sample size and the sample size per date.
# total_count covers all rows of df, including rows with a null date.
total_count = df.count()
desired_sample_count = int(SAMPLE_FRACTION * total_count)
desired_count_per_date = desired_sample_count // len(all_dates)
if desired_count_per_date == 0:
    # Integer division rounded the per-date quota down to zero, so every
    # sampling fraction derived from it will be 0.
    print(
        f"Warning: {total_count} rows over {len(all_dates)} dates gives a per-date "
        "sample size of 0; the resulting sample will be empty."
    )

# For every date: flag IQR outliers per feature, collapse the flags into a
# binary 'label' (1 = at least one feature is an outlier), then sample each
# label stratum so the date contributes about `desired_count_per_date` rows
# while keeping the date's original label proportions.
for date in all_dates:
    temp_df = df.filter(F.col("date") == date)

    # Outlier detection for each feature column. Only columns whose bounds
    # could be computed get an "outlier_<column>" flag column.
    created_outlier_columns = []
    for column in feature_columns:
        outlier_column_name = "outlier_{}".format(column)
        try:
            # The unpacking fails if fewer than two quantiles come back
            # (e.g. presumably when the column has no usable values for
            # this date); that case is handled like any other failure.
            Q1, Q3 = temp_df.approxQuantile(column, [0.25, 0.75], 0.01)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            is_outlier = (F.col(column) < lower_bound) | (F.col(column) > upper_bound)
            temp_df = temp_df.withColumn(outlier_column_name, F.when(is_outlier, 1).otherwise(0))
        except Exception as exc:
            # Best effort: one unusable column must not abort the whole run,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            print(f"Error processing column {column} on date {date}. Skipping outlier detection for this column.")
            print(f"Reason: {exc!r}")
        else:
            created_outlier_columns.append(outlier_column_name)

    # Report the feature columns that ended up without an outlier flag.
    created_lookup = set(created_outlier_columns)
    for c in feature_columns:
        if "outlier_{}".format(c) not in created_lookup:
            print(f"Outlier column for {c} was not created!")

    # Build the binary label from the flags that actually exist; referencing
    # a flag column that was skipped above would fail.
    if created_outlier_columns:
        outlier_total = reduce(lambda x, y: x + y, [F.col(name) for name in created_outlier_columns])
        label_expr = F.when(outlier_total > 0, 1).otherwise(0)
    else:
        label_expr = F.lit(0)
    temp_df = temp_df.withColumn("label", label_expr)

    # The per-feature flags were only needed to derive the label.
    if created_outlier_columns:
        temp_df = temp_df.drop(*created_outlier_columns)

    label_counts = temp_df.groupBy("label").count().rdd.collectAsMap()
    # The grouped counts already add up to the date's row count, so no
    # separate count() job is needed.
    total_for_date = sum(label_counts.values())

    samples_for_date = []
    for label, label_count in label_counts.items():
        # This label's share of the per-date quota, in exact integer arithmetic.
        desired_samples = desired_count_per_date * label_count // total_for_date
        sample_fraction = desired_samples / label_count

        # A fraction above 1.0 means the stratum has fewer rows than its
        # quota; only sampling with replacement can reach it. The fraction
        # is deliberately not clamped to 1.0 in that case.
        # NOTE: DataFrame.sample is probabilistic, so the number of rows
        # returned only approximates desired_samples.
        sampled_data = temp_df.filter(F.col("label") == label).sample(
            withReplacement=sample_fraction > 1.0,
            fraction=sample_fraction,
        )
        samples_for_date.append(sampled_data)

    if not samples_for_date:
        # No rows were found for this date; nothing to contribute.
        continue

    sampled_dfs.append(reduce(lambda left, right: left.union(right), samples_for_date))

# Fold the per-date samples into a single DataFrame, starting from the first.
final_sampled_df = reduce(
    lambda merged, part: merged.union(part),
    sampled_dfs[1:],
    sampled_dfs[0],
)

# Summary statistics of the full dataset and of the sample, as pandas frames.
population_summary = df.describe().toPandas()
sample_summary = final_sampled_df.describe().toPandas()

# Display the population and sample summary
for heading, summary in (
    ("Population Summary:", population_summary),
    ("\nSample Summary:", sample_summary),
):
    print(heading)
    print(summary)

# Stop the Spark Session
spark.stop()
