<a href="https://colab.research.google.com/github/rrwiren/ilmanlaatu-ennuste-helsinki/blob/main/Esik%C3%A4sittely_v.0.2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# -*- coding: utf-8 -*-
"""
==========================================================================================
 PIPELINE DATAN ESIKÄSITTELYYN JA TUTKIMISEEN - HELSINGIN SÄÄ- JA ILMANLAATUDATA (FMI)
==========================================================================================

 Versio: 0.2 (Lisätty EDA-funktio)
 Päivitetty: 2025-04-11 23:36 (EEST)
 Edellinen versio: 0.1 (Refaktorointi funktioiksi)

 Skriptin tarkoitus:
 --------------------
 Tämä skripti muodostaa data pipelinen, joka:
 1. Lataa raakadatatiedostot (FMI:n sää- ja ilmanlaatudata).
 2. Esikäsittelee ja yhdistää datan yhdeksi DataFrameksi.
 3. Suorittaa eksploratiivista data-analyysiä (EDA) käsitellylle datalle:
    - Käsittelee päällekkäiset sarakkeet.
    - Korjaa epärealistiset negatiiviset pitoisuudet.
    - Visualisoi kohdemuuttujaa ('Ozone') ja sen jakaumaa.
    - Analysoi korrelaatioita ja riippuvuuksia säämuuttujien kanssa.
    - Tutkii ajallista vaihtelua.
 4. (Valinnaisesti) Tallentaa lopullisen, EDA:n jälkeen siivotun DataFramen
    Parquet-muotoon.

 Vaaditut kirjastot:
 -------------------
 - pandas, numpy, requests, io, os, matplotlib, seaborn
 - pyarrow (Parquet-tallennukseen/lukuun)

 Käyttö:
 -------
 1. Varmista, että vaaditut kirjastot on asennettu.
 2. Aja skripti. Se lataa, esikäsittelee ja tutkii datan. Tulosteet ja kuvaajat
    näytetään suorituksen aikana.
 3. Jos haluat tallentaa lopputuloksen, varmista, että 'pyarrow' on asennettu
    ja poista kommenttimerkit Vaiheen 5 tallennuskoodista pääsuorituslohkossa.
"""

# === Vaihe 1: Alustus ja Kirjastojen Tuonti ===
print("="*60)
print(" Vaihe 1: Alustus ja Kirjastojen Tuonti")
print("="*60)

import pandas as pd
import numpy as np
import requests
import io
import os
import matplotlib.pyplot as plt # Tuodaan visualisointikirjastot
import seaborn as sns

# Asetetaan visualisointien tyyli jo tässä vaiheessa
plt.style.use('ggplot')

# Määritellään datatiedostojen URL-osoitteet
DATA_URLS = {
    "weather1": "https://raw.githubusercontent.com/rrwiren/ilmanlaatu-ennuste-helsinki/main/data/raw/Helsinki%20Kaisaniemi_%2011.4.2023%20-%2011.4.2025_4c0a0316-74e0-4792-9854-fd6315fbc965.csv",
    "weather2": "https://raw.githubusercontent.com/rrwiren/ilmanlaatu-ennuste-helsinki/main/data/raw/Helsinki%20Kaisaniemi_%2011.4.2023%20-%2011.4.2025_84370efb-e7a0-48ed-b991-0432c84c1475.csv",
    "aq": "https://raw.githubusercontent.com/rrwiren/ilmanlaatu-ennuste-helsinki/main/data/raw/Helsinki%20Kallio%202_%2011.4.2023%20-%2011.4.2025_8bbe3500-d31d-459d-ac37-16b4bd64a2cc.csv"
}
# Määritellään tallennuspolku ja -nimi käsitellylle datalle
PROCESSED_DATA_FOLDER = "data/processed"
PROCESSED_DATA_FILENAME = "Helsinki_Kaisaniemi_Kallio_Combined_Processed_Pipeline_v0.2.parquet"
PROCESSED_DATA_PATH = os.path.join(PROCESSED_DATA_FOLDER, PROCESSED_DATA_FILENAME)


