Batched and parallel candidate scoring

In [18]:
import os
import time
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

import numpy as np
from neo4j import GraphDatabase

# Connection settings. Environment variables override the local-development
# defaults so real credentials do not have to live in the notebook source.
URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
AUTH = (
    os.environ.get("NEO4J_USER", "neo4j"),
    os.environ.get("NEO4J_PASSWORD", "12345678"),
)

# Number of candidate rows sent per UNWIND write query.
BATCH_SIZE = 1000
# Number of worker threads used to score municipalities concurrently.
MAX_THREADS = 8

# Weight applied to each scoring factor in calculate_score.
WEIGHTS = {
    "closest_charging_location": 3,
    "ev_charger_density": 5,
    "avg_home_value": 0.5,
    "number_of_vehicles": 2,
    "population_density": 1,
}

# One driver shared by all worker threads; each worker opens its own session
# (neo4j sessions are not meant to be shared across threads).
driver = GraphDatabase.driver(URI, auth=AUTH)

# Score calculation

def calculate_score(candidate, pc4, municipality, weights=None):
    """Return the weighted suitability score for one candidate location.

    Args:
        candidate: Property map of a CandidateLocation; ``distance_to_nearest``
            is read.
        pc4: Property map of the candidate's PC4 area; ``density`` is read and
            feeds the ``ev_charger_density`` term.
        municipality: Property map of the PC4 area's municipality;
            ``home_value``, ``vehicles`` and ``population_density`` are read.
        weights: Mapping of factor name to weight. Defaults to the
            module-level ``WEIGHTS``.

    Returns:
        The score, or ``None`` when a required property is missing, is not
        numeric (e.g. ``None``), or ``pc4["density"]`` is -1 (division by
        zero). A missing key in *weights* is a configuration error and still
        raises ``KeyError``.
    """
    w = WEIGHTS if weights is None else weights

    # Bad or incomplete records yield None so the caller can skip just this
    # candidate instead of aborting the whole municipality.
    try:
        distance = candidate["distance_to_nearest"]
        density = pc4["density"]
        home_value = municipality["home_value"]
        vehicles = municipality["vehicles"]
        population_density = municipality["population_density"]
    except KeyError:
        return None

    try:
        return (
            w["closest_charging_location"] * distance
            # Inverse density: fewer chargers -> larger term; +1 keeps density 0 finite.
            + w["ev_charger_density"] * (1 / (density + 1))
            + w["avg_home_value"] * (home_value / 100000)
            + w["number_of_vehicles"] * (vehicles / 1000)
            + w["population_density"] * (population_density / 1000)
        )
    except (TypeError, ZeroDivisionError):
        return None


def batched(iterable, n):
    """Yield consecutive lists of at most *n* items drawn from *iterable*.

    The final list may be shorter; nothing is yielded for an empty input.
    """
    source = iter(iterable)
    while True:
        chunk = list(islice(source, n))
        if not chunk:
            return
        yield chunk


def _fetch_scored_candidates(session, municipality_name):
    """Read and score every candidate linked (via a PC4 area) to *municipality_name*.

    Returns a list of ``{"id": <elementId>, "score": <number>}`` dicts. Rows
    with an empty property map, no municipality ``home_value``, or data that
    calculate_score rejects (returns None) are skipped.
    """
    result = session.run("""
        MATCH (c:CandidateLocation)-[:IS_LOCATED_IN]->(p:PC4Area)-[:IS_LOCATED_IN]->(m:Municipality {name: $municipality})
        RETURN elementId(c) AS id, properties(c) AS c, properties(p) AS p, properties(m) AS m
    """, municipality=municipality_name)

    scored = []
    for record in result:
        c, p, m = record["c"], record["p"], record["m"]
        if not all([c, p, m]) or m.get("home_value") is None:
            continue
        score = calculate_score(c, p, m)
        if score is None:
            continue
        # Only the node id and score are needed for the write-back; the full
        # property map is not shipped back to the server.
        scored.append({"id": record["id"], "score": score})
    return scored


