In [83]:
import os
import time
import polars as pl
from polars import Config
from datetime import datetime, timedelta

# Input CSV path per site; the dict key is the site label attached to the data.
fpath1 = r"C:\data_tests\data\kyrgyz_aws.csv"
files = {"Site_A": fpath1}

# Column name -> polars dtype. Passed to the CSV reader so these columns are
# parsed with the listed dtype instead of an inferred one. Columns not named
# here (e.g. TIMESTAMP) keep whatever dtype the reader infers.
DATATYPES = {
    "RECORD": pl.Int32,
    "WS_10min_Avg": pl.Float32,
    "WD_10min_Avg": pl.Float32,
    "WS_10min_Max": pl.Float32,
    "AirTemp_10min_Avg": pl.Float32,
    "AirRH_10min_Avg": pl.Float32,
    "Atm_10min_Avg": pl.Float64,
    "Rain_10min_Tot": pl.Float32,
    "SR_dn_10min_Avg": pl.Float64,
    "SR_up_10min_Avg": pl.Float64,
    "IR_dn_10min_Avg": pl.Float64,
    "IR_up_10min_Avg": pl.Float64,
    "IR_up_Temp_10min_Avg": pl.Float32,
    "IR_dn_Temp_10min_Avg": pl.Float32,
    "NetRs_10min_Avg": pl.Float64,
    "NetRl_10min_Avg": pl.Float64,
    "Albedo_10_min_Avg": pl.Float64,
    "NetTot_10min_Avg": pl.Float64,
    "IR_dn_Corr_10min_Avg": pl.Float64,
    "IR_up_Corr_10min_Avg": pl.Float64,
    "BattVolt_10min_Min": pl.Float32,
}


# **Measure processing time**
START_TIME = time.time()

# Cleaned DataFrame per site, keyed by site name.
dfs = {}

for site_name, file in files.items():
    # Read the AWS CSV lazily.
    #  - skip_rows=1 skips the first physical line before the header row.
    #  - slice(4) drops the first four parsed rows
    #    (NOTE(review): presumably non-data/unwanted leading rows - confirm
    #    against the logger file layout).
    #  - `schema_overrides` is the current name of the deprecated `dtypes`
    #    argument of scan_csv.
    #  - Rows whose TIMESTAMP is null are dropped. The previous
    #    `min <= TIMESTAMP <= max` filter was a tautology whose only effect
    #    was exactly this null removal.
    df = (
        pl.scan_csv(
            file,
            encoding="utf8-lossy",
            skip_rows=1,
            schema_overrides=DATATYPES,
            ignore_errors=True,
        )
        .slice(4)
        .with_columns(pl.col("TIMESTAMP").str.strptime(pl.Datetime, "%Y/%m/%d %H:%M"))
        .filter(pl.col("TIMESTAMP").is_not_null())
        .collect()
    )

    # Log the date range covered by the data.
    time_range = df.select(
        pl.col("TIMESTAMP").min().alias("start_time"),
        pl.col("TIMESTAMP").max().alias("end_time"),
    )
    print(time_range)

    # Rows with battery voltage below 12 V are logged and set aside as bad data.
    df_below_12v = df.filter(pl.col("BattVolt_10min_Min") < 12)
    if df_below_12v.height > 0:
        print("-- 12V未満の不良データ --")
        print(df_below_12v)
    # Only rows at 12 V or above are kept as regular data
    # (rows with a null voltage are dropped by this comparison as well).
    df = df.filter(pl.col("BattVolt_10min_Min") >= 12)

    ## Build the expected 10-minute TIMESTAMP grid and compare it with the data.
    ts_min = df["TIMESTAMP"].min()    # earliest timestamp (None if df is empty)
    ts_max = df["TIMESTAMP"].max()    # latest timestamp (None if df is empty)
    if ts_min is None or ts_max is None:
        # No rows survived the filters: use an empty grid instead of failing
        # on a comparison between None values.
        timestamps = pl.Series("TIMESTAMP", [], dtype=pl.Datetime("us"))
    else:
        # time_unit="us" matches the dtype produced by strptime(pl.Datetime)
        # so the join keys below have identical dtypes.
        timestamps = pl.datetime_range(
            start=ts_min,
            end=ts_max,
            interval="10m",
            time_unit="us",
            eager=True,
        )
    ex_timestamp = pl.DataFrame({"TIMESTAMP": timestamps})

    # Expected timestamps that are absent from the data (anti join).
    missing_timestamps = ex_timestamp.join(df.select("TIMESTAMP"), on="TIMESTAMP", how="anti")
    # Log the gaps once (they were previously printed twice).
    if missing_timestamps.height > 0:
        print("-- TIMESTAMPの欠損ログ --")
        print(missing_timestamps)

    # Convert TIMESTAMP to an integer epoch value. "us" is the polars default
    # and is spelled out here because the result is microseconds, not seconds.
    df = df.with_columns(
        pl.col("TIMESTAMP").dt.epoch(time_unit="us")
    )
    # Safety net: log rows whose epoch value is null. Null TIMESTAMPs were
    # already removed at load time, so this is not expected to trigger.
    df_null_unixtime = df.filter(pl.col("TIMESTAMP").is_null())
    if df_null_unixtime.height > 0:
        print("--UNIXTIMEの欠損ログ--")
        print(df_null_unixtime)

    # Overwrite RECORD with the site name, then rename it to "location"
    # (and TIMESTAMP to "timestamp").
    df = df.with_columns(pl.lit(site_name).alias("RECORD")).rename({"RECORD": "location", "TIMESTAMP": "timestamp"})

    # Store the result for this site.
    dfs[site_name] = df

