# imports

In [1]:
import pandas as pd
import numpy as np

import os
from tqdm import tqdm
import warnings

import plotly.express as px
import plotly.io as po
import plotly.graph_objects as go
import matplotlib.pyplot as plt

from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures

import sympy #미분
import math # arctangent; math.atan 사용 목적
import statistics

# 차로 공식 추출

## Load Datasets

### 궤적 데이터 with 차선변경
* LOS별로 나누지 않은 데이터로부터 차선 위치를 뽑아야 함

In [2]:
folder_dir = 'D:/OneDrive - 연세대학교 (Yonsei University)/Projects/Yonsei_TELab/003_도로상충_210517-/2차년도_2022/21_드론궤적분석자료_영업소_서울TG_부산방향/TCS'

In [3]:
folder_name = '00_dataset_filtered_full'

file_dir = os.path.join(folder_dir, folder_name)

In [4]:
file_list = os.listdir(file_dir)
file_list[0:3]

['01_1.csv', '01_2.csv', '01_3.csv']

In [5]:
df_list = []
num_list = []

for file in file_list:
    num = file[0:4]
    
    file_path = os.path.join(file_dir, file)
    
    globals()[f'target{num}'] = pd.read_csv(file_path, encoding = 'cp949')
    
    df_list.append(globals()[f'target{num}'])
    num_list.append(num)

## 각 대상지별 차선의 벡터 정의하기

In [6]:
for df, num in zip(df_list, num_list):

    # 차선별로 N개의 데이터프레임 생성하기
    lane_list = df['Lane Identification'].unique()
    
    for lane in lane_list:
        globals()[f'target{num}_{lane}'] = df[df['Lane Identification'] == lane] # 각 대상지, 촬영지점별, 차로별 데이터프레임 생성

In [7]:
#target01_1_U1.head(3)

### 2차함수, 3차함수 그려주는 함수를 정의하기
* 참고 사이트 : https://plotly.com/python/ml-regression/

In [8]:
def format_coefs(coefs):
    
    equation_list = [f"{coef}x^{i}" for i, coef in enumerate(coefs)]
    equation = "$" +  " + ".join(equation_list) + "$"

    replace_map = {"x^0": "", "x^1": "x", '+ -': '- '}
    
    for old, new in replace_map.items():
        equation = equation.replace(old, new)

    return equation

### 각 차선별 대표궤적 뽑아 2차함수로 표현
* 차선이동을 하지 않은(Only U2) 경우의 차량ID만 뽑아내기
* 차선 벡터 구하기 : 위 점들에 대한 평균 벡터(vector) 또는 근사값인 선형 방정식을 구하기
* 각 x좌표에 대응되는 지점의 벡터와, 차량의 벡터 사잇각을 구하기
* 이를 html 파일로 plotly 이미지를 저장하기
* 식의 경우 pd.dataframe을 이용하여 .csv로 만들어 저장하기

In [9]:
folder_name = '03-0_lane_identification'

lane_equation_dir = os.path.join(folder_dir, folder_name)

os.makedirs(lane_equation_dir, exist_ok = True) # 해당 경로가 없을 시 폴더 생성, 존재할 경우 건너뛰기

In [10]:
equations = pd.DataFrame({'num' : [], 'term0' : [], 'term1' : [], 'term2' : []}) # 각 대상지 촬영지점별, 차로별 방정식을 만들기 위한 것

