In [30]:
from influxdb_client import InfluxDBClient, Point, WriteOptions
import pandas as pd
import numpy as np
import configparser
from datetime import datetime, timedelta


# ============================================================
# 1. CONFIGURATION
# ============================================================
INFLUX_URL                  = "http://100.107.165.205:8086/"
INFLUX_TOKEN                = "SBto4EBQvq7wY-APYOoDn4QpYZ9GkjWzQZzftrDDk31kjRYWYN-37i7uHNXddkZjYsAU85EdYbI2hoKBLB1woA=="
INFLUX_ORG                  = "e35fd59963c483cd"
INFLUX_BUCKET_CORRIDOR      = "1_1_1"
INFLUX_BUCKET_BATHROOM      = "1_2_6"
INFLUX_BUCKET_KITCHEN       = "1_4_10"
INFLUX_BUCKET_AUTH          = ""

CONFIG_STAY_MAX_INTERVAL    = 10

In [59]:
# ============================================================
# 2. CONNECT TO INFLUXDB
# ============================================================
def get_client():
    return InfluxDBClient(
        url=INFLUX_URL,
        token=INFLUX_TOKEN,
        org=INFLUX_ORG
    )


# ============================================================
# 3. QUERY DATA FROM INFLUXDB
# ============================================================
def query_data(device_id=1, range_in_hours=6):
    query_api = client.query_api()
    MEASUREMENT_PIR = "PIR"

    if device_id == 1:
        bucket = INFLUX_BUCKET_CORRIDOR
    elif device_id == 2:
        bucket = INFLUX_BUCKET_BATHROOM
    elif device_id == 4:
        bucket = INFLUX_BUCKET_KITCHEN
    else:
        print("Unknown device ID.")
        return

    # query = f'''
    #     from(bucket: "{bucket}")
    #       |> range(start: -{range_in_hours}h)
    #       |> filter(fn: (r) => r["_measurement"] == "{MEASUREMENT_PIR}")
    #       |> keep(columns: ["_time", "_value", "_field", "_measurement"])
    # '''

    query = f'''
           from(bucket: "{bucket}")
             |> range(start: 2025-12-01T00:00:00Z, stop: 2025-12-01T23:59:59Z)
             |> filter(fn: (r) => r["_measurement"] == "{MEASUREMENT_PIR}")
             |> keep(columns: ["_time", "_value", "_field", "_measurement"])
       '''

    print("Running query...")
    df = query_api.query_data_frame(query)

    # If Influx returns multiple tables, flatten them
    if isinstance(df, list):
        df = pd.concat(df, ignore_index=True)

    print(f"Retrieved {len(df)} rows.")
    return df

In [63]:
def add_to_df(df, start_time, latest_time, room):
    duration = pd.Timedelta(seconds=0) if latest_time is None else latest_time - start_time
    print(f"Duration: {duration.seconds}")
    df.loc[len(df)] = [room, start_time, duration.seconds] 

def analyze_stays(df: pd.DataFrame):
    print("\nAnalyzing stays...")
    if df.empty:
        print("\nEmpty dataframe.")
        return df
    
    room = df.loc[0, '_value']
    
    df_stays = pd.DataFrame({
        "room": [],
        "start_time": [],
        "duration": []
    })
    
    df.sort_values(by="_time") # ascending by default
    time_array = df["_time"]
    start_time = df.loc[0, '_time'] # type: pd.Timestamp
    print(f"Start time: {start_time}")
    
    print(f"Type of start_time_str: {type(start_time)}")
    latest_time = None
    for idx, event_time in enumerate(time_array[1:]):
        time_passed = event_time - start_time
        # Check if event is part of the same stay
        if time_passed < timedelta(minutes=CONFIG_STAY_MAX_INTERVAL):
            # If part of stay, extend latest_time to current event_time and continue with next event
            latest_time = event_time
            continue
        else:
            # If not, write stay to df and set current event_time as new start_time
            add_to_df(df_stays, start_time, latest_time, room)
            # Reset start_time & latest_time
            latest_time = None
            start_time = event_time
    
    if start_time is not None:
        add_to_df(df_stays, start_time, latest_time, room)
    return df_stays

In [66]:
# ============================================================
# 5. WRITE RESULTS BACK TO INFLUXDB
# ============================================================
def write_results(df_stays: pd.DataFrame):
    print("\nWriting results to InfluxDB...")

    # Convert start_time to datetime (required by InfluxDB)
    df_stays["start_time"] = pd.to_datetime(df_stays["start_time"])
    # Set the timestamp column as index â€” InfluxDB uses the index as time
    df_stays = df_stays.set_index("start_time")

    write_api = client.write_api()
    
    print("\nData is ready to be written into DB:")
    print(df_stays)

    # Write DataFrame to InfluxDB
    write_api.write(
        bucket="stays",
        record=df_stays,
        data_frame_measurement_name="room_stays",  # measurement name
        data_frame_tag_columns=["room"]  # "room" becomes a tag
    )

    print("Data written successfully to bucket 'stays'")

In [67]:
# ============================================================
# 6. MAIN PIPELINE
# ============================================================
if __name__ == "__main__":
    print("Connecting to InfluxDB...")
    client = get_client()

    device_id = 1
    range_in_hours = 6

    df = query_data(device_id=device_id, range_in_hours=range_in_hours)
    print(f"\nFound {df.shape[0]} rows of data")
    df_stays = analyze_stays(df)
    write_results(df_stays)

    print("\nPipeline finished.")

Connecting to InfluxDB...
Running query...
Retrieved 148 rows.

Found 148 rows of data

Analyzing stays...
Start time: 2025-12-01 00:00:49+00:00
Type of start_time_str: <class 'pandas._libs.tslibs.timestamps.Timestamp'>
Duration: 524
Duration: 11
Duration: 589
Duration: 72
Duration: 129
Duration: 0
Duration: 558
Duration: 0
Duration: 0
Duration: 587
Duration: 15
Duration: 393
Duration: 0
Duration: 71
Duration: 17
Duration: 45
Duration: 15
Duration: 573
Duration: 508
Duration: 585
Duration: 314
Duration: 181
Duration: 8
Duration: 526
Duration: 19
Duration: 64
Duration: 0
Duration: 0

Writing results to InfluxDB...

Data is ready to be written into DB:
                               room  duration
start_time                                   
2025-12-01 00:00:49+00:00  corridor       524
2025-12-01 04:26:49+00:00  corridor        11
2025-12-01 04:45:23+00:00  corridor       589
2025-12-01 04:55:44+00:00  corridor        72
2025-12-01 08:45:48+00:00  corridor       129
2025-12-01 10:54:52


The result will not be shaped to optimal processing by pandas.DataFrame. Use the pivot() function by:

    
           from(bucket: "1_1_1")
             |> range(start: 2025-12-01T00:00:00Z, stop: 2025-12-01T23:59:59Z)
             |> filter(fn: (r) => r["_measurement"] == "PIR")
             |> keep(columns: ["_time", "_value", "_field", "_measurement"])
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")



For more info see:
    - https://docs.influxdata.com/resources/videos/pivots-in-flux/
    - https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/
    - https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/

