In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Packages Loading
import os
import sys
sys.path.insert(0, '/home/ec2-user/workspace/01.Incheon/02_cctv_optmz')
from cctv_optmz_query import QueryCollection

# ignore warning
import warnings
warnings.filterwarnings(action='ignore')

# preprocessing function
import numpy as np
import pandas as pd

import logging

# datetime
import time
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# pickle
import joblib

from sklearn.model_selection import train_test_split # data split
from sklearn.preprocessing import MinMaxScaler # scaler
from sklearn.ensemble import RandomForestRegressor # RF model
from sklearn.metrics import r2_score, mean_squared_error # model evaluation

# annotation
from typing import Tuple, Type

# pyspark processing function
import pyspark.sql.functions as F

# Output folders for the persisted artefacts:
#   MODEL_FOLDER  - fitted RandomForest models (<gu_nm>_rf.pkl)
#   SCALER_FOLDER - fitted MinMaxScalers (<gu_nm>_scaler.pkl)
MODEL_FOLDER, SCALER_FOLDER = (
    '/home/ec2-user/workspace/01.Incheon/02_cctv_optmz/model_learning/model',
    '/home/ec2-user/workspace/01.Incheon/02_cctv_optmz/model_learning/scaler',
)

# Look-back window in months. The most recent half is loaded as training rows
# (create_train_df); the half before it feeds the per-grid statistics
# (merge_statistic_df).
MONTH_PERIOD = 2
MONTH_PERIOD_HALF = MONTH_PERIOD // 2

23/06/08 15:42:05 WARN Utils: Your hostname, safe-service resolves to a loopback address: 127.0.0.1; using 172.31.35.243 instead (on interface eth0)
23/06/08 15:42:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/06/08 15:42:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/08 15:42:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Line-break string appended to some log messages
line_break = '\n'

# Logging setup (named LOG_FORMAT so the builtin `format` is not shadowed)
LOG_FORMAT = "%(asctime)s: %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO, datefmt="%H:%M:%S")
logging.info("Main-Process : start")
start = time.time()

logging.info(f"-- spark load --{line_break}")

# Run date used when no date argument is supplied (e.g. when run inside Jupyter)
DEFAULT_RUN_DATE = '20230601'


def _resolve_run_date(argv, default=DEFAULT_RUN_DATE):
    """Return the run date (YYYYMMDD) from the command line, or `default`.

    When this file runs as a .py script, the first argument is the run date.
    If there is no positional argument, or the first argument is an option
    flag (inside a Jupyter kernel sys.argv typically holds the launcher's own
    options such as '-f'), `default` is returned instead. The value is not
    validated here; the assert/strptime below do that.
    """
    if len(argv) < 2 or argv[1].startswith('-'):
        return default
    return str(argv[1])


today = _resolve_run_date(sys.argv)

assert len(today) == 8, "입력일자는 YYYYMMDD 형식 입니다"

# `today` parsed to a datetime (strptime also rejects non-date strings)
today_dt: datetime = datetime.strptime(today, '%Y%m%d')

# Incheon districts: {district name: district code}
gu_dict = {
    'Bupyeong-gu': 28237,
    'Dong-gu': 28140,
    'Jung-gu': 28110,
    'Gyeyang-gu': 28245,
    'Namdong-gu': 28200,
    'Ongjin-gun': 28720,
    'Yeonsu-gu': 28185,
    'Michuhol-gu': 28177,
    'Seo-gu': 28260,
    'Ganghwa-gun': 28710
}

# District processed by this run; its code is looked up in gu_dict so the
# name and the code can never disagree.
gu_nm = 'Bupyeong-gu'
gu_cd = gu_dict[gu_nm]

# Query helper bound to the run date and district code
query_obj = QueryCollection(today, gu_cd)

15:42:09: Main-Process : start
15:42:09: -- spark load --



