# Silver — Frames + Labels + Event Windows + Synthetic PLC
Extracts sampled frames, indexes frames/labels, groups anomaly events, and generates synthetic PLC telemetry.

**Key knobs**
- `EVERY_N_FRAMES` (sampling)
- `GAP_MS` (event grouping)


**Cell 1: Config**

In [1]:
from pathlib import Path
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Dataset constants (from dataset description)
FPS_SOURCE = 120.0

# Bronze locations
BRONZE_VIDEOS_SPARK = "Files/bronze/videos/conveyer/raw_mp4"
BRONZE_VIDEOS_FS    = "/lakehouse/default/Files/bronze/videos/conveyer/raw_mp4"

ANOM_SPARK = "Files/bronze/labels/conveyer/anomaly"
NORM_SPARK = "Files/bronze/labels/conveyer/normal"  # optional

# Silver output folder for extracted frames (you can rename)
SILVER_FRAMES_FS = "/lakehouse/default/Files/silver/frames/conveyer/frames_4fps"

# Frame sampling: 30 => ~4 fps (120/30)
EVERY_N_FRAMES = 30

print("BRONZE_VIDEOS_SPARK:", BRONZE_VIDEOS_SPARK)
print("BRONZE_VIDEOS_FS   :", BRONZE_VIDEOS_FS)
print("SILVER_FRAMES_FS   :", SILVER_FRAMES_FS)
print("ANOM_SPARK         :", ANOM_SPARK)
print("NORM_SPARK         :", NORM_SPARK)

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 3, Finished, Available, Finished)

BRONZE_VIDEOS_SPARK: Files/bronze/videos/conveyer/raw_mp4
BRONZE_VIDEOS_FS   : /lakehouse/default/Files/bronze/videos/conveyer/raw_mp4
SILVER_FRAMES_FS   : /lakehouse/default/Files/silver/frames/conveyer/frames_4fps
ANOM_SPARK         : Files/bronze/labels/conveyer/anomaly
NORM_SPARK         : Files/bronze/labels/conveyer/normal


**Cell 2: Sanity check: list videos with Spark**

In [2]:
mp4 = (spark.read.format("binaryFile")
       .option("pathGlobFilter", "*.mp4")
       .load(BRONZE_VIDEOS_SPARK)
       .select("path"))

mkv = (spark.read.format("binaryFile")
       .option("pathGlobFilter", "*.mkv")
       .load(BRONZE_VIDEOS_SPARK)
       .select("path"))

videos_df = mp4.unionByName(mkv, allowMissingColumns=True).distinct()
display(videos_df.limit(20))
print("Found videos:", videos_df.count())

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 492ec677-1a33-46f5-a2d4-f18754ed03fc)

Found videos: 19


**Cell 3: Ensure OpenCV is available**

In [3]:
# If cv2 import works without pip, comment out the pip line.
%pip -q install opencv-python-headless

import cv2
print("cv2 version:", cv2.__version__)

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 5, Finished, Available, Finished)

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
numba 0.59.0 requires numpy<1.27,>=1.22, but you have numpy 2.2.6 which is incompatible.
matplotlib 3.8.0 requires numpy<2,>=1.21, but you have numpy 2.2.6 which is incompatible.
datasets 2.19.1 requires fsspec[http]<=2024.3.1,>=2023.1.0, but you have fsspec 2024.6.1 which is incompatible.
pywavelets 1.5.0 requires numpy<2.0,>=1.22.4, but you have numpy 2.2.6 which is incompatible.
pandas 2.1.4 requires numpy<2,>=1.23.2; python_version == "3.11", but you have numpy 2.2.6 which is incompatible.
astropy 5.3.4 requires numpy<2,>=1.21, but you have numpy 2.2.6 which is incompatible.
contourpy 1.2.0 requires numpy<2.0,>=1.20, but you have numpy 2.2.6 which is incompatible.
scipy 1.11.4 requires numpy<1.28.0,>=1.21.6, but you have numpy 2.2.6 which is incompatible.
nni 3.0 requires filelock<3.12, but you h

**Cell 4a: Copy to local and extract frames**

In [4]:
from notebookutils import mssparkutils

items = mssparkutils.fs.ls(BRONZE_VIDEOS_SPARK)
video_items = [i for i in items if i.name.lower().endswith((".mp4", ".mkv"))]

print("Found videos:", len(video_items))
for i in video_items[:10]:
    print(i.name, "|", i.path)

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 6, Finished, Available, Finished)

