In [1]:
import os
os.environ['JAVA_HOME'] ="/usr/lib/jvm/java-21-openjdk-amd64"
!echo $JAVA_HOME

/usr/lib/jvm/java-21-openjdk-amd64


In [2]:
import urbanity
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, expr

def inspect_geojson_structure(spark, file_path, n_rows=1):
    """
    Print the inferred schema and a few sample rows of a GeoJSON file.

    The file is read with Spark's plain JSON reader (no GeoJSON-specific
    handling), so the printed schema shows how that reader sees the file,
    e.g. whether it exposes a ``features`` or a ``properties`` column.

    Args:
        spark: Spark session
        file_path: Path to GeoJSON file
        n_rows: Number of rows to print (vertically, untruncated). Defaults to 1.

    Returns:
        The raw DataFrame returned by the JSON reader, so callers can keep
        inspecting it. Schema and sample rows are only printed, not returned.
    """
    # Load the raw data exactly as the JSON reader infers it
    raw_df = spark.read.format("json").load(file_path)

    print("Raw GeoJSON Schema:")
    raw_df.printSchema()

    # Vertical, untruncated output keeps wide nested rows readable
    print("Sample row:")
    raw_df.show(n_rows, truncate=False, vertical=True)

    return raw_df

def _flatten_geojson(raw_df):
    """
    Flatten one raw GeoJSON DataFrame into one row per feature.

    The layout is detected from *this* DataFrame's own columns:

    - a ``features`` column (feature collection): exploded to one row per feature;
    - a ``properties`` column (one feature per row): used directly;
    - anything else: returned unchanged.

    For the first two layouts the result has the feature's properties as
    top-level columns plus ``coordinates`` and ``geometry_type``.

    Args:
        raw_df: DataFrame as returned by ``spark.read.format("json").load(...)``.

    Returns:
        Tuple of (DataFrame, layout description), where the description is
        None when the DataFrame was left unchanged.
    """
    if "features" in raw_df.columns:
        features = raw_df.select(explode(col("features")).alias("feature"))
        prefix, layout = "feature.", "GeoJSON feature collection"
    elif "properties" in raw_df.columns:
        features, prefix, layout = raw_df, "", "individual GeoJSON features"
    else:
        return raw_df, None

    flat = features.select(
        col(f"{prefix}properties.*"),
        col(f"{prefix}geometry.coordinates").alias("coordinates"),
        col(f"{prefix}geometry.type").alias("geometry_type"),
    )
    return flat, layout


def load_geojson_files_flexible(spark, nodes_path, edges_path, city_name=None):
    """
    Load a nodes and an edges GeoJSON file with flexible structure handling.

    Each file's layout (feature collection, one feature per row, or
    non-standard) is detected independently, so a nodes file and an edges
    file with different layouts are both handled correctly.

    Args:
        spark: Spark session
        nodes_path: Path to the nodes GeoJSON file
        edges_path: Path to the edges GeoJSON file
        city_name: Optional name of the city; when truthy it is added to both
            outputs as a literal ``city`` column.

    Returns:
        Tuple of (nodes_df, edges_df)
    """
    nodes_raw = spark.read.format("json").load(nodes_path)
    edges_raw = spark.read.format("json").load(edges_path)

    # Print the raw nodes schema to help diagnose unexpected structures
    print("Nodes Raw Schema:")
    nodes_raw.printSchema()

    # Detect each file's layout on its own columns; the two files need not match
    nodes_df, nodes_layout = _flatten_geojson(nodes_raw)
    edges_df, edges_layout = _flatten_geojson(edges_raw)
    for label, layout in (("nodes", nodes_layout), ("edges", edges_layout)):
        if layout is None:
            print(f"Non-standard GeoJSON structure in {label}, keeping as-is...")
        else:
            print(f"Processing {label} as {layout}...")

    # Tag every row with its city so multiple cities can be unioned later
    if city_name:
        nodes_df = nodes_df.withColumn("city", lit(city_name))
        edges_df = edges_df.withColumn("city", lit(city_name))

    return nodes_df, edges_df

def _union_frames(frames):
    """
    Union a non-empty list of DataFrames by column name.

    Uses ``unionByName(..., allowMissingColumns=True)`` (PySpark 3.1+), which
    fills columns missing from a frame with nulls. If that raises (e.g. a
    TypeError on older PySpark without the keyword, or an analysis error from
    a column typed differently across frames), it falls back to a positional
    ``union`` over only the columns present in every frame.

    Args:
        frames: Non-empty list of DataFrames.

    Returns:
        A single DataFrame; the only element itself when the list has length 1.
    """
    first, rest = frames[0], frames[1:]
    if not rest:
        return first

    try:
        combined = first
        for frame in rest:
            combined = combined.unionByName(frame, allowMissingColumns=True)
        return combined
    except Exception as exc:  # best-effort fallback, but report why it happened
        print(f"Using fallback union method for older Spark versions "
              f"(unionByName raised {type(exc).__name__})")

    # Keep the first frame's column order: the positional union needs every
    # side in the same order, and a set's iteration order varies between runs.
    rest_columns = [set(frame.columns) for frame in rest]
    shared = [name for name in first.columns
              if all(name in cols for cols in rest_columns)]
    combined = first.select(*shared)
    for frame in rest:
        combined = combined.union(frame.select(*shared))
    return combined


