In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import duckdb
import geopandas as gpd
import warnings
warnings.filterwarnings('ignore')

# pandas setting
pd.set_option('display.float_format', '{:.2f}'.format)
pd.set_option('display.max_columns', 100)
# matplotlib setting
plt.rcParams['font.family'] = 'NanumGothic'
plt.rcParams['axes.unicode_minus'] = False

In [None]:
import matplotlib.font_manager as fm
# font_list = fm.findSystemFonts(fontpaths=None, fontext='ttf')
# font_list

In [None]:
# Connect to Duckdb
con = duckdb.connect(database = 'myanalysis.db', read_only=False)# duckdb db 불러오기
# 메모리 110GB 제한
# 메모리 스필 관련 설정
con.execute("PRAGMA temp_directory='/tmp';")
con.execute("PRAGMA memory_limit='100GB';")
con.execute('show tables').df()

In [None]:
# 1. 목적 통행으로 집계

In [None]:
# 7, 2024 all days and weekdays
dates = pd.date_range(start = '2024-07-01', end = '2024-07-31', freq='D')
weekday = dates[dates.weekday < 5]
weekday_strs = weekday.strftime('%Y%m%d').to_list()
allday_strs = dates.strftime('%Y%m%d').to_list()
len(allday_strs)

In [None]:
# 목적통행용 테이블 생성
query = '''
CREATE OR REPLACE TABLE tb_linked_transport_202407
    (운행일자 VARCHAR(8),
    가상카드번호 VARCHAR,
    트랜잭션ID VARCHAR,
    승차정류장ID VARCHAR,
    하차정류장ID VARCHAR,
    승차일시 VARCHAR,
    하차일시 VARCHAR,
    승차지역코드 VARCHAR,
    하차지역코드 VARCHAR,
    승차교통수단구분 VARCHAR,
    하차교통수단구분 VARCHAR,
    총이동거리 FLOAT,
    총탑승시간 FLOAT
    )
'''
# con.execute(query)
# print(con.execute('show tables').df())
con.execute("select * from tb_linked_transport_202407 limit 10").df()

In [None]:
#목적 통행 테이블 INSERT

for day in allday_strs:
    query = f'''
    INSERT INTO tb_linked_transport_202407
    WITH summary AS(
    SELECT 운행일자,
        가상카드번호,
        트랜잭션ID,
        MAX(환승건수) AS 최대환승수,
        SUM(이용거리) AS 총이동거리,
        SUM(탑승시간) AS 총탑승시간
    FROM tb_transport_202407
    WHERE 운행일자 = {day}
    GROUP BY 운행일자, 가상카드번호, 트랜잭션ID
    )
    SELECT s.운행일자,
        s.가상카드번호,
        s.트랜잭션ID,
        t1.정산사승차정류장ID AS 승차정류장ID,
        t2.정산사하차정류장ID AS 하차정류장ID,
        t1.승차일시 AS 승차일시,
        t2.하차일시 AS 하차일시,
        t1.정산지역코드 AS 승차지역코드,
        t2.정산지역코드 AS 하차지역코드,
        t1.교통수단구분 AS 승차교통수단구분,
        t2.교통수단구분 AS 하차교통수단구분,
        s.총이동거리,
        s.총탑승시간

    FROM summary s
    -- t1: 첫 통행
    LEFT JOIN tb_transport_202407 t1
        ON s.운행일자 = t1.운행일자
        AND s.가상카드번호 = t1.가상카드번호
        AND s.트랜잭션ID = t1.트랜잭션ID
        AND t1.환승건수 = 0
    -- t2: 마지막 통행
    LEFT JOIN tb_transport_202407 t2
        ON s.운행일자 = t2.운행일자
        AND s.가상카드번호 = t2.가상카드번호
        AND s.트랜잭션ID = t2.트랜잭션ID
        AND s.최대환승수 = t2.환승건수
    '''
    # con.query(query)
    print(f"{day} db화 완료")

In [None]:
# 2. 데이터 무결성 전처리

In [None]:
# 데이터 정확성 관련 체크
# 16283개 드랍

