In [2]:
import ezdxf
import subprocess
import ezdxf
import os
import glob

1. Read and convert DWG file to DXF file 

In [3]:
# Folder of source DWG drawings, folder that receives the converted DXF
# files, and the ODA File Converter executable used for the conversion.
input_folder = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\dwg_file"
output_folder = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\dxf_file"
oda_exe = r"C:\Program Files\ODA\ODAFileConverter 26.7.0\ODAFileConverter.exe"

os.makedirs(output_folder, exist_ok=True)

# ODA File Converter takes its arguments positionally:
#   <input folder> <output folder> <output version> <output type>
#   <recurse 0/1> <audit 0/1> [<input file filter>]
# The audit flag must be present; without it the filter string slides into
# the audit position and no filter is applied.
cmd = [
    oda_exe,
    input_folder,
    output_folder,
    "ACAD2013",  # safer than 2010
    "DXF",       # output file type
    "0",         # do not recurse into sub-folders
    "1",         # audit each file during conversion
    "*.dwg",     # input file filter
]

result = subprocess.run(cmd, capture_output=True, text=True)

print("Return code:", result.returncode)
print("STDOUT:\n", result.stdout)
print("STDERR:\n", result.stderr)

# List every DXF currently in the output folder (this includes files left
# over from earlier runs, not only the ones just converted).
dxf_files = glob.glob(os.path.join(output_folder, "*.dxf"))
print("Converted DXF files:", dxf_files)


Return code: 0
STDOUT:
 
STDERR:
 
Converted DXF files: ['D:\\2_Analytics\\6_plan_vs_actual\\raw_data_dwg_file\\dxf_file\\ACW_NLM_Proposed_Pit_and_Dumping_area_for_FY_2024-25.dxf']


2. Read the DXF file and split its entities by feature type

In [4]:
# Input DXF file
input_dxf = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\dxf_file\ACW_NLM_Proposed_Pit_and_Dumping_area_for_FY_2024-25.dxf"

# Output folder
output_folder = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new"
os.makedirs(output_folder, exist_ok=True)

# Read DXF
doc = ezdxf.readfile(input_dxf)
msp = doc.modelspace()

# Buckets for separated entities.
# `circles` and `hatches` are kept for the save step but stay empty:
# CIRCLE and HATCH entities are routed into `polygons` below.
lines = []
polygons = []
points = []
circles = []
arcs = []
texts = []
hatches = []

# Sort every modelspace entity into a bucket by DXF type. Entity types that
# match no branch are left out of every bucket.
for entity in msp:
    etype = entity.dxftype()

    if etype == "LINE":
        lines.append(entity)

    elif etype == "POINT":
        points.append(entity)

    elif etype in ("LWPOLYLINE", "POLYLINE"):
        # LWPOLYLINE exposes `closed`; POLYLINE only exposes `is_closed`,
        # so reading `closed` on a POLYLINE would raise AttributeError.
        is_closed = entity.closed if etype == "LWPOLYLINE" else entity.is_closed
        if is_closed:   # closed polyline = polygon
            polygons.append(entity)
        else:
            lines.append(entity)

    elif etype == "CIRCLE":
        polygons.append(entity)  # treat circle as polygon (closed shape)

    elif etype == "ARC":
        arcs.append(entity)

    elif etype in ("TEXT", "MTEXT"):
        texts.append(entity)

    elif etype == "HATCH":
        polygons.append(entity)  # hatch = polygon-like fill

# Function to save entities into new DXF
def save_entities(entities, filename):
    """Copy ``entities`` into a fresh DXF document and write it to ``filename``.

    Nothing is created when ``entities`` is empty. An entity whose copy
    raises is reported on stdout and skipped; the remaining entities are
    still written.
    """
    if not entities:
        return

    target_doc = ezdxf.new(setup=True)
    target_space = target_doc.modelspace()

    for entity in entities:
        try:
            target_space.add_foreign_entity(entity)
        except Exception as err:
            print(f"Skipped {entity.dxftype()}: {err}")

    target_doc.saveas(filename)

