0  status bit sensor flow  
1  status bit coincidence  
2  status bit suction pumps  
3  status bit weather station  
4  status bit IADS  
5  status bit estimated raw channel deviation  
6  status bit LED temperature  
7  status bit operating modus  
  
20  velocity [m/s]  
21  coincidence [%]  
22  modus  
23  suction pump output [%]  
24  IADS temperature (Fidas) evaporation unit (UF-CPC), sensor #1 (Promo) [°C]  
25  estimated raw channel deviation [channels]  
26  LED temperature [°C]  
27  flow rate [l/min]  
28  Cn for UF-CPC [P/cm³] (count and nephelometer modus)  
29  x50 droplet diameter (UF-CPC) [μm]  
30  temperature of condensation unit (UF-CPC), sensor #2 (Promo) [°C]
40  temperature [°C]  
41  relative humidity [%]  
42  wind speed [km/h]  
43  wind direction [°]  
44  precipitation intensity [l/m²/h]  
45  precipitation type 
46  temperature dew point [°C]  
47  air pressure [hPa]  
48  wind signal quality [%]  
  
Fidas/Promo only: 
52 PM2.5 [mg/m³] – 1 s average 
53 PM10 [mg/m³] – 1 s average 
54 PM1 [mg/m³] – 10 s average 
55 PM2.5 [mg/m³] – 10 s average 
56 PM10 [mg/m³] – 10 s average 
57 PMtot [mg/m³] – 10 s average 
58 PM2.5 [mg/m³] – 60 s average 
59 PM10 [mg/m³] – 60 s average 
60  Cn [P/cm³] (PM averaging interval, default: 900s) 
61  PM1 [mg/m³]  
62  PM2.5 [mg/m³]  
63  PM4 [mg/m³]  
64  PM10 [mg/m³]  
65  PMtotal [mg/m³]  
66-109  further PM values [mg/m³] (different algorithms)  
110ff  ΔCn [P/cm³] size distribution with size intervals as shown by the device under 
Expert User Mode / Particle Size Distribution / Table (10 s average)

In [6]:
import socket
import polars as pl
import datetime
from pathlib import Path
import schedule
import time
from typing import Any

# --- CONFIGURABLE PARAMETERS ---
BASE_DIR = "data/fidas"
INTERVAL_SECONDS = 60  # how often to compute a median
BUFFER_SIZE = 1024
LOCAL_IP = "192.168.2.129"
LOCAL_PORT = 56790

# --- GLOBALS ---
sock = None
buffer = ""
summary_df = pl.DataFrame()
current_hour = datetime.datetime.now(datetime.timezone.utc).replace(minute=0, second=0, microsecond=0)

# --- CORE FUNCTIONS ---

def parse_udp_record(record: str) -> "dict[str, Any]":
    try:
        id_part, rest = record.split('<', 1)
        data_part, checksum = rest.split('>', 1)
        parsed = {"id": int(id_part.strip()), "checksum": checksum.strip()}
        if data_part.startswith("sendVal"):
            data_part = data_part[len("sendVal"):].strip()
        for pair in data_part.split(';'):
            if '=' in pair:
                k, v = pair.split('=', 1)
                key = f"val_{int(k.strip())}"
                try:
                    val = float(v.strip())
                except ValueError:
                    val = float('nan')
                parsed[key] = val
        return parsed
    except Exception as e:
        print(f"Failed to parse record: {e}")
        return {}

def ensure_output_path(dt: datetime.datetime) -> Path:
    folder = Path(BASE_DIR) / f"{dt.year:04d}" / f"{dt.month:02d}" / f"{dt.day:02d}"
    folder.mkdir(parents=True, exist_ok=True)
    filename = f"fidas-{dt.year:04d}{dt.month:02d}{dt.day:02d}{dt.hour:02d}.parquet"
    return folder / filename

def read_udp_for_duration(seconds: int) -> "list[dict[str, Any]]":
    global buffer
    records = []
    start = time.time()
    sock.settimeout(seconds)
    try:
        while time.time() - start < seconds:
            remaining = seconds - (time.time() - start)
            if remaining <= 0:
                break
            try:
                data, _ = sock.recvfrom(BUFFER_SIZE)
                buffer += data.decode('ascii', errors='ignore')
                while '>' in buffer:
                    raw, buffer = buffer.split('>', 1)
                    record = parse_udp_record(raw + '>')
                    if record:
                        records.append(record)
            except socket.timeout:
                break
    except Exception as e:
        print(f"Error during UDP read: {e}")
    return records

