# translating matlab code parser

## setup

In [1]:
from pathlib import Path
import numpy as np

from tqdm import tqdm_notebook

## params

In [2]:
# paths
lvm_path = Path(r'data/qim_20kbps_10db_l2_v2.lvm')

# signal specific params
spb = 20
header = [0 if i % 2 == 0 else 1 for i in range(10)]
print(f'header: {header}')

header: [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]


## class building

In [7]:
class DataParserLVM:
    def __init__(self, lvm_path, spb, header=None):
        print('initializing..')
        
        # set input path
        if isinstance(lvm_path, Path):
            try: 
                assert lvm_path.exists()
                self.lvm_path = lvm_path
                print(f'\tset input lvm path: {self.lvm_path}')
            except AssertionError:
                print(f'no file found at {lvm_path}')
                raise                
        else:
            print(f'invalid path {lvm_path}, must be pathlib.Path object')
            raise TypeError
        
        # samples per bit
        try:
            assert isinstance(spb, int)
            self.spb = spb
            print(f'\tset samples per bit: {self.spb}')
        except AssertionError:
            try:
                self.spb = int(spb)
                print(f'\tset samples per bit: {self.spb}')
            except Exception:
                print(f'unable to convert provided spb ({type(spb)}) to integer, please provide in integer format')
                raise
                
        # header
        if header is None:
            self.header = np.array([0 if i % 2 == 0 else 1 for i in range(10)])
            print(f'\tset header: {self.header}')
        else:
            if isinstance(header, list):
                self.header = np.array(header)
                print(f'\tset header: {self.header}')
            elif isinstance(header, np.array):
                self.header = header
                print(f'\tset header: {self.header}')
            else:
                print(f'invalid header format specified ({type(header)}), must be np.array or list')
                raise TypeError
                
        # startup tasks
        self.parse_lvm_file(output_path='auto')
        self.read_csv_data()
        self.preprocess_data()
        
            
    def parse_lvm_file(self, output_path='auto'):
        """
        reads in .lvm file, writes out .csv with column names
        containing only data points
        
        [arguments]
        input_path: str or pathlib.Path object (preferred)
            input file path
        output_path: 'auto' (preferred) or str or pathlib.Path object
            if auto:
                appends _parsed.csv to end of input file name
            else:
                use provided path, must end in .csv
                
        [returns]
        None
            outputs parsed lvm file
            sets self.parsed_lvm_path, pathlib.Path object pointing to parsed file
        """
        
        print('parsing .lvm file..')
        with open(self.lvm_path, 'r') as infile:
            # set output path
            file_name = infile.name
            
            if output_path == 'auto':
                self.parsed_lvm_path = Path(f'{file_name}_parsed.csv')
            else:
                self.parsed_lvm_path = output_path
                
                if isinstance(parsed_lvm_path, Path):
                    assert str(parsed_lvm_path).endswith('.csv'), 'output file must end with .csv'
                elif isinstance(parsed_lvm_path, str):
                    assert parsed_lvm_path.endswith('.csv'), 'output file must end with .csv'
                else:
                    print('error, output_path must be str or pathlib.Path object')
                    raise TypeError
            
            if not self.parsed_lvm_path.exists():
                self.parsed_lvm_path.touch()
            
            # find header, store column names
            while True:
                line = infile.readline()
                
                if line.strip().endswith('Comment'):
                    cols = line.split(',')[:-1]
                    print(f'\tcolumns: {cols}')
                    break        
                    
            # trim file and write out
            with open(self.parsed_lvm_path, 'w') as outfile:
                outfile.write(', '.join(cols) + '\n')
                while True:
                    try:
                        outfile.write(next(infile))
                    except StopIteration:
                        break
        
        print(f'\tcomplete, parsed file saved to: {self.parsed_lvm_path}')
        
    def read_csv_data(self):
        print('importing data from parsed csv..')
        self.raw_data = np.genfromtxt(
            self.parsed_lvm_path,
            delimiter=',',
            skip_header=1,
            usecols=(1)
        )
        
        print(f'\timported data, {self.raw_data.size} rows')
        
        
    def preprocess_data(self):
        th = max(self.raw_data) / 2
        shape_check = self.raw_data[np.where(self.raw_data >= th)].size
        
        self.raw_data[np.where(self.raw_data >= th)] = 1
        self.raw_data[np.where(self.raw_data < th)] = 0
        
        # set dtype and store
        self.data = self.raw_data.astype('int')
        
        # validate
        try:
            assert self.data.sum() == shape_check            
        except AssertionError:
            print(f'error: sum of 1\'s ({self.data.sum()}) does not match number of entries >= th ({shape_check})')
        
        
    def get_state_length_list(self, data):
        '''
        takes data list ([1,1,0,1,0,0,...]) and returns numpy array 
        of duration of consecutive bits ([13,245,2588,19,1056,...])
        '''
        
        return np.diff(
            np.where(
                np.concatenate(
                    ([data[0]],
                     data[:-1] != data[1:],
                     [0]
                    )
                )
            )[0]
        )[::2]
    

    def discretize_signal(self):
        # set params
        discretized_data = []
        start_bit = self.data[0]
        alt_bit = 0 if start_bit == 1 else 1
        print(f'starting bit: {start_bit}, alt bit: {alt_bit}')
        
        # get state lengths
        state_lengths = self.get_state_length_list(self.data)
        
        for i, state in enumerate(state_lengths):
            discretized_state = int(np.round(state / self.spb))
            #print(f'iter: {i}, discretized state: {discretized_state}')
            
            if i % 2 == 0:
                discretized_data.append([start_bit for j in range(discretized_state)])
            elif i % 2 == 1:
                discretized_data.append([alt_bit for j in range(discretized_state)])
            else:
                print('ya done messed up.')
        
        self.discretized_array = np.array([item for sublist in discretized_data for item in sublist])
        print(f'complete, discretized signal: {self.discretized_array.size} entries')
        

    
        