# === Apufunktiot (Lataus & Esikäsittely) ===

def _load_single_csv_from_url(url, key_name):
    """Apufunktio: Lataa YHDEN CSV-tiedoston URL-osoitteesta."""
    print(f"  Yritetään ladata {key_name}: {url[:70]}...")
    try:
        response = requests.get(url)
        response.raise_for_status()
        csv_data = io.StringIO(response.text)
        df = pd.read_csv(csv_data, sep=',')
        print(f"    -> {key_name} luettu onnistuneesti.")
        return df
    except requests.exceptions.RequestException as e:
        print(f"    -> VIRHE LADATESSA ({key_name}): {e}")
        return None
    except Exception as e:
        print(f"    -> VIRHE LUETTAESSA CSV ({key_name}): {e}")
        return None

def _preprocess_single_fmi_df(df, data_key_name):
    """Apufunktio: Esikäsittelee YHDEN FMI DataFrame:n (aika, numerot)."""
    if df is None: return None
    print(f"  Esikäsitellään (aika, numerot): {data_key_name}")
    original_cols = df.columns.tolist()
    date_cols = ['Vuosi', 'Kuukausi', 'Päivä']
    time_col = 'Aika [Paikallinen aika]'
    if all(col in original_cols for col in date_cols) and time_col in original_cols:
        try:
            datetime_str_series = df['Vuosi'].astype(str) + '-' + \
                                  df['Kuukausi'].astype(str).str.zfill(2) + '-' + \
                                  df['Päivä'].astype(str).str.zfill(2) + ' ' + \
                                  df[time_col].astype(str)
            df['Timestamp'] = pd.to_datetime(datetime_str_series)
            cols_to_drop = date_cols + [time_col]
            if 'Havaintoasema' in original_cols: cols_to_drop.append('Havaintoasema')
            df = df.drop(columns=cols_to_drop, errors='ignore').set_index('Timestamp')
        except Exception as e:
            print(f"    -> VIRHE aikaleiman muodostamisessa ({data_key_name}): {e}.")
            return None
    else:
        missing_cols = [col for col in date_cols + [time_col] if col not in original_cols]
        print(f"    -> VIRHE: Aikaleimasarakkeita ei löytynyt ({data_key_name}). Puuttuvat: {missing_cols}")
        return None
    print(f"    -> Aikaleima & indeksi OK ({data_key_name}).")
    print(f"    -> Muunnetaan numeeriseksi ({data_key_name})...")
    for col in df.columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            df[col] = pd.to_numeric(df[col], errors='coerce')
    df = df.dropna(axis=1, how='all')
    print(f"    -> Esikäsittely valmis ({data_key_name}). Jäljellä sarakkeet: {df.columns.tolist()}")
    return df

# === Pipeline Vaihe 1: Datan Lataus ===
def lataa_raakadata(url_dict):
    """Lataa raakadatatiedostot määritellyistä URL-osoitteista."""
    print("\n" + "="*60)
    print(" Pipeline Vaihe 1: Datan Lataus")
    print("="*60)
    raw_dataframes = {}
    success = True
    for key, url in url_dict.items():
        df_raw = _load_single_csv_from_url(url, key)
        if df_raw is None: success = False
        raw_dataframes[key] = df_raw
    if not success:
        print("\nVAROITUS: Kaikkien datatiedostojen lataus ei onnistunut.")
        return None
    print("\nKaikki datatiedostot ladattu onnistuneesti sanakirjaan.")
    return raw_dataframes

