### Methods for setting the quality control attributes

Load data from database

In [5]:
import json

def load_config(config_path):
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print("Failed to load configuration file:", exception=e)
        return None

In [6]:
def fetch_data_qc(config, limit=100):
    import psycopg2
    import pandas as pd

    try:
        conn = psycopg2.connect(**config)
        cur = conn.cursor()

        query = """
        SELECT
            ds.dataset_identifier,
             dist.distribution_format,
             dist.distribution_download_url
        FROM merged_dataset_metadata ds
        LEFT JOIN merged_distribution_metadata dist
            ON ds.dataset_identifier = dist.dataset_identifier
        """

        if limit is not None:
            query += f" LIMIT {int(limit)}"

        cur.execute(query)
        rows = cur.fetchall()
        colnames = [desc[0] for desc in cur.description]
        df = pd.DataFrame(rows, columns=colnames)

        cur.close()
        conn.close()

        if not df.empty:
            print("Successfully loaded data out of the db")

        return df

    except Exception as e:
        print("Error loading dataset metadata with distribution formats", exception=e)
        return pd.DataFrame()


In [3]:
dbname = "4M_copy"
config = load_config(r"01_ETL\21_load\db_config.json")
config["dbname"] = dbname


Homogenis format names of the distribution

In [4]:
import pandas as pd

# Load lookup table
lookup_path = r"04_QC\formats_lockup_utf8.csv"
lookup_df = pd.read_csv(lookup_path)

# Clean up for stable matching
lookup_df["original_name"] = lookup_df["original_name"].str.strip()

# Load your database data as before
df = fetch_data_qc(config, limit=None)

# Clean and replace empty distribution_format values
df["distribution_format"] = df["distribution_format"].astype(str).str.strip()
df["distribution_format"] = df["distribution_format"].replace("", "no_information")
df["distribution_format"] = df["distribution_format"].fillna("no_information")

# Perform the merge with the lookup table
df_merged = df.merge(
    lookup_df,
    how="left",
    left_on="distribution_format",
    right_on="original_name"
)

# Drop the now redundant original_name column
df_merged = df_merged.drop(columns=["original_name"])


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\FHNW_lokal\\6000\\4M\\04_QC\\formats_lockup_utf8.csv'

Count format names in the dataset

In [None]:
# Filter formats that are not "no_information"
df_valid_formats = df_merged[df_merged["distribution_format"] != "no_information"]

# Group and count the number of different formats per dataset
format_counts = (
    df_valid_formats.groupby("dataset_identifier")["distribution_format"]
    .nunique()
    .reset_index(name="format_count")
)

# Merge with original df_merged to also include datasets with 0 valid formats
df_with_format_count = df_merged.merge(format_counts, on="dataset_identifier", how="left")

# Replace missing count values with 0
df_with_format_count["format_count"] = df_with_format_count["format_count"].fillna(0).astype(int)

# Preview
print(df_with_format_count[["dataset_identifier", "distribution_format", "format_count"]].head())


                                  dataset_identifier  \
0    15f368b3-c660-4fcd-bec6-1413094d44bb@kanton-zug   
1  8aeaff60-3351-4730-af28-7ec9813d2689@amt-fuer-...   
2  8aeaff60-3351-4730-af28-7ec9813d2689@amt-fuer-...   
3  2834dae3-266c-4aa0-8220-1eaea7a351ed@amt-geoin...   
4                          100182@kanton-basel-stadt   

                                 distribution_format  format_count  
0  http://publications.europa.eu/resource/authori...             2  
1  http://publications.europa.eu/resource/authori...             3  
2  http://publications.europa.eu/resource/authori...             3  
3  http://publications.europa.eu/resource/authori...             3  
4  http://publications.europa.eu/resource/authori...            13  


Check the status of the distribution_download_url

In [None]:
import pandas as pd
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

MAX_WORKERS = 30
TIMEOUT = 5

# Extract unique, cleaned URLs only
unique_urls = df_merged["distribution_download_url"].dropna().astype(str).str.strip().unique()

# Function to check the status of a single URL
def check_url_status(url):
    if not url:
        return url, "empty"
    try:
        response = requests.head(url, allow_redirects=True, timeout=TIMEOUT)
        return url, response.status_code
    except requests.exceptions.RequestException:
        return url, "error"

