# Candidate Generation Module

Takes an annotation Shapefile, splits its lines into fixed-length segments, and generates a set of offset candidates for each segment.

---

### `Expected Data`:

- **Annotations Shapefile:**
    Holds the annotations to be split into segments for candidate generation.

- **Ground Truth Shapefile:**
    Holds Ground Truth annotation for candidate set quality evaluation.


### `Output Data`:
- **Candidate Frames:**
    One Shapefile for every segment, holding a set of offset replacement candidates.
    
---

In [1]:
# -- Module Information --

import time, os, sys
from lib.ModuleTools import *
from lib.GeoTools import *
from lib.ShapeTools import *
from lib.EMTools import *
import lib.fixclip as fixclip
import shapely.geometry as shp
from pyproj import Transformer

import matplotlib.pyplot as plt

# Settings handed to ModuleTest below; the resulting `test` object is used by
# every later cell for folders, file suffixes, section logging and indexing.
module_data = {
    'name': 'Candidates', # Module Name
    'desc': 'Create Candidate Set from Shapefiles', # Module Description
    'test_name': 'segment_sets', # Default test name
    'csv': 'segments.csv', # Csv path
    'verbose': True # Default verbosity
}

test = ModuleTest(module_data)


# -- Parameters --
# NOTE(review): `wrap(value, Config)` is a project helper; presumably it records
# the value in `Config` and returns it -- confirm in lib.ModuleTools.



# Candidate Gen Config
Config = {}
# Number of candidate pairs to be generated for each line segment.
# Each segment ends up with (2 * candidatePairs + 1) candidates, counting the original.
candidatePairs = wrap(30, Config)
# Length of line segments when parsing source. Measured in meters
segmentLength = wrap(10, Config)
# Distance from source line to generate candidate tuples; passed as `dist` to
# Get_Candidates, where it is added to the running offset for every pair.
baseOffset = wrap(1.5, Config)  



### Multiplier for offset
# Increase candidate offset by multiplication. Set to zero to increment linearly
# (Get_Candidates only applies the multiplier when it is greater than zero).
exp_mod = wrap(0, Config) 
 
# Report the parameter group to the test object.
test.Pgroup('Candidate Generation Config', Config)



# -- Input Data --
# - Raster and shape data -
# NOTE(review): later cells read `.crs` / `.bounds` from these values, so
# `wrap_fp` presumably returns loaded datasets rather than plain paths -- confirm.
InputData = {}
# Shapefile annotations for label
annotation = wrap_fp('/data/GeometricErrors/Scene01/imperfectLines.shp', InputData)
# Ground Truth Shapefile for evaluation.
groundTruth = wrap_fp('/data/GeometricErrors/Scene01/GroundTruth.shp', InputData)
# Raster to consider (if cropping)
raster = wrap_fp('/data/GeometricErrors/Scene01/template_map.tif', InputData)

# Report the input file group to the test object.
test.Fgroup('Shapes', InputData)

# --------------------
if test.verbose:
    print("Parameters Confirmed.")
    # `dt` is only bound when verbose is on; later cells use it under the same guard.
    from datetime import datetime as dt 
    print(dt.now().strftime('\n%a at %I:%M:%S%p'))

  shapely_geos_version, geos_capi_version_string


KeyboardInterrupt: Interrupted by user

In [None]:
from shapely.affinity import translate
import lib.LineGen as lg
import geopandas as gpd

def Get_Candidates(seg_gdf, pair_count, dist, exp_mod=1.2):
    """Build one candidate GeoDataFrame per segment of ``seg_gdf``.

    Every output frame starts with the untouched segment, followed by
    ``pair_count`` pairs of translated copies. The two copies of a pair are
    shifted by opposite offset vectors.

    Args:
        seg_gdf: Frame whose ``geometry`` is iterated segment by segment and
            whose ``crs`` is copied onto every output frame.
        pair_count: Number of candidate pairs generated per segment.
        dist: Amount added to the running offset for each successive pair.
        exp_mod: When greater than zero, the running offset is additionally
            multiplied by this factor for every pair index ``i`` with
            ``2 * i > pair_count``. The multiplication compounds from pair to
            pair. Zero (or negative) keeps the growth purely linear.

    Returns:
        A list with one GeoDataFrame per input segment, each holding
        ``2 * pair_count + 1`` geometries.
    """

    def _frame_for(segment):
        # The original segment is always the first candidate.
        shifted = [segment]
        running_offset = 0

        for pair_idx in range(pair_count):
            running_offset += dist

            # Outer pairs get pushed further out by the multiplier.
            if exp_mod > 0.0 and pair_idx * 2 > pair_count:
                running_offset *= exp_mod

            # Split the scalar offset into its x / y components.
            dx, dy = lg.Weight_Dist(line=segment, dist=running_offset)

            # The sign pattern of the two shifts depends on the segment slope.
            if lg.Get_Slope(segment) > 0:
                shifts = ((-dx, dy), (dx, -dy))
            else:
                shifts = ((dx, dy), (-dx, -dy))

            shifted.extend(
                translate(segment, xoff=sx, yoff=sy) for sx, sy in shifts
            )

        return gpd.GeoDataFrame(geometry=shifted, crs=seg_gdf.crs)

    return [_frame_for(segment) for segment in seg_gdf.geometry]

