# Aufgabe 1

## 1 - API / Data-Collection

In [7]:
import requests
import pandas as pd
import uuid
from datetime import datetime

class ThurgauAPIClient:
    """Client to fetch and clean Open Government Data from Thurgau."""

    def __init__(self, base_url: str = "https://data.tg.ch/api/records/1.0/search/"):
        self.base_url = base_url

    def fetch_data(self, dataset: str, max_records: int = 1000) -> pd.DataFrame:
        """Fetches data from the Thurgau - Open Government Data API.

        Args:
            dataset (str): Unique dataset ID from the webpage's API description.
            max_records (int, optional): Max rows to fetch. Defaults to 1000.

        Returns:
            pd.DataFrame: Table with requested data.
        """
        
        params = {"dataset": dataset, "rows": max_records}
        response = requests.get(self.base_url, params=params)
        response.raise_for_status()
        data = response.json()
        records = [r["fields"] for r in data.get("records", [])]
        df = pd.DataFrame(records)
        print(f"Es wurden {len(df)} Datensätze vom Dataset '{dataset}' geladen.")
        return df

class DataCleaner:
    """Utility class for cleaning and enriching datasets with additional columns."""

    @staticmethod
    def add_uuid(df: pd.DataFrame, uuid_column: str = "uuid") -> pd.DataFrame:
        """Add a unique UUID to each row."""
        df[uuid_column] = [str(uuid.uuid4()) for _ in range(len(df))]
        return df

    @staticmethod
    def add_timestamp(df: pd.DataFrame, ts_column: str = "loaded_at") -> pd.DataFrame:
        """Add a timestamp to each row."""
        df[ts_column] = datetime.now().strftime("%Y-%m-%d")
        return df


# Execution
client = ThurgauAPIClient()
df_c02 = client.fetch_data(dataset="div-energie-8")

cleaner = DataCleaner()
df_c02 = cleaner.add_uuid(df_c02)
df_c02 = cleaner.add_timestamp(df_c02)

df_c02


Es wurden 720 Datensätze vom Dataset 'div-energie-8' geladen.


Unnamed: 0,bfs_nr_gemeinde,erdoelbrennstoffe,gemeinde_name,jahr,energiebezugsflaeche,total,einwohner,erdgas,andere,uuid,loaded_at
0,4881,1686.933,Amlikon-Bissegg,2015,113791,1686.933,1320,,,abc4371e-1ff9-4dd4-ba37-989209be13b2,2025-08-02
1,4921,2217.241,Bussnang,2015,187360,3151.487,2262,932.375,1.871,1937b666-02c2-4345-8899-7739451601ff,2025-08-02
2,4751,2836.259,Rickenbach (TG),2015,227233,4585.831,2766,1745.491,4.081,69ff1b5f-5350-41c1-9dc8-0ef03bfee77e,2025-08-02
3,4756,867.692,Schönholzerswilen,2015,72638,867.692,804,,,5ff1770b-d51d-4470-a423-ca37861ca445,2025-08-02
4,4696,3593.640,Tägerwilen,2015,348780,6037.877,4377,2444.237,,05823851-cbfa-4f0b-9c71-9148417aa6c9,2025-08-02
...,...,...,...,...,...,...,...,...,...,...,...
715,4806,1092.829,Eschenz,2023,168717,2234.899,1893,1142.070,,d389d6bd-80b9-4fce-bbd8-5313c937aa52,2025-08-02
716,4724,2305.944,Eschlikon,2023,430312,5319.303,4864,3013.359,,73c23e37-6f33-44f3-b559-ec17981b2508,2025-08-02
717,4741,1227.117,Lommis,2023,103746,1227.117,1267,,,f6860aa6-b5d0-4ea9-86d2-deffdc022f5c,2025-08-02
718,4441,1192.134,Salmsach,2023,116236,1862.043,1578,669.910,,8bfae44d-8b09-44d2-a52c-0fc5b9295505,2025-08-02


## 2 - Analysis

In [5]:
import pandas as pd