In [11]:
for df, num in tqdm(zip(df_list, num_list)):

    X = df['Local X (m)'].values.reshape(-1, 1) 
        
    if len(X) != 0:
        x_range = np.linspace(X.min(), X.max(), 100).reshape(-1, 1)
            
        fig = px.scatter(df, x = 'Local X (m)', y = 'Local Y(m)', opacity = 0.65)
            
        poly = PolynomialFeatures(2) # 2차식 추세선 긋기
        poly.fit(X) # ft 하기
            
        X_poly = poly.transform(X)
        x_range_poly = poly.transform(x_range)

        model = LinearRegression(fit_intercept = False)
        model.fit(X_poly, df['Local Y(m)'])
        y_poly = model.predict(x_range_poly)

        equation = format_coefs(model.coef_) # 식의 형태로 model 추세 결과를 나타내기
            
        term0 = model.coef_[0] # 상수항
        term1 = model.coef_[1] # 1차항
        term2 = model.coef_[2] # 2차항
        
        fig.add_traces(go.Scatter(x = x_range.squeeze(), y = y_poly, name = equation))
        
        save_html_name = f'{num}.html'
        save_html_path = os.path.join(lane_equation_dir, save_html_name)
        po.write_html(fig, file = save_html_path)
            
        equations = equations.append({'num' : num, 'term0' : term0, 'term1' : term1, 'term2' : term2}, ignore_index = True)
        
        # 각 대상지별로 그래프를 그려 저장하기
        
        
    else:
        pass

save_eqation_name = f'Equation.csv'
save_eqation_path = os.path.join(lane_equation_dir, save_eqation_name)
equations.to_csv(save_eqation_path, encoding = 'utf-8')

6it [00:03,  1.57it/s]


# 급앞지르기, 급진로변경
* **급앞지르기**
    * 속도 30KM/H 이상에서
    * 진행방향이 좌/우측 6도/sec 이상으로 차로변경
    * 5초 동안 누적각도가 +-2도/sec 이하
    * 가속이 초당 3km/h 이상인 경우
* **급진로변경**
    * 속도가 30km/h 이상에서
    * 진행방향이 좌측 또는 우측 6도/sec 이상으로 차로변경하고
    * 5초간 누적각도가 +-2도/sec 이하
    * 가감속이 초당 +=2km/h이하인 경우
* **tan(x)**
    * tan6 : `math.tan(math.pi/60)`
    * tan(-6) : - 0.5773502691896257
    * tan2 : `math.tan(math.pi/180)`
    * tan(-2) : -tan2
* **과정**
    1. 각 대상지, LOS별로 차로변경을 한 차량(Vehicle ID)만 걸러낸다.
    2. 각 차량이 어디로(1->2, 2->1, 2->3, 3->2)갔는지 구분하는 컬럼 `to_from`을 만든다.
    3. 각 차량의 각도차를 구한다.

## Load Dataset

### Lane Info
* 위에서 추출한 차로 공식을 불러오기

In [12]:
folder_name = '03-0_lane_identification'

lane_info_dir = os.path.join(folder_dir, folder_name)

lane_info_file = 'Equation.csv'
lane_info_path = os.path.join(lane_info_dir, lane_info_file)

In [13]:
lane_info = pd.read_csv(lane_info_path, encoding = 'cp949')

lane_info.head()

Unnamed: 0.1,Unnamed: 0,num,term0,term1,term2
0,0,01_1,32.261879,0.032916,0.000116
1,1,01_2,57.492307,0.106342,-0.000619
2,2,01_3,40.645995,-0.170005,0.000578
3,3,01_4,30.833973,-0.105406,0.000257
4,4,01_5,14.593834,-0.028344,0.000205


### Drone Data

In [14]:
folder_name = '01_density_by_target_LOS_concated_lanechange'

file_dir = os.path.join(folder_dir, folder_name)

In [15]:
file_list = os.listdir(file_dir)

file_list[0:3]

['01_1_A.csv', '01_2_A.csv', '01_3_A.csv']

In [16]:
df_list = []
num_LOS_list = []

for file in file_list:
    num_LOS = file[0:6]
    
    file_path = os.path.join(file_dir, file)
    
    globals()[f'target{num_LOS}'] = pd.read_csv(file_path, encoding = 'cp949')
    
    df_list.append(globals()[f'target{num_LOS}'])
    num_LOS_list.append(num_LOS)

