In [None]:
import csv
import datetime
import os
from io import StringIO

from azure.storage.filedatalake import DataLakeServiceClient
from dotenv import load_dotenv
import pandas
import psycopg2

In [None]:
class DirectoryRestructure:
    """Migrate one day's SCADA archive CSV to the ``device_id``-partitioned layout.

    The CSV at ``year=YYYY/month=MM/YYYYMMDD.csv`` in the device's source
    container is downloaded, its header row is rewritten to the latest logical
    column names found in ``master_df``, and the result is uploaded to
    ``device_id=<device_id>/year=YYYY/month=MM/YYYYMMDD.csv`` in the
    ``scada-archive`` container, encoded as UTF-8 with a BOM.
    """

    def __init__(
        self
        , device_id: str
        , adp_datalake_input_container_name: str
        , target_date: datetime.date
        , master_df: pandas.DataFrame
        , env_file_path: str = "/Workspace/Users/tomoya-m@aicello.co.jp/20250529_ディレクトリー構成変更/.env"
    ) -> None:
        """Load settings and create the Data Lake file-system clients.

        Args:
            device_id: Value used for the ``device_id=`` partition of the output path.
            adp_datalake_input_container_name: Source container holding the original CSVs.
            target_date: Date of the archive file to migrate.
            master_df: Column master with ``uuid``, ``logical_name``,
                ``physical_name`` and ``is_latest`` columns.
            env_file_path: ``.env`` file loaded before reading
                ``ADP_DATALAKE_CONNECTION_STRING`` (presumably where it is defined).

        Raises:
            KeyError: If ``ADP_DATALAKE_CONNECTION_STRING`` is not present in
                the environment after loading ``env_file_path``.
        """
        # Populate environment variables from the .env file.
        load_dotenv(dotenv_path=env_file_path)

        self.device_id = device_id
        self.adp_datalake_connection_string = os.environ["ADP_DATALAKE_CONNECTION_STRING"]
        # Not used by this class itself; kept as a public attribute for callers.
        self.adp_datalake_account_name = "dlsaicellodatalake001"
        self.adp_datalake_input_containers_name = adp_datalake_input_container_name
        self.adp_datalake_output_container_name = "scada-archive"
        # Zero-padded strings used to build the year=/month= partitions and file names.
        self.target_year = str(target_date.year).zfill(4)
        self.target_month = str(target_date.month).zfill(2)
        self.target_day = str(target_date.day).zfill(2)
        self.master_df = master_df

        datalake_service_client = DataLakeServiceClient.from_connection_string(self.adp_datalake_connection_string)
        self.input_file_system_client = datalake_service_client.get_file_system_client(self.adp_datalake_input_containers_name)
        self.output_file_system_client = datalake_service_client.get_file_system_client(self.adp_datalake_output_container_name)

    def main(self):
        """Run the migration for ``target_date``: read, rename the header, write."""
        data = self.read_csv()
        prepped_data = self.rename_header(data=data)
        self.write_csv(data=prepped_data)

    def read_csv(self) -> list:
        """Download and parse the day's input CSV.

        Returns:
            All CSV rows as lists of strings, header row first.
        """
        input_path = (
            f"year={self.target_year}/month={self.target_month}/"
            f"{self.target_year}{self.target_month}{self.target_day}.csv"
        )
        input_file_client = self.input_file_system_client.get_file_client(input_path)

        raw_bytes = input_file_client.download_file().readall()
        # "utf-8-sig" strips a leading BOM when present and otherwise decodes exactly
        # like "utf-8". Without it a BOM would stay glued to the first header name
        # and make that column fail the master lookup in rename_header.
        content = raw_bytes.decode("utf-8-sig")

        # newline="" is what the csv module expects so it can handle line endings itself.
        return list(csv.reader(StringIO(content, newline="")))

    def rename_header(self, data: list) -> list:
        """Replace the header row with the latest logical column names.

        Each header name is matched against ``physical_name`` or ``logical_name``
        in ``master_df``. The ``uuid`` of the first matching row is then used to
        find the row flagged ``is_latest == True``, and that row's
        ``logical_name`` becomes the new header name.

        Args:
            data: CSV rows, header first. The list is not modified.

        Returns:
            A new list whose first row is the renamed header, followed by the
            original data rows.

        Raises:
            ValueError: If ``data`` is empty, a header name has no match in
                ``master_df``, or the matched uuid has no ``is_latest`` row.
        """
        if not data:
            raise ValueError("入力データが空のため、ヘッダーを変換できません。")

        master_df = self.master_df
        # Loop-invariant: rows currently flagged as the latest name for their uuid.
        is_latest = master_df["is_latest"].eq(True)

        new_header = []
        for col in data[0]:
            # Find the UUID whose physical or logical name matches this column.
            matched_rows = master_df[
                master_df["physical_name"].eq(col) | master_df["logical_name"].eq(col)
            ]
            if matched_rows.empty:
                raise ValueError(f"{col=}に対応するUUIDが見つかりませんでした。")
            matched_uuid = matched_rows.iloc[0]["uuid"]

            # Resolve that UUID to its current (is_latest) logical name.
            latest_rows = master_df[master_df["uuid"].eq(matched_uuid) & is_latest]
            if latest_rows.empty:
                raise ValueError(f"{col=}で名称の変換が正しくできません。")
            new_header.append(latest_rows.iloc[0]["logical_name"])

        return [new_header, *data[1:]]

    def write_csv(self, data: list):
        """Upload ``data`` as a BOM-prefixed UTF-8 CSV, overwriting any existing file.

        Args:
            data: CSV rows to write, header first.
        """
        output_path = (
            f"device_id={self.device_id}/year={self.target_year}/month={self.target_month}/"
            f"{self.target_year}{self.target_month}{self.target_day}.csv"
        )
        output_file_client = self.output_file_system_client.get_file_client(output_path)

        # Serialize in memory; the buffer is closed even if writing fails.
        with StringIO() as buffer:
            csv.writer(buffer).writerows(data)
            # UTF-8 with BOM.
            encoded_data = buffer.getvalue().encode("utf-8-sig")

        output_file_client.upload_data(data=encoded_data, overwrite=True)

