Преобразование данных PVS в формат для обучения: в виде точек.

In [54]:
import os

input_dir = "data/pvs-converted"
output_dir = "data/pvs-points"

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

In [55]:
def flat_map(input):
  lst = []
  for elem in input:
    lst.extend(elem)
  return lst


input_fname_indexes = flat_map([[f"{i}-right", f"{i}-left"] for i in range(1, 10)])

In [56]:
import pandas as pd

input_dfs = []

def get_raw_inputs(input_fname):
  df = pd.read_csv(input_fname)
  acDf = df[['timestamp', 'acc_x_dashboard', 'acc_y_dashboard', 'acc_z_dashboard']]
  gyDf = df[['timestamp', 'gyro_x_dashboard', 'gyro_y_dashboard', 'gyro_z_dashboard']]
  gpsDf = df[['timestamp_gps', 'latitude', 'longitude']]
  predDf = df[['timestamp', 'good_road', 'regular_road', 'bad_road']]

  out = (acDf, gyDf, gpsDf, predDf)
  print(input_fname, *map(lambda x: x.shape, out))
  return out


# input_normalized_file_paths = list(map(lambda x: os.path.join(input_dir, f"{x}-normalized.csv"), input_fname_indexes))

for input_fname_index in input_fname_indexes:
  input_normalized_fpath = os.path.join(input_dir, f"{input_fname_index}-normalized.csv")
  input_dfs.append((*get_raw_inputs(input_normalized_fpath), input_fname_index))
  

data/pvs-converted/1-right-normalized.csv (144036, 4) (144036, 4) (144036, 3) (144036, 4)
data/pvs-converted/1-left-normalized.csv (144036, 4) (144036, 4) (144036, 3) (144036, 4)
data/pvs-converted/2-right-normalized.csv (124684, 4) (124684, 4) (124684, 3) (124684, 4)
data/pvs-converted/2-left-normalized.csv (124684, 4) (124684, 4) (124684, 3) (124684, 4)
data/pvs-converted/3-right-normalized.csv (105816, 4) (105816, 4) (105816, 3) (105816, 4)
data/pvs-converted/3-left-normalized.csv (105816, 4) (105816, 4) (105816, 3) (105816, 4)
data/pvs-converted/4-right-normalized.csv (132492, 4) (132492, 4) (132492, 3) (132492, 4)
data/pvs-converted/4-left-normalized.csv (132492, 4) (132492, 4) (132492, 3) (132492, 4)
data/pvs-converted/5-right-normalized.csv (133877, 4) (133877, 4) (133877, 3) (133877, 4)
data/pvs-converted/5-left-normalized.csv (133877, 4) (133877, 4) (133877, 3) (133877, 4)
data/pvs-converted/6-right-normalized.csv (96279, 4) (96279, 4) (96279, 3) (96279, 4)
data/pvs-converted/

In [57]:
import pandas
import numpy

tick = 25000
window = 128
slide = 0.5


In [58]:
def get_df_time_range(df: pandas.DataFrame):
    l = len(df.index)
    return (df.iloc[0].time, df.iloc[l - 1].time, l)


def get_diff_time(start: int, end: int):
    return end - start