query = '''
DELETE from tb_linked_transport_202407 where 승차일시 > 하차일시;
select count(*) from tb_linked_transport_202407;
'''

con.execute(query).df()

In [None]:
# NULL값 체크

# 결론: NULL값을 두고, 체류시간 프로세스에 이전 하차정류장 혹은 현재 승차정류장이 NULL일 경우
# -> 체류시간 미적용으로 처리하는게 정밀할 것으로 판단

query = '''
SELECT
    COUNT(*) AS total_rows,
    COUNT(*) - COUNT(가상카드번호) AS 가상카드번호_NULLs,
    COUNT(*) - COUNT(트랜잭션ID) AS 트랜잭션ID_NULLs,
    COUNT(*) - COUNT(승차정류장ID) AS 승차정류장ID_NULLs,
    COUNT(*) - COUNT(하차정류장ID) AS 하차정류장ID_NULLs,
    COUNT(*) - COUNT(승차일시) AS 승차일시_NULLs,
    COUNT(*) - COUNT(하차일시) AS 하차일시_NULLs,
    COUNT(*) - COUNT(승차지역코드) AS 승차지역코드_NULLs,
    COUNT(*) - COUNT(하차지역코드) AS 하차지역코드_NULLs,
    COUNT(*) - COUNT(승차교통수단구분) AS 승차교통수단구분_NULLs,
    COUNT(*) - COUNT(하차교통수단구분) AS 하차교통수단구분_NULLs,
    COUNT(*) - COUNT(총이동거리) AS 총이동거리_NULLs,
    COUNT(*) - COUNT(총탑승시간) AS 총탑승시간_NULLs,
    (COUNT(*) - COUNT(가상카드번호))/COUNT(*)*100 AS 가상카드번호_PERCENT,
    (COUNT(*) - COUNT(트랜잭션ID))/COUNT(*)*100 AS 트랜잭션ID_PERCENT,
    (COUNT(*) - COUNT(승차정류장ID))/COUNT(*)*100 AS 승차정류장ID_PERCENT,
    (COUNT(*) - COUNT(하차정류장ID))/COUNT(*)*100 AS 하차정류장ID_PERCENT,
    (COUNT(*) - COUNT(승차일시))/COUNT(*)*100 AS 승차일시_PERCENT,
    (COUNT(*) - COUNT(하차일시))/COUNT(*)*100 AS 하차일시_PERCENT,
    (COUNT(*) - COUNT(승차지역코드))/COUNT(*)*100 AS 승차지역코드_PERCENT,
    (COUNT(*) - COUNT(하차지역코드))/COUNT(*)*100 AS 하차지역코드_PERCENT,
    (COUNT(*) - COUNT(승차교통수단구분))/COUNT(*)*100 AS 승차교통수단구분_PERCENT,
    (COUNT(*) - COUNT(하차교통수단구분))/COUNT(*)*100 AS 하차교통수단구분_PERCENT,
    (COUNT(*) - COUNT(총이동거리))/COUNT(*)*100 AS 총이동거리_PERCENT,
    (COUNT(*) - COUNT(총탑승시간))/COUNT(*)*100 AS 총탑승시간_PERCENT,


FROM tb_linked_transport_202407;
'''

con.execute(query).df()

In [None]:
# 3. 통근패턴 인원 불러와서 주거, 업무, 여가 패턴 붙이기

In [None]:
con.execute('desc tb_linked_transport_202407').df()

In [None]:
con.execute('SELECT COUNT(*) FROM tb_commuting_cardid_202407').df()

In [None]:
# 가상카드번호가 663만개로 -> 20개의 청크로 분할 해시 매핑
query = '''
    CREATE OR REPLACE VIEW card_buckets AS
    SELECT *, abs(hash(가상카드번호)) % 20 AS bucket_id
    FROM tb_commuting_cardid_202407;

    SELECT * FROM card_buckets;
'''
# con.execute(query).df()

In [None]:
# 체류시간 테이블 생성

# 운행일자, 가상카드번호, 정류장ID, 지역코드, 교통수단구분, duration_type, 정류장별체류시간, 체류인원

