Connecting to the Database 

In [None]:
import os
from urllib.parse import quote_plus

import sqlalchemy as sa

# Credentials are taken from the environment so that no secret is ever typed
# into (or committed with) the notebook. Export LAYEREDDB_USER and
# LAYEREDDB_PASSWORD before starting Jupyter.
try:
    db_user = os.environ["LAYEREDDB_USER"]
    db_password = os.environ["LAYEREDDB_PASSWORD"]
except KeyError as missing:
    raise RuntimeError(
        f"Environment variable {missing} is not set; "
        "export LAYEREDDB_USER and LAYEREDDB_PASSWORD before running this notebook."
    ) from None

# quote_plus escapes characters such as '@', ':' or '/' that would otherwise
# be misread as URL delimiters when they occur in a user name or password.
db_uri = (
    f"postgresql+psycopg2://{quote_plus(db_user)}:{quote_plus(db_password)}"
    "@localhost:5433/layereddb"
)

# Create the Engine object, keeping pool_pre_ping for reliability
engine = sa.create_engine(db_uri, pool_pre_ping=True)

print("✅ Connection engine for the local 'layereddb' is ready.")

✅ Connection engine for the local 'layereddb' is ready.


Checking existing schemas

In [66]:
from sqlalchemy import inspect

# Build an inspector on top of the shared engine.
inspector = inspect(engine)

# Ask the inspector for the names of all schemas it can see.
schemas = inspector.get_schema_names()

# Header and list are emitted on separate lines by a single print call.
print("Available schemas in the database:", schemas, sep="\n")

Available schemas in the database:
['berlin_labels', 'berlin_recommender', 'berlin_source_data', 'dashboard_data', 'information_schema', 'public']


Creating a table 'doctors' in 'berlin_source_data' schema

In [67]:
from sqlalchemy import text, create_engine

# DDL for the doctors table. The table is dropped first so that re-running the
# cell always starts from an empty table with the current column definitions.
create_table_sql = """
-- Drop the table if it already exists to start fresh
DROP TABLE IF EXISTS berlin_source_data.doctors;

-- Create the new table for doctors and clinics
CREATE TABLE berlin_source_data.doctors (
    id VARCHAR(30) PRIMARY KEY,
    district_id VARCHAR(20) NOT NULL,
    neighborhood_id VARCHAR(20) NOT NULL,
    name VARCHAR(255),
    street VARCHAR(255),
    housenumber VARCHAR(30),
    city VARCHAR(50),
    postcode VARCHAR(10),
    amenity VARCHAR(30),
    speciality TEXT,
    opening_hours VARCHAR(500),
    website VARCHAR(500),
    longitude DECIMAL(9,6) NOT NULL,
    latitude DECIMAL(9,6) NOT NULL,
    wheelchair VARCHAR(30),
    description TEXT,
    email VARCHAR(255),
    toilets_wheelchair VARCHAR(30),
    wheelchair_description TEXT
);
"""

# Connect and Execute
print("Connecting to database to create new table 'doctors'...")
try:
    # engine.begin() commits when the block exits normally and rolls back if
    # it raises, so the DROP and the CREATE succeed or fail together.
    with engine.begin() as connection:
        connection.execute(text(create_table_sql))

    print("✅ Table 'berlin_source_data.doctors' created successfully.")

except Exception as e:
    print(f"❌ An error occurred during table creation: {e}")
    # Re-raise so that "Run All" stops here instead of continuing to load data
    # into a table that is missing or still holds rows from a previous run.
    raise

Connecting to database to create new table 'doctors'...
✅ Table 'berlin_source_data.doctors' created successfully.


Preparing the data for upload

In [68]:
import pandas as pd
from pathlib import Path
import io

# Location of the cleaned doctors extract, relative to the notebook
file_to_load = Path('../clean/doctors_clean_with_distr.csv')

# Identifier-like columns are read as text so pandas does not convert them to numbers
df = pd.read_csv(
    file_to_load,
    dtype=dict.fromkeys(('id', 'postcode', 'district_id', 'neighborhood_id'), str),
)

# Column sequence used for the upload; it mirrors the order in which the
# columns are declared in the CREATE TABLE statement for doctors
sql_column_order = [
    'id', 'district_id', 'neighborhood_id',
    'name', 'street', 'housenumber', 'city', 'postcode',
    'amenity', 'speciality', 'opening_hours', 'website',
    'longitude', 'latitude',
    'wheelchair', 'description', 'email',
    'toilets_wheelchair', 'wheelchair_description',
]

# Select the columns in exactly that sequence (raises KeyError if one is missing)
df_for_upload = df[sql_column_order]