## 대상지별 x좌표에 따른 미분계수 도출
* `lane_info` 인스턴스를 이용, 차로 방정식 로드
* `sympy` 패키지를 사용하여 차량궤적을 미분, 미분방정식(각 점에서의 기울기)을 추출
    * A(차량) : 각 대상지별, 서비스수준별 차량 데이터에 대하여, 현재 (x,y) 좌표와 다음(x', y') 좌표에 대하여 기울기를 구한다.
    * B(차로) : 해당 x좌표에서의 미분값을 구한다.
* 각도 A와 B 사잇각을 탄젠트 등을 이용하여 구하고, .csv 파일로 저장한다.

### 대상지별 미분방정식 도출

In [17]:
folder_name = '03-1_find_angle'

angle_dir = os.path.join(folder_dir, folder_name)

os.makedirs(angle_dir, exist_ok = True) # 해당 경로가 없을 시 폴더 생성, 존재할 경우 건너뛰기

In [18]:
x = sympy.symbols('x') # x를 기준으로 미분한다고 지정

In [19]:
warnings.filterwarnings(action = 'ignore') # pandas 경고메시지 숨기기

lane_info['deriv_x0'] = None
lane_info['deriv_x1'] = None

for i in range(len(lane_info)):
    
    term0 = lane_info['term0'].iloc[i]
    term1 = lane_info['term1'].iloc[i]
    term2 = lane_info['term2'].iloc[i]
    
    fx = term0 + term1 * x + term2 * x**2 #각 대상지별, 차선별 평균 궤적의 식
    
    fprime = sympy.Derivative(fx, x).doit() # fx 함수를 x에 대하여 미분
    fprime_x = sympy.poly(fprime, [x])
    
    lane_info['deriv_x0'].iloc[i] = fprime_x.coeffs()[1] # 미분방정식의 상수항
    lane_info['deriv_x1'].iloc[i] = fprime_x.coeffs()[0] # 미분방정식의 일차항

save_deriv_name = f'derivation.csv'
save_deriv_path = os.path.join(angle_dir, save_deriv_name)

lane_info.to_csv(save_deriv_path, encoding = 'utf-8')

### 차량별 좌표별 이동 벡터 구하기
* A : 각 대상지별, 서비스수준별 차량 데이터에 대하여, 현재 (x,y) 좌표와 다음(x', y') 좌표에 대하여 기울기를 구한다.

In [20]:
warnings.filterwarnings(action = 'ignore') # pandas 경고메시지 숨기기

slope_df_list = []

for df, num_LOS in tqdm(zip(df_list, num_LOS_list)):
    # 차량별 이동 동선을 써야 함
    vehicle_list = df['Vehicle ID'].unique()
    
    slope_df = pd.DataFrame()
    
    for veh in vehicle_list: # 각 차량별로
        veh_df = df[df['Vehicle ID'] == veh]
        veh_df['X_next'] = veh_df['Local X (m)'].shift(-1) # 각 차량의 다음 점의 x좌표. 없으면 None이겠지
        veh_df['Y_next'] = veh_df['Local Y(m)'].shift(-1) # 각 차량의 다음 점의 Y좌표. 없으면 None이겠지?
        
        veh_df['slope_veh'] = (veh_df['Y_next'] - veh_df['Local Y(m)'])/(veh_df['X_next'] - veh_df['Local X (m)']) # 각 차량의 벡터
        
        slope_df = pd.concat([slope_df, veh_df])
    
    slope_df_list.append(slope_df)
    
    save_slope_file = f'slope{num_LOS}.csv'
    save_slope_path = os.path.join(angle_dir, save_slope_file)
    
    slope_df.to_csv(save_slope_path, encoding = 'cp949')

6it [00:25,  4.20s/it]


## 차선변경을 한 차량만 필터링하기
* 각 대상지, LOS별로 차로변경을 한 차량(Vehicle ID)만 필터링
* 즉, `Lane_change` 컬럼이 `Change`인 차량만 걸러내면 됨