query = '''
CREATE OR REPLACE TABLE tb_duration_total_stations_202407
    (운행일자 VARCHAR,
    가상카드번호 VARCHAR,
    정류장ID VARCHAR,
    지역코드 VARCHAR,
    교통수단구분 VARCHAR(1),
    duration_type VARCHAR(4),
    정류장별체류시간 FLOAT,
    체류인원 FLOAT);

desc FROM tb_duration_total_stations_202407;
'''
con.execute(query).df()

In [None]:
# BUCKET = 0 TEST
for i in range(20):
    query = f'''
    INSERT INTO tb_duration_total_stations_202407
    -- 1. 통근 패턴에 해당하는 가상카드번호만 필터하되, 메모리 이슈로 20개로 나누어 반복문 수행
    WITH subset AS(
    SELECT t.운행일자,
        t.가상카드번호,
        t.트랜잭션ID,
        t.승차정류장ID,
        t.하차정류장ID,
        t.승차지역코드,
        t.하차지역코드,
        t.승차교통수단구분,
        t.하차교통수단구분,
        s1.cluster_id AS 승차클러스터ID,
        s2.cluster_id AS 하차클러스터ID,
        cb.주거지클러스터 AS 주거지클러스터ID,
        cb.업무지클러스터 AS 업무지클러스터ID,
        strptime(t.승차일시, '%Y%m%d%H%M%S') AS 승차일시,
        strptime(t.하차일시, '%Y%m%d%H%M%S') AS 하차일시
    FROM tb_linked_transport_202407 AS t
    -- 1-1. 카드 버킷 매핑
    JOIN card_buckets AS cb
        ON t.가상카드번호 = cb.가상카드번호
    -- 1-2. 정류장에 클러스터ID 매핑
    LEFT JOIN tb_station_cluster as s1
        ON t.승차정류장ID = s1.정류장ID
        AND t.승차지역코드 = s1.지역코드
        AND t.승차교통수단구분 = s1.교통수단구분
    LEFT JOIN tb_station_cluster as s2
        ON t.하차정류장ID = s2.정류장ID
        AND t.하차지역코드 = s2.지역코드
        AND t.하차교통수단구분 = s2.교통수단구분
    -- 버킷 분할 처리
    WHERE cb.bucket_id = {i}
    ),

    -- 2. 체류시간 테이블 구성을 위한 LEAD 윈도우 함수
    step_02 AS (
    SELECT *,
        LEAD(승차정류장ID) OVER w AS 다음승차정류장ID,
        LEAD(승차클러스터ID) OVER w AS 다음승차클러스터ID,
        LEAD(승차일시) OVER w AS 다음승차일시,
        LEAD(승차지역코드) OVER w AS 다음승차지역코드,
        LEAD(승차교통수단구분) OVER w AS 다음승차교통수단구분,
    FROM subset
    WINDOW w AS (
    PARTITION BY 가상카드번호
    ORDER BY 승차일시)
    ),

    -- 3. 최종 체류 목적을 포함한 인버스 테이블 추출
    step_03 AS(
    SELECT *,
        -- 체류시간(분단위), 인원 컬럼 추가
        EXTRACT(EPOCH FROM(다음승차일시 - 하차일시))/60 AS 체류시간,
        1 AS 체류인원,
        CASE
            WHEN 하차클러스터ID == 주거지클러스터ID AND 다음승차클러스터ID == 주거지클러스터ID THEN 'live'
            WHEN 하차클러스터ID == 업무지클러스터ID AND 다음승차클러스터ID == 업무지클러스터ID THEN 'work'
            WHEN 하차클러스터ID != 주거지클러스터ID AND 하차클러스터ID != 업무지클러스터ID
                AND 다음승차클러스터ID != 주거지클러스터ID AND 다음승차클러스터ID != 업무지클러스터ID THEN 'play'
            ELSE 'other'
            END AS 'duration_type'
    FROM step_02
        -- 필터로직 1) 하차정류장과 다음승차정류장이 NULL 경우 제외
    WHERE 다음승차정류장ID IS NOT NULL AND 하차정류장ID IS NOT NULL
        -- 필터로직 2) 다음승차일시가 하차일시와 동일한 날짜거나, 다음 날인경우만
        AND 다음승차일시 <= 하차일시+INTERVAL'1 day'
        -- 필터로직 3) 체류시간이 5분 이상 되는 경우
        AND 다음승차일시 - 하차일시 >= INTERVAL'5 minutes'
    )
    -- 4. 하차정류장과 다음승차정류장 세로로 분리

    -- 4-1. 하차정류장, 체류유형
    SELECT
        운행일자,
        가상카드번호,
        하차정류장ID AS 정류장ID,
        하차지역코드 AS 지역코드,
        하차교통수단구분 AS 교통수단구분,
        duration_type,
        체류시간/2 AS 정류장별체류시간,
        체류인원
    FROM step_03

    UNION ALL
    -- 4.2. 다음승차정류장, 체류유형 세로로 결합
    SELECT
        운행일자,
        가상카드번호,
        다음승차정류장ID AS 정류장ID,
        다음승차지역코드 AS 지역코드,
        다음승차교통수단구분 AS 교통수단구분,
        duration_type,
        체류시간/2 AS 정류장별체류시간,
        체류인원
    FROM step_03
    ;
    '''
    con.execute(query).df()
    print(f"card_id {i}번째 chunk completed")

