In [6]:
import os
import glob

import numpy as np
import pandas as pd

import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

import ray

# 개별 파일 병합

In [7]:
def load_and_concat_csv(file_pattern: str) -> pd.DataFrame:
    """Read every CSV matching a glob pattern and stack them into one DataFrame.

    Files are read in sorted path order so the row order of the result (and of
    everything derived from it) is reproducible; ``glob.glob`` itself returns
    paths in an arbitrary, filesystem-dependent order.

    Args:
        file_pattern: glob pattern selecting the CSV files to read.

    Returns:
        The concatenated frame with a fresh 0..n-1 index, or an empty DataFrame
        when no file matched or none could be read. Files that fail to parse are
        reported on stdout and skipped (best effort).
    """
    file_list = sorted(glob.glob(file_pattern))
    if not file_list:
        print(f"No files match {file_pattern}")

    df_list = []
    for file in file_list:
        try:
            df_list.append(pd.read_csv(file, low_memory=False))
        except Exception as e:  # best effort: report and skip unreadable files
            print(f"Error of reading {file}: {e}")

    return pd.concat(df_list, ignore_index=True) if df_list else pd.DataFrame()


# Road-state labels keyed by the last digit of a Vaisala status value.
VAISALA_ROAD_STATE = {
    '1': "Dry",
    '2': "Moist",
    '3': "Wet",
    '5': "Frost",
    '6': "Snow",
    '7': "Ice",
    '9': "Slush",
}


def vaisala_road(s_status):
    """Translate a status value into a road-state label via its last digit.

    Accepts ints, digit strings, and floats. pandas reads an integer column that
    contains gaps as float64, so integral floats such as ``3.0`` are treated as
    ``3``. Without this, their last character would be ``"0"`` and every value
    would map to ``"error"``.

    Args:
        s_status: a single status value (scalar).

    Returns:
        The label from ``VAISALA_ROAD_STATE``, or ``"error"`` for missing values,
        empty strings, or a last digit that has no mapping.
    """
    if isinstance(s_status, (float, np.floating)):
        if np.isnan(s_status):
            return "error"
        if float(s_status).is_integer():
            s_status = int(s_status)
    elif s_status is None or s_status is pd.NA:
        return "error"

    text = str(s_status).strip()
    if not text:  # str(...)[-1] would raise IndexError on an empty value
        return "error"
    return VAISALA_ROAD_STATE.get(text[-1], "error")

def _zero_padded_digits(values: pd.Series, width: int = 6) -> pd.Series:
    """Render integer-like codes as zero-padded strings of exactly `width` digits.

    Handles ints (93015), integral floats (93015.0, which pandas produces when a
    column has gaps) and digit strings. Only the last `width` digits are kept, so
    an 8-digit date keeps its last 6. Missing values render as '00<NA>', which
    makes the strict datetime parse below fail loudly instead of silently.
    """
    as_int = pd.to_numeric(values, errors="coerce").astype("Int64")
    return as_int.astype(str).str.zfill(width).str[-width:]


# Read the raw observation files (MARWIS and move4 exports).
luft_total_df = load_and_concat_csv("../DATA/MWIS_2023_NDJF_OBS/*MARWIS*csv")
vi_total_df = load_and_concat_csv("../DATA/MWIS_2023_NDJF_OBS/*move4*csv")
if luft_total_df.empty or vi_total_df.empty:
    raise FileNotFoundError(
        "No readable MARWIS and/or move4 CSV files in ../DATA/MWIS_2023_NDJF_OBS/"
    )

# Translate the status code into a road-state label.
vi_total_df["s_status_txt"] = vi_total_df["s_status"].apply(vaisala_road)

# Drop columns not needed downstream; ignore any that a given export lacks.
drop_column_list = ['gdirection1', 'gdirection2', 'digitalcomp_x', 'digitalcomp_y', 'digitalcomp_z',
                    'ta2', 'rh2', 'loggerta', 'batteryvolt', 'rev1', 'rev2']
