# Agriculture Spark – Ingest (CSV → Parquet)

Dieses Notebook:
- erstellt eine **SparkSession**
- definiert **Schemas** für Hub/Spoke
- liest die Original-CSV Dateien ein
- führt **Parsing/Cleansing** durch (z.B. `"-"` → `null`, Timestamp-Parsing, Feature-Vector-Parsing)
- speichert die bereinigten Daten als **Parquet**

> **Hinweis:** Pfade sind standardmäßig auf die hochgeladenen Dateien in dieser Umgebung gesetzt. Wenn ihr das Notebook in eurem Repo/Cluster nutzt, passt die Pfade im nächsten Cell an.


In [26]:
# =========================
# 0) Imports & Pfade
# =========================
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import os

# --- INPUT: Originaldateien ---
HUB_CSV   = "data/raw/dm-hub.csv"
SPOKE_CSV = "data/raw/dm-spoke.csv"

# --- OUTPUT: Parquet-Zielordner ---
OUT_ROOT = "data/processed"
HUB_OUT_PARQUET   = os.path.join(OUT_ROOT, "hub")
SPOKE_OUT_PARQUET = os.path.join(OUT_ROOT, "spoke")

os.makedirs(OUT_ROOT, exist_ok=True)

print("HUB_CSV:", HUB_CSV)
print("SPOKE_CSV:", SPOKE_CSV)
print("OUT_ROOT:", OUT_ROOT)


HUB_CSV: data/raw/dm-hub.csv
SPOKE_CSV: data/raw/dm-spoke.csv
OUT_ROOT: data/processed


