In [None]:
import pandas as pd
from pathlib import Path
import glob
import yaml
from typing import *
from functools import reduce

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame, Row, Window

In [None]:
# Root directories of the training design and of the separate holdout design.
# Each is expected to contain one sub-directory per simulation.
base_path = "/data/saturation/central_composite_design/ccd6"
base_path_holdout = "/data/saturation/central_composite_design/ccd6_holdout_larger"

# Parameter levels that mark a training simulation as an "in-sample holdout":
# a simulation is held out when all four of its parameters are in these sets.
holdout_slopes = {1, 2, 3}
holdout_effective_radius_multipliers = {1.1, 1.5, 1.9}
holdout_min_rim_percentages = {0.25, 0.475, 0.7}
holdout_r_stat_multipliers = {3, 6, 9}

# Alternative configuration for the ccd9 design; uncomment to use it instead.
# base_path = "/data/saturation/central_composite_design/ccd9"
# base_path_holdout = "/data/saturation/central_composite_design/ccd9_holdout"
#
# holdout_slopes = {1, 1.5, 2, 2.5, 3}
# holdout_effective_radius_multipliers = {1.1, 1.3, 1.5, 1.7, 1.9}
# holdout_min_rim_percentages = {.25, .375, .5, .625, .75}
# holdout_r_stat_multipliers = {3, 4.5, 6, 7.5, 9}

# Number of local Spark worker threads (used in the "local[...]" master URL).
n_cores = 28

In [None]:
def read_config(path: Path) -> Dict:
    """Load a single YAML configuration file.

    Args:
        path: Location of the YAML file to parse.

    Returns:
        The object produced by ``yaml.safe_load`` for the file's contents.
    """
    with path.open("r") as handle:
        return yaml.safe_load(handle)


def get_aggregations_for_column(col: str) -> Iterable:
    """Yield the summary-statistic aggregation expressions for one column.

    In order, the generator produces: approximate percentiles 5, 10, ..., 95;
    the approximate 99th percentile; the approximate median; then the minimum,
    maximum, mean and sample standard deviation. Every expression is aliased
    as ``{col}_<statistic>``.

    Args:
        col: Name of the column to aggregate.
    """
    accuracy = int(1e6)

    # Percentiles 5 to 95 in steps of 5, followed by the 99th and the median.
    for percent in range(5, 100, 5):
        yield F.percentile_approx(col, percent / 100, accuracy=accuracy).alias(f"{col}_{percent}_percentile")
    yield F.percentile_approx(col, .99, accuracy=accuracy).alias(f"{col}_99_percentile")
    yield F.percentile_approx(col, .50, accuracy=accuracy).alias(f"{col}_median")

    # Simple single-argument aggregates, each aliased with its suffix.
    simple_aggregates = (
        (F.min, "min"),
        (F.max, "max"),
        (F.mean, "mean"),
        (F.stddev_samp, "stdev"),
    )
    for aggregate, suffix in simple_aggregates:
        yield aggregate(col).alias(f"{col}_{suffix}")
    

def calculate_stats(data: DataFrame, n_craters_in_saturation: int = 50000) -> DataFrame:
    """Compute per-simulation summary statistics over the tail of each simulation.

    For every ``simulation_id`` the rows are ranked by
    ``n_craters_added_in_study_region`` in descending order, the first
    ``n_craters_in_saturation`` of them are kept, and the aggregations from
    ``get_aggregations_for_column`` are evaluated for each tracked column.

    Args:
        data: Statistics rows; must contain ``simulation_id`` and the tracked
            columns listed below.
        n_craters_in_saturation: Number of rows, counted from the end of each
            simulation, to treat as being in saturation.

    Returns:
        One row per ``simulation_id`` holding the aggregated statistics.
    """
    columns_to_calculate_stats = [
        "areal_density",
        "z",
        "za",
        "n_craters_in_study_region",
        "n_craters_added_in_study_region"
    ]

    # Rank rows from the end of each simulation backwards so that
    # row_number == 1 is the final recorded row.
    window = Window.partitionBy("simulation_id").orderBy(F.col("n_craters_added_in_study_region").desc())

    # Keep the last n_craters_in_saturation rows. The previous version declared
    # the 50000 constant but filtered on a literal 50, so only 50 rows per
    # simulation ever reached the aggregations.
    last_n_craters_by_sim = (
        data.withColumn("row_number", F.row_number().over(window))
        .filter(F.col("row_number") <= n_craters_in_saturation)
        .drop("row_number")
    )

    # Set up the aggregations for each column of interest
    aggregations = [
        aggregation
        for col in columns_to_calculate_stats
        for aggregation in get_aggregations_for_column(col)
    ]

    return last_n_craters_by_sim.groupBy("simulation_id").agg(*aggregations)

