In [None]:
import io
import os
import pickle

import datautils
import geopy.distance as gd
import maputils
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
from convertbng.util import convert_bng, convert_lonlat
from geopy.distance import distance
from PIL import Image
from scipy.spatial.distance import cdist, pdist, squareform
from skimage.draw import polygon

# create folder structures and holds constants
data_path = datautils.DataPaths("../data")
# import osgeo

In [157]:
# df_schulen_steiermark = pd.read_csv('idea/data/Bildungsstandorte_Steiermark2.csv', sep=',')
df_schulen_steiermark = pd.read_csv(
    f"{data_path.statistik_austria}/schools_styria.csv", sep=","
)

In [None]:
df_schulen_steiermark.head()

In [None]:
df_schulen_steiermark.columns

In [None]:
print(min(df_schulen_steiermark["Longitude"]), max(df_schulen_steiermark["Longitude"]))
print(min(df_schulen_steiermark["Latitude"]), max(df_schulen_steiermark["Latitude"]))

In [None]:
x = df_schulen_steiermark["Longitude"]
y = df_schulen_steiermark["Latitude"]

plt.scatter(x, y)
# df_schulen_steiermark.loc[df_schulen_steiermark['TYP_LANG']=='Volksschule']['X']

In [None]:
# some example code how to calculate distances of lat/long coordinates in km (freedom units are possible as well ;))

print(min(x), max(x))
print(min(y), max(y))
print(x.mean())
print(y.mean())
width = gd.distance((y.mean(), min(x)), (y.mean(), max(x))).km
height = gd.distance((min(y), x.mean()), (max(y), x.mean())).km

print("diagonal km", gd.distance((min(x), min(y)), (max(x), max(y))).km)
print("horizonal km", width)
print("vertical km", height)

In [163]:
from typing import Callable


class Rastarize:
    """
    distance values in km
    coordinates inputs in latitude/longitude decimal pairs
    cell size gives the size of one raster cell in km
    e.g 0.1 -> 1km becomes rastarized into a 10x10 grid
    """

    def __init__(
        self,
        x_min: float,
        x_max: float,
        y_min: float,
        y_max: float,
        padding_factor: float = 0.05,
        cell_size: float = 0.1,
        cache_path="./cache",
    ):
        self.x_padding = (x_max - x_min) * padding_factor
        self.y_padding = (y_max - y_min) * padding_factor
        self.x_min = x_min - self.x_padding
        self.x_max = x_max + self.x_padding
        self.y_min = y_min - self.y_padding
        self.y_max = y_max + self.y_padding
        self.padding_factor = padding_factor
        self.cell_size = cell_size
        self.cache_path = cache_path

        self.bounds = [[self.y_min, self.x_min], [self.y_max, self.x_max]]
        self.data = {}
        self.kernel = {}
        self.max_distance = {}
        self.aggregation = {}
        self.upper_limit = {}
        self.lower_limit = {}
        self.positive = {}

        x_center = (self.x_min + self.x_max) / 2
        y_center = (self.y_min + self.y_max) / 2

        self.width = gd.distance((y_center, self.x_min), (y_center, self.x_max)).km
        self.height = gd.distance((self.y_min, x_center), (self.y_max, x_center)).km

        print("center coords: %.2f %.2f" % (x_center, y_center))
        print("width: %.2f" % self.width)
        print("height: %.2f" % self.height)

        data_size = int(self.height / cell_size), int(self.width / cell_size)
        self.tmp = np.zeros(data_size)
        print(self.tmp.shape)

    def add_dataset(
        self,
        name: str,
        max_distance: float,
        distribution_fn: Callable,
        agg_fn: str,
        upper_limit: float,
        lower_limit: float = 0,
        positive: bool = True,
    ) -> None:
        self.data[name] = np.zeros(self.tmp.shape)

        self.max_distance[name] = max_distance
        self.aggregation[name] = agg_fn
        self.upper_limit[name] = upper_limit
        self.lower_limit[name] = lower_limit
        self.positive[name] = positive

        if distribution_fn == None:
            size = 1
        else:
            size = max_distance * 2 // self.cell_size
            if size % 2 == 0:
                size += 1

        size = int(size)

        kernel = np.zeros((size, size))

        print("kernel_size", size)

        if distribution_fn == None:
            kernel[:] = 1
        else:
            center = size // 2
            for x in range(size):
                for y in range(size):
                    distance = np.linalg.norm(np.asarray([x - center, y - center]))
                    kernel[x, y] = max(
                        0, distribution_fn(abs(distance) * self.cell_size)
                    )

        self.kernel[name] = kernel

    def add_coordinates(
        self,
        name: str,
        coords: np.ndarray,
        coords_in_webmercator: bool = False,
        coords_with_count: bool = False,
    ) -> None:
        data = self.data[name]
        data_shape = data.shape
        kernel = self.kernel[name]
        agg_fn = self.aggregation[name]

        width = int(self.x_max - self.x_min)
        height = int(self.y_max - self.y_min)
        # print(width, height)

        fks = kernel.shape[0]
        hks = fks // 2

        if coords_with_count:
            coords = coords
        else:
            # set count to 1, in case it is not provided
            coords = np.insert(coords[:, [0, 1]], 2, 1, axis=1)

        for x, y, count_ in tqdm.tqdm(coords):
            # mercator projection of lon/lat coords
            if coords_in_webmercator == False:
                mx, my = maputils.wgs84_to_web_mercator(
                    (
                        x,
                        y,
                    ),
                    self.bounds,
                    data_shape[1],
                    data_shape[0],
                )
                mx, my = round(mx), round(my)
            else:
                mx, my = x, y

            if kernel.shape[0] == 1:
                data[my, mx] += kernel[0, 0] * count_
            else:
                data_y_left = my - hks
                data_x_left = mx - hks

                self.tmp[:] = 0
                self.tmp[
                    data_y_left : data_y_left + fks, data_x_left : data_x_left + fks
                ] = (
                    kernel * count_
                )  # scale the kernel with the count

                if agg_fn == "add" or agg_fn == "log":
                    data = np.add(self.tmp, data)
                else:
                    data = np.maximum(self.tmp, data)

        self.data[name] = data

    def finish(self, name) -> None:
        data = self.data[name]
        upper_limit = self.upper_limit[name]
        lower_limit = self.lower_limit[name]
        agg_fn = self.aggregation[name]

        data[data > upper_limit] = upper_limit
        data[data < lower_limit] = 0
        if agg_fn == "log":
            data = np.log(data + 1)

        # a little bit of a hack, in case the distribution function was None and
        # the kernel size is 1 and for that cases no normalization should be done
        if self.kernel[name].shape[0] > 1:
            data = data / data.max()
        self.data[name] = data

    def store_data(self, name) -> None:
        path = f"{self.cache_path}/saved"
        if os.path.exists(path) == False:
            os.mkdir(path)

        target_file = os.path.join(path, name)
        pickle.dump(self.data[name], open(target_file, "wb"))

    def load_data(self, name) -> bool:
        path = f"{self.cache_path}/saved"

        target_file = os.path.join(path, name)

        if os.path.exists(target_file) == False:
            return False

        self.data[name] = pickle.load(open(target_file, "rb"))
        print("loaded data from cache '%s'" % target_file)
        return True

    # def add_mask(self, coordinates: np.array):

    def load_mask_cache(self) -> bool:
        mask_path_name = f"{self.cache_path}/mask.npy"
        if os.path.exists(mask_path_name):
            self.mask = np.load(mask_path_name)
            print("loaded mask from cache")
            return True
        else:
            return False

    def add_mask(
        self, coordinates: np.ndarray, coordinates_lonlat: bool = True
    ) -> None:

        self.coordinates = coordinates
        self.mask = np.zeros(self.tmp.shape)
        # Extract x and y coordinates
        x_coords = coordinates[:, 0]
        y_coords = coordinates[:, 1]

        # Convert coordinates to indices in the mask array
        if coordinates_lonlat:
            x_indices = (
                (x_coords - self.x_min) / (self.x_max - self.x_min) * self.mask.shape[1]
            ).astype(int)
            y_indices = (
                self.mask.shape[0]
                - (y_coords - self.y_min)
                / (self.y_max - self.y_min)
                * self.mask.shape[0]
            ).astype(int)
        else:
            x_indices = x_coords
            y_indices = y_coords

        # Create the filled polygon in the mask
        rr, cc = polygon(y_indices, x_indices, self.mask.shape)
        self.mask[rr, cc] = 1
        mask_path_name = f"{self.cache_path}/mask.npy"
        np.save(mask_path_name, self.mask)

