In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import types as t
from collections import namedtuple

import numpy as np
from datetime import datetime
from datetime import timedelta
from math import floor

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1571416467653_0002,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [6]:
def read_ground_truth():
    sources = ["analyticsEvents", "scout", "speedTest", "xre"]
    days = [19, 20, 21, 22, 23, 24, 25]

    ground_truth_raw = None
    year = 2019
    month = '07'
    for day in days:
        for source in sources:    
            new_df = spark \
                    .read \
                    .parquet("s3://reachability-ground-truth/GroundTruthV1.237/" +
                             f"source={'analyticsEvents'}/" +
                             f"year={year}/month={month}/day={day}/*.gz.parquet") \
                    .withColumn("source", F.lit(source)) \
                    .withColumn("year", F.lit(year)) \
                    .withColumn("month", F.lit(month)) \
                    .withColumn("day", F.lit(day))

            new_df = new_df.withColumnRenamed("mac", "gt_mac")
            if ground_truth_raw is None:
                ground_truth_raw = new_df
            else:
                ground_truth_raw = ground_truth_raw \
                    .unionAll(new_df)
    
            ground_truth_raw.cache()
            raw_count = ground_truth_raw.count()
            print("Ground Truth source: for day: {} raw_count: {}".format(day, raw_count))    


    # Count and Cache ground truth for one day
    return ground_truth_raw

In [7]:
# Very strange that the month partition for Ground Truth contains a leading '0' but preprocessed data doesn't

ground_truth = read_ground_truth()

Ground Truth source: for day: 19 raw_count: 2222761
Ground Truth source: for day: 19 raw_count: 4445522
Ground Truth source: for day: 19 raw_count: 6668283
Ground Truth source: for day: 19 raw_count: 8891044
Ground Truth source: for day: 20 raw_count: 10744325
Ground Truth source: for day: 20 raw_count: 12597606
Ground Truth source: for day: 20 raw_count: 14450887
Ground Truth source: for day: 20 raw_count: 16304168
Ground Truth source: for day: 21 raw_count: 17964322
Ground Truth source: for day: 21 raw_count: 19624476
Ground Truth source: for day: 21 raw_count: 21284630
Ground Truth source: for day: 21 raw_count: 22944784
Ground Truth source: for day: 22 raw_count: 25553906
Ground Truth source: for day: 22 raw_count: 28163028
Ground Truth source: for day: 22 raw_count: 30772150
Ground Truth source: for day: 22 raw_count: 33381272
Ground Truth source: for day: 23 raw_count: 35895690
Ground Truth source: for day: 23 raw_count: 38410108
Ground Truth source: for day: 23 raw_count: 409245

In [12]:
ground_truth.write.partitionBy("year","month", "day")\
        .parquet("s3://via-warren/ground_truth_1_37")

