**Step 3: Populate Database.**

In [11]:
import os
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import io
import requests

# For database interaction. Use the library appropriate for your database.
# This example uses psycopg2 for PostgreSQL.
try:
    import psycopg2
    from psycopg2 import sql
except ImportError:
    print("Warning: psycopg2 not found. Database functionality will not work.")
    print("Please install it with: pip install psycopg2-binary")
    psycopg2 = None

# --- Configuration ---
# Define the directory where your raw data files are located
RAW_DATA_DIR = "/content"
PROCESSED_DATA_DIR = os.path.join(RAW_DATA_DIR, "processed")

# Ensure the processed data directory exists
os.makedirs(PROCESSED_DATA_DIR, exist_ok=True)

# Define paths to your input data files
BERLIN_CSV_PATH = os.path.join(RAW_DATA_DIR, "berlin_kitas_raw.csv")
NEIGHBOURHOODS_GEOJSON_URL = "https://raw.githubusercontent.com/m-hoerz/berlin-shapes/master/berliner-bezirke.geojson"

# --- Database Configuration ---
# IMPORTANT: Replace this with your actual database connection string
DATABASE_URL = "postgresql://user:password@host:port/dbname"

# --- 1. Load Data ---
print("--- Loading Data ---")

berlin_df = pd.DataFrame()
try:
    berlin_df = pd.read_csv(BERLIN_CSV_PATH, sep=',', encoding='utf-8', on_bad_lines='skip', header=1)
    berlin_df.columns = berlin_df.columns.str.strip()
    print(f"Successfully loaded Berlin data from {BERLIN_CSV_PATH}")
except FileNotFoundError:
    print(f"Error: Berlin data file not found at {BERLIN_CSV_PATH}.")
except Exception as e:
    print(f"An unexpected error occurred loading Berlin data: {e}")
    berlin_df = pd.DataFrame()

berlin_neighbourhoods = gpd.GeoDataFrame()
try:
    response = requests.get(NEIGHBOURHOODS_GEOJSON_URL)
    response.raise_for_status()
    berlin_neighbourhoods = gpd.read_file(io.StringIO(response.text))
    print(f"\nSuccessfully loaded Berlin neighbourhoods GeoJSON from URL.")
except Exception as e:
    print(f"\nAn error occurred loading neighbourhoods GeoJSON: {e}")
    berlin_neighbourhoods = gpd.GeoDataFrame()

# --- 2. Data Cleaning and Transformation ---
print("\n--- Data Cleaning and Transformation ---")

berlin_processed = pd.DataFrame()
if not berlin_df.empty:
    print("Processing Berlin data...")
    column_map_berlin = {
        'Einrichtungsbezirk': 'district_code',
        'Einrichtungsbezirk Name': 'district',
        'Einrichtungsnummer': 'kita_id',
        'Einrichtungsname': 'name',
        'Straße': 'street',
        'Hausnummer': 'street_number',
        'PLZ': 'postcode',
        'ETRS_YKOORDINATE': 'ETRS_y',
        'ETRS_XKOORDINATE': 'ETRS_x'
    }
    berlin_df.rename(columns={k: v for k, v in column_map_berlin.items() if k in berlin_df.columns}, inplace=True)
    if 'ETRS_x' in berlin_df.columns and 'ETRS_y' in berlin_df.columns:
        berlin_df['ETRS_x'] = pd.to_numeric(berlin_df['ETRS_x'], errors='coerce')
        berlin_df['ETRS_y'] = pd.to_numeric(berlin_df['ETRS_y'], errors='coerce')
        berlin_df.dropna(subset=['ETRS_x', 'ETRS_y'], inplace=True)
        berlin_df['source'] = 'berlin_gov'
        selected_columns_berlin = ['name', 'street', 'postcode', 'ETRS_x', 'ETRS_y', 'source']
        berlin_processed = berlin_df[[col for col in selected_columns_berlin if col in berlin_df.columns]].copy()
        print("Berlin data processed.")
else:
    print("Skipping Berlin data processing due to loading errors.")

# --- 3. Create GeoDataFrame and Spatial Join ---
print("\n--- Creating GeoDataFrame and Performing Spatial Join ---")

