In [None]:
import geopandas as gpd
import joblib
import networkx as nx
import osmnx as ox
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf, when
from sedona.core.enums import GridType
from sedona.core.SedonaContext import SedonaContext
from sedona.core.spatialOperator import JoinQuery
from sedona.core.SpatialRDD import PolygonRDD, PointRDD
from sedona.register import SedonaRegistrator
from sedona.utils.adapter import Adapter
from shapely.geometry import Point
from shapely.wkt import loads as load_wkt

In [None]:
# Spark session for the notebook: pulls the Sedona adapter and GeoTools wrapper
# jars and switches serialization to Kryo.
# NOTE(review): the registrator class below sits in the "sedona.viz" package
# while only the python-adapter artifact is listed in spark.jars.packages;
# confirm that class is actually on the classpath.
spark = (
    SparkSession.builder
    .appName("Charging Stations in Italy")
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-python-adapter-3.4_2.12:1.3.1-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2",
    )
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.sedona.viz.core.SedonaVizKryoRegistrator")
    .getOrCreate()
)

In [None]:
# Create the Sedona context on top of the Spark session built above.
sedona = SedonaContext.create(spark)

In [None]:
# Display every Spark configuration key/value pair of the running context
# (sanity check that the builder settings were picked up).
spark.sparkContext.getConf().getAll()

In [ ]:
# Read the population CSV with Spark, letting Spark infer the column types.
# The path is relative to the notebook's working directory (the "Population"
# section reads the same file name from there) rather than an absolute path on
# one developer's machine, so the notebook can run elsewhere.
csv_path = 'ita_general_2020.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)

In [ ]:
# Fetch charging stations across Italy from OpenStreetMap and save to GeoJSON.
charging_stations = ox.features_from_place('Italy', tags={'amenity': 'charging_station'})
# The file name must not start with a space: the following cell reads
# 'italy_charging_stations.geojson', and a leading space makes it a different file.
charging_stations.to_file("italy_charging_stations.geojson", driver='GeoJSON')

In [ ]:
from sedona.core.SpatialRDD import PolygonRDD, PointRDD
from sedona.utils.adapter import Adapter

In [ ]:
# Load the raw charging-station export and report column labels that occur
# more than once.
file_path = 'italy_charging_stations.geojson'
charging_stations_gdf = gpd.read_file(file_path)

all_columns = charging_stations_gdf.columns
duplicate_columns = all_columns[all_columns.duplicated()].tolist()
print("Duplicate Columns:", duplicate_columns)

In [ ]:
# Keep only the first occurrence of every column label.
first_occurrence = ~charging_stations_gdf.columns.duplicated()
charging_stations_gdf = charging_stations_gdf.loc[:, first_occurrence]

In [ ]:
# Remove columns that are not wanted downstream. Labels that are absent from
# the frame are filtered out first so the drop cannot raise.
unwanted_columns = ['survey_date', 'socket:type3a', 'socket_type3a_output']
columns_to_drop = [name for name in unwanted_columns if name in charging_stations_gdf.columns]
charging_stations_gdf = charging_stations_gdf.drop(columns=columns_to_drop)

In [ ]:
# Replace ':' in the column labels with '_' and write the cleaned layer to a
# new GeoJSON file.
cleaned_file_path = 'italy_charging_stations_cleaned.geojson'
charging_stations_gdf.columns = [label.replace(':', '_') for label in charging_stations_gdf.columns]
charging_stations_gdf.to_file(cleaned_file_path, driver='GeoJSON')

In [None]:
# Re-read the cleaned layer and report its coordinate reference system.
file_path = 'italy_charging_stations_cleaned.geojson'
gdf = gpd.read_file(file_path)

layer_crs = gdf.crs
print("CRS of the GeoDataFrame:", layer_crs)

In [ ]:
# Load the cleaned charging-station GeoJSON into a Spark DataFrame through the
# "geojson" data source.
# NOTE(review): this needs a "geojson" reader registered in the session; confirm
# the installed Sedona version provides it.
cs_df = spark.read.format("geojson").load("italy_charging_stations_cleaned.geojson")