Found videos: 19
boardcamera_iron_ore.mp4 | abfss://7c3d7169-c07f-4d97-8a72-6b9d1af82e14@msit-onelake.dfs.fabric.microsoft.com/a4595720-2bea-4dcb-b8bf-d91252a0c89a/Files/bronze/videos/conveyer/raw_mp4/boardcamera_iron_ore.mp4
boardcamera_iron_ore_2.mp4 | abfss://7c3d7169-c07f-4d97-8a72-6b9d1af82e14@msit-onelake.dfs.fabric.microsoft.com/a4595720-2bea-4dcb-b8bf-d91252a0c89a/Files/bronze/videos/conveyer/raw_mp4/boardcamera_iron_ore_2.mp4
boardcamera_iron_ore_3.mp4 | abfss://7c3d7169-c07f-4d97-8a72-6b9d1af82e14@msit-onelake.dfs.fabric.microsoft.com/a4595720-2bea-4dcb-b8bf-d91252a0c89a/Files/bronze/videos/conveyer/raw_mp4/boardcamera_iron_ore_3.mp4
boardcamera_iron_ore_4.mp4 | abfss://7c3d7169-c07f-4d97-8a72-6b9d1af82e14@msit-onelake.dfs.fabric.microsoft.com/a4595720-2bea-4dcb-b8bf-d91252a0c89a/Files/bronze/videos/conveyer/raw_mp4/boardcamera_iron_ore_4.mp4
boardcamera_iron_ore_5.mp4 | abfss://7c3d7169-c07f-4d97-8a72-6b9d1af82e14@msit-onelake.dfs.fabric.microsoft.com/a4595720-2bea-4dcb-b8bf

**Cell 4b: Extract sampled frames from videos to Silver**

This writes:
Files/silver/frames/conveyer/frames_4fps/<video_id>/frame_0000120.jpg

In [5]:
import os
import cv2
from pathlib import Path

local_video_dir = "/tmp/conveyer_videos"
os.makedirs(local_video_dir, exist_ok=True)

out_root = Path(SILVER_FRAMES_FS)
out_root.mkdir(parents=True, exist_ok=True)

def copy_to_local(one_lake_path: str) -> str:
    """
    Copy from OneLake (abfss://...) to local file path.
    """
    fname = one_lake_path.split("/")[-1]
    local_path = os.path.join(local_video_dir, fname)

    # Only copy if not already present
    if not os.path.exists(local_path) or os.path.getsize(local_path) == 0:
        # destination must be prefixed with file:
        mssparkutils.fs.cp(one_lake_path, "file:" + local_path)

    return local_path

def extract_frames(local_video_path: str, video_id: str, every_n: int) -> dict:
    cap = cv2.VideoCapture(local_video_path)
    if not cap.isOpened():
        return {"video_id": video_id, "ok": False, "reason": "could_not_open_local", "local": local_video_path}

    out_dir = out_root / video_id
    out_dir.mkdir(parents=True, exist_ok=True)

    frame_num = 0
    saved = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_num % every_n == 0:
            fp = out_dir / f"frame_{frame_num:07d}.jpg"
            cv2.imwrite(str(fp), frame)
            saved += 1
        frame_num += 1

    cap.release()
    return {"video_id": video_id, "ok": True, "read": frame_num, "saved": saved}

results = []
for v in video_items:
    video_id = v.name.rsplit(".", 1)[0]

    # copy OneLake -> local /tmp
    local_path = copy_to_local(v.path)

    # extract local -> silver frames
    res = extract_frames(local_path, video_id, EVERY_N_FRAMES)
    results.append(res)
    print(res)

print("Done extracting. ok:", sum(1 for r in results if r.get("ok")), "/", len(results))

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 7, Finished, Available, Finished)