In [21]:
folder_name = '03-2_lanechange_only_vehicle'

lanechange_only_dir = os.path.join(folder_dir, folder_name)

os.makedirs(lanechange_only_dir, exist_ok = True) # 해당 경로가 없을 시 폴더 생성, 존재할 경우 건너뛰기

In [22]:
lane_change_veh_list = []

for slope_df, num_LOS in tqdm(zip(slope_df_list, num_LOS_list)):
    
    vehicle_list = slope_df['Vehicle ID'].unique()
    
    lane_change_df = pd.DataFrame() # 빈 데이터프레임: 차선변경을 한 차량 데이터만 저장할 것
    
    for veh in vehicle_list: # 각 차량(Vehicle ID)별로
        
        veh_df = slope_df[slope_df['Vehicle ID'] == veh]
        Lane_record = veh_df['Lane_record'].iloc[0] # 해당 차량의 차로 이동 경로
        
        lane_list = veh_df['Lane Identification'].unique() # 해당 차량이 이용한 차로 종류
        
        if Lane_record not in lane_list: # 차로 이동경로가 2개 차로 이상 포함 시
            lane_change_df = pd.concat([lane_change_df, veh_df])
        else:
            pass
        
    lanechange_only_file = f'lanechange{num_LOS}.csv'
    lanechange_only_path = os.path.join(lanechange_only_dir, lanechange_only_file)
    
    lane_change_df.to_csv(lanechange_only_path, encoding = 'cp949')
        
    lane_change_veh_list.append(lane_change_df)

6it [00:05,  1.02it/s]


In [23]:
lane_change_veh_list[0].head()

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,Unnamed: 0.1.1,Unnamed: 0.1.1.1,Unnamed: 0.1.1.1.1,Vehicle ID,Frame ID,Total Frames,Global Time (Epoch Time),Local X (m),...,Special Car,Lane Class,Vehicle Movement,Lane_record,Lane_00,Lane_99,Lane_change,X_next,Y_next,slope_veh
0,0,0,23,23,23,3,414,108,46813800,84.379913,...,0,상행본선,2.12053,U2_U1,U2,U1,,86.505432,31.683453,0.025945
2,2,2,25,25,25,3,417,111,46813900,86.505432,...,0,상행본선,2.126234,U2_U1,U2,U1,,88.635429,31.746338,0.029524
4,4,4,27,27,27,3,420,114,46814000,88.635429,...,0,상행본선,2.130925,U2_U1,U2,U1,,90.752785,31.801544,0.026073
6,6,6,29,29,29,3,423,117,46814100,90.752785,...,0,상행본선,2.118075,U2_U1,U2,U1,,92.861961,31.871742,0.033282
8,8,8,31,31,31,3,426,120,46814200,92.861961,...,0,상행본선,2.110344,U2_U1,U2,U1,,94.976715,31.93536,0.030083


## 판정을 위한 변수 생성
* 각 대상지별, 서비스 수준별 차량 x좌표에 따른 차선 미분계수 구하여 사잇각도 구하고 판정
* B : 해당 x좌표에서의 미분값을 구한다.
* 각도 A와 B 사잇각을 탄젠트 등을 이용하여 구하고, .csv 파일로 저장

* **급앞지르기**
    * 속도 30KM/H 이상에서
    * 진행방향이 좌/우측 6도/sec 이상으로 차로변경하고, 
    * 5초 동안 누적각도가 +-2도/sec 이하,
    * 가속이 초당 3km/h(**0.833333m/s**) 이상인 경우
* **급진로변경**
    * 속도가 30km/h 이상에서, 
    * 진행방향이 좌측 또는 우측 6도/sec 이상으로 차로변경하고, 
    * 5초간 누적각도가 +-2도/sec 이하, 
    * 가감속이 초당 +=2km/h(**0.555556m/s**)이하인 경우