In [8]:
DP = DataParserLVM(lvm_path, 20)

initializing..
	set input lvm path: data\qim_20kbps_10db_l2_v2.lvm
	set samples per bit: 20
	set header: [0 1 0 1 0 1 0 1 0 1]
parsing .lvm file..
columns: ['X_Value', 'Voltage']
	complete, parsed file saved to: data\qim_20kbps_10db_l2_v2.lvm_parsed.csv
	imported data, 4000000 rows


In [9]:
DP.discretize_signal()

starting bit: 0, alt bit: 1
complete, discretized signal: 68851 entries


In [10]:
DP.discretized_array

array([0, 0, 1, ..., 1, 1, 1])

## data acquisition

### new lvm parser dev

In [None]:
def parse_lvm_file(input_path, output_path='auto'):
    """
    reads in .lvm file, writes out .csv with column names
    containing only data points
    
    [arguments]
    input_path: str or pathlib.Path object (preferred)
        input file path
    output_path: 'auto' (preferred) or str or pathlib.Path object
        if auto:
            appends _parsed.csv to end of input file name
        else:
            use provided path, must end in .csv
            
    [returns]
    output_path: pathlib.Path object
        outputs Path object pointing to outputted file   
    """

    with open(lvm_path, 'r') as infile:
        # set output path
        file_name = infile.name
        
        if output_path == 'auto':
            parsed_lvm_path = Path(f'{file_name}_parsed.csv')
        else:
            parsed_lvm_path = output_path
            
            if isinstance(parsed_lvm_path, pathlib.Path):
                assert str(parsed_lvm_path).endswith('.csv'), 'output file must end with .csv'
            elif isinstance(parsed_lvm_path, str):
                assert parsed_lvm_path.endswith('.csv'), 'output file must end with .csv'
            else:
                print('error, output_path must be str or pathlib.Path object')
        
        if not parsed_lvm_path.exists():
            parsed_lvm_path.touch()
        
        # find header, store column names
        while True:
            line = infile.readline()
            
            if line.strip().endswith('Comment'):
                cols = line.split(',')[:-1]
                print(f'columns: {cols}')
                break        
                
        # trim file and write out
        with open(parsed_lvm_path, 'w') as outfile:
            outfile.write(', '.join(cols) + '\n')
            while True:
                try:
                    outfile.write(next(infile))
                except StopIteration:
                    break
    
    print(f'complete, parsed file saved to: {parsed_lvm_path}')
    
    return parsed_lvm_path

In [None]:
parsed_lvm_path = parse_lvm_file(lvm_path, output_path='auto')

### read csv

In [None]:
raw_data = np.genfromtxt(
    parsed_lvm_path,
    delimiter=',',
    skip_header=1,
    usecols=(1)
)

print(f'imported data, {raw_data.size} rows')

## data processing

### set `1`'s and `0`'s

In [None]:
th = max(raw_data) / 2