In [4]:
def create_train_df(max_years_back: int = 5) -> 'pandas.DataFrame':
    """01. Load the women floating-population rows used as training data.

    The window is [today - MONTH_PERIOD_HALF months, yesterday]. If that window
    has no rows, the same window is retried one year earlier, up to
    `max_years_back` windows in total.

    Parameters
    ----------
    max_years_back : int, default 5
       Number of yearly windows to try (0 = this year, 1 = one year earlier, ...).

    Returns
    -------
    train_df : pandas.DataFrame
       Training rows plus a `weekday` column holding the Spark 'E'
       day-of-week abbreviation of `stdr_de` (e.g. 'Mon').

    Raises
    ------
    ValueError
       If `max_years_back` is smaller than 1.
    """
    if max_years_back < 1:
        raise ValueError('max_years_back must be >= 1')

    for year in range(max_years_back):
        start_date = (today_dt - relativedelta(years=year, months=MONTH_PERIOD_HALF)).strftime('%Y%m%d')
        end_date = (today_dt - relativedelta(years=year, days=1)).strftime('%Y%m%d')

        # Women floating-population rows for [start_date, end_date]
        train_df = query_obj.get_train_women_pop_sql(start_date, end_date)

        # take(1) fetches at most one row: a cheap "is there any data" check
        if train_df.take(1):
            logging.info(f'train period : {start_date} ~ {end_date}')
            break
    else:
        # No window had data. Continue with the (empty) last frame as before,
        # but make the cause visible instead of failing later downstream.
        logging.error(f'train_df no data in the last {max_years_back} years')

    # Derive the weekday abbreviation from the YYYYMMDD date string
    train_df = train_df.withColumn('weekday', F.to_date(F.unix_timestamp('stdr_de', 'yyyyMMdd').cast("timestamp")))
    train_df = train_df.withColumn('weekday', F.date_format('weekday', 'E'))

    # Collect the Spark result to the driver as a pandas DataFrame
    train_df = train_df.toPandas()

    return train_df

# 01. Load the training data
train_df = create_train_df()
logging.info('-- create_train_df() 종료 --')

                                                                                

20220501 20220531


15:44:26: -- create_train_df() 종료 --                                            


# merge_statistic_df

In [5]:
def merge_statistic_df(train_df: 'pandas.DataFrame', max_years_back: int = 5) -> 'pandas.DataFrame':
    """02. Join per-grid women floating-population statistics onto the training data.

    Statistics come from the window
    [today - MONTH_PERIOD months, today - MONTH_PERIOD_HALF months - 1 day].
    If that window has no rows, the same window is retried one year earlier,
    up to `max_years_back` windows in total. Rows are aggregated per
    (grid_id, stdr_tm, weekday) into `women_pop_mean` and `women_pop_max`.

    NOTE(review): the year offset is searched independently of the one
    create_train_df() settled on, so the statistics window is not guaranteed
    to come from the same year as the training window -- confirm this is intended.

    Parameters
    ----------
    train_df : pandas.DataFrame
       Training data with `grid_id`, `stdr_tm` and `weekday` columns.
    max_years_back : int, default 5
       Number of yearly windows to try.

    Returns
    -------
    train_df : pandas.DataFrame
       `train_df` inner-joined with the statistics on (grid_id, stdr_tm, weekday);
       training rows without matching statistics are dropped.

    Raises
    ------
    ValueError
       If `max_years_back` is smaller than 1.
    """
    if max_years_back < 1:
        raise ValueError('max_years_back must be >= 1')

    for year in range(max_years_back):
        start_date = (today_dt - relativedelta(years=year, months=MONTH_PERIOD)).strftime('%Y%m%d')
        end_date = (today_dt - relativedelta(years=year, months=MONTH_PERIOD_HALF, days=1)).strftime('%Y%m%d')

        # Statistics rows for [start_date, end_date]
        statistic_df = query_obj.get_statistic_women_pop_sql(start_date, end_date)

        if statistic_df.take(1):
            logging.info(f'statistic period : {start_date} ~ {end_date}')
            break
    else:
        # Every window was empty: the inner join below will then return no
        # rows, so report it here where the cause is still obvious.
        logging.error(f'statistic_df no data in the last {max_years_back} years')

    # Weekday abbreviation derived from the YYYYMMDD date string
    statistic_df = statistic_df.withColumn('weekday', F.to_date(F.unix_timestamp('stdr_de', 'yyyyMMdd').cast("timestamp")))
    statistic_df = statistic_df.withColumn('weekday', F.date_format('weekday', 'E'))

    # Mean and max women population per grid / hour / weekday
    statistic_df = (
        statistic_df
        .groupby(['grid_id', 'stdr_tm', 'weekday'])
        .agg(F.mean('women_pop').alias('women_pop_mean'),
             F.max('women_pop').alias('women_pop_max'))
        .toPandas()
    )

    # Inner join: keep only training rows that have matching statistics
    train_df = train_df.merge(statistic_df, how='inner', on=['grid_id', 'stdr_tm', 'weekday'])

    return train_df