# === Pipeline Vaihe 2: Datan Esikäsittely ja Yhdistäminen ===
def esikasittele_yhdistetty_data(raw_dfs_dict):
    """Yhdistää ja esikäsittelee ladatut raakadatan DataFramet."""
    print("\n" + "="*60)
    print(" Pipeline Vaihe 2: Datan Esikäsittely ja Yhdistäminen")
    print("="*60)
    required_keys = ['weather1', 'weather2', 'aq']
    if not isinstance(raw_dfs_dict, dict) or not all(key in raw_dfs_dict for key in required_keys):
        print("VIRHE: Syöte 'raw_dfs_dict' ei ole kelvollinen.")
        return None

    # --- 1. Esikäsittele ---
    df_w1 = _preprocess_single_fmi_df(raw_dfs_dict.get('weather1'), 'weather1')
    df_w2 = _preprocess_single_fmi_df(raw_dfs_dict.get('weather2'), 'weather2')
    df_aq = _preprocess_single_fmi_df(raw_dfs_dict.get('aq'), 'aq')
    if df_w1 is None or df_w2 is None or df_aq is None:
        print("VIRHE: Kaikkien DataFramejen alustava esikäsittely ei onnistunut.")
        return None

    # --- 2. Nimeä uudelleen ---
    print("\nUudelleennimetään sarakkeita...")
    rename_map_w1 = {'Ilman lämpötila keskiarvo [°C]': 'Temperature_W1','Tuulen suunta keskiarvo [°]': 'WindDirection_W1', 'Keskituulen nopeus keskiarvo [m/s]': 'WindSpeed_W1','Näkyvyys keskiarvo [m]': 'Visibility','Pilvisyys [1/8]': 'Cloudiness', 'Ilmanpaine merenpinnan tasolla keskiarvo [hPa]': 'Pressure_SeaLevel'}
    valid_rename_map_w1 = {k: v for k, v in rename_map_w1.items() if k in df_w1.columns}
    df_w1 = df_w1.rename(columns=valid_rename_map_w1)
    print(f"  -> weather1 sarakkeet: {df_w1.columns.tolist()}")

    rename_map_w2 = {'Lämpötilan keskiarvo [°C]': 'Temperature_W2', 'Ylin lämpötila [°C]': 'Temperature_Max','Alin lämpötila [°C]': 'Temperature_Min', 'Keskituulen nopeus [m/s]': 'WindSpeed_W2','Tuulen suunnan keskiarvo [°]': 'WindDirection_W2','Ilmanpaineen keskiarvo [hPa]': 'Pressure_W2'}
    valid_rename_map_w2 = {k: v for k, v in rename_map_w2.items() if k in df_w2.columns}
    df_w2 = df_w2.rename(columns=valid_rename_map_w2)
    print(f"  -> weather2 sarakkeet: {df_w2.columns.tolist()}")

    aq_cols_to_keep = {'Otsoni [µg/m3]': 'Ozone', 'Hengitettävät hiukkaset <10 µm [µg/m3]': 'PM10','Pienhiukkaset <2.5 µm [µg/m3]': 'PM25','Typpidioksidi [µg/m3]': 'NO2', 'Typpimonoksidi [µg/m3]': 'NO', 'Hiilimonoksidi [µg/m3]': 'CO', 'Rikkidioksidi [µg/m3]': 'SO2', 'Musta hiili [µg/m3]': 'BlackCarbon'}
    cols_to_select = [k for k in aq_cols_to_keep.keys() if k in df_aq.columns]
    if 'Otsoni [µg/m3]' not in cols_to_select:
         print("  -> KRIITTINEN VIRHE: Otsonisaraketta ei löytynyt!")
         return None
    valid_rename_map_aq = {k: v for k, v in aq_cols_to_keep.items() if k in cols_to_select}
    df_aq = df_aq[cols_to_select].rename(columns=valid_rename_map_aq)
    print(f"  -> aq sarakkeet: {df_aq.columns.tolist()}")

    # --- 3. Yhdistä ---
    print("\nYhdistetään DataFrameja...")
    df_weather_combined = pd.merge(df_w1, df_w2, left_index=True, right_index=True, how='outer')
    df_final = pd.merge(df_weather_combined, df_aq, left_index=True, right_index=True, how='outer')
    print(f"  -> Yhdistetty. Muoto: {df_final.shape}")

    # --- 4. Siivoa ---
    print("\nSuoritetaan lopullinen siivous (lajittelu, duplikaatit, reindex, NaN)...")
    df_final = df_final.sort_index()
    if not df_final.index.is_unique:
        num_duplicates = df_final.index.duplicated().sum()
        print(f"  -> Käsitellään {num_duplicates} duplikaatti-indeksiä keskiarvolla...")
        df_final = df_final.groupby(level=0).mean()
        if df_final.index.has_duplicates: print("  -> VIRHE: Duplikaatteja jäi!")
        else: print("  -> Duplikaatit käsitelty.")
    else: print("  -> Ei duplikaatti-indeksejä.")

    if not df_final.empty:
        print("  -> Varmistetaan tasainen tuntifrekvenssi (reindex)...")
        min_ts, max_ts = df_final.index.min(), df_final.index.max()
        full_time_range = pd.date_range(start=min_ts, end=max_ts, freq='h')
        df_final = df_final.reindex(full_time_range)
        print(f"     -> Uusi muoto reindexin jälkeen: {df_final.shape}")
    else: return None

    print("  -> Täytetään NaN-arvot (ffill + bfill)...")
    nan_before = df_final.isnull().sum().sum()
    df_final = df_final.ffill().bfill()
    nan_after = df_final.isnull().sum().sum()
    if nan_after == 0: print(f"     -> NaN-arvot täytetty (alkuperäinen määrä: {nan_before}).")
    else: print(f"     -> VAROITUS: {nan_after} NaN jäi täytön jälkeen!")

    print("\nEsikäsittely ja yhdistäminen valmis.")
    return df_final