shape_check = raw_data[np.where(raw_data >= th)].size
raw_data[np.where(raw_data >= th)] = 1
raw_data[np.where(raw_data < th)] = 0

# set dtype
raw_data = raw_data.astype('int')

# validate
# sum of all the ones should equal the number of entries >= th
assert raw_data.sum() == shape_check, 'error: sum of 1\'s does not match number of entries >= th'

### discretize signal

In [16]:
def get_state_length_list(data):
    '''
    takes data list ([1,1,0,1,0,0,...]) and returns numpy array 
    of duration of consecutive bits ([13,245,2588,19,1056,...])
    '''
    
    return np.diff(
        np.where(
            np.concatenate(
                ([data[0]],
                 data[:-1] != data[1:],
                 [0]
                )
            )
        )[0]
    )[::2]

In [18]:
state_lengths = get_state_length_list(DP.raw_data)
state_lengths[:10]

array([49, 19,  9, 10, 30, 17,  2, 49, 49, 19], dtype=int64)

In [14]:
def discretize_signal(raw_data, spb):
    discretized_data = []
    start_bit = raw_data[0]
    alt_bit = 0 if start_bit == 1 else 1
    #print(f'starting bit: {start_bit}, alt bit: {alt_bit}')
    
    
    for i, state in enumerate(state_lengths):
        discretized_state = int(np.round(state / spb))
        #print(f'iter: {i}, discretized state: {discretized_state}')
        
        if i % 2 == 0:
            # start_bit
            discretized_data.append([start_bit for j in range(discretized_state)])
        elif i % 2 == 1:
            # alt_bit
            discretized_data.append([alt_bit for j in range(discretized_state)])
        else:
            print('ya done messed up.')
    
    discretized_array = np.array([item for sublist in discretized_data for item in sublist])
    #print(f'complete, discretized signal: {discretized_array.size} entries')
    
    return discretized_array

In [None]:
discretized_array = discretize_signal(raw_data, 92)

In [None]:
discretized_array[95:150]

### signal finder

In [20]:
def search_sequence_numpy(arr,seq):
    """ Find sequence in an array using NumPy only.

    Parameters
    ----------    
    arr    : input 1D array
    seq    : input 1D array

    Output
    ------    
    Output : 1D Array of indices in the input array that satisfy the 
    matching of input sequence in the input array.
    In case of no match, an empty list is returned.
    """

    # Store sizes of input array and sequence
    Na, Nseq = arr.size, seq.size

    # Range of sequence
    r_seq = np.arange(Nseq)

    # Create a 2D array of sliding indices across the entire length of input array.
    # Match up with the input sequence & get the matching starting indices.
    M = (arr[np.arange(Na-Nseq+1)[:,None] + r_seq] == seq).all(1)

    # Get the range of those indices as final output
    if M.any() >0:
        return np.where(np.convolve(M,np.ones((Nseq),dtype=int))>0)[0]
    else:
        return []  

In [None]:
search_sequence_numpy(discretized_array, np.array(header))

In [None]:
discretized_array[30078:30120]

### spb brute force attempt

In [11]:
from collections import Counter

In [12]:
spb_list = [i for i in range(90, 100, 1)]
print(f'spb_list: {len(spb_list)} elements')

spb_list: 10 elements


In [21]:
result_dict = {}

for spb in tqdm_notebook(spb_list):
    discretized_array = discretize_signal(DP.raw_data, spb)    
    
    result_dict[spb] = search_sequence_numpy(discretized_array, np.array(header))

HBox(children=(IntProgress(value=0, max=10), HTML(value='')))




In [22]:
frequencies = {
    spb: Counter(np.diff(matches)).most_common(3)
    for spb, matches in result_dict.items()
}

frequencies[90]

[(1, 807), (191, 38), (192, 21)]

In [23]:
frequencies

{90: [(1, 807), (191, 38), (192, 21)],
 91: [(1, 807), (191, 38), (192, 21)],
 92: [(1, 807), (190, 69), (191, 2)],
 93: [(1, 807), (190, 69), (191, 2)],
 94: [(1, 798), (189, 37), (188, 18)],
 95: [(1, 798), (189, 37), (188, 18)],
 96: [(1, 783), (188, 56), (187, 8)],
 97: [(1, 783), (188, 56), (187, 8)],
 98: [(1, 397), (41, 2), (98, 1)],
 99: [(1, 397), (41, 2), (98, 1)]}