In [None]:
%pip install pyarrow
%pip install tqdm

In [None]:

import pandas as pd
import os
from tqdm import tqdm
from collections import defaultdict
import concurrent.futures

# DEKN

https://data.open-power-system-data.org/household_data/

In [None]:
# Load the filtered 15-minute household data and discard the columns that are not used below.
unused_columns = ["utc_timestamp", "interpolated"]
df = pd.read_csv(
    "./Energy_graph/data/temp/household_data_15min_singleindex_filtered.csv"
).drop(columns=unused_columns)
df

In [None]:
# Strip the UTC offset (everything from the first "+") so the column parses as naive local time.
local_strings = df["cet_cest_timestamp"].apply(lambda x: x.split("+")[0])
df["cet_cest_timestamp"] = pd.to_datetime(local_strings, format="%Y-%m-%dT%H:%M:%S")
# Index by the local timestamp; where a timestamp repeats, only its first row is kept.
df = df.set_index("cet_cest_timestamp")
is_repeat = df.index.duplicated(keep='first')
df = df[~is_repeat]

df

In [None]:

# Extract household identifiers: the third "_"-separated token of each column name
households = set(column.split('_')[2] for column in df.columns)

# Create a dictionary of dataframes, one for each household
dfs = {}

for household in households:
    # Select this household's columns by exact token match. A substring test
    # (`household in col`) would also pick up columns of any household whose
    # identifier merely contains this one (e.g. "residential1" vs "residential10").
    relevant_columns = [col for col in df.columns if col.split('_')[2] == household]
    temp_df = df[relevant_columns].copy()

    # Rename columns to remove the prefix and retain the device name
    rename_dict = {col: col.replace(f"DE_KN_{household}_", "") for col in relevant_columns}
    temp_df.rename(columns=rename_dict, inplace=True)
    temp_df.rename(columns={'cet_cest_timestamp': 'timestamp', "grid_import": "aggregate"}, inplace=True)
    # Drop the export / PV columns when present (absent columns are ignored)
    temp_df.drop(columns=['grid_export', 'pv'], errors="ignore", inplace=True)

    # One single-column dataframe per device, without the rows where it has no reading
    data = {c: pd.DataFrame(temp_df[c].dropna()) for c in temp_df.columns}

    # NOTE(review): the key uses only the identifier's last character, so households that
    # share it (e.g. "industrial1" and "residential1") would overwrite each other —
    # confirm the filtered file holds a single building type with at most nine households.
    name = "DEKN_" + str(household[-1])
    dfs[name] = data

In [None]:
# Load household_data.xlsx into its own dataframe and display it.
df2 = pd.read_excel("./Energy_graph/data/temp/household_data.xlsx")
df2 

# GREEND
https://sourceforge.net/projects/greend/



GREEND download form
Great to get to know you! 

Here are our dataset snapshots and the associated password:

v0.1: 
http://sourceforge.net/projects/greend/files/GREEND_0-1_311014.zip/download

PWD:"Vienna"


https://www.academia.edu/7794767/GREEND_An_Energy_Consumption_Dataset_of_Households_in_Italy_and_Austria

http://www.andreatonello.com/wp-content/uploads/PAPERS/CONFERENCES/SGC2014_2.pdf



In [None]:
# Read a single GREEND daily file (building0); rows that cannot be parsed are skipped
# instead of raising (on_bad_lines="skip").
df = pd.read_csv("./Energy_graph/data/temp/GREEND/building0/dataset_2013-12-07.csv", on_bad_lines="skip")
df


# TODO either fix NILMTK if possible or try to get id to device mapping from somewhere else

# ENERTALK

In [None]:
# Directory that process_house joins the house folder name onto.
data_path = "./Energy_graph/data/temp/ENERTALK/enertalk"
def convert2KRtime(df):
    """
    Convert a dataframe's timestamps from UTC to the Asia/Seoul timezone.

    input
    ----
        df: dataframe with a timezone-naive datetime column ``timestamp``
            (its values are interpreted as UTC). The dataframe is not modified.

    output
    ----
        df_kr: copy of ``df`` whose ``timestamp`` column is Asia/Seoul-aware and
            which is indexed by that column (the column itself is kept)
    """
    # Work on a copy: the previous version aliased `df`, so the caller's frame had its
    # `timestamp` column silently rewritten, and a second call on the same frame failed
    # because the column was already timezone-aware.
    df_kr = df.copy()
    df_kr['timestamp'] = df_kr['timestamp'].dt.tz_localize('UTC').dt.tz_convert('Asia/Seoul')
    df_kr = df_kr.set_index(pd.DatetimeIndex(df_kr['timestamp']))
    return df_kr


