# ETL Pipeline - Scalable Data Processing

This notebook runs an ETL (Extract, Transform, Load) pipeline for economic and business data from multiple CSV and Excel sources. It converts the raw data into a normalized star schema suitable for analysis and forecasting.

## Overview

The pipeline performs the following operations:
1. **Extract**: Reads data from multiple CSV and Excel files
2. **Transform**: Normalizes PKD codes, aggregates indicators, and standardizes formats
3. **Load**: Creates a star schema with fact and dimension tables
4. **Validate**: Performs data quality checks
5. **Export**: Saves results to CSV files

## Key Features

- **Modular Architecture**: Processor classes for different data source types
- **PKD Normalization**: Maps historical PKD codes (2007) to current standard (2025)
- **Star Schema**: Optimized dimensional model for analytics
- **Data Quality**: Built-in validation and null handling
- **Scalability**: Configuration-driven approach for adding new data sources

---

## Setup

Install required dependencies and import modules.

In [None]:
# Install the xlrd library, needed to read legacy .xls Excel files
# %pip installs into the environment of the running kernel
%pip install xlrd






In [None]:
import sys
import os
from importlib import reload

# Add the current project directory to Python's module search path
# so that custom modules in the project folder can be imported
sys.path.append(os.path.abspath('.'))

# Import core pipeline components
from etl_pipeline_new import ETLPipeline
import data_sources_config

# Reload the config module to pick up any recent changes
# Useful during development when config file is modified
reload(data_sources_config)
from data_sources_config import get_data_sources_config

print("✓ Modules loaded successfully")

✓ Modules loaded successfully


## Initialize Pipeline

Create an instance of the ETL pipeline, passing the path to the PKD mapping file. This file maps PKD 2007 classification codes to PKD 2025, so that all data sources use the same classification.
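
As a rough illustration of what the mapping step does, the sketch below applies a 2007 to 2025 lookup to a few codes. The class name `SimplePkdMapper`, its `map_code` method, and the sample code pairs are hypothetical placeholders; they are not the contents of `mapowanie_pkd.xlsx` or the real `pkd_mapper` API.

```python
from typing import Optional


class SimplePkdMapper:
    """Hypothetical stand-in for the pipeline's PKD mapper (not the real API)."""

    def __init__(self, mapping: dict[str, str]):
        # Keys are PKD 2007 codes, values are their PKD 2025 equivalents.
        self._mapping = {self._clean(k): v for k, v in mapping.items()}

    @staticmethod
    def _clean(code: str) -> str:
        # Normalize whitespace and case so "aa.01.z " matches "AA.01.Z".
        return code.strip().upper()

    def map_code(self, code_2007: str) -> Optional[str]:
        # Return None for unknown codes instead of guessing a target.
        return self._mapping.get(self._clean(code_2007))


# Placeholder pairs for illustration only; real pairs come from the mapping file.
mapper = SimplePkdMapper({"AA.01.Z": "BB.01.Z", "AA.02.Z": "BB.02.Z"})
mapped = [mapper.map_code(code) for code in ["aa.01.z ", "AA.02.Z", "ZZ.99.Z"]]
print(mapped)
```

Returning `None` for an unknown code keeps unmapped rows visible, so they can be counted or reviewed instead of silently receiving a wrong classification.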

In [None]:
# Initialize ETL pipeline with PKD mapping file
# The mapping file translates PKD 2007 codes to PKD 2025 standard
# This ensures all data sources use consistent classification codes
pipeline = ETLPipeline(
    mapping_file_path=os.path.join('..', '..', 'data', 'mapowanie_pkd.xlsx')
)

print("✓ Pipeline initialized")

✓ Pipeline initialized


## Load and Process Data Sources

All data sources are configured in `data_sources_config.py`. The configuration defines:
- **Data source paths**: Location of input files
- **Processor classes**: Specialized processors for different data formats
- **Processing parameters**: Year ranges, sheet names, column mappings

Each data source is processed by its designated processor, which:
1. Reads the raw data (CSV or Excel)
2. Standardizes column names and formats
3. Maps PKD codes to the 2025 standard
4. Aggregates data at the appropriate level (Sekcja, Dział, etc.)
5. Returns a normalized DataFrame
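
The configuration-driven loop can be sketched as follows. Only the three keys `name`, `processor_class`, and `kwargs` come from the notebook; `ExampleProcessor`, its `process` method, the parameter names, and the sample rows are assumptions made for illustration. The real processors return DataFrames, while this sketch uses plain lists of dicts to stay dependency-free.

```python
class ExampleProcessor:
    """Hypothetical processor; the real ones live in the project's modules."""

    def __init__(self, pkd_mapper):
        # Here the mapper is a plain dict from old code to new code.
        self.pkd_mapper = pkd_mapper

    def process(self, rows, year_from, year_to):
        # Keep rows inside the configured year range and map their PKD codes.
        out = []
        for row in rows:
            if year_from <= row["rok"] <= year_to:
                out.append(
                    {
                        "rok": row["rok"],
                        "pkd_2025": self.pkd_mapper.get(row["pkd"]),
                        "WSKAZNIK": row["WSKAZNIK"],
                        "wartosc": row["wartosc"],
                    }
                )
        return out


def get_example_config():
    # Same three keys the notebook reads: name, processor_class, kwargs.
    return [
        {
            "name": "Example source",
            "processor_class": ExampleProcessor,
            "kwargs": {
                "rows": [
                    {"rok": 2017, "pkd": "AA.01.Z", "WSKAZNIK": "Example", "wartosc": 1},
                    {"rok": 2019, "pkd": "AA.01.Z", "WSKAZNIK": "Example", "wartosc": 2},
                ],
                "year_from": 2018,
                "year_to": 2020,
            },
        }
    ]


pkd_mapper = {"AA.01.Z": "BB.01.Z"}  # placeholder mapping, not real PKD codes
results = {}
for cfg in get_example_config():
    processor = cfg["processor_class"](pkd_mapper)
    results[cfg["name"]] = processor.process(**cfg["kwargs"])

print(results)
```

Under this layout, adding a data source means appending one more dict to the configuration; the loop itself stays unchanged.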

In [None]:
# Get data sources configuration from the config module
data_sources = get_data_sources_config()

print(f"Found {len(data_sources)} data sources to process:\n")

# Process each data source using its configured processor
# The processor handles format-specific logic and returns normalized data
for source_config in data_sources:
    # Get the processor class (e.g., FinancialDataProcessor, RegonDataProcessor)
    processor_class = source_config['processor_class']
    
    # Instantiate processor with PKD mapper for code normalization
    processor = processor_class(pipeline.pkd_mapper)
    
    # Add the processed data source to the pipeline
    # kwargs contain processor-specific parameters (file paths, years, etc.)
    pipeline.add_data_source(
        processor=processor,
        name=source_config['name'],
        **source_config['kwargs']
    )

print("\n✓ All data sources processed")

Found 8 data sources to process:

Processing Upadłości (KRZ_PKD)...
  ✓ Loaded 2,538 rows
Processing Wskaźniki Finansowe...
  ✓ Loaded 418,560 rows
Processing Dane Kwartalne - Pracujący...
  ✓ Loaded 40,705 rows
Processing Dane Kwartalne - liczba firm vs działalności gospodarczych...
  ✓ Loaded 24,423 rows
Processing Dane Kwartalne - liczba nowych firm w roku...
  ✓ Loaded 7,793 rows
Processing Dane Miesięczne - liczba firm zarejestrowanych...
  ✓ Loaded 759 rows
Processing Dane Miesięczne - liczba firm zamkniętych...
  ✓ Loaded 759 rows
Processing Dane Miesięczne - liczba firm z zawieszoną dz

## Combine Data

Merge all processed data sources into a single unified dataset. This step:
- Concatenates all DataFrames vertically (union)
- Standardizes the schema across all sources
- Preserves metadata (year, PKD code, indicator name, value)

The combined dataset contains all KPI values from all sources, ready for dimensional modeling.
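
A minimal sketch of the vertical union, assuming each processor output already carries the four columns shown in the notebook (`rok`, `pkd_2025`, `WSKAZNIK`, `wartosc`). The internals of `pipeline.combine_data()` are not shown in this notebook, so this is only one plausible way to do it; the second frame's value is made up.

```python
import pandas as pd

COLUMNS = ["rok", "pkd_2025", "WSKAZNIK", "wartosc"]

# Two tiny stand-ins for processor outputs.
bankruptcies = pd.DataFrame(
    {"rok": [2018], "pkd_2025": ["01.11.Z"], "WSKAZNIK": ["Upadłość"], "wartosc": [6]}
)
# Columns arrive in a different order here, and the value is illustrative.
revenue = pd.DataFrame(
    {"WSKAZNIK": ["GS Przychody ogółem"], "rok": [2019], "wartosc": [1.5], "pkd_2025": ["01.11.Z"]}
)

# Selecting COLUMNS enforces one column order before the union;
# ignore_index=True gives the result a fresh 0..n-1 index.
combined = pd.concat(
    [frame[COLUMNS] for frame in (bankruptcies, revenue)],
    ignore_index=True,
)
print(combined)
```

Selecting the columns explicitly also makes the step fail with a `KeyError` if a source is missing one of them, which is easier to diagnose than a column of unexpected nulls after the union.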

In [None]:
# Combine all data sources into one unified fact table
# Each row represents: (year, PKD code, indicator, value)
combined_data = pipeline.combine_data()

# Display summary statistics about the combined dataset
print(f"✓ Combined data: {len(combined_data):,} rows")
print(f"  Years: {combined_data['rok'].min()} - {combined_data['rok'].max()}")
print(f"  Unique indicators: {combined_data['WSKAZNIK'].nunique():,}")
print(f"  Unique PKD codes: {combined_data['pkd_2025'].nunique():,}")

# Show sample rows to verify data structure
combined_data.head(10)

✓ Combined data: 496,077 rows
  Years: 2005 - 2025
  Unique indicators: 37
  Unique PKD codes: 1,489


|  | rok | pkd_2025 | WSKAZNIK | wartosc |
|---|---|---|---|---|
| 0 | 2018 | 01.11.Z | Upadłość | 6 |
| 1 | 2018 | 01.13.Z | Upadłość | 4 |
| 2 | 2018 | 01.19.Z | Upadłość | 2 |
| 3 | 2018 | 01.24.Z | Upadłość | 1 |
| 4 | 2018 | 01.29.Z | Upadłość | 1 |
| 5 | 2018 | 01.30.Z | Upadłość | 1 |
| 6 | 2018 | 01.41.Z | Upadłość | 3 |
| 7 | 2018 | 01.47.Z | Upadłość | 2 |
| 8 | 2018 | 01.48.Z | Upadłość | 1 |
| 9 | 2018 | 01.50.Z | Upadłość | 9 |


## Build Dimension Tables

Transform the combined dataset into a **star schema** for efficient analysis. This creates:

### Fact Table
- Contains foreign keys and measurable values
- Columns: `rok`, `PKD_INDEX`, `WSKAZNIK_INDEX`, `wartosc`
- Optimized for queries and aggregations

### Dimension Tables

1. **WSKAZNIK Dictionary** - Indicator dimension
   - Maps WSKAZNIK_INDEX → indicator name
   - Includes MinMax attribute for optimization direction

2. **PKD Dictionary** - Industry classification dimension
   - Maps PKD_INDEX → PKD code and name
   - Links to PKD type via TYP_INDEX

3. **PKD Type Dictionary** - Classification level dimension
   - Maps TYP_INDEX → type name (Sekcja, Dział, etc.)

This star schema design enables:
- Fast analytical queries
- Easy joins between fact and dimensions
- Reduced data redundancy
- Simplified business intelligence reporting
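
The transformation from a denormalized table to fact and dimension tables can be sketched with pandas as below. This is a simplified assumption about the approach, not the body of `pipeline.build_dictionaries`: here both dimensions are derived from the data itself, whereas the real PKD dictionary appears to come from the full classification. The third row's indicator value is made up.

```python
import pandas as pd

combined = pd.DataFrame(
    {
        "rok": [2018, 2018, 2019],
        "pkd_2025": ["01.11.Z", "01.13.Z", "01.11.Z"],
        "WSKAZNIK": ["Upadłość", "Upadłość", "INV Zapasy"],
        "wartosc": [6, 4, 2.5],
    }
)

# Indicator dimension: one row per distinct name plus a surrogate integer key.
wskaznik_dim = (
    combined[["WSKAZNIK"]]
    .drop_duplicates()
    .sort_values("WSKAZNIK")
    .reset_index(drop=True)
    .rename_axis("WSKAZNIK_INDEX")
    .reset_index()
)

# PKD dimension built the same way, keyed by the code symbol.
pkd_dim = (
    combined[["pkd_2025"]]
    .drop_duplicates()
    .sort_values("pkd_2025")
    .rename(columns={"pkd_2025": "symbol"})
    .reset_index(drop=True)
    .rename_axis("PKD_INDEX")
    .reset_index()
)

# Fact table: replace the text columns with their surrogate keys.
fact = (
    combined.merge(wskaznik_dim, on="WSKAZNIK", how="left")
    .merge(pkd_dim, left_on="pkd_2025", right_on="symbol", how="left")
    .loc[:, ["rok", "wartosc", "WSKAZNIK_INDEX", "PKD_INDEX"]]
)
print(fact)
```

The fact table keeps only integers and the measured value, so repeated indicator names and PKD descriptions are stored once in the dimensions.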

In [None]:
# Build dimension tables and map indices in the fact table
# This transforms the denormalized data into a star schema
(
    fact_table,           # Fact table with numeric indices
    wskaznik_dictionary,  # Indicator dimension
    pkd_dictionary,       # PKD dimension
    pkd_typ_dictionary    # PKD type dimension
) = pipeline.build_dictionaries(combined_data)

print("✓ Dimension tables built")

✓ Dimension tables built


## Inspect Results

Examine the generated tables to verify structure and content. This section displays:
- Sample rows from each table
- Row counts and column lists
- Data types and formats
- Mapping coverage statistics

In [None]:
# Display fact table structure and sample data
# Fact table contains the core measurable data with foreign keys to dimensions
print("=== FACT TABLE ===")
print(f"Rows: {len(fact_table):,}")
print(f"Columns: {list(fact_table.columns)}")
display(fact_table.head(10))

=== FACT TABLE ===
Rows: 486,315
Columns: ['rok', 'wartosc', 'WSKAZNIK_INDEX', 'PKD_INDEX']


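|  | rok | wartosc | WSKAZNIK_INDEX | PKD_INDEX |
|---|---|---|---|---|
| 0 | 2018 | 6 | 35 | 3.0 |
| 1 | 2018 | 4 | 35 | 7.0 |
| 2 | 2018 | 2 | 35 | 15.0 |
| 3 | 2018 | 1 | 35 | 24.0 |
| 4 | 2018 | 1 | 35 | 34.0 |
| 5 | 2018 | 1 | 35 | 37.0 |
| 6 | 2018 | 3 | 35 | 40.0 |
| 7 | 2018 | 2 | 35 | 52.0 |
| 8 | 2018 | 1 | 35 | 54.0 |
| 9 | 2018 | 9 | 35 | 57.0 |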
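
The fact table has 486,315 rows, 9,762 fewer than the 496,077 rows of the combined data, and `PKD_INDEX` is shown with a decimal point. The notebook does not state the cause. One plausible explanation, offered here only as an assumption, is that some `pkd_2025` codes have no match in the PKD dictionary. The sketch below shows how such rows could be located with a left merge; the frames are toy data and `XX.99.Z` is a made-up code.

```python
import pandas as pd

combined = pd.DataFrame(
    {
        "rok": [2018, 2018, 2019],
        "pkd_2025": ["01.11.Z", "XX.99.Z", "01.13.Z"],  # XX.99.Z is made up
        "wartosc": [6, 1, 4],
    }
)
pkd_dim = pd.DataFrame({"PKD_INDEX": [3, 7], "symbol": ["01.11.Z", "01.13.Z"]})

# indicator=True adds a `_merge` column saying where each row found a match.
checked = combined.merge(
    pkd_dim, left_on="pkd_2025", right_on="symbol", how="left", indicator=True
)
unmatched = checked[checked["_merge"] == "left_only"]

print(unmatched["pkd_2025"].value_counts())
# An integer key column becomes float64 once it holds NaN for unmatched rows.
print(checked["PKD_INDEX"].dtype)
```

Counting the unmatched codes and comparing the total with the row difference would confirm or rule out this explanation.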


In [None]:
# Display indicator dimension table
# Contains unique indicators with their names and optimization direction (MinMax)
print("=== WSKAZNIK DIMENSION ===")
print(f"Total indicators: {len(wskaznik_dictionary):,}")
display(wskaznik_dictionary.head(20))

=== WSKAZNIK DIMENSION ===
Total indicators: 37


|  | WSKAZNIK_INDEX | WSKAZNIK | MinMax |
|---|---|---|---|
| 0 | 0 | C Środki pieniężne i pap. wart. | Max |
| 1 | 1 | CF Nadwyżka finansowa | Max |
| 2 | 2 | DEPR Amortyzacja | Min |
| 3 | 3 | EN Liczba jednostek gospodarczych | Max |
| 4 | 4 | GS (I) Przychody netto ze sprzedaży i zrównane... | Max |
| 5 | 5 | GS Przychody ogółem | Max |
| 6 | 6 | INV Zapasy | Min |
| 7 | 7 | IO Wartość nakładów inwestycyjnych | Min |
| 8 | 8 | IP Odsetki do zapłacenia | Min |
| 9 | 9 | LTC Długoterminowe kredyty bankowe | Min |


In [None]:
# Analyze MinMax mapping coverage
# MinMax indicates whether an indicator should be maximized or minimized
# This is important for optimization and scoring algorithms
print("=== MinMax Mapping Status ===")
has_minmax = wskaznik_dictionary['MinMax'].notna().sum()
total = len(wskaznik_dictionary)
print(f"{has_minmax} out of {total} indicators have MinMax values ({has_minmax/total*100:.1f}%)")

# Show indicators that have MinMax defined
print("\n=== Indicators WITH MinMax ===")
display(wskaznik_dictionary[wskaznik_dictionary['MinMax'].notna()])

# Show indicators missing MinMax (may need manual mapping)
print("\n=== Indicators WITHOUT MinMax ===")
display(wskaznik_dictionary[wskaznik_dictionary['MinMax'].isna()])

=== MinMax Mapping Status ===
37 out of 37 indicators have MinMax values (100.0%)

=== Indicators WITH MinMax ===


|  | WSKAZNIK_INDEX | WSKAZNIK | MinMax |
|---|---|---|---|
| 0 | 0 | C Środki pieniężne i pap. wart. | Max |
| 1 | 1 | CF Nadwyżka finansowa | Max |
| 2 | 2 | DEPR Amortyzacja | Min |
| 3 | 3 | EN Liczba jednostek gospodarczych | Max |
| 4 | 4 | GS (I) Przychody netto ze sprzedaży i zrównane... | Max |
| 5 | 5 | GS Przychody ogółem | Max |
| 6 | 6 | INV Zapasy | Min |
| 7 | 7 | IO Wartość nakładów inwestycyjnych | Min |
| 8 | 8 | IP Odsetki do zapłacenia | Min |
| 9 | 9 | LTC Długoterminowe kredyty bankowe | Min |



=== Indicators WITHOUT MinMax ===


|  | WSKAZNIK_INDEX | WSKAZNIK | MinMax |
|---|---|---|---|
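
To show why the `MinMax` direction matters for scoring, here is a minimal sketch of direction-aware min-max scaling. The function `oriented_score` is hypothetical and is not taken from the project's scoring code; it only illustrates how a 'Min' indicator could be flipped so that a higher score always means "better".

```python
def oriented_score(values, direction):
    """Scale values to [0, 1] so that 1.0 is always the preferred end.

    direction is 'Max' (higher raw value is better) or 'Min' (lower is better).
    """
    if direction not in ("Max", "Min"):
        raise ValueError(f"Unknown MinMax direction: {direction!r}")

    lo, hi = min(values), max(values)
    if hi == lo:
        # All values equal: no ranking information, so return a neutral score.
        return [0.5 for _ in values]

    scaled = [(v - lo) / (hi - lo) for v in values]
    if direction == "Min":
        # Flip the scale so that the smallest raw value scores 1.0.
        scaled = [1.0 - s for s in scaled]
    return scaled


scores_max = oriented_score([10, 20, 30], "Max")
scores_min = oriented_score([10, 20, 30], "Min")
print(scores_max)  # [0.0, 0.5, 1.0]
print(scores_min)  # [1.0, 0.5, 0.0]
```

With every indicator oriented the same way, scores from 'Max' and 'Min' indicators can be averaged or ranked together.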


In [None]:
# Display PKD (Polish Classification of Activities) dimension table
# Maps PKD codes to their descriptions and type classifications
print("=== PKD DIMENSION ===")
print(f"Total PKD codes: {len(pkd_dictionary):,}")
display(pkd_dictionary.head(20))

=== PKD DIMENSION ===
Total PKD codes: 1,764


|  | PKD_INDEX | symbol | nazwa | TYP_INDEX |
|---|---|---|---|---|
| 0 | 0 | 01 | UPRAWY ROLNE, CHÓW I HODOWLA ZWIERZĄT, ŁOWIECT... | 1 |
| 1 | 1 | 01.1 | Uprawy rolne inne niż wieloletnie | 2 |
| 2 | 2 | 01.11 | Uprawa zbóż innych niż ryż, roślin strączkowyc... | 3 |
| 3 | 3 | 01.11.Z | Uprawa zbóż innych niż ryż, roślin strączkowyc... | 4 |
| 4 | 4 | 01.12 | Uprawa ryżu | 3 |
| 5 | 5 | 01.12.Z | Uprawa ryżu | 4 |
| 6 | 6 | 01.13 | Uprawa warzyw, włączając melony oraz uprawa ro... | 3 |
| 7 | 7 | 01.13.Z | Uprawa warzyw, włączając melony oraz uprawa ro... | 4 |
| 8 | 8 | 01.14 | Uprawa trzciny cukrowej | 3 |
| 9 | 9 | 01.14.Z | Uprawa trzciny cukrowej | 4 |


In [None]:
# Display PKD type dimension table
# Defines hierarchy levels: Sekcja (Section), Dział (Division), etc.
print("=== PKD TYPE DIMENSION ===")
print(f"Total types: {len(pkd_typ_dictionary):,}")
display(pkd_typ_dictionary)

=== PKD TYPE DIMENSION ===
Total types: 6


|  | TYP_INDEX | typ |
|---|---|---|
| 0 | 0 | SEKCJA |
| 1 | 1 | DZIAŁ |
| 2 | 2 | GRUPA |
| 3 | 3 | KLASA |
| 4 | 4 | PODKLASA |
| 5 | 5 | OGÓŁEM |
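
The sample rows of the PKD dimension suggest that the hierarchy level can be read off the shape of the symbol (`01` is a DZIAŁ, `01.1` a GRUPA, `01.11` a KLASA, `01.11.Z` a PODKLASA). The sketch below encodes that observation. The patterns are inferred from those sample rows only; the single-letter pattern for SEKCJA is an assumption, and how OGÓŁEM is encoded is not shown in the notebook, so it is left unhandled.

```python
import re

# Patterns inferred from the sample rows; SEKCJA as one letter is an assumption.
LEVEL_PATTERNS = [
    ("SEKCJA", re.compile(r"^[A-Z]$")),
    ("DZIAŁ", re.compile(r"^\d{2}$")),
    ("GRUPA", re.compile(r"^\d{2}\.\d$")),
    ("KLASA", re.compile(r"^\d{2}\.\d{2}$")),
    ("PODKLASA", re.compile(r"^\d{2}\.\d{2}\.[A-Z]$")),
]


def pkd_level(symbol):
    """Return the level name for a PKD symbol, or None if no pattern fits."""
    for name, pattern in LEVEL_PATTERNS:
        if pattern.match(symbol):
            return name
    return None


def pkd_parents(symbol):
    """Return ancestor symbols from the nearest parent up to the division."""
    level = pkd_level(symbol)
    if level == "PODKLASA":
        return [symbol[:5], symbol[:4], symbol[:2]]
    if level == "KLASA":
        return [symbol[:4], symbol[:2]]
    if level == "GRUPA":
        return [symbol[:2]]
    return []


levels = {s: pkd_level(s) for s in ["01", "01.1", "01.11", "01.11.Z"]}
parents = pkd_parents("01.11.Z")
print(levels)
print(parents)
```

A helper like `pkd_parents` is one way to roll subclass-level values up to coarser levels when aggregating.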


## Data Quality Checks

Perform validation checks to ensure data integrity:
- **Null value analysis**: Identify missing values by column
- **Data type verification**: Ensure proper types for each field
- **Range validation**: Check for outliers or invalid values
- **Referential integrity**: Verify foreign key relationships

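These checks help identify data quality issues before downstream processing. The cells below run the null-value and data-type checks; range validation and referential integrity are not run in this notebook.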
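
Range validation and referential integrity could be checked along the following lines. The helper `orphan_keys` and the toy frames are hypothetical; the year bounds 2005 and 2025 are taken from the combined-data summary printed earlier in this notebook.

```python
import pandas as pd

# Toy tables; the last fact row is deliberately invalid.
fact = pd.DataFrame(
    {
        "rok": [2018, 2019, 2031],
        "wartosc": [6, None, 2],
        "WSKAZNIK_INDEX": [35, 35, 99],
        "PKD_INDEX": [3, 7, 3],
    }
)
wskaznik_dim = pd.DataFrame({"WSKAZNIK_INDEX": [35]})
pkd_dim = pd.DataFrame({"PKD_INDEX": [3, 7]})


def orphan_keys(fact_df, dim_df, key):
    """Return the sorted key values in fact_df that have no row in dim_df."""
    missing = ~fact_df[key].isin(dim_df[key])
    return sorted(fact_df.loc[missing, key].unique().tolist())


# Referential integrity: every foreign key should exist in its dimension.
orphans = {
    "WSKAZNIK_INDEX": orphan_keys(fact, wskaznik_dim, "WSKAZNIK_INDEX"),
    "PKD_INDEX": orphan_keys(fact, pkd_dim, "PKD_INDEX"),
}

# Range validation: years outside the expected window are suspicious.
out_of_range_years = fact.loc[~fact["rok"].between(2005, 2025), "rok"].tolist()

print(orphans)
print(out_of_range_years)
```

An empty list for every key and for the year check means both validations pass.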

In [None]:
# Analyze null values in the fact table
# Nulls in 'wartosc' (value) column are expected for missing data points
# Nulls in key columns (rok, PKD_INDEX, WSKAZNIK_INDEX) indicate data quality issues
print("=== NULL VALUE ANALYSIS ===")
null_counts = fact_table.isnull().sum()
null_percentages = (null_counts / len(fact_table) * 100).round(2)

for col in fact_table.columns:
    if null_counts[col] > 0:
        print(f"{col}: {null_counts[col]:,} ({null_percentages[col]}%)")
    else:
        print(f"{col}: ✓ No nulls")

=== NULL VALUE ANALYSIS ===
rok: ✓ No nulls
wartosc: 101,940 (20.96%)
WSKAZNIK_INDEX: ✓ No nulls
PKD_INDEX: ✓ No nulls


In [None]:
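# Verify data types of fact table columns
# rok: should be int
# PKD_INDEX, WSKAZNIK_INDEX: should be int (pandas reports float64 when an
#   integer column has held NaN values, e.g. after a merge with unmatched keys)
# wartosc: should be float64 or object (for Decimal/None)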
print("=== DATA TYPES ===")
print(fact_table.dtypes)
print("\nValue types in 'wartosc' column:")
print(fact_table['wartosc'].apply(type).value_counts())

=== DATA TYPES ===
rok                 int64
wartosc            object
WSKAZNIK_INDEX      int64
PKD_INDEX         float64
dtype: object

Value types in 'wartosc' column:
wartosc
<class 'decimal.Decimal'>    320479
<class 'NoneType'>           101940
<class 'int'>                 56484
<class 'float'>                7412
Name: count, dtype: int64
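
The output above shows `wartosc` holding a mix of `Decimal`, `int`, `float`, and `None`, and `PKD_INDEX` stored as `float64`. If uniform types are wanted downstream, one option is sketched below on toy data. Whether the project should convert `Decimal` to `float` is a design decision this notebook does not settle: the conversion gives up exact decimal precision.

```python
from decimal import Decimal

import pandas as pd

fact = pd.DataFrame(
    {
        # Mixed Python objects, as reported by the type count above.
        "wartosc": pd.Series([Decimal("1.50"), None, 6, 2.25], dtype=object),
        "PKD_INDEX": [3.0, 7.0, 15.0, 24.0],
    }
)


def to_float(value):
    # None becomes NaN; Decimal, int and float all go through float().
    return float("nan") if value is None else float(value)


fact["wartosc"] = fact["wartosc"].map(to_float).astype("float64")

# Int64 (capital I) is pandas' nullable integer dtype; it would also keep
# missing keys as <NA> instead of forcing the column back to float.
fact["PKD_INDEX"] = fact["PKD_INDEX"].astype("Int64")

print(fact.dtypes)
```

After this step the fact table has one numeric dtype per column, which avoids surprises in aggregations and CSV round-trips.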


## Save Results

Export all tables to CSV files for downstream use:
- Analysis and reporting
- Machine learning model training
- Dashboard visualization
- Data sharing and archiving

Files are saved to the `results-pipeline/` directory.

In [None]:
# Save all tables to CSV files in the results-pipeline directory
# This creates the final output files that will be used by:
# - Prediction models (src/prediction.py)
# - Dashboard application (dashboard/)
# - Analysis scripts
pipeline.save_results(
    fact_table=fact_table,
    wskaznik_dict=wskaznik_dictionary,
    pkd_dict=pkd_dictionary,
    pkd_typ_dict=pkd_typ_dictionary,
    output_dir=os.path.join('..', '..', 'results-pipeline')
)


✓ All tables saved to ..\..\results-pipeline/
  - Fact table: 486,315 rows
  - WSKAZNIK dictionary: 37 indicators
  - PKD dictionary: 1,764 codes
  - PKD type dictionary: 6 types


## Summary

This ETL pipeline processes 8 economic data sources into a normalized star schema: a fact table of 486,315 rows described by 37 indicators, 1,764 PKD codes, and 6 PKD types.

### Output Files

The pipeline generates four CSV files in `results-pipeline/`:

1. **kpi-value-table.csv** - Fact table containing all KPI values
   - Primary data table for analysis
   - Contains time-series economic indicators
   
2. **wskaznik_dictionary.csv** - Indicator dimension table
   - Maps indicator IDs to names
   - Includes MinMax optimization direction
   
3. **pkd_dictionary.csv** - PKD classification dimension table
   - Maps industry codes to descriptions
   - Links to PKD type hierarchy
   
4. **pkd_typ_dictionary.csv** - PKD type dimension table
   - Defines classification levels (Sekcja, Dział, etc.)

### Schema Details

**Fact Table** (`kpi-value-table.csv`):
- `rok` (int): Year of measurement
- `PKD_INDEX` (int; float64 in the notebook's in-memory DataFrame): Foreign key to pkd_dictionary
- `WSKAZNIK_INDEX` (int): Foreign key to wskaznik_dictionary
- `wartosc` (Decimal, int, or float; None for missing data): KPI value

**WSKAZNIK Dictionary** (`wskaznik_dictionary.csv`):
- `WSKAZNIK_INDEX` (int): Primary key
- `WSKAZNIK` (str): Indicator name/description
- `MinMax` (str): Optimization direction ('Max' or 'Min')

**PKD Dictionary** (`pkd_dictionary.csv`):
- `PKD_INDEX` (int): Primary key
- `symbol` (str): PKD classification code
- `nazwa` (str): Industry/sector name
- `TYP_INDEX` (int): Foreign key to pkd_typ_dictionary

**PKD Type Dictionary** (`pkd_typ_dictionary.csv`):
- `TYP_INDEX` (int): Primary key
- `typ` (str): Classification level name (e.g., 'SEKCJA', 'DZIAŁ')

### Data Flow

```
Raw Data Sources → Processors → Combined DataFrame → Star Schema → CSV Files
     ↓                ↓              ↓                    ↓            ↓
  CSV/Excel      Normalize     Union All          Dictionaries   Export
  Files          PKD Codes     Sources            + Fact Table   Results
```

### Usage

The generated files can be used for:
- **Predictive modeling**: Training forecasting models
- **Data visualization**: Creating dashboards and reports
- **Statistical analysis**: Analyzing trends and patterns
- **Business intelligence**: Supporting decision-making processes
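
For custom analysis, the exported tables can be loaded and joined back into one wide table. The sketch below uses in-memory CSV text in place of the real files so that it runs on its own. The column layout follows the schema described above, but the exact layout of the exported files (for example, whether an index column is written) is an assumption, and the `wartosc` values and `nazwa` texts are placeholders.

```python
import io

import pandas as pd

# In-memory stand-ins for the four files in results-pipeline/.
fact_csv = io.StringIO("rok,wartosc,WSKAZNIK_INDEX,PKD_INDEX\n2018,10.0,6,3\n2019,12.5,6,3\n")
wskaznik_csv = io.StringIO("WSKAZNIK_INDEX,WSKAZNIK,MinMax\n6,INV Zapasy,Min\n")
pkd_csv = io.StringIO(
    "PKD_INDEX,symbol,nazwa,TYP_INDEX\n0,01,placeholder name,1\n3,01.11.Z,placeholder name,4\n"
)
typ_csv = io.StringIO("TYP_INDEX,typ\n1,DZIAŁ\n4,PODKLASA\n")

fact = pd.read_csv(fact_csv)
wskaznik = pd.read_csv(wskaznik_csv)
# Read symbols as text; otherwise a code like "01" would be parsed as the number 1.
pkd = pd.read_csv(pkd_csv, dtype={"symbol": str})
typ = pd.read_csv(typ_csv)

# Follow the foreign keys: fact -> indicator, fact -> PKD -> PKD type.
wide = (
    fact.merge(wskaznik, on="WSKAZNIK_INDEX", how="left")
    .merge(pkd, on="PKD_INDEX", how="left")
    .merge(typ, on="TYP_INDEX", how="left")
)
print(wide[["rok", "symbol", "typ", "WSKAZNIK", "MinMax", "wartosc"]])
```

With real files, each `io.StringIO(...)` argument would be replaced by the corresponding path under `results-pipeline/`.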

### Next Steps

1. Run prediction models: `python src/run_prediction.py`
2. Generate forecasts: `python src/prediction.py`
3. Launch dashboard: `npm run dev` (in dashboard directory)
4. Perform custom analysis using the star schema tables