In [None]:
# default_exp core

# Powerline Detection

> Use Hough Transform to extract powerlines from LiDAR data.

In [None]:
#hide
from nbdev.showdoc import *
import pathlib
import matplotlib.pyplot as plt
import laspy as lp

In [None]:
#export
import numpy as np
from tqdm import tqdm
from scipy.linalg import norm
from scipy.optimize import leastsq
from skimage.transform import hough_line, hough_line_peaks

In [None]:
#export
class Hline(object):
    "a class is used to record the details of lines extracted by Hough Transform"
    '''
    coord: parameters of a line
    line: detailed coordinates
    '''
    def __init__(self, *args):
        self.coord = args[0]
        self.line = args[1]
        self.angle = args[2]
        self.dist = args[3]
    def getLengthOfLine(self):
        line = self.line
        return len(line)

In [None]:
#export
class ImageBuffer():
    "it is use to record the buffer image details of powerline corridors"
    def __init__(self, **kwargs):
        self.img = kwargs['img']
        self.coord = kwargs['coord']

In [None]:
#export
def grid_subsampling(points, voxel_size):
    "Define a function that takes as input an array of points, and a voxel size expressed in meters"
    "points: it is from a LiDAR data; voxel_size: the length of edge"
    nb_vox=np.ceil((np.max(points, axis=0) - np.min(points, axis=0))/voxel_size)

    a = ((points - np.min(points, axis=0)) // voxel_size).astype(int)
    non_empty_voxel_keys, inverse, nb_pts_per_voxel= np.unique(a, axis=0, return_inverse=True, return_counts=True)
    idx_pts_vox_sorted=np.argsort(inverse)
    voxel_grid={}
    grid_barycenter,grid_candidate_center=[],[]
    last_seen=0
    for idx, vox in enumerate(tqdm(non_empty_voxel_keys)):
        per_voxel = nb_pts_per_voxel[idx]
        num_pvs   = last_seen + per_voxel
        idxs      = idx_pts_vox_sorted[last_seen: num_pvs]
        voxel_grid[tuple(vox)] = points[idxs]
        gbc = np.mean(points[idxs], axis=0)
        grid_barycenter.append(gbc)
        #
        temp = voxel_grid[tuple(vox)]-np.mean(voxel_grid[tuple(vox)],axis=0)
        temp = np.linalg.norm(temp, axis=1).argmin()                                             
        grid_candidate_center.append(voxel_grid[tuple(vox)][temp])
        last_seen += per_voxel
    #
    grid_barycenter       = np.array(grid_barycenter)
    grid_candidate_center = np.array(grid_candidate_center)
    return {'barycenter': grid_barycenter, 'candidate': grid_candidate_center}

In [None]:
#export 
def getDirctionVectorsByPCA(pca):
    "calculate the direction vectors of PCA in a point cloud"
    abc = []
    for length, vector in zip(pca.explained_variance_, pca.components_):
        v = vector *  np.sqrt(length)
        abc.append(v.tolist())
    return abc
#export 
def pointsProjectAxis(abc,startPoint, pts):
    "calculate the points that are mapped onto the pca lines from a point cloud"
    '''
    abc: slope vector
    startPoint: starting point of a line
    pts: coordinates of all points
    '''
    m,n,p = abc
    xo,yo,zo = startPoint
    pts_ty=[]
    for i in pts:
        t = (m*(i[0] - xo) + n*(i[1] - yo) + p*(i[2] - zo))/(m**2 + n**2 + p**2)
        xc = m*t + xo
        yc = n*t + yo
        zc = p*t + zo
        pts_ty.append([xc,yc,zc])
    return pts_ty
#export 
def radiusOfCylinderByLeastSq(**kwargs):
    "use least square algorithm to calculate the radius of a cylinder"
    pts = kwargs['points'] # the coordinates of points
    projs = kwargs['projections'] # the points that are mapped onto the pca lines from a point cloud
    a = pts - projs
    dist=np.array([norm(i) for i in a])
    errfunc = lambda cr:np.sum((dist - cr)**2)
    est_p, success = leastsq(errfunc,0)
    return np.ceil(est_p)
#export 
def get_start_end_line(pts):
    "get the starting point and end point of a line"
    px,py = pts[:,0], pts[:,1]
    idx = np.lexsort((py,px))
    start = pts[idx[0]]
    end = pts[idx[-1]]
    return np.array([start,end])
#export 
def cylinderSurface(**kwargs):
    "generate the surface of a cylinder"
    pts = kwargs['points'] # points in the axis of a cylinder
    R = kwargs['radius']
    pca = kwargs['pca']
    p0, p1 = get_start_end_line(pts)
    mag = norm(p0 - p1)
    #vector in direction of axis
    v,n1,n2 = pca.components_
    # judge whether the direction of u and v is same
    u = p1 - p0
    j = np.dot(v,u)/(norm(u)*norm(v))
    if j < 0:
        p0, p1 = (p1,p0)
    #surface ranges over t from 0 to length of axis and 0 to 2*pi
    t = np.linspace(0, mag, 100)
    theta = np.linspace(0, 2 * np.pi, 100)
    #use meshgrid to make 2d arrays
    t, theta = np.meshgrid(t, theta)
    #generate coordinates for surface
    cX, cY, cZ = [p0[i] + v[i] * t + R * np.sin(theta) * n1[i] + R * np.cos(theta) * n2[i] for i in [0, 1, 2]]
    return (cX, cY, cZ)
#export   
def extractFeathersByPointCloud(**kwargs):
    "calculate the axis length sphericity, linarity and planarity of a point cloud"
    pca = kwargs['pca']
    pts = kwargs['points']
    abc = getDirctionVectorsByPCA(pca)
    pts_lamda_0 = pointsProjectAxis(abc[0],pca.mean_,pts)
    a = {'points': pts, 'projections': pts_lamda_0}
    r = radiusOfCylinderByLeastSq(**a)
    pts_lamda_0 = np.array(pts_lamda_0)
    p0, p1 = get_start_end_line(pts_lamda_0)
    lamda1, lamda2, lamda3 = np.sqrt(pca.explained_variance_)
    #
    axis_length = norm(p0 - p1)
    sp = lamda3/lamda1
    ln = (lamda1 - lamda2)/lamda1
    pl = (lamda2 - lamda3)/lamda1
    rtn = {'radius': r[0], 'axis': axis_length, 'sphericity': sp, 'linearity': ln, 'planarity': pl}
    return rtn
 

# Hough Transform

In [None]:
#export
def pointsToRaster(**kwargs):
    "Transform a point cloud to a raster image"
    xyz_min = kwargs['min_xyz']
    xyz_max = kwargs['max_xyz']
    pts = kwargs['pts']
    cellsize = kwargs['cellsize']
    #
    xyz_min = np.floor(xyz_min)
    xyz_max = np.floor(xyz_max)
    cell_ids = (pts - xyz_min)/cellsize
    cell_ids = np.floor(cell_ids)
    cell_ids = np.unique(cell_ids,axis=0) # eliminate the replicated values
    cell_ids = cell_ids.astype(int)
    #
    a = np.floor((xyz_max - xyz_min)/cellsize)
    a = a.astype(int)
    xnum = a[0] + 1
    ynum = a[1] + 1
    img_xy = np.zeros((ynum, xnum), dtype='uint8')
    for m in cell_ids:
        i,j,k = m
        img_xy[j,i] = 1
    return img_xy
#export
def getCellIDByPolarCoordinates(**kwargs):
    "Get the indices of cells according to polar coordinates"
    '''
    Modifyed by Haitao Lyu on 08/08/2020
    Input Argument:
        houghPic: a 2D matrix that represents a raster picture, which generated by the function 'pointsToRaster'.
        pts: a polar coordinates list that comes from the return value of the function'houghToRasterByNumOfCrossLines'
        threshold: the maximal distance between a cell and a line
    Return Value:
        rtn: a list that includes the index of row and num and the corresponding slope angle
    '''
    houghPic = kwargs['img']
    peaks = kwargs['peaks']
    threshold = kwargs['maxdist']
    #
    rtn=[]
    oy,ox = np.nonzero(houghPic)
    for nc, angle, dist in peaks:
        a = np.math.cos(angle)
        b = np.math.sin(angle)
        c = -dist
        d_line = np.abs(a*ox + b*oy + c)/np.sqrt(a*a + b*b)
        #
        d_l = d_line[d_line<threshold]
        if len(d_l) < 1:
            
            print(f'this line length:{len(d_l)}, neglect it')
            continue
        #
        nc = np.int(nc)
        #
        if nc >= len(d_l):
            nc = len(d_l) -1
        #
        e = np.partition(d_l, nc)
        e = e[:nc]
        line = []
        for i in e:
            idx = np.argwhere(d_line == i)
            idx = idx.ravel()
            row = oy[idx]
            col = ox[idx]
            rc = np.vstack((row,col)).transpose()
            t = rc.tolist()
            #t = [row, col, angle, dist]
            line = line + t
        #
        line = np.unique(line, axis=0)
        line = line.tolist()
        rtn.append(line)
    return rtn
    pass

#export
def houghToRasterByCellID(**kwargs):
    "Transform the result of Hough transform to a raster image"
    rc = getCellIDByPolarCoordinates(**kwargs)
    houghPic = kwargs['img']
    img_temp = np.zeros_like(houghPic)
    for ij in rc:
        ij = np.transpose(ij)
        i = ij[0,:]
        j = ij[1,:]
        img_temp[i,j] = 1
    return img_temp
    pass
#export
def recordHoughLines(**kwargs):
    "encapsulate each hough line into a class HLine and save all hough lines in an array"
    '''
    Modifyed by Haitao Lyu on 08/08/2020
    Description: encapsulate each hough line into a class HLine and save all hough lines in an array
    Parameters:
        houghpic: a 2D matrix that represents a raster picture
        houghpeaks: generated by hough_line_peaks, which is a function of library skimage
        threshold: the maximal distance between a cell and a line
    Return:
        an array including hough line encapsulated by class HLine
    '''
    houghPic = kwargs['img']
    peaks = kwargs['peaks']
    threshold = kwargs['maxdist']
    rtn = []
    oy,ox = np.nonzero(houghPic)
    for nc, angle, dist in peaks:
        a = np.math.cos(angle)
        b = np.math.sin(angle)
        c = -dist
        d_line = np.abs(a*ox + b*oy + c)/np.sqrt(a*a + b*b)
        #
        d_l = d_line[d_line<threshold]
        if len(d_l) < 1: 
            continue
        #
        nc = len(d_l) -1
        e = np.partition(d_l, nc)
        e = e[:nc]
        line =[]
        coord = [nc, angle, dist]
        for i in e:
            idx = np.argwhere(d_line == i)
            idx = idx.ravel()
            row = oy[idx]
            col = ox[idx]
            rc = np.vstack((row,col)).transpose()
            t = rc.tolist()
            line = line + t
        #
        line = np.unique(line, axis=0)
        HL = Hline(coord, line, angle,dist)
        rtn.append(HL)
    return rtn
#
#export 
def countNumOfConsecutiveObj(*args):
    "count the number of consecutive 0"
    ary = args[0]
    obj = args[1]
    if len(ary) < 0:
        return -1
    a = 0
    cun = []
    for i in ary:
        if i == obj:
            a = a + 1
        else:
            cun.append(a)
            a = 0
    return cun
#export
def calRowByColViaLineEquation(**kwargs):
    "compute row:y according to col:x based on a line equation"
    '''
    Parameters:
        hl: it is self-defined class HLine, which saves the details of a line
    '''
    hl = kwargs['HLine']
    threshold = kwargs['maxdist']
    angle = hl.angle
    dist = hl.dist
    row_col_min = np.min(hl.line, axis=0)
    row_col_max = np.max(hl.line, axis=0)
    r = np.arange(row_col_min[0], row_col_max[0] + 1)
    c = np.arange(row_col_min[1], row_col_max[1] + 1)
    x,y = np.meshgrid(c,r)
    ox = x.ravel()
    oy = y.ravel()
    a = np.math.cos(angle)
    b = np.math.sin(angle)
    c = -dist
    d_line = np.abs(a*ox + b*oy + c)/np.sqrt(a*a + b*b)
    idx = d_line < threshold
    row = oy[idx]
    col = ox[idx]
    rc = np.vstack((row,col)).transpose()
    rc = np.unique(rc, axis=0)
    #t = rc.tolist()
    return rc
#
#export 
def generateCorridorByLine(**kwargs):
    "generate a powerline corridor in a raster image "
    houghPic = kwargs['img']
    hl = kwargs['HLine']
    img_temp = np.zeros_like(houghPic)
    if len(hl) < 0:
        return -1
    elif len(hl) == 1:
        hl = hl[0]
        for rc in hl.line:
            i = rc[0]
            j = rc[1]
            img_temp[i,j] = 1
    else:
        for h in hl:
            for rc in h.line:
                i = rc[0]
                j = rc[1]
                img_temp[i,j] = 1
    return img_temp
#export
def generateBuffer(**kwargs):
    "generate buffer zone based on the hough transform"
    '''
    Description: generate buffer zone based on the hough transform
    '''
    houghPic = kwargs['img']
    peaks = kwargs['peaks']
    threshold = kwargs['maxdist']
    #
    img = np.ones_like(houghPic)
    oy,ox = np.nonzero(img)
    img_temp = np.zeros_like(houghPic)
    for nc, angle, dist in peaks:
        a = np.math.cos(angle)
        b = np.math.sin(angle)
        c = -dist
        d_line = np.abs(a*ox + b*oy + c)/np.sqrt(a*a + b*b)
        #
        d_l = d_line[d_line<threshold]
        if len(d_l) < 1: 
            continue
        #
        nc = len(d_l) -1
        e = np.partition(d_l, nc)
        e = e[:nc]
        for i in e:
            idx = np.argwhere(d_line == i)
            row = oy[idx].flatten()
            col = ox[idx].flatten()
            img_temp[row, col] = 1
    return img_temp
#export
def locatePointsFromBuffer(**kwargs):
    "obtain points from original point cloud base on the image buffer"
    '''
    Description: obtain points from original point cloud base on the image buffer 
    '''
    img_buffer = kwargs['img_buffer']
    min_xyz = kwargs['min_xyz']
    pts = kwargs['pts']
    cellsize = kwargs['cellsize']
    #
    ly, lx = np.nonzero(img_buffer)
    x_min, y_min = min_xyz[0:2]
    px = cellsize * lx + x_min
    py = cellsize * ly + y_min
    nid_line=[]
    zipped = zip(px,py)
    for i,j in zipped:
        fcx = np.logical_and(pts[:,0] >= i, pts[:,0] < (i + cellsize))
        fcy = np.logical_and(pts[:,1] >= j, pts[:,1] < (j + cellsize))
        idx = np.logical_and(fcx, fcy)
        a = np.argwhere(idx==True)
        a = a.ravel()
        nid_line = nid_line + a.tolist()
    return nid_line
#export
def locatePointsFromBufferSlideWindow(**kwargs):
    "obtain points from original point cloud base on the image buffer"
    '''
    Description: obtain points from original point cloud base on the image buffer 
    '''
    img_buffer = kwargs['img_buffer']
    min_xyz = kwargs['min_xyz']
    pts = kwargs['pts']
    cellsize = kwargs['cellsize']
    zl,zh = kwargs['z_range']
    #
    ly, lx = np.nonzero(img_buffer)
    x_min, y_min = min_xyz[0:2]
    px = cellsize * lx + x_min
    py = cellsize * ly + y_min
    nid_line=[]
    zipped = zip(px,py)
    for i,j in zipped:
        fcx = np.logical_and(pts[:,0] >= i, pts[:,0] < (i + cellsize))
        fcy = np.logical_and(pts[:,1] >= j, pts[:,1] < (j + cellsize))
        fcz = np.logical_and(pts[:,2] >= zl, pts[:,2] < zh)
        idx = np.logical_and(fcx, fcy)
        idx = np.logical_and(fcz,idx)
        a = np.argwhere(idx==True)
        a = a.ravel()
        nid_line = nid_line + a.tolist()
    return nid_line
#
#export 
def getFeaturesFromPointCloud(**kwargs):
    "obtain the features of each point"
    pca = kwargs['pca']
    n = kwargs['n'] # the number of nearest points
    xyz = kwargs['points']
    #
    tree = KDTree(xyz)
    features = []
    with progressbar.ProgressBar(max_value = xyz.shape[0]) as bar:
        for i in range(xyz.shape[0]):
            di = tree.query(xyz[i], k=n)
            pts = xyz[di[1]]
            pca.fit(pts)
            a = {'points': pts, 'pca': pca}
            fts = extractFeathersByPointCloud(**a)
            r = fts['radius']
            al = fts['axis']
            sp = fts['sphericity']
            ln = fts['linearity']
            pl = fts['planarity']
            features.append([r, al, sp, ln, pl])
            bar.update(i)
    return features

In [None]:
#export 
def evaluateLinesDiscontinuity(**kwargs):
    "evaluate the extent of lines' discontinuity, which is to calculate the maximal length of discontinuous lines"
    hl = kwargs['hough-lines']
    maxdist = kwargs['maxdist']
    img_base = kwargs['img_base']
    lid = []
    for k in range(len(hl)):
        a = {'HLine':hl[k], 'maxdist':maxdist}
        rc = calRowByColViaLineEquation(**a)
        val = []
        for ij in rc:
            i = ij[0]
            j = ij[1]
            val.append(img_base[i,j])
        #
        aa = countNumOfConsecutiveObj(*[val,0])
        lid.append(np.max(aa))
    return lid


In [None]:
# export
def constructCorridorsByHT(**kwargs):
    "Construct corridors based on the lines extracted by Hough transform"
    xyz_min = kwargs['xyz_min']
    xyz_max = kwargs['xyz_max']
    pts_1 = kwargs['points']
    cellsize = kwargs['cellsize']
    threshold_line = kwargs['HF_peaks_threshold']
    maxdist = kwargs['maxdist']
    threshold_discontinuity = kwargs['discontinuity_threshold']
    buffer_size = kwargs['buffer_size']
    #
    a = {'min_xyz':xyz_min, 'max_xyz':xyz_max, 'pts':pts_1, 'cellsize':cellsize}
    img_xy = pointsToRaster(**a) # rtn
    # Hough transform
    tested_angles = np.linspace(0, 2*np.pi, 360)
    h, theta, d = hough_line(img_xy, theta = tested_angles)
    hough_peaks = hough_line_peaks(h, theta, d, threshold=threshold_line)
    # filter the short lines based on hough peaks and raster image img_xy
    a = {'img': img_xy, 'peaks':zip(*hough_peaks), 'maxdist':maxdist}
    img_base = houghToRasterByCellID(**a) # rtn
    # calcualte the length of lines. the number of cells on lines
    a = {'img': img_base, 'peaks':zip(*hough_peaks), 'maxdist':maxdist}
    hl = recordHoughLines(**a) # rtn hough-lines
    hl = np.array(hl)
    # judge the extent of a line discontinuity. 
    a = {'hough-lines': hl, 'maxdist': maxdist, 'img_base': img_base}
    lid = evaluateLinesDiscontinuity(**a) # rtn discontinuity of lines   
    lid = np.array(lid)
    # remove the lines with long discontinuity
    idx = np.argwhere(lid < threshold_discontinuity)
    hltemp = hl[idx.ravel()]
    # generate new raster image with continuous lines
    a = {'img':img_xy, 'HLine':hltemp}
    img_temp = generateCorridorByLine(**a) # rtn image with continuous lines
    # construct corridors based on img_temp and buffer operation
    line_params = []
    for h in hltemp:
        line_params.append(h.coord)
    a = {'img': img_base, 'peaks':line_params, 'maxdist':buffer_size}
    img_corridors = generateBuffer(**a) # rtn corridors
    # generate image buffer array
    lineBufferArray = [] # rtn the collection of corridors
    for h in hltemp:
        a = {'img': img_base, 'peaks':[h.coord], 'maxdist':buffer_size}
        img = generateBuffer(**a)
        lineBufferArray.append(img)
    #
    rtn = {'img_xy': img_xy,
           'img_base': img_base,
           'hough_lines': hl,
           'discontinuity': lid,
           'img_lines': img_temp,
           'img_corridors': img_corridors,
           'corridors': lineBufferArray
          }
    return rtn

# Self-defined Data Structure

In [None]:
#export 
def getPointsFromSlideCorridors(**kwargs):
    "get the points in slide corridors"
    corridor = kwargs['corridor']
    xyz_min = kwargs['xyz_min']
    cellsize = kwargs['cellsize']
    xyz = kwargs['xyz']
    step = kwargs['step']
    rtn = []
    # get all points in a corridor
    a = {'img_buffer': corridor,
         'min_xyz': xyz_min,
         'pts': xyz,
         'cellsize': cellsize
        }
    pids = locatePointsFromBuffer(**a) 
    lp_line = LPoints(pids)
    pts_line = xyz[pids]
    # get a collection of the points in each slide corridor
    zl = np.min(pts_line,axis=0)[2]
    zh = np.max(pts_line,axis=0)[2]
    zz = np.arange(zl+4, zh+1-step)
    for z in zz:
        ids_sw = []
        a = {'img_buffer': corridor,
             'min_xyz': xyz_min,
             'pts': pts_line,
             'cellsize': cellsize,
             'z_range': [z, z+step]
            }
        pids = locatePointsFromBufferSlideWindow(**a) 
        for p in pids:
            ids_sw.append(lp_line.dic[p])
        lp_sw = LPoints(ids_sw)
        rtn.append(lp_sw)
    return rtn
#export 
def constructSlideCuboidsByPointCloud(**kwargs):
    "construct slide cuboids based on a point cloud"
    pids = kwargs['pids'] # the row numbers of points in original data xyz
    xyz = kwargs['xyz']
    corridor = kwargs['corridor']
    xyz_min = kwargs['xyz_min']
    cellsize = kwargs['cellsize']
    step = kwargs['step']
    rtn = []
    lp_line = LPoints(pids)
    pts_line = xyz[pids]
    # get a collection of the points in each slide corridor
    zl = np.min(pts_line,axis=0)[2]
    zh = np.max(pts_line,axis=0)[2]
    zl = np.floor(zl)
    zh = np.ceil(zh)
    zz = np.arange(zl, zh+1-step)
    for z in zz:
        ids_sw = []
        a = {'img_buffer': corridor,
             'min_xyz': xyz_min,
             'pts': pts_line,
             'cellsize': cellsize,
             'z_range': [z, z+step]
            }
        pids = locatePointsFromBufferSlideWindow(**a) 
        for p in pids:
            ids_sw.append(lp_line.dic[p])
        lp_sw = LPoints(ids_sw)
        rtn.append(lp_sw)
    return rtn
#export
def extractLinesFromCorridor(**kwargs):
    "extract power lines from a corridor by a slide cuboid"
    corridor = kwargs['corridor']
    xyz_min = kwargs['xyz_min']
    xyz_max = kwargs['xyz_max']
    cellsize = kwargs['cellsize']
    xyz = kwargs['xyz']
    step = kwargs['step']
    maxdist = kwargs['maxdist']
    buffer_size = kwargs['buffer_size']
    HF_peaks_threshold = kwargs['HF_peaks_threshold']
    discontinuity_threshold = kwargs['discontinuity_threshold']
    # calculate the id of points in slide corridors, return value is a list of dictionary
    a = {'corridor': corridor,
         'xyz_min': xyz_min,
         'cellsize': cellsize,
         'xyz': xyz,
         'step':step
        }
    ids_sw = getPointsFromSlideCorridors(**a)
    # extract corridors in each slide corridors
    cs = [] # return value is a list of dictionary
    for s in ids_sw:
        pids = list(s.dic.values())
        if len(pids) == 0:
            continue
        pts_sw = xyz[pids]
        a = {'xyz_min': xyz_min,
             'xyz_max': xyz_max,
             'points' : pts_sw,
             'cellsize': cellsize,
             'HF_peaks_threshold': HF_peaks_threshold,
             'maxdist': maxdist,
             'discontinuity_threshold': discontinuity_threshold,
             'buffer_size': buffer_size
            }
        rtn = constructCorridorsByHT(**a)
        if len(rtn['corridors']) > 0:
            b = {'dic_HT':rtn, 'dic_pids':s}
            cs.append(b)
    # extract power lines from sub-corridors
    lids = []
    for s in cs:
        dic_HT = s['dic_HT']
        dic_pids = s['dic_pids']
        pids = list(dic_pids.dic.values())
        pts_sw = xyz[pids]
        crs = dic_HT['corridors']

        for i in crs:
            a = {'img_buffer': i,
                 'min_xyz': xyz_min,
                 'pts': pts_sw,
                 'cellsize': cellsize
                }
            bs = locatePointsFromBuffer(**a)
            for j in bs:
                lids.append(dic_pids.dic[j])
    #
    lids = np.unique(lids)
    return lids
#export
def generateSlideWindowByX(**kwargs):
    xyz = kwargs['xyz']
    pids = kwargs['pids']
    step = kwargs['step']
    pids = np.array(pids)
    pts = xyz[pids]
    zl = np.floor(np.min(pts,axis=0)[0])
    zh = np.ceil(np.max(pts,axis=0)[0])
    zz = np.arange(zl, zh+1-step)
    rtn = []
    for z in zz:
        con = np.logical_and(pts[:,0]>=z, pts[:,0] < z+step)
        ids_sw = pids[con]
        lp_sw = LPoints(ids_sw)
        rtn.append(lp_sw)
    return rtn
#export
def extractLinesFromPointCloud(**kwargs):
    "extract lines from a point cloud. it combines other functions"
    corridor = kwargs['corridor']
    pids = kwargs['pids'] # the ids of a point cloud
    xyz_min = kwargs['xyz_min']
    xyz_max = kwargs['xyz_max']
    cellsize = kwargs['cellsize']
    xyz = kwargs['xyz']
    step = kwargs['step']
    maxdist = kwargs['maxdist']
    buffer_size = kwargs['buffer_size']
    HF_peaks_threshold = kwargs['HF_peaks_threshold']
    discontinuity_threshold = kwargs['discontinuity_threshold']
    #
    #
    a = {'pids':pids, 'xyz': xyz, 'corridor': corridor, 'xyz_min': xyz_min, 'cellsize':cellsize, 'step':step}
    ids_sw = constructSlideCuboidsByPointCloud(**a)
    # extract corridors in each slide corridors
    cs = [] # return value is a list of dictionary
    for s in ids_sw:
        s_pids = list(s.dic.values())
        if len(s_pids) == 0:
            continue
        pts_sw = xyz[s_pids]
        a = {'xyz_min': xyz_min,
             'xyz_max': xyz_max,
             'points' : pts_sw,
             'cellsize': cellsize,
             'HF_peaks_threshold': HF_peaks_threshold,
             'maxdist': maxdist,
             'discontinuity_threshold': discontinuity_threshold,
             'buffer_size': buffer_size
            }
        rtn = constructCorridorsByHT(**a)
        if len(rtn['corridors']) > 0:
            b = {'dic_HT':rtn, 'dic_pids':s}
            cs.append(b)
    # extract power lines from sub-corridors
    lids = []
    for s in cs:
        dic_HT = s['dic_HT']
        dic_pids = s['dic_pids']
        s_pids = list(dic_pids.dic.values())
        pts_sw = xyz[s_pids]
        crs = dic_HT['corridors']

        for i in crs:
            a = {'img_buffer': i,
                 'min_xyz': xyz_min,
                 'pts': pts_sw,
                 'cellsize': cellsize
                }
            bs = locatePointsFromBuffer(**a)
            for j in bs:
                lids.append(dic_pids.dic[j])
    #
    lids = np.unique(lids)
    return lids
#export 
def secondHTandSW(**kwargs):
    "second slide window and Hough transform"
    pids = kwargs['pids']
    corridor = kwargs['corridor']
    stepxy = kwargs['stepxy']
    stepz = kwargs['stepz']
    xyz = kwargs['xyz']
    xyz_min = kwargs['xyz_min']
    xyz_max = kwargs['xyz_max']
    cellsize = kwargs['cellsize']
    maxdist = kwargs['maxdist']
    buffer_size = kwargs['buffer_size']
    HF_peaks_threshold = kwargs['HF_peaks_threshold']
    discontinuity_threshold = kwargs['discontinuity_threshold']
    #
    a = {'xyz': xyz, 'pids': pids, 'step': stepxy }
    gswx = generateSlideWindowByX(**a)
    lp_array = []
    for g in gswx:
        g_pids = list(g.dic.values())
        a = {'pids': g_pids,
             'corridor': corridor,
             'xyz_min': xyz_min,
             'xyz_max': xyz_max,
             'cellsize': cellsize,
             'xyz': xyz,
             'step': stepz,
             'maxdist': maxdist,
             'buffer_size': buffer_size,
             'HF_peaks_threshold': HF_peaks_threshold,
             'discontinuity_threshold': discontinuity_threshold
            }
        lids = extractLinesFromPointCloud(**a)
        lp = LPoints(lids)
        lp_array.append(lp)
    return lp_array

In [None]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