def preprocess_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Turn one raw device dataframe into a per-second series.

    Drops ``reactive_power``, converts the millisecond unix ``timestamp`` column
    into an Asia/Seoul-aware index, scales the remaining columns by
    ``1/1000 * (1/15) / 3600`` and sums the result into 1-second bins.
    The input dataframe is left untouched.
    """
    df = (
        df.drop(columns=["reactive_power"])
        # convert unix timestamp (ms) to a timezone-aware datetime and set it as index
        .assign(
            timestamp=lambda frame: pd.to_datetime(frame["timestamp"], unit="ms")
            .dt.tz_localize('UTC')
            .dt.tz_convert('Asia/Seoul')
        )
        .set_index("timestamp")
    )
    # convert to kWh — the 1/15 factor presumably reflects 15 Hz watt readings (TODO confirm)
    df = df/1000 * (1/15)/3600
    # resample to 1 second; lowercase "s" because the uppercase "S" alias is deprecated
    df = df.resample("1s").sum()

    return df

def parse_name(file_name: str):
    """
    Extract the device name from a file name such as
    "02_washing-machine.parquet.gzip" (-> "washing-machine"): the second
    "_"-separated token of the text before the first ".".
    """
    stem = file_name.split(".", 1)[0]
    device = stem.split("_")[1]
    return device



def process_house(house):
    """
    Load every device file of one house.

    `house` is the name of a sub-directory of `data_path`; it must be numeric because
    the returned key is "ENERTALK_<int(house)>". Every file in every day directory is
    read as parquet, passed through `preprocess_dataframe`, and the per-day frames of
    each device are concatenated.

    Returns (house_name, house_dict), with house_dict mapping device name -> dataframe.
    """
    house_path = os.path.join(data_path, house)
    house_dict = defaultdict(list)
    house_name = "ENERTALK_" + str(int(house))

    # os.listdir returns entries in arbitrary order; sorting makes the concatenation
    # order reproducible (and chronological if the day directory names sort by
    # date — TODO confirm).
    for day in sorted(os.listdir(house_path)):
        day_path = os.path.join(house_path, day)

        for device in sorted(os.listdir(day_path)):
            device_path = os.path.join(day_path, device)
            name = parse_name(device)

            df = preprocess_dataframe(pd.read_parquet(device_path))
            house_dict[name].append(df)

    # Collapse each device's list of daily frames into one frame
    for key in house_dict:
        house_dict[key] = pd.concat(house_dict[key], axis=0)

    return house_name, house_dict


Serial program

In [None]:
from tqdm import tqdm
from collections import defaultdict
data_path = "./Energy_graph/data/temp/ENERTALK/enertalk"
data_dict = {}
# os.listdir order is arbitrary; sorting makes the run reproducible — in particular it fixes
# which house the `break` below selects — and appends each device's per-day frames in
# directory-name order.
for house in sorted(os.listdir(data_path)):
    house_dict = defaultdict(list)
    house_name = "ENERTALK_" + str(int(house))
    house_path = data_path + "/" + house
    for day in tqdm(sorted(os.listdir(house_path))):
        day_path = house_path + "/" + day
        for device in sorted(os.listdir(day_path)):
            name = parse_name(device)
            df = preprocess_dataframe(pd.read_parquet(day_path + "/" + device))
            house_dict[name].append(df)

    # Collapse each device's list of daily frames into one frame
    for key in house_dict:
        house_dict[key] = pd.concat(house_dict[key], axis=0)

    data_dict[house_name] = house_dict
    # NOTE(review): only the first house is processed because of this break — presumably a
    # quick-test leftover; remove it to load every house.
    break
    



Multithreaded

In [None]:
import os
import pandas as pd
from collections import defaultdict
import concurrent.futures
from tqdm import tqdm

# Root directory whose entries are treated as house folders, and the result mapping
# (house name -> device dictionary) filled in by the loop at the end of this cell.
data_path = "./Energy_graph/data/temp/ENERTALK/enertalk"
data_dict = {}

def process_house(house, progress_bar=None):
    """
    Load every device file of one house, optionally reporting progress.

    `house` is the name of a sub-directory of `data_path` and must be numeric (the
    returned key is "ENERTALK_<int(house)>"). `progress_bar`, when given, is any
    object with an `update(n)` method; it is advanced by one once the house is done.

    Returns (house_name, house_dict), with house_dict mapping device name -> dataframe.
    """
    house_path = os.path.join(data_path, house)
    house_dict = defaultdict(list)
    house_name = "ENERTALK_" + str(int(house))

    # Sorted listing: os.listdir order is arbitrary, which made the order of the
    # concatenated per-day frames unspecified.
    for day in sorted(os.listdir(house_path)):
        day_path = os.path.join(house_path, day)
        for device in sorted(os.listdir(day_path)):
            device_path = os.path.join(day_path, device)
            name = parse_name(device)
            df = preprocess_dataframe(pd.read_parquet(device_path))
            house_dict[name].append(df)

    for key in house_dict:
        house_dict[key] = pd.concat(house_dict[key], axis=0)

    # Compare against None explicitly: the truth value of a tqdm bar reflects its
    # total/iterable rather than whether a bar was supplied.
    if progress_bar is not None:
        progress_bar.update(1)  # Increment the progress bar when a house is processed

    return house_name, house_dict

houses = os.listdir(data_path)
# One task per house on a thread pool; the shared bar (sized to the number of houses)
# is handed to the workers, which advance it themselves.
with tqdm(total=len(houses), desc="Processing houses", unit="house") as progress_bar, \
        concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_house, house, progress_bar) for house in houses]
    # Store each house's result as soon as its future finishes
    for finished in concurrent.futures.as_completed(futures):
        house_name, house_dict = finished.result()
        data_dict[house_name] = house_dict


Multiprocessed

In [None]:
import os
import pandas as pd
from collections import defaultdict
import concurrent.futures
from tqdm import tqdm
import multiprocessing


# Result mapping filled at the end of this cell: house name -> device dictionary.
data_dict = {}

def process_house(house_path, queue):
    """
    Load every device file of the house stored at `house_path`.

    The last path component must be numeric (the returned key is
    "ENERTALK_<int(name)>"). After the house is loaded, `1` is put on `queue` so
    the parent process can advance its progress bar.

    Returns (house_name, house_dict), with house_dict mapping device name -> dataframe.
    """
    # normpath first, so a trailing separator does not make basename return ""
    house = os.path.basename(os.path.normpath(house_path))  # Extract house name from the path
    house_dict = defaultdict(list)
    house_name = "ENERTALK_" + str(int(house))

    # Sorted listing: os.listdir order is arbitrary, which made the order of the
    # concatenated per-day frames unspecified.
    for day in sorted(os.listdir(house_path)):
        day_path = os.path.join(house_path, day)
        for device in sorted(os.listdir(day_path)):
            device_path = os.path.join(day_path, device)
            name = parse_name(device)
            df = preprocess_dataframe(pd.read_parquet(device_path))
            house_dict[name].append(df)

    for key in house_dict:
        house_dict[key] = pd.concat(house_dict[key], axis=0)

    queue.put(1)  # Indicate that one house has been processed
    return house_name, house_dict

# Construct full paths for each house directory. The house folders live in the
# "enertalk" sub-directory (the path every other ENERTALK cell lists); listing its parent
# yields the "enertalk" folder itself, which process_house cannot turn into a house number.
data_path = "./Energy_graph/data/temp/ENERTALK/enertalk"
house_paths = [os.path.join(data_path, house) for house in os.listdir(data_path)]
queue = multiprocessing.Manager().Queue()

# max_workers must be a positive int: os.cpu_count()/2 is a float, and os.cpu_count()
# may return None.
max_workers = max(1, (os.cpu_count() or 1) // 2)

with tqdm(total=len(house_paths), desc="Processing houses", unit="house") as progress_bar:
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_house, house_path, queue) for house_path in house_paths]

        for future in concurrent.futures.as_completed(futures):
            # result() re-raises a worker's exception here. Reading the queue only after
            # a successful result avoids blocking forever on queue.get() when a worker
            # failed before it could report completion.
            house_name, house_dict = future.result()
            data_dict[house_name] = house_dict
            progress_bar.update(queue.get())


In [None]:
# This lookup raises KeyError when "ENERTALK_0" is missing; its value is not displayed
# because it is not the last expression of the cell.
data_dict["ENERTALK_0"]
# save with pickle
import pickle
enertalk_pickle = "./Energy_graph/data/processed/ENERTALK.pkl"
with open(enertalk_pickle, "wb") as f:
    pickle.dump(data_dict, f, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:

# Manual run of the ENERTALK preprocessing steps on a single device file.
df = pd.read_parquet("./Energy_graph/data/temp/ENERTALK/enertalk/00/20161101/02_washing-machine.parquet.gzip").drop(columns=["reactive_power"])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms").dt.tz_localize('UTC').dt.tz_convert('Asia/Seoul')
df = df.set_index("timestamp")
df  = df/1000 * (1/15)/3600
# "1s": lowercase alias, matching the other resample calls in this notebook; the
# uppercase "S" alias is deprecated.
df.resample("1s").sum()

In [None]:
test_str = "02_washing-machine.parquet.gzip"
def parse_name(file_name: str):
    """
    Return the device name encoded in a file name: the second "_"-separated
    token of the text before the first ".".
    """
    stem, _, _ = file_name.partition(".")
    return stem.split("_")[1]

print(parse_name(test_str))

# DEDDIAG

In [None]:
# Build the item_id -> category lookup from the house's items table
items_file = "./Energy_graph/data/temp/DEDDIAG/house_08/items.tsv"
labels = pd.read_csv(items_file, sep="\t").set_index("item_id")
id_label_map = labels["category"].to_dict()
id_label_map

In [None]:
def parse_id(file_name : str) -> int:
    """Return the second "_"-separated field of ``file_name`` as an integer."""
    fields = file_name.split('_')
    item_id = fields[1]
    return int(item_id)

# Watts -> kWh. `data_frequency` is the sampling interval expressed as a fraction of an
# hour (e.g. 0.5 for half-hourly data).
def watts2kwh(df, data_frequency):
    """Return ``df / 1000 * data_frequency``."""
    kilowatts = df / 1000
    return kilowatts * data_frequency


In [None]:
data_path = "./Energy_graph/data/temp/DEDDIAG/house_08/"
from tqdm import tqdm
data = {}

# Only directory entries whose name contains "data" are measurement files
data_files = [d for d in os.listdir(data_path) if "data" in d]
for device in tqdm(data_files):
    label = id_label_map[parse_id(device)]
    # items whose category mentions "Phase" are skipped
    if "Phase" in label:
        continue
    # the item whose category mentions "Total" is stored under "aggregate"
    if "Total" in label:
        label = "aggregate"
    df = pd.read_csv(data_path + device, sep="\t")
    df["time"] = pd.to_datetime(df["time"])
    df = df.drop(columns=["item_id"]).set_index("time")
    df = df[~df.index.duplicated(keep='first')]
    # forward-fill onto a 1-second grid, then scale with watts2kwh (1 s = 1/3600 h)
    df = df.resample("1s").ffill().dropna()
    df = watts2kwh(df, 1/3600)
    print(label)
    data[label] = df


data_dict = {"DEDDIAG_8": data}


In [None]:
# Per-day sums of the "aggregate" frame.
data["aggregate"].resample("D").sum()

# SUSTData


In [None]:
path = "./Energy_graph/data/temp/SUSTData/"
# aggregate consumption data
# Read every CSV first and concatenate once: growing a dataframe with pd.concat inside
# the loop re-copies all previously loaded rows on each iteration.
# (pd.concat raises ValueError if the folder contains no .csv file.)
aggregate_files = [file for file in os.listdir(path + "aggregate") if file.endswith(".csv")]
df_aggregate = pd.concat([pd.read_csv(path + "aggregate/" + file) for file in aggregate_files])

df_aggregate["timestamp"] = pd.to_datetime(df_aggregate["timestamp"])
# Index by time, keep only the "P" column and call it "power"
df_aggregate = (
    df_aggregate
    .set_index("timestamp")
    .drop(columns=['Unnamed: 0', "Q", "V", "I"])
    .rename(columns={"P": "power"})
)
data_dict = {"aggregate": df_aggregate}

In [None]:
def parse_name(file_name: str):
    """
    Parse the file name to get the appliance name: the second "_"-separated
    token of the text before the first ".".
    """
    stem = file_name.split(".")[0]
    tokens = stem.split("_")
    return tokens[1]


# appliance consumption data
appliance_files = [file for file in os.listdir(path+"appliances/") if file.endswith(".csv")]
for file in appliance_files:
    appliance = parse_name(file)
    print(appliance)
    # NOTE(review): unlike the aggregate frame, "timestamp" is not passed through
    # pd.to_datetime here — confirm that is intended.
    data_dict[appliance] = pd.read_csv(path + "appliances/" + file).set_index("timestamp")

In [None]:
# Display the entry stored under "aggregate".
data_dict["aggregate"]

# MFRED
Unused for now because the data is aggregated across apartments; it might be used later.

In [None]:
# Load the MFRED 15-minute aggregates file (2019 Q1–Q4, per the file name) and display it.
df = pd.read_csv("./Energy_graph/data/temp/MFRED/MFRED_Aggregates_15min_2019Q1-Q4.csv")

df

# EMBED
TODO
http://embed-dataset.org/

# HEART


In [None]:
# Watts -> kWh. `data_frequency` is the sampling interval expressed as a fraction of an
# hour (e.g. 0.5 for half-hourly data).
def watts2kwh(df, data_frequency):
    """Return ``df / 1000 * data_frequency``."""
    kilowatts = df / 1000
    return kilowatts * data_frequency
def parse_name(file_name: str):
    """
    Parse the file name to get the house name: the text before the first "."
    with "_" inserted after its fifth character (e.g. "HERON33.csv" -> "HERON_33").
    """
    stem = file_name.split(".")[0]
    head, tail = stem[:5], stem[5:]
    return f"{head}_{tail}"


# df = pd.read_csv("./Energy_graph/data/temp/HEART/HERON33.csv")
# df["Timestamp"] = pd.to_datetime(df["Timestamp"], unit="ms")

# df = df.set_index("Timestamp").drop(columns=["dw", "wm"])
# df = watts2kwh(df, 1/3600)



In [None]:
data_path = "./Energy_graph/data/temp/HEART/"
data_dict = {}
csv_files = [file for file in os.listdir(data_path) if file.endswith(".csv")]
for file in csv_files:
    df = pd.read_csv(data_path + file)
    # "Timestamp" is parsed as unix milliseconds
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], unit="ms")
    # index by time, discard the "dw"/"wm" columns and call the "Value" column "aggregate"
    df = (
        df.set_index("Timestamp")
        .drop(columns=["dw", "wm"])
        .rename(columns={"Value": "aggregate"})
    )
    # watts -> kWh with a 1/3600 h interval, then drop rows with missing values
    df = watts2kwh(df, 1/3600).dropna()
    # one single-column dataframe per device, keyed by column name, stored per house
    data_dict[parse_name(file)] = {device: pd.DataFrame(df[device]) for device in df.columns}



# IDEAL

In [None]:
# Watts -> kWh. `data_frequency` is the sampling interval expressed as a fraction of an
# hour (e.g. 0.5 for half-hourly data).
def watts2kwh(df, data_frequency):
    """Return ``df / 1000 * data_frequency``."""
    kilowatts = df / 1000
    return kilowatts * data_frequency
def read_and_preprocess_df(path):
    """
    Read a header-less two-column CSV (timestamp, value) and return it as a
    regular 7-second series scaled by ``watts2kwh(..., 7/3600)``.
    """
    raw = pd.read_csv(path, header=None, names=["timestamp", "value"])
    raw["timestamp"] = pd.to_datetime(raw["timestamp"])
    # index by time, in chronological order
    ordered = raw.set_index("timestamp").sort_index()
    # Snap onto a 7 s grid; a reading is carried forward over at most 7 empty bins
    # (7 * 7 s = 49 s) and bins that are still empty are dropped.
    # NOTE(review): an earlier comment said "up to 35s", which would be limit=5 — confirm.
    regular = ordered.resample("7s").ffill(limit=7).dropna()

    # convert to kWh
    return watts2kwh(regular, 7/3600)
# get house name and appliance name from file name
def parse_name(file_name : str):
    """
    Split a "_"-separated file name into (house_name, appliance_name).

    The house name is the first field with "home" replaced by "IDEAL_". The
    appliance name comes from the fourth field: "electric-mains" becomes
    "aggregate", "electric-appliance" is replaced by the fifth field up to its
    first ".", and anything else is returned as-is.
    """
    parts = file_name.split("_")
    house_name = parts[0].replace("home", "IDEAL_")
    category = parts[3]
    if category == "electric-mains":
        return house_name, "aggregate"
    if category == "electric-appliance":
        return house_name, parts[4].split(".")[0]
    return house_name, category

def process_house(house, file_list, data_path):
    """
    Run `process_file` on every file in `file_list` and collect the frames by label.

    Returns (house, {label: dataframe}); when two files share a label the later one wins.
    """
    processed = (process_file(file, data_path) for file in file_list)
    house_data = {label: df for _, label, df in processed}
    return house, house_data


def process_file(file,data_path):
    """
    Parse `file`'s name and load it from the "data_merged/" folder under `data_path`.

    Returns (house, label, dataframe).
    """
    house, label = parse_name(file)
    file_path = data_path + "data_merged/" + file
    df = read_and_preprocess_df(file_path)
    return house, label, df

def process_files_for_home(house, file_list, data_path):
    """
    Load each file of one home via `process_file` and key the frames by appliance label.

    Returns (house, {label: dataframe}); a label seen twice keeps the later frame.
    """
    house_data = {}
    for file in file_list:
        _house, appliance, frame = process_file(file, data_path)
        house_data.update({appliance: frame})
    return house, house_data



In [None]:
# Example file name used to try out parse_name; the cell displays the returned tuple.
test_str = "home168_kitchen1534_sensor12520_electric-appliance_washingmachinetumbledrier.csv.gz"


parse_name(test_str)

Serial program

In [None]:
data = {}
data_path = "./Energy_graph/data/temp/IDEAL/"
# Keep the electric appliance / mains files, leaving out everything from home223
files = []
for file in os.listdir(data_path + "data_merged/"):
    is_electric = "electric-appliance" in file or "electric-mains" in file
    if is_electric and "home223" not in file:
        files.append(file)

for file in tqdm(files):
    house, label = parse_name(file)
    df = read_and_preprocess_df(data_path+"data_merged/" + file)
    # nested mapping: house -> label -> dataframe
    data.setdefault(house, {})[label] = df
        



Multiprocessed (takes around 1 min 30 s with 64 cores)

In [None]:
from concurrent.futures import ProcessPoolExecutor
from tqdm.notebook import tqdm
from collections import defaultdict
def unpack_and_process(p):
    """Call `process_house` with the elements of `p` as positional arguments.

    Adapter for `executor.map`, which passes each work item as a single argument.
    """
    return process_house(*p)
# Main script body
data_path = "./Energy_graph/data/temp/IDEAL/"
data_dict = {}
# Group the electric appliance / mains files by house, leaving out home223
files_grouped_by_home = defaultdict(list)
files = [file for file in os.listdir(data_path + "data_merged/") if ("electric-appliance" in file or "electric-mains" in file) and "home223" not in file]
for file in files:
    house, _ = parse_name(file)
    files_grouped_by_home[house].append(file)

total_houses = len(files_grouped_by_home)

# Half the CPUs, but never fewer than one worker: int(os.cpu_count()/2) is 0 on a
# single-core machine (ProcessPoolExecutor rejects max_workers <= 0) and fails when
# os.cpu_count() returns None.
max_workers = max(1, (os.cpu_count() or 1) // 2)

print("Processing houses...")
with ProcessPoolExecutor(max_workers=max_workers) as executor, tqdm(total=total_houses, desc="Processing houses", unit="house") as t:
    # One (house, files, data_path) work item per house
    args = ((house, files_grouped_by_home[house], data_path) for house in files_grouped_by_home)

    for house_name, house_data in executor.map(unpack_and_process, args):
        data_dict[house_name] = house_data
        t.update(1)

In [None]:
# Display the mapping built by the previous cell.
data_dict

In [None]:
# save with pickle to ./Energy_graph/data/processed/IDEAL.pkl

import pickle

ideal_pickle = './Energy_graph/data/processed/IDEAL.pkl'
with open(ideal_pickle, 'wb') as f:
    pickle.dump(data_dict, f, protocol=pickle.HIGHEST_PROTOCOL)

# RAE
TODO
needs to be cited
https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/ZJW4LC