In [27]:
# =========================
# 1) Spark Session erstellen
# =========================
spark = (
    SparkSession.builder
    .appName("AgricultureSpark-Ingest")
    # sinnvolle Defaults (könnt ihr je nach Cluster anpassen)
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

spark.version


'3.5.7'

## 2) Schemas definieren

Wir erzwingen ein Schema statt alles als String einzulesen.  
CSV-Felder, die manchmal `"-"` enthalten, werden zunächst als `string` eingelesen und danach sauber gecastet (damit kein Parsing crasht).


In [28]:
# =========================
# 2) Schemas definieren
# =========================
# Hub: 7 Spalten (kein Header in CSV)
# Format: <hubtracker> <timestamp> hub-coords <latitude> <longitude> <voltage/V> <temperature/°C>
hub_schema_raw = T.StructType([
    T.StructField("hubtracker", T.StringType(), True),
    T.StructField("timestamp", T.StringType(), True),
    T.StructField("hub_coords", T.StringType(), True),
    T.StructField("latitude", T.StringType(), True),
    T.StructField("longitude", T.StringType(), True),
    T.StructField("voltage", T.StringType(), True),
    T.StructField("temperature", T.StringType(), True),
])

# Spoke: 14 Spalten (kein Header in CSV)
# Format: <hubtracker> <timestamp> spoke-visibility <spoketracker> <rssi/dB> <device-state>
#         <voltage/V> <temperature/°C> <animal-state> <state-resting/min> <state-walking/min>
#         <state-grazing/min> <state-running/min> <features-vector/15 values>
spoke_schema_raw = T.StructType([
    T.StructField("hubtracker", T.StringType(), True),
    T.StructField("timestamp", T.StringType(), True),
    T.StructField("spoke_visibility", T.StringType(), True),
    T.StructField("spoketracker", T.StringType(), True),
    T.StructField("rssi", T.StringType(), True),
    T.StructField("device_state", T.StringType(), True),
    T.StructField("voltage", T.StringType(), True),
    T.StructField("temperature", T.StringType(), True),
    T.StructField("animal_state", T.StringType(), True),
    T.StructField("state_resting", T.StringType(), True),
    T.StructField("state_walking", T.StringType(), True),
    T.StructField("state_grazing", T.StringType(), True),
    T.StructField("state_running", T.StringType(), True),
    T.StructField("features", T.StringType(), True),     # features[15] als eine Spalte
])


## 3) CSV einlesen (robust)

Wir setzen:
- `header=True`
- `multiLine=True` (falls es Zeilenumbrüche in Quotes gibt)
- `escape`/`quote` robust
- `mode=PERMISSIVE` + `_corrupt_record` zur Diagnose


In [29]:
# =========================
# 3) CSV einlesen
# =========================
csv_options = {
    "header": "false",
    "sep": ",",
    "quote": '"',
    "escape": '"',
    "multiLine": "true",
    "mode": "PERMISSIVE",
    "columnNameOfCorruptRecord": "_corrupt_record",
    "encoding": "UTF-8",
}

# HUB: mit Schema lesen (keine Header-Zeile)
hub_raw = spark.read.schema(hub_schema_raw).options(**csv_options).csv(HUB_CSV)

print("HUB columns:", hub_raw.columns)
hub_raw.show(5, truncate=False)

# SPOKE: mit Schema lesen (keine Header-Zeile)
spoke_raw = spark.read.schema(spoke_schema_raw).options(**csv_options).csv(SPOKE_CSV)

print("SPOKE columns:", spoke_raw.columns)
spoke_raw.show(5, truncate=False)


HUB columns: ['hubtracker', 'timestamp', 'hub_coords', 'latitude', 'longitude', 'voltage', 'temperature']
+----------+----------+----------+----------+---------+-------+-----------+
|hubtracker|timestamp |hub_coords|latitude  |longitude|voltage|temperature|
+----------+----------+----------+----------+---------+-------+-----------+
|937832    |1752294998|hub-coords|46.3694617|7.6737851|-      |-          |
|932400    |1752295123|hub-coords|46.3693767|7.6737784|-      |-          |
|937832    |1752294998|hub-coords|46.3694617|7.6737851|4.79   |11         |
|937832    |1752294998|hub-coords|46.3694617|7.6737851|4.79   |11         |
|937832    |1752294998|hub-coords|46.3694617|7.6737851|4.79   |11         |
+----------+----------+----------+----------+---------+-------+-----------+
only showing top 5 rows

SPOKE columns: ['hubtracker', 'timestamp', 'spoke_visibility', 'spoketracker', 'rssi', 'device_state', 'voltage', 'temperature', 'animal_state', 'state_resting', 'state_walking', 'state

## 4) Parsing / Cleansing

### Hub
- `timestamp` → Timestamp (`event_ts`)
- `latitude/longitude` → Double
- `voltage/temperature/signal` (`"-"` → `null`) → Double
- optional `delay` falls vorhanden

### Spoke
- `timestamp` → Timestamp
- numerische Felder casten
- Feature-Vektor (`features`) robust splitten und auf **15 Features** bringen


In [30]:
# =========================
# 4A) HUB Parsing
# =========================

def dash_to_double(colname: str):
    return F.when(F.col(colname).isin("-", ""), F.lit(None)).otherwise(F.col(colname)).cast("double")

def dash_to_int(colname: str):
    return F.when(F.col(colname).isin("-", ""), F.lit(None)).otherwise(F.col(colname)).cast("int")

# Timestamp-Parsing: mehrere Formate (Spark versucht ISO automatisch)
hub = hub_raw.withColumn(
    "event_ts",
    F.coalesce(
        F.to_timestamp("timestamp"),
        F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"),
        F.to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSX"),
        F.to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ssX"),
    )
).withColumn("event_date", F.to_date("event_ts"))

# Casting
hub = (
    hub
    .withColumn("latitude_d",  dash_to_double("latitude"))
    .withColumn("longitude_d", dash_to_double("longitude"))
    .withColumn("voltage_d",   dash_to_double("voltage"))
    .withColumn("temperature_d", dash_to_double("temperature"))
)

hub_parsed = hub.select(
    "hubtracker",
    "event_ts",
    "event_date",
    "hub_coords",
    F.col("latitude_d").alias("latitude"),
    F.col("longitude_d").alias("longitude"),
    F.col("voltage_d").alias("voltage"),
    F.col("temperature_d").alias("temperature"),
    "_corrupt_record" if "_corrupt_record" in hub.columns else F.lit(None).alias("_corrupt_record"),
)

hub_parsed.show(5, truncate=False)
hub_parsed.printSchema()


+----------+--------+----------+----------+----------+---------+-------+-----------+---------------+
|hubtracker|event_ts|event_date|hub_coords|latitude  |longitude|voltage|temperature|_corrupt_record|
+----------+--------+----------+----------+----------+---------+-------+-----------+---------------+
|937832    |NULL    |NULL      |hub-coords|46.3694617|7.6737851|NULL   |NULL       |NULL           |
|932400    |NULL    |NULL      |hub-coords|46.3693767|7.6737784|NULL   |NULL       |NULL           |
|937832    |NULL    |NULL      |hub-coords|46.3694617|7.6737851|4.79   |11.0       |NULL           |
|937832    |NULL    |NULL      |hub-coords|46.3694617|7.6737851|4.79   |11.0       |NULL           |
|937832    |NULL    |NULL      |hub-coords|46.3694617|7.6737851|4.79   |11.0       |NULL           |
+----------+--------+----------+----------+----------+---------+-------+-----------+---------------+
only showing top 5 rows

root
 |-- hubtracker: string (nullable = true)
 |-- event_ts: time

In [31]:
# =========================
# 4B) SPOKE Parsing inkl. Feature-Vektor
# =========================

spoke = (
    spoke_raw
    .withColumn(
        "event_ts",
        F.coalesce(
            F.to_timestamp("timestamp"),
            F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"),
            F.to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSX"),
            F.to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ssX"),
        )
    )
    .withColumn("event_date", F.to_date("event_ts"))
)

# Numerik casten
spoke = (
    spoke
    .withColumn("spoke_visibility_i", dash_to_int("spoke_visibility"))
    .withColumn("rssi_i", dash_to_int("rssi"))
    .withColumn("device_state_i", dash_to_int("device_state"))
    .withColumn("voltage_d", dash_to_double("voltage"))
    .withColumn("temperature_d", dash_to_double("temperature"))
    .withColumn("animal_state_i", dash_to_int("animal_state"))
    .withColumn("state_resting_i", dash_to_int("state_resting"))
    .withColumn("state_walking_i", dash_to_int("state_walking"))
    .withColumn("state_grazing_i", dash_to_int("state_grazing"))
    .withColumn("state_running_i", dash_to_int("state_running"))
)

# Feature parsing:
# - entferne [ ]
# - normalisiere ; -> ,
# - split
features_clean = F.regexp_replace(F.col("features"), ";", ",")
features_clean = F.regexp_replace(features_clean, r"\[|\]", "")
features_arr = F.split(features_clean, ",")

spoke = spoke.withColumn("features_arr_raw", features_arr)
spoke = spoke.withColumn("feature_count", F.size("features_arr_raw"))

# Cast elements safely
spoke = spoke.withColumn(
    "features_arr",
    F.expr("""
        transform(features_arr_raw, x ->
            CASE
                WHEN x IS NULL THEN NULL
                WHEN trim(x) = '' THEN NULL
                WHEN trim(x) = '-' THEN NULL
                ELSE cast(trim(x) as double)
            END
        )
    """)
)

# Pad/trim to exactly 15
spoke = spoke.withColumn(
    "features_15",
    F.when(
        F.col("features_arr").isNull(),
        F.array_repeat(F.lit(None).cast("double"), 15)
    ).otherwise(
        F.when(
            F.size("features_arr") >= 15,
            F.slice("features_arr", 1, 15)
        ).otherwise(
            F.concat(
                F.col("features_arr"),
                F.array_repeat(F.lit(None).cast("double"), 15 - F.size("features_arr"))
            )
        )
    )
)

spoke = spoke.withColumn("feature_ok", F.col("feature_count") >= F.lit(15))

# explode into f01..f15
for i in range(15):
    spoke = spoke.withColumn(f"f{i+1:02d}", F.col("features_15")[i])

spoke_parsed = spoke.select(
    "hubtracker",
    "spoketracker",
    "event_ts",
    "event_date",
    F.col("spoke_visibility_i").alias("spoke_visibility"),
    F.col("rssi_i").alias("rssi"),
    F.col("device_state_i").alias("device_state"),
    F.col("voltage_d").alias("voltage"),
    F.col("temperature_d").alias("temperature"),
    F.col("animal_state_i").alias("animal_state"),
    F.col("state_resting_i").alias("state_resting"),
    F.col("state_walking_i").alias("state_walking"),
    F.col("state_grazing_i").alias("state_grazing"),
    F.col("state_running_i").alias("state_running"),
    "feature_count",
    "feature_ok",
    *[f"f{i+1:02d}" for i in range(15)],
)

spoke_parsed.show(5, truncate=False)
spoke_parsed.printSchema()


+----------+-----------------+--------+----------+----------------+----+------------+-------+-----------+------------+-------------+-------------+-------------+-------------+-------------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|hubtracker|spoketracker     |event_ts|event_date|spoke_visibility|rssi|device_state|voltage|temperature|animal_state|state_resting|state_walking|state_grazing|state_running|feature_count|feature_ok|f01 |f02 |f03 |f04 |f05 |f06 |f07 |f08 |f09 |f10 |f11 |f12 |f13 |f14 |f15 |
+----------+-----------------+--------+----------+----------------+----+------------+-------+-----------+------------+-------------+-------------+-------------+-------------+-------------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|937832    |newspoke-EFD1CF8E|NULL    |NULL      |NULL            |-98 |NULL        |4.55   |13.0       |NULL        |1            |53           |198          |6            |1

## 5) Speicherung als Parquet

Wir speichern partitioniert nach `event_date`.


In [32]:
# =========================
# 5) Write Parquet
# =========================
(
    hub_parsed
    .write
    .mode("overwrite")
    .partitionBy("event_date")
    .parquet(HUB_OUT_PARQUET)
)

(
    spoke_parsed
    .write
    .mode("overwrite")
    .partitionBy("event_date")
    .parquet(SPOKE_OUT_PARQUET)
)

print("Wrote hub parquet to:", HUB_OUT_PARQUET)
print("Wrote spoke parquet to:", SPOKE_OUT_PARQUET)


AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The Parquet datasource doesn't support the column `_corrupt_record` of the type "VOID".

## 6) Read-back Checks

In [None]:
hub_pq = spark.read.parquet(HUB_OUT_PARQUET)
spoke_pq = spark.read.parquet(SPOKE_OUT_PARQUET)

print("hub_pq rows:", hub_pq.count(), "columns:", len(hub_pq.columns))
print("spoke_pq rows:", spoke_pq.count(), "columns:", len(spoke_pq.columns))

hub_pq.groupBy("event_date").count().orderBy("event_date").show(10, truncate=False)
spoke_pq.groupBy("event_date").count().orderBy("event_date").show(10, truncate=False)

spoke_pq.select(
    F.count("*").alias("n"),
    F.sum(F.col("feature_ok").cast("int")).alias("n_feature_ok"),
    F.avg(F.col("feature_ok").cast("int")).alias("share_feature_ok")
).show()


## 7) Stop Spark (optional)

In [None]:
# spark.stop()