**Data Cleaning with Spark**

We have quite a lot of big dataframes, so let's also try the same approach as in 2.0, but written in Spark, and compare run times. 

**Generate labelled resistivity data**


Start off by aligning the raw resistivity data with the fish events. 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import os

spark = SparkSession.builder.getOrCreate()

# Event time strings below are parsed in the session time zone, and the resistivity
# data (epoch-based) are absolute UTC instants. Pin the session to UTC so both line up.
# (from_utc_timestamp(col, "UTC") would be a no-op and does not achieve this.)
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Load and prepare events file with all of the known fish events
events_path = "/FileStore/rachlenn/Thr 20 process/test_KMThu16_2021_07_eventonly.csv"

# Half-width, in seconds, of the labelling window placed around each event time
EVENT_HALF_WINDOW_S = 2.5

# Parse the event times and build a [Time - 2.5 s, Time + 2.5 s] window around each one
events_df = (
    spark.read
        .option("header", True)
        .csv(events_path)
        .withColumn("Time", F.to_timestamp("Time", "yyyy-MM-dd HH:mm:ss"))  # already a timestamp; no extra cast needed
        .withColumn("start_time", F.col("Time") - F.expr(f"INTERVAL {EVENT_HALF_WINDOW_S} SECONDS"))  # window opens before the event
        .withColumn("end_time",   F.col("Time") + F.expr(f"INTERVAL {EVENT_HALF_WINDOW_S} SECONDS"))  # and closes after it
        .cache()  # small table, reused for every daily dataset in the labelling loop
)

# Function to label fish presence
def label_fish_presence(df, events_df):
    """Add an integer ``fish_present`` column (1/0) to ``df``.

    A row is labelled 1 when its ``Time`` lies inside at least one
    ``[start_time, end_time]`` window of ``events_df`` (both ends inclusive),
    and 0 otherwise (including rows whose ``Time`` is null).

    Args:
        df: Spark DataFrame with a timestamp column ``Time``.
        events_df: Spark DataFrame with ``start_time`` and ``end_time`` columns.

    Returns:
        ``df``'s columns followed by ``fish_present``. Every input row appears
        exactly once, even when event windows overlap. Row order is not
        guaranteed (it never was for a Spark join).
    """
    # The events table is small, so broadcast it. The range (non-equi) join then
    # runs against a broadcast copy instead of shuffling the large daily data.
    windows = F.broadcast(events_df.select("start_time", "end_time"))
    in_window = (df.Time >= windows.start_time) & (df.Time <= windows.end_time)

    # A plain left join emits one copy of a row per matching window, so two
    # overlapping event windows would duplicate data rows. Semi/anti joins only
    # test for existence of a match and never multiply rows.
    with_fish = df.join(windows, in_window, "left_semi").withColumn("fish_present", F.lit(1))
    without_fish = df.join(windows, in_window, "left_anti").withColumn("fish_present", F.lit(0))
    return with_fish.unionByName(without_fish)

# Process each daily dataset to align it with the fish events
input_pattern = "/FileStore/rachlenn/DuplicateFree/*_no_duplicate"
output_folder = "/FileStore/rachlenn/labeled"

# Split the glob into the folder to list and the suffix that selects the inputs
input_folder, _, input_glob = input_pattern.rpartition("/")
input_suffix = input_glob.lstrip("*")  # "_no_duplicate"

# dbutils.fs.ls reports directories (e.g. Spark CSV output folders) with a trailing
# "/". Strip it, otherwise the suffix test fails and basename() returns "".
file_paths = [
    entry.path.rstrip("/")
    for entry in dbutils.fs.ls(input_folder)
    if entry.path.rstrip("/").endswith(input_suffix)
]
if not file_paths:
    print(f"No datasets matching {input_pattern} were found.")

for file_path in file_paths:
    print(f"Processing {file_path}...")

    # Load daily data. The raw timestamp is divided by 1000 before the cast,
    # presumably because it is epoch milliseconds; confirm against the source.
    # A numeric epoch cast to a timestamp is already an absolute (UTC) instant.
    df = (
        spark.read
            .option("header", True)
            .csv(file_path)
            .toDF("timestamp", "upstream", "downstream")
            .withColumn("Time", (F.col("timestamp") / 1000).cast(T.TimestampType()))
            .drop("timestamp")
            .withColumn("upstream",   F.col("upstream").cast("double"))
            .withColumn("downstream", F.col("downstream").cast("double"))
            .withColumn("differential_conductance", (F.col("downstream") - F.col("upstream")) / 2)
    )

    # Label fish events
    labeled_df = label_fish_presence(df, events_df)

    # Save next to the other Spark outputs as <name>_S_labelled
    filename = os.path.basename(file_path).replace(input_suffix, "_S_labelled")
    save_path = os.path.join(output_folder, filename)
    labeled_df.write.csv(save_path, header=True, mode="overwrite")

