In [None]:
# Installing and importing libraries
%pip install shapely
%pip install pyshp

import os
import csv
import shapely
import shapefile
from shapely.geometry import shape, Point, LineString, MultiLineString, Polygon, MultiPolygon

In [None]:
# Updating pip
pip install --upgrade pip

In [None]:


class ShapefileProcessor:
    def __init__(self, shapefile_path):
        """Open ``shapefile_path`` and load its fields, shapes and records up front.

        Args:
            shapefile_path: path handed to ``shapefile.Reader``; it is also kept
                so the export methods can derive their output names from it.
        """
        reader = shapefile.Reader(shapefile_path)

        self.shapefile_path = shapefile_path
        self.reader = reader
        # Everything except the first entry of reader.fields
        # (presumably pyshp's DeletionFlag placeholder -- confirm).
        self.fields = reader.fields[1:]
        self.shapes = reader.shapes()
        self.records = reader.records()
        # Filled in later by validate_geometry() with (record, shape) pairs.
        self.valid_shapes = list()

    def validate_geometry(self):
        """Rebuild ``self.valid_shapes`` from the features whose geometry is usable.

        Every shape is passed through ``shape(...)``. A feature is kept, as a
        ``(record, shape)`` pair, when the conversion succeeds and yields a
        Point, LineString, MultiLineString, Polygon or MultiPolygon. Any other
        feature is reported on stdout and left out.

        The list is rebuilt from scratch on every call, so validating twice does
        not duplicate the features collected by the first pass.
        """
        accepted_types = (Point, LineString, MultiLineString, Polygon, MultiPolygon)
        valid = []
        for i, s in enumerate(self.shapes):
            try:
                geom = shape(s)
                if isinstance(geom, accepted_types):
                    valid.append((self.records[i], s))
                else:
                    print(f"Feature {i} has an invalid geometry type: {geom.geom_type}")
            except Exception as e:
                # Best effort: a broken feature is reported and skipped, not fatal.
                print(f"Feature {i} is not valid geometry: {str(e)}")
        self.valid_shapes = valid

    def check_intersection(self):
        """Drop from ``self.valid_shapes`` every feature that intersects another one.

        For each feature the first intersecting partner (in list order) is
        reported on stdout. Features with no partner are kept, and
        ``self.valid_shapes`` is replaced by that filtered list.
        """
        # pyshp shapes carry coordinates only and have no `intersects` predicate,
        # so convert each one to a shapely geometry once, before comparing pairs.
        geometries = [shape(s) for _, s in self.valid_shapes]

        non_intersecting_shapes = []
        for i, entry in enumerate(self.valid_shapes):
            partner = next(
                (
                    j
                    for j, other in enumerate(geometries)
                    if j != i and geometries[i].intersects(other)
                ),
                None,
            )
            if partner is None:
                non_intersecting_shapes.append(entry)
            else:
                print(f"Feature {i} intersects feature {partner}")
        self.valid_shapes = non_intersecting_shapes

    def remove_invalid_geometry(self):
        """Export the features in ``self.valid_shapes`` to ``<input name>_valid.shp``.

        The output keeps the shape type and attribute fields of the input. A
        summary of how many features were dropped and written is printed.
        """
        output_path = os.path.splitext(self.shapefile_path)[0] + '_valid.shp'
        # pyshp 2.x API: the Writer takes its target path up front and is
        # finalised on close (here by the context manager). The former
        # Writer(shapeType) + save(path) form belongs to pyshp 1.x.
        with shapefile.Writer(output_path, shapeType=self.reader.shapeType) as output_shapefile:
            # Field descriptors have four parts (name, type, size, decimal);
            # forward all of them rather than unpacking into three names.
            for field in self.fields:
                output_shapefile.field(*field)
            for r, s in self.valid_shapes:
                output_shapefile.record(*r)
                output_shapefile.shape(s)
        removed = len(self.shapes) - len(self.valid_shapes)
        print(
            f"Removed {removed} invalid features; "
            f"saved {len(self.valid_shapes)} valid features to {output_path}"
        )

    def remove_intersecting_geometry(self):
        """Export the features in ``self.valid_shapes`` to ``<input name>_non_intersecting.shp``.

        Intended to run after ``check_intersection()`` has filtered
        ``self.valid_shapes``. The output keeps the shape type and attribute
        fields of the input.
        """
        output_path = os.path.splitext(self.shapefile_path)[0] + '_non_intersecting.shp'
        # pyshp 2.x API: the Writer takes its target path up front and is
        # finalised on close (here by the context manager). The former
        # Writer(shapeType) + save(path) form belongs to pyshp 1.x.
        with shapefile.Writer(output_path, shapeType=self.reader.shapeType) as output_shapefile:
            # Field descriptors have four parts (name, type, size, decimal);
            # forward all of them rather than unpacking into three names.
            for field in self.fields:
                output_shapefile.field(*field)
            for r, s in self.valid_shapes:
                output_shapefile.record(*r)
                output_shapefile.shape(s)
        # The difference against the full input counts features dropped by
        # validation as well as by the intersection check, so say so.
        removed = len(self.shapes) - len(self.valid_shapes)
        print(
            f"Removed {removed} invalid or intersecting features; "
            f"saved {len(self.valid_shapes)} features to {output_path}"
        )

    def convert_to_csv(self):
        """Write the point features in ``self.valid_shapes`` to ``<input name>.csv``.

        The header holds the attribute field names followed by ``Longitude``
        and ``Latitude``. Each point feature becomes one row: its record values
        followed by the coordinates of its first point. Features that are not
        points are reported on stdout and skipped.
        """
        output_file = os.path.splitext(self.shapefile_path)[0] + '.csv'
        with open(output_file, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            # Field descriptors have four parts (name, type, size, decimal), so
            # index the name instead of unpacking into a fixed number of slots.
            header = [field[0] for field in self.fields]
            writer.writerow(header + ['Longitude', 'Latitude'])
            for r, s in self.valid_shapes:
                if isinstance(shape(s), Point):
                    writer.writerow(list(r) + list(s.points[0]))
                else:
                    print(f"Feature with ID {r[0]} is not a point and cannot be converted to CSV")

    def process_shapefile(self):
        self.validate_geometry()
        if not self.valid_shapes:
            print("No valid features to process")
            return

        print(f"Found {len(self.shapes) - len(self.valid_shapes)} invalid features")
        option = input("Select an option:\n1. Remove invalid geometry and export to shapefile\n2. Check intersection and export non-intersecting features\n3. Convert to


In [11]:
import os
import csv
import shapefile
from shapely.geometry import shape, Point
from collections import defaultdict


class ShapefileValidator:
    """Menu-driven shapefile checker.

    ``run()`` prompts for a shapefile path and for one of the options in
    ``self.options``, then carries out that option. Options 1-3 are
    implemented. Options 4 and 5 are listed in the menu but only report that
    they are not available yet.
    """

    def __init__(self):
        # Menu entries: key typed by the user -> description shown in the menu.
        self.options = {
            "1": "Validate each feature",
            "2": "Check intersection among features",
            "3": "Remove invalid geometry and export to shapefile",
            "4": "Remove intersecting geometry and export to shapefile",
            "5": "Export to CSV",
        }

    def run(self):
        """Prompt for a shapefile and a menu option, then execute the option.

        Both prompts repeat until the answer is acceptable (an existing file,
        a key of ``self.options``). The reader is closed once the option has
        finished, even if it raised.
        """
        # Prompt user for shapefile path
        shapefile_path = input("Enter shapefile path: ")
        while not os.path.isfile(shapefile_path):
            shapefile_path = input("Invalid path. Enter shapefile path: ")

        # Open the shapefile
        sf = shapefile.Reader(shapefile_path)
        try:
            self._dispatch(sf, shapefile_path)
        finally:
            sf.close()

    def _dispatch(self, sf, shapefile_path):
        """Show the menu, read the user's choice and run the matching action."""
        if sf.shapeType not in [1, 3, 5]:  # Point, PolyLine, Polygon
            print("Warning: This shapefile geometry type is not applicable for option 2 or 5")

        # Display options
        for option, description in self.options.items():
            print(f"{option}. {description}")

        # Prompt user for option
        option = input("Select an option: ")
        while option not in self.options:
            option = input("Invalid option. Select an option: ")

        if option == "1":
            self._report_invalid_features(sf)
        elif option == "2":
            self._report_intersections(sf)
        elif option == "3":
            self._export_valid_features(sf, shapefile_path)
        else:
            # Options 4 and 5 pass the menu check but have no implementation;
            # say so instead of returning silently.
            print(f"Option {option} ({self.options[option]}) is not implemented yet")

    def _report_invalid_features(self, sf):
        """Option 1: print every feature whose geometry cannot be interpreted."""
        errors = []
        for i, shape_record in enumerate(sf.iterShapeRecords()):
            try:
                # Accessing the property is the check: it raises when the
                # geometry cannot be expressed as a GeoJSON-style mapping.
                shape_record.shape.__geo_interface__
            except Exception as e:
                errors.append(i + 1)
                print(f"Feature {i + 1} is not a valid geometry: {e}")
        if not errors:
            print("All features are valid geometries")

    def _report_intersections(self, sf):
        """Option 2: print each pair of features that intersect (1-based numbers)."""
        # NOTE(review): PolyLine (3) passes the warning check in _dispatch but is
        # rejected here -- confirm which of the two lists is intended.
        if sf.shapeType in [1, 5]:
            shapes = [shape(shape_record.shape) for shape_record in sf.iterShapeRecords()]
            for i in range(len(shapes)):
                for j in range(i + 1, len(shapes)):
                    if shapes[i].intersects(shapes[j]):
                        print(f"Features {i + 1} and {j + 1} intersect")
        else:
            print("Warning: This shapefile geometry type is not applicable for option 2")

    def _export_valid_features(self, sf, shapefile_path):
        """Option 3: write the valid features to ``<input name>_valid.shp``."""
        errors = []
        valid_records = []
        for shape_record in sf.iterShapeRecords():
            try:
                shape_record.shape.__geo_interface__
                valid_records.append(shape_record)
            except Exception as e:
                errors.append(shape_record.record[0])
                print(f"Feature {shape_record.record[0]} is not a valid geometry: {e}")
        if valid_records:
            # splitext copes with any extension length, unlike slicing off 4 chars.
            output_path = f"{os.path.splitext(shapefile_path)[0]}_valid.shp"
            # pyshp 2.x API: target path at construction, finalised on close.
            with shapefile.Writer(output_path, shapeType=sf.shapeType) as w:
                # Skip the leading DeletionFlag pseudo-field; copying it would add
                # a column that the records have no value for.
                for field in sf.fields[1:]:
                    w.field(*field)
                for shape_record in valid_records:
                    w.record(*shape_record.record)
                    w.shape(shape_record.shape)
        print(f"{len(errors)} invalid features found")

In [21]:
import shapefile


class ShapefileValidator:
    """Validate, filter and export the features of one shapefile.

    The shapefile is read eagerly on construction. ``run()`` offers an
    interactive menu, and the other public methods can be called directly.

    Attributes:
        filepath: path of the shapefile that was opened.
        shp: the ``shapefile.Reader`` for that path.
        records: attribute records, in feature order.
        shapes: geometries, in feature order.
        invalid_features: indices flagged by the most recent validity scan.
        intersecting_features: indices flagged by the most recent
            intersection scan.
    """

    def __init__(self, filepath):
        """Open ``filepath``; the user is prompted only when no path is given."""
        self.filepath = filepath or input("What's the filepath?")
        self.shp = shapefile.Reader(self.filepath)
        self.records = self.shp.records()
        self.shapes = self.shp.shapes()
        self.invalid_features = []
        self.intersecting_features = []

    def _is_point_layer(self):
        """Return True for POINT / POINTZ / POINTM layers."""
        # pyshp reports shape type names in upper case.
        return self.shp.shapeTypeName.upper().startswith("POINT")

    def _find_invalid_features(self):
        """Return the indices of shapes whose ``__geo_interface__`` cannot be built."""
        invalid = []
        for i, shape in enumerate(self.shapes):
            try:
                shape.__geo_interface__
            except Exception:
                invalid.append(i)
        return invalid

    def _find_intersecting_features(self):
        """Return the indices of shapes that intersect at least one other shape."""
        # pyshp shapes have no spatial predicates; shapely provides them.
        from shapely.geometry import shape as to_geometry

        geometries = {}
        for i, raw in enumerate(self.shapes):
            try:
                geometries[i] = to_geometry(raw)
            except Exception:
                # Unconvertible shapes are a validity problem, reported by
                # validate(); they take no part in the intersection scan.
                continue
        return [
            i
            for i, geom in geometries.items()
            if any(j != i and geom.intersects(other) for j, other in geometries.items())
        ]

    def _export(self, output_path, excluded):
        """Write every feature whose index is not in ``excluded`` to ``output_path``."""
        excluded = set(excluded)
        with shapefile.Writer(output_path, shapeType=self.shp.shapeType) as w:
            # Copy the attribute schema (minus the leading DeletionFlag entry);
            # without fields the writer cannot store any record.
            for field in self.shp.fields[1:]:
                w.field(*field)
            for i, (record, shape) in enumerate(zip(self.records, self.shapes)):
                if i not in excluded:
                    w.shape(shape)
                    w.record(*record)

    def validate(self):
        """Scan all shapes, store the invalid indices and print a summary."""
        self.invalid_features = self._find_invalid_features()
        if self.invalid_features:
            print(f"Invalid features found: {self.invalid_features}")
        else:
            print("All features are valid geometry.")

    def check_intersection(self):
        """Scan for intersecting features, store their indices and print a summary.

        Not applicable to point layers, which are reported and skipped.
        """
        if self._is_point_layer():
            print("This feature is not applicable to geometry type POINT.")
            return
        self.intersecting_features = self._find_intersecting_features()
        if self.intersecting_features:
            print(f"Intersecting features found: {self.intersecting_features}")
        else:
            print("No intersecting features found.")

    def remove_invalid_geometry(self, output_path):
        """Export all valid features to ``output_path``."""
        # Recompute rather than rely on validate() having been called first.
        self.invalid_features = self._find_invalid_features()
        self._export(output_path, self.invalid_features)
        print(f"{len(self.invalid_features)} invalid feature(s) removed. Exported to {output_path}.")

    def remove_intersecting_geometry(self, output_path):
        """Export all features that intersect no other feature to ``output_path``."""
        if self._is_point_layer():
            print("This feature is not applicable to geometry type POINT.")
            return
        # Recompute rather than rely on check_intersection() having been called first.
        self.intersecting_features = self._find_intersecting_features()
        self._export(output_path, self.intersecting_features)
        print(f"{len(self.intersecting_features)} intersecting feature(s) removed. Exported to {output_path}.")

    def to_csv(self, output_path):
        """Write the X,Y coordinates of a point layer to ``output_path``, one row per feature."""
        if not self._is_point_layer():
            print("This option is only valid for Point geometry types.")
            return
        with open(output_path, "w") as f:
            f.write("X,Y\n")
            for shape in self.shapes:
                f.write(f"{shape.points[0][0]},{shape.points[0][1]}\n")
        print(f"Exported to {output_path}.")

    def run(self):
        """Show the menu, read one option and execute it."""
        print("""
        Options: 
        1. Validate your shapefile
        2. Check intersection
        3. Remove invalid geometry and export to shapefile
        4. Remove intersecting geometry and export to shapefile
        5. Export to CSV.""")

        # input() returns text, so every option is compared as a string.
        option = input("Select an option (1-5):")
        if option == "1":
            self.validate()
        elif option == "2":
            self.check_intersection()
        elif option == "3":
            output_path = input("Enter the output file path: ")
            self.remove_invalid_geometry(output_path)
        elif option == "4":
            output_path = input("Enter the output file path: ")
            self.remove_intersecting_geometry(output_path)
        elif option == "5":
            output_path = input("Enter the output file path: ")
            self.to_csv(output_path)
        else:
            print("Invalid option selected. Please try again.")
        

In [19]:
# Municipalities layer of the Practical 1 dataset (absolute path on the author's machine).
municipalities_shp = "C:/Users/polly/Documents/Modules/GIS 708 - Advanced GIS/Part 1 - Katumba/Practical 1/Practical 1_Dataset/Municipalities.shp"

validator = ShapefileValidator(municipalities_shp)
validator.run()

Options: 1. Validate, 2. Check intersection, 3. Remove invalid geometry and export to shapefile, 4. Remove intersecting geometry and export to shapefile, or 5. Export to CSV.


In [2]:
import os.path
import shapefile
import csv
from os import path

class ShapefileValidator:
    """Interactive shapefile checker and exporter.

    ``run()`` asks for a shapefile path, loads it into ``self.sf`` and executes
    one menu option. The option methods can also be called directly once
    ``self.sf`` holds a reader.
    """

    # Shape type codes of the point layers (POINT, POINTZ, POINTM). The pairwise
    # intersection check is skipped for these.
    # NOTE(review): the original guard skipped line/polygon codes (3, 5, 15, 25)
    # instead, which left only point layers to be checked -- confirm that
    # skipping point layers is the intended rule.
    _POINT_SHAPE_TYPES = (1, 11, 21)

    def __init__(self):
        self.sf = None

    def run(self):
        """Prompt for a shapefile and an option number, then execute the option."""
        shapefile_path = input("Enter shapefile path: ")
        if not os.path.exists(shapefile_path):
            print("File does not exist.")
            return

        self.sf = shapefile.Reader(shapefile_path)
        print("Options:\n1. Validate\n2. Check intersection\n3. Remove invalid geometry and export to shapefile\n4. Remove intersecting geometry and export to shapefile\n5. Convert to CSV")
        try:
            option = int(input("Enter option number: "))
        except ValueError:
            # Non-numeric input is simply another invalid choice, not a crash.
            print("Invalid option number.")
            return

        if option == 1:
            self.validate()
        elif option == 2:
            self.check_intersection()
        elif option == 3:
            filename, path = self.get_output_path()
            self.remove_invalid_geometry(path, filename)
        elif option == 4:
            filename, path = self.get_output_path()
            self.remove_intersecting_geometry(path, filename)
        elif option == 5:
            filename, path = self.get_output_path()
            self.convert_to_csv(path, filename)
        else:
            print("Invalid option number.")

    def get_output_path(self):
        """Ask for an output filename and directory; return ``(filename, path)``."""
        filename = input("Enter output filename: ")
        path = input("Enter output directory path: ")
        return filename, path

    def _is_loaded(self):
        """Return True when a reader is available, otherwise print a notice."""
        # Compare with None: the reader supports len(), so a plain truthiness
        # test would treat an empty shapefile as "not loaded".
        if self.sf is None:
            print("No shapefile loaded.")
            return False
        return True

    @staticmethod
    def _find_invalid_indices(shapes):
        """Return the 0-based indices of shapes whose ``__geo_interface__`` raises."""
        invalid = []
        for index, shp in enumerate(shapes):
            try:
                shp.__geo_interface__
            except Exception:
                invalid.append(index)
        return invalid

    @staticmethod
    def _find_intersecting_pairs(shapes):
        """Return ``(i, j)`` index pairs, ``i < j``, of shapes that intersect."""
        # pyshp shapes have no spatial predicates; shapely provides them.
        from shapely.geometry import shape as to_geometry

        geometries = []
        for index, shp in enumerate(shapes):
            try:
                geometries.append((index, to_geometry(shp)))
            except Exception:
                # Unconvertible shapes are a validity problem; leave them out.
                continue
        return [
            (i, j)
            for pos, (i, first) in enumerate(geometries)
            for j, second in geometries[pos + 1:]
            if first.intersects(second)
        ]

    def _write_subset(self, shapes, excluded, output_path, output_filename):
        """Write the shapes (and their records) whose index is not in ``excluded``."""
        target = os.path.join(output_path, output_filename)
        with shapefile.Writer(target, shapeType=self.sf.shapeType) as writer:
            # Copy the attribute schema (minus the leading DeletionFlag entry);
            # without fields the writer cannot store any record.
            for field in self.sf.fields[1:]:
                writer.field(*field)
            for index, shp in enumerate(shapes):
                if index not in excluded:
                    writer.shape(shp)
                    writer.record(*self.sf.record(index))

    def validate(self):
        """Print the 1-based numbers of all features with invalid geometry."""
        if not self._is_loaded():
            return

        # Track positions ourselves: pyshp shapes carry no `shapeIndex` attribute.
        invalid_indices = self._find_invalid_indices(self.sf.shapes())

        if invalid_indices:
            print(f"{len(invalid_indices)} invalid features found:")
            for index in invalid_indices:
                print(f"Feature {index + 1}")
        else:
            print("All features are valid.")

    def check_intersection(self):
        """Print every pair of features that intersect (1-based numbers)."""
        if not self._is_loaded():
            return

        if self.sf.shapeType in self._POINT_SHAPE_TYPES:
            print("Intersection check is not applicable to this shapefile type.")
            return

        # Read the shapes once; re-reading them inside the pair loop is wasteful.
        errors = self._find_intersecting_pairs(self.sf.shapes())

        if errors:
            print(f"{len(errors)} intersecting features found:")
            for error in errors:
                print(f"Features {error[0]+1} and {error[1]+1}")
        else:
            print("No intersecting features found.")

    def remove_invalid_geometry(self, output_path, output_filename):
        """Export the valid features to ``output_path/output_filename``.

        Nothing is written when no invalid feature is found.
        """
        if not self._is_loaded():
            return

        shapes = self.sf.shapes()
        invalid_indices = set(self._find_invalid_indices(shapes))

        if invalid_indices:
            print(f"{len(invalid_indices)} invalid features found. Removing them...")
            self._write_subset(shapes, invalid_indices, output_path, output_filename)
            print("Shapefile exported.")
        else:
            print("No invalid features found.")

    def remove_intersecting_geometry(self, output_path, output_filename):
        """Export the features that intersect no other feature.

        Nothing is written when no intersection is found.
        """
        if not self._is_loaded():
            return

        if self.sf.shapeType in self._POINT_SHAPE_TYPES:
            print("Intersection check is not possible")
            return

        shapes = self.sf.shapes()
        errors = self._find_intersecting_pairs(shapes)

        if errors:
            print(f"{len(errors)} intersecting features found. Removing them...")
            # Every member of an intersecting pair is dropped.
            invalid_indices = {index for pair in errors for index in pair}
            self._write_subset(shapes, invalid_indices, output_path, output_filename)
            print("Shapefile exported.")
        else:
            print("No intersecting features found.")

    def convert_to_csv(self, output_path, output_filename):
        """Write the attribute table (no geometry) to ``output_path/output_filename.csv``."""
        if not self._is_loaded():
            return

        # Do not double the extension when the user already typed it.
        if not output_filename.lower().endswith('.csv'):
            output_filename += '.csv'
        # os.path.join supplies the separator that plain concatenation dropped.
        with open(os.path.join(output_path, output_filename), 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow([field[0] for field in self.sf.fields[1:]])
            for record in self.sf.records():
                writer.writerow(record)

        print("CSV file exported.")

        
           

In [7]:
# Start an interactive session: run() prompts for the shapefile path and for a
# menu option, then carries out the selected option.
validator = ShapefileValidator()
validator.run()

Options:
1. Validate
2. Check intersection
3. Remove invalid geometry and export to shapefile
4. Remove intersecting geometry and export to shapefile
5. Convert to CSV
CSV file exported.
