In [24]:
# This notebook is meant to run on CPU-only machines, so first record the CPU
# configuration (architecture, sockets / cores / threads, NUMA layout) of the
# node it is running on, for reproducibility of the timings and worker counts below.

!lscpu

Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              36
On-line CPU(s) list: 0-35
Thread(s) per core:  1
Core(s) per socket:  18
Socket(s):           2
NUMA node(s):        2
Vendor ID:           GenuineIntel
CPU family:          6
Model:               79
Model name:          Intel(R) Xeon(R) CPU E5-2695 v4 @ 2.10GHz
Stepping:            1
CPU MHz:             3300.000
CPU max MHz:         3300.0000
CPU min MHz:         1200.0000
BogoMIPS:            4190.29
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            46080K
NUMA node0 CPU(s):   0-17
NUMA node1 CPU(s):   18-35
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl smx est tm2 ssse3 s

In [25]:
# Verify number of CPU cores available to spawn different jobs

import multiprocessing

import os

# multiprocessing.cpu_count() reports every CPU on the machine, including CPUs this
# process is not allowed to run on (e.g. when a batch scheduler such as Slurm, or
# taskset, restricts the job's CPU affinity mask). Spawning one worker per *machine*
# CPU then oversubscribes the cores actually granted. Prefer the process's affinity
# set where the OS exposes it (Linux) and fall back to the machine-wide count elsewhere.
if hasattr(os, 'sched_getaffinity'):
    cpu_cores = len(os.sched_getaffinity(0))
else:
    cpu_cores = multiprocessing.cpu_count()
print(f'Number of cpu cores available is {cpu_cores}.')

Number of cpu cores available is 36.


In [26]:
# Set up the basic prerequisites

import os
import numpy as np
import pandas as pd

base_dir = '/lcrc/project/NEXTGENOPT/NREL_COMSTOCK_DATA'
versionID = 32

# Building metadata for upgrade `versionID`: one row per building; the frame's index
# is used as the building ID when locating each building's own parquet file.
metadata = pd.read_parquet(base_dir+f'/upgrade{versionID}.parquet') 

# split into different metadatas depending upon which state it is from
#
# observed=True: when 'in.state' is categorical (presumably the case here, given the
# warning pandas prints for the unqualified groupby call), only states that actually
# occur in the data form a group, so every per-state frame is non-empty; it also pins
# the future pandas default and silences that deprecation warning. It has no effect
# when 'in.state' is a plain (non-categorical) column.
metadata_grouped = metadata.groupby('in.state', observed=True)
m_states = [group for _, group in metadata_grouped]

# split the metadata into list-of-lists which can be parallelized 

