# Step 1: Research & Data Source Discovery

## 1.1 Environment Setup and Dependencies

We fetch the data from OpenStreetMap (OSM). The original OSM ID (osmid) identifies each feature during extraction, and we calculate the centroid (latitude and longitude) of each location. The stable database key is generated later, in Step 4.

* **Primary Source: OpenStreetMap (OSM)**: Used to extract the spatial location of employment agencies.


## 1.2 Data and Boundary Configuration

The project focuses exclusively on data within the **Berlin, Germany** boundary.

* **Spatial Integrity Plan**: Data will be joined to the **Local Reference System (LOR) boundaries** to derive the mandatory `district_id` and `neighborhood_id` for final database compliance.


* **Import all necessary libraries**
 * Imports used across this notebook: pandas, numpy, geopandas, osmnx, geopy, sqlalchemy/psycopg2, hashlib, json, os, time, warnings.

In [63]:
# Install third-party packages first so the imports below succeed
%pip install geopy
%pip install sqlalchemy psycopg2-binary

import hashlib
import json
import os
import warnings
from time import sleep

import geopandas as gpd
import numpy as np
import osmnx as ox
import pandas as pd
import psycopg2
from geopy.geocoders import Nominatim
from sqlalchemy import create_engine, text

Note: you may need to restart the kernel to use updated packages.


# 1.1 CONFIGURATION 
- Set the place name and the OSM tags used for extraction

In [64]:
PLACE_NAME = "Berlin, Germany"
OSM_TAGS = {"office": "employment_agency"}

# Update LOR_PATH to match the exact filename of the GeoJSON you uploaded
#LOR_PATH = "lor_ortsteile (1).geojson" 
#OUTPUT_PATH = "output/jobcenters_berlin.csv"

print("Libraries loaded.")
print(f"Configuration set for {PLACE_NAME} with OSM tags: {OSM_TAGS}")

Libraries loaded.
Configuration set for Berlin, Germany with OSM tags: {'office': 'employment_agency'}


# 1.2 LIVE DATA EXTRACTION (OSM) 

In [65]:
print("Fetching live data from OpenStreetMap (Overpass API)")
try:
    # Fetch data and ensure the coordinate system is standard WGS84 (EPSG:4326)
    jobcenter_data_raw = ox.features_from_place(PLACE_NAME, OSM_TAGS)
    jobcenter_data_raw = gpd.GeoDataFrame(
        jobcenter_data_raw,
        geometry="geometry",
        crs="EPSG:4326"
    )
    print(f"Success! Retrieved {len(jobcenter_data_raw)} features.")
except Exception as e:
    # Chain the original exception so the full traceback is preserved
    raise RuntimeError(f"OSM extraction failed: {e}") from e

Fetching live data from OpenStreetMap (Overpass API)
Success! Retrieved 65 features.


# 1.3 MANDATORY DATA CLEANING 
- Explicitly check and report on null values in mandatory columns

In [66]:
print("Diagnostic Check: Nulls in Critical Columns")
null_counts = jobcenter_data_raw[['name', 'geometry']].isnull().sum()
print("Missing values in critical columns")
print(null_counts)

# Drop rows missing 'name' or 'geometry' to enforce database NOT NULL compliance
initial_count = len(jobcenter_data_raw)
jobcenter_enriched = jobcenter_data_raw.dropna(subset=["name", "geometry"]).copy()

dropped_count = initial_count - len(jobcenter_enriched)
print(f"Mandatory Drop Removed {dropped_count} rows due to missing name/geometry")


Diagnostic Check: Nulls in Critical Columns
Missing values in critical columns
name        2
geometry    0
dtype: int64
Mandatory Drop Removed 2 rows due to missing name/geometry


# 1.4 COORDINATE PREPARATION 
- Extract centroids to handle both 'Point' and 'Polygon' features safely

In [67]:
jobcenter_enriched['latitude'] = jobcenter_enriched.geometry.centroid.y
jobcenter_enriched['longitude'] = jobcenter_enriched.geometry.centroid.x

print("Step 1 Complete ")
print(jobcenter_enriched[['name', 'latitude', 'longitude']].head())

Step 1 Complete 
                                               name   latitude  longitude
element id                                                               
node    275368512   Jobcenter Mitte am Leopoldplatz  52.546772  13.356516
        1211913324           Arbeitsagentur Spandau  52.533775  13.186554
        1340158173        Jobcenter Berlin Neukölln  52.478975  13.427887
        1450906609               Agentur für Arbeit  52.578452  13.308718
        2277566662               Agentur für Arbeit  52.456592  13.411478
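
geopandas emits a UserWarning when `.centroid` is computed in a geographic CRS such as EPSG:4326, because centroids of polygons are only approximate there. A minimal sketch of one way to avoid this, assuming EPSG:25833 (ETRS89 / UTM zone 33N) is an acceptable projected CRS for Berlin; the helper name `centroid_lat_lon` and the demo geometries are hypothetical and not part of the notebook:

```python
import geopandas as gpd
from shapely.geometry import Point, Polygon


def centroid_lat_lon(gdf, projected_epsg=25833):
    """Return (latitude, longitude) Series from centroids computed in a projected CRS."""
    # Project to metres, take the centroid there, then convert back to WGS84
    centroids = gdf.geometry.to_crs(epsg=projected_epsg).centroid.to_crs(epsg=4326)
    return centroids.y, centroids.x


# Made-up demo frame with one point feature and one polygon feature
demo = gpd.GeoDataFrame(
    {"name": ["point feature", "polygon feature"]},
    geometry=[
        Point(13.356516, 52.546772),
        Polygon([(13.40, 52.50), (13.41, 52.50), (13.41, 52.51), (13.40, 52.51)]),
    ],
    crs="EPSG:4326",
)
demo["latitude"], demo["longitude"] = centroid_lat_lon(demo)
print(demo[["name", "latitude", "longitude"]])
```

For point features the result matches the original coordinates up to floating-point noise; only polygon features (for example, mapped building outlines) can shift slightly.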





# Step 2: Cleanup & Removing Redundancy
- Explanation: Here I drop city and country because they are redundant for a Berlin-only project. I also remove the contact website and operator type columns to keep the schema lean.

  Why drop the contact website?

- Maintenance: External URLs such as websites change frequently. If they are included in the primary table now, the data becomes stale very quickly.

- Scope: The current goal is to map the job centers to the Berlin LOR district system. Extra information such as websites or phone numbers can be added in a later "enrichment" task once the primary table structure is approved.

- Additionally: The operator:type column is a classification tag in OpenStreetMap that indicates who runs the facility. For Berlin job centers the value is usually public, meaning the center is a government-run entity (e.g., the Bundesagentur für Arbeit or the local municipal government). Because most job centers fall into this category, the column adds little information.
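
The column removal described above could be sketched as follows. The column names in `REDUNDANT_COLUMNS` are assumptions based on common OSM tag naming, and `drop_redundant_columns` is a hypothetical helper; the actual frame may use different names.

```python
import pandas as pd

# Assumed OSM column names; adjust to the columns actually present in the frame
REDUNDANT_COLUMNS = ["addr:city", "addr:country", "contact:website", "operator:type"]


def drop_redundant_columns(df, columns=REDUNDANT_COLUMNS):
    """Drop the listed columns, silently skipping any that are not present."""
    return df.drop(columns=list(columns), errors="ignore")


# Made-up demo frame; 'contact:website' is deliberately absent
demo = pd.DataFrame(
    {
        "name": ["Example center"],
        "addr:street": ["Example street"],
        "addr:city": ["Berlin"],
        "addr:country": ["DE"],
        "operator:type": ["public"],
    }
)
slim = drop_redundant_columns(demo)
print(list(slim.columns))
```

Using `errors="ignore"` keeps the cell safe to re-run, since OSM extracts do not always contain every tag.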


### 2.1 INITIAL CLEANUP Rename and prepare coordinates



In [68]:
# Prefer the cleaned/enriched frame from earlier; fall back to the raw OSM frame
if 'jobcenter_enriched' in globals():
    jobcenter_clean = jobcenter_enriched.copy()
elif 'jobcenter_data_raw' in globals():
    jobcenter_clean = jobcenter_data_raw.copy()
else:
    raise NameError("Expected 'jobcenter_enriched' or 'jobcenter_data_raw' to be defined.")
jobcenter_clean = jobcenter_clean.rename(columns={'name': 'center_name'})

# Calculate centroids to ensure we have lat/lon for both Points and Polygons
centroids = jobcenter_clean.geometry.centroid
jobcenter_clean['latitude'] = centroids.y
jobcenter_clean['longitude'] = centroids.x





### 2.2 BUILD ADDRESS FROM COLUMNS

In [69]:
# Using fillna('') to avoid "NaN" appearing in the text strings
jobcenter_clean['address'] = (
    jobcenter_clean['addr:street'].fillna('') + ' ' + 
    jobcenter_clean['addr:housenumber'].fillna('')
).str.strip()

# Add house name in brackets if it exists (e.g., "Jobcenter Mitte")
mask_housename = jobcenter_clean['addr:housename'].notna()
jobcenter_clean.loc[mask_housename, 'address'] = (
    jobcenter_clean['address'] + ' (' + jobcenter_clean['addr:housename'] + ')'
).str.strip()
# Map the postal code from OSM
jobcenter_clean['postal_code'] = jobcenter_clean['addr:postcode']

### 2.3 NOMINATIM FALLBACK 

In [70]:
geolocator = Nominatim(user_agent="berlin_jobcenter_locator")

def get_nominatim_data(lat, lon):
    """Retrieve the address and postcode for a coordinate pair from Nominatim."""
    try:
        location = geolocator.reverse((lat, lon), exactly_one=True, language='de')
        if location:
            address_text = location.address
            postcode = location.raw.get('address', {}).get('postcode')
            return address_text, postcode
        return None, None
    except Exception:
        # A bare except would also swallow KeyboardInterrupt; catch errors only
        return None, None
    finally:
        sleep(1)  # Crucial: respect Nominatim's 1-second rate limit, even after an error

# Find rows where address is still empty OR postal_code is NaN
mask_missing = (jobcenter_clean['address'] == "") | (jobcenter_clean['postal_code'].isna())

if mask_missing.any():
    print(f"🔍 Found {mask_missing.sum()} rows needing Nominatim enrichment. Starting fallback...")
    
    # We apply the function to fill both columns at once
    results = jobcenter_clean[mask_missing].apply(
        lambda row: get_nominatim_data(row['latitude'], row['longitude']), axis=1
    )
    
    # Extract the results back into the dataframe
    jobcenter_clean.loc[mask_missing, 'address'] = [r[0] for r in results]
    jobcenter_clean.loc[mask_missing, 'postal_code'] = [r[1] for r in results]
else:
    print(" All addresses and postal codes were successfully built from existing data!")

print("Verification of Enriched Data")
print(jobcenter_clean[['center_name', 'address', 'postal_code']].head())

🔍 Found 14 rows needing Nominatim enrichment. Starting fallback...
Verification of Enriched Data
                                        center_name  \
element id                                            
node    275368512   Jobcenter Mitte am Leopoldplatz   
        1211913324           Arbeitsagentur Spandau   
        1340158173        Jobcenter Berlin Neukölln   
        1450906609               Agentur für Arbeit   
        2277566662               Agentur für Arbeit   

                                                              address  \
element id                                                              
node    275368512   Müllerstraße 147 (Jobcenter Mitte am Leopoldpl...   
        1211913324                           Brunsbütteler Damm 75-77   
        1340158173                                  Mainzer Straße 27   
        1450906609                                   Innungsstraße 40   
        2277566662  Agentur für Arbeit, 43-44, Gottlieb-Dunkel-Str.
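
The fallback above returns `(None, None)` on the first error, so a transient timeout leaves a row without an address. A minimal sketch of a retry wrapper, assuming a few retries with a growing pause are acceptable; `reverse_with_retry` and `flaky_reverse` are hypothetical names, and the stub stands in for `geolocator.reverse` so that the example runs without network access:

```python
import time


def reverse_with_retry(reverse_fn, lat, lon, retries=3, delay_seconds=1.0):
    """Call reverse_fn((lat, lon)) up to `retries` times, pausing after each failure.

    Returns the first successful result, or None if every attempt raised.
    """
    for attempt in range(retries):
        try:
            return reverse_fn((lat, lon))
        except Exception:
            # Wait a little longer after each failed attempt
            time.sleep(delay_seconds * (attempt + 1))
    return None


# Hypothetical stand-in for geolocator.reverse: fails once, then succeeds
calls = {"count": 0}


def flaky_reverse(coords):
    calls["count"] += 1
    if calls["count"] == 1:
        raise TimeoutError("simulated timeout")
    return {"address": "Example Street 1", "postcode": "00000"}  # made-up values


result = reverse_with_retry(flaky_reverse, 52.546772, 13.356516, delay_seconds=0.01)
print(result, "after", calls["count"], "calls")
```

The wrapper only handles failures; the one-second pause between successful requests is still needed. geopy also provides a `RateLimiter` helper in `geopy.extra.rate_limiter` that covers similar ground.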

## Step 3: Spatial Mapping (District Join)

- Explanation: Load the official Berlin LOR district file and use a spatial join to determine which polygon each job center "falls into." This gives us the neighborhood and district names automatically.

In [71]:
LOR_PATH = "lor_ortsteile.geojson"
lor_gdf = gpd.read_file(LOR_PATH).to_crs(epsg=4326)

In [72]:
import os
print("LOR file exists:", os.path.exists(LOR_PATH))

LOR file exists: True


### 3.1 Safe Renaming: Only rename if the old columns still exist

In [73]:
if "BEZIRK" in lor_gdf.columns:
    lor_gdf = lor_gdf.rename(columns={
        "BEZIRK": "district",
        "OTEIL": "neighborhood",
        "spatial_name": "neighborhood_id"
    })

# Safety check: remove 'index_right' to prevent the sjoin ValueError on re-runs
if 'index_right' in jobcenter_clean.columns:
    jobcenter_clean = jobcenter_clean.drop(columns=['index_right'])

# Spatial join: attach district and neighborhood attributes to each job center
jobcenter_mapped = gpd.sjoin(
    jobcenter_clean.reset_index(drop=True), 
    lor_gdf[['district', 'neighborhood', 'neighborhood_id', 'geometry']], 
    how='left', 
    predicate='within'
)

print("Join completed successfully (even on a re-run!)")

Join completed successfully (even on a re-run!)
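
A left join with `predicate='within'` leaves the district columns empty for any feature that is not fully inside one polygon, and it repeats a left row if several polygons match. A small sketch of a post-join check, assuming the joined frame keeps the left index (as `gpd.sjoin` does with `how='left'`); `join_quality_report` is a hypothetical helper and the demo frame is made up:

```python
import pandas as pd


def join_quality_report(mapped, key_col="district"):
    """Count rows, rows without a match, and left rows repeated by multiple matches."""
    return {
        "rows": len(mapped),
        "unmatched": int(mapped[key_col].isna().sum()),
        "duplicated_left_rows": int(mapped.index.duplicated().sum()),
    }


# Made-up stand-in for a joined frame: index 1 matched two polygons, index 2 matched none
demo = pd.DataFrame(
    {
        "center_name": ["A", "B", "B", "C"],
        "district": ["Mitte", "Spandau", "Pankow", None],
    },
    index=[0, 1, 1, 2],
)
report = join_quality_report(demo)
print(report)
```

Running such a check on `jobcenter_mapped` before Step 4 would surface rows that later end up with a missing `district_id`.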


## 4: Stable ID Generation and District Mapping
Deterministic Stable ID: A persistent, numeric-only ID is generated using hashlib.sha256. By hashing the center name together with the centroid coordinates, the same job center receives the same ID on every refresh, as long as its name and coordinates are unchanged. Working from the centroid also avoids previous AttributeError issues with different geometry types.

Official District Mapping: To comply with the final data pool schema, we map administrative district names to their official 8-digit numeric IDs (e.g., Mitte = 11001001). This ensures the data is ready for SQL relational joins.

### 4.1 DEFINITIONS 
- A deterministic hashing algorithm creates stable numeric IDs of up to 10 digits. This ensures that even if we refresh the data, the same job center always keeps the same ID.

In [74]:
def generate_stable_id(name, lat, lon):
    """Generate a deterministic ID of up to 10 digits from name and coordinates."""
    input_data = f"{name}_{lat}_{lon}".encode('utf-8')
    hash_hex = hashlib.sha256(input_data).hexdigest()
    # The modulo keeps the last 10 decimal digits, so smaller values have fewer digits
    return int(hash_hex, 16) % (10**10)

district_mapping = {
    'Mitte': '11001001', 'Friedrichshain-Kreuzberg': '11002002',
    'Pankow': '11003003', 'Charlottenburg-Wilmersdorf': '11004004',
    'Spandau': '11005005', 'Steglitz-Zehlendorf': '11006006',
    'Tempelhof-Schöneberg': '11007007', 'Neukölln': '11008008',
    'Treptow-Köpenick': '11009009', 'Marzahn-Hellersdorf': '11010010',
    'Lichtenberg': '11011011', 'Reinickendorf': '11012012'
}
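
Because the hash input contains the raw coordinate floats, a tiny change in a centroid produces a different ID, and reducing the hash to 10 digits makes collisions unlikely but not impossible. The sketch below is a variant, not the notebook's function: it assumes that rounding coordinates to 6 decimal places is acceptable, so it yields different IDs than the cell above. `find_collisions` is a hypothetical helper.

```python
import hashlib
from collections import defaultdict


def generate_stable_id(name, lat, lon, precision=6):
    """Hash the name plus rounded coordinates into an integer below 10**10."""
    key = f"{name}_{lat:.{precision}f}_{lon:.{precision}f}".encode("utf-8")
    return int(hashlib.sha256(key).hexdigest(), 16) % (10**10)


def find_collisions(records):
    """Group (name, lat, lon) records by ID; return only IDs shared by several records."""
    by_id = defaultdict(list)
    for name, lat, lon in records:
        by_id[generate_stable_id(name, lat, lon)].append(name)
    return {key: names for key, names in by_id.items() if len(names) > 1}


# Coordinates that differ only beyond the sixth decimal place map to the same ID
id_a = generate_stable_id("Jobcenter Mitte am Leopoldplatz", 52.5467721, 13.3565161)
id_b = generate_stable_id("Jobcenter Mitte am Leopoldplatz", 52.5467719, 13.3565159)
print(id_a == id_b)

# Two identical records necessarily share an ID and are reported
duplicates = find_collisions([("A", 1.0, 2.0), ("A", 1.0, 2.0)])
print(duplicates)
```

Rounding only reduces sensitivity to jitter; a coordinate sitting right at a rounding boundary can still flip. Running a collision check before using the ID as a primary key is a cheap safeguard.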

### 4.2 EXECUTION (The Calls)

In [75]:
# Coordinate Prep (Ensuring columns exist)
jobcenter_mapped['latitude'] = jobcenter_mapped.geometry.centroid.y
jobcenter_mapped['longitude'] = jobcenter_mapped.geometry.centroid.x

# Call the Stable ID function
print("Generating stable IDs...")
jobcenter_mapped['id'] = jobcenter_mapped.apply(
    lambda row: generate_stable_id(row['center_name'], row['latitude'], row['longitude']), 
    axis=1
)

# Map district names to their official IDs. The dictionary values are already
# strings, so no .astype(str) cast is needed; casting would turn unmatched
# districts (NaN) into the literal text "nan".
print("Mapping districts...")
jobcenter_mapped['district_id'] = jobcenter_mapped['district'].map(district_mapping)

print("Step 4 complete. Data is enriched and identified.")

print("Final verification of mapped data")

print(jobcenter_mapped[['id', 'center_name', 'district', 'district_id']].head())

Generating stable IDs...
Mapping districts...
Step 4 complete. Data is enriched and identified.
Final verification of mapped data
           id                      center_name              district  \
0  6660665090  Jobcenter Mitte am Leopoldplatz                 Mitte   
1  1092468394           Arbeitsagentur Spandau               Spandau   
2   730832232        Jobcenter Berlin Neukölln              Neukölln   
3   246338546               Agentur für Arbeit         Reinickendorf   
4  6239357044               Agentur für Arbeit  Tempelhof-Schöneberg   

  district_id  
0    11001001  
1    11005005  
2    11008008  
3    11012012  
4    11007007  





# 5 Data Standardization and Final Export
Schema Compliance: The final dataset is filtered to include only the columns required for the database pool: id, district_id, center_name, address, postal_code, latitude, longitude, geometry, neighborhood, district, and neighborhood_id.

WKT & Coordinate Prep: Coordinates are extracted from the geometric centroids and stored as numeric floats, and geometries are serialized to WKT text so they fit standard SQL column types.

Stable ID Integration: The deterministic IDs generated in Step 4 are finalized as the primary keys for this dataset.

Data Source Attribution: A data_source tag (OSM_LOR) is appended to ensure traceability for future audits.

### 5.1 CLEANING & QUALITY CONTROL
Data Sanitization - Normalize text by stripping whitespace and converting invisible empty strings into standardized Null values to ensure database integrity.

In [76]:
# Start fresh from your mapped data
df_final = jobcenter_mapped.copy()

# Fix the ID format: ensure IDs are digit-only strings without decimals or spaces
df_final['id'] = df_final['id'].apply(lambda x: str(int(float(x))).strip() if pd.notnull(x) else x)

In [77]:
# Clean invisible spaces: strip whitespace (astype(str) turns NaN into 'nan', which the next cell converts back)
for col in ['center_name', 'address']:
    df_final[col] = df_final[col].astype(str).str.strip()

In [78]:
# Flag all variations of "empty" as true NaN values
df_final = df_final.replace(['', 'None', 'nan', 'NaN', 'nan '], np.nan)

In [79]:
# Drop rows that are still missing a name or an address
df_final = df_final.dropna(subset=['center_name', 'address'], how='any')
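
The three cleaning cells above can also be expressed as one helper that never turns missing values into the text "nan" in the first place. This is a sketch under the assumption that pandas' nullable `string` dtype is acceptable for these columns; `normalize_text` is a hypothetical name.

```python
import numpy as np
import pandas as pd

# Text placeholders that should count as missing
EMPTY_MARKERS = ["", "None", "nan", "NaN"]


def normalize_text(series):
    """Strip whitespace and turn empty or placeholder strings into missing values."""
    cleaned = series.astype("string").str.strip()
    # mask() replaces every entry where the condition is True with a missing value
    return cleaned.mask(cleaned.isin(EMPTY_MARKERS))


demo = pd.Series(["  Jobcenter Mitte ", "", "nan ", None, np.nan])
result = normalize_text(demo)
print(result.isna().tolist())
```

With this approach the helper would be applied per column, for example to `center_name` and `address`, before the `dropna` call.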

### 5.2 SCHEMA & GEOMETRY SERIALIZATION
Serialization for Production - Since SQL databases cannot natively interpret Python geometry objects, the geometries are converted into Well-Known Text (WKT) format.

In [80]:
# Select only the required columns for the database
target_columns = [
    'id', 'district_id', 'center_name', 'address', 'postal_code', 
    'latitude', 'longitude', 'geometry', 'neighborhood', 
    'district', 'neighborhood_id'
]
df_final = df_final[target_columns].copy()
df_final['data_source'] = 'OSM_LOR'

# Convert geometry objects to text (WKT) so the AWS-hosted database can accept the data
df_final['geometry'] = df_final['geometry'].apply(lambda x: x.wkt if hasattr(x, 'wkt') else str(x))

# SAVE TO CSV 
os.makedirs("output", exist_ok=True)
df_final.to_csv("output/jobcenters_berlin_final.csv", index=False)
print(f"Local Backup: Saved {len(df_final)} records to output/jobcenters_berlin_final.csv")

Local Backup: Saved 63 records to output/jobcenters_berlin_final.csv
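
WKT is a plain-text encoding, so the original geometry can be recovered from the exported column when needed. A minimal round-trip sketch using shapely, with the coordinates of the first row shown in Step 1:

```python
from shapely import wkt
from shapely.geometry import Point

# Serialize a geometry to WKT text, as done for the 'geometry' column
original = Point(13.356516, 52.546772)
as_text = original.wkt

# Parse the text back into a shapely geometry
restored = wkt.loads(as_text)

print(as_text)
print(restored.equals(original))
```

Note that WKT carries no CRS information, so consumers of the CSV need to know the coordinates are in EPSG:4326.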




# 6 DATABASE DEPLOYMENT & AWS Synchronization 
## 6.1 Database Infrastructure Setup
In this stage we initialize the connection to the AWS RDS (Relational Database Service) instance. We use a secure tunnel to bridge the local development environment with the cloud infrastructure.

Security Note: All credentials and session paths are sanitized before repository submission.

### Configuration 

In [88]:
# Use generic placeholders for security before pushing to GitHub
user_name = '' 
password = ''      
host = '' 
port = ''
database = ''

### Database Engine Initialization

In [83]:
engine = create_engine(f'postgresql+psycopg2://{user_name}:{password}@{host}:{port}/{database}')

In [81]:
# The engine must exist before this cell runs
# Use 'replace' to clear and fix the table structure
try:
    df_final.to_sql(
        name='job_centers',
        con=engine,
        schema='berlin_source_data',
        if_exists='replace', 
        index=False
    )
    print(f"SUCCESS: {len(df_final)} records deployed with clean ID formatting.")
except Exception as e:
    print(f" Deployment failed: {e}")

SUCCESS: 63 records deployed with clean ID formatting.
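
Rather than blanking the credentials by hand before each push, the values could be read from environment variables. This is a sketch, not the notebook's setup: the variable names (`DB_USER`, `DB_PASSWORD`, `DB_HOST`, `DB_PORT`, `DB_NAME`) and the helper `build_postgres_url` are assumptions, and the demo values are made up.

```python
import os
from urllib.parse import quote_plus


def build_postgres_url(env=os.environ):
    """Build a SQLAlchemy PostgreSQL URL from environment-style settings."""
    required = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME"]
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise KeyError(f"Missing settings: {', '.join(missing)}")
    # Percent-encode user and password so characters like '@' or '/' stay valid
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASSWORD"])
    port = env.get("DB_PORT", "5432")  # PostgreSQL's default port
    return f"postgresql+psycopg2://{user}:{password}@{env['DB_HOST']}:{port}/{env['DB_NAME']}"


# Made-up values for illustration only
demo_env = {
    "DB_USER": "analyst",
    "DB_PASSWORD": "p@ss/word",
    "DB_HOST": "localhost",
    "DB_NAME": "demo",
}
url = build_postgres_url(demo_env)
print(url)
```

The resulting string can be passed to `create_engine` in place of the f-string built from the placeholder variables.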

## 6.2 Schema Definition & Relational Integrity
The table structure is defined using explicit SQL DDL. To safeguard data quality, the following constraints and conventions are implemented:

- Primary Key (id) - Ensures every job center has a unique, stable identifier.

- Foreign Key (district_id) - Enforces referential integrity by linking our data to the official berlin_source_data.districts table.

- Geospatial Serialization - Geometries are stored as WKT (Well-Known Text) to ensure compatibility between Python's Shapely library and the PostgreSQL database driver.
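
The primary-key and foreign-key behaviour described above can be tried locally without touching the production database. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL, with a reduced version of the table; SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`, and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK checks off by default

conn.execute("CREATE TABLE districts (district_id TEXT PRIMARY KEY)")
conn.execute(
    """
    CREATE TABLE job_centers (
        id TEXT PRIMARY KEY,
        district_id TEXT NOT NULL REFERENCES districts (district_id),
        center_name TEXT
    )
    """
)

conn.execute("INSERT INTO districts VALUES ('11001001')")
conn.execute("INSERT INTO job_centers VALUES ('1', '11001001', 'Valid center')")

# A row pointing at a district that does not exist violates the foreign key
try:
    conn.execute("INSERT INTO job_centers VALUES ('2', '99999999', 'Orphan center')")
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

row_count = conn.execute("SELECT COUNT(*) FROM job_centers").fetchone()[0]
print(orphan_rejected, row_count)
```

In the production table the same rule means that a job center with a missing or unknown `district_id` fails on upload, so unmatched rows from the spatial join need to be resolved first.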

In [84]:
# 1. DDL definition
# 'district_id' is used for the foreign-key reference, as it is the standard key in this database

create_table_query = """
DROP TABLE IF EXISTS berlin_source_data.job_centers CASCADE;

CREATE TABLE berlin_source_data.job_centers (
    id TEXT PRIMARY KEY,
    district_id TEXT NOT NULL,
    center_name TEXT,
    address TEXT,
    postal_code TEXT,
    latitude DOUBLE PRECISION,
    longitude DOUBLE PRECISION,
    geometry TEXT,
    neighborhood TEXT,
    district TEXT,
    neighborhood_id TEXT,
    data_source TEXT,
    CONSTRAINT fk_district FOREIGN KEY (district_id) 
        REFERENCES berlin_source_data.districts (district_id) -- Matching the LOR standard
);
"""





## 6.3 Execute Table Creation (Data Sync)

In [85]:
# 2. Execute Table Creation
with engine.connect() as conn:
    conn.execute(text(create_table_query))
    conn.commit()
    print("Database Schema Initialized.")

Database Schema Initialized.


In [86]:
#  3. Final Production Upload
#  I use 'append' because the table was freshly created in the previous step
df_final.to_sql(
    name='job_centers',
    con=engine,
    schema='berlin_source_data',
    if_exists='append', 
    index=False
)

print(f" Success {len(df_final)} records successfully deployed to AWS Production.")


 Success 63 records successfully deployed to AWS Production.
