# Edge Trees Analysis

## Import settings

In [None]:
import json
from pathlib import Path
import os

#settings_file = r"Q:\Projekt\Data_2024\styrfiler\settings_SEKNNO.json"
#settings_file = r"Q:\Projekt\Data_2024\styrfiler\settings_SEVPLI.json"
#settings_file = r"C:\SVK_utveckling\settings_SEVPLI.json"
settings_file = r"C:\Users\SE1K4H\Desktop\SVK-Analys-Filer\settings_SEKNNO.json" # Working on Elsas's computer

# Load the run configuration from the JSON settings file and expose each entry
# as a module-level name for the cells below.
# NOTE: problems are reported with print() and are not re-raised, so after a
# failed load the names assigned here stay undefined.
try:
    if not os.path.exists(settings_file):
        raise FileNotFoundError(f"Could not find {settings_file}")

    with open(settings_file, 'r', encoding='utf-8') as file:
        settings = json.load(file)

    # Validate everything up front so the error names every absent or null key
    # at once instead of stopping at the first failed lookup.
    required_keys = (
        "run_ID", "powerline_list", "local_folder", "wires_gdb_template",
        "domains_folder", "scandate_file", "powerlines_folder", "modules",
        "ogr2ogr_path", "proj_lib_path", "gdal_data_path", "default_db_name",
        "db_name", "db_user", "db_password", "host", "port",
    )
    missing_keys = [key for key in required_keys if settings.get(key) is None]
    if missing_keys:
        raise KeyError(f"Missing or null keys in {settings_file}: {', '.join(missing_keys)}")

    # TODO: check which ones are needed and not
    run_ID = settings["run_ID"]
    powerline_list = settings["powerline_list"]
    local_dir = settings["local_folder"]
    wires_gdb_template = settings["wires_gdb_template"]
    domains_folder = settings["domains_folder"]
    scandate_file = settings["scandate_file"]
    cvd_LEDNINGSGATA_path = os.path.join(domains_folder, "cvd_LEDNINGSGATA.txt")
    powerlines_folder = settings["powerlines_folder"]
    module_path = settings["modules"]
    ogr2ogr_path = settings["ogr2ogr_path"]
    proj_lib_path = settings["proj_lib_path"]
    gdal_data_path = settings["gdal_data_path"]
    DEFAULT_DB_NAME = settings["default_db_name"]
    DB_NAME = settings["db_name"]
    USER = settings["db_user"]
    PASSWORD = settings["db_password"]
    HOST = settings["host"]
    PORT = settings["port"]

    print("Settings loaded successfully.")
except json.JSONDecodeError as e:
    print(f"Error: Invalid JSON format in {settings_file}")
except KeyError as e:
    # str(KeyError) wraps the message in quotes; args[0] is the plain message.
    print(f"Error: {e.args[0]}")
except FileNotFoundError as e:
    print(f"Error: {e}")
except Exception as e:
    print(f"Error: Unexpected error: {e}")

## Load PostGIS methods

In [None]:
import psycopg2

# TODO: can these kinds of methods be moved to utils or similar?
def connect_to_database(db_name):
    """Open an autocommit psycopg2 connection to ``db_name``.

    The user, password, host and port come from the module-level USER,
    PASSWORD, HOST and PORT values.

    Returns the open connection, or None (after printing the error) when
    psycopg2 raises while connecting.
    """
    try:
        connection = psycopg2.connect(
            dbname=db_name,
            user=USER,
            password=PASSWORD,
            host=HOST,
            port=PORT,
        )
        connection.autocommit = True
    except psycopg2.Error as e:
        print(f"Error connecting to {db_name}: {e}")
        return None
    return connection
    
