In [35]:
import pandas as pd
# Load the raw long-format export: one row per (indicator, year) observation.
df = pd.read_csv('data/OBY01CRGEN.csv')
# Bare last expression so the notebook renders the (truncated) frame as a table.
df
# Editor shortcut note: cmd + k + u uncomments the selected lines.

Unnamed: 0,Ukazatel,IndicatorType,Roky,CasR2A,Území,Uz0,Hodnota
0,Úhrnná plodnost,5405W,2023,2023,Česko,CZ,1.452572
1,Úhrnná plodnost,5405W,2022,2022,Česko,CZ,1.617747
2,Úhrnná plodnost,5405W,2021,2021,Česko,CZ,1.826536
3,Úhrnná plodnost,5405W,2020,2020,Česko,CZ,1.707373
4,Úhrnná plodnost,5405W,2019,2019,Česko,CZ,1.708963
...,...,...,...,...,...,...,...
1156,Průměrný věk žen při prvním sňatku,7182Z,1965,1965,Česko,CZ,21.700006
1157,Průměrný věk žen při prvním sňatku,7182Z,1964,1964,Česko,CZ,21.534990
1158,Průměrný věk žen při prvním sňatku,7182Z,1963,1963,Česko,CZ,21.342171
1159,Průměrný věk žen při prvním sňatku,7182Z,1962,1962,Česko,CZ,21.359491


In [36]:
# Narrow the frame to year, indicator name and value; every other column is dropped.
# The trailing .copy() detaches the result from the originally loaded frame.
kept_columns = ["Roky", "Ukazatel", "Hodnota"]
df = df.loc[:, kept_columns].copy()

In [37]:
# Reshape from long to wide: one row per year, one column per indicator.
pivoted = df.pivot_table(
    index="Roky",
    columns="Ukazatel",
    values="Hodnota",
    aggfunc="mean",  # averages duplicate (year, indicator) pairs, should any appear
)
df_wide = pivoted.reset_index().sort_values("Roky")

# Coerce every indicator column to a numeric dtype; unparseable cells become NaN.
indicator_cols = [name for name in df_wide.columns if name != "Roky"]
for name in indicator_cols:
    df_wide[name] = pd.to_numeric(df_wide[name], errors="coerce")

# Persist the wide table (UTF-8 with BOM) and rebind `df`, which the cells below use.
df_wide.to_csv('data/OBY01CRGEN_transformed.csv', index=False, encoding='utf-8-sig')
df = df_wide

In [38]:
import pandas as pd
import numpy as np
import altair as alt

class DataIntake:
    """
    Ultimate ML Pipeline: Section 1.3 - The Nuclear Intake
    Forces data types and uncovers hidden 'String' nulls.

    The instance works on its own copy of the DataFrame it is given, so the
    caller's frame is never modified. `report` starts as an empty dict and is
    replaced by a DataFrame once `profile_data()` has run.
    """
    def __init__(self, df):
        self.df = df.copy()
        self.report = {}

    def _force_discovery(self):
        """
        Try to convert every object/string column of `self.df` to numeric.

        Cells that cannot be parsed as numbers (e.g. 'Missing value') become
        NaN. A column is only replaced when at least one cell parsed as a
        number; columns that are entirely text are left untouched.
        """
        for col in self.df.columns:
            series = self.df[col]

            # Only text-like columns are candidates; numeric columns are already typed.
            is_text_like = (
                pd.api.types.is_object_dtype(series)
                or pd.api.types.is_string_dtype(series)
            )
            if not is_text_like:
                continue

            # Attempt conversion; text that can't be a number becomes NaN
            converted = pd.to_numeric(series, errors='coerce')

            # Only update the column if it successfully found numbers
            # (This prevents turning a 'City' name column into all NaNs)
            if converted.notnull().any():
                self.df[col] = converted

    def profile_data(self):
        """
        Build and return the per-column missing-value report.

        Runs `_force_discovery()` first (which mutates `self.df`), then counts
        NaNs per column.

        Returns:
            DataFrame with one row per column of `self.df` and the columns
            'column', 'missing_count', 'missing_pct' (0-100) and
            'semantic_type'. The same frame is stored on `self.report`.
        """
        # 1. Run the discovery first
        self._force_discovery()

        # 2. Count actual NaNs now that they are coerced
        null_counts = self.df.isnull().sum().reset_index()
        null_counts.columns = ['column', 'missing_count']

        n_rows = len(self.df)
        if n_rows:
            null_counts['missing_pct'] = (null_counts['missing_count'] / n_rows) * 100
        else:
            # A frame with no rows has nothing missing; avoid a 0/0 -> NaN percentage.
            null_counts['missing_pct'] = 0.0

        # 3. Dynamic Type Tagging
        null_counts['semantic_type'] = [
            "Continuous Numerical"
            if pd.api.types.is_numeric_dtype(self.df[col])
            else "Categorical/Text"
            for col in self.df.columns
        ]

        self.report = null_counts
        return self.report

    def plot_dna(self):
        """
        Return an Altair bar chart of the missing-data percentage per column.

        If `profile_data()` has not been run yet, it is run here first so the
        chart always has a report to draw from.
        """
        if not isinstance(self.report, pd.DataFrame):
            self.profile_data()

        # The y-scale is pinned to 0-100 so charts are comparable across datasets.
        # Columns with 0% missing have a zero-height bar (no baseline is added).
        chart = alt.Chart(self.report).mark_bar(size=20).encode(
            x=alt.X('column:N', sort='-y', title="Feature Names (Czech)"),
            y=alt.Y('missing_pct:Q', title="Missing Data (%)", scale=alt.Scale(domain=[0, 100])),
            color=alt.condition(
                alt.datum.missing_pct > 0,
                alt.value('#FF3366'),  # Neon Pink for missing
                alt.value('#00CC96')   # Green for clean
            ),
            tooltip=['column', 'missing_pct']
        ).properties(width=800, height=400, title="Data DNA: Corrected Null Detection")

        return chart