{'video_id': 'boardcamera_iron_ore', 'ok': True, 'read': 29309, 'saved': 977}
{'video_id': 'boardcamera_iron_ore_2', 'ok': True, 'read': 4278, 'saved': 143}
{'video_id': 'boardcamera_iron_ore_3', 'ok': True, 'read': 6935, 'saved': 232}
{'video_id': 'boardcamera_iron_ore_4', 'ok': True, 'read': 4905, 'saved': 164}
{'video_id': 'boardcamera_iron_ore_5', 'ok': True, 'read': 14319, 'saved': 478}
{'video_id': 'boardcamera_iron_ore_6', 'ok': True, 'read': 10350, 'saved': 345}
{'video_id': 'boardcamera_iron_ore_7_wood', 'ok': True, 'read': 4310, 'saved': 144}
{'video_id': 'boardcamera_iron_ore_7_wood_2', 'ok': True, 'read': 4313, 'saved': 144}
{'video_id': 'boardcamera_iron_ore_7_wood_3', 'ok': True, 'read': 3907, 'saved': 131}
{'video_id': 'boardcamera_iron_ore_canga', 'ok': True, 'read': 3073, 'saved': 103}
{'video_id': 'boardcamera_iron_ore_canga2', 'ok': True, 'read': 4655, 'saved': 156}
{'video_id': 'boardcamera_iron_ore_canga3', 'ok': True, 'read': 3740, 'saved': 125}
{'video_id': 'boar

**Cell 5: Build silver_frame_index from extracted frames**

In [6]:
import re
from pathlib import Path
from pyspark.sql import Row

frames_root = Path(SILVER_FRAMES_FS)
pat = re.compile(r"frame_(\d+)\.jpg$")

rows = []
for video_dir in frames_root.iterdir():
    if not video_dir.is_dir():
        continue
    video_id = video_dir.name
    for img in video_dir.glob("*.jpg"):
        m = pat.search(img.name)
        if not m:
            continue
        frame_number = int(m.group(1))
        ts_ms = frame_number * (1000.0 / FPS_SOURCE)
        frame_path = str(img).replace("/lakehouse/default/Files/", "Files/")
        rows.append(Row(video_id=video_id, frame_number=frame_number, ts_ms=float(ts_ms), frame_path=frame_path))

silver_frame_index = spark.createDataFrame(rows)
display(silver_frame_index.limit(20))

silver_frame_index.write.mode("overwrite").format("delta").saveAsTable("silver_frame_index")
print("Wrote: silver_frame_index | rows:", silver_frame_index.count())

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c72f27a5-7ed2-4f9a-bb35-600af6c999ac)

Wrote: silver_frame_index | rows: 4713


**Cell 6: Index label frames (flat folder) → silver_label_frame_index**

This parses:
- frame_number from leading digits
- video_id from the middle part before _detected_

In [7]:
def index_labels_flat(folder_spark: str, label_value: int):
    df = (spark.read.format("binaryFile")
          .option("pathGlobFilter", "*.jpg")
          .load(folder_spark)
          .select(F.col("path")))

    df = df.withColumn("file", F.regexp_extract("path", r"([^/]+)$", 1))

    df = (df
        .withColumn("frame_number", F.regexp_extract("file", r"^(\d+)-", 1).cast("int"))
        .withColumn("video_id", F.regexp_extract("file", r"^\d+-(.*)_detected_\d+\.jpg$", 1))
        .withColumn("label_prior", F.lit(label_value))
        .withColumn("ts_ms", F.col("frame_number") * F.lit(1000.0 / FPS_SOURCE))
        .withColumn("label_path", F.col("path"))
    )

    return df.filter(F.col("frame_number").isNotNull() & (F.col("video_id") != "")) \
             .select("video_id", "frame_number", "ts_ms", "label_prior", "label_path")

anom_df = index_labels_flat(ANOM_SPARK, 1)

# normal folder is optional
try:
    norm_df = index_labels_flat(NORM_SPARK, 0)
    labels_df = anom_df.unionByName(norm_df)
except Exception as e:
    print("Normal folder not found or unreadable; proceeding with anomalies only:", e)
    labels_df = anom_df

labels_df.write.mode("overwrite").format("delta").saveAsTable("silver_label_frame_index")
display(labels_df.limit(20))
print("Wrote: silver_label_frame_index | rows:", labels_df.count())

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 02ebb96a-6749-4c11-a3e3-ed792cb2d753)

Wrote: silver_label_frame_index | rows: 2991


**Cell 7: Group labeled frames → silver_event_windows**

In [9]:
from pyspark.sql import functions as F

WINDOW_MS = 10_000  # 10 seconds as integer milliseconds

frames = spark.table("silver_frame_index")
labels = spark.table("silver_label_frame_index").filter(F.col("label_prior") == 1)

# Ensure ts_ms is BIGINT (milliseconds)
frames = frames.withColumn("ts_ms", F.col("ts_ms").cast("long"))
labels = labels.withColumn("ts_ms", F.col("ts_ms").cast("long"))

# Per-video max timestamp
video_span = frames.groupBy("video_id").agg(F.max("ts_ms").alias("max_ts_ms"))

