In [None]:
#
# General Power Theory - Powerstar
#
# File:            gpt_powerstar.ipynb
# Author:          Thomas Gwasira (tomgwasira@gmail.com), MLT Power
# Date:            February 2020
#
# Functionality:
# This is development code for implementation of the General Power Theory algorithm
# on the Powerstar using the Raspberry Pi on the inverter.
#

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import serial

In [None]:
def quadratic_interpolate(a, b, c, b_pos):
    '''
    Performs quadratic interpolation for a set of three adjacent values to obtain location (x-coordinate) of peak
    
    @param a: Sample 1 of 3 adjacent samples to which parabola is to be fitted
    @param b: Sample 2 of 3 adjacent samples to which parabola is to be fitted
    @param c: Sample 3 of 3 adjacent samples to which parabola is to be fitted
    @param b_pos: Index of middle sample
    
    @rtype: double
    @returns: Non-discrete 'index' of peak
    '''
    
    return b_pos+(0.5*(a-c)/(a-(2*b)+c))

In [None]:
def compute_f0_acf(x, fs):
    '''
    Determines fundamental frequency of a signal using autocorrelation
    
    Parameters:
    @param x: signal whose fundamental frequency is to be determined
    @param fs: sampling frequency of signal x
    
    The ACF of the signal is not actually fully computed or stored to save on space and time.
    Instead, a state machine is used to detect the second peak in the ACF (first peak is the maximum and is always at 0). 
    
    The states are:
    STATE 0 : set threshold under which value we'll ignore the data. NEW STATE = 1
    STATE 1 : check if signal is above threshold AND slope of the signal is positive. If so, move to STATE 2.
    STATE 2 : check if slope of signal has become negative or zero. If so, peak has been found. Move to STATE 3.
    STATE 3 : exit state machine.
    
    @rtype: double
    @returns: Fundamental frequency (f0)
    '''
    
    thresh = 0
    f0 = 0
    STATE = 0
    sum = 0
    prev_sum = 0
    L = 0 # fundamental frequency period expressed in samples
    
    L_det = 0
    count = 0
    
    R = np.zeros(len(x))
    
    # Autocorrelation with peak detection
    for m in range(len(x)):
        prev_prev_sum = prev_sum
        prev_sum = sum
        sum = 0
        
        for n in range(len(x)-m):
            sum += x[n] * x[n+m] # not storing ACF in order to store memory
        
        # State machine to obtain three adjacent points around peak for quadratic interpolation
        if (STATE == 2 and (sum - prev_sum) <= 0): # if gradient change, obtain current and previous two values
            L = quadratic_interpolate(prev_prev_sum, prev_sum, sum, m-1)
            STATE = 3
            break
            
        if (STATE == 1 and (sum > thresh) and (sum - prev_sum) > 0):
            STATE = 2
            
        if (m == 0):
            thresh = sum * 0.5 # peaks to be detected are above half the first peak (maximum value of ACF)
            STATE = 1
            
    
    # Computing frequency in Hz
    f0 = fs/L;
    
    if (f0 < 40 and f0 > 60):
        print('Fundamental frequency too far from 50 Hz!')
    
    return f0

In [None]:
def normalized_spectral_value(x, f, fs):
    '''
    Uses Discrete Time Fourier Transform formula to obtain the normalized complex value of the signal's spectrum at a particular
    frequency.
    
    The normalization routine performed here for the Fourier Transform to obey Parseval's relation is based on the article:
    http://www.dspguide.com/ch10/7.htm    
    
    Parameters:
    @param x: Time domain signal whose frequency component is to be computed
    @param f: Frequency (Hz) of interest
    @param fs: Sample rate
    
    @rtype: complex128
    @returns: Complex value of frequency component
    '''
    R = 0 # real part
    I = 0 # complex part
    frel = f/fs # relative frequency. Equation of DTFT actually uses cycles/sample not cycles/sec (i.e. Hz).
    
    for n in range(len(x)):
        R += x[n] * np.cos(2*np.pi*frel*n)
        I += x[n] * np.sin(2*np.pi*frel*n)
    
    X_f = np.complex(R, I)
    
    # Normalising
    if (f == 0): # for DFT if f corresponds to sample at 0 or sample at +/-N/2. Because DTFT is used here, there is no way of knowing what frequency is at N/2
        X_f = X_f/2
    X_f = X_f/(len(x)/2) # dividing all points in DTFT by N/2
    
    return  X_f

In [3]:
#---------------------------------------------------------------
# Closing relays inorder to use SRC Voltage
#---------------------------------------------------------------
with serial.Serial("/dev/serial0", 115200, timeout=1) as ser:
    ser.reset_input_buffer() # clear input serial buffer
    ser.reset_output_buffer() # clear ouput serial buffer

    ser.write(b"-p p 5555\r") # get access to testing mode
    ser.write(b"-p a 3\r") # get access to factory testing mode
    ser.write(b"-m1\r") # reset faults
    ser.write(b"-sd8\r") # check mode. Must be in standby mode.
    
    # ser.write(b"-f f ?\r") # help on factory mode
    ser.write(b"-f f test_io\r") # put device in IO test mode
    # ser.write(b"-f i ?\r") # help on IO factory mode
    ser.write(b"-f i src_1=1\r") # close src_1 relay
    ser.write(b"-f i src_2=1\r") # close src_2 relay

    ser.close()

In [16]:
#---------------------------------------------------------------
# Reading data from Powerstar 10 card
#---------------------------------------------------------------
# Setup primary UART (miniUART) and read from Powerstar card
with serial.Serial("/dev/serial0", 115200, timeout=1) as ser:
    ser.reset_input_buffer() # clear input serial buffer
    ser.reset_output_buffer() # clear output serial buffer

    ser.write(b"-twh\r") # request headings for signals from Powerstar 10 card
    headings = [((x.strip()).strip(',')).split(',') for x in (((ser.read(1024)).strip()).strip('\r')).split('\r')] # read serial data in response to headings request. strip() necessary for any leading and trailing spaces in data
    print(headings)

    ser.write(b"-twrl\r"); ser.read(10000); # set to high resolution logging fadc = 10kHz
    # ser.write(b"-twrl\r"); ser.read(1024); # set to low resolution logging fadc = 1kHz

    ser.write(b"-tw\r") # request signals from Powerstar 10 card
    data = [((x.strip()).strip(',')).split(',') for x in (((ser.read(10000)).strip()).strip('\r')).split('\r')] # read serial data
    data = np.array(data[:-1]) # discard last list in data. Usually no complete.
    data = data.astype(np.float) # convert data array of strings to array of floats 
    data = np.transpose(data) # transpose data so that a list at each row is a buffer for a partricular measured quantity
    # print(data)

    src_v1 = data[0]/12 # source voltage
    src_i1 = data[2]/12 # source current

    # Signal parameters
    fs = float(1000)                                 # sampling frequency of ADC. 1kHz for low resolution; 10kHz for high resolution.
    nsamples = len(src_v1)                               # number of samples
    tlen = nsamples/fadc

    ser.close()

[['Src Volts SIG', 'Inv Volts SIG', 'Src Amps SIG', 'Inv Amps SIG', 'Src V d', 'Src V q'], [''], ['Cmd Ok']]