In [None]:
def get_master_df(
    table_id: int,
    host: str = "psql-scadadbserver-prod-001.postgres.database.azure.com",
    database_name: str = "scada_master",
    user_name: str = "sqladminuser",
    password: "str | None" = None,
) -> pandas.DataFrame:
    """Load the column master for one table from the SCADA master database.

    Args:
        table_id: ``table_id`` whose rows are selected from the ``columns`` table.
        host: PostgreSQL host name.
        database_name: Database to connect to.
        user_name: Login user.
        password: Login password. When omitted it is read from the
            ``SCADA_MASTER_DB_PASSWORD`` environment variable, so the secret is
            no longer embedded in the notebook.

    Returns:
        DataFrame with columns ``uuid``, ``logical_name``, ``physical_name``
        and ``is_latest``.

    Raises:
        RuntimeError: If no password is given and ``SCADA_MASTER_DB_PASSWORD``
            is not set.
    """
    if password is None:
        password = os.environ.get("SCADA_MASTER_DB_PASSWORD")
        if password is None:
            raise RuntimeError("環境変数 SCADA_MASTER_DB_PASSWORD が設定されていません。")

    # Keyword arguments avoid building a DSN string, which breaks on passwords
    # containing spaces or quotes.
    connection = psycopg2.connect(
        host=host, dbname=database_name, user=user_name, password=password
    )
    try:
        with connection.cursor() as cursor:
            # Parameterized query instead of interpolating table_id into the SQL text.
            cursor.execute(
                """
                SELECT
                    column_uuid AS uuid,
                    logical_name,
                    physical_name,
                    is_latest
                FROM columns
                WHERE table_id = %s
                """,
                (table_id,),
            )
            query_result = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]
    finally:
        # psycopg2's `with connection:` only ends the transaction and does not
        # close the connection, so close it explicitly.
        connection.close()

    return pandas.DataFrame(query_result, columns=column_names)

In [None]:
# Per-device migration settings, keyed by device_id.
#   table_id       : passed to get_master_df to load that device's column master.
#   container_name : Data Lake container holding the device's original archive CSVs.
#   start_date     : first archive date the run loop processes.
devices = {
    device_key: {
        "table_id": master_table_id,
        "container_name": source_container,
        "start_date": first_date,
    }
    for device_key, master_table_id, source_container, first_date in (
        ("scada_cleancontainers_unit2", 3002, "cleancontainers-scada-unit2-002", datetime.date(2025, 3, 25)),
        ("scada_cleancontainers_unit3", 3003, "cleancontainers-scada-unit3-002", datetime.date(2024, 7, 9)),
        ("scada_cleancontainers_unit5", 3005, "cleancontainers-scada-unit5-002", datetime.date(2024, 7, 10)),
        ("scada_cleancontainers_unit6", 3006, "cleancontainers-scada-unit6-002", datetime.date(2025, 2, 25)),
        ("scada_cleancontainers_unit7", 3007, "cleancontainers-scada-unit7-002", datetime.date(2024, 6, 3)),
        ("scada_cleancontainers_unit10", 3010, "cleancontainers-scada-unit10-002", datetime.date(2025, 4, 3)),
        ("scada_solublon_unit21", 6021, "solublon-scada-unit21-002", datetime.date(2024, 5, 29)),
        ("scada_solublon_unit22", 6022, "solublon-scada-unit22-002", datetime.date(2024, 5, 29)),
        ("scada_solublon_factory2_geneki", 6121, "solublon-scada-factory2-geneki-002", datetime.date(2024, 5, 29)),
        ("scada_solublon_factory2_kako", 6122, "solublon-scada-factory2-kako-002", datetime.date(2024, 5, 29)),
        ("scada_solublon_unit31", 6031, "solublon-scada-unit31-002", datetime.date(2024, 12, 2)),
        ("scada_solublon_unit32", 6032, "solublon-scada-unit32-002", datetime.date(2025, 1, 21)),
        ("scada_solublon_unit33", 6033, "solublon-scada-unit33-002", datetime.date(2025, 1, 21)),
        ("scada_solublon_site3_infrastructure", 6132, "solublon-scada-site3-infrastructure-002", datetime.date(2024, 10, 23)),
    )
}

In [None]:
# Device to migrate in this run (a key of `devices`).
device_id = "scada_solublon_factory2_kako"

print(f"{device_id}の処理をスタートします。")

device_config = devices[device_id]
table_id = device_config["table_id"]
container_name = device_config["container_name"]
start_date = device_config["start_date"]

# Column master is loaded once and shared by every day's migration.
master_df = get_master_df(table_id=table_id)

# Fix the (exclusive) end of the date range once, before the loop. Re-evaluating
# "today" on every iteration would let a run that crosses midnight grow by an
# extra day part-way through.
end_date = datetime.date.today()

process_date = start_date
while process_date < end_date:
    print(f"{process_date}分のアーカイブファイルを処理します。")

    DirectoryRestructure(
        device_id=device_id
        , adp_datalake_input_container_name=container_name
        , target_date=process_date
        , master_df=master_df
    ).main()

    process_date += datetime.timedelta(days=1)