vi_total_df = vi_total_df.drop(columns=drop_column_list, errors="ignore")

# gdate is (YY)YYMMDD and gtime is HHMMSS. They are presumably read as numbers,
# so leading zeros are lost (e.g. 00:00:45 -> 45). Pad both to 6 digits before
# parsing so short values still split into the right fields. The year is
# assumed to be 20YY.
date6 = _zero_padded_digits(vi_total_df["gdate"])
time6 = _zero_padded_digits(vi_total_df["gtime"])
vi_total_df["TIMESTAMP"] = pd.to_datetime("20" + date6 + time6, format="%Y%m%d%H%M%S")

luft_total_df["TIMESTAMP"] = pd.to_datetime(luft_total_df["TIMESTAMP"])

total_df = pd.merge(vi_total_df, luft_total_df, on="TIMESTAMP", how="outer", suffixes=("_vi", "_luft"))

# Missing values are written as runs of '/' ('/', '////', '//////', ...).
# Treat any all-slash cell as missing instead of enumerating each length.
total_df = total_df.replace(r"^\s*/+\s*$", np.nan, regex=True)

os.makedirs("../DATA/CONCAT", exist_ok=True)
total_df.to_csv("../DATA/CONCAT/luft_vi_2023_2024.csv", index=False)


# 센서 자료 AWS자료 병합

In [9]:
from scipy.spatial import KDTree

# Mean Earth radius in km, used to convert unit-sphere chord lengths to km.
EARTH_RADIUS_KM = 6371.0088


def _latlon_to_unit_xyz(lat_deg, lon_deg) -> np.ndarray:
    """Map latitude/longitude in degrees to 3-D points on the unit sphere.

    Straight-line (chord) distance between these points increases monotonically
    with great-circle distance. A KD-tree nearest-neighbour query on them
    therefore finds the truly closest station. A query on raw degrees does not,
    because one degree of longitude is shorter than one degree of latitude away
    from the equator.
    """
    lat = np.radians(np.asarray(lat_deg, dtype=float))
    lon = np.radians(np.asarray(lon_deg, dtype=float))
    cos_lat = np.cos(lat)
    return np.column_stack((cos_lat * np.cos(lon), cos_lat * np.sin(lon), np.sin(lat)))


# Carry the last GPS fix forward over rows without a position (e.g. rows that
# only one side of the outer merge contributed). Coerce to numeric first in case
# leftover placeholders made the columns strings.
total_df['glatitude'] = pd.to_numeric(total_df['glatitude'], errors='coerce').ffill()
total_df['glongitude'] = pd.to_numeric(total_df['glongitude'], errors='coerce').ffill()

# Rows before the first fix are still NaN after ffill, and KDTree.query returns an
# out-of-range index for NaN queries. For the station lookup only, back-fill
# them with the first known position; the GPS columns themselves stay as they are.
query_coords = total_df[['glatitude', 'glongitude']].bfill()
if query_coords.isna().any().any():
    raise ValueError("total_df has no valid GPS position to match AWS stations against")

# Station coordinates, one row per station ('지점' -> site, '위도' -> latitude,
# '경도' -> longitude). NOTE(review): keeps the first listed row per station;
# confirm that is the intended position if the metadata lists relocations.
aws_info_df = pd.read_csv("../DATA/AWS/META_관측지점정보.csv", encoding='cp949')
aws_coord = (
    aws_info_df.drop_duplicates(subset=["지점"])[['지점', '위도', '경도']]
    .rename(columns={'지점': 'site', '위도': 'latitude', '경도': 'longitude'})
    .dropna(subset=['latitude', 'longitude'])
    .reset_index(drop=True)
)

# Nearest station for every sensor row, measured on the sphere.
points = _latlon_to_unit_xyz(aws_coord['latitude'], aws_coord['longitude'])
tree = KDTree(points)
query_points = _latlon_to_unit_xyz(query_coords['glatitude'], query_coords['glongitude'])
chord_lengths, indices = tree.query(query_points)
distances_km = 2 * EARTH_RADIUS_KM * np.arcsin(np.clip(chord_lengths / 2, 0.0, 1.0))

