In [1]:
import pandas as pd
import numpy as np
import os
import csv
import copy
from datetime import timedelta
import matplotlib.pyplot as plt
import pathlib
from operator import itemgetter
from itertools import *

In [2]:
def save_df(df : pd.DataFrame, folder : str, file_name : str, filetype : str) -> None:
    
    if isinstance(df, pd.DataFrame):
        
        if filetype == 'csv':
            
            df.to_csv(os.path.join(folder, 'preprocessed_' + file_name), index = False) 
    
    elif isinstance(df, dict):
        
        for key in df.keys():
            
            if isinstance(df[key], pd.DataFrame):
                
                df[key].to_csv(os.path.join(folder, 'preprocessed_' + str(key)), index = False) 
                
            else:
                
                print('datatype is not dataframe')
                return False    
    else:
                
        print('df wrong type argument')
        return False 

In [12]:
class Preprocessor:

    @staticmethod
    def to_datetime(data: pd.DataFrame) -> pd.DataFrame:
        if not isinstance(data.index, pd.DatetimeIndex):
            data.index = pd.to_datetime(data['periodtime_1m'].astype('str'), format='%Y%m%d%H%M')
        return data
    
    @staticmethod
    def sorting_df(data : pd.DataFrame, col_name  = ['periodtime_1m']) -> pd.DataFrame :
        
        return data.sort_values(by= col_name)
    
    @staticmethod
    def fill_missing_index(data : pd.DataFrame, start_tm: str, end_tm: str, freq = '1T') -> pd.DataFrame :
        
        try:
            
            start_index = str(min(data.periodtime_1m.values))
            end_index =  str(max(data.periodtime_1m.values))
            
        except AttributeError:
            
            print(' no {} in data_column'.format('periodtime_1m'))

        if not isinstance(data.index, pd.DatetimeIndex):
           
            data.index = pd.to_datetime(data['periodtime_1m'].astype('str'), format='%Y%m%d%H%M')
            
        if len(pd.date_range(start_index, end_index , freq= freq)) == len(data.index):
            print('processing_fill',data.index)     
            return data.loc[start_tm:end_tm]
           

        else:
                            
            full_time_data = pd.DataFrame(index=pd.date_range(start_index , end_index , freq='1T'), columns=['tmp'])
            data = data.combine_first(full_time_data)
            return data.loc[start_tm:end_tm]

    @staticmethod
    def linear_interpolation( data, freq = '1T'):
        
        data.interpolate(method = 'values',inplace = True, limit_area=None)
        data.bfill(inplace =True)
        data.ffill(inplace =True)
        if Preprocessor.checkNa(data):
            raise ValueError('Na exist')
        else:
            return data

    @staticmethod
    def resampling(data : pd.DataFrame, start_tm, end_tm, sampling_time = '5T') -> pd.DataFrame:
        
        try:
            
            start_tm = str(min(data.periodtime_1m.values))
            end_tm =  str(max(data.periodtime_1m.values))
            
        except AttributeError:
            
            print(' no {} in data_column'.format('periodtime_1m'))
                
        if not isinstance(data.index, pd.DatetimeIndex):
            
            data = Preprocessor.to_datetime(data)
            
        return data.loc[pd.date_range(start_tm, end_tm , freq = sampling_time)]
    @staticmethod
    def moving_average(data : pd.DataFrame, minutes = 3):
         
        col_name= 'prod_MA' + '_' + str(minutes)
        
        data.loc[:, col_name] = data.prod_tt.iloc[:].rolling(window = minutes, min_periods=1).mean()
        
        return data
    @staticmethod
    def convert_speed_to_time(data:pd.DataFrame) -> pd.DataFrame:
        
        data.loc[:,'real_length'] = data['real_tt']*data['real_ts']/3.6
        col_tt = data.loc[: , ["prod_tt_0",'prod_tt_1',"prod_tt_2"]]
        col_ts = data.loc[: ,[ "prod_ts_0",'prod_ts_1',"prod_ts_2"]]
        data['past_prod_ts_mean'] =  col_ts.mean(axis=1)
        data['past_prod_tt_mean_1'] =  col_tt.mean(axis=1)
        data['past_prod_tt_mean_2'] = (data['real_length'] / data['past_prod_ts_mean'])*3.6
        return data
    @staticmethod
    def checkNa(data:pd.DataFrame) -> bool:
        if (data['real_tt'].isnull().values.any()) or (data['prod_tt'].isnull().values.any()):
            print(np.where(np.asanyarray(np.isnan(data))))
            
            return True
        
        else:
            
            return False