class AnalyseC02Data:
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.tables = {}  # Speicherung der dfs

        # Data Cleaning
        self.df['andere'] = self.df['andere'].fillna(0)
        #Renaming
        self.df = self.df.rename(columns={"total": "c02_emissions"})

        # Analysen
        self._prepare_kpis_pro_jahr()
        self._prepare_lookerstudio_table()
        self._prepare_energiemix()
        self._prepare_gemeindeentwicklung()
        self._prepare_entwicklung_einwohner_vs_C02()

    def _prepare_kpis_pro_jahr(self):
        df = self.df.groupby('jahr').agg({
            'c02_emissions': 'sum',
            'einwohner': 'sum'
        }).reset_index()

        df['energy_per_inhabitant'] = df['c02_emissions'] / df['einwohner']
        df['growth_total_energy_pct'] = df['c02_emissions'].pct_change() * 100
        df['growth_energy_per_inhabitant_pct'] = df['energy_per_inhabitant'].pct_change() * 100

        # Cast datatypes
        df["jahr"] = pd.to_datetime(df["jahr"].astype(str) + "-01-01")

        self.tables['kpi_pro_jahr'] = df

    def _prepare_lookerstudio_table(self):
        df = self.df.copy()

        # Optional: Datum im Format "jahr" als echtes Datum (z. B. 01.01.2023)
        df["jahr"] = pd.to_datetime(df["jahr"].astype(str) + "-01-01")

        # Kein groupby — keine Aggregation!
        df = df[["jahr", "gemeinde_name", "einwohner", "c02_emissions", "energiebezugsflaeche", "loaded_at"]].copy()

        self.tables["lookerstudio_base"] = df


    def _prepare_energiemix(self):
        df = self.df.copy()
        # Summieren der Energiearten pro jahr
        df_grouped = df.groupby("jahr")[["erdoelbrennstoffe", "erdgas", "andere"]].sum().reset_index()

        # Umwandeln in Long-Format
        df = df_grouped.melt(
            id_vars="jahr",
            value_vars=["erdoelbrennstoffe", "erdgas", "andere"],
            var_name="energietraeger",
            value_name="menge"
        )

        df["anteil_prozent"] = df.groupby("jahr")["menge"].transform(lambda x: round(x / x.sum() * 100, 2))

        # Cast datatypes
        df["jahr"] = pd.to_datetime(df["jahr"].astype(str) + "-01-01")

        self.tables['energiemix_pro_jahr'] = df


    def _prepare_gemeindeentwicklung(self):
        df = self.df.copy()
        df['energy_per_inhabitant'] = df['c02_emissions'] / df['einwohner']

        df = df[['bfs_nr_gemeinde', 'gemeinde_name', 'jahr', 'c02_emissions', 'energy_per_inhabitant']]

        # Cast datatypes
        df["jahr"] = pd.to_datetime(df["jahr"].astype(str) + "-01-01")

        self.tables['gemeindeentwicklung'] = df


    def _prepare_entwicklung_einwohner_vs_C02(self):
        df = self.df.copy()
        df = df[["jahr", "gemeinde_name", "einwohner", "c02_emissions", "energiebezugsflaeche"]].copy()

        df["C02_pro_qm"] = df["c02_emissions"] / df["energiebezugsflaeche"] * 1000
        df["C02_pro_einwohner"] = df["c02_emissions"] / df["einwohner"] * 1000

        # Cast datatypes
        df["jahr"] = pd.to_datetime(df["jahr"].astype(str) + "-01-01")

        self.tables["einwohner_vs_c02"] = df


    def get_table(self, name: str) -> pd.DataFrame:
        return self.tables.get(name)

    def get_all_tables(self) -> dict:
        return self.tables

## 3 - BigQuery Upload

In [10]:
from google.oauth2 import service_account
from pandas_gbq import to_gbq
import os

class BigQueryUploader:
    def __init__(self, project_id: str, credentials_path: str):
        self.project_id = project_id
        self.credentials_path = credentials_path
        self.credentials = service_account.Credentials.from_service_account_file(credentials_path)

    def upload_tables(self, tables: dict, if_exists: str = "replace"):
        """
        tables: dict im Format {
            "name1": {
                "dataframe": df1,
                "dataset": "mein_dataset",
                "table": "meine_tabelle"
            },
            ...
        }
        """
        for name, config in tables.items():
            df = config["dataframe"]
            dataset = config["dataset"]
            table = config["table"]
            full_table_name = f"{dataset}.{table}"

            print(f"⬆️ Lade {name} hoch nach: {full_table_name}...")

            try:
                to_gbq(
                    dataframe=df,
                    destination_table=full_table_name,
                    project_id=self.project_id,
                    credentials=self.credentials,
                    if_exists=if_exists
                )
                print(f"✅ {name} erfolgreich hochgeladen.\n")
            except Exception as e:
                print(f"❌ Fehler beim Hochladen von {name}: {e}\n")

In [11]:
# Analyse und Uploadinfos vorbereiten
analyse = AnalyseC02Data(df_c02)
upload_dict = {}
for name, df in analyse.get_all_tables().items():
    upload_dict[name] = {
        "dataframe": df,
        "dataset": "energie_daten",
        "table": name  # Tabelle im BQ trägt denselben Namen wie die Analyse
    }

#Upload
uploader = BigQueryUploader(
    project_id="c02-tg",
    credentials_path=os.path.join(os.pardir, "secrets", "bigquery-service-account-c02-tg.json")
)
uploader.upload_tables(upload_dict)

⬆️ Lade kpi_pro_jahr hoch nach: energie_daten.kpi_pro_jahr...
✅ kpi_pro_jahr erfolgreich hochgeladen.

⬆️ Lade lookerstudio_base hoch nach: energie_daten.lookerstudio_base...
✅ lookerstudio_base erfolgreich hochgeladen.

⬆️ Lade energiemix_pro_jahr hoch nach: energie_daten.energiemix_pro_jahr...
✅ energiemix_pro_jahr erfolgreich hochgeladen.