# Report the result and show the resulting frame structure
print(
    "DataFrame 'doctors' has been prepared for upload.",
    "Column order now matches the SQL database schema:",
    sep="\n",
)
df_for_upload.info()

DataFrame 'doctors' has been prepared for upload.
Column order now matches the SQL database schema:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1615 entries, 0 to 1614
Data columns (total 19 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   id                      1615 non-null   object 
 1   district_id             1615 non-null   object 
 2   neighborhood_id         1615 non-null   object 
 3   name                    1615 non-null   object 
 4   street                  1611 non-null   object 
 5   housenumber             1307 non-null   object 
 6   city                    1615 non-null   object 
 7   postcode                1545 non-null   object 
 8   amenity                 1615 non-null   object 
 9   speciality              1442 non-null   object 
 10  opening_hours           1116 non-null   object 
 11  website                 904 non-null    object 
 12  longitude               1615 non-null   float6

Insert Data into Table

In [69]:
raw_conn = None
cursor = None
try:
    # COPY ... FROM STDIN is driven through the DBAPI cursor, so borrow a
    # low-level connection from the engine's pool.
    raw_conn = engine.raw_connection()
    cursor = raw_conn.cursor()

    # Serialise the frame to an in-memory CSV, including a header row
    # (the header is skipped by COPY because of HEADER TRUE below).
    buffer = io.StringIO()
    df_for_upload.to_csv(buffer, index=False, header=True)
    buffer.seek(0) # Reset the buffer to the beginning

    # Name every target column explicitly so the load does not depend on the
    # physical column order of the table; embedded double quotes are doubled
    # to keep the identifiers valid.
    column_list = ", ".join(
        '"' + str(name).replace('"', '""') + '"' for name in df_for_upload.columns
    )

    # The table is schema-qualified instead of issuing SET search_path: a
    # committed session-level SET would stay on the pooled connection and
    # affect whatever code receives that connection next.
    copy_sql = f"""
        COPY berlin_source_data.doctors ({column_list}) FROM STDIN WITH
            (FORMAT CSV, HEADER TRUE)
    """

    # Execute the COPY command
    cursor.copy_expert(sql=copy_sql, file=buffer)

    # Commit the entire transaction
    raw_conn.commit()
    print(f"✅ Transaction committed successfully. {len(df_for_upload)} rows were copied to 'doctors'.")

except Exception as e:
    print(f"❌ An error occurred: {e}")
    if raw_conn:
        raw_conn.rollback() # Roll back the transaction on error
    # Re-raise so later cells do not run against a table that was not loaded.
    raise
finally:
    if cursor is not None:
        cursor.close() # Release the cursor before handing the connection back
    if raw_conn:
        raw_conn.close() # Always close the raw connection

✅ Transaction committed successfully. 1615 rows were copied to 'doctors'.


Adding the Foreign Key Constraint

In [70]:
# DDL that links doctors.district_id to the districts table: deleting a
# referenced district is refused, updating its id is cascaded.
add_foreign_key_sql = """
ALTER TABLE berlin_source_data.doctors
ADD CONSTRAINT district_id_fk FOREIGN KEY (district_id)
REFERENCES berlin_source_data.districts(district_id)
ON DELETE RESTRICT
ON UPDATE CASCADE;
"""

# Run the statement on a fresh connection and commit it
try:
    fk_statement = text(add_foreign_key_sql)
    with engine.connect() as connection:
        connection.execute(fk_statement)
        connection.commit()
except Exception as e:
    print(f"An error occurred while adding the foreign key: {e}")
else:
    print("✅ Foreign key constraint 'district_id_fk' added successfully.")

✅ Foreign key constraint 'district_id_fk' added successfully.


In [71]:
# Sanity check: pull five rows back out of the freshly loaded table
try:
    check_df = pd.read_sql(sql="SELECT * FROM berlin_source_data.doctors LIMIT 5", con=engine)
    print(" Verification: First 5 rows from the database ", check_df, sep="\n")
except Exception as e:
    print(f"An error occurred during verification: {e}")

 Verification: First 5 rows from the database 
            id district_id neighborhood_id                              name  \
0   7823742547    11001001             101                  A-Arbeitsmedizin   
1  11108705050    11003003             301                 A. Dorosti Nadali   
2   4513645689    11002002             201                AID Friedrichshain   
3   7844240263    11004004             402  AL Urologie am Rüdesheimer Platz   
4   1930491527    11009009             907                               ARZ   

                   street housenumber    city postcode   amenity  \
0     Schwartzkopffstraße          15  Berlin    10115  practice   
1           Dunckerstraße          17  Berlin    10437  practice   
2       Frankfurter Allee         100  Berlin     None  practice   
3        Homburger Straße          16  Berlin    14197  practice   
4  Albert-Einstein-Straße           4  Berlin    12489  practice   

     speciality                                      opening_ho

Now we need to load the generated data from healthcare_features.csv into the berlin_labels.district_features table in the database.

In [72]:
# Parameters

CSV_PATH = '../clean/healthcare_features.csv'
TARGET_TABLE = 'berlin_labels.district_features'
TEMP_TABLE = 'temp_healthcare_scores' # Staging table name; a regular table that is dropped at the end
SCHEMA_NAME = 'berlin_labels'

# Load and Prepare Data
# district_id is read as text from the start. Reading it as a number and
# casting afterwards would turn every id into e.g. '11001001.0' as soon as a
# single value is missing, and then no row would match in the UPDATE.
try:
    df_scores = pd.read_csv(CSV_PATH, dtype={'district_id': str})
except FileNotFoundError as exc:
    print(f"Error: File not found at path {CSV_PATH}")
    raise FileNotFoundError(f"Could not find the scores file at {CSV_PATH}") from exc

# The UPDATE joins on district_id. A missing key can never match, and a
# duplicated key makes it unpredictable which staging row wins.
if df_scores['district_id'].isna().any():
    raise ValueError(f"{CSV_PATH} contains rows without a district_id")
if df_scores['district_id'].duplicated().any():
    raise ValueError(f"{CSV_PATH} contains duplicate district_id values")

# Select only the necessary columns for the update operation
UPDATE_COLS = [
    'total_primary_adult_score',
    'total_pediatric_score',
    'specialist_score_total'
]
df_update = df_scores[['district_id'] + UPDATE_COLS].copy()

print(f"Update data loaded. Rows: {len(df_update)}")
print(f"Columns to be updated: {df_update.columns.tolist()}")

# Ensure the score columns are numeric before loading; values that cannot be
# parsed become NaN and therefore NULL in the database.
for col in UPDATE_COLS:
    df_update[col] = pd.to_numeric(df_update[col], errors='coerce')


# Check and Add Columns (ALTER TABLE)
# This step ensures the target columns exist in the district_features table.
with engine.connect() as connection:
    # Set the required role for administrative changes (ALTER TABLE)
    connection.execute(text("SET ROLE data_team;"))
    print("Role set to data_team.")

    for col in UPDATE_COLS:
        try:
            # DECIMAL is used for the score columns
            alter_query = text(f"""
                ALTER TABLE {TARGET_TABLE}
                ADD COLUMN IF NOT EXISTS "{col}" DECIMAL;
            """)
            connection.execute(alter_query)
            print(f"Checked/added column: {col}")
        except Exception as e:
            print(f"Error during ALTER TABLE for {col}: {e}")
            raise # Stop the whole cell; the update cannot work without the columns

    # Reset the role after administrative changes
    connection.execute(text("RESET ROLE;"))
    print("Role reset.")

    connection.commit()

grant_query = text(f"""
    GRANT SELECT ON TABLE {SCHEMA_NAME}.{TEMP_TABLE} TO data_team;
""")

# Batch UPDATE from the staging table, matched on district_id. This only
# updates existing rows (it is not an upsert): staging rows without a matching
# district in the target table are ignored by the join.
# The SET clause is generated from UPDATE_COLS so it cannot drift from the
# columns added by the ALTER TABLE step above.
set_clause = ",\n        ".join(f'"{col}" = s."{col}"' for col in UPDATE_COLS)
update_query = text(f"""
    UPDATE {TARGET_TABLE} AS t
    SET
        {set_clause}
    FROM
        {SCHEMA_NAME}.{TEMP_TABLE} AS s
    WHERE
        t.district_id = s.district_id;
""")

# IF EXISTS keeps the cleanup safe even when the staging load itself failed.
drop_query = text(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{TEMP_TABLE};")

updated_rows = None
try:
    # Load the dataframe into a staging table in the same schema.
    df_update.to_sql(
        name=TEMP_TABLE,
        con=engine,
        if_exists='replace',
        index=False,
        schema=SCHEMA_NAME
    )
    print(f"Data successfully loaded into staging table {SCHEMA_NAME}.{TEMP_TABLE}")

    # data_team performs the UPDATE, so it must be able to read the staging table.
    with engine.connect() as grant_connection:
        grant_connection.execute(grant_query)
        grant_connection.commit()
        print(f"Granted SELECT permission on {SCHEMA_NAME}.{TEMP_TABLE} to data_team.")

    with engine.connect() as connection:
        # Set the role again for the UPDATE
        connection.execute(text("SET ROLE data_team;"))
        print("Role set to data_team for update/cleanup.")

        # Execute the UPDATE and remember how many target rows it touched
        result = connection.execute(update_query)
        updated_rows = result.rowcount
        print(f"Number of rows updated: {updated_rows}")

        # Reset the role after changes
        connection.execute(text("RESET ROLE;"))
        print("Role reset.")

        connection.commit()
finally:
    # Always remove the staging table, whether or not the update succeeded.
    # A separate connection is used because the one above may be in a failed
    # transaction at this point.
    with engine.connect() as cleanup_connection:
        cleanup_connection.execute(drop_query)
        cleanup_connection.commit()

# Make unmatched staging rows visible instead of dropping them silently.
if updated_rows != len(df_update):
    print(
        f"WARNING: {len(df_update)} staging rows were loaded but {updated_rows} "
        f"rows were updated in {TARGET_TABLE}; some district_id values did not match."
    )

print("Update operation complete. Staging table dropped.")

Update data loaded. Rows: 12
Columns to be updated: ['district_id', 'total_primary_adult_score', 'total_pediatric_score', 'specialist_score_total']
Role set to data_team.
Checked/added column: total_primary_adult_score
Checked/added column: total_pediatric_score
Checked/added column: specialist_score_total
Role reset.
Data successfully loaded into staging table berlin_labels.temp_healthcare_scores
Granted SELECT permission on berlin_labels.temp_healthcare_scores to data_team.
Role set to data_team for update/cleanup.
Number of rows updated: 12
Role reset.
Update operation complete. Staging table dropped.


Final Validation via SQL

This section executes several SQL queries directly against the database to perform final validation on the newly loaded berlin_source_data.doctors table. These checks verify the total row count, ensure all coordinates fall within the expected geographical boundaries, and confirm the referential integrity of the district_id by comparing the IDs in the main table against the reference districts table.

In [73]:
# Query 1: how many rows ended up in the doctors table
query1 = "SELECT COUNT(*) AS total_rows FROM berlin_source_data.doctors;"
df1 = pd.read_sql(sql=query1, con=engine)
print("Total row count in 'doctors'", df1, sep="\n")

Total row count in 'doctors'
   total_rows
0        1615


In [74]:
# Query 2: number of rows whose latitude/longitude fall outside the
# 52.3-52.7 / 13.0-13.8 bounding box used for Berlin
query2 = """
    SELECT COUNT(*) AS outliers
    FROM berlin_source_data.doctors
    WHERE NOT (latitude BETWEEN 52.3 AND 52.7 AND longitude BETWEEN 13.0 AND 13.8);
"""
df2 = pd.read_sql(sql=query2, con=engine)
print("\nCount of locations outside Berlin's bounding box", df2, sep="\n")


Count of locations outside Berlin's bounding box
   outliers
0         0


In [75]:
# Query 3: sorted list of the district_id values present in the doctors table
query3 = "SELECT DISTINCT district_id FROM berlin_source_data.doctors ORDER BY 1;"
df3 = pd.read_sql(sql=query3, con=engine)
print("\n Distinct district_id's found in the doctors table", df3, sep="\n")


 Distinct district_id's found in the doctors table
   district_id
0     11001001
1     11002002
2     11003003
3     11004004
4     11005005
5     11006006
6     11007007
7     11008008
8     11009009
9     11010010
10    11011011
11    11012012


In [76]:
# Query 4: sorted list of the district_id values in the districts lookup
# table, for side-by-side comparison with the doctors table
query4 = "SELECT DISTINCT district_id FROM berlin_source_data.districts ORDER BY 1;"
df4 = pd.read_sql(sql=query4, con=engine)
print("\n Distinct district_id's from the reference 'districts' table", df4, sep="\n")


 Distinct district_id's from the reference 'districts' table
   district_id
0     11001001
1     11002002
2     11003003
3     11004004
4     11005005
5     11006006
6     11007007
7     11008008
8     11009009
9     11010010
10    11011011
11    11012012


In [77]:
# Query 5: number of distinct id values; compare it with the total row count
# from Query 1 to confirm that the primary key is unique
query5 = "SELECT COUNT (DISTINCT id) FROM berlin_source_data.doctors;"
df5 = pd.read_sql(sql=query5, con=engine)
print("\n Number of distinct id in the doctors table", df5, sep="\n")


 Number of distinct id in the doctors table
   count
0   1615


In [78]:
# Query 6: Check for doctors that have a district_id with no match in the districts table.
# The LEFT JOIN keeps every doctor; rows where the districts side is NULL are
# the ones without a matching district. The table alias is spelled with plain
# ASCII letters only, so it can be retyped safely when the query is edited.
query6 = """
    SELECT doc.id, doc.district_id
    FROM berlin_source_data.doctors AS doc
    LEFT JOIN berlin_source_data.districts AS d
        ON doc.district_id = d.district_id
    WHERE d.district_id IS NULL;
"""
df6 = pd.read_sql(query6, engine)
print("\n Doctors with an invalid district_id (no match in the districts table)")
print(df6)


 Doctors with an invalid district_id (no match in the districts table)
Empty DataFrame
Columns: [id, district_id]
Index: []


In [79]:
# Re-read the scores file to derive the expected number of updated districts.
try:
    df_scores = pd.read_csv(CSV_PATH)
except NameError:
    # If parameters haven't been defined, we must stop.
    print("ERROR: Essential parameters (CSV_PATH, etc.) are undefined. Cannot proceed.")
    raise
except FileNotFoundError:
    print(f"Error: File not found at path {CSV_PATH}")
    raise

# A target row can be populated at most once per district, so the expected
# figure is the number of distinct district_id values in the file (missing
# ids are not counted). The frame is only measured here; df_update from the
# load step is deliberately left untouched.
staging_rows_count = df_scores['district_id'].nunique()

# POST-LOAD VERIFICATION

with engine.connect() as connection:

    # Set the role for reading
    connection.execute(text("SET ROLE data_team;"))

    print("\n Starting Post-Load Verification Checks ")

    # Query 1: Get the count of rows in the target table that successfully received data
    check_rows_query = text(f"""
        SELECT COUNT(district_id) 
        FROM {TARGET_TABLE} 
        WHERE total_primary_adult_score IS NOT NULL;
    """)

    # Execute and retrieve the count of rows that were populated
    rows_with_data = connection.execute(check_rows_query).scalar()

    # 1. Comparison: Check if the number of rows populated matches the number of rows expected from the staging data
    if rows_with_data == staging_rows_count:
        print(f"PASS: Rows with new data ({rows_with_data}) matches expected staging row count ({staging_rows_count}).")
    else:
        print(f"CRITICAL WARNING: Rows with new data ({rows_with_data}) DOES NOT match expected row count ({staging_rows_count}). Missing or incomplete updates detected.")


    # Query 2: Count target rows in which at least one of the three score columns is still NULL.
    check_nulls_query = text(f"""
        SELECT COUNT(district_id)
        FROM {TARGET_TABLE}
        WHERE 
            total_primary_adult_score IS NULL 
            OR total_pediatric_score IS NULL
            OR specialist_score_total IS NULL;
    """)

    null_count = connection.execute(check_nulls_query).scalar()

    # 2. Comparison: Check for data integrity (no unexpected NULLs)
    if null_count == 0:
        print("PASS: Data integrity check passed. No unexpected NULLs found in new score columns.")
    else:
        print(f"CRITICAL ERROR: Data integrity check FAILED. Found {null_count} rows with NULL scores. Investigate the UPDATE operation.")


    # Reset the role after checking
    connection.execute(text("RESET ROLE;"))

    connection.commit()


 Starting Post-Load Verification Checks 
PASS: Rows with new data (12) matches expected staging row count (12).
PASS: Data integrity check passed. No unexpected NULLs found in new score columns.


In [80]:
# Catalog query: name, data type and nullability of every column of
# berlin_source_data.doctors, read from information_schema
query_schema = """
SELECT
    column_name,
    data_type,
    is_nullable
FROM
    information_schema.columns
WHERE
    table_schema = 'berlin_source_data' AND table_name = 'doctors';
"""

# Run the catalog query and show the full result without truncation
try:
    print("\nChecking the schema of the 'doctors' table")

    # Fetch the column metadata into a DataFrame
    df_schema = pd.read_sql(sql=query_schema, con=engine)

    # to_string() renders every row and column in full
    schema_report = df_schema.to_string()
    print(schema_report)

except Exception as e:
    print(f"\n❌ An error occurred while executing the query: {e}")


Checking the schema of the 'doctors' table
               column_name          data_type is_nullable
0                       id  character varying          NO
1              district_id  character varying          NO
2          neighborhood_id  character varying          NO
3                     name  character varying         YES
4                   street  character varying         YES
5              housenumber  character varying         YES
6                     city  character varying         YES
7                 postcode  character varying         YES
8                  amenity  character varying         YES
9               speciality               text         YES
10           opening_hours  character varying         YES
11                 website  character varying         YES
12               longitude            numeric          NO
13                latitude            numeric          NO
14              wheelchair  character varying         YES
15             description  