# get interpolated values:
# - (1_000_000/tick) HZ accelerometer, gyroscope
# - ((tick * window * slide)/1_000_000) seconds period GPS
def interpolate(
    acDf: pandas.DataFrame, gyDf: pandas.DataFrame, gpsDf: pandas.DataFrame
):
    acTse = get_df_time_range(acDf)
    gyTse = get_df_time_range(gyDf)
    gpsTse = get_df_time_range(gpsDf)

    minTime = int(max(acTse[0], gyTse[0], gpsTse[0]))
    maxTime = int(min(acTse[1], gyTse[1], gpsTse[1]))

    sensorInterpTimes = numpy.arange(minTime, maxTime, tick)
    # print(sensorInterpTimes, len(sensorInterpTimes), sep="\n")

    gpsInterpTimes = numpy.arange(
        minTime + int(tick * window * slide / 2),
        maxTime + int(tick * window * slide / 2),
        int(tick * window * slide),
    )
    # print(gpsInterpTimes, len(gpsInterpTimes), sep="\n")

    acTimes = acDf.time.to_numpy()
    gyTimes = gyDf.time.to_numpy()
    gpsTimes = gpsDf.time.to_numpy()

    acX = acDf.x.to_numpy()
    acY = acDf.y.to_numpy()
    acZ = acDf.z.to_numpy()

    gyX = gyDf.x.to_numpy()
    gyY = gyDf.y.to_numpy()
    gyZ = gyDf.z.to_numpy()

    acXi = numpy.interp(sensorInterpTimes, acTimes, acX)
    acYi = numpy.interp(sensorInterpTimes, acTimes, acY)
    acZi = numpy.interp(sensorInterpTimes, acTimes, acZ)
    gyXi = numpy.interp(sensorInterpTimes, gyTimes, gyX)
    gyYi = numpy.interp(sensorInterpTimes, gyTimes, gyY)
    gyZi = numpy.interp(sensorInterpTimes, gyTimes, gyZ)

    gpsLat = gpsDf.latitude.to_numpy()
    gpsLon = gpsDf.longitude.to_numpy()

    gpsLoni = numpy.interp(gpsInterpTimes, gpsTimes, gpsLon)
    gpsLati = numpy.interp(gpsInterpTimes, gpsTimes, gpsLat)

    acDf1 = pandas.DataFrame(
        {"time": sensorInterpTimes, "x": acXi, "y": acYi, "z": acZi}
    )
    gyDf1 = pandas.DataFrame(
        {"time": sensorInterpTimes, "x": gyXi, "y": gyYi, "z": gyZi}
    )
    gpsDf1 = pandas.DataFrame(
        {"time": gpsInterpTimes, "latitude": gpsLati, "longitude": gpsLoni}
    )

    return (acDf1, gyDf1, gpsDf1)


In [59]:
# interpolate sensors data, predictions
from math import radians, sin, cos, sqrt, atan2

input_dfs_harmonised = []

def rename_df_cols(acDf, gyDf, gpsDf, predDf):
  acDfr = acDf.rename(
    columns={"timestamp": "time", "acc_x_dashboard": "x", "acc_y_dashboard": "y", "acc_z_dashboard": "z"}) 
  gyDfr = gyDf.rename(
    columns={"timestamp": "time", "gyro_x_dashboard": "x", "gyro_y_dashboard": "y", "gyro_z_dashboard": "z"})
  gpsDfr = gpsDf.rename(
    columns={"timestamp_gps": "time"})
  predDfr = predDf.rename(
    columns={"timestamp": "time"}
  )
  return (acDfr, gyDfr, gpsDfr, predDfr)


# get prediction based on window avg.
# good -> 1, regular -> 0.5, bad -> 0
# find min on window
def harmonise_prediction(time_markers: numpy.array, predDf):
  data = []
  for marker in time_markers:
    start = marker - int(tick * window * slide / 2)
    end = marker + int(tick * window * slide / 2)
    # NOTE: not optimized
    predDfr = predDf[(predDf.time >= start) & (predDf.time <= end)]
    res = (predDfr.good_road * 1 + predDfr.regular_road * 0.5 + predDfr.bad_road * 0).mean()
    data.append((marker, res))
  return pd.DataFrame(data, columns=["time", "prediction"])


def haversine_m(lat1, lon1, lat2, lon2):
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = 6371000 * c  # Radius of the earth in m
    return distance