final_gdf = gpd.GeoDataFrame()
if not berlin_processed.empty and not berlin_neighbourhoods.empty:
    source_crs = "EPSG:25833"
    target_crs = "EPSG:4326"
    gdf = gpd.GeoDataFrame(
        berlin_processed,
        geometry=gpd.points_from_xy(berlin_processed['ETRS_x'], berlin_processed['ETRS_y']),
        crs=source_crs
    )
    gdf = gdf.to_crs(target_crs)
    try:
        kitas_with_neighbourhoods = gpd.sjoin(gdf, berlin_neighbourhoods, how="inner", predicate="intersects")
        print("Spatial join complete.")
        neighbourhood_col_name = next((col for col in ['name', 'name_en', 'Name', 'neighbourhood', 'NAME_BEZIR', 'name_local'] if col in berlin_neighbourhoods.columns), None)
        if neighbourhood_col_name:
            final_gdf = kitas_with_neighbourhoods.rename(columns={neighbourhood_col_name: 'neighbourhood'})
            final_gdf['longitude'] = final_gdf.geometry.x
            final_gdf['latitude'] = final_gdf.geometry.y
            final_gdf.drop(columns=['ETRS_x', 'ETRS_y'], inplace=True)
            print("Final GeoDataFrame with neighbourhood information created.")
        else:
            print("Could not find a suitable neighbourhood name column. Using the joined data as is.")
            final_gdf = kitas_with_neighbourhoods.copy()
            final_gdf['longitude'] = final_gdf.geometry.x
            final_gdf['latitude'] = final_gdf.geometry.y
            final_gdf.drop(columns=['ETRS_x', 'ETRS_y'], inplace=True)

    except Exception as e:
        print(f"An error occurred during the spatial join: {e}")
else:
    print("Skipping spatial join because either kindergarten data or neighbourhood data was not loaded/processed correctly.")

# --- Functions for Database Population (Step 3) ---

def create_kindergartens_table(conn):
    """Creates the berlin_kindergartens table if it doesn't exist."""
    print("Creating 'berlin_kindergartens' table...")
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS berlin_kindergartens (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255),
            street VARCHAR(255),
            postcode VARCHAR(10),
            latitude DOUBLE PRECISION,
            longitude DOUBLE PRECISION,
            neighbourhood VARCHAR(255),
            source VARCHAR(50)
        );
    """)
    conn.commit()
    print("Table 'berlin_kindergartens' created or already exists.")

def insert_data(conn, gdf):
    """Inserts data from the GeoDataFrame into the database."""
    print("Inserting data into 'berlin_kindergartens'...")
    cur = conn.cursor()
    # Use a parameterized query to prevent SQL injection
    insert_query = """
        INSERT INTO berlin_kindergartens (name, street, postcode, latitude, longitude, neighbourhood, source)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    count = 0
    # Iterate over the GeoDataFrame rows and insert each one
    for index, row in gdf.iterrows():
        try:
            cur.execute(
                insert_query,
                (
                    row['name'],
                    row['street'],
                    row['postcode'],
                    row['latitude'],
                    row['longitude'],
                    row['neighbourhood'] if 'neighbourhood' in row else None,
                    row['source']
                )
            )
            count += 1
        except Exception as e:
            print(f"Error inserting row {index}: {e}")
            conn.rollback() # Rollback the transaction on error
            continue

    conn.commit()
    print(f"Successfully inserted {count} rows into the database.")
    cur.close()

# --- 4. Populate Database (Step 3) ---
if not final_gdf.empty and psycopg2 is not None:
    conn = None
    try:
        print("\n--- Populating Database (Step 3) ---")
        conn = psycopg2.connect(DATABASE_URL)
        create_kindergartens_table(conn)
        insert_data(conn, final_gdf)

        # --- Verification Step ---
        print("\n--- Verifying Data Insertion ---")
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM berlin_kindergartens;")
        print(f"Total rows in 'berlin_kindergartens' table: {cur.fetchone()[0]}")
        cur.execute("SELECT * FROM berlin_kindergartens LIMIT 5;")
        print("First 5 rows of the table:")
        for row in cur.fetchall():
            print(row)
        cur.close()

    except psycopg2.Error as e:
        print(f"Database error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during database population: {e}")
    finally:
        if conn:
            conn.close()
            print("Database connection closed.")
else:
    if psycopg2 is None:
        print("\nSkipping database population. psycopg2 library is not installed.")
    else:
        print("\nSkipping database population. Final GeoDataFrame is empty.")

print("\n--- Transformation Process Complete ---")

--- Loading Data ---
Successfully loaded Berlin data from /content/berlin_kitas_raw.csv

Successfully loaded Berlin neighbourhoods GeoJSON from URL.

--- Data Cleaning and Transformation ---
Processing Berlin data...
Berlin data processed.

--- Creating GeoDataFrame and Performing Spatial Join ---
Spatial join complete.
Could not find a suitable neighbourhood name column. Using the joined data as is.

--- Populating Database (Step 3) ---
Database error: invalid integer value "port" for connection option "port"


--- Transformation Process Complete ---