⬆️ Lade gemeindeentwicklung hoch nach: energie_daten.gemeindeentwicklung...
✅ gemeindeentwicklung erfolgreich hochgeladen.

⬆️ Lade gemeindedetails hoch nach: energie_daten.gemeindedetails...
✅ gemeindedetails erfolgreich hochgeladen.

⬆️ Lade einwohner_vs_c02 hoch nach: energie_daten.einwohner_vs_c02...
✅ einwohner_vs_c02 erfolgreich hochgeladen.



# Aufgabe 2

Ist dein Docker-Image eine Web-App oder ein Batch-Job?

Wenn es ein Batch-Job ist (z.B. Python-Script, das einmal läuft und endet), dann ist Cloud Run nicht ideal, weil es für dauerhafte Dienste gedacht ist, die HTTP-Requests annehmen. Für geplante Batch-Jobs kannst du besser Cloud Scheduler + Cloud Run Jobs oder Cloud Functions verwenden.

In [None]:
## Docker

#Build image:
docker build -t europe-west1-docker.pkg.dev/c02-tg/docker-images/c02-automation:latest .

#Push image to Artifact Registry Repo
docker push europe-west1-docker.pkg.dev/c02-tg/docker-images/c02-automation:latest

#Update Cloud Run Job to latest image
gcloud run jobs update c02-automation-job --image=europe-west1-docker.pkg.dev/c02-tg/docker-images/c02-automation:latest --region=europe-west1

#Test Job (manuell)
gcloud run jobs execute c02-automation-job --region=europe-west1


In [None]:
## Cloud Run Job
path für cmd = C:\Users\Buddy\AppData\Local\Google\Cloud SDK

#Check next automated update
gcloud scheduler jobs describe c02-automation-job-schedule --location=europe-west1

#Change Time to automatically update
gcloud scheduler jobs update pubsub c02-automation-job-schedule --schedule="0 11 * * *" --location=europe-west1 --time-zone="Europe/Berlin" 

# Liste aller jobs
gcloud scheduler jobs list --location=europe-west1

# Log des letzten Jobs
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=c02-automation-job" --limit=50 --order=desc

In [None]:
## Webinterface

# Cloud Scheduler Übersicht (Job-Zeitplan ansehen & ändern)
https://console.cloud.google.com/cloudscheduler?project=c02-tg&inv=1&invt=Ab4YwQ

# Cloud Run Jobs
https://console.cloud.google.com/run/jobs?project=c02-tg&inv=1&invt=Ab4YwQ

# Artifact Registry (Docker-Images)
https://console.cloud.google.com/artifacts/docker/c02-tg/europe-west1/docker-images?project=c02-tg&invt=Ab4YwQ&inv=1

# Logs der letzten Jobs
https://console.cloud.google.com/logs/query;cursorTimestamp=2025-08-02T08:30:06.787513805Z;duration=PT1H?project=c02-tg&inv=1&invt=Ab4YwQ

Zusammenfassung der IT-Infrastruktur & Begründung

- Codeentwicklung & Image	Lokale Entwicklungsumgebung + Docker	Einfaches Verpacken und Versionieren der Python-Anwendung, um eine portable Laufumgebung sicherzustellen.
- Container Registry	Google Artifact Registry	Sichere und zentral verwaltete Speicherung von Container-Images in der Google Cloud, optimiert für Cloud Run.
- Serverless Ausführung	Google Cloud Run (managed)	Automatische Skalierung, keine Serververwaltung, ideal für einmal täglich ausgeführte Jobs, da Kosten nur bei Nutzung anfallen.
- Job Scheduling	Google Cloud Scheduler	Verlässliche und flexible Steuerung der Ausführungszeit (Cronjobs) in der Cloud, native Integration mit Pub/Sub und Cloud Run.
- Kommunikation Job-Auslöser → Job	Google Pub/Sub (Topic)	Entkoppelte, asynchrone Nachrichtenvermittlung, damit der Scheduler Cloud Run Jobs triggert, ohne direkt Services zu koppeln.
- Datenbank / Storage	BigQuery	Schnelle, skalierbare Analyseplattform, ideal für große Datenmengen, die das Python-Script täglich aktualisiert.

Warum Cloud Run Job?
Cloud Run ist optimal für batch-artige Jobs, da keine dauerhaften Ressourcen benötigt werden, automatische Skalierung und einfache Integration in die Google Cloud Plattform vorhanden sind. Zudem laufen Jobs in Containern, was die Portabilität sicherstellt.



In [None]:
# Setup

Cloud Scheduler (mit Uhrzeit & Topic) 
         ↓
   Pub/Sub Topic (Nachricht wird gesendet)
         ↓
Cloud Function (hört zu & startet Cloud Run Job)
         ↓
Cloud Run Job (dein Docker-Image mit main.py läuft los)

URL Listener:
https://trigger-job-function-stb4zjlreq-ew.a.run.app


IndentationError: unindent does not match any outer indentation level (<tokenize>, line 5)