'path s3://via-warren/ground_truth_1_37 already exists.;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 841, in parquet
    self._jwrite.parquet(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'path s3://via-warren/ground_truth_1_37 already exists.;'



In [4]:
def get_feature_columns():
        # This code gets lists of all of the feature columns in the preprocessed data
    delta_columns = []
    value_columns = []
    lag_inds = ["lead1", "lag0", "lag1", "lag2"]
    source_prefixes = ["scout", "genome_poller"]
    for prefix in source_prefixes:
        for lag_ind in lag_inds:
            delta_columns.append(prefix + "_" + lag_ind + "_" + "delta")
            value_columns.append(prefix + "_" + lag_ind + "_" + "val")
            
    return namedtuple('feature_columns', 'values deltas')(value_columns, delta_columns)

In [5]:
get_feature_columns().values

['scout_lead1_val', 'scout_lag0_val', 'scout_lag1_val', 'scout_lag2_val', 'genome_poller_lead1_val', 'genome_poller_lag0_val', 'genome_poller_lag1_val', 'genome_poller_lag2_val']

In [6]:
def read_preprocessed_data(year, month, day, feature_columns):
    # Read in preprocessed data for one day
    preprocessed_data = spark.read. \
        parquet("s3://rads-prepro-data/" +
                f"year={year}/month={month}/day={day}/*.gz.parquet")
    
    # Count and cache preprocessed data for one day
    preprocessed_data.cache()
    preprocessed_count = preprocessed_data.count()
    print("Preprocessed Data count for day " + str(day) + ": " + str(preprocessed_count))

    # Update delta columns, accounting for 0 delta case (need to do this because of division issues)
    for column in feature_columns.deltas:
        replacement = None
        if "lag0" in column:
            replacement = 0.0001 # Will weight current values with no delta very highly (these will trump all)
        else:
            replacement = 9999999999 # Will weight non-current values with no delta very low (essentially throwing out)
        preprocessed_data = preprocessed_data. \
            withColumn(column, F.when(F.col(column) == 0, replacement). \
                                 otherwise(F.abs(F.col(column))))
    return preprocessed_data

In [7]:
preprocessed_data = read_preprocessed_data(2019, 7, 19, get_feature_columns())
preprocessed_data.count()

Preprocessed Data count for day 19: 13149363551
13149363551

In [26]:
def simple_heuristic_model(preprocessed_data, feature_columns):
    # Update value columns, reachable -> 1, unreachable -> -1 (need to do this for heuristic to work)
    for column in value_columns:
        if "scout" in column:
            preprocessed_data = preprocessed_data. \
                withColumn(column, F.when(F.col(column) == 6, 1).otherwise(-1))
        if "genome_poller" in column:
            preprocessed_data = preprocessed_data. \
                withColumn(column, F.when(F.col(column) == 1, 1).otherwise(-1))

    # Heuristic score computed here. If >= 0 then reachable prediction, otherwise nonreachable prediction
    model_scored_data = preprocessed_data. \
        withColumn("reachability_score",
                   0.75*(F.col("scout_lead1_val") / F.col("scout_lead1_delta") + 
                         F.col("scout_lag0_val") / F.col("scout_lag0_delta") +
                         F.col("scout_lag1_val") / F.col("scout_lag1_delta") + 
                         F.col("scout_lag2_val") / F.col("scout_lag2_delta")) + 
                   0.25*(F.col("genome_poller_lead1_val") / F.col("genome_poller_lead1_delta") + 
                         F.col("genome_poller_lag0_val") / F.col("genome_poller_lag0_delta") +
                         F.col("genome_poller_lag1_val") / F.col("genome_poller_lag1_delta") + 
                         F.col("genome_poller_lag2_val") / F.col("genome_poller_lag2_delta"))). \
        withColumn("heuristic_guess", F.when(F.col("reachability_score") >= 0, 1).otherwise(-1)). \
        select("mac", "ts", "heuristic_guess")
    
    # Count and cache scored preprocessed data for one day
    model_scored_data.cache()
    model_scored_data = preprocessed_data.count()
    print("Scored Preprocessed Data count for day " + str(day) + ": " + str(preprocessed_count))
    
    return model_scored_data

In [27]:
sources = ["analyticsEvents", "scout", "speedTest", "xre"]
days = [19, 20, 21, 22, 23, 24, 25]

ground_truth_raw = None
year = 2019
month = 7
for day in days:
    for source in sources:
        # Read in raw ground truth from all sources for one day
        ground_truth_raw = read_ground_truth(ground_truth_raw, source, year, month, day)
    
    raw_count = ground_truth_raw.count()
    print("Ground Truth source: for day: {} raw_count: {}".format(day, raw_count))

Ground Truth source: analyticsEvents for day: 19 raw_count: 2222761
Ground Truth source: scout for day: 19 raw_count: 4445522
Ground Truth source: speedTest for day: 19 raw_count: 6668283
Ground Truth source: xre for day: 19 raw_count: 8891044
Ground Truth source: for day: 19 raw_count: 8891044
Ground Truth source: analyticsEvents for day: 20 raw_count: 10744325
Ground Truth source: scout for day: 20 raw_count: 12597606
Ground Truth source: speedTest for day: 20 raw_count: 14450887
Ground Truth source: xre for day: 20 raw_count: 16304168
Ground Truth source: for day: 20 raw_count: 16304168
Ground Truth source: analyticsEvents for day: 21 raw_count: 17964322
Ground Truth source: scout for day: 21 raw_count: 19624476
Ground Truth source: speedTest for day: 21 raw_count: 21284630
Ground Truth source: xre for day: 21 raw_count: 22944784
Ground Truth source: for day: 21 raw_count: 22944784
Ground Truth source: analyticsEvents for day: 22 raw_count: 25553906
Ground Truth source: scout for da

In [None]:
sources = ["analyticsEvents", "scout", "speedTest", "xre"]
days = ["19", "20", "21", "22", "23", "24", "25"]

ground_truth_raw = None
year = '2019'
month = '07'
for day in days:
    for source in sources:
        # Read in raw ground truth from all sources for one day
        ground_truth_raw = read_ground_truth(ground_truth_raw, year, month, day)
    
    raw_count = ground_truth_raw.count()
    print("Ground Truth source: {} for day: {} count: {}".format(source, day, raw_count))

    # Read in preprocessed data for one day
    preprocessed_data = spark.read. \
        parquet("s3://rads-prepro-data/" +
                f"year={year}/month={month}/day={day}/*.gz.parquet")
    
    # Count and cache preprocessed data for one day
    preprocessed_data.cache()
    preprocessed_count = preprocessed_data.count()
    print("Preprocessed Data count for day " + str(day) + ": " + str(preprocessed_count))

    # This code gets lists of all of the feature columns in the preprocessed data
    delta_columns = []
    value_columns = []
    lag_inds = ["lead1", "lag0", "lag1", "lag2"]
    source_prefixes = ["scout", "genome_poller"]
    for prefix in source_prefixes:
        for lag_ind in lag_inds:
            delta_columns.append(prefix + "_" + lag_ind + "_" + "delta")
            value_columns.append(prefix + "_" + lag_ind + "_" + "val")
            
    # Get heuristic score for preprocessed data
    scored_preprocessed_data = preprocessed_data

    # Update delta columns, accounting for 0 delta case (need to do this because of division issues)
    for column in delta_columns:
        replacement = None
        if "lag0" in column:
            replacement = 0.0001 # Will weight current values with no delta very highly (these will trump all)
        else:
            replacement = 9999999999 # Will weight non-current values with no delta very low (essentially throwing out)
        scored_preprocessed_data = scored_preprocessed_data. \
            withColumn(column, F.when(F.col(column) == 0, replacement). \
                                 otherwise(F.abs(F.col(column))))

    # Update value columns, reachable -> 1, unreachable -> -1 (need to do this for heuristic to work)
    for column in value_columns:
        if "scout" in column:
            scored_preprocessed_data = scored_preprocessed_data. \
                withColumn(column, F.when(F.col(column) == 6, 1).otherwise(-1))
        if "genome_poller" in column:
            scored_preprocessed_data = scored_preprocessed_data. \
                withColumn(column, F.when(F.col(column) == 1, 1).otherwise(-1))

    # Heuristic score computed here. If >= 0 then reachable prediction, otherwise nonreachable prediction
    scored_preprocessed_data = scored_preprocessed_data. \
        withColumn("reachability_score",
                   0.75*(F.col("scout_lead1_val") / F.col("scout_lead1_delta") + 
                         F.col("scout_lag0_val") / F.col("scout_lag0_delta") +
                         F.col("scout_lag1_val") / F.col("scout_lag1_delta") + 
                         F.col("scout_lag2_val") / F.col("scout_lag2_delta")) + 
                   0.25*(F.col("genome_poller_lead1_val") / F.col("genome_poller_lead1_delta") + 
                         F.col("genome_poller_lag0_val") / F.col("genome_poller_lag0_delta") +
                         F.col("genome_poller_lag1_val") / F.col("genome_poller_lag1_delta") + 
                         F.col("genome_poller_lag2_val") / F.col("genome_poller_lag2_delta"))). \
        withColumn("heuristic_guess", F.when(F.col("reachability_score") >= 0, 1).otherwise(-1)). \
        select("mac", "ts", "heuristic_guess")
    
    # Count and cache scored preprocessed data for one day
    scored_preprocessed_data.cache()
    scored_preprocessed_count = scored_preprocessed_data.count()
    print("Scored Preprocessed Data count for day " + str(day) + ": " + str(scored_preprocessed_count))
    
    # Unpersist original preprocessed data, don't need
    preprocessed_data.unpersist()
    
    # Join Ground Truth to Scored Preprocessed data to assess accuracy
    accuracy_join = scored_preprocessed_data \
        .join(ground_truth_raw,
              (scored_preprocessed_data.mac == ground_truth_raw.gt_mac) &
              (ground_truth_raw.startTs <= scored_preprocessed_data.ts) &
              (scored_preprocessed_data.ts <= ground_truth_raw.endTs),
              "left_outer") \
        .groupBy("mac", "ts", "heuristic_guess", "reachable") \
        .agg(F.collect_set(F.col("source")).alias("sources"))
    
    # Aggregate Accuracy Metrics into Confusion Matrix Measurements
    accuracy_aggregates = accuracy_join \
        .groupBy("heuristic_guess", "reachable", "sources") \
        .agg(F.count("*").alias("entries"))
    
    # Count and Cache Accuracy Aggregates so I can see them
    accuracy_aggregates.cache()
    print("Accuracy Aggregates Completed, " + str(accuracy_aggregates.count()) + " entries")
    
    if acc_agg is None:
        acc_agg = accuracy_aggregates
    else:
        acc_agg = acc_agg.unionAll(accuracy_aggregates)
    acc_agg.cache()
    
    # Write out data to s3
    accuracy_aggregates \
        .withColumn("sources", F.regexp_replace(
                                   F.regexp_replace(
                                       F.col("sources").cast("string"), "[", ""
                                   ), "]", ""
                               )) \
        .withColumn("year", F.lit("2019")) \
        .withColumn("month", F.lit("7")) \
        .withColumn("day", F.lit(day)) \
        .coalesce(1) \
        .write \
        .mode("append").format("csv") \
        .save(f"s3://via-nathan/heuristic_accuracy_assessment/")
    
    # unpersist data
    scored_preprocessed_data.unpersist()
    ground_truth_raw.unpersist()
    
    

In [15]:
ground_truth_raw = None
new_df = spark \
        .read \
        .parquet("s3://reachability-ground-truth/GroundTruthV1.237/" +
                 f"source={'analyticsEvents'}/" +
                 f"year=2019/month=07/day=19/*.gz.parquet") \
        .withColumn("source", F.lit('analyticsEvents')) \
        .withColumn("year", F.lit("2019")) \
        .withColumn("month", F.lit("07")) \
        .withColumn("day", F.lit('19'))

new_df = new_df.withColumnRenamed("mac", "gt_mac")
if ground_truth_raw is None:
    ground_truth_raw = new_df
else:
    ground_truth_raw = ground_truth_raw \
            .unionAll(new_df)

In [16]:
ground_truth_raw.printSchema()

root
 |-- gt_mac: string (nullable = true)
 |-- startTs: long (nullable = true)
 |-- endTs: long (nullable = true)
 |-- reachable: boolean (nullable = true)
 |-- source: string (nullable = false)
 |-- year: string (nullable = false)
 |-- month: string (nullable = false)
 |-- day: string (nullable = false)

In [2]:
acc_agg = None

In [2]:
acc_agg.show()

name 'acc_agg' is not defined
Traceback (most recent call last):
NameError: name 'acc_agg' is not defined



In [35]:
scored_preprocessed_data.unpersist()
ground_truth_raw.unpersist()
accuracy_aggregates.unpersist()

DataFrame[heuristic_guess: int, reachable: boolean, sources: array<string>, entries: bigint]

In [27]:
sources = ["analyticsEvents", "scout", "speedTest", "xre"]
# days = ["19", "20", "21", "22", "23", "24", "25"]
day = "25"

ground_truth_raw = None

for source in sources:
    # Read in raw ground truth from all sources for one day
    new_df = spark \
        .read \
        .parquet("s3://reachability-ground-truth/GroundTruthV1.22/" +
                 f"source={source}/" +
                 f"year=2019/month=07/day={day}/*.gz.parquet") \
        .withColumn("source", F.lit(source)) \
        .withColumn("year", F.lit("2019")) \
        .withColumn("month", F.lit("07")) \
        .withColumn("day", F.lit(day))
    if ground_truth_raw is None:
        ground_truth_raw = new_df
    else:
        ground_truth_raw = ground_truth_raw \
            .unionAll(new_df)

# Count and Cache ground truth for one day
ground_truth_raw = ground_truth_raw.withColumnRenamed("mac", "gt_mac")
ground_truth_raw.cache()
raw_count = ground_truth_raw.count()
print("Ground Truth raw count for day " + str(day) + ": " + str(raw_count))


    

Ground Truth raw count for day 25: 292738671

In [28]:
# Read in preprocessed data for one day
preprocessed_data = spark.read. \
    parquet("s3://rads-prepro-data/" +
            f"year=2019/month=7/day={day}/*.gz.parquet")

# Count and cache preprocessed data for one day
# preprocessed_data.cache()
# preprocessed_count = preprocessed_data.count()
# print("Preprocessed Data count for day " + str(day) + ": " + str(preprocessed_count))



In [29]:
# This code gets lists of all of the feature columns in the preprocessed data
delta_columns = []
value_columns = []
lag_inds = ["lead1", "lag0", "lag1", "lag2"]
source_prefixes = ["scout", "genome_poller"]
for prefix in source_prefixes:
    for lag_ind in lag_inds:
        delta_columns.append(prefix + "_" + lag_ind + "_" + "delta")
        value_columns.append(prefix + "_" + lag_ind + "_" + "val")

# Get heuristic score for preprocessed data
scored_preprocessed_data = preprocessed_data

# Update delta columns, accounting for 0 delta case (need to do this because of division issues)
for column in delta_columns:
    replacement = None
    if "lag0" in column:
        replacement = 0.0001 # Will weight current values with no delta very highly (these will trump all)
    else:
        replacement = 9999999999 # Will weight non-current values with no delta very low (essentially throwing out)
    scored_preprocessed_data = scored_preprocessed_data. \
        withColumn(column, F.when(F.col(column) == 0, replacement). \
                             otherwise(F.abs(F.col(column))))

# Update value columns, reachable -> 1, unreachable -> -1 (need to do this for heuristic to work)
for column in value_columns:
    if "scout" in column:
        scored_preprocessed_data = scored_preprocessed_data. \
            withColumn(column, F.when(F.col(column) == 6, 1).otherwise(-1))
    if "genome_poller" in column:
        scored_preprocessed_data = scored_preprocessed_data. \
            withColumn(column, F.when(F.col(column) == 1, 1).otherwise(-1))

# Heuristic score computed here. If >= 0 then reachable prediction, otherwise nonreachable prediction
scored_preprocessed_data = scored_preprocessed_data. \
    withColumn("reachability_score",
               0.75*(F.col("scout_lead1_val") / F.col("scout_lead1_delta") + 
                     F.col("scout_lag0_val") / F.col("scout_lag0_delta") +
                     F.col("scout_lag1_val") / F.col("scout_lag1_delta") + 
                     F.col("scout_lag2_val") / F.col("scout_lag2_delta")) + 
               0.25*(F.col("genome_poller_lead1_val") / F.col("genome_poller_lead1_delta") + 
                     F.col("genome_poller_lag0_val") / F.col("genome_poller_lag0_delta") +
                     F.col("genome_poller_lag1_val") / F.col("genome_poller_lag1_delta") + 
                     F.col("genome_poller_lag2_val") / F.col("genome_poller_lag2_delta"))). \
    withColumn("heuristic_guess", F.when(F.col("reachability_score") >= 0, 1).otherwise(-1)). \
    select("mac", "ts", "heuristic_guess")

# Count and cache scored preprocessed data for one day
scored_preprocessed_data.cache()
scored_preprocessed_count = scored_preprocessed_data.count()
print("Scored Preprocessed Data count for day " + str(day) + ": " + str(scored_preprocessed_count))



Scored Preprocessed Data count for day 25: 13146357906

In [30]:
# Unpersist original preprocessed data, don't need
# preprocessed_data.unpersist()



In [31]:
# Join Ground Truth to Scored Preprocessed data to assess accuracy
accuracy_join = scored_preprocessed_data \
    .join(ground_truth_raw,
          (scored_preprocessed_data.mac == ground_truth_raw.gt_mac) &
          (ground_truth_raw.startTs <= scored_preprocessed_data.ts) &
          (scored_preprocessed_data.ts <= ground_truth_raw.endTs),
          "left_outer") \
    .groupBy("mac", "ts", "heuristic_guess", "reachable") \
    .agg(F.collect_set(F.col("source")).alias("sources"))

# Aggregate Accuracy Metrics into Confusion Matrix Measurements
accuracy_aggregates = accuracy_join \
    .groupBy("heuristic_guess", "reachable", "sources") \
    .agg(F.count("*").alias("entries"))

# Count and Cache Accuracy Aggregates so I can see them
accuracy_aggregates.cache()
print("Accuracy Aggregates Completed, " + str(accuracy_aggregates.count()) + " entries")

Accuracy Aggregates Completed, 12 entries

In [32]:
accuracy_aggregates \
    .withColumn("sources", F.regexp_replace(
                               F.regexp_replace(
                                   F.regexp_replace(
                                       F.col("sources").cast("string"), "\[", ""
                                   ), "\]", ""
                               ), "\,", "")).show()

+---------------+---------+-------------------+-----------+
|heuristic_guess|reachable|            sources|    entries|
+---------------+---------+-------------------+-----------+
|             -1|     true|          speedTest|         26|
|             -1|     null|                   |   56414423|
|              1|     true|          speedTest|      64883|
|              1|     null|                   |13035091354|
|              1|     true|xre analyticsEvents|      78262|
|             -1|     true|xre analyticsEvents|         64|
|              1|     true|    analyticsEvents|   12162818|
|              1|    false|              scout|       2611|
|              1|     true|                xre|    8321245|
|             -1|     true|                xre|      82528|
|             -1|     true|    analyticsEvents|       5419|
|             -1|    false|              scout|   33910703|
+---------------+---------+-------------------+-----------+

In [33]:
# Write out data to s3
accuracy_aggregates \
    .withColumn("sources", F.regexp_replace(
                               F.regexp_replace(
                                   F.regexp_replace(
                                       F.col("sources").cast("string"), "\[", ""
                                   ), "\]", ""
                               ), "\,", "")) \
    .withColumn("year", F.lit("2019")) \
    .withColumn("month", F.lit("7")) \
    .withColumn("day", F.lit(day)) \
    .coalesce(1) \
    .write \
    .mode("append").format("csv") \
    .save(f"s3://via-nathan/heuristic_accuracy_assessment/day={day}/")

In [34]:
accuracy_aggregates.show() # the 25th

+---------------+---------+--------------------+-----------+
|heuristic_guess|reachable|             sources|    entries|
+---------------+---------+--------------------+-----------+
|             -1|     true|         [speedTest]|         26|
|             -1|     null|                  []|   56414423|
|              1|     true|         [speedTest]|      64883|
|              1|     null|                  []|13035091354|
|              1|     true|[xre, analyticsEv...|      78262|
|             -1|     true|[xre, analyticsEv...|         64|
|              1|     true|   [analyticsEvents]|   12162818|
|              1|    false|             [scout]|       2611|
|              1|     true|               [xre]|    8321245|
|             -1|     true|               [xre]|      82528|
|             -1|     true|   [analyticsEvents]|       5419|
|             -1|    false|             [scout]|   33910703|
+---------------+---------+--------------------+-----------+

In [25]:
accuracy_aggregates.show() # the 24th

+---------------+---------+--------------------+-----------+
|heuristic_guess|reachable|             sources|    entries|
+---------------+---------+--------------------+-----------+
|             -1|     true|         [speedTest]|         27|
|             -1|     null|                  []|   59321053|
|              1|     true|         [speedTest]|      66683|
|              1|     null|                  []|13015626171|
|              1|     true|[xre, analyticsEv...|      76568|
|             -1|     true|[xre, analyticsEv...|         81|
|              1|     true|   [analyticsEvents]|   11894990|
|              1|    false|             [scout]|       2329|
|              1|     true|               [xre]|    8544465|
|             -1|     true|               [xre]|      66862|
|             -1|     true|   [analyticsEvents]|       4427|
|             -1|    false|             [scout]|   41673217|
+---------------+---------+--------------------+-----------+

In [17]:
accuracy_aggregates.show() # the 23rd

+---------------+---------+--------------------+-----------+
|heuristic_guess|reachable|             sources|    entries|
+---------------+---------+--------------------+-----------+
|             -1|     true|         [speedTest]|         25|
|             -1|     null|                  []|   56685196|
|              1|     true|         [speedTest]|      59529|
|              1|     null|                  []|12986535890|
|              1|     true|[xre, analyticsEv...|      79483|
|             -1|     true|[xre, analyticsEv...|         65|
|              1|     true|   [analyticsEvents]|   11804385|
|              1|    false|             [scout]|       3188|
|              1|     true|               [xre]|    8685031|
|             -1|     true|               [xre]|      89109|
|             -1|     true|   [analyticsEvents]|       5561|
|             -1|    false|             [scout]|   74869067|
+---------------+---------+--------------------+-----------+

In [9]:
accuracy_aggregates.show() # the 22nd

+---------------+---------+--------------------+-----------+
|heuristic_guess|reachable|             sources|    entries|
+---------------+---------+--------------------+-----------+
|             -1|     true|         [speedTest]|         13|
|             -1|     null|                  []|   55494778|
|              1|     true|         [speedTest]|      62350|
|              1|     null|                  []|12999524260|
|              1|     true|[xre, analyticsEv...|      79933|
|             -1|     true|[xre, analyticsEv...|         68|
|              1|     true|   [analyticsEvents]|   13678165|
|              1|    false|             [scout]|       2469|
|              1|     true|               [xre]|    8737318|
|             -1|     true|               [xre]|      83387|
|             -1|     true|   [analyticsEvents]|       4831|
|             -1|    false|             [scout]|   45913223|
+---------------+---------+--------------------+-----------+

In [30]:
accuracy_aggregates.show() # the 21st

+---------------+---------+--------------------+-----------+
|heuristic_guess|reachable|             sources|    entries|
+---------------+---------+--------------------+-----------+
|             -1|     true|         [speedTest]|         12|
|             -1|     null|                  []|   53589101|
|              1|     true|         [speedTest]|      61639|
|              1|     null|                  []|12993801419|
|              1|     true|[xre, analyticsEv...|      79216|
|             -1|     true|[xre, analyticsEv...|         62|
|              1|     true|   [analyticsEvents]|   13710594|
|              1|    false|             [scout]|       2665|
|              1|     true|               [xre]|    9553401|
|             -1|     true|               [xre]|      80410|
|             -1|     true|   [analyticsEvents]|       4031|
|             -1|    false|             [scout]|   68504744|
+---------------+---------+--------------------+-----------+

In [19]:
accuracy_aggregates.show() # the 20th

+---------------+---------+--------------------+-----------+
|heuristic_guess|reachable|             sources|    entries|
+---------------+---------+--------------------+-----------+
|             -1|     true|         [speedTest]|         13|
|             -1|     null|                  []|   53756635|
|              1|     true|         [speedTest]|      56472|
|              1|     null|                  []|13017873036|
|              1|     true|[xre, analyticsEv...|      78966|
|             -1|     true|[xre, analyticsEv...|         61|
|              1|     true|   [analyticsEvents]|   14103325|
|              1|    false|             [scout]|       3019|
|              1|     true|               [xre]|    9148220|
|             -1|     true|               [xre]|      79879|
|             -1|     true|   [analyticsEvents]|       3928|
|             -1|    false|             [scout]|   55250660|
+---------------+---------+--------------------+-----------+

In [13]:
accuracy_aggregates.show()

+---------------+---------+--------------------+--------+
|heuristic_guess|reachable|             sources| entries|
+---------------+---------+--------------------+--------+
|             -1|     null|                  []|  272385|
|              1|     true|         [speedTest]|     287|
|              1|     null|                  []|65388693|
|              1|     true|[xre, analyticsEv...|     453|
|              1|     true|   [analyticsEvents]|   73393|
|              1|    false|             [scout]|      17|
|              1|     true|               [xre]|   42266|
|             -1|     true|               [xre]|     371|
|             -1|     true|   [analyticsEvents]|      25|
|             -1|    false|             [scout]|  177388|
+---------------+---------+--------------------+--------+

In [14]:
accuracy_aggregates.agg(F.sum(
    F.when(F.array_contains(F.col("sources"), "xre") & F.array_contains(F.col("sources"), "analyticeEvents"), 
           F.col("entries")*2).otherwise(F.col("entries")))).show()

+------------------------------------------------------------------------------------------------------------------------------+
|sum(CASE WHEN (array_contains(sources, xre) AND array_contains(sources, analyticeEvents)) THEN (entries * 2) ELSE entries END)|
+------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                      65955278|
+------------------------------------------------------------------------------------------------------------------------------+

In [None]:
accuracy_aggregates.agg(F.sum("entries")).show()

In [None]:
scored_preprocessed_data.distinct().count()

In [None]:
accuracy_aggregates \
    .withColumn("sources", F.col("sources").cast("string")) \
    .coalesce(1) \
    .write \
    .mode("append").format("csv") \
    .save("s3://via-nathan/heuristic_accuracy_assessment/")

In [None]:
# Create ground truth dataset using 5 minute windows
first_time = floor(datetime(2019, 7, int(day), 0, 0, 0).timestamp())
last_time = floor(datetime(2019, 7, int(day), 23, 55, 0).timestamp())
time_interval = 60*5

all_times_df = spark.range(first_time, last_time + time_interval, time_interval). \
    withColumnRenamed("id", "ts")

gt_macs = ground_truth_raw.select(F.col("mac").alias("mac_")).distinct()

all_mac_times = gt_macs.crossJoin(all_times_df)

times_gt_join = all_mac_times \
    .join(ground_truth_raw,
          (all_mac_times.mac_ == ground_truth_raw.mac) &
          (ground_truth_raw.startTs <= all_mac_times.ts) &
          (all_mac_times.ts <= ground_truth_raw.endTs),
          "inner") \
    .select("mac", "ts", "reachable", "source", "year", "month", "day")

five_minute_gt = times_gt_join \
    .groupBy("mac", "ts", "reachable", "year", "month", "day") \
    .agg(F.collect_set(F.col("source")).alias("sources"))

five_minute_gt.cache()
five_minute_count = five_minute_gt.count()
print("5 Minute Ground Truth count for day " + str(day) + ": " + str(ground_truth_raw_count))

In [None]:
times_gt_join.count()

In [None]:
times_gt_join.select("mac", "ts", "reachable", "year", "month", "day"). \
        where(((F.col("reachable") == True) | (F.col("reachable") == False)) &
                F.col("source").isin(["scout", "xre", "speedTest", "analyticsEvents"])). \
        distinct().count()

In [None]:
final_gt.count()

In [None]:
valid_entries.count()

In [None]:
ground_truth_raw.count()

In [None]:
final_gt.cache()

In [None]:
preprocessed_data = spark.read. \
    parquet("s3://rads-prepro-data/" +
            "year=2019/month=7/day=19/*.gz.parquet")

In [None]:
# This code gets lists of all of the feature columns in the preprocessed data
delta_columns = []
value_columns = []
lag_inds = ["lead1", "lag0", "lag1", "lag2"]
source_prefixes = ["scout", "genome_poller"]
for prefix in source_prefixes:
    for lag_ind in lag_inds:
        delta_columns.append(prefix + "_" + lag_ind + "_" + "delta")
        value_columns.append(prefix + "_" + lag_ind + "_" + "val")
        
for col in delta_columns:
    print(col)
for col in value_columns:
    print(col)

In [None]:
# Get heuristic score for preprocessed data
scored_preprocessed_data = preprocessed_data

# Update delta columns, accounting for 0 delta case (need to do this because of division issues)
for column in delta_columns:
    replacement = None
    if "lag0" in column:
        replacement = 0.0001 # Will weight current values with no delta very highly (these will trump all)
    else:
        replacement = 9999999999 # Will weight non-current values with no delta very low (essentially throwing out)
    scored_preprocessed_data = scored_preprocessed_data. \
        withColumn(column, F.when(F.col(column) == 0, replacement). \
                             otherwise(F.abs(F.col(column))))
    
# Update value columns, reachable -> 1, unreachable -> -1 (need to do this for heuristic to work)
for column in value_columns:
    if "scout" in column:
        scored_preprocessed_data = scored_preprocessed_data. \
            withColumn(column, F.when(F.col(column) == 6, 1).otherwise(-1))
    if "genome_poller" in column:
        scored_preprocessed_data = scored_preprocessed_data. \
            withColumn(column, F.when(F.col(column) == 1, 1).otherwise(-1))
    
# Heuristic score computed here. If >= 0 then reachable prediction, otherwise nonreachable prediction
scored_preprocessed_data = scored_preprocessed_data. \
    withColumn("reachability_score",
               0.75*(F.col("scout_lead1_val") / F.col("scout_lead1_delta") + 
                     F.col("scout_lag0_val") / F.col("scout_lag0_delta") +
                     F.col("scout_lag1_val") / F.col("scout_lag1_delta") + 
                     F.col("scout_lag2_val") / F.col("scout_lag2_delta")) + 
               0.25*(F.col("genome_poller_lead1_val") / F.col("genome_poller_lead1_delta") + 
                     F.col("genome_poller_lag0_val") / F.col("genome_poller_lag0_delta") +
                     F.col("genome_poller_lag1_val") / F.col("genome_poller_lag1_delta") + 
                     F.col("genome_poller_lag2_val") / F.col("genome_poller_lag2_delta"))). \
    withColumn("heuristic_guess", F.when(F.col("reachability_score") >= 0, 1).otherwise(-1)). \
    select("mac", "ts",
           "scout_lead1_val", "scout_lag0_val", "scout_lag1_val", "scout_lag2_val",
           "scout_lead1_delta", "scout_lag0_delta", "scout_lag1_delta", "scout_lag2_delta",
           "genome_poller_lead1_val", "genome_poller_lag0_val", "genome_poller_lag1_val", "genome_poller_lag2_val",
           "genome_poller_lead1_delta", "genome_poller_lag0_delta", "genome_poller_lag1_delta", "genome_poller_lag2_delta",
           "reachability_score", "heuristic_guess")

In [None]:
scored_preprocessed_data.cache()

In [None]:
accuracy_join = scored_preprocessed_data. \
    join(final_gt,
         ["mac", "ts"],
         "left_outer") #. \
    #groupBy("mac", "ts", "reachable", "heuristic_guess", "year", "month", "day"). \
    #agg(F.collect_set(F.col("source")).alias("sources"))

In [None]:
accuracy_join.cache()

In [None]:
accuracy_join.groupBy("mac", "ts").agg(F.count("*").alias("entries")).where(F.col("entries") > 1).count()

In [None]:
scored_preprocessed_data.groupBy("mac", "ts").agg(F.count("*").alias("entries")).where(F.col("entries") > 1).count()

In [None]:
accuracy_join.groupBy("mac", "ts").agg(F.count("*").alias("entries")).where(F.col("entries") > 1).count()

In [None]:
scored_preprocessed_data.groupBy("mac", "ts").agg(F.count("*").alias("entries")).where(F.col("entries") > 1).count()

In [None]:
accuracy_join.groupBy("year", "month", "day", "sources", "reachable", "heuristic_guess"). \
    agg(F.count("*").alias("entries")). \
    orderBy("year", "month", "day", "sources", "reachable", "heuristic_guess"). \
    show()

In [None]:
accuracy_join.groupBy("year", "month", "day", "sources", "reachable", "heuristic_guess"). \
    agg(F.count("*").alias("entries")). \
    orderBy("year", "month", "day", "sources", "reachable", "heuristic_guess"). \
    show()