### **Load geojson file, flatten the structure and store geometry**

In [None]:
# Load the City Wards GeoJSON, flatten each feature's properties and store them
# (with the geometry kept as a JSON string) in a Silver Lakehouse table.
%pip install Shapely
from pyspark.sql.functions import explode, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

import json

# Load GeoJSON from OneLake path
# GeoJSON is UTF-8 by specification (RFC 7946); decode explicitly instead of
# relying on the platform's default encoding.
with open("/lakehouse/default/Files/GIS_Data/City Wards Data - 4326.geojson", encoding="utf-8") as f:
    geojson = json.load(f)

# Extract features
features = geojson["features"]

# Sanity check: report the feature count and preview only the attributes of the
# first feature. Printing every feature (including full coordinate arrays)
# floods the cell output without adding information.
print(f"Loaded {len(features)} features")
if features:
    print(features[0]["properties"])

# Define schema
# (column name, Spark type) pairs in schema order. GEOMETRY is a string column
# because the geometry is stored as serialised JSON text.
_ward_columns = [
    ("_id", IntegerType),
    ("AREA_ID", DoubleType),
    ("DATE_EFFECTIVE", StringType),
    ("DATE_EXPIRY", StringType),
    ("AREA_ATTR_ID", DoubleType),
    ("AREA_TYPE_ID", DoubleType),
    ("PARENT_AREA_ID", DoubleType),
    ("AREA_TYPE", StringType),
    ("AREA_CLASS_ID", DoubleType),
    ("AREA_CLASS", StringType),
    ("AREA_SHORT_CODE", StringType),
    ("AREA_LONG_CODE", StringType),
    ("AREA_NAME", StringType),
    ("AREA_DESC", StringType),
    ("FEATURE_CODE", StringType),
    ("FEATURE_CODE_DESC", StringType),
    ("TRANS_ID_CREATE", DoubleType),
    ("TRANS_ID_EXPIRE", DoubleType),
    ("OBJECTID", IntegerType),
    ("GEOMETRY", StringType),
]

# Every column is declared nullable (third StructField argument is True).
schema = StructType([
    StructField(column_name, spark_type(), True)
    for column_name, spark_type in _ward_columns
])

# Build clean flat_rows
# Attribute names copied from each feature's "properties" object; a name that
# is absent from a feature yields None for that row.
_property_names = (
    "_id",
    "AREA_ID",
    "DATE_EFFECTIVE",
    "DATE_EXPIRY",
    "AREA_ATTR_ID",
    "AREA_TYPE_ID",
    "PARENT_AREA_ID",
    "AREA_TYPE",
    "AREA_CLASS_ID",
    "AREA_CLASS",
    "AREA_SHORT_CODE",
    "AREA_LONG_CODE",
    "AREA_NAME",
    "AREA_DESC",
    "FEATURE_CODE",
    "FEATURE_CODE_DESC",
    "TRANS_ID_CREATE",
    "TRANS_ID_EXPIRE",
    "OBJECTID",
)

# One dict per feature: the selected properties plus the geometry serialised
# to a JSON string under the "GEOMETRY" key.
flat_rows = [
    {
        **{name: feature["properties"].get(name) for name in _property_names},
        "GEOMETRY": json.dumps(feature["geometry"]),
    }
    for feature in features
]

# print(flat_rows[0])

# Convert to Spark DataFrame
df = spark.createDataFrame(flat_rows, schema=schema).orderBy("_id", ascending=True)

# Write to Lakehouse table
df.write.mode("overwrite").saveAsTable("silver_01_dim_ward_geojson_toronto")

# Final Output as CSV
# Local import keeps this cell self-contained.
import os

# Export into Files/Silver_Exports: that is where the silver_02 export is
# written and where the read-back cell further down loads
# "Files/Silver_Exports/silver_01_dim_ward_geojson_toronto.csv" from.
export_dir = "/lakehouse/default/Files/Silver_Exports"
os.makedirs(export_dir, exist_ok=True)  # pandas.to_csv does not create missing folders

silver_df = spark.read.table("silver_01_dim_ward_geojson_toronto")
silver_pdf = silver_df.toPandas()
silver_pdf.to_csv(
    os.path.join(export_dir, "silver_01_dim_ward_geojson_toronto.csv"),
    index=False,
)

display(silver_pdf) # To get a download option as csv

### **Refine ward geojson table to simplified WKT format with a tolerance of 0.005**

In [None]:
%pip install Shapely
import json
from shapely.geometry import shape

# Load the GeoJSON ward dimension from the Silver layer
silver_01_table = "silver_01_dim_ward_geojson_toronto"
df = spark.read.table(silver_01_table)

# Collect to pandas so each geometry string can be parsed row by row
pdf = df.toPandas()