In [None]:
con.execute("select count(*) from tb_duration_total_stations_202407").df()

In [None]:
con.execute("select * from tb_duration_total_stations_202407 limit 10").df()

In [None]:
# 추가 그룹바이 프로세스
query = '''
-- 1. 체류 테이블 생성 및 초기화
CREATE OR REPLACE TABLE tb_duration_time_202407
    (운행일자 VARCHAR,
    정류장ID VARCHAR,
    교통수단구분 VARCHAR,
    duration_type VARCHAR(4),
    총체류시간 FLOAT,
    총체류인원 FLOAT,
    체류시간_평균 FLOAT,
    체류시간_중위 FLOAT,
    정류장명칭 VARCHAR,
    법정동코드 VARCHAR,
    정류장GPSX좌표 FLOAT,
    정류장GPSY좌표 FLOAT);

INSERT INTO tb_duration_time_202407

-- 5. 정류장 정보 결합
WITH step_05 AS(
SELECT *,
    sc.정류장명칭,
    sc.법정동코드::VARCHAR AS 법정동코드,
    sc.정류장GPSX좌표,
    sc.정류장GPSY좌표
FROM tb_duration_total_stations_202407 t
LEFT JOIN tb_station_cluster sc
    ON t.정류장ID = sc.정류장ID
    AND t.지역코드 = sc.지역코드
    AND t.교통수단구분 = sc.교통수단구분
)
-- 6. 정류장, 체류 유형에 따른 집계
SELECT 운행일자,
    정류장ID,
    교통수단구분,
    duration_type,
    SUM(정류장별체류시간) AS 총체류시간,
    SUM(체류인원) AS 총체류인원,
    AVG(정류장별체류시간) AS 체류시간_평균,
    MEDIAN(정류장별체류시간) AS 체류시간_중위,
    정류장명칭,
    법정동코드,
    정류장GPSX좌표,
    정류장GPSY좌표,
FROM step_05
GROUP BY 운행일자, 정류장ID, 정류장명칭, 법정동코드, 교통수단구분, 정류장GPSX좌표, 정류장GPSY좌표, duration_type;
SELECT COUNT(*) FROM tb_duration_time_202407;
'''
con.execute(query).df()

