In [4]:
import pickle
import time
import os
import datetime
import numpy as np
import pandas as pd

cur_dir = os.getcwd()
main_dir = os.path.dirname(os.path.dirname(cur_dir))

In [5]:
# test_dates = ["08_01_2022","08_02_2022"]
# test_folders = ["Day1_Training1", "Day2_Training1"]
# file_names = {"eds":["flagged_DateTimed_WTRUN2_day1_training1_EDS", "DateTimed_WTRUN2_day2_training1_EDS"], 
#               "uav":["WTRUN2_day1_training1_UAV", "WTRUN2_day2_training1_UAV"],
#               "rtdstr":["compensated_normalized_WTRUN2_training_sweep1_2022-08-01_17-24-43-50_rtd-str_Jan2023_tworegion",
#                         "compensated_normalized_WTRUN2_day2_training1_2022-08-02_12-38-30-01_rtd-str_Jan2023_tworegion"]}

test_dates = ["08_02_2022"]
test_folders = ["Day2_Dynamic1"]
file_names = {"eds":["nopeaks_DateTimed_WTRUN2_day2_dynamic1_EDS"], 
              "uav":["WTRUN2_day2_dynamic1_UAV"],
              "rtdstr":["compensated_normalized_WTRUN2_day2_dynamic1_2022-08-02_14-32-54-11_rtd-str_Jan2023_tworegion"]}

In [6]:
#Bring in the EDS, UAV, and RTD-STR (IMGenie) data as Pandas Dataframes
test_dfs = {"eds": list(), "uav": list(), "rtdstr": list()}
eds_rtdstr_offsets = list() #each item: ["vantage stream name", "number of datapoints until reaching to vantage from other stream"]
vantage_uav_offsets = list() #each item: "number of datapoints until reaching to vantage from UAV"

for i in range(len(test_dates)):
  data_dir = os.path.join(main_dir, test_dates[i]+"_Tests", "testdata", test_folders[i])
  for sensor_type in ("eds", "uav", "rtdstr"):
    test_csv = os.path.join(data_dir, file_names[sensor_type][i]+".csv")
    test_df = pd.read_csv(test_csv, header=0)
    test_dfs[sensor_type].append(test_df)

In [7]:
#Insert DateTime objects to the dataframes.