* `tan6` : `math.tan(math.pi/30)`
* `tan(-6)` = `-tan6`
* **tan6 < S | S < tan(-6) : `overangle`**
* 각 차량의 각도차(`tangap`)를 구한다.
* 지난 5초간의 누적각도(`cum_angle`)를 구할 시
    * **절대값** 사용
    * `math.atan(x)` 사용하여 구하기

### 필요 함수 정의

In [24]:
def make_lane_deriv(deriv_x0, deriv_x1, x):
    """해당 x좌표에서 도로선형의 미분계수(접선의 기울기)를 구하는 함수"""
    
    lane_deriv = deriv_x0 + deriv_x1 * x
    
    return lane_deriv

In [25]:
def make_tangap(lane_deriv, slope_veh):
    """도로의 기울기 미분계수와 차량의 움직임 벡터(slope_veh)를 사용하여,
    매 점에서의 두 벡터간 차이에 의해 발생하는 사잇각의 tangent 값을 구하는 함수
    --- tan(A-B) = (tanA - tanB)/(1 + tanA * tanB) 임을 유의 ---
    lane_deriv(A) :: 도로의 해당 x좌표에서 접선 기울기(미분계수)
    slope_veh(B) :: 해당 차량의 이동 벡터"""
    
    tangap = (lane_deriv - slope_veh)/(1 + lane_deriv * slope_veh)
    
    return tangap

In [26]:
def make_tangap_rad(tangap):
    """사잇각의 탄젠트값(tangap)에 arctangent를 적용,
    해당 사잇각의 radian값을 얻어내는 함수"""
    
    rad_tangap = math.atan(tangap)
    
    return rad_tangap

In [27]:
def overangle(rad_tangap):
    """
    좌, 우 6도 이상으로 각도변경했는지(overangle) 판정하는 함수
    ---tan6도 = tan(math.pi/30) 임에 유의 ---
    ---abs()는 절대값을 반환하는 함수임 ---
    """
    tan_plus_6 = math.tan(math.pi/30) # tan(6도)
    tan_minus_6 = (-1) * tan_plus_6 # tan(-6도)
    
    if rad_tangap >= tan_plus_6 or rad_tangap <= tan_minus_6:
        return 'overangle'
    else:
        pass

In [28]:
def vehicle_direction(rad_tangap):
    """
    차량이 도로선형에 비해 상대적으로 왼쪽으로 갔는지/오른쪽으로 갔는지 판정해주는 함수
    --- tangap > 0 : 차로를 중심으로 오른쪽
    --- tangap < 0 : 차로를 중심으로 왼쪽"""
    
    if rad_tangap > 0:
        return 'right'
    elif rad_tangap < 0:
        return 'left'
    elif rad_tangap == 0:
        return 'straight'
    else:
        pass

### 필요 데이터 로드 : 미분계수값

In [29]:
folder_name = '03-1_find_angle'

deriv_info_dir = os.path.join(folder_dir, folder_name)

deriv_info_file = 'derivation.csv'

deriv_info_path = os.path.join(deriv_info_dir, deriv_info_file)

In [30]:
deriv_info = pd.read_csv(deriv_info_path, encoding = 'cp949')

deriv_info

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,num,term0,term1,term2,deriv_x0,deriv_x1
0,0,0,01_1,32.261879,0.032916,0.000116,0.032916,0.000232
1,1,1,01_2,57.492307,0.106342,-0.000619,0.106342,-0.001238
2,2,2,01_3,40.645995,-0.170005,0.000578,-0.170005,0.001155
3,3,3,01_4,30.833973,-0.105406,0.000257,-0.105406,0.000514
4,4,4,01_5,14.593834,-0.028344,0.000205,-0.028344,0.000409
5,5,5,01_6,18.55842,-0.065065,0.000134,-0.065065,0.000267