nearest_sites = aws_coord['site'].to_numpy()[indices]
total_df['aws_site'] = nearest_sites

In [10]:
# Only the AWS stations matched to sensor positions are kept.
asw_stn_list = total_df['aws_site'].unique()

# Raw AWS files. The directory can be overridden with the AWS_FILE_DIR
# environment variable. The default is relative, like every other path in this
# notebook, instead of a user-specific absolute path.
aws_file_dir = os.environ.get("AWS_FILE_DIR", "../DATA/AWS/202311~202403/")
aws_file_list = sorted(
    path
    for path in glob.glob(os.path.join(aws_file_dir, "**/AWS*"), recursive=True)
    if os.path.isfile(path)  # the pattern can also match directories
)
if not aws_file_list:
    raise FileNotFoundError(f"No AWS files found under {aws_file_dir!r}")


@ray.remote
def process_file(file, asw_stn_list):
    """Read one '#'-separated AWS file and keep rows for the given station ids.

    Args:
        file: path to a raw AWS file (cp949-encoded, no header row).
        asw_stn_list: station ids to keep (compared against 'STN_ID').

    Returns:
        DataFrame with the 19 named columns below, filtered to those stations.
    """
    df = pd.read_csv(file, sep="#", header=None, encoding='cp949', engine='python')
    df.columns = ['STN_ID', 'TM', 'LAT', 'LON', 'HT', 'WD', 'WS', 'TA', 'HM', 'PA', 'PS', 'RN_YN',
                  'RN_1HR', 'RN_DAY', 'RN_15M', 'RN_60M', 'WD_INS', 'WS_INS', "END"]
    return df[df['STN_ID'].isin(asw_stn_list)]


# Process the files in parallel. ignore_reinit_error lets the cell be re-run after
# an interrupted run. try/finally shuts the local Ray instance down even if a
# task fails.
ray.init(ignore_reinit_error=True)
try:
    futures = [process_file.remote(file, asw_stn_list) for file in aws_file_list]
    dfs_list = ray.get(futures)
finally:
    ray.shutdown()

dfs = pd.concat(dfs_list, ignore_index=True)
dfs.to_csv("../DATA/AWS/aws_202311_202403.csv", index=False)


2024-07-30 10:39:10,552	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


# AWS 자료와 센서 자료 결합

In [11]:
# Per-station AWS records extracted in the previous step.
aws_df = pd.read_csv("../DATA/AWS/aws_202311_202403.csv")
aws_df = aws_df.rename(columns={"STN_ID": "aws_site"})
aws_df["aws_site"] = aws_df["aws_site"].astype(int)

# 'TM' is a YYYYMMDDHHMM stamp; render it as the minute-resolution join key.
aws_df["min_time"] = (
    pd.to_datetime(aws_df["TM"].astype(str), format="%Y%m%d%H%M")
    .dt.strftime("%Y-%m-%d %H:%M")
)

# Overlapping source files can repeat a (station, minute) record. Keep the first
# so that the left join below cannot duplicate sensor rows.
aws_df = aws_df.drop_duplicates(subset=["aws_site", "min_time"])

# TIMESTAMP was already parsed to datetime when the sensor files were merged.
# Formatting without seconds truncates each reading to its minute so it matches
# that minute's AWS record.
total_df["min_time"] = pd.to_datetime(total_df["TIMESTAMP"]).dt.strftime("%Y-%m-%d %H:%M")
total_df["aws_site"] = total_df["aws_site"].astype(int)

df_last = pd.merge(
    left=total_df,
    right=aws_df,
    on=["aws_site", "min_time"],
    how="left",
    validate="many_to_one",
)
df_last.to_csv("../DATA/CONCAT/aws_sensor_merge.csv", index=False)