def compute_and_store_median():
    global summary_df, current_hour
    now = datetime.datetime.now(datetime.timezone.utc).replace(minute=0, second=0, microsecond=0)
    if now.hour != current_hour.hour:
        # Write accumulated summary
        path = ensure_output_path(current_hour)
        if not summary_df.is_empty():
            if path.exists():
                existing = pl.read_parquet(path)
                summary_df = pl.concat([existing, summary_df], how="diagonal").unique()
            summary_df.write_parquet(path)
            print(f"[{now}] Saved hourly summary: {path}")
        summary_df = pl.DataFrame()
        current_hour = now.replace(minute=0, second=0, microsecond=0)

    # Collect and compute median
    records = read_udp_for_duration(INTERVAL_SECONDS)
    if not records:
        return

    df = pl.DataFrame(records)
    value_cols = [col for col in df.columns if col not in {"id", "checksum"} and df.schema[col] in {pl.Float64, pl.Float32}]
    median_row = df.select([pl.median(col).alias(col) for col in value_cols])

    median_row = median_row.with_columns([
        pl.lit("median").alias("id"),
        pl.lit("").alias("checksum"),
        pl.lit(now).cast(pl.Datetime("us", "UTC")).alias("dtm")
    ])
    for col in df.columns:
        if col not in median_row.columns:
            median_row = median_row.with_columns(pl.lit(None).alias(col))

    median_row = median_row.select(sorted(median_row.columns))
    summary_df = pl.concat([summary_df, median_row], how="diagonal")
    print(f"[{now}] Collected median, running summary count: {len(summary_df)}")

# --- MAIN ---

def run_scheduler():
    global sock
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock_instance:
        sock_instance.bind((LOCAL_IP, LOCAL_PORT))
        sock = sock_instance  # reference global for reuse
        print(f"Listening on {LOCAL_IP}:{LOCAL_PORT} every {INTERVAL_SECONDS} seconds...")

        schedule.every(INTERVAL_SECONDS).seconds.do(compute_and_store_median)

        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopped by user.")


if __name__ == "__main__":
    run_scheduler()


Listening on 192.168.2.129:56790 every 60 seconds...
Error during UDP read: [WinError 10040] A message sent on a datagram socket was larger than the internal message buffer or some other network limit, or the buffer used to receive a datagram into was smaller than the datagram itself
[2025-04-25 17:13:22.809333] Collected median, running summary count: 1
Failed to parse record: invalid literal for int() with base 10: '2F6082'
Error during UDP read: [WinError 10040] A message sent on a datagram socket was larger than the internal message buffer or some other network limit, or the buffer used to receive a datagram into was smaller than the datagram itself
Failed to parse record: invalid literal for int() with base 10: '2F6082'
Error during UDP read: [WinError 10040] A message sent on a datagram socket was larger than the internal message buffer or some other network limit, or the buffer used to receive a datagram into was smaller than the datagram itself
Stopped by user.


In [None]:
import socket
import polars as pl
import datetime
from pathlib import Path
from typing import Any

def parse_udp_record(record: str) -> "dict[str, Any]":
    try:
        id_part, rest = record.split('<', 1)
        data_part, checksum = rest.split('>', 1)

        parsed = {"id": int(id_part.strip()), "checksum": checksum.strip()}

        if data_part.startswith("sendVal"):
            data_part = data_part[len("sendVal"):].strip()

        for pair in data_part.split(';'):
            if '=' in pair:
                k, v = pair.split('=', 1)
                key = f"val_{int(k.strip())}"
                try:
                    val = float(v.strip())
                except ValueError:
                    val = float('nan')
                parsed[key] = val

        return parsed
    except Exception as e:
        print(f"Failed to parse record: {e}")
        return {}

def ensure_output_path(base_dir: str, dt: datetime.datetime) -> Path:
    y, m, d = dt.year, dt.month, dt.day
    folder = Path(base_dir) / f"{y:04d}" / f"{m:02d}" / f"{d:02d}"
    folder.mkdir(parents=True, exist_ok=True)
    filename = f"fidas-{y:04d}{m:02d}{d:02d}{dt.hour:02d}.parquet"
    return folder / filename