In [ ]:
# Turn the station DataFrame into a SpatialRDD using its "geometry" column and
# request a CRS transform. Both CRS constants hold the same EPSG code here.
source_crs = target_crs = "EPSG:4326"
css_df = Adapter.toSpatialRdd(cs_df, "geometry")
css_df.CRSTransform(source_crs, target_crs)

In [None]:
from sedona.register import SedonaRegistrator
from sedona.utils.adapter import Adapter
from sedona.core.SpatialRDD import PolygonRDD
from shapely.wkt import loads as load_wkt

In [None]:
# Read the grid definition CSV (first row holds the column names) into a
# Spark DataFrame.
grid_file_path = "polygon_data.csv"
grid_reader = spark.read.option("header", "true")
grid_df = grid_reader.csv(grid_file_path)

In [None]:
# RDD holding the raw `polygon` value of every grid row that has one.
rows_with_polygon = grid_df.filter(grid_df.polygon.isNotNull())
polygon_wkt_rdd = rows_with_polygon.select("polygon").rdd.map(lambda row: row['polygon'])

In [None]:
# Build the grid-cell SpatialRDD from the WKT strings in `grid_df.polygon`.
# PolygonRDD has no constructor that takes (RDD of WKT strings, CRS, flag), so
# the WKT is parsed with Sedona SQL and the DataFrame is converted through the
# Adapter; `grid_id` is carried along so join results can be tied back to a cell.
# Errors are deliberately not swallowed: carrying on without `polygon_rdd`
# would only resurface later as a confusing NameError.
grid_geometry_df = (
    grid_df
    .filter(grid_df.polygon.isNotNull())
    .selectExpr("ST_GeomFromWKT(polygon) AS geometry", "grid_id")
)
polygon_rdd = Adapter.toSpatialRdd(grid_geometry_df, "geometry")

In [ ]:
# Request the same source -> target CRS transform for the grid polygons that
# was requested for the charging stations.
polygon_rdd.CRSTransform(source_crs, target_crs)

In [ ]:
# Perform spatial partitioning (to improve join performance).
# A SpatialRDD has no partitioner until it has been partitioned itself, so the
# grid polygons are analyzed and partitioned with a concrete grid type first;
# the stations then reuse exactly that partitioner, which the join requires.
polygon_rdd.analyze()
css_df.analyze()
polygon_rdd.spatialPartitioning(GridType.KDBTREE)
css_df.spatialPartitioning(polygon_rdd.getPartitioner())

In [ ]:
# Perform the spatial join between the grid polygons and the charging stations.
# SpatialRDD itself exposes no join method; joins go through JoinQuery. The
# flat variant yields one (grid polygon, station) pair per match, which
# Adapter.toDf can turn into a DataFrame.
joined_rdd = JoinQuery.SpatialJoinQueryFlat(
    css_df,       # objects being matched: charging stations
    polygon_rdd,  # query windows: grid polygons
    False,        # no spatial index has been built on the stations
    True,         # stations on a cell boundary count as matches
)

In [ ]:
# Convert the join result into a Spark DataFrame and display its first rows.
joined_df = Adapter.toDf(joined_rdd, spark)
joined_df.show()

### Population

In [ ]:
# Read the population CSV (first row holds the column names); no schema is
# inferred here.
population_file_path = 'ita_general_2020.csv'
population_reader = spark.read.option("header", "true")
population_df = population_reader.csv(population_file_path)

In [ ]:
# Parse the coordinate columns into numbers.
# "double" instead of "float": Spark's float is 32-bit, which rounds
# longitude/latitude values and can move a point across a grid-cell border.
population_df = (
    population_df
    .withColumn("longitude", population_df["longitude"].cast("double"))
    .withColumn("latitude", population_df["latitude"].cast("double"))
)