# Create 0..(max_ts_ms - WINDOW_MS) step WINDOW_MS (all BIGINT)
windows = (video_span
    .withColumn("start_ms", F.lit(0).cast("long"))
    .withColumn("stop_ms", (F.col("max_ts_ms") - F.lit(WINDOW_MS)).cast("long"))
    .withColumn(
        "win_start_arr",
        F.expr(f"sequence(start_ms, stop_ms, {WINDOW_MS})")
    )
    .select("video_id", F.explode("win_start_arr").alias("t_start_ms"))
    .withColumn("t_end_ms", (F.col("t_start_ms") + F.lit(WINDOW_MS)).cast("long"))
)

# Mark a window anomalous if any anomaly-labeled frame falls inside it
anom_hits = (labels
    .join(windows, on="video_id", how="inner")
    .where((F.col("ts_ms") >= F.col("t_start_ms")) & (F.col("ts_ms") < F.col("t_end_ms")))
    .select("video_id", "t_start_ms")
    .dropDuplicates()
    .withColumn("label_prior", F.lit(1))
)

event_windows_all = (windows
    .join(anom_hits, on=["video_id", "t_start_ms"], how="left")
    .withColumn("label_prior", F.coalesce(F.col("label_prior"), F.lit(0)))
    .withColumn("event_id", F.concat_ws(":", F.col("video_id"), F.col("t_start_ms").cast("string")))
    .select("event_id","video_id","t_start_ms","t_end_ms","label_prior")
)

event_windows_all.write.mode("overwrite").format("delta").saveAsTable("silver_event_windows")

print("silver_event_windows rows:", event_windows_all.count())
display(event_windows_all.groupBy("label_prior").count())
display(event_windows_all.orderBy("video_id","t_start_ms").limit(20))

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, 11, Finished, Available, Finished)

AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 't_start_ms' and 't_start_ms'

**Cell 8 — Thermal profiles table → dim_material_thermal_profile**

In [None]:
from pyspark.sql import Row

profiles = [
    Row(material="ore_normal", mean_temp_c=24.0, std_temp_c=0.4),
    Row(material="wood",       mean_temp_c=23.5, std_temp_c=0.6),
    Row(material="plastic",    mean_temp_c=27.0, std_temp_c=0.8),
    Row(material="tool_metal", mean_temp_c=21.0, std_temp_c=0.7),
]
dim = spark.createDataFrame(profiles)
dim.write.mode("overwrite").format("delta").saveAsTable("dim_material_thermal_profile")
display(dim)

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, -1, Cancelled, , Cancelled)

**Cell 9: Build a 10 Hz telemetry grid per video (from silver_frame_index)**

In [None]:
frames = spark.table("silver_frame_index")

video_span = frames.groupBy("video_id").agg(F.max("ts_ms").alias("max_ts_ms"))

telemetry_grid = (video_span
    .withColumn("ts_ms_arr", F.expr("transform(sequence(0, cast(max_ts_ms as int), 100), x -> double(x))"))
    .select("video_id", F.explode("ts_ms_arr").alias("ts_ms"))
)

print("telemetry rows:", telemetry_grid.count())
display(telemetry_grid.limit(10))

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, -1, Cancelled, , Cancelled)

**Cell 10: Assign a material to each anomaly event → silver_anomaly_event_material**

In [None]:
events = spark.table("silver_event_windows")

material_map = F.expr("""
  CASE pmod(abs(hash(event_id)), 3)
    WHEN 0 THEN 'wood'
    WHEN 1 THEN 'plastic'
    ELSE 'tool_metal'
  END
""")

anom_events = (events
    .filter(F.col("label_prior") == 1)
    .withColumn("material", material_map)
    .select("event_id", "video_id", "t_start_ms", "t_end_ms", "material")
)

anom_events.write.mode("overwrite").format("delta").saveAsTable("silver_anomaly_event_material")
display(anom_events.limit(20))

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, -1, Cancelled, , Cancelled)

**Cell 11: Generate synthetic PLC readings → silver_plc_readings**

In [None]:
# Baselines (tune freely)
BASE_SPEED   = 2.2    # m/s
BASE_CURRENT = 12.0   # A
BASE_VIB     = 0.15   # RMS
BASE_FEED    = 70.0   # %

def noise(seed_expr, scale):
    # deterministic noise in ~[-scale, +scale]
    return (F.pmod(F.abs(F.hash(seed_expr)), F.lit(2000)).cast("double") / 1000.0 - 1.0) * F.lit(scale)

grid = telemetry_grid
anom = spark.table("silver_anomaly_event_material")
profiles = spark.table("dim_material_thermal_profile")

# Range join to mark anomaly timestamps
joined = (grid.join(anom, on="video_id", how="left")
          .where(
              ((F.col("ts_ms") >= F.col("t_start_ms")) & (F.col("ts_ms") <= F.col("t_end_ms")))
              | F.col("event_id").isNull()
          ))