def collect_and_log_udp_summary(
    base_dir: str,
    max_records: int = 100,
    local_ip: str = "0.0.0.0",
    local_port: int = 12345,
    buffer_size: int = 1024,
    run_minutes: int = 60
) -> None:
    """
    Collects UDP records, accumulates median summaries over time,
    and writes them to hourly .parquet files in a yyyy/mm/dd folder hierarchy.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((local_ip, local_port))
    print(f"Listening on {local_ip}:{local_port} ...")

    summary_df = pl.DataFrame()
    end_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=run_minutes)
    buffer = ""

    try:
        while datetime.datetime.utcnow() < end_time:
            parsed_records = []
            while len(parsed_records) < max_records:
                data, _ = sock.recvfrom(buffer_size)
                buffer += data.decode('ascii', errors='ignore')

                while '>' in buffer and len(parsed_records) < max_records:
                    raw_record, buffer = buffer.split('>', 1)
                    full_record = raw_record + '>'
                    parsed = parse_udp_record(full_record)
                    if parsed:
                        parsed_records.append(parsed)

            if not parsed_records:
                continue

            df = pl.DataFrame(parsed_records)

            # Compute median for value columns
            value_cols = [col for col in df.columns if col not in {"id", "checksum"} and df.schema[col] in {pl.Float64, pl.Float32}]
            median_row = df.select([pl.median(col).alias(col) for col in value_cols])

            now = datetime.datetime.utcnow()
            median_row = median_row.with_columns([
                pl.lit("median").alias("id"),
                pl.lit("").alias("checksum"),
                pl.lit(now).cast(pl.Datetime("us", "UTC")).alias("dtm")
            ])

            for col in df.columns:
                if col not in median_row.columns:
                    median_row = median_row.with_columns(pl.lit(None).alias(col))

            median_row = median_row.select(sorted(median_row.columns))
            summary_df = pl.concat([summary_df, median_row], how="diagonal")

            # Save to hourly parquet file
            out_path = ensure_output_path(base_dir, now)
            if out_path.exists():
                existing = pl.read_parquet(out_path)
                summary_df = pl.concat([existing, summary_df], how="vertical").unique()
            summary_df.write_parquet(out_path)
            print(f"Wrote summary to {out_path}")
            summary_df = pl.DataFrame()  # reset for next batch

    except KeyboardInterrupt:
        print("Stopped by user.")
    finally:
        sock.close()

# Run for 60 minutes, saving 100-record summaries to parquet
collect_and_log_udp_summary(base_dir="data/fidas", max_records=100, run_minutes=60)


In [None]:
import socket
import polars as pl
from typing import Generator, Any
import math

UDP_IP = "192.168.2.129"  # Listen on all interfaces
UDP_PORT = 56790

def receive_udp_ascii(
    local_ip: str = "0.0.0.0",
    local_port: int = 12345,
    buffer_size: int = 1024
) -> str:
    """
    Receive ASCII data via UDP, accumulating until '>' character marks end of record.

    Args:
        local_ip: IP address to bind to (use "0.0.0.0" for all interfaces).
        local_port: UDP port to bind to.
        buffer_size: Maximum number of bytes to receive at once.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((local_ip, local_port))
        sock.settimeout(2)
        print(f"Listening on {local_ip}:{local_port}")

        rcvd = ""
        try:
            while True:
                data, addr = sock.recvfrom(buffer_size)
                if '>' in data.decode():
                    rcvd = f"{rcvd}{data.decode()}"
                    break

            record =

            return rcvd
        except Exception as err:
            print(err)
        finally:
            sock.close()
    # try:
    #     while True:
    #         data, addr = sock.recvfrom(buffer_size)
    #         text = data.decode('ascii', errors='ignore')
    #         buffer += text

    #         while '>' in buffer:
    #             record, buffer = buffer.split('>', 1)
    #             record += '>'
    #             print(f"Received from {addr}: {record}")
    #             # You can process the record here

    # except KeyboardInterrupt:
    #     print("\nStopped by user.")
    # finally:
    #     sock.close()

if __name__ == "__main__":
    rcvd = receive_udp_ascii(local_ip=UDP_IP, local_port=UDP_PORT))
    df = amend

# sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# sock.bind((UDP_IP, UDP_PORT))
# sock.settimeout(2.0)  # Set timeout to 2 seconds

# print(f"Listening for UDP packets on port {UDP_PORT}...")

# try:
#     while True:
#         try:
#             data, addr = sock.recvfrom(4096)  # buffer size in bytes
#             # print(f"Received from {addr}: {data}")
#             print(data.decode())
#         except socket.timeout:
#             print("Waiting for UDP data...")
# except KeyboardInterrupt:
#     print("Exiting.")
# finally:
#     sock.close()


In [None]:
import serial
import time

# Open COM5 (on Windows) or /dev/ttyS5 (on Linux), adjust accordingly
ser = serial.Serial(
    port='/dev/ttyUSB0',        # Use '/dev/ttyS5' or '/dev/ttyUSB0' on Linux
    baudrate=57600,
    bytesize=serial.EIGHTBITS,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    timeout=1           # Optional: read timeout in seconds
)

if ser.is_open:
    print("Serial port opened successfully.")

# Example write and read (optional)
ser.write(b'<getVal 60; 61; 64>')

# Optional: wait for device to initialize
time.sleep(1)

# Loop until data is received
print("Waiting for data...")
try:
    while True:
        if ser.in_waiting > 0:
            data = ser.read(ser.in_waiting)  # Read all available bytes
            print("Received:", data.decode(errors='ignore'))
        time.sleep(0.1)  # Avoid busy-waiting
except KeyboardInterrupt:
    print("\nStopping.")

# Cleanup
ser.close()