# Convert GeoJSON string to WKT and simplify
def geojson_to_wkt(geo_str, tolerance=0.005):
    """Convert a GeoJSON geometry string to simplified WKT text.

    Args:
        geo_str: JSON text of a GeoJSON geometry object.
        tolerance: Distance tolerance passed to ``simplify``, expressed in the
            units of the geometry's coordinates.

    Returns:
        The WKT representation of the simplified geometry, or ``None`` when
        the input cannot be parsed or simplified. The function never raises;
        failures are reported on stdout.
    """
    try:
        geom_json = json.loads(geo_str)
        geom_obj = shape(geom_json)
        simplified = geom_obj.simplify(tolerance, preserve_topology=True)
        # Report only the problematic case instead of printing an unlabelled
        # True/False for every row.
        if not simplified.is_valid:
            print(f"Warning: simplified geometry is not valid (tolerance={tolerance})")
        return simplified.wkt
    except Exception as e:
        # Deliberate best effort: one bad geometry must not abort the whole
        # column conversion, so report it and return a null value instead.
        print(f"Error parsing geometry: {e}")
        return None

# Apply to all rows (the function is passed directly; no lambda wrapper needed)
pdf["GEOMETRY_WKT_SIMPLIFIED"] = pdf["GEOMETRY"].apply(geojson_to_wkt)

# Convert back to Spark DataFrame
# This column data type becomes void often, so force it to plain strings.
# Nulls must be filled BEFORE the cast: astype(str) turns missing values into
# the literal text "None"/"nan", after which fillna("") has nothing to fill.
pdf["FEATURE_CODE"] = pdf["FEATURE_CODE"].fillna("").astype(str)
spark_df = spark.createDataFrame(pdf)

# Write to new Silver table
spark_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("silver_02_dim_ward_geowkt_toronto")

# Final Output as CSV
# Local import keeps this cell self-contained.
import os

export_dir = "/lakehouse/default/Files/Silver_Exports"
os.makedirs(export_dir, exist_ok=True)  # pandas.to_csv does not create missing folders

silver_df = spark.read.table("silver_02_dim_ward_geowkt_toronto")
silver_pdf = silver_df.toPandas()
silver_pdf.to_csv(
    os.path.join(export_dir, "silver_02_dim_ward_geowkt_toronto.csv"),
    index=False,
)

# Show the frame that was actually exported, as the silver_01 cell does.
display(silver_pdf) # To get a download option as csv
    


### **Display Silver Dim Wards Toronto as dataframe to download**

In [9]:
# Preview the ward dimension table; the query itself caps the result at 1000 rows.
preview_query = "SELECT * FROM lh_wards_toronto.silver_dim_wards_toronto LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, bcbe1773-3c1d-466f-9721-94aba395e489, 71, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 17c7a272-17e8-43c9-b3a4-9745cc8f5670)

### **Display Silver Fact Wards Income 2020 Toronto as dataframe to download**

In [10]:
# Preview the 2020 income fact table; the query itself caps the result at 1000 rows.
preview_query = "SELECT * FROM lh_wards_toronto.silver_fact_income_2020_toronto LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, bcbe1773-3c1d-466f-9721-94aba395e489, 72, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e0f722d1-1182-415d-9963-d905d6a7765d)

### **Display Silver Fact Wards Population 2020 Toronto as dataframe to download**

In [11]:
# Preview the population fact table; the query itself caps the result at 1000 rows.
preview_query = "SELECT * FROM lh_wards_toronto.silver_fact_population_toronto LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, bcbe1773-3c1d-466f-9721-94aba395e489, 73, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 15d8e8fa-2243-4fb8-b02f-7b3b8f8fab12)

### **Export Silver 01 Dim GeoJSON Toronto as dataframe to download**

In [13]:
# Read the exported silver_01 CSV back as a Spark DataFrame.
# The file is written by pandas, which escapes a quote inside a quoted field by
# doubling it (""), whereas Spark's CSV reader expects a backslash by default.
# The GEOMETRY column holds JSON text full of quotes and commas, so the escape
# character is set to '"' to keep that column in one piece.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("escape", '"')
    .load("Files/Silver_Exports/silver_01_dim_ward_geojson_toronto.csv")
)
# df now is a Spark DataFrame containing CSV data from "Files/Silver_Exports/silver_01_dim_ward_geojson_toronto.csv".
display(df)

StatementMeta(, bcbe1773-3c1d-466f-9721-94aba395e489, 75, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6adb0b5c-ec18-4b04-afe3-0ad116b0fba0)

### **Export Silver 02 Dim GeoWKT Toronto as dataframe to download**

In [14]:
# Read the exported silver_02 CSV back as a Spark DataFrame.
# The file is written by pandas, which escapes a quote inside a quoted field by
# doubling it (""), whereas Spark's CSV reader expects a backslash by default.
# The GEOMETRY column holds JSON text full of quotes and commas, so the escape
# character is set to '"' to keep that column in one piece.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("escape", '"')
    .load("Files/Silver_Exports/silver_02_dim_ward_geowkt_toronto.csv")
)
# df now is a Spark DataFrame containing CSV data from "Files/Silver_Exports/silver_02_dim_ward_geowkt_toronto.csv".
display(df)

StatementMeta(, bcbe1773-3c1d-466f-9721-94aba395e489, 76, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9337e40a-ac8d-4510-a983-9a9519be8ff5)