# If overlap, pick stable first event_id
w = Window.partitionBy("video_id", "ts_ms").orderBy(F.col("event_id").asc_nulls_last())
joined = joined.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")

joined = joined.withColumn("material", F.coalesce(F.col("material"), F.lit("ore_normal")))
joined = joined.withColumn("is_anom", F.when(F.col("event_id").isNotNull(), F.lit(1)).otherwise(F.lit(0)))

# Attach material profile stats
joined = joined.join(profiles, on="material", how="left")

seed = F.concat_ws(":", F.col("video_id"), F.col("ts_ms"))

temp_c = (F.when(F.col("is_anom") == 1, F.col("mean_temp_c")).otherwise(F.lit(24.0))
          + noise(seed, 0.3))

belt_speed = (F.lit(BASE_SPEED)
              + noise(F.concat_ws("s", seed), 0.08)
              - F.when(F.col("is_anom") == 1, F.lit(0.25)).otherwise(F.lit(0.0)))

motor_current = (F.lit(BASE_CURRENT)
                 + noise(F.concat_ws("c", seed), 0.6)
                 + F.when(F.col("is_anom") == 1, F.lit(3.0)).otherwise(F.lit(0.0)))

vibration = (F.lit(BASE_VIB)
             + F.abs(noise(F.concat_ws("v", seed), 0.06))
             + F.when(F.col("is_anom") == 1, F.lit(0.25)).otherwise(F.lit(0.0)))

feed_rate = (F.lit(BASE_FEED)
             + noise(F.concat_ws("f", seed), 1.0)
             - F.when(F.col("is_anom") == 1, F.lit(4.0)).otherwise(F.lit(0.0)))

plc = (joined.select(
            "video_id",
            "ts_ms",
            "event_id",
            "material",
            temp_c.alias("temp_c"),
            belt_speed.alias("belt_speed_mps"),
            motor_current.alias("motor_current_a"),
            vibration.alias("vibration_rms"),
            feed_rate.alias("feed_rate_pct"),
            "is_anom"
        ))

plc.write.mode("overwrite").format("delta").saveAsTable("silver_plc_readings")
display(plc.limit(20))
print("Wrote: silver_plc_readings | rows:", plc.count())

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, -1, Cancelled, , Cancelled)

**Cell 12: Aggregate PLC features per event → silver_plc_event_summary**

In [None]:
plc = spark.table("silver_plc_readings")
events = spark.table("silver_event_windows")

plc_in_events = (plc.join(events, on="video_id", how="inner")
    .where((F.col("ts_ms") >= F.col("t_start_ms")) & (F.col("ts_ms") <= F.col("t_end_ms")))
    .select(events.event_id, events.video_id,
            "temp_c","belt_speed_mps","motor_current_a","vibration_rms","feed_rate_pct")
)

summary = (plc_in_events.groupBy("event_id", "video_id")
    .agg(
        F.avg("temp_c").alias("temp_mean"),
        F.min("temp_c").alias("temp_min"),
        F.max("temp_c").alias("temp_max"),
        F.stddev("temp_c").alias("temp_std"),

        F.avg("motor_current_a").alias("motor_current_mean"),
        F.max("motor_current_a").alias("motor_current_max"),

        F.avg("vibration_rms").alias("vibration_mean"),
        F.max("vibration_rms").alias("vibration_max"),

        F.avg("belt_speed_mps").alias("belt_speed_mean"),
        F.min("belt_speed_mps").alias("belt_speed_min"),

        F.avg("feed_rate_pct").alias("feed_rate_mean"),
        F.stddev("feed_rate_pct").alias("feed_rate_std"),
    )
)

summary = summary.withColumn(
    "plc_features_json",
    F.to_json(F.struct(
        F.col("temp_mean"), F.col("temp_min"), F.col("temp_max"), F.col("temp_std"),
        F.col("motor_current_mean"), F.col("motor_current_max"),
        F.col("vibration_mean"), F.col("vibration_max"),
        F.col("belt_speed_mean"), F.col("belt_speed_min"),
        F.col("feed_rate_mean"), F.col("feed_rate_std"),
    ))
)

summary.write.mode("overwrite").format("delta").saveAsTable("silver_plc_event_summary")
display(summary.limit(20))
print("Wrote: silver_plc_event_summary | events:", summary.count())

StatementMeta(, a51c70e5-f288-4ded-a934-054f25b60979, -1, Cancelled, , Cancelled)