In [6]:
# 02. Attach the per-(grid, hour, weekday) statistics to the training rows
train_df = merge_statistic_df(train_df=train_df)
logging.info('-- merge_statistic_df() 종료 --')

                                                                                

20220401 20220430


15:45:56: -- merge_statistic_df() 종료 --                                         


# merge_bus_stop_df

In [7]:
def merge_bus_stop_df(train_df: 'pandas.DataFrame') -> 'pandas.DataFrame':
    """03. Attach the bus-stop count per grid cell to the training data.

    Parameters
    ----------
    train_df : pandas.DataFrame
       Training data with a `grid_id` column.

    Returns
    -------
    train_df : pandas.DataFrame
       `train_df` left-joined with the bus-stop counts on `grid_id`; grids with
       no bus-stop record get `bus_stop_cnt` = 0.
    """
    # Bus-stop count per grid
    bus_stop_cnt_df = query_obj.get_bus_stop_cnt_sql()

    # Best effort: report missing data but keep going (every grid then gets 0)
    if not bus_stop_cnt_df.take(1):
        logging.error('bus_stop_cnt_df no data')

    bus_stop_cnt_df = bus_stop_cnt_df.toPandas()

    # NOTE(review): assumes at most one row per grid_id in bus_stop_cnt_df;
    # duplicates would multiply training rows in this left join -- confirm.
    train_df = train_df.merge(bus_stop_cnt_df, how='left', on='grid_id')

    # Assign back rather than `train_df['bus_stop_cnt'].fillna(0, inplace=True)`:
    # that chained in-place call acts on an intermediate column object and, under
    # pandas Copy-on-Write, does not update train_df.
    train_df['bus_stop_cnt'] = train_df['bus_stop_cnt'].fillna(0)

    return train_df

In [8]:
# 03. Attach the bus-stop count per grid to the training rows
train_df = merge_bus_stop_df(train_df=train_df)
logging.info('-- merge_bus_stop_df() 종료 --')

15:48:34: -- merge_bus_stop_df() 종료 --                                          


# apply_get_dummies

In [None]:
def apply_get_dummies(train_df: 'pandas.DataFrame') -> 'pandas.DataFrame':
    """One-hot encode the `weekday` column and fix the output column layout.

    Parameters
    ----------
    train_df : pandas.DataFrame
       Data containing a `weekday` column with values such as 'Mon' .. 'Sun',
       plus the base columns listed in `cols` below.

    Returns
    -------
    train_df : pandas.DataFrame
       Frame whose columns are exactly `cols` followed by the seven
       `weekday_*` dummy columns (all uint8), in that order. Columns not listed
       are dropped; listed base columns that are missing are created as NaN by
       `reindex`.
    """
    # dtype=np.uint8 keeps the dummies numeric and identical in dtype to the
    # zero-filled columns added below (pandas >= 2.0 would otherwise produce bool).
    train_df = pd.get_dummies(train_df, columns=['weekday'], dtype=np.uint8)

    days = ['weekday_Mon', 'weekday_Tue', 'weekday_Wed', 'weekday_Thu',
            'weekday_Fri', 'weekday_Sat', 'weekday_Sun']

    # The loaded period may not cover every weekday: add missing days as all zeros
    for day in days:
        if day not in train_df.columns:
            train_df[day] = np.uint8(0)

    cols = [
        'stdr_de',
        'stdr_tm',
        'grid_id',
        'women_pop',
        'women_pop_mean',
        'women_pop_max',
        'bus_stop_cnt'
    ]

    # Fixed column order so downstream positional/named selections are stable
    return train_df.reindex(columns=cols + days)

# One-hot encode weekday and fix the column order of the modelling frame
train_df = apply_get_dummies(train_df=train_df)

# split_train_test

In [21]:
# Sanity check: everything after the first seven base columns should be the
# seven weekday_* dummy columns
train_df.columns[7:]