In [164]:
from typing import Callable, Union
from enum import Enum

# visualization/image conversion functions


def show_rasterimg(img: np.ndarray):
    plt.figure(num=None, figsize=(20, 20), dpi=80, facecolor="w", edgecolor="k")
    plt.imshow(img, cmap="RdYlGn")
    # plt.imshow(img, cmap='gist_gray')


class MapColoring(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    AMENITY = "amenity"


def image_with_mask(
    image_in: np.ndarray,
    mask: np.ndarray,
    coloring: MapColoring = MapColoring.POSITIVE,
    color_=None,
    use_binary_mask=False,
) -> Image.Image:
    # Create a greyscale image for using it as alpha channel
    img_mask = create_plot_img(image_in)

    binary_img_mask = img_mask < 245

    # Set the alpha channel from rastarize.mask
    image_array_uint8 = np.zeros(
        shape=(image_in.shape[0], image_in.shape[1], 4), dtype=np.uint8
    )  # img_mask.astype(np.uint8)


    if use_binary_mask:
        combined_mask = np.min([binary_img_mask[:, :, 0], mask], axis=0)
    else:
        combined_mask = np.min([1 - (img_mask[:, :, 0] / 255), mask], axis=0)

    # combined_mask = mask
    if color_ is not None:
        color = color_
    elif coloring == MapColoring.POSITIVE:
        color = create_plot_img(image_in, "Greens")
    elif coloring == MapColoring.NEGATIVE:
        color = create_plot_img(image_in, "Reds")  # [255, 0, 0]
    elif coloring == MapColoring.AMENITY:
        color = create_plot_img(image_in, "autumn")
        # color = create_plot_img(image_in, "afmhot")
        # color = create_plot_img(image_in, "cividis") # ok but outer regions look greyish

        print("amenity color is used")

    image_array_uint8[:, :, 3] = combined_mask * 255.0
    print("shape len", len(color))
    if len(color) == 3:
        image_array_uint8[:, :, :3] = color
    else:
        image_array_uint8[:, :, :2] = color[:, :, :2] 

    # Convert the modified array back to an image
    image_with_mask = Image.fromarray(image_array_uint8)
    return image_with_mask


def create_plot_img(image_in: np.ndarray, cmap: str = "Greys") -> np.ndarray:

    buffer = io.BytesIO()

    # Save the image to the buffer
    plt.imsave(buffer, image_in, cmap=cmap, format="png")

    # Seek to the beginning of the buffer so it can be read later
    buffer.seek(0)
    # Read the image from the buffer
    image = Image.open(buffer)

    # Convert the image to a numpy array
    image_array = np.array(image)
    return image_array


In [None]:
x = df_schulen_steiermark["Longitude"]
y = df_schulen_steiermark["Latitude"]
rastarize = Rastarize(
    min(x),
    max(x),
    min(y),
    max(y),
    cell_size=0.05,  # 50m cell size -> data is aggregated into 50m x 50m cells
    padding_factor=0.2,  # 20% padding of given bounds
    cache_path=data_path.cache,
)
print(rastarize.bounds)

In [None]:
def sort_coordinates_by_distance_matrix(coords, distance_matrix):
    sorted_coords = [coords[0]]
    remaining_indices = list(range(1, len(coords)))

    while remaining_indices:
        last_index = len(sorted_coords) - 1
        if len(sorted_coords) % 1000 == 0:
            print(f"Remaining indices: {len(remaining_indices)}")
        distances = distance_matrix[last_index, remaining_indices]
        min_index = remaining_indices.pop(distances.argmin())
        sorted_coords.append(coords[min_index])

    return np.array(sorted_coords)


def sort_coordinates_by_distance(coords):
    sorted_coords = [coords[0]]
    remaining_coords = coords[1:].tolist()

    while remaining_coords:
        last_coord = sorted_coords[-1]
        distances = [
            distance((last_coord[1], last_coord[0]), (coord[1], coord[0])).km
            for coord in remaining_coords
        ]
        min_index = distances.index(min(distances))
        sorted_coords.append(remaining_coords.pop(min_index))

    return np.array(sorted_coords)


def sort_coordinates_by_distance_no_unit(coords):
    if not isinstance(coords, np.ndarray):
        coords = np.array(coords)

    sorted_coords = [coords[0]]
    remaining_coords = list(coords[1:])

    while remaining_coords:
        last_coord = sorted_coords[-1]
        distances = [np.linalg.norm(last_coord - coord) for coord in remaining_coords]
        # distance =
        min_distance = min(distances)
        min_index = distances.index(min(distances))
        row_min_distance = remaining_coords.pop(min_index)
        if min_distance > 20:
            sorted_coords.append(row_min_distance)
        else:
            pass
            # do nothing, just pop and skip, some valeus at the end are too far with this sorting method

    return np.array(sorted_coords)


def simplify_border(border, threshold=0.5):
    simplified_border = [border[0]]
    for i in range(1, len(border)):
        if distance(simplified_border[-1], border[i]).km >= threshold:
            simplified_border.append(border[i])
        else:
            simplified_border[-1] = [
                (simplified_border[-1][0] + border[i][0]) / 2,
                (simplified_border[-1][1] + border[i][1]) / 2,
            ]
    return np.array(simplified_border)


def convert_to_webmercator(coords, bounds, width, height):

    webmercator_coords = np.zeros(shape=coords.shape, dtype=int)
    for i, (lon, lat) in tqdm.tqdm(enumerate(coords)):
        mx, my = maputils.wgs84_to_web_mercator((lon, lat), bounds, width, height)
        mx, my = round(mx), round(my)
        webmercator_coords[i] = [mx, my]
        # webmercator_coords.append((mx, my))
    return webmercator_coords


is_mask_loaded = rastarize.load_mask_cache()

if not is_mask_loaded:

    admin_border = pd.read_csv(f"{data_path.osm}/admin_border_styria.csv", index_col=0)

    admin_border_webmercator = convert_to_webmercator(
        admin_border[["lon", "lat"]].to_numpy(),
        rastarize.bounds,
        rastarize.tmp.shape[1],
        rastarize.tmp.shape[0],
    )
    # remove duplicates
    admin_border_webmercator = np.unique(admin_border_webmercator, axis=0)

    # Calculate the pairwise Euclidean distances
    distances = pdist(admin_border_webmercator, metric="euclidean")
    distance_matrix = squareform(distances)

    # Create a mask to identify points with distances less than 2
    mask_local = (distance_matrix < 4) & (distance_matrix > 0)

    # Merge points that have a distance < 2 by averaging their coordinates
    for i in range(len(admin_border_webmercator)):
        close_points = np.where(mask_local[i])[0]
        if len(close_points) > 0:
            admin_border_webmercator[i] = admin_border_webmercator[close_points].mean(
                axis=0
            )
            admin_border_webmercator[close_points] = admin_border_webmercator[i]

    # Remove duplicate points after merging
    filtered_admin_border_webmercator = np.unique(admin_border_webmercator, axis=0)

    sorted_admin_border_webmercator = sort_coordinates_by_distance_no_unit(
        filtered_admin_border_webmercator
    )
    np.savetxt(
        f"{data_path.cache}/mask_border.csv",
        sorted_admin_border_webmercator,
        delimiter=",",
    )

    rastarize.add_mask(sorted_admin_border_webmercator, coordinates_lonlat=False)

In [167]:
def add_store_dataset(
    name: str,
    max_distance: float,
    distribution_fn: Callable,
    agg_fn: str,
    upper_limit:int,
    coloring: MapColoring,
    data: np.ndarray,
    lower_limit:int = 0,
    use_cached=True,
    visualize=True,
    coords_in_webmercator: bool = False,
    coords_with_count: bool = False,  # in case the coordinates are given with a count column
):

    rastarize.add_dataset(
        name,
        max_distance,
        distribution_fn,
        agg_fn,
        upper_limit,
        lower_limit=lower_limit,
        positive= (coloring == MapColoring.POSITIVE),
    )

    loaded = False
    if use_cached:
        loaded = rastarize.load_data(name)

    if loaded == False:
        rastarize.add_coordinates(name, data, coords_in_webmercator, coords_with_count)
        rastarize.finish(name)
        rastarize.store_data(name)

    img_masked = image_with_mask(
        rastarize.data[name], rastarize.mask, coloring=coloring
    )
    img_masked.save(f"{data_path.maps}/{name}_masked.png")
    #img_masked.save(f"{data_path.maps}/{name}_masked.jpg") # supports no alpha channel
    img_masked.save(f"{data_path.maps}/{name}_masked.webp", "WEBP", lossless=False)

    if visualize:
        # show_rasterimg(rastarize.data[name])
        display(img_masked)
    return False

In [None]:
# Volksschulen

df_elementary = pd.read_csv(
    f"{data_path.statistik_austria}/elementary_schools_styria.csv", sep=","
)


def elementary(x):
    """
    Calculate the relative accessibility of elementary schools based on distance.
    This function determines the accessibility score of a elementary school based on the distance `x` in kilometers.
    If the distance is less than 4 km, it is assumed that the distance can be covered by walking or biking,
    and the score is calculated accordingly. For distances greater than or equal to 4 km, the score is adjusted
    to reflect the reduced accessibility.
    Parameters:
    x (float): The distance to the elementary school in kilometers.
    Returns:
    float: The accessibility score, where a higher score indicates better accessibility.
    """
    max_distance = 10
    if x <= 4:
        return (max_distance - x) / max_distance
    else:
        return (max_distance - x) / (max_distance * 2)


tmp_data = df_elementary[["Longitude", "Latitude", "Name"]].to_numpy()
print(tmp_data[0])
add_store_dataset(
    "school_elementary_styria",
    10,
    elementary,
    "log",
    5,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# secondary/middle schools
df_secondary = pd.read_csv(
    f"{data_path.statistik_austria}/secondary_schools_styria.csv", sep=","
)


def secondary(x):
    """
    Calculate the relative accessibility of a secondary school based on distance.
    This function determines the accessibility score of a secondary school based on the distance `x` in kilometers.
    If the distance is less than 4 km, it is assumed that the distance can be covered by walking or biking,
    and the score is calculated accordingly. For distances greater than or equal to 4 km, the score is adjusted
    to reflect the reduced accessibility.
    Parameters:
    x (float): The distance to the secondary school station in kilometers.
    Returns:
    float: The accessibility score, where a higher score indicates better accessibility.
    """
    max_distance = 15
    if x <= 4:
        return (max_distance - x) / max_distance
    else:
        return (max_distance - x) / (max_distance * 2)


schooltype = "AHS/Mittelschule/Berufsbildene Schulen"
tmp_data = df_secondary[["Longitude", "Latitude", "Name"]].to_numpy()
add_store_dataset(
    "school_secondary_styria",
    15,
    secondary,
    "log",
    5,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# secondary/middle schools
df_secondary = pd.read_csv(
    f"{data_path.statistik_austria}/secondary_schools_styria.csv", sep=","
)


def ahs(x):
    """
    Calculate the relative accessibility of AHS schoolsbased on distance.
    This function determines the accessibility score of AHS schoolsbased on the distance `x` in kilometers.
    If the distance is less than 4 km, it is assumed that the distance can be covered by walking or biking,
    and the score is calculated accordingly. For distances greater than or equal to 4 km, the score is adjusted
    to reflect the reduced accessibility.
    Parameters:
    x (float): The distance to the AHS schools station in kilometers.
    Returns:
    float: The accessibility score, where a higher score indicates better accessibility.
    """
    max_distance = 25
    if x <= 4:
        return (max_distance - x) / max_distance
    else:
        return (max_distance - x) / (max_distance * 2)


schooltype = "Allgemeinbildende höhere Schule"
tmp_data = df_secondary[
    df_secondary.Type_Long.isin(
        ["Allgemeinbildende höhere Schule", "Technisch-gewerbliche Schule"]
    )
][["Longitude", "Latitude", "Name"]].to_numpy()
# add_store_dataset("school_ahs_styria", 25, ahs, "log", 3, True, tmp_data)
# add_store_dataset("school_secondary_styria", 15, secondary, "log", 10, True, tmp_data,  use_cached=True)
add_store_dataset(
    "school_ahs_styria",
    25,
    secondary,
    "log",
    3,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# Neue Mittelschule


def nms(x):
    """
    Calculate the relative accessibility of a middle-school station based on distance.
    This function determines the accessibility score of a middle-school based on the distance `x` in kilometers.
    If the distance is less than 4 km, it is assumed that the distance can be covered by walking or biking,
    and the score is calculated accordingly. For distances greater than or equal to 4 km, the score is adjusted
    to reflect the reduced accessibility.
    Parameters:
    x (float): The distance to the middle-school in kilometers.
    Returns:
    float: The accessibility score, where a higher score indicates better accessibility.
    """
    max_distance = 10
    if x <= 4:
        return (max_distance - x) / max_distance
    else:
        return (max_distance - x) / (max_distance * 1.3)


schooltype = "Mittelschule"
tmp_data = df_schulen_steiermark.loc[df_schulen_steiermark["Type_Long"] == schooltype][
    ["Longitude", "Latitude", "Name"]
].to_numpy()
add_store_dataset(
    "school_nms_styria", 10, nms, "log", 3, MapColoring.AMENITY, tmp_data, use_cached=True
)

In [None]:
# Trainstations
trainstations_stmk = pd.read_csv(f"{data_path.osm}/railway_stations_styria.csv")


def trainstations(x):
    """
    Calculate the relative accessibility of a train station based on distance.
    This function determines the accessibility score of a train station based on the distance `x` in kilometers.
    If the distance is less than 4 km, it is assumed that the distance can be covered by walking or biking,
    and the score is calculated accordingly. For distances greater than or equal to 4 km, the score is adjusted
    to reflect the reduced accessibility.
    Parameters:
    x (float): The distance to the train station in kilometers.
    Returns:
    float: The accessibility score, where a higher score indicates better accessibility.
    """
    max_distance = 10

    if x < 4:
        return (max_distance - x) / (max_distance)
    else:
        return (max_distance - x) / (max_distance * 2)


tmp_data = trainstations_stmk[["lon", "lat", "name"]].to_numpy()
add_store_dataset(
    "transport_trainstations_styria",
    10,
    trainstations,
    "max",
    1,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# Bushaltestellen

bus_stops_stmk = pd.read_csv(f"{data_path.osm}/bus_stops_styria.csv")

max_distance = 3


def busstops(x):
    return (max_distance - x) / max_distance


tmp_data = bus_stops_stmk[["lon", "lat", "name"]].to_numpy()
add_store_dataset(
    "transport_busstations_styria",
    max_distance,
    busstops,
    "max",
    1,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# Supermarkets 1 - diversity
# supermarkets_styria = pd.read_csv("supermarkets_styria.csv")
supermarkets_styria = pd.read_csv(f"{data_path.osm}/supermarkets_styria.csv")


def supermarkets(x):
    max_distance = 8
    if x < 1.5:
        return (max_distance - x) / max_distance
    else:
        return (max_distance - x) / (max_distance * 2)


tmp_data = supermarkets_styria[["lon", "lat", "name"]].to_numpy()
add_store_dataset(
    "infrastructure_supermarkets_diversity_styria",
    8,
    distribution_fn=supermarkets,
    agg_fn="log",
    upper_limit=5,
    lower_limit=2,
    coloring=MapColoring.AMENITY,
    data=tmp_data,
    use_cached=False,
)

In [None]:
rastarize.data["infrastructure_supermarkets_diversity_styria"].max()

In [None]:
# Supermarkets 2 - single close
supermarkets_styria = pd.read_csv(f"{data_path.osm}/supermarkets_styria.csv")


def supermarkets(x):
    max_distance = 10
    if x < 1.5:
        return (max_distance - x) / max_distance
    else:
        return (max_distance - x) / (max_distance * 1.5)


tmp_data = supermarkets_styria[["lon", "lat", "name"]].to_numpy()
add_store_dataset(
    "infrastructure_supermarkets_close_styria",
    10,
    supermarkets,
    "max",
    1,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# Drogerie
chemist_styria = pd.read_csv(f"{data_path.osm}/chemist_styria.csv")

max_distance = 10


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = chemist_styria[["lon", "lat", "name"]].to_numpy()
add_store_dataset(
    "infrastructure_drogerie_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# Pharmacy
pharmacy_styria = pd.read_csv(f"{data_path.osm}/pharmacy_styria.csv")

max_distance = 10


def dist_func(x):
    return (max_distance - x) / max_distance


add_store_dataset(
    "infrastructure_pharmacy_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
# bakery
max_distance = 6
bakery_styria = pd.read_csv(f"{data_path.osm}/bakery_styria.csv")


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = bakery_styria[["lon", "lat", "name"]].to_numpy()
add_store_dataset(
    "infrastructure_bakery_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
restaurant_styria = pd.read_csv(f"{data_path.osm}/restaurant_styria.csv")

max_distance = 6


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = restaurant_styria[["lon", "lat", "name"]].to_numpy()
add_store_dataset(
    "infrastructure_restaurant_styria",
    max_distance,
    dist_func,
    "log",
    20,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=True,
)

In [None]:
max_distance = 10


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = pd.read_csv(f"{data_path.osm}/motorway_exits.csv")[
    ["lon", "lat", "name"]
].to_numpy()
add_store_dataset(
    "transport_motorway_exits_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.AMENITY,
    tmp_data,
    use_cached=False,
)

In [None]:
# collected data
for k in rastarize.data.keys():
    print(k)

In [None]:
# weight and combine positive data

school_primary = rastarize.data["school_elementary_styria"] * 0.4
school_ahs = rastarize.data["school_ahs_styria"] * 0.3
school_nms = rastarize.data["school_nms_styria"] * 0.3
transport_trainstations = rastarize.data["transport_trainstations_styria"] * 0.6
transport_busstations = rastarize.data["transport_busstations_styria"] * 0.6
infrastructure_supermarkets_diversity = (
    rastarize.data["infrastructure_supermarkets_diversity_styria"] * 0.3
)
infrastructure_supermarkets_close = (
    rastarize.data["infrastructure_supermarkets_close_styria"] * 0.6
)
infrastructure_pharmacy = rastarize.data["infrastructure_pharmacy_styria"] * 0.2
infrastructure_drogerie = rastarize.data["infrastructure_drogerie_styria"] * 0.1
shops_bakery = rastarize.data["infrastructure_bakery_styria"] * 0.2
shop_restaurant = rastarize.data["infrastructure_restaurant_styria"] * 0.3
infrastructure_motorway_exits = rastarize.data["transport_motorway_exits_styria"] * 1.0

positiv_data = (
    school_primary
    + school_ahs
    + school_nms
    + transport_trainstations
    + transport_busstations
)
positiv_data += (
    infrastructure_supermarkets_diversity
    + infrastructure_supermarkets_close
    + infrastructure_drogerie
)
positiv_data += (
    infrastructure_pharmacy
    + shops_bakery
    + shop_restaurant
    + infrastructure_motorway_exits
)

positiv_data = positiv_data / positiv_data.max()
# show_rasterimg(positiv_data)
positive_masked = image_with_mask(positiv_data, rastarize.mask)
positive_masked.save(f"{data_path.maps}/positive_masked.png")
positive_masked.save(f"{data_path.maps}/positive_masked.webp", "WEBP", lossless=False)
positive_masked.show()

In [None]:
plt.figure(dpi=300)
plt.imshow(rastarize.mask, cmap="gray")
plt.show()

In [None]:
masked_image = image_with_mask(positiv_data, rastarize.mask)
masked_image.save(f"{data_path.maps}/positive_effects_masked.png")
masked_image.save(f"{data_path.maps}/positive_effects_masked.webp", "WEBP", lossless=False)
masked_image

In [None]:
# negativ gasstations
max_distance = 0.4


def dist_func(x):
    if x <= 0.1:
        return 1.0
    else:
        return (max_distance - x) / max_distance


tmp_data = pd.read_csv(f"{data_path.osm}/gasstations_styria.csv")[
    ["lon", "lat", "name"]
].to_numpy()
add_store_dataset(
    "negativ_infrastructure_gasstations_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.NEGATIVE,
    tmp_data,
    use_cached=True,
)

In [None]:
# negativ soccer fields
max_distance = 0.5


def dist_func(x):
    if x <= 0.2:
        return 1
    else:
        return (max_distance - x) / max_distance


area_df = pd.read_csv(f"{data_path.osm}/soccer_fields_areas.csv", skiprows=1, names=["lat", "lon", "area"])
# only use soccer fields bigger than 4000m^2, small fields are mainly parks/for kids
area_df = area_df[area_df["area"] > 6000]
print(len(area_df))
display(area_df.head(10))


tmp_data = area_df[["lon", "lat", "area"]].to_numpy()
add_store_dataset(
    "negativ_leisure_soccer_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.NEGATIVE,
    tmp_data,
    use_cached=True,
)

In [None]:
# negativ motorway
max_distance = 2.0


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = pd.read_csv(f"{data_path.osm}/motorway_styria.csv")[
    ["lon", "lat", "name"]
].to_numpy()

print(tmp_data)
add_store_dataset(
    "negativ_infrastructure_motorway_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.NEGATIVE,
    tmp_data,
    use_cached=True,
)

In [None]:
# negativ street primary/secondary
# max_distance = 0.5
max_distance = 0.1  # JUDGING from Petersbergen 500m is way to much


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = pd.read_csv(f"{data_path.osm}/street_primary_secondary_styria.csv")[
    ["lon", "lat", "name"]
].to_numpy()
add_store_dataset(
    "negativ_infrastructure_street_pimary_secondary_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.NEGATIVE,
    tmp_data,
    use_cached=True,
)

In [None]:
# negativ railway rails
max_distance = 1


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = pd.read_csv(f"{data_path.osm}/railway_rails_styria.csv")[
    ["lon", "lat", "name"]
].to_numpy()
add_store_dataset(
    "negativ_infrastructure_railway_rails_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.NEGATIVE,
    tmp_data,
    use_cached=True,
)

In [191]:
skip_streams = True

if not skip_streams:

    # negativ streams
    max_distance = 0.05

    def dist_func(x):
        return (max_distance - x) / max_distance

    tmp_data = pd.read_csv(f"{data_path.osm}/streams_styria.csv")[
        ["lon", "lat", "name"]
    ].to_numpy()
    add_store_dataset(
        "negativ_streams_styria",
        max_distance,
        dist_func,
        "max",
        1,
        MapColoring.AMENITY,
        tmp_data,
        use_cached=True,
    )


In [192]:
# negativ rivers

skip_rivers = True

if not skip_rivers:
    max_distance = 0.1

    def dist_func(x):
        if x <= 0.05:
            return 1
        else:
            return (max_distance - x) / max_distance

    tmp_data = pd.read_csv(f"{data_path.osm}/rivers_styria.csv")[
        ["lon", "lat", "name"]
    ].to_numpy()
    add_store_dataset(
        "negativ_rivers_styria",
        max_distance,
        dist_func,
        "max",
        1,
        MapColoring.AMENITY,
        tmp_data,
        use_cached=True,
    )
    # plt.imsave(
    #     "rivers_styria.png", rastarize.data["negativ_rivers_styria"], cmap="RdYlGn"
    # )

In [None]:
# negativ restaurants
# tmp_data = pd.read_csv("restaurant_styria.csv")
max_distance = 0.1


def dist_func(x):
    return (max_distance - x) / max_distance


tmp_data = pd.read_csv(f"{data_path.osm}/restaurant_styria.csv")[
    ["lon", "lat", "name"]
].to_numpy()
add_store_dataset(
    "negativ_restaurant_styria",
    max_distance,
    dist_func,
    "max",
    1,
    MapColoring.NEGATIVE,
    tmp_data,
    use_cached=True,
)

In [194]:
skip_houses = True

if not skip_houses:
    # houses
    max_distance = 0.05

    tmp_data = pd.read_csv(f"{data_path.osm}/houses_styria_center.csv")
    tmp_data = tmp_data[
        ~tmp_data["building"].isin(
            [
                "shed",
                "garage",
                "garages",
                "barn",
                "cabin",
                "hut",
                "collapsed",
                "cowshed",
                "pavilion",
                "shelter",
                "greenhouse",
            ]
        )
    ]
    # tmp_data = tmp_data[tmp_data['building'].isin(['shed', 'garage', 'garages', 'barn', 'cabin', 'hut', 'collapsed', 'cowshed', 'pavilion', 'shelter', 'greenhouse'])]
    tmp_data = tmp_data[["lon", "lat", "building"]].to_numpy()
    print(tmp_data[0])
    add_store_dataset(
        "houses",
        max_distance,
        None,
        "add",
        10,
        MapColoring.NEGATIVE,
        tmp_data,
        use_cached=True,
    )
    houses_img = image_with_mask(rastarize.data["houses"], rastarize.mask)
    houses_img.save(f"{data_path.maps}/houses_masked.png")
    houses_img.save(f"{data_path.maps}/houses_masked.webp", "WEBP", lossless=False)

    # [[47,0436802, 15,5191805, 'Home']]

In [None]:
houses = pd.read_csv(f"{data_path.osm}/houses_styria_center.csv")
print(f"orignal number of houses: {len(houses)}")
houses = houses[
    ~houses["building"].isin(
        [
            "shed",
            "garage",
            "garages",
            "barn",
            "cabin",
            "hut",
            "collapsed",
            "cowshed",
            "pavilion",
            "shelter",
            "greenhouse",
        ]
    )
]
print(f"limited to types of houses: {len(houses)}")

xx = houses["lon"].to_numpy()
yy = houses["lat"].to_numpy()

xxm, yym = maputils.wgs84_to_web_mercator_array(
    xx, yy, rastarize.bounds, rastarize.tmp.shape[1], rastarize.tmp.shape[0]
)
houses.loc[:, "x"] = xxm
houses.loc[:, "y"] = yym
houses = houses.groupby(["x", "y"]).size().reset_index(name="count")

add_store_dataset(
    "houses",
    max_distance,
    None,
    "add",
    100,
    MapColoring.NEGATIVE,
    houses[["x", "y", "count"]].to_numpy(),
    use_cached=True,
    coords_in_webmercator=True,
    coords_with_count=True
)
print(f"number of house-cells after grouping: {len(houses)}")

In [196]:
# max_distance = 0.5  # orignally 0.75


# def dist_func(x):

#     return 1  # used for counting houses in the given range
#     # return (max_distance - x) / max_distance # used for scoring with decaying influence by distance


# rastarize.add_dataset("houses_dist", max_distance, dist_func, "add", 10000)

# kernel = rastarize.kernel["houses_dist"]
# kernel.shape

# houses_array = rastarize.data["houses"]

# houses_dist = np.zeros(houses_array.shape)

# kernel_size = kernel.shape[0]
# kernel_half = kernel_size // 2

# for i in tqdm.tqdm(range(kernel_half, houses_array.shape[0] - kernel_half)):
#     for j in range(kernel_half, houses_array.shape[1] - kernel_half):
#         houses_dist[i, j] = np.sum(
#             houses_array[
#                 i - kernel_half : i + kernel_half + 1,
#                 j - kernel_half : j + kernel_half + 1,
#             ]
#             * kernel
#         )

# rastarize.data["houses_dist1km"] = houses_dist
# rastarize.finish("houses_dist")


In [None]:
max_distance = 0.5  # orignally 0.75


def dist_func(x):

    return 1  # used for counting houses in the given range
    # return (max_distance - x) / max_distance # used for scoring with decaying influence by distance


houses_dist_alternative = "houses_dist_alternative"

rastarize.add_dataset(houses_dist_alternative, max_distance, dist_func, "add", 10000)

kernel = rastarize.kernel[houses_dist_alternative]
kernel_size = kernel.shape[0]
kernel_half = kernel_size // 2

houses_array = rastarize.data["houses"]

# houses_dist = np.zeros(houses_array.shape)
# Alternative implementation using kernel iteration
# seems to be the smarter and faster way, could be applied to all other datasets as well
houses_dist_alternative = np.zeros(houses_array.shape)

for ki in tqdm.tqdm(range(kernel_size)):
    for kj in range(kernel_size):
        if kernel[ki, kj] != 0:
            shifted_houses = np.roll(
                houses_array, shift=(ki - kernel_half, kj - kernel_half), axis=(0, 1)
            )
            houses_dist_alternative += shifted_houses * kernel[ki, kj]

rastarize.data["houses_dist_alternative"] = houses_dist_alternative
# rastarize.finish("houses_dist_alternative")

In [None]:
masked_image = image_with_mask(rastarize.data["houses_dist_alternative"], rastarize.mask)
masked_image.show()

In [None]:
rastarize.data["houses_dist_alternative"].max()

In [None]:
# houses_dist_img = image_with_mask(rastarize.data["houses_dist1km"], rastarize.mask)
# houses_dist_img.save(f"{data_path.maps}/houses_density.png")
# houses_dist_img.show()
masked_houses_dist1km = rastarize.data["houses_dist_alternative"].copy()

# masked_houses_dist1km = rastarize.data["houses_dist1km"].copy()
no_houses_limits = [1000, 500, 250, 100, 50, 10, 1]
colors = [
    [255, 0, 0],  # Red
    [255, 165, 0],  # Orange
    [255, 255, 0],  # Yellow
    [0, 255, 0],  # Green
    [0, 128, 0],  # Dark Green
    [0, 192, 192],  # Teal
    [0, 255, 255],  # Cyan
]

upper_limit = 100000
for limit, color in zip(no_houses_limits, colors):
    masked_houses_dist1km_copy = masked_houses_dist1km.copy()
    masked_houses_dist1km_copy[
        (masked_houses_dist1km_copy < limit)
        | (masked_houses_dist1km_copy > upper_limit)
    ] = 0

    houses_dist_img = image_with_mask(
        masked_houses_dist1km_copy,
        rastarize.mask,
        coloring=MapColoring.NEGATIVE,
        color_=color,
        use_binary_mask=True,
    )
    houses_dist_img.save(f"{data_path.maps}/houses_dist_{limit}to{upper_limit}_per_km2.png")
    houses_dist_img.save(f"{data_path.maps}/houses_dist_{limit}to{upper_limit}_per_km2.webp", "WEBP", lossless=False)

    #houses_dist_img.show()
    upper_limit = limit


In [None]:
masked_houses_dist1km.max()

In [None]:

percentile_90 = np.percentile(rastarize.data["houses_dist_alternative"], 90)
print(f"90th percentile: {percentile_90}")

In [None]:

percentile_99 = np.percentile(rastarize.data["houses_dist_alternative"], 99)
print(f"99th percentile: {percentile_99}")

In [None]:
mean_houses = np.mean(rastarize.data["houses_dist_alternative"])
median_houses = np.median(rastarize.data["houses_dist_alternative"])
std_houses = np.std(rastarize.data["houses_dist_alternative"])
min_houses = np.min(rastarize.data["houses_dist_alternative"])
max_houses = np.max(rastarize.data["houses_dist_alternative"])

print(f"Mean: {mean_houses}")
print(f"Median: {median_houses}")
print(f"Standard Deviation: {std_houses}")
print(f"Min: {min_houses}")
print(f"Max: {max_houses}")
# Agerage houses in 1km2 area is 22
# Max. houses in 1km2 area is 1462


In [None]:
1000*1000/2000

In [205]:
# pro quadratkilometer stehen im 

In [206]:
n1 = rastarize.data["negativ_restaurant_styria"] * 0.5
n2 = rastarize.data["negativ_infrastructure_street_pimary_secondary_styria"] * 0.5
n3 = rastarize.data["negativ_infrastructure_motorway_styria"]

n4 = rastarize.data["negativ_infrastructure_gasstations_styria"]
n5 = rastarize.data["negativ_leisure_soccer_styria"]
n6 = rastarize.data["negativ_infrastructure_railway_rails_styria"] * 0.8
# n7 = rastarize.data["negativ_rivers_styria"] * 0
# n8 = rastarize.data["negativ_streams_styria"] * 0
# n9 = dense_build_areas
# n10 = sparse_build_areas

In [None]:
n4 = rastarize.data["negativ_infrastructure_gasstations_styria"]
n5 = rastarize.data["negativ_leisure_soccer_styria"]
negativ_data_small = np.maximum.reduce([n4, n5])
image_with_mask(negativ_data_small, rastarize.mask).save(
    f"{data_path.maps}/soccer_and_gasstations.png"
)

In [None]:
# calc in smaller combinations, otherwise memory overflows
negativ_data = np.maximum.reduce([n1, n2, n3])
negativ_data = np.maximum.reduce([negativ_data, n4, n5, n6])
# negativ_data =np.maximum.reduce([negativ_data,n7,n8, n9])
# negativ_data =np.maximum.reduce([negativ_data,n7,n8,n9,n10])

#show_rasterimg(negativ_data)
negative_masked = image_with_mask(negativ_data, rastarize.mask, coloring=MapColoring.NEGATIVE)
negative_masked.save(f"{data_path.maps}/negative_masked.png")
negative_masked.save(f"{data_path.maps}/negative_masked.webp", "WEBP", lossless=False)

negative_masked.show()

In [209]:
# #ignore railway and streets
# #calc in smaller combinations, otherwise memory overflows
# negativ_data =np.maximum.reduce([n1,n4,n5])
# negativ_data =np.maximum.reduce([negativ_data,n7,n8,n9])

# #negativ_data =np.maximum.reduce([negativ_data,n7,n8,n10])
# #negativ_data =np.maximum.reduce([negativ_data,n7,n8,n9,n10])

# show_rasterimg(negativ_data)

In [None]:
combined_data = np.maximum(positiv_data - negativ_data, 0)
show_rasterimg(combined_data)
print(np.max(combined_data))
plt.imsave("steiermark_scored1.png", combined_data, cmap="RdYlGn")
combined_data = image_with_mask(combined_data, rastarize.mask)
combined_data.save(f"{data_path.maps}/combined_data_masked.png")
combined_data.save(f"{data_path.maps}/combined_data_masked.webp", "WEBP", lossless=False)

combined_data.show()