def chunk_list(lst, m):
    """Split ``lst`` into at most ``m`` contiguous, non-empty chunks of near-equal size.

    Chunk sizes differ by at most one element, and concatenating the chunks
    reproduces ``lst`` in order. Boundaries are computed with exact integer
    arithmetic (``k * n // m``). Accumulating the float ``n / m`` instead can,
    through rounding, produce an extra trailing chunk (e.g. ``n=1, m=10``).

    Args:
        lst: Any sliceable sequence (here, a list of per-state DataFrames).
        m: Maximum number of chunks (e.g. the number of worker processes);
            must be a positive integer.

    Returns:
        A list of ``min(len(lst), m)`` slices of ``lst``; ``[]`` when ``lst`` is empty.

    Raises:
        ValueError: if ``m`` is smaller than 1 (the float version divided by zero
            for ``m == 0`` and looped forever for negative ``m``).
    """
    if m < 1:
        raise ValueError(f'number of chunks must be >= 1, got {m}')
    n = len(lst)
    bounds = [k * n // m for k in range(m + 1)]
    # When m > n, consecutive bounds coincide; skip those empty slices so no
    # worker is handed an empty task.
    return [lst[lo:hi] for lo, hi in zip(bounds, bounds[1:]) if hi > lo]

# Group the per-state metadata frames into about `cpu_cores` near-equal chunks so
# they can be handed to the worker processes.
m_states_parallel = chunk_list(lst=m_states, m=cpu_cores)

  metadata_grouped = metadata.groupby('in.state')


In [27]:
# In this block, define the basic function which will allow parallelization

# Target column read from each building's parquet file; every other numeric
# building/weather column is correlated against it (and excluded from the features).
power_consumption_feature_name: str = 'out.electricity.total.energy_consumption'

def df_col_idx(idx,colname,df):
    """Return the element at position ``idx`` of column ``colname`` in ``df``.

    The lookup is positional, like indexing a list: the DataFrame's index labels
    are ignored and a negative ``idx`` counts from the end. The value is converted
    exactly as ``Series.tolist()`` converts it (NumPy scalars become Python
    ``int``/``float``). Only the selected row is materialised rather than the whole
    column, so the call is cheap regardless of the frame's length.

    Raises:
        KeyError: if ``colname`` is not a column of ``df``.
        IndexError: if ``idx`` is out of range.
    """
    # iloc[[idx]] keeps a 1-element Series so tolist() applies its usual scalar conversion
    return df[colname].iloc[[idx]].tolist()[0]

# dtypes treated as numeric features in the building, weather and metadata frames
_FLOAT_DTYPES = ['float', 'float16', 'float32', 'float64']


def _safe_corr(x, y):
    """Pearson correlation between 1-D arrays ``x`` and ``y``.

    When either input is constant the coefficient is undefined; NumPy then returns
    NaN, and the resulting 'invalid value' floating-point warning is silenced.
    """
    with np.errstate(invalid='ignore'):
        return np.corrcoef(x, y)[0, 1]


def _building_correlations(b_df, w_df):
    """Correlate one building's power series with its other float columns and the weather float columns.

    Returns:
        ``(mean_power, feature_names, correlations)``. The features are the building's
        float columns (excluding the power column itself), followed by the weather
        float columns, each group in column order.
    """
    power = b_df[power_consumption_feature_name].to_numpy()
    bldg_cols = [col for col in b_df.select_dtypes(include=_FLOAT_DTYPES).columns.tolist()
                 if col != power_consumption_feature_name]
    weather_cols = w_df.select_dtypes(include=_FLOAT_DTYPES).columns.tolist()

    corrs = [_safe_corr(power, b_df[col].to_numpy()) for col in bldg_cols]
    # NOTE(review): each weather row is repeated 4 times to match the length of the
    # building series -- presumably hourly weather vs. 15-minute building data; confirm.
    # np.corrcoef raises ValueError if the two lengths still differ.
    corrs += [_safe_corr(power, np.repeat(w_df[col].to_numpy(), 4)) for col in weather_cols]
    return power.mean(), bldg_cols + weather_cols, corrs


def _ranked_lines(names, values):
    """Format one ``name:<TAB> value`` line per feature, highest value first, NaN values omitted."""
    values = np.asarray(values, dtype=float)
    order = np.argsort(values)[::-1]
    return [f"{names[i]}:\t {values[i]}" for i in order if not np.isnan(values[i])]


def parallel_function(m_states_superlist):
    """Print, for each per-state metadata frame, how building features correlate with electricity use.

    Used as the worker of ``ProcessPoolExecutor.map``; one call handles one chunk of states.

    Buildings are only used when both of these files exist under ``base_dir``:
    ``{state}/{building_id}-{versionID}.parquet`` and ``weather/{county}_2018.csv``.

    * Dynamic features: for each such building, its ``power_consumption_feature_name``
      series is correlated with each of its other float columns and each float weather
      column. These correlations are then averaged over the state's buildings.
    * Static features: each building's mean power is correlated, across the state's
      buildings, with every float column of the metadata.

    Both rankings are printed highest first, with NaN correlations omitted. A state
    with no usable buildings is reported and skipped instead of crashing the worker.

    Args:
        m_states_superlist: iterable of metadata DataFrames, each holding the rows of a
            single state, indexed by building ID, with ``in.state`` and
            ``in.nhgis_county_gisjoin`` columns.

    Returns:
        0 -- results are reported only through stdout.
    """
    for mdata in m_states_superlist:

        state_name = mdata['in.state'].iloc[0]
        print(f'Executing state {state_name} on process PID {os.getpid()}.', flush=True)

        retained_idx, avg_power, corrs = [], [], []
        ftr_name = None
        for bid, co_id in zip(mdata.index.tolist(), mdata['in.nhgis_county_gisjoin'].tolist()):
            bldg_path = base_dir + f'/{state_name}/{bid}-{versionID}.parquet'
            weather_path = base_dir + f'/weather/{co_id}_2018.csv'
            if not (os.path.exists(bldg_path) and os.path.exists(weather_path)):
                print(f'Could not find building ID {bid} and/or weather data {co_id} for state {state_name}. Continuing.')
                continue

            mean_power, names, bldg_corrs = _building_correlations(
                pd.read_parquet(bldg_path), pd.read_csv(weather_path))
            retained_idx.append(bid)
            avg_power.append(mean_power)
            corrs.append(bldg_corrs)
            if ftr_name is None:
                # Names come from the first building. Every building is assumed to expose
                # the same float columns in the same order (TODO confirm); otherwise the
                # rows of `corrs` are ragged and np.array(corrs) fails.
                ftr_name = names

        if not retained_idx:
            # Averaging an empty list of correlations would crash this worker.
            print(f'No building and weather data found for state {state_name}. Skipping.', flush=True)
            continue

        # dynamic features: average each feature's correlation over the retained buildings
        corrs_all_bldg = np.array(corrs).mean(axis=0)

        # static features: correlate each numeric metadata column with the buildings' mean power
        static_cols = mdata.select_dtypes(include=_FLOAT_DTYPES).columns.tolist()
        avg_power = np.array(avg_power)
        static_corr = [_safe_corr(avg_power, mdata.loc[retained_idx, col].to_numpy()) for col in static_cols]

        # Emit the whole state's report in a single flushed print, so reports from
        # concurrently running workers are much less likely to interleave line by line.
        report = [f'For state {state_name}, the dynamic feature correlation to the building power consumption, average across all buildings, are as follows.:']
        report += _ranked_lines(ftr_name, corrs_all_bldg)
        report.append(f'For state {state_name}, the static feature correlation to the building power consumption, computed across all buildings, are as follows.:')
        report += _ranked_lines(static_cols, static_corr)
        print('\n'.join(report), flush=True)

    return 0


In [28]:
# In this block, we achieve parallelization

from concurrent.futures import ProcessPoolExecutor

# Each chunk of states is one task for a pool of `cpu_cores` worker processes;
# map() yields the per-chunk return values in submission order.
with ProcessPoolExecutor(max_workers=cpu_cores) as executor:
    results = [chunk_status for chunk_status in executor.map(parallel_function, m_states_parallel)]

Executing state AK on process PID 414154.
Executing state AL on process PID 414153.
Executing state AZ on process PID 414155.
Executing state DE on process PID 414158.Executing state DC on process PID 414159.

Executing state CO on process PID 414157.
Executing state CA on process PID 414156.
Executing state HI on process PID 414161.
Executing state ID on process PID 414162.Executing state FL on process PID 414160.

Executing state IN on process PID 414163.
Executing state IA on process PID 414164.
Executing state KS on process PID 414165.
Executing state LA on process PID 414166.
Executing state ME on process PID 414167.
Executing state MA on process PID 414168.
Executing state MI on process PID 414169.
Executing state MS on process PID 414170.
Executing state MT on process PID 414172.
Executing state NV on process PID 414173.Executing state MO on process PID 414171.

Executing state NH on process PID 414174.
Executing state NM on process PID 414175.
Executing state ND on process PID 

Process ForkProcess-213:
Process ForkProcess-189:
Process ForkProcess-210:
Process ForkProcess-187:
Process ForkProcess-205:
Process ForkProcess-182:
Process ForkProcess-204:
Traceback (most recent call last):


out.params.vrf_heating_total_supplemental_load_electric..j:	 nan

Traceback (most recent call last):
  File "/home/sbose/.conda/envs/DATA_MANAGE/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/home/sbose/.conda/envs/DATA_MANAGE/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/sbose/.conda/envs/DATA_MANAGE/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/sbose/.conda/envs/DATA_MANAGE/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/home/sbose/.conda/envs/DATA_MANAGE/lib/python3.10/multiprocessing/queues.py", line 102, in get
    with self._rlock:
  File "/home/sbose/.conda/envs/DATA_MANAGE/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/sbose/.conda/envs/DATA_MANAGE/lib/python3.10/multiprocessing/synchroniz

Process ForkProcess-202:
Process ForkProcess-188:
Process ForkProcess-207:
Process ForkProcess-192:
Process ForkProcess-197:
Process ForkProcess-201:
Process ForkProcess-206:
Process ForkProcess-215:
Process ForkProcess-203:
Process ForkProcess-209:
Process ForkProcess-193:
Process ForkProcess-216:
Process ForkProcess-195:
Process ForkProcess-200:
Process ForkProcess-191:
Process ForkProcess-208:
Process ForkProcess-190:
Process ForkProcess-199:
Process ForkProcess-194:
Process ForkProcess-185:
Process ForkProcess-214:
Process ForkProcess-212:
Process ForkProcess-184:
Process ForkProcess-211:
Process ForkProcess-198:
Process ForkProcess-196:
Process ForkProcess-186:
Process ForkProcess-183:
Process ForkProcess-181:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call la

KeyboardInterrupt: 