Run this cell to define support functions

In [None]:
import sys
import os
import math
import numpy as np
from comtrade import Comtrade

# Per-unit bases used by scale_factor() to normalize plotted and measured quantities.
# Line-to-neutral voltage base derived from a 230 line-to-line value (presumably kV, per the name).
kVLNbase = 230.0 / math.sqrt(3.0)
# Three-phase power base (presumably MVA, per the name).
MVAbase = 100.0
# Per-phase current base, S / (3 * V_LN); this is in kA if the two bases above are MVA and kV.
kAbase = MVAbase / kVLNbase / 3.0

# Base file names used in the test suites. Each name is joined to the session
# path and handed to load_channels(), which appends '.cfg' and '.dat'.

# files of the 'fs' (model initialization) suite
flats = ['fsicrv', 'fsicrq0', 'fsicrqp', 'fsicrqn', 'fsicrpf0', 'fsicrpfp', 'fsicrpfn',
         'fsminv', 'fsminq0', 'fsminqp', 'fsminqn', 'fsminpf0', 'fsminpfp', 'fsminpfn']

# files of the 'uv' (undervoltage ride-through) suite
uvrts = ['uvq03sag', 'uvq03pg', 'uvq01pg', 'uvq02pg', 'uvq02p', 
         'uvqp3sag', 'uvqp3pg', 'uvqp1pg', 'uvqp2pg', 'uvqp2p', 
         'uvqn3sag', 'uvqn3pg', 'uvqn1pg', 'uvqn2pg', 'uvqn2p']

# files of the 'ov' (overvoltage ride-through) suite
ovrts = ['ovq0', 'ovqp', 'ovqn']

# files of the 'fr' (frequency ride-through) suite
freqs = ['oficr', 'uficr', 'ofmin', 'ufmin']

# files of the 'an' (angle ride-through) suite
angles = ['anicr', 'apicr', 'anmin', 'apmin']

# files of the 'st' (control reference step) suite
steps = ['stvref', 'stqref', 'stpfref', 'stpref']

# files of the 'sc' (short-circuit ratio ramp-down) suite
ramps = ['rampscr']

# Test suite definitions, keyed by a two-letter suite id. Each entry holds:
#   'title'                          text used in the plot title and table heading
#   'files'                          list of case base file names (see above)
#   'tmax_pscad' / 'tmax_emtp'       upper limit of the plotted time axis for each tool
#   'tevents_pscad' / 'tevents_emtp' event times for each tool; process_metrics()
#                                    uses only the first entry as the event (fault) time
test_suites = {'fs': {'title': 'Weak-grid model initialization tests', 'files': flats, 
                      'tmax_pscad': 20.0, 'tmax_emtp':20.0, 
                      'tevents_pscad':[0.0], 'tevents_emtp':[0.0]},
               'uv': {'title': 'Weak-grid undervoltage ride-through tests', 'files': uvrts, 
                      'tmax_pscad': 20.0, 'tmax_emtp':20.0, 
                      'tevents_pscad':[10.0], 'tevents_emtp':[5.0]},
               'ov': {'title': 'Weak-grid overvoltage ride-through tests', 'files': ovrts, 
                      'tmax_pscad': 20.0, 'tmax_emtp':20.0, 
                      'tevents_pscad':[10.0], 'tevents_emtp':[5.0]},
               'fr': {'title': 'Weak-grid frequency ride-through tests', 'files': freqs, 
                      'tmax_pscad': 40.0, 'tmax_emtp':40.0, 
                      'tevents_pscad':[10.0], 'tevents_emtp':[10.0]},
               'an': {'title': 'Weak-grid angle ride-through tests', 'files': angles, 
                      'tmax_pscad': 40.0, 'tmax_emtp':30.0, 
                      'tevents_pscad':[10.0], 'tevents_emtp':[10.0]},
               'sc': {'title': 'Short-circuit ratio ramp-down test', 'files': ramps, 
                      'tmax_pscad': 45.0, 'tmax_emtp':45.0, 
                      'tevents_pscad':[10.0], 'tevents_emtp':[10.0]},
               'st': {'title': 'Control reference step tests', 'files': steps, 
                      'tmax_pscad': 50.0, 'tmax_emtp':50.0, 
                      'tevents_pscad':[10.0], 'tevents_emtp':[10.0]}}

def scale_factor(lbl, bPSCAD):
  """Return the multiplier that normalizes the channel named lbl.

  The label is matched by substring, in this order:
    'P' or 'Q' -> 1 / MVAbase
    'I'        -> 1 / kAbase / sqrt(2)
    'Vrms'     -> 1 / kVLNbase when bPSCAD is false, otherwise 1.0
    'V'        -> 1 / kVLNbase / sqrt(2)
  Any other label gets 1.0.

  NOTE(review): the sqrt(2) divisors presumably convert peak values to RMS,
  and the PSCAD Vrms channel is presumably already in per-unit -- confirm
  against the exported channel definitions.
  """
  # P and Q share the same power base
  if 'P' in lbl or 'Q' in lbl:
    return 1.0 / MVAbase
  if 'I' in lbl:
    return 1.0 / kAbase / math.sqrt(2.0)
  # 'Vrms' must be tested before the generic 'V' match below
  if 'Vrms' in lbl:
    return 1.0 if bPSCAD else 1.0 / kVLNbase
  if 'V' in lbl:
    return 1.0 / kVLNbase / math.sqrt(2.0)
  return 1.0

# load all the analog channels from each case into dictionaries of numpy arrays. Expecting:
#   1..3 = Va..Vc
#   4..6 = Ia..Ic
#   7 = Vrms
#   8 = P
#   9 = Q
#   10 = F
def load_channels(comtrade_path, bDebug=False, bAll=True, choices=None):
  """Load analog channels of one COMTRADE record into numpy arrays.

  Args:
    comtrade_path: path of the record without extension; '.cfg' and '.dat'
      are appended to locate the two files.
    bDebug: when true, print the path, the channel count and the per-channel
      label, unit and scale (or a note for each skipped channel).
    bAll: when true, keep every analog channel.
    choices: container of channel labels to keep when bAll is false. None is
      treated as an empty selection, so only the time vector is returned.

  Returns:
    (channels, units): channels maps 't' to the time array and each kept
    label to its scaled sample array; units maps each kept label to the unit
    string from the channel configuration.
  """
  if choices is None:
    # the original evaluated "lbl in None" and raised TypeError here
    choices = ()
  if bDebug:
    print (comtrade_path)
  rec = Comtrade ()
  rec.load (comtrade_path + '.cfg', comtrade_path + '.dat')
  t = np.array(rec.time)

  channels = {}
  units = {}
  channels['t'] = t
  if bDebug:
    print ('{:d} channels ({:d} points) found in {:s}.cfg'.format (rec.analog_count, len(t), comtrade_path))
  for i in range(rec.analog_count):
    lbl = rec.analog_channel_ids[i].strip()
    # for PSCAD naming convention, truncate the channel at first colon, if one exists
    idx = lbl.find(':')
    if idx >= 0:
      lbl = lbl[0:idx]
    if bAll or lbl in choices:
      ch_config = rec.cfg.analog_channels[i]
      # NOTE(review): a channel flagged 'P' is multiplied by secondary/primary
      # and one flagged 'S' by primary/secondary -- confirm this direction is
      # what the downstream per-unit scaling expects.
      scale = 1.0
      if ch_config.pors.upper() == 'P':
        scale = ch_config.secondary / ch_config.primary
      elif ch_config.pors.upper() == 'S':
        scale = ch_config.primary / ch_config.secondary
      if bDebug:
        print ('  "{:s}" [{:s}] scale={:.6e}'.format(lbl, ch_config.uu, scale))
      channels[lbl] = scale * np.array (rec.analog[i])
      units[lbl] = ch_config.uu
    elif bDebug:
      print ('  skipping "{:s}"'.format(lbl))

  return channels, units

Run this next cell to enable [Matplotlib](https://matplotlib.org/)

In [None]:
import matplotlib.pyplot as plt

def setup_plot_options():
  """Apply the notebook-wide Matplotlib defaults.

  Figures are saved to the current working directory by default, text uses a
  serif font, tick and axis labels use size 6 and legend text uses size 4.
  """
  label_size = 6
  plt.rcParams['savefig.directory'] = os.getcwd()
  plt.rc('font', family='serif')
  # tick labels and axis labels all share one size
  for group in ('xtick', 'ytick', 'axes'):
    plt.rc(group, labelsize=label_size)
  plt.rc('legend', fontsize=4)
  # A 'tab20c' color cycle was tried here and is left disabled:
  #  clr = plt.get_cmap('tab20c').colors
  #  plt.axes().set_prop_cycle('color', clr)

def show_comparison_plot (chd, unitd, title, bPSCAD, tmax, PNGName=None):
  """Overlay Vrms, P, Q and F of several cases on four stacked axes.

  Args:
    chd: dict mapping a case name to its channel dict; each channel dict must
      hold 't', 'Vrms', 'P', 'Q' and 'F' arrays.
    unitd: accepted for symmetry with load_channels() but not used here.
    title: figure title.
    bPSCAD: passed to scale_factor() to pick the per-channel scaling.
    tmax: right-hand limit of the time axis; 11 evenly spaced ticks are drawn
      from 0 to tmax.
    PNGName: when not None, the figure is saved under this name before it is
      shown.
  """
  channel_labels = ['Vrms', 'P', 'Q', 'F']
  y_labels = ['Vrms [pu]', 'P [pu]', 'Q [pu]', 'F [Hz]']

  fig, ax = plt.subplots(4, 1, sharex = 'col', figsize=(7,7), constrained_layout=True)
  fig.suptitle (title)
  x_ticks = np.linspace (0.0, tmax, 11)

  # one trace per case on each of the four axes
  for case_name, case_channels in chd.items():
    for axis, lbl in zip(ax, channel_labels):
      axis.plot (case_channels['t'], scale_factor(lbl, bPSCAD) * case_channels[lbl], label=case_name)

  # a legend is only useful when more than one case is drawn
  bLegend = len(chd) >= 2
  for axis, y_label in zip(ax, y_labels):
    axis.set_ylabel (y_label)
    axis.set_xticks (x_ticks)
    axis.set_xlim (x_ticks[0], x_ticks[-1])
    axis.grid()
    if bLegend:
      axis.legend(loc='lower right')
  ax[3].set_xlabel ('Time [s]')
  # A fixed Vrms range for non-PSCAD results was tried and is left disabled:
  #  if not bPSCAD:
  #    ax[0].set_ylim (0.85, 1.1)

  if PNGName is not None:
    plt.savefig(PNGName)
  plt.show()

# IPython magic (not plain Python): selects Matplotlib's interactive 'widget' backend.
# NOTE(review): this backend presumably needs the ipympl package -- confirm for your environment.
%matplotlib widget

# apply the font, label-size and save-directory defaults defined above
setup_plot_options()

This support function processes and plots one test suite: fs, uv, ov, fr, an, st, sc

In [None]:
def plot_test_suite (suite, session_path, case_tag, bPSCAD, PNGName):
  """Load every case of one test suite and show the comparison plot.

  Args:
    suite: one entry of test_suites (needs 'title', 'files' and the
      'tmax_pscad' / 'tmax_emtp' keys).
    session_path: directory holding the case files.
    case_tag: text appended to the suite title.
    bPSCAD: true selects the PSCAD time limit and overwrites the first
      frequency sample with 60.0; false selects the EMTP time limit.
    PNGName: forwarded to show_comparison_plot(); None disables saving.
  """
  test_title = suite['title']
  test_files = suite['files']
  test_tmax = suite['tmax_pscad'] if bPSCAD else suite['tmax_emtp']

  channels = {}
  units = {}
  for tag in test_files:
    tag_path = os.path.join (session_path, '{:s}'.format (tag))
    case_channels, case_units = load_channels (tag_path)
    channels[tag] = case_channels
    units[tag] = case_units
    if bPSCAD:
      # cosmetic initialization of the frequency plot
      case_channels['F'][0] = 60.0

  title = '{:s}: {:s}'.format(test_title, case_tag)
  show_comparison_plot (channels, units, title, bPSCAD, test_tmax, PNGName)


These support functions calculate the metrics

In [None]:
def window_metrics (t, v, scale=1.0, t_fault=1.0, t_start=None, t_end=None):
  """Compute step-response style metrics of one signal inside a time window.

  Args:
    t: numpy array of sample times (expected to be ascending).
    v: numpy array of signal samples, same length as t.
    scale: multiplier applied to v before any measurement.
    t_fault: time of the event; the initial point is the last windowed sample
      strictly before it (or the first windowed sample if none is earlier).
    t_start: window start; defaults to t[0].
    t_end: window end, inclusive; defaults to t[-1].

  Returns:
    dict with point entries 'Min', 'Max', 'Init', 'Final', 'Peak', 'Start'
    (10% reaction point) and 'Rise' (90% point), each {'x': time, 'y': value},
    plus scalars 'Mean', 'Reverse' (true when the final value is below the
    initial one), 'ReactionTime', 'RiseTime', 'SettlingTime', 'Overshoot' and
    'OvershootDamping'.

  Raises:
    ValueError: if no sample lies between t_start and t_end.
  """
  if t_start is None:
    t_start = t[0]
  if t_end is None:
    t_end = t[-1]

  after_start = np.flatnonzero (t >= t_start)
  before_end = np.flatnonzero (t <= t_end)
  if after_start.size == 0 or before_end.size == 0 or after_start[0] > before_end[-1]:
    raise ValueError ('no samples between t_start={} and t_end={}'.format (t_start, t_end))
  n1 = after_start[0]
  n2 = before_end[-1]
  # n2 is the last sample inside the window, so the slice has to include it
  y = scale*v[n1:n2 + 1]
  x = t[n1:n2 + 1]

  # initial point: last sample before the event, indexed within the window
  # (searching the full t array would be offset by n1 whenever t_start trims
  # the front of the record)
  idx_start = 0
  if x[0] < t_fault:
    idx_start = np.flatnonzero (x < t_fault)[-1]
  y_init = y[idx_start]
  x_init = x[idx_start]
  y_final = y[-1]
  x_final = x[-1]
  y_mean = np.mean(y)
  y_max = np.max(y)
  x_max = x[np.argmax(y)]
  y_min = np.min(y)
  x_min = x[np.argmin(y)]

  # look for the 10% and 90% points
  pu_reaction = 0.1
  pu_rise = 0.9
  sb_pu = 0.05

  y_reaction = y_init + pu_reaction * (y_final - y_init)
  reverse = bool (y_final < y_init)
  if reverse:
    # falling response: the peak is the minimum and crossings are downward
    y_peak = y_min
    x_peak = x_min
    y_rise = y_init + pu_rise * (y_min - y_init)
    idx_reaction = np.flatnonzero (y <= y_reaction)[0]
    idx_rise = np.flatnonzero (y <= y_rise)[0]
  else:
    y_peak = y_max
    x_peak = x_max
    y_rise = y_init + pu_rise * (y_max - y_init)
    idx_reaction = np.flatnonzero (y >= y_reaction)[0]
    idx_rise = np.flatnonzero (y >= y_rise)[0]
  x_reaction = x[idx_reaction]
  x_rise = x[idx_rise]

  # some figures of merit
  x_fault = t_fault # should be known from the EMT simulation

  # wavefront and damping measurements
  reaction_time = x_reaction - x_fault
  rise_time = x_rise - x_reaction
  denom = abs(y_final - y_init)
  if denom > 0.0:
    overshoot = abs(y_peak - y_final) / denom
  else:
    overshoot = 0.0
  if overshoot > 0.0:
    ln_overshoot = math.log(overshoot)
    damping_by_overshoot = -ln_overshoot / math.sqrt (math.pi*math.pi + ln_overshoot*ln_overshoot)
  else:
    # log(0) is undefined; the damping formula tends to 1 as overshoot -> 0
    damping_by_overshoot = 1.0

  # settling band measurements: +/- 5% of the final value
  sb_pu *= abs(y_final)
  sb1 = y_final + sb_pu
  sb2 = y_final - sb_pu
  above_band = np.flatnonzero (y >= sb1)
  below_band = np.flatnonzero (y <= sb2)
  # NOTE(review): a settling time is reported only when the signal touches
  # both band edges; a response that stays on one side reports 0.0 -- confirm
  # this is intended.
  if above_band.size > 0 and below_band.size > 0:
    idx_settle = above_band[-1] if above_band[-1] > below_band[-1] else below_band[-1]
    settling_time = x[idx_settle] - x_reaction
  else:
    settling_time = 0.0

  d = {'Min':{'x':x_min, 'y':y_min},
       'Max':{'x':x_max, 'y':y_max},
       'Init':{'x':x_init, 'y':y_init},
       'Final':{'x':x_final, 'y':y_final},
       'Peak':{'x':x_peak, 'y':y_peak},
       'Start':{'x':x_reaction, 'y':y_reaction},
       'Rise':{'x':x_rise, 'y':y_rise},
       'Mean': y_mean, 'Reverse': reverse,
       'ReactionTime': reaction_time, 'RiseTime': rise_time, 'SettlingTime': settling_time,
       'Overshoot': overshoot, 'OvershootDamping': damping_by_overshoot}
  return d

def process_metrics (suite, session_path, case_tag, bPSCAD):
  """Print a metrics table for one test suite, then show its comparison plot.

  For every case file of the suite the Vrms, P, Q and F channels are loaded,
  window_metrics() is evaluated on Vrms, P and Q with the suite's first event
  time as t_fault, and one table row is printed per case and quantity.

  Args:
    suite: one entry of test_suites.
    session_path: directory holding the case files.
    case_tag: text added to the heading and plot title.
    bPSCAD: true selects the '*_pscad' suite settings and overwrites the
      first frequency sample with 60.0; false selects the '*_emtp' settings.
  """
  if bPSCAD:
    tmax_key, events_key = 'tmax_pscad', 'tevents_pscad'
  else:
    tmax_key, events_key = 'tmax_emtp', 'tevents_emtp'
  test_tmax = suite[tmax_key]
  t_faults = suite[events_key]

  title = '{:s}, {:s}, event time={:.2f}s'.format(suite['title'], case_tag, t_faults[0])
  test_files = suite['files']
  channel_labels = ['Vrms', 'P', 'Q', 'F']
  metric_labels = ['Vrms', 'P', 'Q']
  row_format = '{:13s} {:1s} {:8.4f} {:8.4f} {:8.4f} {:8.4f} {:8.4f} {:8.5f} {:8.5f} {:8.5f} {:8.5f} {:8.5f}'

  channels = {}
  units = {}
  print (title)
  print ('Case-Qty      N  Initial    Final      Min      Max     Peak    Treac    Trise  Tsettle OverShot  Damping')
  for tag in test_files:
    tag_path = os.path.join (session_path, '{:s}'.format (tag))
    case_channels, case_units = load_channels (tag_path, bAll=False, choices=channel_labels)
    channels[tag] = case_channels
    units[tag] = case_units
    if bPSCAD and 'F' in case_channels:
      # cosmetic initialization of the frequency plot
      case_channels['F'][0] = 60.0
    for qty in metric_labels:
      mnemonic = '{:s}-{:s}'.format (tag, qty)
      d = window_metrics (t=case_channels['t'], v=case_channels[qty],
                          scale=scale_factor (qty, bPSCAD), t_fault=t_faults[0])
      # the second column shows the first letter of the Reverse flag (T or F)
      row = (mnemonic, str(d['Reverse'])[0],
             d['Init']['y'], d['Final']['y'], d['Min']['y'], d['Max']['y'], d['Peak']['y'],
             d['ReactionTime'], d['RiseTime'], d['SettlingTime'],
             d['Overshoot'], d['OvershootDamping'])
      print (row_format.format (*row))
  show_comparison_plot (channels, units, title, bPSCAD, test_tmax, PNGName=None)

Run the next cell to configure PSCAD results import

In [None]:
# PSCAD configuration: these module-level names are read by the processing cells below.
# set the session_path to match location of your unzipped sample cases
bPSCAD = True  # selects the '*_pscad' suite settings and the PSCAD scaling in scale_factor()
case_tag = 'Solar'                                                     
session_path = 'd:/data/pscad'
# PNGName is only consumed by plot_test_suite(); process_metrics() never saves a PNG
PNGName = None

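Run the next cell instead to configure EMTP results import. It sets the same variables as the PSCAD cell above, so whichever configuration cell was run last determines what the cells below process.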

In [None]:
# EMTP configuration: assigns the same module-level names as the PSCAD cell,
# so the configuration cell that ran last is the one in effect.
# set the session_path to match location of your unzipped sample cases
bPSCAD = False  # selects the '*_emtp' suite settings and the non-PSCAD scaling in scale_factor()
session_path = 'd:/data/emtp'
case_tag = 'Wind'
# PNGName is only consumed by plot_test_suite(); process_metrics() never saves a PNG
PNGName = None

Plot and process the model initialization tests (fs)

In [None]:
# Print the metrics table and show the comparison plot for the 'fs' suite,
# using the session_path, case_tag and bPSCAD from the configuration cell run last.
process_metrics (test_suites['fs'], session_path, case_tag, bPSCAD)

Plot and process the undervoltage ride-through tests (uv)

In [None]:
# Print the metrics table and show the comparison plot for the 'uv' suite,
# using the session_path, case_tag and bPSCAD from the configuration cell run last.
process_metrics (test_suites['uv'], session_path, case_tag, bPSCAD)

Plot and process the overvoltage ride-through tests (ov)

In [None]:
# Print the metrics table and show the comparison plot for the 'ov' suite,
# using the session_path, case_tag and bPSCAD from the configuration cell run last.
process_metrics (test_suites['ov'], session_path, case_tag, bPSCAD)

Plot and process the frequency ride-through tests (fr)

In [None]:
# Print the metrics table and show the comparison plot for the 'fr' suite,
# using the session_path, case_tag and bPSCAD from the configuration cell run last.
process_metrics (test_suites['fr'], session_path, case_tag, bPSCAD)

Plot and process the angle ride-through tests (an)

In [None]:
# Print the metrics table and show the comparison plot for the 'an' suite,
# using the session_path, case_tag and bPSCAD from the configuration cell run last.
process_metrics (test_suites['an'], session_path, case_tag, bPSCAD)

Plot and process the control reference step tests (st)

In [None]:
# Print the metrics table and show the comparison plot for the 'st' suite,
# using the session_path, case_tag and bPSCAD from the configuration cell run last.
process_metrics (test_suites['st'], session_path, case_tag, bPSCAD)

Plot and process the short-circuit ratio ramp-down test (sc)

In [None]:
# Print the metrics table and show the comparison plot for the 'sc' suite,
# using the session_path, case_tag and bPSCAD from the configuration cell run last.
process_metrics (test_suites['sc'], session_path, case_tag, bPSCAD)