# 📊 Datenimport und -export Tutorial

**Python Grundkurs für Bystronic-Entwickler - Modul 06**

---

## 🎯 Lernziele

In diesem Tutorial lernen Sie:
- CSV-Dateien mit komplexen Strukturen zu importieren
- Excel-Dateien mit mehreren Arbeitsblättern zu verarbeiten
- JSON-Daten aus Dateien und APIs zu laden
- Daten zu bereinigen und zu transformieren
- Ergebnisse in verschiedenen Formaten zu exportieren

## 📋 Inhalt
1. [CSV-Import: Bystronic Maschinendaten](#csv)
2. [Excel-Verarbeitung](#excel) 
3. [JSON und APIs](#json)
4. [Datenbereinigung](#cleaning)
5. [Export und Speicherung](#export)
6. [Praxis-Pipeline](#pipeline)

In [1]:
# Importiere notwendige Bibliotheken
import json
import warnings
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

warnings.filterwarnings("ignore")

# Konfiguration für bessere Darstellung
pd.set_option("display.max_columns", 20)
pd.set_option("display.max_rows", 10)
plt.style.use("default")
sns.set_palette("husl")

print("✅ Alle Bibliotheken erfolgreich importiert!")
print(f"📊 Pandas Version: {pd.__version__}")
print(f"🔢 NumPy Version: {np.__version__}")

✅ Alle Bibliotheken erfolgreich importiert!
📊 Pandas Version: 2.3.2
🔢 NumPy Version: 2.3.2


---

## 1. CSV-Import: Bystronic Maschinendaten {#csv}

### 🔍 Problem: Komplexe CSV-Struktur

Die V084_Scope.csv Datei hat eine spezielle Struktur:
- **Zeilen 1-6**: Metadaten (Name, Datei, Zeitstempel)
- **Zeile 7**: Spaltennamen (nach "Name" Tag)
- **Zeilen 8-21**: Weitere Metadaten (Kommentare, Datentypen, etc.)
- **Ab Zeile 22**: Echte Messdaten
- **Trennzeichen**: Tab (`\t`)

### 📝 Schritt 1: Datei analysieren

In [2]:
# Pfad zur Bystronic CSV-Datei
csv_file = "../../data/large/V084_Scope.csv"


# Erste Zeilen der Datei anschauen
def analyze_csv_structure(file_path, n_lines=25):
    """
    Analysiert die Struktur einer CSV-Datei
    """
    print(f"🔍 Analyse der Datei: {file_path}")
    print("=" * 80)

    with open(file_path, encoding="utf-8", errors="ignore") as f:
        for i, line in enumerate(f):
            if i >= n_lines:
                break
            # Zeige Zeilennummer und Inhalt (begrenzt)
            content = (
                line.strip()[:100] + "..." if len(line.strip()) > 100 else line.strip()
            )
            print(f"{i + 1:2d}: {content}")

    print("\n" + "=" * 80)


# Analysiere die Struktur
analyze_csv_structure(csv_file)

🔍 Analyse der Datei: ../../data/large/V084_Scope.csv
 1: Name	Distance Control
 2: File	O:\Messungen\NDC\MP4_T001\V084_Scope.csv
 3: Starttime of export	133964386997165000	Dienstag, 8. Juli 2025	09:58:19.716
 4: Endtime of export	133964388292505000	Dienstag, 8. Juli 2025	10:00:29.250
 5: 
 6: 
 7: Name	DISTCTRL kp	Name	DISTCTRL d_tv	Name	DISTCTRL a_max	Name	DISTCTRL a_max_int	Name	DISTCTRL v_max	...
 8: SymbolComment		SymbolComment		SymbolComment		SymbolComment		SymbolComment		SymbolComment		SymbolComm...
 9: Data-Type	REAL64	Data-Type	REAL64	Data-Type	INT32	Data-Type	REAL64	Data-Type	INT32	Data-Type	REAL64	...
10: SampleTime[ms]	1	SampleTime[ms]	1	SampleTime[ms]	1	SampleTime[ms]	1	SampleTime[ms]	1	SampleTime[ms]	...
11: VariableSize	8	VariableSize	8	VariableSize	4	VariableSize	8	VariableSize	4	VariableSize	8	VariableSi...
12: SymbolBased	False	SymbolBased	False	SymbolBased	False	SymbolBased	False	SymbolBased	False	SymbolBase...
13: IndexGroup	1180416	IndexGroup	1180416	IndexGroup	1180

### 📝 Schritt 2: Intelligenter CSV-Parser

Wir erstellen einen Parser, der:
- Automatisch die Header-Zeile findet
- Den Datenbereich identifiziert
- Flexible Trennzeichen unterstützt

In [None]:
def parse_bystronic_csv(
    file_path, header_line=None, data_start_line=None, delimiter="\t", encoding="utf-8"
):
    """
    Flexibler Parser für Bystronic CSV-Dateien

    Parameters:
    -----------
    file_path : str
        Pfad zur CSV-Datei
    header_line : int, optional
        Zeilennummer für Header (1-basiert, z.B. 7). Automatisch erkannt wenn None.
    data_start_line : int, optional
        Zeilennummer für Datenbeginn (1-basiert). Automatisch erkannt wenn None.
    delimiter : str
        Trennzeichen (Standard: Tab)
    encoding : str
        Encoding (Standard: UTF-8)

    Returns:
    --------
    dict: {
        'metadata': dict,
        'data': pd.DataFrame,
        'info': dict
    }
    """
    print(f"📂 Lade Datei: {file_path}")

    # Schritt 1: Datei einlesen
    with open(file_path, encoding=encoding, errors="ignore") as f:
        lines = f.readlines()

    # Schritt 2: Metadaten extrahieren (erste 6 Zeilen)
    metadata = {}
    for i in range(min(6, len(lines))):
        parts = lines[i].strip().split(delimiter, 1)
        if len(parts) == 2:
            key, value = parts
            metadata[key] = value

    # Schritt 3: Header-Zeile bestimmen
    columns = []
    if header_line is not None:
        # Benutzer hat Header-Zeile explizit angegeben
        header_row_index = header_line - 1  # Konvertiere zu 0-basiert
        if header_row_index < len(lines):
            header_content = lines[header_row_index].strip()
            print(f"📋 Verwende Header-Zeile {header_line}: {header_content[:100]}...")

            parts = header_content.split(delimiter)
            # Extrahiere Spaltennamen (jede zweite Spalte nach "Name")
            for j in range(1, len(parts), 2):
                if j < len(parts) and parts[j].strip():
                    columns.append(parts[j].strip())
        else:
            print(
                f"⚠️ Header-Zeile {header_line} existiert nicht (nur {len(lines)} Zeilen)"
            )
    else:
        # Automatische Header-Erkennung
        for i, line in enumerate(lines):
            if line.strip().startswith("Name" + delimiter):
                header_row_index = i
                header_line = i + 1
                parts = line.strip().split(delimiter)
                # Extrahiere Spaltennamen
                for j in range(1, len(parts), 2):
                    if j < len(parts) and parts[j].strip():
                        columns.append(parts[j].strip())
                print(f"📋 Header automatisch erkannt in Zeile {header_line}")
                break

    # Schritt 4: Datenbereich bestimmen
    if data_start_line is not None:
        # Benutzer hat Datenstart explizit angegeben
        data_start_row_index = data_start_line - 1
        print(f"📊 Verwende Datenstart-Zeile {data_start_line}")
    else:
        # Automatische Datenbereich-Erkennung
        data_start_row_index = None
        for i in range(len(lines)):
            line = lines[i].strip()
            if line and not any(
                keyword in line
                for keyword in [
                    "Name",
                    "Data-Type",
                    "SampleTime",
                    "SymbolComment",
                    "VariableSize",
                    "SymbolBased",
                    "IndexGroup",
                ]
            ):
                # Prüfe ob Zeile hauptsächlich aus Zahlen besteht
                parts = line.split(delimiter)
                numeric_count = 0
                for part in parts[: min(10, len(parts))]:  # Prüfe erste 10 Spalten
                    try:
                        float(part)
                        numeric_count += 1
                    except ValueError:
                        pass

                if (
                    numeric_count >= min(len(parts), 10) * 0.7
                ):  # 70% der Werte sind Zahlen
                    data_start_row_index = i
                    data_start_line = i + 1
                    print(
                        f"📊 Datenbereich automatisch erkannt ab Zeile {data_start_line}"
                    )
                    break

    print(f"🔢 Anzahl Spalten gefunden: {len(columns)}")
    if columns:
        print(
            f"📝 Beispiel-Spalten: {', '.join(columns[:5])}{'...' if len(columns) > 5 else ''}"
        )

    # Schritt 5: DataFrame erstellen
    df = pd.DataFrame()

    if data_start_row_index is not None and columns:
        try:
            # Datenzeilen extrahieren
            data_lines = lines[data_start_row_index:]

            parsed_data = []
            for line_num, line in enumerate(data_lines):
                parts = line.strip().split(delimiter)
                if len(parts) >= len(columns):
                    parsed_data.append(parts[: len(columns)])

                # Begrenzte Anzahl für Demo (kann entfernt werden für vollständigen Import)
                if len(parsed_data) >= 1000:  # Lade max. 1000 Zeilen für Demo
                    print("ℹ️ Demo-Modus: Nur erste 1000 Zeilen geladen")
                    break

            if parsed_data:
                df = pd.DataFrame(parsed_data, columns=columns)

                # Datentyp-Konvertierung
                print("🔄 Konvertiere Datentypen...")
                for col in df.columns:
                    try:
                        # Versuche numerische Konvertierung
                        df[col] = pd.to_numeric(df[col], errors="coerce")
                    except:
                        # Behalte als String bei Fehlern
                        pass

                print(
                    f"✅ DataFrame erstellt: {df.shape[0]:,} Zeilen × {df.shape[1]} Spalten"
                )
            else:
                print("⚠️ Keine gültigen Datenzeilen gefunden")

        except Exception as e:
            print(f"❌ Fehler beim DataFrame-Erstellen: {e}")

    return {
        "metadata": metadata,
        "data": df,
        "info": {
            "header_line": header_line,
            "data_start_line": data_start_line,
            "total_lines": len(lines),
            "columns": columns,
            "parsing_success": not df.empty,
        },
    }


# Parser testen - Benutzer kann Header-Zeile explizit angeben
print("🔧 Test 1: Automatische Erkennung")
result_auto = parse_bystronic_csv(csv_file)

print("\n🔧 Test 2: Header-Zeile 7 explizit angeben")
result_manual = parse_bystronic_csv(csv_file, header_line=7, data_start_line=22)

print("\n📋 Metadaten:")
for key, value in result_manual["metadata"].items():
    print(f"  {key}: {value}")

print("\n📊 DataFrame Info:")
if not result_manual["data"].empty:
    print(result_manual["data"].info())
    print("\n🔍 Erste 5 Zeilen:")
    print(result_manual["data"].head())
else:
    print("❌ Kein DataFrame erstellt")

### 📝 Schritt 3: Daten erkunden und visualisieren

In [None]:
# DataFrame für einfachere Verwendung (verwende die manuell geparste Version)
df = result_manual["data"]

print("🔍 Erste 5 Zeilen:")
if not df.empty:
    display(df.head())

    print("\n📈 Statistische Übersicht:")
    display(df.describe())

    # Interessante Spalten für Visualisierung finden
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    print(f"\n🔢 Numerische Spalten ({len(numeric_cols)}):")
    for i, col in enumerate(numeric_cols[:10]):  # Erste 10 anzeigen
        print(f"  {i + 1:2d}. {col}")

    if len(numeric_cols) >= 2:
        # Beispiel-Visualisierung
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle("🔧 Bystronic Maschinendaten - Analyse", fontsize=16)

        # Plot 1: Zeitreihe der ersten numerischen Spalte
        col1 = numeric_cols[0]
        axes[0, 0].plot(df.index, df[col1], alpha=0.7)
        axes[0, 0].set_title(f"Zeitverlauf: {col1}")
        axes[0, 0].set_xlabel("Messung")
        axes[0, 0].grid(True, alpha=0.3)

        # Plot 2: Histogram
        col2 = numeric_cols[1] if len(numeric_cols) > 1 else numeric_cols[0]
        axes[0, 1].hist(df[col2].dropna(), bins=30, alpha=0.7, edgecolor="black")
        axes[0, 1].set_title(f"Verteilung: {col2}")
        axes[0, 1].set_xlabel("Wert")
        axes[0, 1].set_ylabel("Häufigkeit")

        # Plot 3: Scatter Plot (falls genügend Spalten)
        if len(numeric_cols) >= 3:
            col3 = numeric_cols[2]
            axes[1, 0].scatter(df[col1], df[col3], alpha=0.6, s=20)
            axes[1, 0].set_title(f"Korrelation: {col1} vs {col3}")
            axes[1, 0].set_xlabel(col1)
            axes[1, 0].set_ylabel(col3)
            axes[1, 0].grid(True, alpha=0.3)

        # Plot 4: Boxplot der ersten 5 numerischen Spalten
        sample_cols = numeric_cols[:5]
        df_sample = df[sample_cols]
        # Normalisiere Daten für bessere Darstellung
        df_normalized = (df_sample - df_sample.mean()) / df_sample.std()
        axes[1, 1].boxplot(
            [df_normalized[col].dropna() for col in sample_cols],
            labels=[col[:15] + "..." if len(col) > 15 else col for col in sample_cols],
        )
        axes[1, 1].set_title("Normalisierte Boxplots")
        axes[1, 1].tick_params(axis="x", rotation=45)

        plt.tight_layout()
        plt.show()

        print(f"✅ Visualisierung mit {len(sample_cols)} Parametern erstellt")
    else:
        print("ℹ️ Nicht genügend numerische Spalten für erweiterte Visualisierung")
else:
    print("❌ Kein DataFrame verfügbar für Analyse")

# Zeige Verwendungsbeispiele
print("\n💡 Verwendungsbeispiele:")
print("# Automatische Erkennung:")
print("result = parse_bystronic_csv('V084_Scope.csv')")
print("")
print("# Header-Zeile 7, Daten ab Zeile 22:")
print(
    "result = parse_bystronic_csv('V084_Scope.csv', header_line=7, data_start_line=22)"
)
print("")
print("# Nur Header-Zeile angeben:")
print("result = parse_bystronic_csv('V084_Scope.csv', header_line=7)")
print("")
print("# DataFrame zugreifen:")
print("df = result['data']")
print("metadata = result['metadata']")

---

## 2. Excel-Verarbeitung {#excel}

### 📝 Erstelle Beispiel-Excel-Datei

In [None]:
# Erstelle eine Beispiel-Excel-Datei mit mehreren Arbeitsblättern
def create_sample_excel(file_path):
    """
    Erstellt eine Beispiel-Excel-Datei für Bystronic
    """
    with pd.ExcelWriter(file_path, engine="openpyxl") as writer:
        # Arbeitsblatt 1: Produktionsdaten
        np.random.seed(42)
        dates = pd.date_range("2024-01-01", "2024-12-31", freq="D")
        production_data = pd.DataFrame(
            {
                "Datum": dates,
                "Maschine_A_Stück": np.random.normal(1000, 100, len(dates)).astype(int),
                "Maschine_B_Stück": np.random.normal(800, 80, len(dates)).astype(int),
                "Maschine_C_Stück": np.random.normal(1200, 150, len(dates)).astype(int),
                "Verfügbarkeit_A_%": np.random.normal(85, 5, len(dates)),
                "Verfügbarkeit_B_%": np.random.normal(82, 6, len(dates)),
                "Verfügbarkeit_C_%": np.random.normal(88, 4, len(dates)),
            }
        )
        production_data.to_excel(writer, sheet_name="Produktion", index=False)

        # Arbeitsblatt 2: Qualitätsdaten
        quality_data = pd.DataFrame(
            {
                "Monat": pd.date_range("2024-01-01", "2024-12-01", freq="M"),
                "Ausschuss_Rate_%": np.random.normal(2.5, 0.5, 12),
                "Nacharbeit_Rate_%": np.random.normal(1.2, 0.3, 12),
                "Kundenzufriedenheit": np.random.normal(4.2, 0.2, 12),
                "Reklamationen_Anzahl": np.random.poisson(5, 12),
            }
        )
        quality_data.to_excel(writer, sheet_name="Qualität", index=False)

        # Arbeitsblatt 3: Wartungsdaten
        maintenance_data = pd.DataFrame(
            {
                "Maschine": ["A", "B", "C"] * 50,
                "Wartungsdatum": pd.date_range("2024-01-01", periods=150, freq="3D"),
                "Typ": np.random.choice(["Präventiv", "Korrektiv", "Inspektion"], 150),
                "Dauer_h": np.random.exponential(2, 150),
                "Kosten_EUR": np.random.gamma(2, 500, 150),
            }
        )
        maintenance_data.to_excel(writer, sheet_name="Wartung", index=False)

        # Arbeitsblatt 4: Konfiguration
        config_data = pd.DataFrame(
            {
                "Parameter": [
                    "Max_Geschwindigkeit",
                    "Toleranz_X",
                    "Toleranz_Y",
                    "Laser_Power",
                    "Kühlmittel_Temp",
                ],
                "Maschine_A": [2500, 0.05, 0.05, 6000, 20],
                "Maschine_B": [2200, 0.08, 0.06, 5500, 22],
                "Maschine_C": [2800, 0.04, 0.04, 6500, 18],
                "Einheit": ["mm/min", "mm", "mm", "W", "°C"],
            }
        )
        config_data.to_excel(writer, sheet_name="Konfiguration", index=False)

    print(f"✅ Excel-Datei erstellt: {file_path}")
    return file_path


# Excel-Datei erstellen
excel_file = "../../data/bystronic_production_data.xlsx"
create_sample_excel(excel_file)

### 📝 Excel-Datei laden und analysieren

In [None]:
# Excel-Datei mit allen Arbeitsblättern laden
def load_excel_comprehensive(file_path):
    """
    Lädt eine Excel-Datei mit allen Arbeitsblättern
    """
    print(f"📂 Lade Excel-Datei: {file_path}")

    # Alle Arbeitsblätter laden
    excel_data = pd.read_excel(file_path, sheet_name=None)

    print(f"📋 Arbeitsblätter gefunden: {list(excel_data.keys())}")

    for sheet_name, df in excel_data.items():
        print(f"\n📊 {sheet_name}: {df.shape[0]} Zeilen × {df.shape[1]} Spalten")
        print(
            f"   Spalten: {', '.join(df.columns[:5])}{'...' if len(df.columns) > 5 else ''}"
        )

    return excel_data


# Excel laden
excel_data = load_excel_comprehensive(excel_file)

# Einzelne Arbeitsblätter analysieren
print("\n" + "=" * 80)
print("📋 PRODUKTIONSDATEN ANALYSE")
print("=" * 80)

production_df = excel_data["Produktion"]
display(production_df.head())

print("\n📈 Produktionsstatistiken:")
production_stats = production_df.select_dtypes(include=[np.number]).describe()
display(production_stats)

# Visualisierung der Produktionsdaten
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle("🏭 Bystronic Produktionsanalyse", fontsize=16)

# Plot 1: Produktionsmenge über Zeit
production_df.plot(
    x="Datum",
    y=["Maschine_A_Stück", "Maschine_B_Stück", "Maschine_C_Stück"],
    ax=axes[0, 0],
    title="Produktionsmenge pro Tag",
)
axes[0, 0].set_xlabel("Datum")
axes[0, 0].set_ylabel("Stück pro Tag")
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: Verfügbarkeit über Zeit
production_df.plot(
    x="Datum",
    y=["Verfügbarkeit_A_%", "Verfügbarkeit_B_%", "Verfügbarkeit_C_%"],
    ax=axes[0, 1],
    title="Maschinenverfügbarkeit",
)
axes[0, 1].set_xlabel("Datum")
axes[0, 1].set_ylabel("Verfügbarkeit %")
axes[0, 1].grid(True, alpha=0.3)

# Plot 3: Korrelation Produktion vs Verfügbarkeit
axes[1, 0].scatter(
    production_df["Verfügbarkeit_A_%"], production_df["Maschine_A_Stück"], alpha=0.6
)
axes[1, 0].set_title("Korrelation: Verfügbarkeit vs Produktion (Maschine A)")
axes[1, 0].set_xlabel("Verfügbarkeit %")
axes[1, 0].set_ylabel("Produktion Stück/Tag")
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Monatliche Zusammenfassung
monthly_prod = production_df.copy()
monthly_prod["Monat"] = monthly_prod["Datum"].dt.to_period("M")
monthly_summary = monthly_prod.groupby("Monat")[
    ["Maschine_A_Stück", "Maschine_B_Stück", "Maschine_C_Stück"]
].sum()
monthly_summary.plot(kind="bar", ax=axes[1, 1], title="Monatsproduktion")
axes[1, 1].set_xlabel("Monat")
axes[1, 1].set_ylabel("Gesamt Stück")
axes[1, 1].tick_params(axis="x", rotation=45)

plt.tight_layout()
plt.show()

---

## 3. JSON und APIs {#json}

### 📝 JSON-Daten verarbeiten

In [None]:
# Beispiel JSON für IoT-Sensordaten erstellen
def create_sample_json(file_path):
    """
    Erstellt eine Beispiel-JSON-Datei mit IoT-Sensordaten
    """
    np.random.seed(42)

    sensor_data = {
        "metadata": {
            "system": "Bystronic IoT Platform",
            "version": "2.1.0",
            "timestamp": "2024-01-15T08:30:00Z",
            "location": "Werk_Niederönz",
        },
        "sensors": [
            {
                "id": "TEMP_001",
                "type": "temperature",
                "location": "Laser_Cutting_1",
                "unit": "celsius",
                "readings": [
                    {
                        "timestamp": "2024-01-15T08:30:00Z",
                        "value": 22.5,
                        "status": "ok",
                    },
                    {
                        "timestamp": "2024-01-15T08:31:00Z",
                        "value": 22.8,
                        "status": "ok",
                    },
                    {
                        "timestamp": "2024-01-15T08:32:00Z",
                        "value": 23.1,
                        "status": "ok",
                    },
                    {
                        "timestamp": "2024-01-15T08:33:00Z",
                        "value": 23.4,
                        "status": "warning",
                    },
                ],
            },
            {
                "id": "VIBR_001",
                "type": "vibration",
                "location": "Laser_Cutting_1",
                "unit": "mm/s",
                "readings": [
                    {"timestamp": "2024-01-15T08:30:00Z", "value": 1.2, "status": "ok"},
                    {"timestamp": "2024-01-15T08:31:00Z", "value": 1.4, "status": "ok"},
                    {
                        "timestamp": "2024-01-15T08:32:00Z",
                        "value": 2.1,
                        "status": "warning",
                    },
                    {
                        "timestamp": "2024-01-15T08:33:00Z",
                        "value": 2.8,
                        "status": "error",
                    },
                ],
            },
            {
                "id": "POWER_001",
                "type": "power",
                "location": "Laser_Cutting_1",
                "unit": "kW",
                "readings": [
                    {"timestamp": "2024-01-15T08:30:00Z", "value": 6.2, "status": "ok"},
                    {"timestamp": "2024-01-15T08:31:00Z", "value": 6.0, "status": "ok"},
                    {"timestamp": "2024-01-15T08:32:00Z", "value": 5.8, "status": "ok"},
                    {"timestamp": "2024-01-15T08:33:00Z", "value": 5.5, "status": "ok"},
                ],
            },
        ],
        "alerts": [
            {
                "id": "ALT_001",
                "timestamp": "2024-01-15T08:33:00Z",
                "severity": "high",
                "sensor_id": "VIBR_001",
                "message": "Vibration level exceeds threshold",
                "threshold": 2.5,
                "actual_value": 2.8,
            }
        ],
    }

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(sensor_data, f, indent=2, ensure_ascii=False)

    print(f"✅ JSON-Datei erstellt: {file_path}")
    return file_path


# JSON-Datei erstellen
json_file = "../../data/sensor_data.json"
create_sample_json(json_file)

In [None]:
# JSON-Daten laden und verarbeiten
def process_sensor_json(file_path):
    """
    Verarbeitet JSON-Sensordaten und erstellt DataFrame
    """
    print(f"📂 Lade JSON-Datei: {file_path}")

    with open(file_path, encoding="utf-8") as f:
        data = json.load(f)

    print(f"🏭 System: {data['metadata']['system']}")
    print(f"📍 Standort: {data['metadata']['location']}")
    print(f"📊 Sensoren: {len(data['sensors'])}")
    print(f"⚠️ Alerts: {len(data['alerts'])}")

    # Sensordaten in DataFrame umwandeln
    sensor_readings = []

    for sensor in data["sensors"]:
        sensor_id = sensor["id"]
        sensor_type = sensor["type"]
        location = sensor["location"]
        unit = sensor["unit"]

        for reading in sensor["readings"]:
            sensor_readings.append(
                {
                    "sensor_id": sensor_id,
                    "type": sensor_type,
                    "location": location,
                    "unit": unit,
                    "timestamp": pd.to_datetime(reading["timestamp"]),
                    "value": reading["value"],
                    "status": reading["status"],
                }
            )

    df_sensors = pd.DataFrame(sensor_readings)

    # Alerts DataFrame
    df_alerts = pd.DataFrame(data["alerts"])
    if not df_alerts.empty:
        df_alerts["timestamp"] = pd.to_datetime(df_alerts["timestamp"])

    return df_sensors, df_alerts, data["metadata"]


# JSON verarbeiten
df_sensors, df_alerts, metadata = process_sensor_json(json_file)

print("\n📊 Sensordaten:")
display(df_sensors)

print("\n⚠️ Alerts:")
display(df_alerts)

# Visualisierung der Sensordaten
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle("🔧 IoT Sensordaten Analysis", fontsize=16)

# Plot für jeden Sensortyp
sensor_types = df_sensors["type"].unique()
colors = ["blue", "red", "green", "orange"]

for i, sensor_type in enumerate(sensor_types[:4]):
    row, col = i // 2, i % 2
    sensor_data = df_sensors[df_sensors["type"] == sensor_type]

    axes[row, col].plot(
        sensor_data["timestamp"],
        sensor_data["value"],
        color=colors[i],
        marker="o",
        linewidth=2,
        markersize=6,
    )
    axes[row, col].set_title(f"{sensor_type.upper()} Verlauf")
    axes[row, col].set_xlabel("Zeit")
    axes[row, col].set_ylabel(f"{sensor_data.iloc[0]['unit']}")
    axes[row, col].grid(True, alpha=0.3)

    # Status-basierte Farbcodierung
    for status in ["ok", "warning", "error"]:
        status_data = sensor_data[sensor_data["status"] == status]
        if not status_data.empty:
            color_map = {"ok": "green", "warning": "orange", "error": "red"}
            axes[row, col].scatter(
                status_data["timestamp"],
                status_data["value"],
                c=color_map[status],
                s=100,
                alpha=0.7,
                label=status,
                edgecolors="black",
                linewidth=1,
            )
    axes[row, col].legend()

plt.tight_layout()
plt.show()

### 📝 API-Daten simulieren (Web Services)

In [None]:
# Simuliere API-Aufruf (da wir keine echte API haben)
def simulate_api_call(api_type="production"):
    """
    Simuliert verschiedene API-Aufrufe für Bystronic-Systeme
    """
    import time

    print(f"🌐 Simuliere API-Aufruf: {api_type}")
    time.sleep(1)  # Simuliere Netzwerk-Latenz

    np.random.seed(42)

    if api_type == "production":
        # Produktions-API
        data = {
            "status": "success",
            "timestamp": pd.Timestamp.now().isoformat(),
            "data": {
                "machines": [
                    {
                        "id": "LC001",
                        "name": "Laser_Cutting_1",
                        "status": "running",
                        "current_job": "JOB_2024_001",
                        "parts_produced_today": np.random.randint(800, 1200),
                        "efficiency": np.random.uniform(85, 95),
                        "temperature": np.random.uniform(20, 25),
                    },
                    {
                        "id": "LC002",
                        "name": "Laser_Cutting_2",
                        "status": "maintenance",
                        "current_job": None,
                        "parts_produced_today": 0,
                        "efficiency": 0,
                        "temperature": np.random.uniform(18, 22),
                    },
                    {
                        "id": "BR001",
                        "name": "Press_Brake_1",
                        "status": "running",
                        "current_job": "JOB_2024_002",
                        "parts_produced_today": np.random.randint(600, 900),
                        "efficiency": np.random.uniform(80, 90),
                        "temperature": np.random.uniform(22, 28),
                    },
                ]
            },
        }

    elif api_type == "quality":
        # Qualitäts-API
        data = {
            "status": "success",
            "timestamp": pd.Timestamp.now().isoformat(),
            "data": {
                "daily_summary": {
                    "total_parts": np.random.randint(2000, 3000),
                    "defective_parts": np.random.randint(10, 50),
                    "defect_rate": np.random.uniform(0.5, 2.5),
                    "rework_rate": np.random.uniform(0.2, 1.0),
                },
                "defect_types": [
                    {"type": "dimensional", "count": np.random.randint(5, 20)},
                    {"type": "surface", "count": np.random.randint(2, 15)},
                    {"type": "material", "count": np.random.randint(1, 8)},
                ],
            },
        }

    else:
        # Allgemeine API
        data = {"status": "error", "message": "Unknown API endpoint"}

    print(f"✅ API Response: {data['status']}")
    return data


# API-Aufrufe testen
production_data = simulate_api_call("production")
quality_data = simulate_api_call("quality")

print("\n📊 Produktions-API Daten:")
production_df = pd.json_normalize(production_data["data"]["machines"])
display(production_df)

print("\n🔍 Qualitäts-API Daten:")
quality_summary = quality_data["data"]["daily_summary"]
defects_df = pd.DataFrame(quality_data["data"]["defect_types"])

print(f"Tagesproduktion: {quality_summary['total_parts']} Teile")
print(f"Ausschussrate: {quality_summary['defect_rate']:.2f}%")
print("\nFehlertypen:")
display(defects_df)

# Visualisierung der API-Daten
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
fig.suptitle("🔄 Live API-Daten Dashboard", fontsize=16)

# Plot 1: Maschineneffizienz
running_machines = production_df[production_df["status"] == "running"]
if not running_machines.empty:
    bars1 = axes[0].bar(
        running_machines["name"],
        running_machines["efficiency"],
        color=[
            "green" if x > 90 else "orange" if x > 80 else "red"
            for x in running_machines["efficiency"]
        ],
    )
    axes[0].set_title("Maschineneffizienz (%)")
    axes[0].set_ylabel("Effizienz %")
    axes[0].tick_params(axis="x", rotation=45)
    axes[0].grid(True, alpha=0.3)

# Plot 2: Tagesproduktion
all_machines = production_df[production_df["parts_produced_today"] > 0]
if not all_machines.empty:
    bars2 = axes[1].bar(
        all_machines["name"],
        all_machines["parts_produced_today"],
        color="steelblue",
        alpha=0.7,
    )
    axes[1].set_title("Tagesproduktion (Teile)")
    axes[1].set_ylabel("Anzahl Teile")
    axes[1].tick_params(axis="x", rotation=45)
    axes[1].grid(True, alpha=0.3)

# Plot 3: Fehlertypen
if not defects_df.empty:
    colors = ["lightcoral", "lightsalmon", "lightblue"]
    bars3 = axes[2].pie(
        defects_df["count"],
        labels=defects_df["type"],
        colors=colors,
        autopct="%1.1f%%",
        startangle=90,
    )
    axes[2].set_title("Fehlerverteilung")

plt.tight_layout()
plt.show()

---

## 4. Datenbereinigung {#cleaning}

### 📝 Datenqualität prüfen und verbessern

In [None]:
# Funktionen für Datenbereinigung
def analyze_data_quality(df, name="Dataset"):
    """
    Analysiert die Datenqualität eines DataFrames
    """
    print(f"🔍 Datenqualitätsanalyse: {name}")
    print("=" * 50)

    print(f"📊 Shape: {df.shape[0]} Zeilen × {df.shape[1]} Spalten")
    print(f"💾 Memory Usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

    # Fehlende Werte
    missing_data = df.isnull().sum()
    missing_percent = (missing_data / len(df)) * 100

    if missing_data.sum() > 0:
        print("\n❌ Fehlende Werte:")
        missing_summary = pd.DataFrame(
            {
                "Spalte": missing_data.index,
                "Fehlend": missing_data.values,
                "Prozent": missing_percent.values,
            }
        )
        missing_summary = missing_summary[missing_summary["Fehlend"] > 0].sort_values(
            "Fehlend", ascending=False
        )
        display(missing_summary)
    else:
        print("\n✅ Keine fehlenden Werte gefunden")

    # Datentypen
    print("\n📈 Datentypen:")
    dtype_summary = df.dtypes.value_counts()
    for dtype, count in dtype_summary.items():
        print(f"  {dtype}: {count} Spalten")

    # Duplikate
    duplicates = df.duplicated().sum()
    if duplicates > 0:
        print(f"\n⚠️ Duplikate: {duplicates} Zeilen ({duplicates / len(df) * 100:.1f}%)")
    else:
        print("\n✅ Keine Duplikate gefunden")

    # Numerische Spalten analysieren
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        print(f"\n🔢 Numerische Analyse ({len(numeric_cols)} Spalten):")
        for col in numeric_cols[:5]:  # Erste 5 Spalten
            series = df[col].dropna()
            if len(series) > 0:
                outliers = detect_outliers_iqr(series)
                print(
                    f"  {col}: Min={series.min():.2f}, Max={series.max():.2f}, Outliers={len(outliers)}"
                )

    return {
        "shape": df.shape,
        "missing_values": missing_data.sum(),
        "duplicates": duplicates,
        "memory_mb": df.memory_usage(deep=True).sum() / 1024**2,
    }


def detect_outliers_iqr(series, k=1.5):
    """
    Erkennt Ausreißer mit der IQR-Methode
    """
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1

    lower_bound = Q1 - k * IQR
    upper_bound = Q3 + k * IQR

    outliers = series[(series < lower_bound) | (series > upper_bound)]
    return outliers


# Analysiere verschiedene Datasets
if not result["data"].empty:
    bystronic_quality = analyze_data_quality(result["data"], "Bystronic CSV")

production_quality = analyze_data_quality(production_df, "Produktionsdaten")
sensors_quality = analyze_data_quality(df_sensors, "Sensordaten")

In [None]:
# Datenbereinigung durchführen
def clean_production_data(df):
    """
    Bereinigt Produktionsdaten
    """
    print("🧹 Starte Datenbereinigung...")
    df_clean = df.copy()

    # 1. Duplikate entfernen
    initial_rows = len(df_clean)
    df_clean = df_clean.drop_duplicates()
    removed_duplicates = initial_rows - len(df_clean)
    if removed_duplicates > 0:
        print(f"✅ {removed_duplicates} Duplikate entfernt")

    # 2. Unrealistische Werte korrigieren
    # Verfügbarkeit sollte zwischen 0-100% liegen
    availability_cols = [col for col in df_clean.columns if "Verfügbarkeit" in col]
    for col in availability_cols:
        # Werte > 100% auf 100% setzen
        outliers_high = df_clean[col] > 100
        if outliers_high.sum() > 0:
            df_clean.loc[outliers_high, col] = 100
            print(f"✅ {outliers_high.sum()} hohe Ausreißer in {col} korrigiert")

        # Negative Werte auf 0 setzen
        negative = df_clean[col] < 0
        if negative.sum() > 0:
            df_clean.loc[negative, col] = 0
            print(f"✅ {negative.sum()} negative Werte in {col} korrigiert")

    # 3. Produktionswerte können nicht negativ sein
    production_cols = [col for col in df_clean.columns if "Stück" in col]
    for col in production_cols:
        negative = df_clean[col] < 0
        if negative.sum() > 0:
            df_clean.loc[negative, col] = 0
            print(f"✅ {negative.sum()} negative Produktionswerte in {col} korrigiert")

        # Extreme Ausreißer (> 99.9% Quantil) glätten
        threshold = df_clean[col].quantile(0.999)
        extreme_outliers = df_clean[col] > threshold
        if extreme_outliers.sum() > 0:
            df_clean.loc[extreme_outliers, col] = threshold
            print(f"✅ {extreme_outliers.sum()} extreme Ausreißer in {col} geglättet")

    # 4. Datum validieren
    if "Datum" in df_clean.columns:
        # Sicherstellen, dass Datum im erwarteten Bereich liegt
        min_date = pd.Timestamp("2020-01-01")
        max_date = pd.Timestamp("2030-12-31")

        invalid_dates = (df_clean["Datum"] < min_date) | (df_clean["Datum"] > max_date)
        if invalid_dates.sum() > 0:
            print(f"⚠️ {invalid_dates.sum()} ungültige Datumswerte gefunden")
            df_clean = df_clean[~invalid_dates]

    print(f"🎉 Bereinigung abgeschlossen: {len(df_clean)} Zeilen verbleiben")
    return df_clean


# Produktionsdaten bereinigen
production_clean = clean_production_data(production_df)

# Vorher/Nachher Vergleich
print("\n📊 Bereinigung - Vorher/Nachher:")
comparison = pd.DataFrame(
    {
        "Vorher": [
            len(production_df),
            production_df.isnull().sum().sum(),
            production_df.duplicated().sum(),
        ],
        "Nachher": [
            len(production_clean),
            production_clean.isnull().sum().sum(),
            production_clean.duplicated().sum(),
        ],
    },
    index=["Zeilen", "Fehlende Werte", "Duplikate"],
)

display(comparison)

# Visualisierung der Bereinigung
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle("🧹 Datenbereinigung: Vorher vs. Nachher", fontsize=16)

# Beispiel-Visualisierung mit Maschine A
col_prod = "Maschine_A_Stück"
col_avail = "Verfügbarkeit_A_%"

# Vorher - Produktion
axes[0, 0].hist(
    production_df[col_prod], bins=30, alpha=0.7, color="red", edgecolor="black"
)
axes[0, 0].set_title(f"Vorher: {col_prod}")
axes[0, 0].set_xlabel("Stück pro Tag")
axes[0, 0].set_ylabel("Häufigkeit")

# Nachher - Produktion
axes[0, 1].hist(
    production_clean[col_prod], bins=30, alpha=0.7, color="green", edgecolor="black"
)
axes[0, 1].set_title(f"Nachher: {col_prod}")
axes[0, 1].set_xlabel("Stück pro Tag")
axes[0, 1].set_ylabel("Häufigkeit")

# Vorher - Verfügbarkeit
axes[1, 0].hist(
    production_df[col_avail], bins=30, alpha=0.7, color="red", edgecolor="black"
)
axes[1, 0].set_title(f"Vorher: {col_avail}")
axes[1, 0].set_xlabel("Verfügbarkeit %")
axes[1, 0].set_ylabel("Häufigkeit")

# Nachher - Verfügbarkeit
axes[1, 1].hist(
    production_clean[col_avail], bins=30, alpha=0.7, color="green", edgecolor="black"
)
axes[1, 1].set_title(f"Nachher: {col_avail}")
axes[1, 1].set_xlabel("Verfügbarkeit %")
axes[1, 1].set_ylabel("Häufigkeit")

plt.tight_layout()
plt.show()

---

## 5. Export und Speicherung {#export}

### 📝 Daten in verschiedenen Formaten exportieren

In [None]:
# Export-Funktionen
def export_data_comprehensive(df, base_filename, include_formats=None):
    """
    Exportiert DataFrame in verschiedene Formate
    """
    if include_formats is None:
        include_formats = ["csv", "excel", "json", "parquet"]

    print(f"💾 Exportiere {df.shape[0]} Zeilen in {len(include_formats)} Formaten...")
    exported_files = []

    # CSV Export
    if "csv" in include_formats:
        csv_file = f"../../data/{base_filename}.csv"
        df.to_csv(csv_file, index=False, encoding="utf-8")
        file_size = Path(csv_file).stat().st_size / 1024
        exported_files.append(
            {"Format": "CSV", "Datei": csv_file, "Größe_KB": f"{file_size:.1f}"}
        )
        print(f"✅ CSV: {csv_file} ({file_size:.1f} KB)")

    # Excel Export
    if "excel" in include_formats:
        excel_file = f"../../data/{base_filename}.xlsx"
        with pd.ExcelWriter(excel_file, engine="openpyxl") as writer:
            df.to_excel(writer, sheet_name="Daten", index=False)

            # Zusätzliches Statistik-Blatt
            if not df.select_dtypes(include=[np.number]).empty:
                stats = df.select_dtypes(include=[np.number]).describe()
                stats.to_excel(writer, sheet_name="Statistiken")

        file_size = Path(excel_file).stat().st_size / 1024
        exported_files.append(
            {"Format": "Excel", "Datei": excel_file, "Größe_KB": f"{file_size:.1f}"}
        )
        print(f"✅ Excel: {excel_file} ({file_size:.1f} KB)")

    # JSON Export
    if "json" in include_formats:
        json_file = f"../../data/{base_filename}.json"
        # DataFrame zu Records konvertieren für bessere JSON-Struktur
        json_data = {
            "metadata": {
                "exported_at": pd.Timestamp.now().isoformat(),
                "rows": len(df),
                "columns": len(df.columns),
                "source": "Bystronic Data Pipeline",
            },
            "data": df.to_dict("records"),
        }

        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(json_data, f, indent=2, default=str, ensure_ascii=False)

        file_size = Path(json_file).stat().st_size / 1024
        exported_files.append(
            {"Format": "JSON", "Datei": json_file, "Größe_KB": f"{file_size:.1f}"}
        )
        print(f"✅ JSON: {json_file} ({file_size:.1f} KB)")

    # Parquet Export (effizienter für große Datensätze)
    if "parquet" in include_formats:
        try:
            parquet_file = f"../../data/{base_filename}.parquet"
            df.to_parquet(parquet_file, index=False, compression="snappy")
            file_size = Path(parquet_file).stat().st_size / 1024
            exported_files.append(
                {
                    "Format": "Parquet",
                    "Datei": parquet_file,
                    "Größe_KB": f"{file_size:.1f}",
                }
            )
            print(f"✅ Parquet: {parquet_file} ({file_size:.1f} KB)")
        except Exception as e:
            print(f"⚠️ Parquet Export fehlgeschlagen: {e}")

    # Zusammenfassung
    summary_df = pd.DataFrame(exported_files)
    print("\n📋 Export-Zusammenfassung:")
    display(summary_df)

    return exported_files


# Bereinigte Produktionsdaten exportieren
production_exports = export_data_comprehensive(
    production_clean, "bystronic_production_clean"
)

# Sensordaten exportieren
sensor_exports = export_data_comprehensive(
    df_sensors, "bystronic_sensors", ["csv", "json"]
)

print(
    f"\n🎉 Gesamt exportiert: {len(production_exports) + len(sensor_exports)} Dateien"
)

---

## 6. Praxis-Pipeline {#pipeline}

### 📝 Vollständige Datenverarbeitungs-Pipeline

In [None]:
# Vollständige Datenverarbeitungs-Pipeline
class BystronicDataPipeline:
    """
    Vollständige Datenverarbeitungs-Pipeline für Bystronic
    """

    def __init__(self, config=None):
        self.config = config or self._default_config()
        self.processed_data = {}
        self.metadata = {}
        self.log = []

    def _default_config(self):
        return {
            "csv_delimiter": "\t",
            "csv_encoding": "utf-8",
            "clean_data": True,
            "export_formats": ["csv", "excel", "json"],
            "output_dir": "../../data/pipeline_output/",
        }

    def log_message(self, message, level="INFO"):
        """Logging für Pipeline"""
        timestamp = pd.Timestamp.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {level}: {message}"
        self.log.append(log_entry)
        print(log_entry)

    def import_csv(self, file_path, data_name):
        """CSV Import mit intelligenter Struktur-Erkennung"""
        self.log_message(f"Importiere CSV: {file_path}")

        try:
            result = parse_bystronic_csv(
                file_path,
                delimiter=self.config["csv_delimiter"],
                encoding=self.config["csv_encoding"],
            )

            self.processed_data[data_name] = result["data"]
            self.metadata[data_name] = result["metadata"]

            self.log_message(f"CSV Import erfolgreich: {result['data'].shape}")
            return True

        except Exception as e:
            self.log_message(f"CSV Import fehlgeschlagen: {e}", "ERROR")
            return False

    def import_excel(self, file_path, data_name):
        """Excel Import"""
        self.log_message(f"Importiere Excel: {file_path}")

        try:
            excel_data = pd.read_excel(file_path, sheet_name=None)

            for sheet_name, df in excel_data.items():
                full_name = f"{data_name}_{sheet_name}"
                self.processed_data[full_name] = df
                self.log_message(f"Arbeitsblatt '{sheet_name}': {df.shape}")

            self.log_message(
                f"Excel Import erfolgreich: {len(excel_data)} Arbeitsblätter"
            )
            return True

        except Exception as e:
            self.log_message(f"Excel Import fehlgeschlagen: {e}", "ERROR")
            return False

    def import_api(self, api_type, data_name):
        """API Import (simuliert)"""
        self.log_message(f"Importiere API-Daten: {api_type}")

        try:
            api_data = simulate_api_call(api_type)

            if api_data["status"] == "success":
                if api_type == "production":
                    df = pd.json_normalize(api_data["data"]["machines"])
                elif api_type == "quality":
                    df = pd.DataFrame([api_data["data"]["daily_summary"]])
                else:
                    df = pd.DataFrame([api_data["data"]])

                self.processed_data[data_name] = df
                self.log_message(f"API Import erfolgreich: {df.shape}")
                return True
            else:
                self.log_message(
                    f"API Fehler: {api_data.get('message', 'Unknown error')}", "ERROR"
                )
                return False

        except Exception as e:
            self.log_message(f"API Import fehlgeschlagen: {e}", "ERROR")
            return False

    def clean_all_data(self):
        """Bereinigt alle importierten Daten"""
        if not self.config["clean_data"]:
            self.log_message("Datenbereinigung deaktiviert")
            return

        self.log_message("Starte Datenbereinigung für alle Datasets")

        for data_name, df in self.processed_data.items():
            try:
                # Grundlegende Bereinigung
                initial_rows = len(df)

                # Duplikate entfernen
                df_clean = df.drop_duplicates()

                # Leere Spalten entfernen
                df_clean = df_clean.dropna(axis=1, how="all")

                # Komplett leere Zeilen entfernen
                df_clean = df_clean.dropna(axis=0, how="all")

                final_rows = len(df_clean)
                removed = initial_rows - final_rows

                self.processed_data[data_name] = df_clean
                self.log_message(
                    f"{data_name}: {removed} Zeilen entfernt, {final_rows} verbleiben"
                )

            except Exception as e:
                self.log_message(
                    f"Bereinigung von {data_name} fehlgeschlagen: {e}", "ERROR"
                )

    def generate_report(self):
        """Erstellt einen Zusammenfassungsbericht"""
        self.log_message("Generiere Pipeline-Report")

        report = {
            "pipeline_info": {
                "timestamp": pd.Timestamp.now().isoformat(),
                "total_datasets": len(self.processed_data),
                "config": self.config,
            },
            "datasets": {},
            "log": self.log,
        }

        total_rows = 0
        total_columns = 0

        for data_name, df in self.processed_data.items():
            dataset_info = {
                "rows": len(df),
                "columns": len(df.columns),
                "memory_mb": df.memory_usage(deep=True).sum() / 1024**2,
                "missing_values": df.isnull().sum().sum(),
                "dtypes": df.dtypes.value_counts().to_dict(),
            }

            report["datasets"][data_name] = dataset_info
            total_rows += dataset_info["rows"]
            total_columns += dataset_info["columns"]

        report["pipeline_info"]["total_rows"] = total_rows
        report["pipeline_info"]["total_columns"] = total_columns

        return report

    def export_all(self, include_report=True):
        """Exportiert alle verarbeiteten Daten"""
        self.log_message("Starte Export aller Daten")

        # Output-Verzeichnis erstellen
        output_dir = Path(self.config["output_dir"])
        output_dir.mkdir(parents=True, exist_ok=True)

        exported_files = []

        for data_name, df in self.processed_data.items():
            try:
                file_path = output_dir / f"{data_name}_processed"

                # CSV Export
                if "csv" in self.config["export_formats"]:
                    csv_file = f"{file_path}.csv"
                    df.to_csv(csv_file, index=False)
                    exported_files.append(csv_file)

                # Excel Export
                if "excel" in self.config["export_formats"]:
                    excel_file = f"{file_path}.xlsx"
                    df.to_excel(excel_file, index=False)
                    exported_files.append(excel_file)

                # JSON Export
                if "json" in self.config["export_formats"]:
                    json_file = f"{file_path}.json"
                    df.to_json(json_file, orient="records", indent=2)
                    exported_files.append(json_file)

                self.log_message(f"Export für {data_name} abgeschlossen")

            except Exception as e:
                self.log_message(f"Export von {data_name} fehlgeschlagen: {e}", "ERROR")

        # Pipeline-Report exportieren
        if include_report:
            report = self.generate_report()
            report_file = output_dir / "pipeline_report.json"

            with open(report_file, "w", encoding="utf-8") as f:
                json.dump(report, f, indent=2, default=str, ensure_ascii=False)

            exported_files.append(str(report_file))
            self.log_message(f"Pipeline-Report: {report_file}")

        self.log_message(f"Export abgeschlossen: {len(exported_files)} Dateien")
        return exported_files


# Pipeline testen
pipeline = BystronicDataPipeline()

print("🚀 Starte Bystronic Datenverarbeitungs-Pipeline")
print("=" * 60)

# Daten importieren
pipeline.import_excel(excel_file, "production")
pipeline.import_api("production", "live_machines")
pipeline.import_api("quality", "quality_metrics")

# Daten bereinigen
pipeline.clean_all_data()

# Report generieren
report = pipeline.generate_report()

print("\n📊 Pipeline Zusammenfassung:")
print(f"📂 Datasets: {report['pipeline_info']['total_datasets']}")
print(f"📈 Gesamt Zeilen: {report['pipeline_info']['total_rows']:,}")
print(f"🔢 Gesamt Spalten: {report['pipeline_info']['total_columns']:,}")

for dataset_name, info in report["datasets"].items():
    print(f"  • {dataset_name}: {info['rows']:,} Zeilen × {info['columns']} Spalten")

# Exportieren
exported_files = pipeline.export_all()

print(f"\n✅ Pipeline abgeschlossen! {len(exported_files)} Dateien exportiert.")

---

## 🎯 Zusammenfassung

### Was Sie gelernt haben:

1. **CSV-Import mit komplexen Strukturen**
   - Intelligente Header-Erkennung
   - Flexible Trennzeichen-Unterstützung
   - Metadaten-Extraktion

2. **Excel-Verarbeitung**
   - Mehrere Arbeitsblätter gleichzeitig laden
   - Strukturierte Datenorganisation
   - Format-spezifische Features nutzen

3. **JSON und API-Integration**
   - Hierarchische Datenstrukturen verarbeiten
   - API-Simulation und echte Webservice-Aufrufe
   - JSON-Normalisierung für tabellarische Daten

4. **Datenbereinigung**
   - Datenqualität systematisch analysieren
   - Ausreißer und Anomalien erkennen
   - Automatische Bereinigungsregeln implementieren

5. **Export-Strategien**
   - Mehrere Formate gleichzeitig
   - Dateigröße und Performance berücksichtigen
   - Metadaten und Dokumentation einschließen

6. **Produktions-Pipeline**
   - Vollständige Automatisierung
   - Fehlerbehandlung und Logging
   - Konfigurierbare Verarbeitungsschritte

### 💡 Tipps für die Praxis:

- **Immer Daten validieren** bevor Verarbeitung
- **Encoding explizit definieren** (UTF-8, Latin1, etc.)
- **Chunk-Processing** für sehr große Dateien
- **Fehlerbehandlung** für robuste Pipelines
- **Dokumentation** der Datenquellen und -transformationen

### 🔗 Nächste Schritte:

- **Modul 07**: Jupyter Notebooks für interaktive Analysen
- **Modul 08**: Benutzeroberflächen mit PyQt/PySide und Streamlit
- **Modul 09**: Vollständige Praxisprojekte

---

*Herzlichen Glückwunsch! Sie beherrschen jetzt die wichtigsten Techniken für Datenimport und -verarbeitung in Python für industrielle Anwendungen.*