# Write each bucket to its own DXF file (empty buckets produce no file).
buckets_to_save = (
    ("LINES.dxf", lines),
    ("POLYGONS.dxf", polygons),
    ("POINTS.dxf", points),
    ("CIRCLES.dxf", circles),
    ("ARCS.dxf", arcs),
    ("TEXTS.dxf", texts),
    ("HATCHES.dxf", hatches),
)
for dxf_name, bucket in buckets_to_save:
    save_entities(bucket, os.path.join(output_folder, dxf_name))

print(" DXF files saved in:", output_folder)


 DXF files saved in: D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new


Remove duplicates from the line DXF file

In [5]:
# Paths
lines_dxf = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new\LINES.dxf"
output_folder = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new"
os.makedirs(output_folder, exist_ok=True)

# ---------- Deduplicate Lines ----------
doc_lines = ezdxf.readfile(lines_dxf)
msp_lines = doc_lines.modelspace()


def _segment_key(seg_start, seg_end, ndigits=6):
    """Return a direction-independent key for one segment.

    Both endpoints are (x, y, z) tuples. Coordinates are rounded to
    ``ndigits`` decimals and the two endpoints are sorted, so A->B and B->A
    map to the same key.
    """
    a = tuple(round(c, ndigits) for c in seg_start)
    b = tuple(round(c, ndigits) for c in seg_end)
    return tuple(sorted([a, b]))


line_set = set()

# LINE entities
for line in msp_lines.query("LINE"):
    s, e = line.dxf.start, line.dxf.end
    line_set.add(_segment_key((s.x, s.y, s.z), (e.x, e.y, e.z)))

# LWPOLYLINE entities: break into segments. Vertices are padded to 3D so
# their keys are comparable with LINE keys; with 2D keys here and 3D keys
# above, an identical segment from both entity types was never merged.
for pline in msp_lines.query("LWPOLYLINE"):
    # NOTE(review): uses the polyline's elevation as z and assumes the
    # default extrusion (OCS == WCS) - confirm for tilted polylines.
    z = pline.dxf.elevation
    vertices = [(x, y, z) for x, y in pline.get_points("xy")]
    segments = list(zip(vertices, vertices[1:]))
    if pline.closed and len(vertices) > 2:
        # closing edge from the last vertex back to the first
        segments.append((vertices[-1], vertices[0]))
    for seg_start, seg_end in segments:
        line_set.add(_segment_key(seg_start, seg_end))

print(f" Unique lines found: {len(line_set)}")

# Save deduplicated lines. Sorting makes the entity order in the output file
# reproducible from run to run (set iteration order is not guaranteed).
doc_new_lines = ezdxf.new(setup=True)
msp_new_lines = doc_new_lines.modelspace()
for start, end in sorted(line_set):
    msp_new_lines.add_line(start, end)
doc_new_lines.saveas(os.path.join(output_folder, "LINES_clean.dxf"))


print(" Cleaned line DXF saved in:", output_folder)

 Unique lines found: 18
 Cleaned line DXF saved in: D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new


Remove duplicates from text dxf file

Get the text nearest to each line's end points

In [8]:
import ezdxf
import geopandas as gpd
from shapely.geometry import LineString, Point
from scipy.spatial import cKDTree
import pandas as pd

# ---------- Input DXF paths ----------
lines_dxf = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new\LINES_clean.dxf"
# NOTE(review): no cell visible in this notebook writes TEXTS_cleaned.dxf
# (the split step writes TEXTS.dxf) - confirm where this file comes from.
texts_dxf = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new\TEXTS_cleaned.dxf"

# ---------- Step 1: Extract Lines ----------
doc_lines = ezdxf.readfile(lines_dxf)
msp_lines = doc_lines.modelspace()


def _segment_record(seg_start, seg_end):
    """Build one GeoDataFrame row for the 2D segment seg_start -> seg_end."""
    return {
        "geometry": LineString([seg_start, seg_end]),
        "start": seg_start,
        "end": seg_end,
    }


lines = []

# One row per LINE entity, using only the x/y of its end points.
for line_entity in msp_lines.query("LINE"):
    head, tail = line_entity.dxf.start, line_entity.dxf.end
    lines.append(_segment_record((head.x, head.y), (tail.x, tail.y)))