# === Pipeline Vaihe 3: Datan Tutkiminen (EDA) ===
def tutki_dataa(df):
    """
    Suorittaa eksploratiivista data-analyysiä (EDA) esikäsitellylle DataFramelle.

    Toimenpiteet:
    - Päällekkäisten sääsarakkeiden käsittely (keskiarvoistus).
    - Negatiivisten pitoisuuksien korjaus nollaksi.
    - Kohdemuuttujan ('Ozone') perustilastojen ja visualisointien tulostus.
    - Korrelaatioanalyysi ja visualisointi (Otsoni vs. Sää).
    - Hajontakaavioiden piirto (Otsoni vs. valitut säämuuttujat).
    - Ajallisen vaihtelun analyysi ja visualisointi (kuukausi, tunti).

    Args:
        df (pandas.DataFrame): Esikäsitelty DataFrame (esim. esikasittele_yhdistetty_data-funktion palauttama).

    Returns:
        pandas.DataFrame or None: DataFrame, jossa päällekkäiset sarakkeet ja
                                   negatiiviset arvot on käsitelty, tai None jos syöte puuttui.
    """
    print("\n" + "="*60)
    print(" Pipeline Vaihe 3: Datan Tutkiminen (EDA)")
    print("="*60)

    if df is None or df.empty:
        print("VIRHE: Ei dataa tutkittavaksi (syöte-DataFrame on None tai tyhjä).")
        return None

    # Tehdään kopio, jotta alkuperäinen DataFrame ei muutu tämän funktion ulkopuolella
    df_eda = df.copy()

    # --- 3a: Päällekkäisten Sääsarakkeiden Käsittely ---
    print("\n--- 3a: Päällekkäisten Sääsarakkeiden Käsittely ---")
    duplicate_pairs = {
        'Temperature': ('Temperature_W1', 'Temperature_W2'),
        'WindSpeed': ('WindSpeed_W1', 'WindSpeed_W2'),
        'WindDirection_Avg': ('WindDirection_W1', 'WindDirection_W2'), # Korjattu nimi W1:lle
        'Pressure': ('Pressure_SeaLevel', 'Pressure_W2')
    }
    columns_to_drop = []
    for new_col_name, (col1, col2) in duplicate_pairs.items():
        if col1 in df_eda.columns and col2 in df_eda.columns:
            print(f"  Käsitellään pari: {col1} & {col2} -> {new_col_name}")
            if new_col_name == 'WindDirection_Avg':
                 rad1 = np.deg2rad(df_eda[col1])
                 rad2 = np.deg2rad(df_eda[col2])
                 x_avg = (np.cos(rad1) + np.cos(rad2)) / 2.0
                 y_avg = (np.sin(rad1) + np.sin(rad2)) / 2.0
                 avg_rad = np.arctan2(y_avg, x_avg)
                 avg_deg = np.rad2deg(avg_rad)
                 df_eda[new_col_name] = (avg_deg + 360) % 360
                 print("    -> Vektorikeskiarvo laskettu tuulen suunnalle.")
            else:
                 df_eda[new_col_name] = df_eda[[col1, col2]].mean(axis=1)
            columns_to_drop.extend([col1, col2])
        else:
            missing = [c for c in [col1, col2] if c not in df_eda.columns]
            print(f"  -> HUOM: Paria {col1}/{col2} ei voitu käsitellä. Puuttuu: {missing}")

    columns_to_drop_unique = list(set(columns_to_drop))
    columns_to_drop_final = [col for col in columns_to_drop_unique if col in df_eda.columns]
    if columns_to_drop_final:
        df_eda = df_eda.drop(columns=columns_to_drop_final)
        print(f"  -> Poistettu alkuperäiset: {columns_to_drop_final}")
    if 'WindDirection_Avg' in df_eda.columns: # Nimetään tuulen suunta lopullisesti
        df_eda = df_eda.rename(columns={'WindDirection_Avg': 'WindDirection'})
        print("  -> Sarake 'WindDirection_Avg' nimetty 'WindDirection'.")
    print("  -> Päällekkäisten sarakkeiden käsittely valmis.")
    print("     Nykyiset sarakkeet:", df_eda.columns.tolist())


    # --- 3b: Negatiivisten Pitoisuuksien Käsittely ---
    print("\n--- 3b: Negatiivisten Pitoisuuksien Tarkistus ja Korjaus ---")
    concentration_cols = ['Ozone', 'PM10', 'PM25', 'NO2', 'NO', 'SO2', 'BlackCarbon', 'CO']
    cols_to_check = [col for col in concentration_cols if col in df_eda.columns]
    negative_counts = (df_eda[cols_to_check] < 0).sum()
    if (negative_counts > 0).any():
        print("  Korjataan negatiiviset pitoisuudet nollaksi...")
        print("  Negatiivisten määrät ennen korjausta:\n", negative_counts[negative_counts > 0].to_string())
        for col in cols_to_check:
            if negative_counts.get(col, 0) > 0: # Käytä get() jos sarake puuttuu
                df_eda[col] = df_eda[col].clip(lower=0)
        print("  -> Negatiiviset pitoisuudet korjattu.")
    else:
        print("  -> Ei negatiivisia pitoisuuksia havaittu.")

    # --- 3c: Kohdemuuttujan (Ozone) Analyysi ---
    print("\n--- 3c: Kohdemuuttujan (Ozone) Alustava Analyysi ---")
    target_col = 'Ozone'
    if target_col not in df_eda.columns:
        print(f"VIRHE: Kohdemuuttujaa '{target_col}' ei löydy! Keskeytetään EDA.")
        return None

    print(f"  Perustilastot '{target_col}':")
    print(df_eda[target_col].describe().to_string())

    print(f"  Visualisoidaan '{target_col}' aikasarja...")
    plt.figure(figsize=(18, 5))
    df_eda[target_col].plot(alpha=0.8, title='Otsonipitoisuus (Ozone) ajan funktiona')
    plt.ylabel('Otsoni (µg/m³)')
    plt.xlabel('Aika')
    plt.grid(True)
    plt.tight_layout(); plt.show()

    print(f"  Visualisoidaan '{target_col}' jakauma...")
    plt.figure(figsize=(10, 5))
    sns.histplot(df_eda[target_col], kde=True, bins=50)
    plt.title(f'{target_col}-pitoisuuksien jakauma'); plt.xlabel('Otsoni (µg/m³)')
    plt.ylabel('Frekvenssi / Tiheys'); plt.grid(True, axis='y')
    plt.tight_layout(); plt.show()

    # --- 3d: Korrelaatioanalyysi (Otsoni vs. Sää) ---
    print("\n--- 3d: Korrelaatioanalyysi (Otsoni vs. Sää) ---")
    weather_cols_for_corr = ['Ozone', 'Temperature', 'WindSpeed', 'WindDirection', 'Pressure', 'Visibility', 'Temperature_Max', 'Temperature_Min']
    weather_cols_for_corr = [col for col in weather_cols_for_corr if col in df_eda.columns]
    print(f"  Lasketaan korrelaatiot sarakkeille: {weather_cols_for_corr}")
    correlation_matrix = df_eda[weather_cols_for_corr].corr()
    plt.figure(figsize=(9, 7))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f", linewidths=.5)
    plt.title('Korrelaatiomatriisi: Otsoni vs. Säämuuttujat'); plt.show()
    print("\n  Otsonin korrelaatiot muihin (laskevassa järj.):")
    print(correlation_matrix['Ozone'].sort_values(ascending=False).drop('Ozone').to_string())

    # --- 3e: Hajontakaaviot (Otsoni vs. Sää) ---
    print("\n--- 3e: Hajontakaaviot (Otsoni vs. Valitut Säämuuttujat) ---")
    scatter_features = ['Temperature', 'WindSpeed', 'Visibility', 'Pressure']
    scatter_features = [col for col in scatter_features if col in df_eda.columns]
    print(f"  Piirretään hajontakaaviot: Ozone vs {scatter_features}")
    num_features = len(scatter_features)
    num_rows = (num_features + 1) // 2
    plt.figure(figsize=(14, 5 * num_rows))
    for i, feature in enumerate(scatter_features):
        plt.subplot(num_rows, 2, i + 1)
        sns.scatterplot(data=df_eda, x=feature, y='Ozone', alpha=0.1, s=5)
        plt.title(f'Otsoni vs. {feature}'); plt.xlabel(feature); plt.ylabel('Otsoni (µg/m³)')
        plt.grid(True)
    plt.tight_layout(); plt.show()

    # --- 3f: Ajallinen Analyysi ---
    print("\n--- 3f: Ajallinen Analyysi (Kausi- ja Vuorokausivaihtelu) ---")
    print("  Visualisoidaan keskimääräinen kuukausivaihtelu...")
    monthly_avg = df_eda.groupby(df_eda.index.month)[['Ozone', 'Temperature']].mean()
    monthly_avg.index.name = 'Kuukausi'
    fig, ax1 = plt.subplots(figsize=(12, 5))
    ax2 = ax1.twinx()
    monthly_avg['Ozone'].plot(ax=ax1, color='tab:red', marker='o', label='Otsoni')
    monthly_avg['Temperature'].plot(ax=ax2, color='tab:blue', marker='s', linestyle='--', label='Lämpötila')
    ax1.set_ylabel('Keskim. Otsoni (µg/m³)', color='tab:red'); ax1.tick_params(axis='y', labelcolor='tab:red')
    ax2.set_ylabel('Keskim. Lämpötila (°C)', color='tab:blue'); ax2.tick_params(axis='y', labelcolor='tab:blue')
    ax1.set_xlabel('Kuukausi'); ax1.set_title('Keskimääräinen kuukausivaihtelu'); ax1.grid(True)
    ax1.set_xticks(monthly_avg.index); fig.legend(loc="upper right", bbox_to_anchor=(1,1), bbox_transform=ax1.transAxes)
    fig.tight_layout(); plt.show()

    print("  Visualisoidaan keskimääräinen tuntivaihtelu...")
    hourly_avg = df_eda.groupby(df_eda.index.hour)[['Ozone', 'Temperature']].mean()
    hourly_avg.index.name = 'Tunti'
    fig, ax1 = plt.subplots(figsize=(12, 5))
    ax2 = ax1.twinx()
    hourly_avg['Ozone'].plot(ax=ax1, color='tab:red', marker='o', label='Otsoni')
    hourly_avg['Temperature'].plot(ax=ax2, color='tab:blue', marker='s', linestyle='--', label='Lämpötila')
    ax1.set_ylabel('Keskim. Otsoni (µg/m³)', color='tab:red'); ax1.tick_params(axis='y', labelcolor='tab:red')
    ax2.set_ylabel('Keskim. Lämpötila (°C)', color='tab:blue'); ax2.tick_params(axis='y', labelcolor='tab:blue')
    ax1.set_xlabel('Tunti vuorokaudessa'); ax1.set_title('Keskimääräinen tuntivaihtelu'); ax1.grid(True)
    ax1.set_xticks(hourly_avg.index[::2]); fig.legend(loc="upper right", bbox_to_anchor=(1,1), bbox_transform=ax1.transAxes)
    fig.tight_layout(); plt.show()

    print("\nEDA-vaihe suoritettu.")
    # Palautetaan DataFrame, jossa päällekkäiset sarakkeet ja negatiiviset arvot on käsitelty
    return df_eda