def calculate_speed(gpsDf):
  speeds = numpy.zeros(len(gpsDf))
  prevGpsE = None
  for i, gpsE in gpsDf.iterrows():
    if prevGpsE is not None:
      lat1, lon1, lat2, lon2 = prevGpsE['latitude'], prevGpsE['longitude'], gpsE['latitude'], gpsE['longitude']
      distance = haversine_m(lat1, lon1, lat2, lon2)
      
      diff_time_hours = (gpsE['time'] - prevGpsE['time']) / 1_000_000
      speed = distance / diff_time_hours
      speeds[i] = speed
    prevGpsE = gpsE
  
  if len(gpsDf) > 1:
     speeds[0] = speeds[1]

  return speeds


for (acDf, gyDf, gpsDf, predDf, input_fpath) in input_dfs:
  acDfr, gyDfr, gpsDfr, predDfr = rename_df_cols(acDf, gyDf, gpsDf, predDf)
  acDfi, gyDfi, gpsDfi = interpolate(acDfr, gyDfr, gpsDfr)

  predDfi = harmonise_prediction(gpsDfi["time"].to_numpy(), predDfr)
  speedArr = calculate_speed(gpsDfi)

  entry = (acDfi, gyDfi, gpsDfi, predDfi, speedArr, input_fpath)
  print(entry[5])
  input_dfs_harmonised.append(entry)



1-right
1-left
2-right
2-left
3-right
3-left
4-right
4-left
5-right
5-left
6-right
6-left
7-right
7-left
8-right
8-left
9-right
9-left


In [60]:
# split into raw algorithm inputs
point_raw_entries = []

def get_point_raw_inputs_wpred(acDf, gyDf, gpsDf, predDf, speedArr):
  entries = []
  total = len(acDf)
  for i in range(0, len(predDf)):
    start_i = i * int(window * slide)
    end_i = start_i + window
    if end_i >= total:
      break
    
    acE = acDf[start_i: end_i]
    gyE = gyDf[start_i: end_i]
    gpsE = gpsDf.iloc[[i]].to_dict('records')[0] # not using Series because of type casts
    gpsE['speed'] = speedArr[i]
    predE = predDf.iloc[i]
    entries.append((acE, gyE, gpsE, predE,))
  return entries

def get_point_raw_inputs(acDf, gyDf, gpsDf, speedArr):
  entries = []
  total = len(acDf)
  for i in range(0, len(gpsDf)):
    start_i = i * int(window * slide)
    end_i = start_i + window
    if end_i >= total:
      break
    
    acE = acDf[start_i: end_i]
    gyE = gyDf[start_i: end_i]
    gpsE = gpsDf.iloc[[i]].to_dict('records')[0] # not using Series because of type casts
    gpsE['speed'] = speedArr[i]
    entries.append((acE, gyE, gpsE, ))
  return entries


for (acDf, gyDf, gpsDf, predDf, speedArr, input_fpath) in input_dfs_harmonised:
  point_raw_entries.extend(get_point_raw_inputs_wpred(acDf, gyDf, gpsDf, predDf, speedArr))
  print(input_fpath)

print(len(point_raw_entries))


1-right
1-left
2-right
2-left
3-right
3-left
4-right
4-left
5-right
5-left
6-right
6-left
7-right
7-left
8-right
8-left
9-right
9-left
13322


In [61]:
import json

raw_dir = os.path.join(output_dir, "raw")
if not os.path.exists(raw_dir):
    os.makedirs(raw_dir)

# write each entry to separate file (can be changed)
for i, (acDf, gyDf, gpsD, predS) in enumerate(point_raw_entries):
  out = {}
  out["accelerometer"] = acDf[["x", "y", "z"]].to_dict(orient='records')
  out["gyroscope"] = gyDf[["x", "y", "z"]].to_dict(orient='records')
  out["gps"] = {"latitude": gpsD["latitude"], "longitude": gpsD["longitude"], "speed": gpsD["speed"]}
  out["prediction"] = predS["prediction"]
  with open(os.path.join(raw_dir, f"{i}.json"), "w") as file:
    json.dump(out, file)