# One row per LWPOLYLINE segment, plus the closing edge of closed polylines.
for polyline in msp_lines.query("LWPOLYLINE"):
    vertices = list(polyline.get_points("xy"))
    for seg_start, seg_end in zip(vertices, vertices[1:]):
        lines.append(_segment_record(seg_start, seg_end))
    if polyline.closed and len(vertices) > 2:
        lines.append(_segment_record(vertices[-1], vertices[0]))

gdf_lines = gpd.GeoDataFrame(lines, crs="EPSG:32644")  # CRS

# ---------- Step 2: Extract Texts ----------
doc_texts = ezdxf.readfile(texts_dxf)
msp_texts = doc_texts.modelspace()

# One point per label, placed at the entity's insertion point.
texts = []
for t in msp_texts.query("TEXT MTEXT"):
    # TEXT keeps its content in dxf.text. MTEXT has no dxf.text attribute,
    # so its content is read through plain_text() instead.
    raw_content = t.plain_text() if t.dxftype() == "MTEXT" else t.dxf.text
    content = raw_content.strip()
    insert = (t.dxf.insert.x, t.dxf.insert.y)
    texts.append({"geometry": Point(insert), "text": content})

# Fail with a clear message instead of an obscure GeoDataFrame/KD-tree error
# when the file contains no labels at all.
if not texts:
    raise ValueError(f"No TEXT/MTEXT entities found in {texts_dxf}")

gdf_texts = gpd.GeoDataFrame(texts, crs=gdf_lines.crs)

# ---------- Step 3: Build KDTree for fast nearest search ----------
text_xy = [(pt.x, pt.y) for pt in gdf_texts.geometry]
tree = cKDTree(text_xy)

def nearest_text(point):
    """Return the "text" value of the gdf_texts row closest to ``point``."""
    _, nearest_idx = tree.query([point.x, point.y], k=1)
    return gdf_texts.iloc[nearest_idx]["text"]

# ---------- Step 4: Assign nearest texts to each line endpoint ----------
start_texts = [nearest_text(Point(xy)) for xy in gdf_lines["start"]]
end_texts = [nearest_text(Point(xy)) for xy in gdf_lines["end"]]

gdf_lines["start_text"] = start_texts
gdf_lines["end_text"] = end_texts

# ---------- Step 5: Save final shapefile ----------
output_shp = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new\LINES_with_attributes.shp"
gdf_lines.to_file(output_shp)

print(f"Line shapefile with labels saved at: {output_shp}")


Line shapefile with labels saved at: D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_features_new\LINES_with_attributes.shp


In [None]:
import ezdxf
import os
import geopandas as gpd
from shapely.geometry import LineString, Point
from scipy.spatial import cKDTree

# ---------- Input & Output ----------
input_dxf = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_by_layer\SECTION_LINE_NAUKARI_LST.dxf"
work_dir = r"D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_by_layer\processed"
os.makedirs(work_dir, exist_ok=True)

lines_dxf = os.path.join(work_dir, "LINES.dxf")
texts_dxf = os.path.join(work_dir, "TEXTS.dxf")
output_shp = os.path.join(work_dir, "LINES_with_start_end_text.shp")

# ---------- Step 1: Split DXF ----------
doc = ezdxf.readfile(input_dxf)
msp = doc.modelspace()

# Collect line-like and text-like entities; every other type is ignored.
line_entities, text_entities = [], []
for e in msp:
    etype = e.dxftype()
    if etype in ("LINE", "LWPOLYLINE"):
        line_entities.append(e)
    elif etype in ("TEXT", "MTEXT"):
        text_entities.append(e)

# The following steps read LINES.dxf and TEXTS.dxf unconditionally. If one
# bucket were empty its file would not be (re)written, and a stale file from
# an earlier run could be read silently - so stop here instead.
if not line_entities or not text_entities:
    raise ValueError(
        f"Nothing to process in {input_dxf}: "
        f"{len(line_entities)} lines, {len(text_entities)} texts"
    )

# Save each bucket to its own DXF; an entity that cannot be copied is
# reported and skipped.
for label, entities, target_path in (
    ("line", line_entities, lines_dxf),
    ("text", text_entities, texts_dxf),
):
    split_doc = ezdxf.new(setup=True)
    split_msp = split_doc.modelspace()
    for e in entities:
        try:
            split_msp.add_foreign_entity(e)
        except Exception as ex:
            print(f" Skipped {label} entity: {ex}")
    split_doc.saveas(target_path)