print("All datasets labelled and saved.")



Let's take a wee look at what one of the dataframes looks like now.

In [0]:
# Read one of the Spark-labelled datasets back in and show it
df = (
    spark.read
        .option("header", True)
        .csv("dbfs:/FileStore/rachlenn/labeled/test_KMThu16_2021_07_17_15_36_26Z_S_labelled")
)
display(df)


Determine the average sampling rate, which sets how many samples fall into each window bin.

In [0]:
# Calculate the average sampling rate (samples per second) of the labelled data
from pyspark.sql import functions as F, Window

# Read all labelled datasets into one Spark DataFrame
df = (
    spark.read
         .option("header", True)  # set False if files have no header
         .csv("dbfs:/FileStore/rachlenn/labeled/*_S_labelled")
         .withColumn("Time", F.to_timestamp("Time"))
)

# Each labelled dataset is a folder of part files. Tag rows with their folder so
# that the gap between two separate recordings is not counted as a sample interval.
# NOTE: input_file_name() may be unavailable on Unity Catalog shared clusters;
# the _metadata.file_path column is the replacement there.
dataset_dir = F.regexp_extract(F.input_file_name(), r"^(.*)/[^/]*$", 1)

# Order samples in time within each dataset. Partitioning also avoids moving the
# whole dataset into a single partition, which an unpartitioned window requires.
w = Window.partitionBy("dataset").orderBy("Time")

# Delta in seconds between consecutive rows. Casting to double keeps sub-second
# precision; casting to long would truncate every timestamp to whole seconds.
df_with_delta = (
    df.withColumn("dataset", dataset_dir)
      .withColumn(
          "delta_s",
          F.col("Time").cast("double") - F.lag("Time").over(w).cast("double"),
      )
)

# Compute average sampling rate (null if there are no deltas or the mean spacing is 0)
avg_rate_df = (
    df_with_delta.filter(F.col("delta_s").isNotNull())
                 .agg((F.lit(1) / F.avg("delta_s")).alias("avg_hz"))
)

avg_rate = avg_rate_df.collect()[0]["avg_hz"]
if avg_rate is None:
    print("Average sampling rate could not be computed (no consecutive samples, or zero mean spacing).")
else:
    print(f"Average sampling rate: {avg_rate:.2f} Hz")


**Extract Windows**

The ML models should be fed windows around each event, so we need to generate these before balancing the data stream. 

We need the windows to be arrays of a fixed length n. 

In [0]:
from pyspark.sql import functions as F, Window

# Load the Spark-labelled data (with headers) written by the labelling loop,
# using the same "_S_labelled" output name as the preview cell
df = spark.read.option("header", True).csv(
    "dbfs:/FileStore/rachlenn/labeled/test_KMThu16_2021_07_17_15_36_26Z_S_labelled"
)

# Ensure proper types
df = (df
      .withColumn("Time", F.to_timestamp("Time"))
      .withColumn("upstream", F.col("upstream").cast("double"))
      .withColumn("downstream", F.col("downstream").cast("double"))
      .withColumn("differential_conductance", F.col("differential_conductance").cast("double"))
      .withColumn("fish_present", F.col("fish_present").cast("int"))
)

# Window width in seconds; each sample is keyed by its window's start (epoch seconds)
WINDOW_S = 10
df = df.withColumn("window_start",
                   (F.unix_timestamp("Time") / WINDOW_S).cast("long") * WINDOW_S)

# collect_list gives no ordering guarantee after the groupBy shuffle. Collect
# (Time, values) structs instead and sort them; array_sort compares structs field
# by field, i.e. by Time first. The three series are then taken from the same
# sorted array, so they stay time-ordered and aligned sample-for-sample.
# (A null value now stays in its series instead of being silently dropped.)
sorted_samples = F.array_sort(
    F.collect_list(F.struct("Time", "upstream", "downstream", "differential_conductance"))
)

# Aggregate to window level
window_df = (df.groupBy("window_start")
               .agg(
                   sorted_samples.alias("samples"),
                   F.max("fish_present").alias("fish_present")  # label = 1 if any row has fish
               )
               .select(
                   "window_start",
                   F.col("samples.upstream").alias("upstream_series"),
                   F.col("samples.downstream").alias("downstream_series"),
                   F.col("samples.differential_conductance").alias("diff_series"),
                   "fish_present",
               )
               .orderBy("window_start")  # groupBy output is unordered; sort for display
               .withColumn("window_start_time", F.from_unixtime("window_start"))
               .drop("window_start")
            )

display(window_df)