### tan 및 overangle 판정
* **차선변경을 한 차량만을 대상으로 해야 한다**
* 여러 컬럼을 함수에 적용하기 : `데이터프레임이름[['변수1', '변수2']].apply(lambda x: 함수이름(*x), axis = 1)`

In [31]:
num_LOS[0:4]

'01_6'

In [32]:
folder_name = '03-3_tangent_overangle'

tan_df_dir = os.path.join(folder_dir, folder_name)

os.makedirs(tan_df_dir, exist_ok = True) # 해당 경로가 없을 시 폴더 생성, 존재할 경우 건너뛰기

In [33]:
lane_change_veh_list[0].columns

Index(['Unnamed: 0', 'Unnamed: 0.1', 'Unnamed: 0.1.1', 'Unnamed: 0.1.1.1',
       'Unnamed: 0.1.1.1.1', 'Vehicle ID', 'Frame ID', 'Total Frames',
       'Global Time (Epoch Time)', 'Local X (m)', 'Local Y(m)',
       'Vehicle Length', 'Vehicle Width', 'Vehicle Class',
       'Vehicle Velocity(km/h)', 'Vehicle Acceleration', 'Lane Identification',
       'Preceding Vehicle', 'Following Vehicle', 'Spacing Headway',
       'Time Headway', 'Bad Object', 'Special Car', 'Lane Class',
       'Vehicle Movement', 'Lane_record', 'Lane_00', 'Lane_99', 'Lane_change',
       'X_next', 'Y_next', 'slope_veh'],
      dtype='object')

In [34]:
tan_df_list = []

for lane_change_df, num_LOS in tqdm(zip(lane_change_veh_list, num_LOS_list)):
    
    if len(lane_change_df) > 0:
        # 먼저, 대상지번호 num을 지정해줘야 한다.
        num = num_LOS[0:4]
        lane_change_df['num'] = num # 병합의 기준이 될 num 컬럼을 데이터프레임 내부에 생성해줌
    
        tan_df = pd.merge(
            left = lane_change_df,
            right = deriv_info, # 미분계수와의 병합을 위함
            left_on = 'num',
            right_on = 'num'
        )
    
        tan_df['lane_deriv'] = tan_df[['deriv_x0', 'deriv_x1', 'Local X (m)']].apply(lambda x : make_lane_deriv(*x), axis = 1) # 도로선형의 미분계수 생성
        tan_df['tangap'] = tan_df[['lane_deriv', 'slope_veh']].apply(lambda x : make_tangap(*x), axis = 1) # 도로선형의 미분계수와 차량벡터 사이 탄젠트값 생성
        tan_df['rad_tangap'] = tan_df['tangap'].apply(make_tangap_rad) # tangap 의 라디안 값 생성
        tan_df['overangle'] = tan_df['rad_tangap'].apply(overangle) # tangap의 라디안 값을 바탕으로 overangle 판정
        tan_df['vehicle_direction'] = tan_df['rad_tangap'].apply(vehicle_direction) # 도로선형을 중심으로 상대적으로 차량이 어느쪽으로 회전중인지 판정
        
        if len(tan_df[tan_df['overangle'] == 'overangle']) > 0:
            print(f'{num_LOS} has {len(tan_df[tan_df["overangle"] == "overangle"])} overangle(s).')
            
            # overangle인 차량만 필터링하기
            
            veh_overangle = tan_df[tan_df['overangle'] == 'overangle']['Vehicle ID'].unique()

            veh_overangle_filter = tan_df['Vehicle ID'].isin(veh_overangle) # overangle 차량만 뽑아내는 필터링
            tan_df = tan_df[veh_overangle_filter]
            
            tan_df_name = f'{num_LOS}.csv'
            tan_df_path = os.path.join(tan_df_dir, tan_df_name)
    
            tan_df.to_csv(tan_df_path, encoding = 'cp949')
        
        tan_df_list.append(tan_df)
    
    else:
        pass

1it [00:00,  3.40it/s]