In [13]:
class Tr_dataset_manger():
    
    """
        class 설명 : 데이터 셋 만드는 코드
        
        argument 설명 :
        
            header_flags : 읽어드리는 csv file에 header 여부 
        
            multi_feature_flags {
                
                True > featrue 를 travel_speed + congestion (prod)
                False > feature를 travel_speed 단일 피쳐
                
                }

            load_cate : 'express' > 고속도로 데이터
    """
    
    
    def __init__(self, data_path, file_name, header_flags = False, multi_feature_flags = True, load_cate = 'express'):
        
            
        
        self.data_path = data_path
        self.traffic_data_path = os.path.join(data_path, file_name)        
        self.header_flags = header_flags
        self.column_dict = None
        self.data = None
        self.multi_feature_flags = multi_feature_flags

        ## data 기간 정해주는 부분 
        self.info = {

            'start_tm': '201902090000',
            'end_tm' :  '202005312359'
        }
        # csv파일에 헤더가 없을 경우 만들어주는 부분 (header_flags argument와 연동)
        self.column_dict = {

            'col_1' : 'periodtime_1m',
            'col_2' : 'periodtime_5m',
            'col_3' : 'tsdlinkid',
            'col_4' : 'nexttsdlinkid',
            'col_5' : 'representlength',
            'col_6' : 'roadclass',
            'col_7' : 'linkclass',
            'col_8' : 'congestionclass',
            'col_9' : 'real_ts',
            'col_10' : 'real_tt',
            'col_11' : 'real_pc',
            'col_12' : 'real_con',
            'col_13' : 'real_na_code',
            'col_14' : 'prod_ts',
            'col_15' : 'prod_tt',
            'col_16' : 'prod_pc',
            'col_17' : 'prod_con',
            'col_18' : 'prod_ret',
            'col_19' : 'prod_na_code',
            'col_20' : 'pat_ts',
            'col_21' : 'pat_tt',
            'col_22' : 'pat_accum_pc',
            'col_23' : 'pat_con',
            'col_24' : 'patuseflag',
            'col_25' : 'pat_na_code',
            'col_26' : 'periodtype',
            'col_27' : 'tm',
            'col_28' : 'dt'
        }
            
    def read_file(self):
        
        if self.header_flags ==False:
            
            data = pd.read_csv(self.traffic_data_path, names = list(self.column_dict.values()))
            
        else:
            
            data = pd.read_csv(self.traffic_data_path)
        
        return data
        
    def preprocessing_data(self):

        ## 수정 중  return 결과물이 npz 파일로 return 되게끔 수정 필요 
        
        result_prod = np.array([])
        result_real  = np.array([]) 
        result_prod_con = np.array([])
        result_real_con = np.array([])
        result_jam_flags = np.array([])
        
        
        data = self.read_file()
        self.preprocessed_data_dict = {}
        
        idx_dict = pd.read_csv(os.path.join(self.data_path,'tsd_mapping.csv'))[['idx','tsdlink_id']].set_index('idx')\
        ['tsdlink_id'].to_dict()
        
        for i, key in enumerate(idx_dict.keys()):

        
            tsd = idx_dict[key]
            file_name = 'preprocessed_' + str(tsd) +'_' + '.csv'

            if data.dtypes['tsdlinkid'] != 'int':

                data['tsdlinkid'] = data['tsdlinkid'].astype('int')

            tmp_data = copy.deepcopy(data.loc[(data.tsdlinkid == tsd)])
            """ 

            추후 PREPROCESSOR 클래스 매소드의 일부는 데이더 추출 과정에서 big-query나 spark / hive로 작업해오는게 더 나음

            """
            tmp_data = Preprocessor.sorting_df(tmp_data)
            tmp_data = Preprocessor.fill_missing_index(tmp_data,self.info['start_tm'], self.info['end_tm'])
            tmp_data = Preprocessor.linear_interpolation(tmp_data)
            tmp_data = Preprocessor.resampling(tmp_data, self.info['start_tm'], self.info['end_tm'])
            tmp_data = Preprocessor.moving_average(tmp_data)

            tmp_data['row_idx'] = np.arange(len(tmp_data))
            tmp_data['row_idx'] = tmp_data['row_idx'].astype('int')
            
            tmp_data['lagged_real_con'] = tmp_data.real_con.shift(-1)
            tmp_data['lagged2_real_con'] = tmp_data.real_con.shift(-2)
            tmp_data['jam_flags'] = np.where((tmp_data.real_con != 1.0) | (tmp_data.lagged_real_con != 1.0)\
                                              | (tmp_data.lagged2_real_con != 1.0), 1, 0)
            
            
            
            if Preprocessor.checkNa(tmp_data):

                print('{} : NA exists'.format(key))
                print()
                return tmp_data
            else:
                pass