In [None]:
'''-- 5. 정류장, 체류 유형에 따른 집계
WITH step_05 AS (
-- 5.1. 월 평균
SELECT 운행일자,
    정류장ID,
    지역코드,
    교통수단구분,
    duration_type,
    SUM(정류장별체류시간) AS 총체류시간,
    SUM(체류인원) AS 총체류인원,
    AVG(정류장별체류시간) AS 체류시간_평균,
    MEDIAN(정류장별체류시간) AS 체류시간_중위
FROM tb_duration_total_stations_202407
GROUP BY 운행일자, 정류장ID, 지역코드, 교통수단구분, duration_type
)
-- 6. 정류장 정보 결합
SELECT s5.*,
    sc.정류장명칭,
    sc.법정동코드::VARCHAR AS 법정동코드,
    sc.정류장GPSX좌표,
    sc.정류장GPSY좌표
FROM step_05 s5
LEFT JOIN tb_station_cluster sc
    ON s5.정류장ID = sc.정류장ID
    AND s5.지역코드 = sc.지역코드
    AND s5.교통수단구분 = sc.교통수단구분;
SELECT COUNT(*) FROM tb_duration_time_202407;
'''
# con.execute(query).df()

In [None]:
con.execute("SELECT * FROM tb_duration_time_202407 LIMIT 100").df()

In [None]:
# df.isnull().sum()

df.describe()

In [None]:
df.운행일자.nunique()

In [None]:
# 체류시간 5분 미만인 경우 제거
print(df.총체류시간.describe())
print('결측치 개수', df.isnull().sum())
print(df.duration_type.value_counts())
df.총체류시간.plot.box()

In [None]:
# 1일과 6일 비교
duration_240701 = con.execute("SELECT * FROM tb_duration_time_202407 WHERE 운행일자 == '20240701'").df()
duration_240706 = con.execute("SELECT * FROM tb_duration_time_202407 WHERE 운행일자 == '20240706'").df()

# 5179 좌표로 변환
duration_240701 = gpd.GeoDataFrame(duration_240701,
                                   geometry = gpd.points_from_xy(duration_240701.정류장GPSX좌표, duration_240701.정류장GPSY좌표),
                                   crs = 'EPSG:4326')
duration_240701 = duration_240701.to_crs('EPSG:5179')
duration_240701['x_5179'] = duration_240701.geometry.x
duration_240701['y_5179'] = duration_240701.geometry.y

duration_240706 = gpd.GeoDataFrame(duration_240706,
                                   geometry = gpd.points_from_xy(duration_240706.정류장GPSX좌표, duration_240706.정류장GPSY좌표),
                                   crs = 'EPSG:4326')
duration_240706 = duration_240706.to_crs('EPSG:5179')
duration_240706['x_5179'] = duration_240706.geometry.x
duration_240706['y_5179'] = duration_240706.geometry.y

# duration_240701.to_csv('output/체류시간_240701.csv')
# duration_240706.to_csv('output/체류시간_240706.csv')
duration_240706.head()

In [None]:
duration_2407[duration_2407.duplicated(subset=['정류장ID', '운행일자', 'duration_type', 'geometry']
                                       , keep=False)].sort_values(['운행일자', 'duration_type', 'geometry'])

In [None]:
# 컬럼 정리
duration_2407_no_geometry = duration_2407[['운행일자', '정류장ID', 'cluster_id', '법정동코드', '정류장명칭',
                                                       '교통수단구분', 'duration_type',
                                                      '총체류시간', '총체류인원', '체류시간_평균', '체류시간_중위']]
duration_2407_no_geometry.columns = ['operation_date', 'station_id', 'cluster_id', 'admin_code', 'station_name',
                                     'station_type', 'duration_type', 'total_duration_time', 'duration_count',
                                     'duration_average_time', 'duration_median_time']
duration_2407_no_geometry.head()

In [None]:
# 고유값 확인
test = duration_2407_no_geometry[duration_2407_no_geometry.duplicated(subset=['operation_date', 'station_id', 'cluster_id', 'station_type', 'duration_type'], keep=False)].sort_values(['operation_date','station_id'])
size = test.groupby(['operation_date', 'station_name', 'station_id', 'cluster_id','duration_type']).size()
size

In [None]:
duration_2407_no_geometry.shape

In [None]:
duration_2407_no_geometry.to_parquet('output/체류시간_2407.parquet')

In [None]:
# duration_2407_no_geometry = pd.read_parquet('output/체류시간_2407.parquet')
print(len(duration_2407_no_geometry))
duration_2407_no_geometry.isnull().sum()