# **End of measurement**
END_TIME = time.time()
print(f"-- 実行時間: {END_TIME - START_TIME:.4f} 秒 --")


# Notebook display of the last processed site's DataFrame.
df

shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ start_time          ┆ end_time            │
│ ---                 ┆ ---                 │
│ datetime[μs]        ┆ datetime[μs]        │
╞═════════════════════╪═════════════════════╡
│ 2022-09-26 13:10:00 ┆ 2024-07-13 10:00:00 │
└─────────────────────┴─────────────────────┘
-- 12V未満の不良データ --
shape: (2, 22)
┌────────────┬────────┬────────────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ TIMESTAMP  ┆ RECORD ┆ WS_10min_A ┆ WD_10min_ ┆ … ┆ NetTot_10 ┆ IR_dn_Cor ┆ IR_up_Cor ┆ BattVolt_ │
│ ---        ┆ ---    ┆ vg         ┆ Avg       ┆   ┆ min_Avg   ┆ r_10min_A ┆ r_10min_A ┆ 10min_Min │
│ datetime[μ ┆ i32    ┆ ---        ┆ ---       ┆   ┆ ---       ┆ vg        ┆ vg        ┆ ---       │
│ s]         ┆        ┆ f32        ┆ f32       ┆   ┆ f64       ┆ ---       ┆ ---       ┆ f32       │
│            ┆        ┆            ┆           ┆   ┆           ┆ f64       ┆ f64       ┆           │
╞════════════╪════════╪══

  pl.scan_csv(file, encoding="utf8-lossy", skip_rows=1, dtypes=DATATYPES, ignore_errors=True)


timestamp,location,WS_10min_Avg,WD_10min_Avg,WS_10min_Max,AirTemp_10min_Avg,AirRH_10min_Avg,Atm_10min_Avg,Rain_10min_Tot,SR_dn_10min_Avg,SR_up_10min_Avg,IR_dn_10min_Avg,IR_up_10min_Avg,IR_up_Temp_10min_Avg,IR_dn_Temp_10min_Avg,NetRs_10min_Avg,NetRl_10min_Avg,Albedo_10_min_Avg,NetTot_10min_Avg,IR_dn_Corr_10min_Avg,IR_up_Corr_10min_Avg,BattVolt_10min_Min
i64,str,f32,f32,f32,f32,f32,f64,f32,f64,f64,f64,f64,f32,f32,f64,f64,f64,f64,f64,f64,f32
1664197800000000,"""Site_A""",1.808,258.399994,2.7,5.836,31.73,646.416,0.0,928.9788,126.6331,-126.2469,32.10106,12.11,11.59,802.3458,-158.348,0.1363156,643.9978,249.1938,404.7926,13.56
1664198400000000,"""Site_A""",2.535,265.200012,3.4,5.668,30.549999,646.3317,0.0,924.3484,125.8074,-126.3353,33.52101,11.73,11.24,798.541,-159.8563,0.1361013,638.6846,247.1382,404.3836,13.56
1664199000000000,"""Site_A""",1.972,269.5,2.9,5.533,30.790001,646.2999,0.0,342.7564,123.825,-41.3195,36.95861,10.64,10.4,218.9314,-78.27811,9.071039,140.6533,326.4338,403.498,13.57
1664199600000000,"""Site_A""",1.473,268.700012,3.3,6.115,27.35,646.2983,0.0,9.188166,134.3892,-0.021746,41.87482,9.85,9.89,-125.201,-41.89656,22.7164,-167.0976,363.6657,405.7614,13.59
1664200200000000,"""Site_A""",2.797,267.799988,6.3,6.163,33.040001,646.2183,0.0,647.0114,130.6579,-84.65701,44.71037,9.79,9.53,516.3536,-129.3674,4.652904,386.9862,278.6996,406.7659,13.59
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
1720862400000000,"""Site_A""",5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,759.8738,85.99303,-61.88894,6.373354,7.585,7.03,673.8807,-68.26229,0.1131257,605.6184,290.2963,355.7825,13.56
1720863000000000,"""Site_A""",5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,710.0383,81.92848,-66.81017,8.499489,8.17,7.669,628.1098,-75.30966,0.1149435,552.8002,288.3352,361.1105,13.56
1720863600000000,"""Site_A""",5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,676.0035,75.55333,-63.78199,8.74075,9.3,8.78,600.4501,-72.52273,0.1101413,527.9274,297.0933,366.9775,13.56
1720864200000000,"""Site_A""",5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,396.2463,41.9869,-42.61458,8.134896,8.94,8.63,354.2594,-50.74947,0.1037554,303.5099,316.4397,365.6068,13.56


