In [21]:
from typing import List
import datetime
from itertools import groupby, chain
import pandas as pd

import pprint
pp = pprint.PrettyPrinter(indent=4)

# Exam

## Background
- Motion2AI는 머신러닝 애플리케이션을 구축하고 사용하여, 산업용 차량이 사용 중인지(= loaded) 유휴 상태인지(= not loaded) 이해합니다.
- 머신러닝 애플리케이션은 100% 정확하지 않으며, 잘못된 머신러닝 예측으로 인해, 머신러닝 측정 값(`loaded` column)은 flickering 되고 있습니다. 즉, `loaded` 데이터에 노이즈가 있습니다.
- 우리의 목표는 데이터에서 노이즈를 제거하기 위해, 파이썬에서 간단한 시간별 필터링 로직(time-wise filtering logic)을 만드는 것입니다.

---

## Data Description
1. vehicle_name
    - 1~4까지 vehicle ID
1. datetime
    - 데이터가 기록된 시각을 `YYYY-MM-DD hh:mm:ss`로 저장
1. loaded
    - 머신러닝 알고리즘이 지게차에 물건이 적재되었는지(`loaded`) 여부를 감지
1. normalized_loaded
    - `loaded` column의 post-processed 데이터

---

## Problem
- 'loaded' 데이터에 대한 머신러닝 알고리즘은 시간이 지남에 따라 노이즈가 끼고, flickering 될 수 있습니다.
- 노이즈를 제거하기 위해 `loaded` 데이터를 처리하기를 원합니다.
- 파이썬 코드의 결과는 `normalized_loaded`(= ground truth)의 값과 동일할 것으로 예상됩니다.

<br>

1. `loaded` status는 1 or 0입니다.
1. `loaded` 값이 5초 이상 변경되지 않으면, `normalized_loaded` 값은 `loaded` 값과 동일합니다.
    - == `loaded` 값이 5초 이상 유지되면, `normalized_loaded` 값은 `loaded` 값과 같음 
1. `loaded` 값이 4초 이내로 변경되면, 카운트되지 않으며, `normalized_loaded` 값은 이전 `normalized_loaded` 값입니다.
    - == `loaded` 값이 4초 이내로 바뀌면, flickering 됐다고 생각하고, 카운트 하지 않음


<br>

**`normalized_loaded`를 생성하는 파이썬 함수를 작성하세요.**

---

## Data issue

1. 데이터가 수집되지 않은 경우가 있음 (== 모든 데이터가 1초 간격인 것이 아님)
1. 'normalized_loaded' column이 ground truth가 아님

In [19]:
def check_issue_1(time_series, sec=4):
    """데이터가 sec초 보다 크게 차이나는 경우의 수를 카운트"""
    issue = []
    prev_datetime = time_series[0]
    
    for now_datetime in time_series[1:]:
        diff = now_datetime - prev_datetime
        if diff > datetime.timedelta(seconds=sec):
            issue.append(diff)
        prev_datetime = now_datetime
        
    print(len(issue))

    
def check_issue_2(norm_series):
    """normalized_loaded에서 5초 이상 유지되지 않았는데 값이 바뀐 경우의 수를 카운트"""
    prev = norm_series[0]
    cnt = 1
    
    counts = []
    for x in norm_series[1:]:
        if x == prev:
            cnt += 1
            continue
        else:
            counts.append(cnt)
            cnt = 1
            prev = x
    counts.append(cnt)
    assert sum(counts) == len(norm_series)

    issue = [cnt for cnt in counts if cnt < 5]
    print(len(issue))


def check_issues():
    data_path = './pose_example.csv'
    vehicle_id = 1
    
    df = pd.read_csv(data_path)
    df['datetime'] = pd.to_datetime(df['datetime'], format='%Y-%m-%d %H:%M:%S')

    temp_df = df[df['vehicle_name'] == vehicle_id]
    temp_time = temp_df['datetime']
    temp_norm_loaded = temp_df['normalized_loaded']
    
    check_issue_1(temp_time)
    check_issue_2(temp_norm_loaded)
    

if __name__ == "__main__":
    check_issues()

20
49


---

## Answer

In [37]:
from typing import List, Dict
import datetime
from itertools import groupby, chain
import pandas as pd

In [42]:
def split_by_vehicle_id(df: pd.DataFrame) -> Dict[int, pd.DataFrame]:
    """vehicle id 별로 데이터 쪼개기"""
    df_dict = {}
    vehicle_id_list = df['vehicle_name'].unique()
    for vid in vehicle_id_list:
        new_df = df[df['vehicle_name'] == vid]
        df_dict[vid] = new_df
    assert df.shape[0] == sum(vdf.shape[0] for vdf in df_dict.values())
    return df_dict


def split_by_time_diff(df_dict: Dict[int, pd.DataFrame], 
                       sec=4) -> Dict[int, List[pd.DataFrame]]:
    """데이터 기록(datetime) 간격이 5초 이상 차이나는 부분 데이터 쪼개기"""
    new_df_dict = {}
    
    # vehicle id를 반복
    for vehicle_id, vdf in df_dict.items():
        splitted_df_list = []
        prev_index = 0
        prev_datetime = vdf.iloc[0]['datetime']
        
        # row를 반복
        for idx, row in enumerate(vdf.iloc[1:].itertuples(), 1):
            now_datetime = row.datetime
            diff = now_datetime - prev_datetime
            if diff > datetime.timedelta(seconds=sec):
                splitted_df_list.append(vdf.iloc[prev_index: idx])
                prev_index = idx
            prev_datetime = row.datetime
        splitted_df_list.append(vdf.iloc[prev_index:])
        new_df_dict[vehicle_id] = splitted_df_list
        assert vdf.shape[0] == sum(sdf.shape[0] for sdf in splitted_df_list)
    return new_df_dict


def remove_flickering(df_dict: Dict[int, List[pd.DataFrame]]) -> List[int]:
    """5초 이상 유지되지 않은 flickering하는 부분 제거"""
    all_status = []
    
    # vehicle id를 반복
    for vehicle_id, df_list in df_dict.items():
        
        # splitted df를 반복
        for datum in df_list:
            loaded = list(datum['loaded'])
            dup_cnt = [sum(1 for _ in group) for _, group in groupby(loaded)]
            dup_val = [loaded[0]]
            
            for i in range(len(dup_cnt)-1):
                if dup_val[-1] == 0:
                    dup_val.append(1)
                else:
                    dup_val.append(0)
            
            true_status = []
            for val, cnt in zip(dup_val, dup_cnt):
                if not true_status or cnt >= 5:
                    true_status.extend([val]*cnt)
                else:
                    true_status.extend([true_status[-1]]*cnt)
            all_status.append(true_status)
    return list(chain.from_iterable(all_status))


def run(data_path: str) -> List[int]:
    """전체 프로세스 실행"""
    raw_df = pd.read_csv(data_path)
    raw_df['datetime'] = pd.to_datetime(raw_df['datetime'], format='%Y-%m-%d %H:%M:%S')
    df = raw_df.sort_values(['vehicle_name', 'datetime'])    

    df_dict = split_by_vehicle_id(df)
    df_dict = split_by_time_diff(df_dict)
    result = remove_flickering(df_dict)
    # df['result'] = result
    return result

In [44]:
if __name__ == "__main__":
    run('./pose_example.csv')