In [ ]:
# using udf to check shapely
@udf("boolean")
def point_in_polygon(longitude, latitude, polygon_wkt):
    """Spark UDF telling whether a coordinate lies inside a WKT polygon.

    Args:
        longitude: x coordinate of the point, or None.
        latitude: y coordinate of the point, or None.
        polygon_wkt: WKT text of the polygon, or None.

    Returns:
        True when shapely's ``contains`` reports the point inside the polygon.
        False when any argument is None, or when the WKT cannot be parsed or
        the containment test fails.
    """
    if longitude is None or latitude is None or polygon_wkt is None:
        return False
    # Built outside the try block on purpose: a missing `Point` name must raise
    # instead of being reported as "not contained" for every row.
    point = Point(longitude, latitude)
    try:
        polygon = load_wkt(polygon_wkt)
        # Builtin bool so the result is a plain Python value for Spark.
        return bool(polygon.contains(point))
    except Exception:
        # Best effort: malformed WKT or an invalid geometry counts as no match.
        return False

In [ ]:
# Pair every population point with every grid cell and keep only the pairs
# where the UDF reports the point inside the cell polygon.
inside_cell = point_in_polygon(col("longitude"), col("latitude"), col("polygon"))
joined_df = population_df.crossJoin(grid_df).filter(inside_cell)

In [ ]:
# Mean of the population column per grid cell, exposed as "population_density";
# any null mean is replaced by 0.
mean_per_cell = joined_df.groupBy("grid_id").agg({"ita_general_2020": "mean"})
population_density_df = (
    mean_per_cell
    .withColumnRenamed("avg(ita_general_2020)", "population_density")
    .fillna({"population_density": 0})
)

In [ ]:
# Attach the density to every grid cell; cells without any matched population
# point get a density of 0.
grid_with_density = grid_df.join(population_density_df, on="grid_id", how="left")
result_df = grid_with_density.fillna({"population_density": 0})
result_df.show()

In [ ]:
# OSM tag key -> tag values to fetch as points of interest.
# Cinemas are mapped in OpenStreetMap as amenity=cinema; listing 'cinema' under
# 'leisure' queried a tag combination that is not used and returned nothing.
features = {
    'amenity': ['school', 'university', 'restaurant', 'place_of_worship',
                'community_centre', 'townhall', 'parking', 'library', 'cinema'],
    'leisure': ['park'],
    'building': ['commercial', 'government', 'civic', 'retail']
}

In [ ]:
# Extract POIs: one OSM query per (tag key, tag value) pair in `features`.
# A failed query is reported and skipped so one bad tag does not abort the rest.
all_pois = []

for feature_type, values in features.items():
    for value in values:
        try:
            # features_from_place is the same (non-deprecated) OSMnx entry point
            # used for the charging stations.
            pois = ox.features_from_place('Italy', tags={feature_type: value})
            # assign() returns a new frame, avoiding a write into a slice of `pois`.
            all_pois.append(pois[['geometry']].assign(poi_type=value))
        except Exception as e:
            print(f"Error extracting {value}: {e}")

# Combine POIs into a single GeoDataFrame. pd.concat raises on an empty list,
# so fall back to an empty frame with the same columns when every query failed.
if all_pois:
    pois_gdf = pd.concat(all_pois)
else:
    pois_gdf = gpd.GeoDataFrame({'geometry': [], 'poi_type': []}, geometry='geometry', crs="EPSG:4326")

### Landuse

In [ ]:
# Maps a land-use category label to the OSM `landuse` values that count
# towards it.
# NOTE(review): 'retail' and 'industrial' are listed under 'commercial' as well
# as under their own keys. categorize_landuse walks this dict in order and
# returns the first matching key, so the dedicated 'retail' / 'industrial'
# entries can never be returned. Confirm which grouping is intended.
categories = {
    'residential': ['residential'],
    'commercial': ['commercial', 'retail', 'office', 'industrial'],
    'retail': ['retail'],
    'industrial': ['industrial'],
    'other': ['agricultural', 'forest', 'conservation', 'recreation']
}