# === Pipeline Vaihe 4: Tallenna Käsitelty Data (Valinnainen) ===
# HUOM: Nimeä tallennusfunktio uudelleen, koska se tallentaa nyt EDA:n jälkeisen datan
def tallenna_eda_jalkeen_data(df, polku):
    """Tallentaa annetun DataFramen Parquet-tiedostoon EDA-vaiheen jälkeen."""
    print("\n" + "="*60)
    print(" Pipeline Vaihe 4: Datan Tallennus EDA:n Jälkeen (Parquet)")
    print("="*60)
    if df is None or df.empty:
        print("VIRHE: Ei dataa tallennettavaksi.")
        return False
    try:
        import pyarrow
        print("'pyarrow'-kirjasto löytyy.")
    except ImportError:
        print("VAROITUS: 'pyarrow' puuttuu. Asenna: pip install pyarrow")
        return False
    try:
        output_folder = os.path.dirname(polku)
        if output_folder: os.makedirs(output_folder, exist_ok=True)
        df.to_parquet(polku, index=True)
        print(f"\nEDA:n jälkeinen data tallennettu Parquet-tiedostoon: {polku}")
        return True
    except Exception as e:
        print(f"\nVIRHE tallennettaessa EDA:n jälkeistä dataa Parquet-muodossa tiedostoon {polku}: {e}")
        return False

