# Exploration der Dataset
In diesem Notebook werden die ersten Schritte im Data Wrangling gemacht, das explorieren. 
Da die Datasets aber sehr gross sind, wird DuckDB genutzt, um die Daten erstmal in Memory zu persistieren, 
damit Stück für Stück die Daten eingelesen werden können.

In [None]:
# Imports
import duckdb
import pandas as pd
from sodapy import Socrata
import json 
import tempfile
import re

## Einlesen der Daten in DuckDB

**DuckDB** ist eine in-memory Datenbank, welche uns ermöglicht einen sehr grossen Datensatz einzulesen und diesen Stückweise
im Code zu verwenden.

In [None]:
# Einlesen der CSV-Datei und Konvertierung in Parquet
con = duckdb.connect("mydb.duckdb")
data_sets = [
     "data/medianAskingRent_All.csv",
]

selected_columns = [
    "unique_key",
    "created_date",
    "closed_date",
    "agency",
    "agency_name",
    "complaint_type",
    "descriptor",
    "location_type",
    "incident_zip",
    "incident_address",
    "street_name",
    "borough",
    "city",
    "cross_street_1",
    "cross_street_2",
    "intersection_street_1",
    "intersection_street_2",
    "address_type",
    "landmark",
    "facility_type",
    "status",
    "due_date",
    "resolution_description",
    "park_borough",
    "park_facility_name",
    "resolution_action_updated_date",
    "community_board",
    "bbl",
    "open_data_channel_type",
]

def generate_quarters(start_year, end_year):
    quarters = []
    for year in range(start_year, end_year + 1):
        quarters.append((f"{year}-01-01T00:00:00", f"{year}-03-31T23:59:59"))
        quarters.append((f"{year}-04-01T00:00:00", f"{year}-06-30T23:59:59"))
        quarters.append((f"{year}-07-01T00:00:00", f"{year}-09-30T23:59:59"))
        quarters.append((f"{year}-10-01T00:00:00", f"{year}-12-31T23:59:59"))
    return quarters

for csv_file in data_sets:
    # Filename für Parquet bestimmen und Tabelle benennen 
    parquet_file = csv_file.replace(".csv", ".parquet")
    table_name = csv_file.split("/")[-1].replace(".csv", "").replace("-", "_")
    # Tabellenname darf nicht mit Zahl beginnen
    if re.match(r'^\d', table_name):
        table_name = "t_" + table_name

    print(f"Tablename: {table_name}")
    try:
        con.execute(f"""
            COPY (SELECT * FROM read_csv_auto('{csv_file}'))
            TO '{parquet_file}' (FORMAT 'parquet');
        """)
        print(f"Conversion successfull {csv_file} → {parquet_file}")
        print("--------------------------------")
        con.execute(f"""
            CREATE OR REPLACE TABLE {table_name} AS
            SELECT * FROM read_parquet('{parquet_file}');
        """)
        print(f"{csv_file} Table successfully converted!")
        print("--------------------------------")
        print("--------------------------------")
    except Exception as e:
        print("❌ Failed to convert:", e)

print("CSV successfully converted to parquet and Table created!")

# Generates quarters from 2021 to 2025
quarters = generate_quarters(2024, 2025)

# Fetch data from Socrata API and create table
client = Socrata("data.cityofnewyork.us",
                  "Upkc825Y13IyAEMlWSq4kL2dz",
                  username="roberto.fazekas.priv@gmail.com",
                  password="jugGom-kypcom-pytsu3",
                  timeout=120)


BATCH_SIZE = 50_000
table_created = False
table_cols = None  # merken, welche Spalten die Tabelle hat

# --- Quartale laden ---
for start, end in quarters:
    print(f"\n📥 Lade Daten von {start} bis {end}")

    offset = 0
    total_rows = 0
    batch_idx = 0

    while True:
        results = client.get(
            "erm2-nwe9",
            select= ",".join(selected_columns),
            where=f"created_date between '{start}' and '{end}'",
            limit=BATCH_SIZE,
            offset=offset,
        )

        if not results:
            break
        """
        # In DataFrame wandeln (nur für Typ-Cleaning)
        df = pd.DataFrame.from_records(results)

        # alle object-Spalten in String casten (gegen STRUCT-Mischtypen)
        for col in df.columns:
            if df[col].dtype == "object":
                df[col] = df[col].astype(str)

        # wieder als JSON-String speichern
        json_str = df.to_json(orient="records")
        """
        json_str = json.dumps(results)


        # Tmp-Datei nutzen (automatisch gelöscht, wenn geschlossen)
        with tempfile.NamedTemporaryFile(suffix=".json") as tmp:
            tmp.write(json_str.encode("utf-8"))
            tmp.flush()  # sicherstellen, dass geschrieben wurde

            # Tabelle beim ersten Batch erzeugen
            if not table_created:
                con.execute(f"""
                    CREATE OR REPLACE TABLE calls_311 AS 
                    SELECT * FROM read_json_auto('{tmp.name}') 
                """)
                table_created = True
                print("🆕 Tabelle calls_311 erstellt (Schema vom ersten Batch)")

            # Insert direkt aus JSON-Tempfile
            con.execute(f"""
                INSERT INTO calls_311 
                SELECT * FROM read_json_auto('{tmp.name}')
            """)

        rows = len(results)
        total_rows += rows
        print(f"   ➡️ Batch {batch_idx} mit {rows} Zeilen gespeichert (Offset={offset})")

        offset += BATCH_SIZE
        batch_idx += 1

    print(f"✅ Quartal {start[:7]} bis {end[:7]} geladen, insgesamt {total_rows} Zeilen.")

con.close()

In [None]:
# Aufbau der Verbindung zur DuckDB-Datenbank
con = duckdb.connect("mydb.duckdb")

df_affordable = con.execute("""
    SELECT *
    FROM medianAskingRent_All
    LIMIT 100
""").fetchdf()

res = con.execute("SELECT COUNT(*) FROM calls_311").fetchone()[0]
print("Anzahl Zeilen:", res)