# --- Usage Example ---
# Profile the wide frame: DataIntake copies `df`, so `df` itself is not modified.
intake = DataIntake(df)
# profile_data() returns the per-column missing-value report and stores it on the instance.
profile = intake.profile_data()
print(profile)
# Render the missing-data bar chart built from the report above.
intake.plot_dna().display()


                                          column  missing_count  missing_pct  \
0                                           Roky              0     0.000000   
1                          Hrubá míra reprodukce              0     0.000000   
2                             Naděje dožití mužů              0     0.000000   
3                              Naděje dožití žen              0     0.000000   
4   Průměrná délka trvání manželství při rozvodu             11    10.576923   
5      Průměrný věk matek při narození 1. dítěte              5     4.807692   
6         Průměrný věk matek při narození dítěte              0     0.000000   
7            Průměrný věk mužů při prvním sňatku             41    39.423077   
8             Průměrný věk žen při prvním sňatku             41    39.423077   
9              Tabulková prvosňatečnost mužů (%)             41    39.423077   
10              Tabulková prvosňatečnost žen (%)             41    39.423077   
11                               Úhrnná 

The next cell compares a production DataFrame against the training (reference) DataFrame to detect distribution drift.

In [None]:
from scipy.stats import ks_2samp
import plotly.graph_objects as go
import plotly.express as px

class DriftDetector(DataIntake):
    """
    Ultimate ML Pipeline: Section 1.1 - Drift & Shift Monitoring
    Compares Reference vs. Current data distributions.

    `ref_df` is the reference frame and `cur_df` the current one. The parent
    class receives a copy of `cur_df`; the drift test itself reads `ref_df`
    and `cur_df` exactly as passed in.
    """
    def __init__(self, ref_df, cur_df):
        super().__init__(cur_df)
        self.ref_df = ref_df
        self.cur_df = cur_df
        self.drift_report = {}

    def detect_drift(self, threshold=0.05):
        """
        Performs KS-Test for numerical features.
        P-value < threshold implies the distributions have drifted.

        Every numeric column of `cur_df` gets one row in the report. When a
        column is absent from `ref_df`, or either side has no non-missing
        values, the test cannot be run: that row carries NaN for 'ks_stat'
        and 'p_value' and `is_drifted` is False.

        Returns:
            DataFrame with the columns 'feature', 'ks_stat', 'p_value' and
            'is_drifted' (also stored on `self.drift_report`). The columns
            are present even when there are no numeric features.
        """
        report_columns = ['feature', 'ks_stat', 'p_value', 'is_drifted']
        drift_results = []
        num_cols = self.cur_df.select_dtypes(include=[np.number]).columns

        for col in num_cols:
            stat, p_val = np.nan, np.nan

            if col in self.ref_df.columns:
                ref_values = self.ref_df[col].dropna()
                cur_values = self.cur_df[col].dropna()
                # ks_2samp rejects empty samples, so only test when both sides have data.
                if len(ref_values) and len(cur_values):
                    stat, p_val = ks_2samp(ref_values, cur_values)

            drift_results.append({
                'feature': col,
                'ks_stat': stat,
                'p_value': p_val,
                # A NaN p-value compares False, so untestable features are not flagged.
                'is_drifted': bool(p_val < threshold)
            })

        self.drift_report = pd.DataFrame(drift_results, columns=report_columns)
        return self.drift_report

    def plot_drift_radar(self):
        """
        A modern Plotly Radar chart to visualize drift across features.
        The closer the line is to the outer edge (1), the more 'stable' the
        feature is; values near the center (0) indicate strong drift.

        Runs `detect_drift()` with its default threshold if no report exists
        yet. Features whose KS statistic could not be computed are left out.
        """
        if not isinstance(self.drift_report, pd.DataFrame):
            self.detect_drift()

        plotted = self.drift_report.dropna(subset=['ks_stat'])

        # Using (1 - KS-Stat) as 'Stability Score' for the radar
        fig = go.Figure()

        fig.add_trace(go.Scatterpolar(
            r=1 - plotted['ks_stat'],
            theta=plotted['feature'],
            fill='toself',
            name='Feature Stability',
            marker=dict(color='#00CC96')
        ))

        fig.update_layout(
            polar=dict(
                radialaxis=dict(visible=True, range=[0, 1])
            ),
            showlegend=True,
            title="Feature Stability Radar (1 = No Drift, 0 = High Drift)",
            template="plotly_dark"
        )
        return fig