print(f"Split complete: {len(line_entities)} lines, {len(text_entities)} texts")

# ---------- Step 2: Convert Lines to GeoDataFrame ----------
doc_lines = ezdxf.readfile(lines_dxf)
msp_lines = doc_lines.modelspace()

# Gather every 2D segment as a (start, end) pair of (x, y) tuples.
segments = []
for line in msp_lines.query("LINE"):
    segments.append(
        ((line.dxf.start.x, line.dxf.start.y), (line.dxf.end.x, line.dxf.end.y))
    )

for pline in msp_lines.query("LWPOLYLINE"):
    pts = list(pline.get_points("xy"))
    segments.extend(zip(pts, pts[1:]))
    # Closing edge only for real polygons; with two vertices it would just
    # repeat the single segment in reverse.
    if pline.closed and len(pts) > 2:
        segments.append((pts[-1], pts[0]))

# Deduplicate independent of drawing direction: A->B and B->A are the same
# section line. Coordinates are rounded to 6 decimals for the comparison
# only; the first occurrence is kept with its original coordinates.
lines = []
seen_keys = set()
for start, end in segments:
    key = tuple(sorted([
        (round(start[0], 6), round(start[1], 6)),
        (round(end[0], 6), round(end[1], 6)),
    ]))
    if key in seen_keys:
        continue
    seen_keys.add(key)
    lines.append({"geometry": LineString([start, end]), "start": start, "end": end})

if not lines:
    raise ValueError(f"No LINE/LWPOLYLINE segments found in {lines_dxf}")

gdf_lines = gpd.GeoDataFrame(lines, crs="EPSG:32644")
print(f"Deduplicated lines: {len(gdf_lines)}")

# ---------- Step 3: Convert Texts to GeoDataFrame ----------
doc_texts = ezdxf.readfile(texts_dxf)
msp_texts = doc_texts.modelspace()

# One point per label, placed at the entity's insertion point.
texts = []
for t in msp_texts.query("TEXT MTEXT"):
    # TEXT keeps its content in dxf.text. MTEXT has no dxf.text attribute,
    # so its content is read through plain_text() instead.
    raw_content = t.plain_text() if t.dxftype() == "MTEXT" else t.dxf.text
    content = raw_content.strip()
    insert = (t.dxf.insert.x, t.dxf.insert.y)
    texts.append({"geometry": Point(insert), "text": content})

# Fail with a clear message instead of an obscure GeoDataFrame/KD-tree error
# when the file contains no labels at all.
if not texts:
    raise ValueError(f"No TEXT/MTEXT entities found in {texts_dxf}")

gdf_texts = gpd.GeoDataFrame(texts, crs=gdf_lines.crs)
# Drop labels that repeat both position and content.
gdf_texts = gdf_texts.drop_duplicates(subset=["geometry", "text"])
print(f" Deduplicated texts: {len(gdf_texts)}")

# ---------- Step 4: Nearest text for start & end ----------
text_xy = [(pt.x, pt.y) for pt in gdf_texts.geometry]
tree = cKDTree(text_xy)

def nearest_text(point):
    """Return the "text" value of the gdf_texts row closest to ``point``."""
    _, nearest_idx = tree.query([point.x, point.y], k=1)
    # The tree was built in row order, so the hit is a positional index.
    return gdf_texts.iloc[nearest_idx]["text"]

start_labels = [nearest_text(Point(xy)) for xy in gdf_lines["start"]]
end_labels = [nearest_text(Point(xy)) for xy in gdf_lines["end"]]

gdf_lines["start_text"] = start_labels
gdf_lines["end_text"] = end_labels

# ---------- Step 5: Save Shapefile ----------
gdf_lines.to_file(output_shp)
print(f" Final line shapefile with start/end texts saved: {output_shp}")


Split complete: 19 lines, 36 texts
Deduplicated lines: 18
 Deduplicated texts: 36
 Final line shapefile with start/end texts saved: D:\2_Analytics\6_plan_vs_actual\raw_data_dwg_file\split_by_layer\processed\LINES_with_start_end_text.shp