def load_multiple_cities_flexible(spark, city_paths):
    """
    Load data for multiple cities with flexible structure handling.

    Args:
        spark: Spark session
        city_paths: Mapping of city name -> (nodes_path, edges_path). Each
            city name is passed to ``load_geojson_files_flexible`` as its
            ``city_name``.

    Returns:
        Tuple of (all_nodes_df, all_edges_df)

    Raises:
        ValueError: if ``city_paths`` is empty.
    """
    if not city_paths:
        raise ValueError("city_paths is empty: no cities to load")

    all_nodes_dfs = []
    all_edges_dfs = []
    for city, (nodes_path, edges_path) in city_paths.items():
        print(f"Loading data for city: {city}")
        nodes_df, edges_df = load_geojson_files_flexible(spark, nodes_path, edges_path, city)
        all_nodes_dfs.append(nodes_df)
        all_edges_dfs.append(edges_df)

    return _union_frames(all_nodes_dfs), _union_frames(all_edges_dfs)

def discover_city_paths(data_dir="data"):
    """
    Pair up ``<city>_nodes*`` and ``<city>_edges*`` files found in ``data_dir``.

    The city name is everything before the first ``_nodes`` / ``_edges`` token
    (case-insensitive), so multi-word names such as ``new_york_nodes.geojson``
    keep their full name instead of being cut at the first underscore.
    Cities that have only one of the two files are reported and skipped
    rather than passed on with a missing path.

    Args:
        data_dir: Directory to scan (not recursive).

    Returns:
        Dict mapping city name -> (nodes_path, edges_path), in sorted filename order.
    """
    import re

    pattern = re.compile(r"^(?P<city>.+?)_(?P<kind>nodes|edges)(?=[_.]|$)", re.IGNORECASE)

    found = {}
    for name in sorted(os.listdir(data_dir)):
        match = pattern.match(name)
        if match is None:
            continue
        kind = match.group("kind").lower()
        found.setdefault(match.group("city"), {})[kind] = os.path.join(data_dir, name)

    city_paths = {}
    for city, files in found.items():
        if "nodes" in files and "edges" in files:
            city_paths[city] = (files["nodes"], files["edges"])
        else:
            missing = "edges" if "nodes" in files else "nodes"
            print(f"Skipping {city}: no {missing} file found")
    return city_paths


# Example usage
if __name__ == "__main__":
    DATA_DIR = "data"
    OUTPUT_DIR = "./street_networks"

    # Driver memory settings only take effect if this call launches the JVM,
    # i.e. no SparkSession exists yet in this kernel.
    spark = (
        SparkSession.builder
        .appName("GeoJSON data loader")
        .config("spark.driver.memory", "10g")
        .config("spark.executor.memory", "10g")
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "2g")
        .config("spark.sql.shuffle.partitions", "200")
        .config("spark.default.parallelism", "100")
        .config("spark.memory.fraction", "0.8")
        .config("spark.memory.storageFraction", "0.3")
        .config("spark.executor.cores", "4")
        .config("spark.driver.maxResultSize", "4g")
        .getOrCreate()
    )

    # Inspect one city's files first to understand the structure
    for sample_file in (os.path.join(DATA_DIR, "Adelaide_nodes_100m.geojson"),
                        os.path.join(DATA_DIR, "Adelaide_edges_100m.geojson")):
        inspect_geojson_structure(spark, sample_file)
        print('-------')

    city_paths = discover_city_paths(DATA_DIR)

    # Load all cities with the flexible loader
    nodes_df, edges_df = load_multiple_cities_flexible(spark, city_paths)

    # Both frames are reused by the writes below and by later notebook cells
    nodes_df.cache()
    edges_df.cache()

    # Write each dataset exactly once, partitioned by city: partitionBy already
    # produces one sub-directory per city (city=<name>/). Writing again to the
    # same path in "overwrite" mode would silently replace this output.
    for name, frame in (("nodes", nodes_df), ("edges", edges_df)):
        frame.write \
            .format("parquet") \
            .partitionBy("city") \
            .mode("overwrite") \
            .save(f"{OUTPUT_DIR}/{name}")

25/03/14 17:00:24 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:01:11 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:01:56 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:02:40 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:03:25 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:04:09 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:04:53 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:05:38 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:06:22 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:07:07 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:07:54 WARN DAGScheduler: Broadcasting large task binary with size 1667.5 KiB
25/03/14 17:08:39 WAR

In [4]:
# Persist both combined frames as plain (unpartitioned) Parquet datasets.
nodes_df.write.parquet("street_networks_3/nodes", mode="overwrite")
edges_df.write.parquet("street_networks_3/edges", mode="overwrite")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/.pyenv/versions/3.10.13/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/.pyenv/versions/3.10.13/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ubuntu/.pyenv/versions/3.10.13/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/.pyenv/versions/3.10.13/lib/python3.10/site-packages/py4j/clientserver.py", line 

Py4JError: An error occurred while calling o1579.write

In [5]:
# Preview the first rows of each combined frame, untruncated.
for heading, frame in (("Sample nodes data:", nodes_df),
                       ("Sample edges data:", edges_df)):
    print(heading)
    frame.show(5, truncate=False)

Sample nodes data:


ConnectionRefusedError: [Errno 111] Connection refused