In [14]:
raw_data_path = '../data/raw_data/'
file_name = 'traffic_express.csv'
data_manger = Tr_dataset_manger(data_path = raw_data_path , file_name = file_name, multi_feature_flags = True,\
                                load_cate = 'express')
data = data_manger.preprocessing_data()


processing_fill DatetimeIndex(['2019-01-01 00:00:00', '2019-01-01 00:01:00',
               '2019-01-01 00:02:00', '2019-01-01 00:03:00',
               '2019-01-01 00:04:00', '2019-01-01 00:05:00',
               '2019-01-01 00:06:00', '2019-01-01 00:07:00',
               '2019-01-01 00:08:00', '2019-01-01 00:09:00',
               ...
               '2020-05-31 23:50:00', '2020-05-31 23:51:00',
               '2020-05-31 23:52:00', '2020-05-31 23:53:00',
               '2020-05-31 23:54:00', '2020-05-31 23:55:00',
               '2020-05-31 23:56:00', '2020-05-31 23:57:00',
               '2020-05-31 23:58:00', '2020-05-31 23:59:00'],
              dtype='datetime64[ns]', name='periodtime_1m', length=744480, freq=None)
processing_fill DatetimeIndex(['2019-01-01 00:00:00', '2019-01-01 00:01:00',
               '2019-01-01 00:02:00', '2019-01-01 00:03:00',
               '2019-01-01 00:04:00', '2019-01-01 00:05:00',
               '2019-01-01 00:06:00', '2019-01-01 00:07:00',
        

In [None]:
if (data.loc['2019-02-09 00:15:00':]['real_tt'].isnull().values.any()) or (data.loc['2019-02-09 00:15:00':]['prod_tt'].isnull().values.any()):
    print(np.where(np.asanyarray(np.isnan(data))))

In [6]:
data.head(50)

AttributeError: 'NoneType' object has no attribute 'head'

In [35]:
data.interpolate(method = 'values')

Unnamed: 0,periodtime_1m,periodtime_5m,tsdlinkid,nexttsdlinkid,representlength,roadclass,linkclass,congestionclass,real_ts,real_tt,...,patuseflag,pat_na_code,periodtype,tm,dt,prod_MA_3,row_idx,lagged_real_con,lagged2_real_con,jam_flags
2019-02-09 00:00:00,201902090000,201902090000,5295152,,340,5,1,4,,,...,,1.0,5,0,20190209,32.470394,0,,,1
2019-02-09 00:05:00,201902090005,201902090005,5295152,,340,5,1,4,,,...,,1.0,5,0,20190209,34.804497,1,,1.0,1
2019-02-09 00:10:00,201902090010,201902090010,5295152,,340,5,1,4,,,...,,1.0,5,0,20190209,35.582534,2,1.0,1.0,1
2019-02-09 00:15:00,201902090015,201902090015,5295152,,340,5,1,4,44.432163,27.547611,...,,1.0,5,0,20190209,36.995091,3,1.0,1.0,0
2019-02-09 00:20:00,201902090020,201902090020,5295152,,340,5,1,4,34.317158,35.667290,...,,1.0,5,0,20190209,33.182753,4,1.0,1.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2020-05-31 23:35:00,202005312335,202005312335,5295152,,344,5,1,4,43.770800,28.292900,...,1.0,1.0,5,23,20200531,30.264567,137659,1.0,1.0,0
2020-05-31 23:40:00,202005312340,202005312340,5295152,,344,5,1,4,40.282900,30.742600,...,1.0,1.0,5,23,20200531,29.316667,137660,1.0,1.0,0
2020-05-31 23:45:00,202005312345,202005312345,5295152,,344,5,1,4,40.804700,30.349500,...,1.0,1.0,5,23,20200531,28.641767,137661,1.0,1.0,0
2020-05-31 23:50:00,202005312350,202005312350,5295152,,344,5,1,4,42.111400,29.436100,...,1.0,1.0,5,23,20200531,29.775400,137662,1.0,1.0,1