Index(['weekday_Mon', 'weekday_Tue', 'weekday_Wed', 'weekday_Thu',
       'weekday_Fri', 'weekday_Sat', 'weekday_Sun'],
      dtype='object')

In [23]:
def split_train_test(dataFrame: 'pandas.DataFrame',
                     test_size: float = 0.25,
                     random_state: int = 42) -> Tuple['pandas.DataFrame', 'pandas.DataFrame', 'pandas.DataFrame', 'pandas.DataFrame']:
    """04. Split the modelling frame into train/test features and targets.

    Parameters
    ----------
    dataFrame : pandas.DataFrame
       Modelling data containing the feature columns below and `women_pop`.
    test_size : float, default 0.25
       Fraction of rows held out for evaluation.
    random_state : int, default 42
       Seed for the shuffle, so the split is reproducible.

    Returns
    -------
    train_x : pandas.DataFrame
       Features (independent variables) for training.
    test_x : pandas.DataFrame
       Features (independent variables) for evaluation.
    train_y : pandas.DataFrame
       Target `women_pop` (dependent variable) for training.
    test_y : pandas.DataFrame
       Target `women_pop` (dependent variable) for evaluation.
    """
    feature_cols = ['women_pop_mean', 'women_pop_max', 'bus_stop_cnt',
                    'weekday_Mon', 'weekday_Tue', 'weekday_Wed', 'weekday_Thu',
                    'weekday_Fri', 'weekday_Sat', 'weekday_Sun']

    x = dataFrame[feature_cols]
    y = dataFrame[['women_pop']]

    train_x, test_x, train_y, test_y = train_test_split(
        x, y, test_size=test_size, random_state=random_state)

    return train_x, test_x, train_y, test_y

# 04. Split the data into train / test sets
train_x, test_x, train_y, test_y = split_train_test(train_df)
logging.info('-- split_train_test() 종료 --')

16:05:51: -- split_train_test() 종료 --


# store_min_max_sacler

In [24]:
def store_min_max_sacler(train_x: 'pandas.DataFrame',
                         scale_cols=('women_pop_mean', 'women_pop_max', 'bus_stop_cnt')) -> 'pandas.DataFrame':
    """Fit a MinMaxScaler on the continuous training features and persist it.

    The fitted scaler is saved to SCALER_FOLDER/<gu_nm>_scaler.pkl so the same
    scaling can be applied to evaluation data later.

    Parameters
    ----------
    train_x : pandas.DataFrame
       Training features; it is not modified.
    scale_cols : sequence of str, default ('women_pop_mean', 'women_pop_max', 'bus_stop_cnt')
       Continuous columns to scale; all other columns are returned unchanged.

    Returns
    -------
    pandas.DataFrame
       A copy of `train_x` with `scale_cols` scaled to the fitted [0, 1] range.
    """
    scale_cols = list(scale_cols)

    # Scale a copy so the caller's frame (possibly a slice of a larger frame)
    # is not modified in place.
    scaled_x = train_x.copy()

    min_max_scaler = MinMaxScaler()
    scaled_x[scale_cols] = min_max_scaler.fit_transform(scaled_x[scale_cols])

    # Create the output folder on first use instead of failing in joblib.dump
    os.makedirs(SCALER_FOLDER, exist_ok=True)
    joblib.dump(min_max_scaler, os.path.join(SCALER_FOLDER, f'{gu_nm}_scaler.pkl'))

    return scaled_x

train_x = store_min_max_sacler(train_x)

# train_women_pop_model

In [30]:
def train_women_pop_model(train_x: 'pandas.DataFrame', train_y: 'pandas.DataFrame',
                          max_depth: int = 6, random_state: int = 42):
    """05. Fit a RandomForestRegressor on the training split and persist it.

    The fitted model is saved to MODEL_FOLDER/<gu_nm>_rf.pkl.

    Parameters
    ----------
    train_x : pandas.DataFrame
       Training features (already scaled).
    train_y : pandas.DataFrame
       Training target `women_pop` (single column).
    max_depth : int, default 6
       Maximum tree depth of the forest.
    random_state : int, default 42
       Seed for the forest, so retraining on the same data gives the same model.

    Returns
    -------
    RandomForestRegressor
       The fitted model.
    """
    logging.info(f'RandomForestRegressor {gu_nm} 학습시작 --')
    rf = RandomForestRegressor(max_depth=max_depth, random_state=random_state)

    # The regressor expects a 1-D target; passing the single-column DataFrame
    # raises a DataConversionWarning (hidden here by the global warnings filter).
    rf.fit(train_x, np.ravel(train_y))

    # Create the output folder on first use instead of failing in joblib.dump
    os.makedirs(MODEL_FOLDER, exist_ok=True)
    joblib.dump(rf, os.path.join(MODEL_FOLDER, f'{gu_nm}_rf.pkl'))
    logging.info(f'RandomForestRegressor {gu_nm} training finished --')

    return rf