#Insert for EDS data:
for i in range(len(test_dfs["eds"])):
  if "eds_DateTime Obj" not in test_dfs["eds"][i].columns:
    eds_times = test_dfs['eds'][i]["DateTime Str"].apply(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"))
    test_dfs['eds'][i].insert(1, "eds_DateTime Obj", eds_times)
  else:
    test_dfs['eds'][i]["eds_DateTime Obj"] = test_dfs['eds'][i]["eds_DateTime Obj"].apply(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"))

#Insert for RTD-STR data:
for i in range(len(test_dfs["rtdstr"])):
  if "rtdstr_DateTime Obj" not in test_dfs["rtdstr"][i].columns:
    month = int (test_dates[i][0:2])
    day = int (test_dates[i][3:5])
    year = int (test_dates[i][6:])
    rtdstr_times = test_dfs['rtdstr'][i]["Date/Time"].apply(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d_%H-%M-%S-%f"))
    rtdstr_times = rtdstr_times.apply(lambda x: datetime.datetime(year, month, day, x.hour, x.minute, x.second, x.microsecond)) #We do this because we haven't recorded date in the original data.
    test_dfs['rtdstr'][i].insert(1, "rtdstr_DateTime Obj", rtdstr_times)
  else:
    test_dfs['rtdstr'][i]["rtdstr_DateTime Obj"] = test_dfs['rtdstr'][i]["rtdstr_DateTime Obj"].apply(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"))

#Insert for UAV data:
#First define a function to separate full seconds into hours, minutes, seconds, and milliseconds
def seperate_seconds(seconds):
  #This function assumes inserted seconds is shorter than 24 hours = 86,400 seconds
  onlyseconds = int(seconds.split(".")[0])
  hours = int (onlyseconds/(60*60))
  minutes = int (onlyseconds/(60)%60)
  remseconds = int (onlyseconds - hours*60*60 - minutes*60)
  milliseconds = seconds.split(".")[1][0:6]
  return hours, minutes, remseconds, milliseconds

for i in range(len(test_dfs["uav"])):
  if "uav_DateTime Obj" not in test_dfs["uav"][i].columns:
    seconds = test_dfs['uav'][i]["STATIC_PRESSURE_TIME"].astype(str)
    time_sr = seconds.apply(seperate_seconds)
    uav_times = time_sr.astype(str).apply(lambda x: datetime.datetime.strptime(x, "(%H, %M, %S, '%f')"))
    test_dfs['uav'][i].insert(1, "uav_DateTime Obj", uav_times)
  else:
    test_dfs['uav'][i]["uav_DateTime Obj"] = test_dfs['uav'][i]["uav_DateTime Obj"].apply(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"))

In [8]:
#Find the offset between the start points of EDS and IMGenie data
for i in range(len(test_dfs["eds"])):
  first_time_eds = test_dfs["eds"][i]["eds_DateTime Obj"][0] 
  first_time_rtdstr = test_dfs["rtdstr"][i]["rtdstr_DateTime Obj"][0]
  vantage_stream = "eds" if first_time_eds > first_time_rtdstr else "rtdstr" #Vantage stream is the stream that start later.
  early_stream = "eds" if first_time_eds < first_time_rtdstr else "rtdstr" #Early stream is the other one
  eds_rtdstr_offsets.append([vantage_stream])

  vantage_time = test_dfs[vantage_stream][i][f"{vantage_stream}_DateTime Obj"][0] 
  offset_ix = 1
  time_delta = datetime.timedelta(seconds=999) #(assume this is large enough)
  while True:
    incrementing_time = test_dfs[early_stream][i][f"{early_stream}_DateTime Obj"][offset_ix] 
    new_time_delta = abs(vantage_time - incrementing_time)
    if new_time_delta > time_delta:
      break
    time_delta = new_time_delta
    offset_ix += 1
  eds_rtdstr_offsets[-1].append(offset_ix)

offsets_savedir = os.path.join(cur_dir, 'offset_pickles')
with open(os.path.join(offsets_savedir,'eds_rtdstr_offsets_dynamic1.pkl'), 'wb') as f:
  pickle.dump(eds_rtdstr_offsets, f)

In [9]:
print(eds_rtdstr_offsets)

[['eds', 281]]


In [10]:
#Find the offset between the start points of UAV and the vantage stream we found out above.
#We assume UAV always starts before vantage because of how we conducted the experiments.
observe_column_name = {"eds":"Inclination (deg)", "uav":"Pitch (deg)"}
for i in range(len(test_dfs["uav"])):
  aoajump_ixs = list() #[[uav_ix, eds_ix]]
  for data_stream in ("uav", "eds"):
    #Find the indices of eds and uav streams where Inclination (deg) and Pitch (deg) increases by >4 degrees in 5 seconds.
    #Calculate the average time delta between each row.
    time_delta = (test_dfs[data_stream][i][f"{data_stream}_DateTime Obj"][200] - test_dfs[data_stream][i][f"{data_stream}_DateTime Obj"][100])/100

    #Calculate diff for 5 seconds
    num_rows = int(5/time_delta.total_seconds())
    angle_change = test_dfs[data_stream][i][observe_column_name[data_stream]].diff(periods=num_rows)

    #Extract the row ID where we see the first >4 degree change in 5 seconds (+10 degree calibration procedure).
    aoajump_ix = angle_change [angle_change>4].index[0]

    #Process the special case when UAV had a weird AoA jump in the Aug 2, 2022 training test.
    if i==1 and data_stream == "uav":
      j = 0
      reftime = datetime.datetime(1900, 1, 1, 0, 0, 0)
      timestamp = test_dfs[data_stream][i][f"{data_stream}_DateTime Obj"][aoajump_ix]
      print ("data_type =", data_stream)
      print ("i =",i)
      _ = input("Warning! Entering into a special case designed for WTRUN2_day2_training1_UAV file. Make sure you're processing this file here.")
      while timestamp-reftime < datetime.timedelta(seconds=1200):
        j += 1
        aoajump_ix = angle_change [angle_change>4].index[j]
        timestamp = test_dfs[data_stream][i][f"{data_stream}_DateTime Obj"][aoajump_ix]
      
    aoajump_ixs.append(aoajump_ix)

  eds_aoajump_timestamp = test_dfs["eds"][i]["eds_DateTime Obj"][aoajump_ixs[1]]  
  if eds_rtdstr_offsets[i][0] == "eds": #If vantage is eds
    aoajump_delta_w_vantage = eds_aoajump_timestamp - test_dfs["eds"][i]["eds_DateTime Obj"][0]
  else: #If vantage is rtdstr
    aoajump_delta_w_vantage = eds_aoajump_timestamp - test_dfs["rtdstr"][i]["rtdstr_DateTime Obj"][0]

  uav_aoajump_timestamp = test_dfs["uav"][i]["uav_DateTime Obj"][aoajump_ixs[0]]
  time_to_find = uav_aoajump_timestamp - aoajump_delta_w_vantage
  
  time_delta = datetime.timedelta(seconds=999) #(assume this is large enough)
  search_ix = 1
  timeshift_reps = 0
  while True:
    new_timedelta = abs (test_dfs["uav"][i]["uav_DateTime Obj"][search_ix] - time_to_find)
    if new_timedelta > time_delta:
      timeshift_reps += 1
      if timeshift_reps > 3:
        search_ix -= 3
        break
    else:
      timeshift_reps = 0
    search_ix += 1
    time_delta = new_timedelta
  uav_vantage_ix = search_ix
  vantage_uav_offsets.append(uav_vantage_ix)


In [11]:
vantage_uav_offsets

[2766]

In [12]:
# Construct new df objects that will contain all RTDSTR, EDS, and UAV data:
#   -One df object will contain one experiment run (such as Day1_Training1)
#   -First timepoint will match with that of the vantage stream.
#   -The timepoints will come from the RTDSTR df. EDS and UAV will interpolate according to:
#       -EDS: Iterating row-wise in RTDSTR data, find the closest timematch in EDS data and bring that block to this row.
#       -UAV: First, change the DateTime Obj of UAV that matches the vantage with the RTDSTR at the vantage.
#             Then, apply that offset to the subsequent rows.
#             Then apply above idea to find closest match in UAV data.
#   -Last datapoint will be the earlier of the duration of 3 streams. Can be thought of another ending vantage point.
#   -Then we'll concatenate the dfs by propagating the time to second and so on.

combined_dfs = list()
end_loc_offsets = list()
for i in range(len(test_folders)):
  #Get the first timepoint from vantage and all subsequent timepoints from RTDSTR
  start_ixs = dict()
  vantage = eds_rtdstr_offsets[i][0]
  start_ixs["rtdstr"] = 0 if vantage=="rtdstr" else eds_rtdstr_offsets[i][1]-1
  start_ixs["eds"] = 0 if vantage=="eds" else eds_rtdstr_offsets[i][1]-1
  start_ixs["uav"] = vantage_uav_offsets[i]

  #Adjust UAVs DateTime Objs:
  uav_time_offset = test_dfs['uav'][i]["uav_DateTime Obj"][start_ixs["uav"]]
  rtdstr_time_offset = test_dfs['rtdstr'][i]["rtdstr_DateTime Obj"][start_ixs["rtdstr"]]
  test_dfs['uav'][i]["uav_DateTime Obj"].loc[start_ixs["uav"]:] = test_dfs['uav'][i]["uav_DateTime Obj"].loc[start_ixs["uav"]:] - uav_time_offset + rtdstr_time_offset

  #Construct the new DataFrame by first bringing the RTDSTR and then matching datapoints from UAV and EDS:
  main_df = test_dfs['rtdstr'][i].iloc[start_ixs["rtdstr"]:].reset_index()
  main_df = main_df.rename(columns={'index':"rtdstr_orig_index"})
  
  #Merge in the UAV and EDS data to the newly created main_df by carefully according to the algorithm above.
  for stream in ('uav', 'eds'):
    print (f"Processing {stream}")
    streamrow = start_ixs[stream]
    streamdf_dat = np.full([main_df.shape[0], test_dfs[stream][0].columns.size], None)
    streamdf = pd.DataFrame(streamdf_dat, columns=test_dfs[stream][0].columns)
    
    #We iterate over the rows of main_df, which is based on the rtdstr data (because it is the densest),
    #and finding closest match in streaming data to fuse in.
    for maindf_index, row in main_df.iterrows():
      while True:
        try:
          curr_time_stream = test_dfs[stream][i][f"{stream}_DateTime Obj"][streamrow]
          next_time_stream = test_dfs[stream][i][f"{stream}_DateTime Obj"][streamrow+1]
          while next_time_stream < curr_time_stream: #If there's problem in time acquisition (accurring sometimes in UAV data)
            streamrow += 1
            curr_time_stream = test_dfs[stream][i][f"{stream}_DateTime Obj"][streamrow]
            next_time_stream = test_dfs[stream][i][f"{stream}_DateTime Obj"][streamrow+1]
          gap_curr = abs(main_df["rtdstr_DateTime Obj"][maindf_index] - curr_time_stream)
          gap_next = abs(main_df["rtdstr_DateTime Obj"][maindf_index] - next_time_stream)
        except: #To avoid KeyError when we reach the end of test_dfs[stream].
          print ("Exception occurred")
          streamrow += 1
          break 
        if gap_curr < gap_next:
          break
        streamrow += 1
      try:  
        streamdf.iloc[maindf_index] = test_dfs[stream][i].iloc[streamrow:streamrow+1]
      except:
        pass
      if maindf_index % 100000 == 0:
        print (f"We are at line = {maindf_index}")
    main_df = main_df.merge(streamdf, how='left', left_index=True, right_index=True)

  #Find the last datapoint to cut off the data there.
  #First, find out which of the streams end the earliest. That'll determine our cutoff point.
  timeref_start = main_df["rtdstr_DateTime Obj"]
  shortest_duration = datetime.timedelta(days=1) #1 day should be long enough
  for stream in ('rtdstr', 'uav', 'eds'):
    stream_duration = test_dfs[stream][i][f"{stream}_DateTime Obj"].iloc[-1] - timeref_start[0]
    if stream_duration < shortest_duration:
      shortest_duration = stream_duration
      shortest_stream = stream
  timeref_end = test_dfs[shortest_stream][i][f"{shortest_stream}_DateTime Obj"].iloc[-1]
  
  #Then find what timepoint this cutoff will correspond to in our main_df.
  #If we find rtdstr above, the "end_loc" value we'll get here will be -1 and we'll retain all data.
  timedelta = datetime.timedelta(days=1)
  backiter_j = -1
  while True:
    gap = abs(main_df["rtdstr_DateTime Obj"].iloc[backiter_j] - timeref_end)
    if gap > timedelta:
      break
    backiter_j -= 1 #We will come here at least once. That's why we have backiter_j+1 below.
    timedelta = gap
  end_loc_offset = backiter_j + 1
  main_df = main_df.iloc[0:end_loc_offset]
  
  combined_dfs.append(main_df)
  end_loc_offsets.append(end_loc_offset)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_dfs['uav'][i]["uav_DateTime Obj"].loc[start_ixs["uav"]:] = test_dfs['uav'][i]["uav_DateTime Obj"].loc[start_ixs["uav"]:] - uav_time_offset + rtdstr_time_offset


Processing uav
We are at line = 0
We are at line = 100000
We are at line = 200000
We are at line = 300000
We are at line = 400000
We are at line = 500000
We are at line = 600000
We are at line = 700000
We are at line = 800000
We are at line = 900000
Processing eds
We are at line = 0
We are at line = 100000
We are at line = 200000
We are at line = 300000
We are at line = 400000
We are at line = 500000
We are at line = 600000
We are at line = 700000
We are at line = 800000
We are at line = 900000


In [13]:
#Save the consolidated data files for later reuse
df_save_dir = os.path.join(main_dir, "ConsolidatedData", "Dynamic1_Jan2023")
for i in range(len(test_folders)):
  combined_dfs[i].to_pickle(os.path.join(df_save_dir,f'nopeaks_consolidated_{i}.pkl'))

#Save the number of lines trailed in rtdstr after shortest stream is ended. 
#If this ends up being -1, it means rtdstr stream ended earliest.
with open(os.path.join(offsets_savedir,'rtdstr_offsets_trailing_dynamic1.pkl'), 'wb') as f:
  pickle.dump(end_loc_offsets, f)

In [14]:
#Concat the n files from the data into a single consolidated pd.DataFrame
consolidated_df = pd.concat((test_df for test_df in combined_dfs), ignore_index=True)
consolidated_df.to_pickle(os.path.join(df_save_dir,'nopeaks_consolidated_all.pkl'))