# --- Usage Example ---
# detector = DriftDetector(training_df, production_df)
# drift_df = detector.detect_drift()
# detector.plot_drift_radar().show()

In [42]:
import holoviews as hv
from sklearn.ensemble import IsolationForest
import pandas as pd
import numpy as np

# Load the Bokeh plotting backend for HoloViews before any plot is created.
hv.extension('bokeh')

class OutlierLab:
    """
    Automated Outlier Detection + Time-series plots for ALL numeric variables.

    Works on a copy of the DataFrame it is given. `numeric_cols` is fixed at
    construction time and never contains `time_col`.
    """
    def __init__(self, df, time_col="Roky"):
        """
        Args:
            df: input DataFrame; it is copied.
            time_col: name of the column used as the x-axis of the plots.

        Raises:
            ValueError: if `time_col` is not a column of `df`.
        """
        self.df = df.copy()
        self.time_col = time_col

        if self.time_col not in self.df.columns:
            raise ValueError(f"time_col='{self.time_col}' not found in df columns.")

        # Detect numeric columns, excluding time_col if it's numeric
        self.numeric_cols = self.df.select_dtypes(include=[np.number]).columns.tolist()
        if self.time_col in self.numeric_cols:
            self.numeric_cols.remove(self.time_col)

    def _prepare_features(self):
        """Return the NaN-free numeric feature matrix used to fit the outlier model."""
        features = self.df[self.numeric_cols]

        # A column with no observed values has a NaN median and cannot be imputed,
        # so it is left out of the model input instead of passing NaNs downstream.
        features = features.dropna(axis="columns", how="all")
        if features.shape[1] == 0:
            raise ValueError(
                "No numeric columns with at least one non-missing value "
                "are available for outlier detection."
            )

        return features.fillna(features.median())

    def detect_outliers(self, contamination=0.05):
        """
        Flag rows as "Inlier" / "Outlier" with an IsolationForest.

        Missing values are median-imputed per column for the fit only; the
        stored data keeps its NaNs. The result is written to a new
        'is_outlier' column on `self.df`.

        Args:
            contamination: passed through to IsolationForest.

        Returns:
            `self.df`, including the 'is_outlier' column.

        Raises:
            ValueError: if no numeric column has any non-missing value.
        """
        temp_data = self._prepare_features()

        # Fixed seed so repeated runs flag the same rows.
        iso = IsolationForest(contamination=contamination, random_state=42)
        self.df["is_outlier"] = iso.fit_predict(temp_data)
        self.df["is_outlier"] = self.df["is_outlier"].map({1: "Inlier", -1: "Outlier"})
        return self.df

    def plot_all_timeseries(self, ncols=2, width=650, height=250, show_outliers=True):
        """
        One time-series per numeric variable.
        If show_outliers=True and detect_outliers() was run, overlays outlier points.

        Args:
            ncols: number of columns in the returned layout.
            width, height: size of each individual plot.
            show_outliers: overlay rows flagged "Outlier" as points.

        Returns:
            A HoloViews Layout with one plot per entry in `numeric_cols`.
        """
        # Sort by time for proper lines
        dfp = self.df.sort_values(self.time_col).copy()

        # The outlier flag is per row, so the same rows are marked on every series.
        overlay_outliers = show_outliers and "is_outlier" in dfp.columns
        out_pts = dfp[dfp["is_outlier"] == "Outlier"] if overlay_outliers else None

        plots = []
        for col in self.numeric_cols:
            line = hv.Curve(dfp, kdims=[self.time_col], vdims=[col]).opts(
                width=width,
                height=height,
                title=col,
                tools=["hover"],
                show_grid=False,
            )

            if overlay_outliers:
                pts = hv.Scatter(out_pts, kdims=[self.time_col], vdims=[col]).opts(
                    size=6,
                    color="#FF3366",
                    marker="circle",
                    tools=["hover"],
                )
                plots.append(line * pts)
            else:
                plots.append(line)

        return hv.Layout(plots).cols(ncols)