01_1_A has 184 overangle(s).
01_2_A has 5775 overangle(s).


3it [00:00,  4.03it/s]

01_3_A has 434 overangle(s).
01_4_A has 1 overangle(s).
01_5_A has 5 overangle(s).


6it [00:00,  6.07it/s]

01_6_A has 1 overangle(s).





### overangle data Load
* overangle 조건을 만족하는 차량들만 다시 데이터 로드

In [35]:
folder_name = '03-3_tangent_overangle'

overangle_dir = os.path.join(folder_dir, folder_name)

In [36]:
file_list = os.listdir(overangle_dir)

file_list[0:3]

['01_1_A.csv', '01_2_A.csv', '01_3_A.csv']

In [37]:
tan_df_list = []
num_LOS_list = []

for file in file_list:
    
    file_path = os.path.join(overangle_dir, file)
    num_LOS = file[0:6]
    
    globals()[f'tan_df_{num_LOS}'] = pd.read_csv(file_path, encoding = 'cp949')
    
    tan_df_list.append(globals()[f'tan_df_{num_LOS}'])
    num_LOS_list.append(num_LOS)

In [38]:
len(tan_df_list), len(num_LOS_list)

(6, 6)

### 시간 변수, 누적 각도 생성
* 각 차량의 지난 5초(5 * 프레임레이트만큼의 프레임ID) 간의 누적각도를 구함
* 관련 변수들의 계산 : **절대값**이어야 함에 유의
    * `rad_tangap_abs` : rad_tangap의 절대값
    * `cumsum_rad_abs` : 차량별 "총" 누적 각도. rad_tangap 절대값의 합
    * `cumsum_rad_abs_final_5s` : 차량별 5초 누적각도. rad_tangap의 최근 5초간의 합
    * `cumsum_rad_abs_final_5s_tan` :  `cumsum_rad_5s`를 tan으로 환산한 값

In [39]:
folder_name = '03-4_cumsum_angle'

cumsum_df_dir = os.path.join(folder_dir, folder_name)

os.makedirs(cumsum_df_dir, exist_ok = True) # 해당 경로가 없을 시 폴더 생성, 존재할 경우 건너뛰기

In [40]:
frame_rate = 30 # 초당 프레임 계산 : 3프레임 당 0.1초, 1초당 30프레임임

In [41]:
cumsum_df_list = []

for tan_df, num_LOS in tqdm(zip(tan_df_list, num_LOS_list)):
    # 누적각도인 cumsum_rad는 차량별(Vehicle ID)로 구해야 한다.
    
    cumsum_df = pd.DataFrame()
    
    tan_df['rad_tangap_abs'] = tan_df['rad_tangap'].abs() # rad_tangap의 절대값인 rad_tangap_abs 생성
    
    veh_list = tan_df['Vehicle ID'].unique() # 각 테이블의 차량리스트 생성
    
    for veh in veh_list:
        veh_df = tan_df[tan_df['Vehicle ID'] == veh]
        
        frame_max = veh_df['Frame ID'].max()
        frame_min = veh_df['Frame ID'].min()
        
        veh_runtime = (frame_max - frame_min)/frame_rate # 각 차량별 총 주행시간을 구함
        
        # cumsum_rad_abs : 누적각도합(총계) 구하기
        veh_df['cumsum_rad_abs'] = veh_df['rad_tangap_abs'].cumsum(skipna = True)
        veh_df['cumsum_rad_abs_before_5s'] = veh_df['cumsum_rad_abs'].shift(50) # 5초 = 150프레임만큼 back
        
        veh_df['cumsum_rad_abs_final_5s'] = veh_df['cumsum_rad_abs'] - veh_df['cumsum_rad_abs_before_5s'] # 5초 누적 값의 라디안 값임
        
        cumsum_df = pd.concat([cumsum_df, veh_df])
    
    cumsum_df_list.append(cumsum_df)
    
    cumsum_df_file = f'{num_LOS}.csv'
    cumsum_df_path = os.path.join(cumsum_df_dir, cumsum_df_file)
    
    cumsum_df.to_csv(cumsum_df_path, encoding = 'cp949')