def _write_scores(session, scored):
    """Set ``score`` on each CandidateLocation in *scored*, BATCH_SIZE rows per query."""
    for batch in batched(scored, BATCH_SIZE):
        # Matching by elementId (Neo4j 5+) updates exactly the node that was
        # read. Matching by lon/lat hits every node sharing those coordinates,
        # matches nothing when lon/lat is null, and may need a label scan per
        # row without an index. NOTE: element ids can be reused after deletion,
        # so this assumes candidates are not deleted between read and write.
        session.run("""
            UNWIND $candidates AS candidate
            MATCH (c:CandidateLocation) WHERE elementId(c) = candidate.id
            SET c.score = candidate.score
        """, candidates=batch).consume()  # surface server errors for this batch now


def process_municipality(municipality_name):
    """Compute and store scores for all candidates in one municipality, then print a timing line.

    Run concurrently from main()'s thread pool, so it opens its own session
    instead of sharing one across threads.
    """
    start = time.perf_counter()
    with driver.session() as session:
        # The read result is fully materialised before the first write runs.
        scored = _fetch_scored_candidates(session, municipality_name)
        _write_scores(session, scored)
    print(f"[{municipality_name}] Scored {len(scored)} candidates in {time.perf_counter() - start:.2f}s.")


def main():
    """Score the candidates of every Municipality concurrently and report the outcome.

    Each municipality runs in a MAX_THREADS-worker thread pool. An exception in
    one municipality is printed and recorded without stopping the others. The
    final line reports the total time and any municipalities that failed.
    """
    with driver.session() as session:
        result = session.run("MATCH (m:Municipality) RETURN m.name AS name")
        municipalities = [r["name"] for r in result]

    print(f"Found {len(municipalities)} municipalities. Starting parallel scoring...")
    t0 = time.perf_counter()

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = [executor.submit(process_municipality, name) for name in municipalities]

    # executor.map() only re-raises worker exceptions when its results are
    # iterated; the original never iterated them, so failures vanished.
    # Collect every future's result explicitly instead.
    failures = []
    for name, future in zip(municipalities, futures):
        try:
            future.result()
        except Exception as exc:  # top-level boundary: report, keep going
            failures.append(name)
            print(f"[{name}] Failed: {exc!r}")

    elapsed = time.perf_counter() - t0
    if failures:
        failed_names = ", ".join(str(name) for name in failures)
        print(f"⚠️ Finished in {elapsed:.2f}s with {len(failures)} failed municipalities: {failed_names}")
    else:
        print(f"✅ Done updating all candidate scores in {elapsed:.2f}s.")


if __name__ == "__main__":
    try:
        main()
    finally:
        # Close the shared driver even if main() raises.
        driver.close()

Found 50 municipalities. Starting parallel scoring...
[Nieuwkoop] Scored 201 candidates in 1.73s.
[Lansingerland] Scored 835 candidates in 6.91s.
[Hoeksche Waard] Scored 1070 candidates in 8.81s.
[Katwijk] Scored 604 candidates in 11.63s.
[Dordrecht] Scored 1483 candidates in 11.93s.
[Alblasserdam] Scored 109 candidates in 0.96s.
[Krimpenerwaard] Scored 242 candidates in 1.97s.
[Gouda] Scored 673 candidates in 5.37s.
[Ridderkerk] Scored 197 candidates in 1.52s.
[Voorschoten] Scored 1687 candidates in 16.00s.
[Zoetermeer] Scored 1285 candidates in 10.03s.
[Nissewaard] Scored 444 candidates in 3.64s.
[Hendrik-Ido-Ambacht] Scored 149 candidates in 1.40s.
[Bodegraven-Reeuwijk] Scored 194 candidates in 1.78s.
[Leiden] Scored 1480 candidates in 18.44s.
[Goeree-Overflakkee] Scored 511 candidates in 4.46s.
[Schiedam] Scored 827 candidates in 7.01s.
[Waddinxveen] Scored 1813 candidates in 16.37s.
[Voorne aan Zee] Scored 633 candidates in 5.39s.
[Barendrecht] Scored 317 candidates in 2.69s.
[Gor