# --- Usage Example ---
# OutlierLab copies `df`; the 'is_outlier' flag is added to the lab's copy only.
lab = OutlierLab(df, time_col="Roky")
lab.detect_outliers(contamination=0.04)   # optional: without it the plots show no outlier overlay
# Last expression of the cell, so the layout is rendered as the cell output.
lab.plot_all_timeseries(ncols=2)

In [13]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import StandardScaler, RobustScaler, OneHotEncoder, OrdinalEncoder
import pandas as pd
import numpy as np

class FeatureTransformer:
    """
    Ultimate ML Pipeline: Section 3 - Preprocessing Suite
    Automatically routes features to the correct transformation logic.

    Numeric columns go through impute -> scale, all other columns through
    impute -> one-hot encode. The target column and the 'is_outlier' flag
    are never treated as features.
    """
    def __init__(self, df, target_col=None):
        """
        Args:
            df: input DataFrame; it is copied.
            target_col: optional column name to exclude from the features.
                A name that is not a column of `df` has no effect.
        """
        self.df = df.copy()
        self.target_col = target_col
        self.processor = None  # set by build_processor()

        # Remove target and outlier flag from feature list
        excluded = (target_col, 'is_outlier')
        self.numeric_features = [
            col for col in self.df.select_dtypes(include=[np.number]).columns.tolist()
            if col not in excluded
        ]
        self.categorical_features = [
            col for col in self.df.select_dtypes(exclude=[np.number]).columns.tolist()
            if col not in excluded
        ]

    def build_processor(self, use_knn_imputer=False, scaling_type='robust'):
        """
        Creates a Scikit-Learn ColumnTransformer for automated processing.

        Args:
            use_knn_imputer: use KNNImputer(n_neighbors=5) for numeric columns
                instead of median imputation.
            scaling_type: 'robust' selects RobustScaler; any other value
                selects StandardScaler.

        Returns:
            The unfitted ColumnTransformer (also stored on `self.processor`).
        """
        # 1. Numeric Path: Impute -> Scale
        num_imputer = KNNImputer(n_neighbors=5) if use_knn_imputer else SimpleImputer(strategy='median')
        scaler = RobustScaler() if scaling_type == 'robust' else StandardScaler()

        numeric_transformer = Pipeline(steps=[
            ('imputer', num_imputer),
            ('scaler', scaler)
        ])

        # 2. Categorical Path: Impute -> Encode
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
        ])

        # 3. Combine into the Universal Processor
        self.processor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, self.numeric_features),
                ('cat', categorical_transformer, self.categorical_features)
            ]
        )
        return self.processor

    def fit_transform(self):
        """
        Executes the pipeline and returns a cleaned DataFrame with names.

        If `build_processor()` has not been called, a processor with the
        default settings is built first. Column names are taken from the
        fitted processor, so they stay aligned with the output even when a
        feature group is empty or a step drops a column. The returned frame
        keeps the row index of the input data.
        """
        if self.processor is None:
            self.build_processor()

        processed_data = self.processor.fit_transform(self.df)

        # The fitted processor reports names as '<transformer>__<feature>';
        # strip that prefix to keep the plain feature names.
        all_col_names = []
        for name in self.processor.get_feature_names_out():
            for prefix in ('num__', 'cat__'):
                if name.startswith(prefix):
                    name = name[len(prefix):]
                    break
            all_col_names.append(name)

        return pd.DataFrame(processed_data, columns=all_col_names, index=self.df.index)

# --- Usage Example ---
# NOTE(review): after the wide pivot above, `df` has one column per indicator and no
# 'Hodnota' column, so target_col='Hodnota' excludes nothing here — confirm the intended target.
# NOTE(review): this cell's execution count (13) is lower than the cells above it;
# re-run the notebook top to bottom before relying on `clean_df`.
transformer = FeatureTransformer(df, target_col='Hodnota')
transformer.build_processor(scaling_type='robust')
clean_df = transformer.fit_transform()