# Temporarily store results
url_status_map = {}

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {executor.submit(check_url_status, url): url for url in unique_urls}
    for future in tqdm(as_completed(futures), total=len(futures), desc="Checking unique URLs"):
        url, status = future.result()
        url_status_map[url] = status

# Add column: status code based on mapping
df_merged["download_url_status_code"] = df_merged["distribution_download_url"].astype(str).str.strip().map(url_status_map)


Checking unique URLs: 100%|██████████| 33100/33100 [2:54:59<00:00,  3.15it/s]   


In [None]:
# Convert status codes to string
df_merged["download_url_status_code"] = df_merged["download_url_status_code"].astype(str)

# Count the number of occurrences per status code
status_counts = (
    df_merged["download_url_status_code"]
    .value_counts()
    .reset_index()
    .rename(columns={"index": "download_url_status_code", "download_url_status_code": "count"})
)

# Extract one example URL per status code
example_urls = (
    df_merged
    .dropna(subset=["download_url_status_code", "distribution_download_url"])
    .drop_duplicates(subset=["download_url_status_code"])
    [["download_url_status_code", "distribution_download_url"]]
)

# Merge both into a summary table
status_overview = pd.merge(
    status_counts,
    example_urls,
    on="download_url_status_code",
    how="left"
).sort_values(by="count", ascending=False).reset_index(drop=True)

# Display result
print(status_overview)


   download_url_status_code  count  \
0                       nan  33920   
1                       200  32192   
2                       403   2379   
3                       405   2018   
4                       404   1411   
5                     error   1305   
6                       400    541   
7                       204    230   
8                       503    100   
9                       500     84   
10                      401     18   

                            distribution_download_url  
0                                                None  
1   https://data.bs.ch/api/v2/catalog/datasets/100...  
2   https://www.baselland.ch/politik-und-behorden/...  
3   https://wab.zug.ch/vote/ausbau-nationalstrasse...  
4   https://wab.zug.ch/vote/anderung-vom-26.-septe...  
5    https://mapplus01/mapplus/fribourg/?layers=11432  
6   https://geoportal.georhena.eu/geoserver/transp...  
7             https://data.zg.ch/store/1/resource/682  
8   https://geo.ur.ch/sec-webmercator/w

In [None]:
def fetch_data_qc(config, limit=100):
    import psycopg2
    import pandas as pd

    try:
        conn = psycopg2.connect(**config)
        cur = conn.cursor()

        query = """
        SELECT
            ds.dataset_identifier,
             dist.distribution_format,
             dist.distribution_download_url
        FROM merged_dataset_metadata ds
        LEFT JOIN merged_distribution_metadata dist
            ON ds.dataset_identifier = dist.dataset_identifier
        """

        if limit is not None:
            query += f" LIMIT {int(limit)}"

        cur.execute(query)
        rows = cur.fetchall()
        colnames = [desc[0] for desc in cur.description]
        df = pd.DataFrame(rows, columns=colnames)

        cur.close()
        conn.close()

        if not df.empty:
            print("Successfully loaded data out of the db")

        return df

    except Exception as e:
        print("Error loading dataset metadata with distribution formats", exception=e)
        return pd.DataFrame()


### Count keywords

In [None]:
"""SELECT
    dataset_identifier,
    dataset_keyword_DE,
    CASE
        WHEN dataset_keyword_DE IS NOT NULL AND dataset_keyword_DE <> '' THEN
            array_length(string_to_array(trim(both '{}' from dataset_keyword_DE), ','), 1)
        ELSE 0
    END AS keyword_count
FROM merged_dataset_metadata;
"""

"SELECT\n    dataset_identifier,\n    dataset_keyword_DE,\n    CASE\n        WHEN dataset_keyword_DE IS NOT NULL AND dataset_keyword_DE <> '' THEN\n            array_length(string_to_array(trim(both '{}' from dataset_keyword_DE), ','), 1)\n        ELSE 0\n    END AS keyword_count\nFROM merged_dataset_metadata;\n"

### Anzahl Zeichen in der Beschreibung

In [None]:
"""SELECT
    dataset_identifier,
    dataset_description_de,
    char_length(dataset_description_de) AS character_count
FROM merged_dataset_metadata
WHERE dataset_description_de IS NOT NULL
LIMIT 10;
"""

'SELECT\n    dataset_identifier,\n    dataset_description_de,\n    char_length(dataset_description_de) AS character_count\nFROM merged_dataset_metadata\nWHERE dataset_description_de IS NOT NULL\nLIMIT 10;\n'