In [None]:
# Local Spark session: n_cores worker threads, 48 GB of driver memory.
spark = (
    SparkSession.builder
    .master(f"local[{n_cores}]")
    .appName("Saturation")
    .config("spark.driver.memory", "48g")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# Read the statistics parquet files of every simulation under base_path and
# mark the resulting DataFrame for caching.
statistics_glob = f"{base_path}/*/statistics_*.parquet"
stats_df = spark.read.parquet(statistics_glob).cache()

In [None]:
# Aggregate the tail of every simulation into one row of statistics per simulation_id.
data = calculate_stats(stats_df)

In [None]:
# Read the config.yaml of every simulation that has a completed.txt marker.
completed_filenames = list(Path(base_path).glob("*/completed.txt"))
configs = sc.parallelize(
    [read_config(marker.parent / "config.yaml") for marker in completed_filenames]
)

# Keep only the identifier and the four design parameters, partitioned by
# simulation_id.
config_columns = [
    "simulation_id",
    "slope",
    "r_stat_multiplier",
    "effective_radius_multiplier",
    "min_rim_percentage",
]
configs_df = (
    configs.toDF()
    .select(config_columns)
    .repartition("simulation_id")
)

In [None]:
# Attach each simulation's configuration parameters to its aggregated statistics.
joined = configs_df.join(data, on="simulation_id")

In [None]:
# Write out the result: collect the joined rows to the driver as a pandas
# DataFrame and save them as a single CSV under base_path.
joined.toPandas().to_csv(f"{base_path}/post_saturation_statistics.csv", index=False)

In [None]:
# Prepare samples from each simulation, overall and post-saturation

In [None]:
def read_config(path: Path) -> Dict:
    """Parse the YAML file at ``path`` and return its contents.

    Same behavior as the ``read_config`` defined in an earlier cell of this
    notebook; this later definition shadows it once the cell has run.

    Args:
        path: Location of the YAML file to parse.

    Returns:
        The object produced by ``yaml.safe_load`` for the file's contents.
    """
    with path.open("r") as yaml_stream:
        return yaml.safe_load(yaml_stream)

def read_configs(base_path: str) -> pyspark.RDD:
    """Load the configuration of every completed simulation under ``base_path``.

    A simulation directory is included when it contains a ``completed.txt``
    file; its ``config.yaml`` is then parsed with ``read_config``.

    Args:
        base_path: Directory holding one sub-directory per simulation.

    Returns:
        An RDD with one parsed configuration per completed simulation.
    """
    config_paths = [
        marker.parent / "config.yaml"
        for marker in Path(base_path).glob("*/completed.txt")
    ]
    return sc.parallelize([read_config(config_path) for config_path in config_paths])

def create_configs_df(configs: pyspark.RDD) -> DataFrame:
    """Convert an RDD of configurations into a DataFrame of design parameters.

    Args:
        configs: RDD of parsed simulation configurations.

    Returns:
        A DataFrame restricted to ``simulation_id`` and the four design
        parameter columns.
    """
    return configs.toDF().select(
        [
            "simulation_id",
            "slope",
            "r_stat_multiplier",
            "effective_radius_multiplier",
            "min_rim_percentage",
        ]
    )

def sample_post_saturation_by_simulation(data: DataFrame,
                                         configs: pyspark.RDD,
                                         n_craters_to_sample: int) -> DataFrame:
    """Sample evenly spaced rows from the final third of every simulation.

    Rows are numbered per simulation in ascending order of
    ``n_craters_added_in_study_region``. The saturation point is taken as
    ``int(n_rows / 3 * 2)``; from that row onwards, every ``step``-th row is
    kept, where ``step = int((n_rows - saturation_point) / n_craters_to_sample)``
    clamped to a minimum of 1.

    Args:
        data: Statistics rows containing ``simulation_id`` and
            ``n_craters_added_in_study_region``.
        configs: RDD of simulation configurations to join onto the sample.
        n_craters_to_sample: Approximate number of rows to keep per simulation.

    Returns:
        The sampled rows joined with their simulation's configuration columns.

    Raises:
        ValueError: If ``n_craters_to_sample`` is not positive.
    """
    if n_craters_to_sample <= 0:
        raise ValueError(f"n_craters_to_sample must be positive, got {n_craters_to_sample}")

    configs_df = create_configs_df(configs)

    window = Window.partitionBy("simulation_id").orderBy(F.col("n_craters_added_in_study_region"))
    craters_with_row_number = data.withColumn("row_number", F.row_number().over(window))

    # Total row count per simulation and the row at which the last third begins.
    saturation_points = craters_with_row_number.groupby("simulation_id").agg(F.max("row_number").alias("n_craters_max"))
    saturation_points = saturation_points.withColumn("saturation_point", (F.col("n_craters_max") / 3 * 2).cast("int"))

    with_saturation_points = craters_with_row_number.join(saturation_points, on="simulation_id", how="inner")

    rows_past_saturation = F.col("row_number") - F.col("saturation_point")

    # Clamp the stride to at least 1. When a simulation has fewer
    # post-saturation rows than n_craters_to_sample the integer stride is 0,
    # and "% 0" yields NULL (dropping every row) or raises under ANSI mode.
    sampling_step = F.greatest(
        ((F.col("n_craters_max") - F.col("saturation_point")) / n_craters_to_sample).cast("int"),
        F.lit(1),
    )

    # The cache sits before the n-dependent filter, as in the original chain,
    # so the post-saturation rows do not depend on n_craters_to_sample.
    filtered = with_saturation_points \
        .filter(rows_past_saturation >= 0) \
        .cache() \
        .filter(rows_past_saturation % sampling_step == 0) \
        .drop("row_number") \
        .drop("saturation_point") \
        .drop("n_craters_max")

    return configs_df.join(filtered, on="simulation_id")

def sample_by_simulation(data: DataFrame,
                         configs: pyspark.RDD,
                         n_craters_to_sample: int) -> DataFrame:
    """Sample rows across the whole of every simulation.

    Rows are numbered per simulation in ascending order of
    ``n_craters_added_in_study_region``. A row is kept when either:

    * its row number is below 50000 and is a multiple of
      ``int(50000 / n_craters_to_sample)``; or
    * the remainder of its row number modulo
      ``n_rows / n_craters_to_sample``, truncated to an integer, is zero.

    Args:
        data: Statistics rows containing ``simulation_id`` and
            ``n_craters_added_in_study_region``.
        configs: RDD of simulation configurations to join onto the sample.
        n_craters_to_sample: Sampling density used by both rules above.

    Returns:
        The sampled rows joined with their simulation's configuration columns.
    """
    configs_df = create_configs_df(configs)

    ordering = Window.partitionBy("simulation_id").orderBy(F.col("n_craters_added_in_study_region"))
    numbered = data.withColumn("row_number", F.row_number().over(ordering))

    # Total number of rows per simulation, attached to every row.
    row_counts = numbered.groupby("simulation_id").agg(F.max("row_number").alias("n_craters_max"))
    numbered_with_counts = numbered.join(row_counts, on="simulation_id", how="inner")

    row_number = F.col("row_number")

    # Rule 1: fixed integer stride over the first 50000 rows.
    early_stride = (F.lit(50000) / n_craters_to_sample).cast("int")
    in_early_sample = (row_number < F.lit(50000)) & (row_number % early_stride == 0)

    # Rule 2: fractional stride over the whole simulation.
    # NOTE(review): the int cast applies to the remainder, not to the stride,
    # unlike rule 1 -- confirm this is intended.
    overall_remainder = (row_number % (F.col("n_craters_max") / n_craters_to_sample)).cast("int")
    in_overall_sample = overall_remainder == 0

    filtered = (
        numbered_with_counts
        .filter(in_early_sample | in_overall_sample)
        .drop("row_number")
        .drop("n_craters_max")
    )

    return configs_df.join(filtered, on="simulation_id")

In [None]:
# Bring the training configurations to the driver as a pandas DataFrame.
train_configs = read_configs(base_path)
configs = train_configs.toDF().toPandas()

# A training simulation is an in-sample holdout when all four of its design
# parameters take one of the configured holdout levels.
holdout_mask = (
    configs.slope.isin(holdout_slopes)
    & configs.effective_radius_multiplier.isin(holdout_effective_radius_multipliers)
    & configs.min_rim_percentage.isin(holdout_min_rim_percentages)
    & configs.r_stat_multiplier.isin(holdout_r_stat_multipliers)
)
in_sample_holdout_ids = set(configs.simulation_id[holdout_mask])

In [None]:
# All training-design statistics, partitioned by simulation and cached.
all_train_df = spark.read.parquet(f"{base_path}/*/statistics_*.parquet").repartition("simulation_id").cache()

train_configs = read_configs(base_path).cache()

# Split the training design into in-sample holdout simulations and the rest.
is_held_out = all_train_df.simulation_id.isin(in_sample_holdout_ids)
in_sample_holdout_df = all_train_df.filter(is_held_out)
train_df = all_train_df.filter(~is_held_out)

# Separate holdout design.
holdout_df = spark.read.parquet(f"{base_path_holdout}/*/statistics_*.parquet").cache()
holdout_configs = read_configs(base_path_holdout)

n_craters_to_sample = [100, 500, 1000, 5000, 10000]

# For every sample size, write one post-saturation CSV per dataset.
for n in n_craters_to_sample:
    sample_targets = [
        (train_df, train_configs, f"{base_path}/post_saturation_sample_{n}.csv"),
        (in_sample_holdout_df, train_configs, f"{base_path}/post_saturation_in_sample_holdout_sample_{n}.csv"),
        (holdout_df, holdout_configs, f"{base_path_holdout}/post_saturation_sample_{n}.csv"),
    ]
    for source_df, source_configs, output_path in sample_targets:
        sample = sample_post_saturation_by_simulation(source_df, source_configs, n)
        sample.toPandas().to_csv(output_path, index=False)

In [None]:
# NOTE(review): this cell re-reads and re-splits the same inputs as the
# post-saturation sampling cell before it.
all_train_df = spark.read.parquet(f"{base_path}/*/statistics_*.parquet").repartition("simulation_id").cache()

train_configs = read_configs(base_path).cache()

# Split the training design into in-sample holdout simulations and the rest.
is_held_out = all_train_df.simulation_id.isin(in_sample_holdout_ids)
in_sample_holdout_df = all_train_df.filter(is_held_out)
train_df = all_train_df.filter(~is_held_out)

# Separate holdout design.
holdout_df = spark.read.parquet(f"{base_path_holdout}/*/statistics_*.parquet").cache()
holdout_configs = read_configs(base_path_holdout)

n_craters_to_sample = [100, 500, 1000, 5000, 10000]

# For every sample size, write one whole-simulation CSV per dataset.
for n in n_craters_to_sample:
    sample_targets = [
        (train_df, train_configs, f"{base_path}/sample_{n}.csv"),
        (in_sample_holdout_df, train_configs, f"{base_path}/in_sample_holdout_sample_{n}.csv"),
        (holdout_df, holdout_configs, f"{base_path_holdout}/sample_{n}.csv"),
    ]
    for source_df, source_configs, output_path in sample_targets:
        sample = sample_by_simulation(source_df, source_configs, n)
        sample.toPandas().to_csv(output_path, index=False)

In [None]:
# Export every in-sample holdout simulation, joined with its configuration and
# sorted by n_craters_added_in_study_region, to its own parquet file.
train_configs_df = train_configs.toDF().repartition("simulation_id").cache()

for holdout_simulation_id in in_sample_holdout_ids:
    output_path = f"{base_path}/simulation_{holdout_simulation_id}.parquet"
    filtered = in_sample_holdout_df.filter(F.col("simulation_id") == holdout_simulation_id)
    joined = (
        train_configs_df
        .join(filtered, on="simulation_id")
        .sort(F.col("n_craters_added_in_study_region"))
    )
    joined.toPandas().to_parquet(output_path)

In [None]:
# Export every simulation of the holdout design, joined with its configuration
# and sorted by n_craters_added_in_study_region, to its own parquet file.
holdout_configs_df = holdout_configs.toDF().repartition("simulation_id").cache()
holdout_simulation_ids = [config["simulation_id"] for config in holdout_configs.collect()]

for holdout_simulation_id in holdout_simulation_ids:
    output_path = f"{base_path_holdout}/simulation_{holdout_simulation_id}.parquet"
    filtered = holdout_df.filter(F.col("simulation_id") == holdout_simulation_id)
    joined = (
        holdout_configs_df
        .join(filtered, on="simulation_id")
        .sort(F.col("n_craters_added_in_study_region"))
    )
    joined.toPandas().to_parquet(output_path)