women_pop_rf = train_women_pop_model(train_x, train_y)

16:08:28: RandomForestRegressor Bupyeong-gu 학습시작 --


In [None]:
def test_women_pop(test_x: 'pandas.DataFrame', test_y: 'pandas.DataFrame'):
    """06. Evaluate the saved model on the held-out split and log the metrics.

    Loads SCALER_FOLDER/<gu_nm>_scaler.pkl and MODEL_FOLDER/<gu_nm>_rf.pkl,
    scales the continuous test features, predicts, and logs R2 / MSE / RMSE.

    Parameters
    ----------
    test_x : pandas.DataFrame
       Unscaled evaluation features; it is not modified.
    test_y : pandas.DataFrame
       Evaluation target `women_pop`.

    Returns
    -------
    dict
       {'r2': float, 'mse': float, 'rmse': float}
    """
    scale_cols = ['women_pop_mean', 'women_pop_max', 'bus_stop_cnt']

    # Artefacts written by the training cells (local, trusted pickle files)
    min_max_scaler = joblib.load(os.path.join(SCALER_FOLDER, f'{gu_nm}_scaler.pkl'))
    rf = joblib.load(os.path.join(MODEL_FOLDER, f'{gu_nm}_rf.pkl'))

    # Scale a copy: scaling the caller's test_x in place would make a second
    # call (e.g. re-running the cell) scale the already-scaled values again.
    scaled_x = test_x.copy()
    scaled_x[scale_cols] = min_max_scaler.transform(scaled_x[scale_cols])

    test_pred = rf.predict(scaled_x)

    r2 = r2_score(test_y, test_pred)
    logging.info(f'R2-SCORE : {round(r2, 2)}')

    mse = mean_squared_error(test_y, test_pred)
    logging.info(f'MSE : {round(mse, 2)}')

    # `mean_squared_error(..., squared=False)` is deprecated in scikit-learn 1.4
    # and removed in 1.6; the square root of the MSE is the same quantity.
    rmse = float(np.sqrt(mse))
    logging.info(f'RMSE : {round(rmse, 2)}')

    return {'r2': r2, 'mse': mse, 'rmse': rmse}


In [None]:
TODO (EDA):
1. 상관계수(correlation coefficient) 확인
2. 산포도(scatter plot) 그리기

In [None]:
# NOTE(review): there is no standard module named `sea`; this presumably meant
# `import seaborn as sns` for the scatter-plot TODO. As written, the cell raises
# ModuleNotFoundError unless a local `sea` module exists -- confirm and fix.
import sea

In [None]:
# Pairwise correlation between the target and the continuous features.
# Columns are selected by name rather than by position (columns[3:7]) so the
# cell keeps working if the column layout changes.
correlation_matrix = train_df[['women_pop', 'women_pop_mean', 'women_pop_max', 'bus_stop_cnt']].corr()
correlation_matrix

In [None]:
# Positional slice 3:7 of the reindexed frame; given the column order set in
# apply_get_dummies this is women_pop, women_pop_mean, women_pop_max, bus_stop_cnt
train_df.columns[3:7]

In [None]:
# train_x no longer has a raw `weekday` column (it was one-hot encoded by
# apply_get_dummies), so `train_x['weekday']` raises KeyError. Inspect the
# existing dummy columns instead.
train_x.filter(like='weekday_').head()

In [None]:
# dtypes of the weekday dummy columns already present in train_x
# (a raw `weekday` column no longer exists, so train_x['weekday'] raises KeyError)
train_x.filter(like='weekday_').dtypes