In [ ]:
def categorize_landuse(polygon_wkt):
    """Classify a grid cell by the OSM land-use features found inside it.

    Args:
        polygon_wkt: WKT text of the cell polygon, or None.

    Returns:
        The first key of ``categories`` (in dict order) for which at least one
        of its land-use values occurs inside the polygon. The 'other' key is
        reported as 'Other' so the column carries a single spelling for that
        label. 'Other' is also returned when the input is None, when no
        land-use feature is found, when nothing matches, or when the lookup
        fails (the error is printed).
    """
    if polygon_wkt is None:
        return 'Other'
    try:
        polygon = load_wkt(polygon_wkt)
        # features_from_polygon is the non-deprecated counterpart of the
        # features_from_place call used earlier in this notebook.
        landuse_gdf = ox.features_from_polygon(polygon, tags={'landuse': True})
        if landuse_gdf.empty or 'landuse' not in landuse_gdf:
            return 'Other'
        present = set(landuse_gdf['landuse'].dropna())
        for cat, types in categories.items():
            if present.intersection(types):
                return 'Other' if cat == 'other' else cat
    except Exception as e:
        print(f"Error categorizing land use: {str(e)}")
    return 'Other'

# Wrap the classifier as a string-returning UDF and store its result for each
# cell polygon in a "landuse" column.
categorize_landuse_udf = udf(categorize_landuse, "string")
landuse_column = categorize_landuse_udf(col("polygon"))
grid_df = grid_df.withColumn("landuse", landuse_column)

In [ ]:
def _most_common_value(frame, column):
    """Return the most frequent non-null value of ``column`` as str, or None."""
    if column not in frame:
        return None
    modes = frame[column].mode()
    # mode() is empty when the column holds only nulls.
    return None if modes.empty else str(modes.iloc[0])


def aggregate_road_data(polygon_wkt):
    """Summarise the OSM `highway` features inside a polygon.

    Args:
        polygon_wkt: WKT text of the cell polygon, or None.

    Returns:
        A 4-tuple ``(oneway, lanes, highway, maxspeed)``:
        the most frequent `oneway`, `lanes` and `highway` values as strings
        (None when a tag is absent or has no values) and the mean numeric
        `maxspeed` as a Python float (None when it cannot be computed).
        ``('No roads', None, None, None)`` when no feature is found, and four
        Nones when the input is None or the lookup fails (error is printed).
    """
    if polygon_wkt is None:
        return None, None, None, None
    try:
        polygon = load_wkt(polygon_wkt)
        # features_from_polygon is the non-deprecated counterpart of the
        # features_from_place call used earlier in this notebook.
        roads = ox.features_from_polygon(polygon, tags={'highway': True})
        if roads.empty:
            return 'No roads', None, None, None

        maxspeed = None
        if 'maxspeed' in roads:
            # astype(str) keeps .str usable when the column is not text-typed;
            # unparseable entries (including nulls) become NaN and are ignored.
            speeds = pd.to_numeric(
                roads['maxspeed'].astype(str).str.replace(' km/h', '', regex=False),
                errors='coerce',
            )
            mean_speed = speeds.mean()
            if pd.notna(mean_speed):
                # Plain float rather than a numpy scalar for the Spark struct.
                maxspeed = float(mean_speed)

        return (
            _most_common_value(roads, 'oneway'),
            _most_common_value(roads, 'lanes'),
            _most_common_value(roads, 'highway'),
            maxspeed,
        )
    except Exception as e:
        print(f"Error aggregating road data: {str(e)}")
        return None, None, None, None

In [ ]:
# Run the road summary per cell polygon into a struct column, then copy each
# struct field into its own flat column.
road_schema = "struct<oneway:string, lanes:string, highway:string, maxspeed:double>"
aggregate_road_data_udf = udf(aggregate_road_data, road_schema)
grid_df = grid_df.withColumn("road_data", aggregate_road_data_udf(col("polygon")))

road_columns = [
    ("oneway_exists", "road_data.oneway"),
    ("average_lanes", "road_data.lanes"),
    ("high_way", "road_data.highway"),
    ("average_maxspeed", "road_data.maxspeed"),
]
for flat_name, struct_path in road_columns:
    grid_df = grid_df.withColumn(flat_name, col(struct_path))