def CropCandidates(candidate_gdf, raster_poly, buff):
    """Keep only the candidates whose buffered geometry lies inside the raster.

    Containment is tested on a buffered copy of every candidate, but the rows
    that are kept come from the original, unbuffered ``candidate_gdf``.

    Args:
        candidate_gdf: GeoDataFrame with the candidates of one segment. Must
            have a ``'geometry'`` column.
        raster_poly: Geometry whose ``contains`` method decides whether a
            buffered candidate is kept.
        buff: Buffer size forwarded to ``GDF_Buffer``.

    Returns:
        A new GeoDataFrame (same columns and CRS as ``candidate_gdf``) with
        the surviving rows, or the string ``'omit'`` when fewer than two
        candidates survive.
    """
    # NOTE(review): assumes GDF_Buffer returns a frame whose rows are in the
    # same order as candidate_gdf -- confirm in the project library.
    copy_gdf = GDF_Buffer(candidate_gdf, buff)

    # Column name -> list of kept values, filled row by row.
    metadata = {key: [] for key in candidate_gdf.columns}
    for idx, geom in enumerate(copy_gdf.geometry):
        if raster_poly.contains(geom):
            for key, data in candidate_gdf.items():
                # idx is a position from enumerate, so look the value up by
                # position; label lookup (data[idx]) is only correct when the
                # frame happens to carry a default 0..n-1 index.
                metadata[key].append(data.iloc[idx])

    if len(metadata['geometry']) < 2:
        # Despite the wording of the message, nothing is saved here; the
        # caller decides what to do with the 'omit' marker.
        print("Warning (CropCandidates): Less than 2 of the candidates exist on the raster. Checked {}. Saving as shapefile and omitting from candidates.".format(len(candidate_gdf.geometry)))
        return 'omit'

    return gpd.GeoDataFrame(metadata, crs=candidate_gdf.crs)

In [None]:
# Code Section
# Builds the candidate set: segment the annotations, generate offset
# candidates per segment, drop candidates outside the raster, write one
# shapefile per segment plus one combined shapefile.
section_tstamp = time.time()
section_name = 'Create Candidate Set'
# --------------------

# Create Raster Bounding Box Polygon for removing out-of-bounds candidates
# CRS Transformer: raster CRS -> annotation CRS, coordinates in (x, y) order
trans = Transformer.from_crs(raster.crs, annotation.crs, always_xy=True)
# Bounding Polygon: raster extent, re-projected corner by corner
(mx,my,Mx,My) = raster.bounds
boundingPoly = shp.box(mx,my,Mx,My)
points = [shp.Point(trans.transform(x,y)) for x, y in boundingPoly.exterior.coords]
raster_poly = shp.Polygon(points)


# Remove Multi-Geometries
annotation = Flatten_Frame(annotation)

# Split source_lines into fixed-length segments
cutLines = Segment_GDF(gdf=annotation, segment_length=segmentLength)

# Generate candidates for each segment.
candidateFrames = Get_Candidates(seg_gdf=cutLines, pair_count=candidatePairs, dist=baseOffset, exp_mod=exp_mod)

# Crop and save candidates. Frames for which CropCandidates returns its
# string marker (too few candidates on the raster) are skipped entirely.
allSegments = []
for idx, cFrame in enumerate(candidateFrames):
    cropped = CropCandidates(cFrame, raster_poly, 2)
    if not isinstance(cropped, str):
        fp = os.path.join(test.Folder('segments'), 'set{:02}{}.shp'.format(idx, test.end))
        cropped.to_file(fp)
        allSegments.extend(cropped.geometry)

# Save all segments as single frame
allSegments = gpd.GeoDataFrame(geometry=allSegments, crs=annotation.crs)
allSegments.to_file(test.dir + '/all{}.shp'.format(test.end))

print("Total Segments:      ", len(cutLines.geometry))
print("Candidates Per Seg:  ", ((candidatePairs*2) + 1))
# Approximate because cropping may have removed candidates. The +1 (original
# segment) belongs inside the per-segment count, not added once at the end.
print("Total Candidates (~):", (len(candidateFrames) * ((candidatePairs*2) + 1)))

# --------------------
# Record the wall-clock duration of this section on the test object.
section_data = {'time': time.time()-section_tstamp, 'stamp': section_tstamp}
test.Section(section_name, section_data)
# --------------------
if test.verbose:
    print("{} ({:02}) Complete.".format(section_name, len(test.sectionNames)))
    print(dt.now().strftime('\n%a at %I:%M:%S%p'))

In [None]:
""" Save Information and Index Test. """

# Index in master csv
test.index()

# Write Info to Markdown
test.markdown()

# ------------------------
if test.verbose:
    print("Data successfully saved to markdown file, indexed on csv.") 
    # `dt` was imported in the first cell under the same `test.verbose` guard.
    print(dt.now().strftime('\n%a at %I:%M:%S%p'))

In [None]:
# Show the folder the per-segment candidate shapefiles were written under.
print(test.Folder('segments'))