def execute_query(conn, query, data=None):
    """Run one SQL statement on ``conn``.

    Parameters:
        conn: open database connection providing ``cursor()``.
        query: statement to execute (a string or a psycopg2.sql composable).
        data: optional parameters for the statement's placeholders; an empty
            tuple is sent when it is omitted or empty.

    Returns:
        True when the statement executed, False when psycopg2 raised an error.
        The error is printed rather than re-raised, so the return value is the
        only way for a caller to know whether reporting success is justified.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(query, data or ())
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return False
    return True

## Merge SKB text files for all blocks of a powerline

In [None]:
from pathlib import Path
import glob
import fileinput
import pandas as pd
import os

# TODO add exception handling.

def combine_blocks(row):
    """Merge all per-block edge-tree text files of one power line into SKB_raw.txt.

    row: record from the power line list; its "LG" and "line" entries select
    the line folder below the module-level ``powerlines_folder``.
    """
    LG, line = row["LG"], row["line"]
    edge_tree_dir = os.path.join(Path(powerlines_folder) / LG / f"line_{line}", "kantträd")

    source_dir = os.path.join(edge_tree_dir, "block")
    combined_blocks_path = os.path.join(edge_tree_dir, "SKB_raw.txt")

    # Every *.txt file in the block folder ends up in the single raw file.
    merge_blocks(source_dir, "*.txt", combined_blocks_path)
    print(f"Successfully merged all edge trees blocks for {LG}/line_{line} into text file {combined_blocks_path}.")

def merge_blocks(src_dir, search_pattern, dst_file):
    """Concatenate every file in ``src_dir`` matching ``search_pattern`` into ``dst_file``.

    Parameters:
        src_dir: folder that holds the block files.
        search_pattern: glob pattern (for example "*.txt") selecting the files.
        dst_file: path of the merged file; it is created or overwritten.

    When nothing matches, ``dst_file`` is written empty. Each source file's
    last line is newline-terminated in the output so that records from two
    consecutive files never end up on the same line.
    """
    # Sorted so the merged file is reproducible; glob gives no ordering guarantee.
    blocks = sorted(glob.glob(os.path.join(src_dir, search_pattern)))
    with open(dst_file, "w") as fh:
        # Files are opened one by one instead of through fileinput.input(),
        # which falls back to sys.argv/stdin when the list of files is empty.
        for block_path in blocks:
            last_line = ""
            with open(block_path, "r") as block_file:
                for last_line in block_file:
                    fh.write(last_line)
            if last_line and not last_line.endswith("\n"):
                fh.write("\n")

# Merge the block files of every power line in the tab-separated power line list.
powerlines_df = pd.read_csv(powerline_list, header=0, sep="\t")
powerlines_df.apply(combine_blocks, axis="columns")
print("Done with all power lines")

## Add Edge Trees to PostGIS Tables 

In [None]:
import psycopg2
from pathlib import Path
import pandas as pd
import os
from psycopg2 import sql #TODO update to and install psycopg, the latest version.

# Module-level placeholders.
# NOTE(review): neither name is read by the code visible in this notebook
# (the cells below work with `conn_db`) — confirm before removing.
cur = None
conn = None

def create_SKB_table(row, conn_db):
    """(Re)create the edge-tree table of one power line and fill it from SKB_raw.txt.

    Parameters:
        row: record from the power line list; "LG" and "line" are read.
        conn_db: open database connection handed to execute_query.

    The table ``<lg>_<line>_skb_xymz`` is dropped if it exists and created
    again. Every non-blank line of the file must start with four
    whitespace-separated numbers, stored as x, y, z and dz. mz is computed as
    z - dz and the point geometry is built from (x, y, mz) with SRID 3006.
    """
    LG = row["LG"]
    line = row["line"]
    table_name = f"{LG.lower()}_{line}_skb_xymz"
    line_dir = Path(powerlines_folder) / LG / f"line_{line}"
    SKB_dir = line_dir / "kantträd"
    SKB_file_in = os.path.join(SKB_dir, "SKB_raw.txt")
    table_identifier = sql.Identifier(table_name)
    drop_table_if_exists_query = sql.SQL("DROP TABLE IF EXISTS {table_name}").format(table_name=table_identifier)
    create_query = sql.SQL("CREATE TABLE {table_name}(objectid SERIAL PRIMARY KEY, x DOUBLE PRECISION, y DOUBLE PRECISION, z FLOAT, dz FLOAT, mz FLOAT, shape geometry(POINTZ, 3006))").format(table_name=table_identifier)
    # Built once, outside the row loop. The geometry is constructed from real
    # bound parameters (ST_MakePoint) rather than from placeholders embedded in
    # a quoted WKT literal, which only works with client-side interpolation.
    insert_query = sql.SQL("INSERT INTO {table_name} (x, y, z, dz, mz, shape) VALUES ({x}, {y}, {z}, {dz}, {mz}, ST_SetSRID(ST_MakePoint({x}, {y}, {mz}), 3006));").format(
        table_name=table_identifier,
        x=sql.Placeholder("x"),
        y=sql.Placeholder("y"),
        z=sql.Placeholder("z"),
        dz=sql.Placeholder("dz"),
        mz=sql.Placeholder("mz"))

    try:
        # Step 1: Create table for powerline
        execute_query(conn_db, drop_table_if_exists_query)
        execute_query(conn_db, create_query)

        # Step 2: Insert edge trees into powerline table
        with open(SKB_file_in, 'r') as src_file:
            for file_line in src_file:
                # split() accepts any run of whitespace, so repeated spaces or
                # tabs do not produce empty fields.
                fields = file_line.split()
                if not fields:
                    # Blank lines carry no tree and are skipped.
                    continue
                x = float(fields[0])
                y = float(fields[1])
                z = float(fields[2])
                dz = float(fields[3])
                mz = z - dz

                tree_data = {"x": x, "y": y, "z": z, "dz": dz, "mz": mz}
                execute_query(conn_db, insert_query, tree_data)

    except psycopg2.Error as e:
        print("Error while working with PostgreSQL:", e)

# Create and fill the edge-tree table of every power line in the list.
# conn_db is bound before the try so the finally clause can always test it,
# even when an exception is raised before the connection attempt returns.
conn_db = None
try:
    conn_db = connect_to_database(DB_NAME)
    powerlines_df = pd.read_csv(powerline_list, sep="\t", header=0)

    if conn_db:
        powerlines_df.apply(lambda row: create_SKB_table(row, conn_db), axis=1)
        print("Successfully inserted edge trees to PostGIS database.")
    else:
        # Without a connection nothing was inserted, so success is not reported.
        print(f"Error: no connection to database {DB_NAME}; no edge trees were inserted.")
except Exception as e:
    print("Error: ", e)
finally:
    if conn_db is not None:
        conn_db.close()
        print(f"Connection to database {DB_NAME} closed.")

## Conduct distance calculations

In [None]:
import psycopg2
from pathlib import Path
import pandas as pd

def calculate_distances(row, conn_db):
    """Add and fill the distance columns of one power line's edge-tree table.

    Parameters:
        row: record from the power line list; "LG" and "line" are read.
        conn_db: open database connection handed to execute_query.

    Works on table ``<lg>_<line>_skb_xymz`` against the wire table
    ``<lg>_<line>_fas`` and fills three columns:
      * avst_mz_fas: smallest ST_3DDistance from the point to a wire geometry
      * avst_fas:    avst_mz_fas - dz
      * avst_hori:   smallest ST_Distance from the point to a wire geometry
    Only wire geometries within 100 (ST_3DDWithin / ST_DWithin) are considered;
    points without any such geometry keep NULL.
    """
    # TODO: review whether the SQL queries can be improved or optimized.
    # Imported locally so the function does not rely on an earlier cell having
    # imported `sql`.
    from psycopg2 import sql

    LG = row["LG"]
    line = row["line"]
    table_name = f"{LG.lower()}_{line}_skb_xymz"
    wire_table_name = f"{LG.lower()}_{line}_fas"
    table_identifier = sql.Identifier(table_name)
    wire_identifier = sql.Identifier(wire_table_name)

    # IF NOT EXISTS keeps the statement valid when the cell is run again on a
    # table that already has the columns.
    add_columns_query = sql.SQL("""ALTER TABLE {table_name}
        ADD COLUMN IF NOT EXISTS avst_mz_fas DOUBLE PRECISION,
        ADD COLUMN IF NOT EXISTS avst_hori DOUBLE PRECISION,
        ADD COLUMN IF NOT EXISTS avst_fas DOUBLE PRECISION;""").format(table_name=table_identifier)

    avst_mz_fas_query = sql.SQL("""UPDATE {table_name} p
        SET avst_mz_fas = subquery.distance
        FROM (
            SELECT DISTINCT ON (p.objectid)
                p.objectid,
                ST_3DDistance(p.shape, l.shape) AS distance
            FROM
                {table_name} p,
                {wire_table_name} l
            WHERE
                ST_3DDWithin(p.shape, l.shape, 100)
            ORDER BY
                p.objectid, distance ASC
        ) AS subquery
        WHERE p.objectid = subquery.objectid;""").format(table_name=table_identifier, wire_table_name=wire_identifier)

    # Same result as joining the table to itself on objectid, without the join.
    avst_fas_query = sql.SQL("""UPDATE {table_name}
        SET avst_fas = avst_mz_fas - dz;""").format(table_name=table_identifier)

    avst_hori_query = sql.SQL("""UPDATE {table_name} p
        SET avst_hori = subquery.distance
        FROM (
            SELECT DISTINCT ON (p.objectid)
                p.objectid,
                ST_Distance(p.shape, l.shape) AS distance
            FROM
                {table_name} p,
                {wire_table_name} l
            WHERE
                ST_DWithin(p.shape, l.shape, 100)
            ORDER BY
                p.objectid, distance ASC
        ) AS subquery
        WHERE p.objectid = subquery.objectid;""").format(table_name=table_identifier, wire_table_name=wire_identifier)

    # Step 1: Add attributes
    execute_query(conn_db, add_columns_query)
    print(f"Columns where added successfully to table {table_name}")

    # Step 2: Calculate shortest distance between mz and phase.
    execute_query(conn_db, avst_mz_fas_query)
    print(f"avst_mz_fas was calculated succesfully for {LG}_{line}")

    # Step 3: Calculate the shortest distance between tree top (z) and phase with event of potential fall.
    execute_query(conn_db, avst_fas_query)
    print(f"avst_fas was calculated succesfully for {LG}_{line}")

    # Step 4: Calculate the horizontal distance between tree and phase.
    execute_query(conn_db, avst_hori_query)
    print(f"avst_hori was calculated succesfully for {LG}_{line}")

# Run the distance calculations for every power line in the list.
# conn_db is bound before the try so the finally clause can always test it,
# even when an exception is raised before the connection attempt returns.
conn_db = None
try:
    powerlines_df = pd.read_csv(powerline_list, sep="\t", header=0)
    conn_db = connect_to_database(DB_NAME)

    if conn_db:
        powerlines_df.apply(lambda row: calculate_distances(row, conn_db), axis=1)
        print("Successfully calculated distances for all wires.")
    else:
        # Without a connection nothing was calculated, so success is not reported.
        print(f"Error: no connection to database {DB_NAME}; no distances were calculated.")
except psycopg2.Error as e:
    print("Error while working with PostgreSQL:", e)
except Exception as e:
    print("Error: ", e)
finally:
    if conn_db is not None:
        conn_db.close()
        print(f"Connection to database {DB_NAME} closed.")

# Fyll i Littera, Ursprung, Ursprung_Datum, Registreringsdatum
Borde göras i ett tidigare steg

In [None]:
# Add data from LEDNIGNAR.txt
import os
import pandas as pd
from pathlib import Path
import datetime
from datetime import date
import arcpy

# Location of the per-line edge-tree feature classes inside the working geodatabase.
SKB_location = os.path.join(working_gdb, 'SKB_XYZ')
#print(SKB_location)

# Values written to every edge-tree feature class by add_date_and_accuracy.
urspr = "SWECO"
lev_dat = date.today().isoformat()

date_field = 'RGDTM'

# Currently unused settings, kept for reference:
#matosakerhet_h_field = 'MATOSAKERHET_HOJD'
#ins_met = 20
#matosak_plan = 1000
#matosak_hojd = 1000

def add_date_and_accuracy(powerline):
    """Fill LITTERA, Ursprung, Ursprung_Datum and RGDTM for one line's edge-tree feature class.

    powerline: record from the power line list; "LG", "line" and "Littera" are
    read. The RGDTM value is looked up in the module-level ``skanningsdatum``
    dict under the key "<LG>_<line>".
    """
    LG = powerline["LG"]
    line = powerline["line"]
    littera = powerline["Littera"]
    #LG_code = cvd_LEDNINGSGATA[LG]
    LG_line = f"{LG}_{line}"
    regdatum = skanningsdatum[LG_line]

    print(f"Doing {LG_line}")

    fc_SKB = f"{LG_line}_SKB_XYZ"
    # Activate the workspace
    arcpy.env.workspace = os.path.join(SKB_location, fc_SKB)
    arcpy.env.overwriteOutput = True

    # Every value is passed to CalculateField as a single-quoted literal.
    field_values = (
        ("LITTERA", littera),
        ("Ursprung", urspr),
        ("Ursprung_Datum", lev_dat),
        ("RGDTM", regdatum),
    )
    for field_name, value in field_values:
        arcpy.management.CalculateField(fc_SKB, field_name, "'" + value + "'")
    #sr = arcpy.SpatialReference("SWEREF99_TM")
    #arcpy.DefineProjection_management(fc_SKB, sr)

    # Assign the coded value domain to the LITTERA field
    arcpy.management.AssignDomainToField(fc_SKB, "LITTERA", "cvd_LITTERA_LEDNING")

# Power lines to process (tab-separated list).
powerlines_df = pd.read_csv(powerline_list, sep="\t", header=0)

# Scan date per power line, keyed by "<LG>_<line>".
df_skanningsdatum = pd.read_csv(scandate_file, sep="\t", header=0)
skanningsdatum = dict(
    (f"{row['LG']}_{row['line']}", row['skanningsdatum'])
    for _, row in df_skanningsdatum.iterrows()
)

#df_cvd_LEDNINGSGATA = pd.read_csv(cvd_LEDNINGSGATA_path, sep="\t", header=0)
#df_cvd_DATE = pd.read_csv(cvd_DATE_path, sep="\t", header=0)
#cvd_LEDNINGSGATA = {df_cvd_LEDNINGSGATA.LG[i]: df_cvd_LEDNINGSGATA.Code[i] for i in range(
#    len(df_cvd_LEDNINGSGATA))}
#cvd_DATE = {df_cvd_DATE.LG[i].join(
#    "_00" + str(df_cvd_DATE.nr[i])): df_cvd_DATE.Datum[i] for i in range(len(df_cvd_DATE))}

# Fill the fields of every power line's feature class.
powerlines_df.apply(add_date_and_accuracy, axis="columns")
print("körning av cell klar")

# Slå ihop kantträd för alla ledningar till en featureclass

In [None]:
# Merge the edge-tree feature classes of all power lines into one feature class.
# NOTE(review): working_gdb is not assigned in the code visible here — it is
# presumably defined by an earlier cell or notebook; confirm.
#working_gdb = "C:\SVK_2024\python_work_dir\working_250218.gdb"
print(working_gdb)
#print(Path(working_gdb,"SKB_XYZ"))
# ListFeatureClasses below lists relative to this workspace.
arcpy.env.workspace = working_gdb
#arcpy.env.workspace = os.path.join(working_gdb,"SKB_XYZ")
print(arcpy.env.workspace)
# All feature classes inside the SKB_XYZ feature dataset.
feature_classes = arcpy.ListFeatureClasses(feature_dataset="SKB_XYZ")
print(feature_classes)
# Output feature class of the merge, written to the working geodatabase.
output_feature_classes = os.path.join(working_gdb,"kantträd_oklippta") #"c:\workspace\working_25.gdb\merged_feature_class"
print(output_feature_classes)
arcpy.Merge_management(feature_classes, output_feature_classes)
# Prints "cell run finished" (Swedish).
print("körning av cell klar")

# Klipp träffpunkter
Se till att använda aktuella data i LG_polygons och station_polygons

In [None]:
import arcpy
import os
import pandas as pd

# Clip the merged edge trees: copy the unclipped feature class to the results
# geodatabase and delete points inside LG_polygons / station_polygons that
# match the attribute filters in step 1 and step 2 below.
# NOTE(review): working_gdb, results_gdb, LG_polygons and station_polygons are
# not assigned in the code visible here — presumably set by an earlier cell; confirm.
#fc_ledningsgator = r"Q:\Projekt\Data_2024\Underlag_SVK\Bestallningsunderlag_2024.gdb\GNG_LEDNINGSGATA" #os.path.join(gdb, "Gator2021")
#fc_stationsomraden = r"Q:\Projekt\Data_2024\Underlag_SVK\Bestallningsunderlag_2024.gdb\GNG_STATIONSOMRADE" #os.path.join(gdb, "STATIONSOMRADE")

arcpy.env.overwriteOutput = True

##fd_ej_akuta = os.path.join(gdb, "ej_akuta")
#fd_all = os.path.join(gdb, "all")
#fd_traffpunkter = os.path.join(gdb, "traffpunkter")

#fc_all = os.path.join(fd_all, f"all_{version}")
#fc_traffpunkter = os.path.join(fd_traffpunkter, f"traffpunkter_{version}")
# Name passed to MakeFeatureLayer for the temporary layer used by the selections.
tmp_layer_traffpunkter = os.path.join(working_gdb, "tmp_layer_traffpunkter")
print(f"tmp_layer_traffpunkter: {tmp_layer_traffpunkter}")

# Result feature class; this is the copy that the deletions below modify.
fc_traffpunkter = os.path.join(results_gdb, 'kantträd')
print(f"fc_traffpunkter: {fc_traffpunkter}")

# Unclipped input feature class in the working geodatabase.
fc_all = os.path.join(working_gdb, "kantträd_oklippta")
print(f"fc_all: {fc_all}")

#arcpy.management.Delete(tmp_layer_traffpunkter) # is this only needed when the cell is re-run and tmp_layer_traffpunkter already exists?



# Make a copy of the unclipped points
arcpy.management.CopyFeatures(fc_all, fc_traffpunkter)

# The layer is built on the copy, so the unclipped feature class is left untouched.
arcpy.management.MakeFeatureLayer(fc_traffpunkter, tmp_layer_traffpunkter)

# Step 1
# Within the fixed-width corridor and station areas: remove all within 7 m horizontal distance from phase
#
# 1.1 Select all within the fixed-width corridor
arcpy.management.SelectLayerByLocation(in_layer = tmp_layer_traffpunkter, 
                                       overlap_type = "INTERSECT", 
                                       select_features = LG_polygons,
                                       selection_type = "NEW_SELECTION")
# 1.2 Add all within station areas
arcpy.management.SelectLayerByLocation(in_layer = tmp_layer_traffpunkter, 
                                       overlap_type = "INTERSECT", 
                                       select_features = station_polygons,
                                       selection_type = "ADD_TO_SELECTION")
# 1.3 Of the selected, keep in the selection only points with AVSTAND_HORISONTELLT < 7
arcpy.management.SelectLayerByAttribute(in_layer_or_view = tmp_layer_traffpunkter,
                                        selection_type = "SUBSET_SELECTION", 
                                        where_clause = '"AVSTAND_HORISONTELLT" < 7')
# 1.4 Delete the selected
# NOTE(review): confirm how DeleteFeatures treats a layer whose selection is
# empty — verify that it cannot delete every feature when nothing matches.
arcpy.management.DeleteFeatures(tmp_layer_traffpunkter)

# Step 2
# Within the fixed-width corridor and station areas: remove all with tree height <= 4.5 m and distance to phase >= 5 m
#
# 2.1 Select all within the fixed-width corridor
arcpy.management.SelectLayerByLocation(in_layer = tmp_layer_traffpunkter, 
                                       overlap_type = "INTERSECT", 
                                       select_features = LG_polygons,
                                       selection_type = "NEW_SELECTION")
# 2.2 Add all within station areas
arcpy.management.SelectLayerByLocation(in_layer = tmp_layer_traffpunkter, 
                                       overlap_type = "INTERSECT", 
                                       select_features = station_polygons,
                                       selection_type = "ADD_TO_SELECTION")
# 2.3 Of the selected, keep in the selection only points with DELTA_HOJD <= 4.5 AND AVSTAND_FAS >= 5
arcpy.management.SelectLayerByAttribute(in_layer_or_view = tmp_layer_traffpunkter,
                                       selection_type = "SUBSET_SELECTION",
                                       where_clause = '"DELTA_HOJD" <= 4.5 AND "AVSTAND_FAS" >= 5')
# 2.4 Delete the selected (the review note at step 1.4 applies here as well)
arcpy.management.DeleteFeatures(tmp_layer_traffpunkter)

## Copy the remaining points to traffpunkter
#arcpy.management.CopyFeatures(tmp_layer_traffpunkter, fc_traffpunkter) # Redundant?

# Remove the temporary layer
arcpy.management.Delete(tmp_layer_traffpunkter)

# Prints "cell run finished" (Swedish).
print("körning av cell klar")

In [None]:
import os
import pandas as pd
from pathlib import Path
import datetime
from datetime import date
import arcpy

#r"C:\SVK_2024\python_work_dir\results_250218.gdb"
#results_gdb = os.path.join(local_dir, f"results_{run_ID}.gdb")
#print(Path(results_gdb))

# Assign coded value domains to the fields of the clipped edge-tree feature class.
arcpy.env.workspace = working_gdb
print(arcpy.env.workspace)
kanttrad = os.path.join(results_gdb, 'kantträd')
print(kanttrad)

# (field name, domain name) pairs, applied in this order.
domain_by_field = (
    ("LEDNINGSGATA", "cvd_LEDNINGSGATA"),
    ("LITTERA", "cvd_LITTERA_LEDNING"),
    ("Matosakerhet_Hojd", "cvd_MATOSAKERHET"),
    ("Matosakerhet_Plan", "cvd_MATOSAKERHET"),
    ("Insamlingsmetod", "cvd_INMATNINGSMETOD"),
)
for field_name, domain_name in domain_by_field:
    arcpy.management.AssignDomainToField(kanttrad, field_name, domain_name)

#print(kanttrad)
#arcpy.env.workspace = kanttrad
#feature_classes = arcpy.ListFeatureClasses(feature_dataset="SKB_XYZ")
print("körning av cell klar")


# Kontroll
### Här kan man lägga in egna skript för kontroll av resultatet

In [None]:
# Move to the RBX notebook.
# Lists every LG - littera combination present in the RBX result.
# Compare this with the log file to check that the right power lines
# are in the result (not all power lines have RBX points).

import arcpy
import os
import geopandas as gpd

# Print the distinct LEDNINGSGATA / LITTERA pairs found in the RBX result layer.
gdb_path = r"C:\SVK_2024\pythonkörningar\results_2501xx.gdb"
feature_class = "RBX_closest_points"

key_columns = ["LEDNINGSGATA", "LITTERA"]

# Only the two identifying columns are kept after reading the layer.
gdf = gpd.read_file(gdb_path, layer=feature_class)[key_columns]

deduplicated = gdf.drop_duplicates()
unique_df = deduplicated.sort_values(key_columns)

print(unique_df)