In [3]:
#  Network Data 
def extract_network_data(polygon_wkt):
    """Measure the drivable street network inside a polygon.

    Args:
        polygon_wkt: WKT text of the cell polygon, or None.

    Returns:
        ``(node_count, edge_count, density)`` of the simplified 'drive'
        network, with density always a Python float so it fits a `double`
        field. ``(0, 0, 0.0)`` when the input is None or the graph cannot be
        built (the error is printed).
    """
    if polygon_wkt is None:
        return 0, 0, 0.0
    try:
        polygon = load_wkt(polygon_wkt)
        G = ox.graph_from_polygon(polygon, network_type='drive', simplify=True)
        # Counts are read from the graph directly; no GeoDataFrames are needed.
        # nx.density can return the int 0 for trivial graphs, hence float().
        return G.number_of_nodes(), G.number_of_edges(), float(nx.density(G))
    except Exception as e:
        print(f"Error extracting network data: {str(e)}")
        return 0, 0, 0.0

In [ ]:
# Run the network measurement per cell polygon into a struct column, then copy
# each struct field into its own flat column.
network_schema = "struct<nodes:int, edges:int, density:double>"
extract_network_data_udf = udf(extract_network_data, network_schema)
grid_df = grid_df.withColumn("network_data", extract_network_data_udf(col("polygon")))

network_columns = [
    ("nodes", "network_data.nodes"),
    ("edges", "network_data.edges"),
    ("density", "network_data.density"),
]
for flat_name, struct_path in network_columns:
    grid_df = grid_df.withColumn(flat_name, col(struct_path))

In [4]:
# Missing value handling.
# Cache first: the two aggregates below (and every later action) would
# otherwise each re-run the OSM-backed UDF columns for the whole grid.
grid_df = grid_df.cache()

lanes_mode = grid_df.agg({"average_lanes": "mode"}).collect()[0][0]
maxspeed_median = grid_df.agg({"average_maxspeed": "median"}).collect()[0][0]

grid_df = grid_df.fillna({
    'landuse': 'Other',
    'oneway_exists': 'no',
    # average_lanes is a string column, so the fallback is the string '2' to
    # match the format of the real values (a float 2.0 would not).
    'average_lanes': lanes_mode if lanes_mode is not None else '2',
    'high_way': 'No data',
    # Explicit None check: only a missing median falls back to 50.0.
    'average_maxspeed': maxspeed_median if maxspeed_median is not None else 50.0,
    'nodes': 0,
    'edges': 0,
    'density': 0.0
})

In [ ]:
# NOTE(review): no earlier cell in this notebook creates "cinema_count";
# dropping a column that does not exist leaves the DataFrame unchanged.
grid_df = grid_df.drop("cinema_count")

# Save the preprocessed DataFrame.
# The CSV writer cannot serialize struct columns, and the fields of
# "road_data" / "network_data" already exist as flat columns, so the two
# structs are left out of the export. Overwrite mode keeps the cell re-runnable.
final_data_path = 'final_data.csv'
grid_df.drop("road_data", "network_data").write.mode("overwrite").csv(final_data_path, header=True)

# Step 9: Modeling Task
# Load preprocessed data and trained model to make predictions.
# pd.read_csv always returns a 2-D DataFrame, so no 1-D reshape is needed
# (and turning it into an ndarray would break the column assignment below).
input_data = pd.read_csv('final_data_with_only_yes_predictions.csv')

# Load the model pipeline.
# NOTE: joblib.load unpickles arbitrary objects; only load a model file that
# comes from a trusted source.
loaded_pipeline = joblib.load('svm_model.pkl')

# Make predictions
predictions = loaded_pipeline.predict(input_data)
print("Predictions:", predictions)

# Step 10: Save Predictions
input_data['output'] = predictions
output_file = 'final_svm.csv'
input_data.to_csv(output_file, index=False)