In [1]:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='172.23.0.104:30131',
    auto_offset_reset='earliest',
)

partition = TopicPartition('core.events', 0)

consumer.assign([partition])

# get current offset
start = consumer.position(partition)

# get end offset
end = consumer.end_offsets([partition])[partition]

total = end - start

print(f"start: {start}, end: {end}, total: {total}")

events = []

for i in range(0, total, 100):
    new_events = consumer.poll(timeout_ms=5000, max_records=100, update_offsets=True)[partition]

    if not new_events:
        break
    
    events.extend(new_events)

print(f"events: {len(events)}")

start: 0, end: 1494, total: 1494
events: 1472


In [51]:
import json
import pandas as pd

df = []

for event in events:
    data = json.loads(event.value)
    
    df.append({
        'offset': event.offset,
        'type': data['context']['type'],
        'timestamp': data['context']['timestamp'],
        'package': data['data']
    })

df = pd.DataFrame(df)

df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.floor('H')

# remove the last hour of rows
df = df[df['hour'] < df['hour'].max()]

max_offset = df['offset'].max()

df

  df['hour'] = df['timestamp'].dt.floor('H')


Unnamed: 0,offset,type,timestamp,package,hour
0,0,core.motion.detected,2024-12-15 01:42:41.134038+00:00,"{'detected': 11, 'total': 80}",2024-12-15 01:00:00+00:00
1,1,core.motion.detected,2024-12-15 01:42:41.307791+00:00,"{'detected': 5, 'total': 80}",2024-12-15 01:00:00+00:00
2,2,core.motion.detected,2024-12-15 01:42:41.441394+00:00,"{'detected': 4, 'total': 80}",2024-12-15 01:00:00+00:00
3,3,core.motion.detected,2024-12-15 01:42:41.629027+00:00,"{'detected': 4, 'total': 80}",2024-12-15 01:00:00+00:00
4,4,core.motion.detected,2024-12-15 01:42:41.688246+00:00,"{'detected': 4, 'total': 80}",2024-12-15 01:00:00+00:00
...,...,...,...,...,...
1304,1304,core.object.tracked,2024-12-25 21:59:51.659385+00:00,"{'track_index': 4, 'box': [638, 204, 1277, 407]}",2024-12-25 21:00:00+00:00
1305,1305,core.object.detected,2024-12-25 21:59:54.658023+00:00,"{'box': [639, 202, 1278, 404], 'confidence': 0...",2024-12-25 21:00:00+00:00
1306,1306,core.object.tracked,2024-12-25 21:59:54.737685+00:00,"{'track_index': 1, 'box': [626, 200, 1291, 410]}",2024-12-25 21:00:00+00:00
1307,1307,core.object.tracked,2024-12-25 21:59:54.818867+00:00,"{'track_index': 2, 'box': [626, 202, 1291, 412]}",2024-12-25 21:00:00+00:00


In [9]:
# determine how many hours from the earliest to latest event
hours = (df['timestamp'].max() - df['timestamp'].min()).total_seconds() / 3600

print(f"hours: {hours}")

hours: 260.32350728694445


In [16]:
# aggregate per 1 minute and count the number of events
counts_df = df.resample('1T', on='timestamp').count()

counts_df = counts_df.drop(columns=['type'])

# remove any that have no events
counts_df = counts_df[counts_df['package'] > 0]

counts_df

  counts_df = df.resample('1T', on='timestamp').count()


Unnamed: 0_level_0,package
timestamp,Unnamed: 1_level_1
2024-12-15 01:42:00+00:00,20
2024-12-15 01:46:00+00:00,24
2024-12-15 01:47:00+00:00,11
2024-12-15 01:53:00+00:00,5
2024-12-15 01:54:00+00:00,9
...,...
2024-12-25 21:58:00+00:00,12
2024-12-25 21:59:00+00:00,52
2024-12-25 22:00:00+00:00,49
2024-12-25 22:01:00+00:00,88


In [48]:
# total up the number of events if the timestamps are back to back (i.e. no gaps)
record_events = []
record_event = None
previous_row = None

counts_df = counts_df.reset_index()
counts_df = counts_df.sort_values('timestamp')

for _, row in counts_df.iterrows():
    if previous_row is not None:
        if (row.timestamp - previous_row.timestamp).total_seconds() == 60:
            if record_event is None:
                record_event = {
                    'start': previous_row.timestamp,
                    'end': row.timestamp,
                    'count': previous_row.package + row.package
                }
            else:
                record_event['end'] = row.timestamp
                record_event['count'] += row.package
            
        elif record_event is not None:
            record_events.append(record_event)
            record_event = None
    
    previous_row = row


record_df = pd.DataFrame(record_events)

record_df

Unnamed: 0,start,end,count
0,2024-12-15 01:46:00+00:00,2024-12-15 01:47:00+00:00,35
1,2024-12-15 01:53:00+00:00,2024-12-15 01:54:00+00:00,14
2,2024-12-15 21:47:00+00:00,2024-12-15 21:48:00+00:00,30
3,2024-12-15 22:00:00+00:00,2024-12-15 22:01:00+00:00,93
4,2024-12-15 22:33:00+00:00,2024-12-15 22:34:00+00:00,52
5,2024-12-15 22:47:00+00:00,2024-12-15 22:48:00+00:00,9
6,2024-12-20 00:56:00+00:00,2024-12-20 00:57:00+00:00,4
7,2024-12-20 01:11:00+00:00,2024-12-20 01:14:00+00:00,15
8,2024-12-20 02:01:00+00:00,2024-12-20 02:02:00+00:00,24
9,2024-12-20 02:31:00+00:00,2024-12-20 02:32:00+00:00,13


ffmpeg \
    -f v4l2 \
    -input_format mjpeg \
    -i /dev/video0 \
    -r 30 \
    -s 1280x720 \
    -vcodec mjpeg \
    -f segment \
    -segment_time 30 \
    -segment_atclocktime 1 \
    -reset_timestamps 1 \
    -strftime 1 \
    stream_%s.mp4

ffprobe -select_streams 0 -show_entries packet=pts_time:stream=codec_type "stream_1735168800.mp4" -print_format json

In [71]:
import os
import subprocess

def fetch_timestamps(video_file):
    cmd = f'/usr/bin/ffprobe -select_streams 0 -show_entries packet=pts_time:stream=codec_type "{video_file}" -print_format json'
    output = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    output = json.loads(output.stdout)
    output = [float(p['pts_time']) for p in output['packets']]
    return output

video_files = [f for f in os.listdir('.') if f.endswith('.mp4')]
video_files = [os.path.abspath(f) for f in video_files]

frame_times = {}

for video_file in video_files:
    timestamps = fetch_timestamps(video_file)
    video_start_time = float(os.path.basename(video_file).split('_')[1].split('.')[0])
    times = []

    for timestamp in timestamps:
        times.append(video_start_time + timestamp)

    frame_times[video_file] = sorted(times)

1735171290.0 - 1735171319.966667
1735171350.0 - 1735171353.4
1735171246.0 - 1735171260.066667
1735171320.0 - 1735171350.0
1735171260.0 - 1735171289.933333


```
1735171246.0 - 1735171260.066667
               1735171260.0 - 1735171289.933333
                              1735171290.0 - 1735171319.966667
                                             1735171319.966667 - 1735171349.933333
                                                                 1735171350.0 - 1735171353.4

```