6it [00:01,  3.24it/s]


### 판정 함수 설정
* `decision` : 급앞지르기(`rapid_passing`), 급진로변경(`rapid_change`) 판정
* **급앞지르기**
    * 속도 30KM/H 이상에서
    * 진행방향이 좌/우측 6도/sec 이상으로 차로변경하고, 
    * 5초 동안 누적각도가 +-2도/sec 이하,
    * 가속이 초당 3km/h(**0.833333m/s**) 이상인 경우
* **급진로변경**
    * 속도가 30km/h 이상에서, 
    * 진행방향이 좌측 또는 우측 6도/sec 이상으로 차로변경하고, 
    * 5초간 누적각도가 +-2도/sec 이하, 
    * 가감속이 초당 +-2km/h(**0.555556m/s**)이하인 경우

In [42]:
def decision(velocity, acceleration, cumsum_rad_abs_5s):
    if velocity >= 30:
        
        if cumsum_rad_abs_5s >= 0 and cumsum_rad_abs_5s <= 2:
            
            if acceleration >= 3 * 1000/3600:
                return 'rapid_passing'
            
            elif acceleration <= 2 and acceleration >= -2:
                return 'rapid_change'
            
            else:
                return None
        
        else:
            return None

    else:
        return None

### 판정: 급앞지르기/급진로변경

In [43]:
for cumsum_df, num_LOS in tqdm(zip(cumsum_df_list, num_LOS_list)):
    
    cumsum_df['decision_or'] = cumsum_df[['Vehicle Velocity(km/h)', 'Vehicle Acceleration', 'cumsum_rad_abs']].apply(lambda x :decision(*x), axis = 1)
    cumsum_df['decision_5s'] = cumsum_df[['Vehicle Velocity(km/h)', 'Vehicle Acceleration', 'cumsum_rad_abs_final_5s']].apply(lambda x : decision(*x), axis = 1)
    
    cumsum_df_file = f'{num_LOS}.csv'
    cumsum_df_path = os.path.join(cumsum_df_dir, cumsum_df_file)
    
    cumsum_df.to_csv(cumsum_df_path, encoding = 'cp949')

6it [00:00, 10.18it/s]


### 피봇하여 저장
* 각 대상지별, LOS별로 다음에 대한 차량 대수를 구하도록 한다.
    * `Lane_record`

In [44]:
folder_name = '03-5_decision'

save_pv_dir = os.path.join(folder_dir, folder_name)

os.makedirs(save_pv_dir, exist_ok = True) # 해당 경로가 없을 시 폴더 생성, 존재할 경우 건너뛰기

In [45]:
harmonic = lambda x: statistics.harmonic_mean(list(x))
harmonic .__name__ = 'harmonic'

unq = lambda x: len(x.unique())
unq .__name__ = 'unq'

In [46]:
for cumsum_df, num_LOS in tqdm(zip(cumsum_df_list, num_LOS_list)):
    
    pvdf = pd.pivot_table(cumsum_df,
                          index = ['Lane_record', 'decision_or', 'decision_5s'],
                          values = ['Vehicle ID', 'Vehicle Velocity(km/h)', 'Vehicle Acceleration'],
                          aggfunc = {'Vehicle ID': unq,
                                     'Vehicle Velocity(km/h)': harmonic,
                                     'Vehicle Acceleration': 'mean'})
    
    if len(pvdf) > 0:
        save_pv_name = f'{num_LOS}.csv'
        save_pv_path = os.path.join(save_pv_dir, save_pv_name)
        
        pvdf.to_csv(save_pv_path, encoding = 'cp949')
    
    else:
        pass

6it [00:00, 87.19it/s]