# === Pääsuorituslohko ===
if __name__ == "__main__":
    print("\n" + "="*80)
    print(" ALOITETAAN DATAN ESIKÄSITTELY- JA EDA-PIPELINE")
    print("="*80)

    # 1. Lataa raakadata
    raaka_datat_dict = lataa_raakadata(DATA_URLS)

    # 2. Esikäsittele ja yhdistä data
    df_kasitelty = None
    if raaka_datat_dict:
        df_kasitelty = esikasittele_yhdistetty_data(raaka_datat_dict)

    # 3. Suorita EDA (jos esikäsittely onnistui)
    df_eda_valmis = None
    if df_kasitelty is not None:
        df_eda_valmis = tutki_dataa(df_kasitelty) # Kutsutaan uutta EDA-funktiota
    else:
        print("\nPipeline keskeytyi, koska datan esikäsittely epäonnistui.")

    # 4. Tulosta yhteenveto EDA:n jälkeisestä datasta (jos EDA onnistui)
    if df_eda_valmis is not None:
        print("\n" + "-"*40)
        print(" EDA:N JÄLKEISEN DATAN YHTEENVETO ")
        print("-"*40)
        print("\nInfo:")
        df_eda_valmis.info()
        print("\n5 viimeistä riviä:") # Vaihdettu head -> tail
        print(df_eda_valmis.tail())

        # 5. Tallenna EDA:n jälkeinen data (Valinnainen)
        # Voit nyt tallentaa tämän df_eda_valmis DataFramen
        tallennettu_ok = tallenna_eda_jalkeen_data(df_eda_valmis, PROCESSED_DATA_PATH)
        if tallennettu_ok: print(" -> Tallennus onnistui.")
        else: print(" -> Tallennus ei onnistunut tai sitä ei suoritettu.")
    else:
         print("\nPipeline keskeytyi ennen EDA-vaiheen päättymistä tai se epäonnistui.")


    print("\n" + "="*80)
    print(" PIPELINE SUORITETTU LOPPUUN")
    print("="*80)