In [1]:
import glob
from pathlib import Path
import os
from datetime import datetime
from zoneinfo import ZoneInfo
from operator import itemgetter, attrgetter
import json
import base64

In [64]:
# Root directory of the recorder logs. The trailing slash matters: every path
# below is built by plain string concatenation.
LOG_DIR = "/home/kawaguti-nis/mqtt-recorder-rs/log/"

# Directory of the manager "register" logs (read by list_mgr_register, which
# appends "<date>/*" to it).
MGR_REGIST = f"{LOG_DIR}mgr/register/"

# Dates covered by the experiment: 2025-08-25 through 2025-09-07 inclusive.
EXP_DATES = (
    [f"2025-08-{day:02d}" for day in range(25, 32)]
    + [f"2025-09-{day:02d}" for day in range(1, 8)]
)

# Substring that check_all_exp looks for in each log summary's "id" field.
# NOTE(review): despite the "_IP" suffix this is matched against an id, not an IP address.
EXP_DEV_IP = "新城VR"

In [47]:
# Date of each directory: used to find out which directories belong to a given time window.

def list_files_date(ststr,edstr="", baseDir = LOG_DIR+"control/*"):
  """List the directories matched by ``baseDir`` whose ``st_ctime`` lies in a time window.

  Args:
    ststr: Window start, either 'YYYY-MM-DD' (10 characters; ' 00:00:00' is
      appended) or 'YYYY-MM-DD HH:MM:SS'.
    edstr: Window end as 'YYYY-MM-DD HH:MM:SS'. When empty, the end is
      23:59:59 on the start date.
    baseDir: Glob pattern whose matches are inspected; non-directories are ignored.

  Returns:
    A list of ``(timestamp, label, name)`` tuples sorted by ascending
    timestamp, where ``timestamp`` is the directory's ``st_ctime``, ``label``
    is that instant rendered in Asia/Tokyo, and ``name`` is the directory's
    final path component.

  Notes:
    The window strings are parsed as naive datetimes, so they are interpreted
    in the machine's local timezone, while the label is always Asia/Tokyo.
    NOTE(review): on Unix ``st_ctime`` is the last metadata change, not the
    creation time - confirm that is the intended notion of "date".
  """
  candidates = glob.glob(baseDir)

  time_format = '%Y-%m-%d %H:%M:%S'
  if len(ststr) == 10:
    ststr = ststr + ' 00:00:00'
  if edstr == "":
    edstr = ststr[:11] + '23:59:59'
  window_start = datetime.strptime(ststr, time_format).timestamp()
  window_end = datetime.strptime(edstr, time_format).timestamp()

  matches = []
  for entry in map(Path, candidates):
    if not entry.is_dir():
      continue
    changed_ts = entry.stat().st_ctime
    if window_start <= changed_ts <= window_end:
      label = str(datetime.fromtimestamp(changed_ts, ZoneInfo("Asia/Tokyo")))
      matches.append((changed_ts, label, entry.name))

  matches.sort(key=itemgetter(0))
  return matches


In [4]:
# Collect the log files for a given date.
def list_mqtt_logs(ststr):
  """Return the paths of the control log files for the date in ``ststr``.

  The directories reported by ``list_files_date(ststr)`` are visited in the
  order returned, and for each one everything matching
  ``<LOG_DIR>control/<directory name>/<date>/*`` is collected, where
  ``<date>`` is the first 10 characters of ``ststr``.

  Args:
    ststr: Date or date-time string whose first 10 characters are 'YYYY-MM-DD'.

  Returns:
    A flat list of path strings (in glob order within each directory).
  """
  selected_dirs = list_files_date(ststr)
  day = ststr[:10]
  collected = []
  for _ts, _label, dir_name in selected_dirs:
    collected += glob.glob(f"{LOG_DIR}control/{dir_name}/{day}/*")
  return collected


In [48]:
# For each file, find the start/end time and the number of records.
def check_mqtt_file_start_end(fname):
  """Summarise one JSON-lines log file: first/last record time and record count.

  Every non-blank line is parsed as a JSON object and counted. Only the
  "time" field is read (it is passed to ``datetime.fromtimestamp``, i.e.
  treated as a POSIX timestamp); message payloads are not decoded. Records
  without a "time" value are counted but do not affect the start/end times.

  Args:
    fname: Path of the log file. It must have at least two parent
      directories: ``<id>/<date>/<file>``.

  Returns:
    A dict with the keys
      "id":        name of the grandparent directory (device id),
      "date":      name of the parent directory,
      "fname":     file name,
      "startTime": "time" of the first timed record, or None,
      "startStr":  'YYYY-MM-DD HH:MM:SS' local-time rendering of startTime, or None,
      "endStr":    'HH:MM:SS' local-time rendering of endTime, or None,
      "endTime":   "time" of the last timed record, or None,
      "count":     number of non-blank lines.
    The time fields are None when the file holds no timed record (for
    example an empty file).

  Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
  """
  count = 0
  startTime = None
  endTime = None
  with open(fname, "r", encoding="utf-8") as f:
    for line in f:
      if not line.strip():  # skip blank lines
        continue
      record = json.loads(line)
      count += 1
      stamp = record.get("time")
      if stamp is None:
        continue
      if startTime is None:
        startTime = stamp
      endTime = stamp

  fp = Path(fname)

  # Guard the formatting so a file without timed records yields None instead of raising.
  startStr = None if startTime is None else str(datetime.fromtimestamp(startTime))[:19]
  endStr = None if endTime is None else str(datetime.fromtimestamp(endTime))[11:19]

  return {
    "id": fp.parents[1].name,    # device UUID
    "date": fp.parents[0].name,  # date
    "fname": fp.name,            # the file name itself carries information
    "startTime": startTime,
    "startStr": startStr,
    "endStr": endStr,
    "endTime": endTime,
    "count": count
  }