In [84]:
# **Measure processing time**
START_TIME = time.time()


# Format of the TIMESTAMP strings in the CSV.
date_format = "%Y/%m/%d %H:%M"

# NOTE(review): `file` and `site_name` are not assigned in this cell; they are
# whatever the last iteration of the per-site loop in the previous cell left
# behind. Confirm this is intended before adding more sites to `files`.
#
# Read the CSV once. The time range below is derived from this frame, so the
# file is no longer scanned and parsed a second time just to obtain min/max.
#  - `schema_overrides` is the current name of the deprecated `dtypes`
#    argument of scan_csv.
#  - TIMESTAMP is cast to millisecond precision so it matches the generated
#    grid's dtype in the join below.
#  - Rows with a null TIMESTAMP are dropped. The previous
#    `min <= TIMESTAMP <= max` filter was a tautology with exactly that effect.
df = (
    pl.scan_csv(
        file,
        encoding="utf8-lossy",
        skip_rows=1,
        schema_overrides=DATATYPES,
        ignore_errors=True,
    )
    .slice(4)
    .with_columns(pl.col("TIMESTAMP").str.strptime(pl.Datetime, date_format).dt.cast_time_unit("ms"))
    .filter(pl.col("TIMESTAMP").is_not_null())
    .collect()
)

# (1) Observed TIMESTAMP range (min/max ignore nulls, so the values are the
# same as when computed on the unfiltered scan).
time_range = df.select(
    pl.col("TIMESTAMP").min().alias("start_time"),
    pl.col("TIMESTAMP").max().alias("end_time"),
)

print(time_range)

# The single result row is (start_time, end_time).
start_time, end_time = time_range.row(0)

# Expected 10-minute grid spanning the observed range, in the same time unit
# as the data column.
timestamps = pl.datetime_range(
    start=start_time,
    end=end_time,
    interval="10m",
    time_unit="ms",
    eager=True,
)

print(timestamps)

# Wrap the grid in a DataFrame so it can be joined against the data.
df_range = pl.DataFrame({"TIMESTAMP": timestamps})
print(df_range)

print(f"df_range TIMESTAMP dtype: {df_range.schema['TIMESTAMP']}")
print(f"df TIMESTAMP dtype: {df.schema['TIMESTAMP']}")


# (5) Expected timestamps that are absent from the data (DataFrame anti join).
missing_timestamps = df_range.join(df.select(pl.col("TIMESTAMP")), on="TIMESTAMP", how="anti")

if missing_timestamps.height > 0:
    print(f"-- ⚠️ TIMESTAMPの欠損ログ --\n{missing_timestamps}")
else:
    print("✅ TIMESTAMPに欠損なし！")

# (6) Convert TIMESTAMP to an integer epoch value and add the site label.
# epoch() returns microseconds ("us" is the polars default, spelled out here)
# regardless of the column's millisecond precision.
# NOTE(review): unlike the per-site loop in the previous cell, this cell does
# not remove rows with BattVolt_10min_Min < 12 and keeps the RECORD column
# alongside the new "location" column - confirm whether that is intended.
df = df.with_columns([
    pl.col("TIMESTAMP").dt.epoch(time_unit="us"),
    pl.lit(site_name).alias("location"),
]).rename({"TIMESTAMP": "timestamp"})


# **End of measurement**
END_TIME = time.time()
print(f"-- 🚀 実行時間: {END_TIME - START_TIME:.4f} 秒 --")

# Notebook display of the resulting DataFrame.
df


shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ start_time          ┆ end_time            │
│ ---                 ┆ ---                 │
│ datetime[ms]        ┆ datetime[ms]        │
╞═════════════════════╪═════════════════════╡
│ 2022-09-26 13:10:00 ┆ 2024-07-13 10:00:00 │
└─────────────────────┴─────────────────────┘
shape: (94_446,)
Series: 'literal' [datetime[ms]]
[
	2022-09-26 13:10:00
	2022-09-26 13:20:00
	2022-09-26 13:30:00
	2022-09-26 13:40:00
	2022-09-26 13:50:00
	…
	2024-07-13 09:20:00
	2024-07-13 09:30:00
	2024-07-13 09:40:00
	2024-07-13 09:50:00
	2024-07-13 10:00:00
]
shape: (94_446, 1)
┌─────────────────────┐
│ TIMESTAMP           │
│ ---                 │
│ datetime[ms]        │
╞═════════════════════╡
│ 2022-09-26 13:10:00 │
│ 2022-09-26 13:20:00 │
│ 2022-09-26 13:30:00 │
│ 2022-09-26 13:40:00 │
│ 2022-09-26 13:50:00 │
│ …                   │
│ 2024-07-13 09:20:00 │
│ 2024-07-13 09:30:00 │
│ 2024-07-13 09:40:00 │
│ 2024-07-13 09:50:00 │
│ 2024-07-13 10:00

  pl.scan_csv(file, encoding="utf8-lossy", skip_rows=1, dtypes=DATATYPES, ignore_errors=True)
  pl.scan_csv(file, encoding="utf8-lossy", skip_rows=1, dtypes=DATATYPES, ignore_errors=True)


timestamp,RECORD,WS_10min_Avg,WD_10min_Avg,WS_10min_Max,AirTemp_10min_Avg,AirRH_10min_Avg,Atm_10min_Avg,Rain_10min_Tot,SR_dn_10min_Avg,SR_up_10min_Avg,IR_dn_10min_Avg,IR_up_10min_Avg,IR_up_Temp_10min_Avg,IR_dn_Temp_10min_Avg,NetRs_10min_Avg,NetRl_10min_Avg,Albedo_10_min_Avg,NetTot_10min_Avg,IR_dn_Corr_10min_Avg,IR_up_Corr_10min_Avg,BattVolt_10min_Min,location
i64,i32,f32,f32,f32,f32,f32,f64,f32,f64,f64,f64,f64,f32,f32,f64,f64,f64,f64,f64,f64,f32,str
1664207400000000,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,874.4415,108.1555,-119.7761,31.93631,11.82,11.28,766.286,-151.7124,0.1350726,614.5736,254.1391,403.0202,0.0,"""Site_A"""
1664208000000000,1,,,,,,,0.0,931.5999,126.8229,-126.5776,29.28855,12.03,11.48,804.7771,-155.8661,0.1361504,648.9109,248.4513,401.4445,0.0,"""Site_A"""
1664197800000000,2,1.808,258.399994,2.7,5.836,31.73,646.416,0.0,928.9788,126.6331,-126.2469,32.10106,12.11,11.59,802.3458,-158.348,0.1363156,643.9978,249.1938,404.7926,13.56,"""Site_A"""
1664198400000000,3,2.535,265.200012,3.4,5.668,30.549999,646.3317,0.0,924.3484,125.8074,-126.3353,33.52101,11.73,11.24,798.541,-159.8563,0.1361013,638.6846,247.1382,404.3836,13.56,"""Site_A"""
1664199000000000,4,1.972,269.5,2.9,5.533,30.790001,646.2999,0.0,342.7564,123.825,-41.3195,36.95861,10.64,10.4,218.9314,-78.27811,9.071039,140.6533,326.4338,403.498,13.57,"""Site_A"""
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
1720862400000000,94443,5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,759.8738,85.99303,-61.88894,6.373354,7.585,7.03,673.8807,-68.26229,0.1131257,605.6184,290.2963,355.7825,13.56,"""Site_A"""
1720863000000000,94444,5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,710.0383,81.92848,-66.81017,8.499489,8.17,7.669,628.1098,-75.30966,0.1149435,552.8002,288.3352,361.1105,13.56,"""Site_A"""
1720863600000000,94445,5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,676.0035,75.55333,-63.78199,8.74075,9.3,8.78,600.4501,-72.52273,0.1101413,527.9274,297.0933,366.9775,13.56,"""Site_A"""
1720864200000000,94446,5.7,99.0,5.7,3.7,74.400002,640.2999,0.0,396.2463,41.9869,-42.61458,8.134896,8.94,8.63,354.2594,-50.74947,0.1037554,303.5099,316.4397,365.6068,13.56,"""Site_A"""