In [49]:
# Inspect the information recorded at registration time.
def list_mgr_register(date,  baseDir =  MGR_REGIST):
  """Collect the register messages that carry an ``optStr`` for one date.

  Every file matching ``<baseDir><date>/*`` is read in sorted path order as
  JSON lines. For each non-blank line the message text is taken from "msg"
  (or decoded from base64 "msg_b64" as UTF-8 when that key is present) and
  parsed as JSON. Messages whose "optStr" is missing or empty are skipped.

  Each kept message dict is extended with:
    'wid':     the part of optStr before the first "@",
    'app':     the part of optStr after the first "@" (up to the next "@"),
    'time':    the "time" value of the enclosing log record,
    'timeStr': that time rendered with ``datetime.fromtimestamp`` (local time).

  Args:
    date: Name of the date sub-directory, appended directly to ``baseDir``.
    baseDir: Directory prefix (with trailing slash) of the register logs.

  Returns:
    The list of extended message dicts, sorted by ascending 'time'.
  """
  registrations = []
  for path in sorted(glob.glob(baseDir+date+"/*")):
    with open(path, "r", encoding="utf-8") as f:
      for raw in f:
        if not raw.strip():  # skip blank lines
          continue
        envelope = json.loads(raw)
        if "msg_b64" in envelope:
          envelope["msg"] = base64.b64decode(envelope.pop("msg_b64")).decode("utf-8")
        for unused_key in ("qos", "retain"):
          envelope.pop(unused_key, None)

        payload = json.loads(envelope.get("msg"))
        opt = payload.get("optStr")
        if not opt:
          continue

        parts = opt.split("@")
        payload['wid'] = parts[0]
        payload['app'] = parts[1]
        payload['time'] = envelope.get('time')
        payload['timeStr'] = str(datetime.fromtimestamp(envelope.get("time")))
        registrations.append(payload)

  registrations.sort(key=lambda rec: rec.get('time'))
  return registrations

  

In [50]:
# Inspect the information recorded at unregistration time.
# (It seems that not every unregister record is kept in the logs... reason unknown.)
def list_mgr_unregister(date,  baseDir =   LOG_DIR+"mgr/unregister/"):
  """Collect every unregister message logged for one date.

  Every file matching ``<baseDir><date>/*`` is read in sorted path order as
  JSON lines. For each non-blank line the message text is taken from "msg"
  (or decoded from base64 "msg_b64" as UTF-8 when that key is present) and
  parsed as JSON. Each message dict is extended with:
    'time':    the "time" value of the enclosing log record,
    'timeStr': that time rendered with ``datetime.fromtimestamp`` (local time).

  Args:
    date: Name of the date sub-directory, appended directly to ``baseDir``.
    baseDir: Directory prefix (with trailing slash) of the unregister logs.

  Returns:
    The list of extended message dicts, sorted by ascending 'time'.
  """
  events = []
  for path in sorted(glob.glob(baseDir+date+"/*")):
    with open(path, "r", encoding="utf-8") as f:
      non_blank_lines = (raw for raw in f if raw.strip())
      for envelope in map(json.loads, non_blank_lines):
        if "msg_b64" in envelope:
          encoded = envelope.pop("msg_b64")
          envelope["msg"] = base64.b64decode(encoded).decode("utf-8")
        envelope.pop("qos", None)
        envelope.pop("retain", None)

        payload = json.loads(envelope.get("msg"))
        payload['time'] = envelope.get('time')
        payload['timeStr'] = str(datetime.fromtimestamp(envelope.get("time")))
        events.append(payload)

  return sorted(events, key=lambda rec: rec.get('time'))

In [51]:
# Summarise every log file found for a given date.
def list_st_end_all(ststr):
  """Return the start/end summary of each log file listed for ``ststr``.

  Args:
    ststr: Date (or date-time) string, passed unchanged to ``list_mqtt_logs``.

  Returns:
    A list with one ``check_mqtt_file_start_end`` result per file, in the
    order ``list_mqtt_logs`` returned the files.
  """
  return [check_mqtt_file_start_end(log_path) for log_path in list_mqtt_logs(ststr)]
  

In [None]:
def check_all_exp(dates = EXP_DATES, dev_id = EXP_DEV_IP):
  """Gather the log summaries of one device over several dates.

  For each date, the summaries from ``list_st_end_all`` are filtered to those
  whose 'id' contains ``dev_id`` (substring match). A progress line is
  printed for every date that has at least one summary before filtering.

  Args:
    dates: Iterable of date strings to process, in order.
    dev_id: Substring looked for in each summary's 'id'.

  Returns:
    A flat list of the matching summaries across all dates.
  """
  collected = []
  for day in dates:
    day_summaries = list_st_end_all(day)
    if not day_summaries:
      continue
    matching = [entry for entry in day_summaries if dev_id in entry['id']]
    collected.extend(matching)
    print("Done:",day,"log count:",len(matching))

  return collected

In [None]:
# Build the summaries using check_all_exp's defaults (EXP_DATES and EXP_DEV_IP).
all_exp_logs =  check_all_exp()