In [1]:
# Import necessary libraries
import dask.dataframe as dd
import pandas as pd
import glob
import os
import dask
import matplotlib.pyplot as plt
from dask.distributed import Client, progress
import numpy as np


In [2]:
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client

# Basic sanity check: report which Dask version is installed.
print(f" Wersja Dask : {dask.__version__}")

# Small local cluster: one worker with two threads; processes=False keeps the
# worker inside this Python process.
client = Client(n_workers=1, threads_per_worker=2, processes=False)
print(f" Dashboard : {client.dashboard_link}")

 Wersja Dask : 2025.5.0
 Dashboard : http://192.168.100.36:8787/status


In [None]:
# Get a Dask client. Reuse the one created in the setup cell if it is still
# registered; calling Client() unconditionally would start a second local
# cluster and leave the first one running, unreferenced.
try:
    client = Client.current()
except ValueError:
    # No client is registered yet, so start a default local one.
    client = Client()
print(f"Dashboard link: {client.dashboard_link}")

In [None]:
# Directory holding the yearly Excel exports (relative to this notebook).
res_path = "../../../resources"

# Sort the list so the file order, and therefore the partition order later, is
# deterministic; glob returns files in arbitrary order. Also skip "~$name.xlsx"
# files, e.g. the owner/lock files Excel creates while a workbook is open,
# which are not readable workbooks.
excel_files = sorted(
    path
    for path in glob.glob(os.path.join(res_path, "*.xlsx"))
    if not os.path.basename(path).startswith("~$")
)
print(f"Found {len(excel_files)} files to process")

In [None]:
def common_elements(list1, list2):
    """Return the items of ``list1`` that also appear in ``list2``.

    The result follows the order of ``list1``, and duplicates in ``list1`` are
    collapsed. A plain set intersection has hash-dependent ordering, which
    would make the shared column list (and hence column order downstream)
    change from run to run.

    Parameters
    ----------
    list1 : iterable
        Reference sequence; its order is preserved.
    list2 : iterable
        Any iterable of hashable items, e.g. a pandas ``Index``.

    Returns
    -------
    list
        Items present in both inputs, in ``list1`` order.
    """
    lookup = set(list2)
    seen = set()
    shared = []
    for item in list1:
        if item in lookup and item not in seen:
            seen.add(item)
            shared.append(item)
    return shared

# Column analysis: find the columns shared by every readable file and record
# the dtype each column has in each file.
common_columns = None
all_column_types = {}

print("\n--- COLUMN ANALYSIS ---")
for file in excel_files:
    try:
        print(f"File: {file}")
        df = pd.read_excel(file)

        # Normalise headers (same rule as in process_excel_file). str() guards
        # against non-string headers such as integer labels, which have no .strip().
        df.columns = [str(col).strip().lower().replace(" ", "_") for col in df.columns]

        print("Columns and datatypes:")
        for col, dtype in df.dtypes.items():
            print(f"  - {col}: {dtype}")
            # Remember every dtype seen for this column across files. Iterating
            # df.dtypes also works when a header is duplicated, where df[col]
            # would return a DataFrame without a single .dtype.
            all_column_types.setdefault(col, []).append(str(dtype))

        if common_columns is None:
            common_columns = df.columns.tolist()
        else:
            common_columns = common_elements(common_columns, df.columns)
    except Exception as e:
        print(f"Error while processing {file}: {e}")

if common_columns is None:
    # No file could be read. Use an empty list so later cells get a list, not None.
    print("Warning: no file could be read, so no common columns were found.")
    common_columns = []
else:
    print(f"\nCommon columns ({len(common_columns)}): {common_columns}")

# Report columns whose dtype differs between files. The per-file frames are
# later combined into one Dask DataFrame with a single inferred schema, so
# such columns may need explicit casting.
mixed_types = {
    col: sorted(set(types))
    for col, types in all_column_types.items()
    if len(set(types)) > 1
}
if mixed_types:
    print("\nColumns with inconsistent dtypes across files:")
    for col, types in mixed_types.items():
        print(f"  - {col}: {', '.join(types)}")

In [57]:
def process_excel_file(filename, common_cols):
    """Load one Excel file and normalise it to the shared column layout.

    Parameters
    ----------
    filename : str
        Path to the ``.xlsx`` file. The school year is taken from the first
        run of two consecutive years in the name, e.g. ``20192020`` ->
        ``"2019/2020"``. It is ``"Unknown"`` when no such run is found.
    common_cols : list of str
        Normalised column names to keep, in output order.

    Returns
    -------
    pandas.DataFrame
        Columns ``common_cols`` plus ``rok_szkolny``, which is appended when it
        is not already in ``common_cols``. Requested columns missing from the
        file are filled with NaN. On any error an empty frame with the same
        columns is returned, so every partition handed to ``dd.from_delayed``
        has the same column set.
    """
    import re

    # The school-year column must survive the column filter. Every downstream
    # groupby uses it, even when it is not one of the common columns.
    output_cols = list(common_cols or [])
    if "rok_szkolny" not in output_cols:
        output_cols.append("rok_szkolny")

    try:
        rok_szkolny = "Unknown"
        # The lookahead lets overlapping candidates be tried, and the
        # consecutive-year check rejects accidental 8-digit runs.
        for match in re.finditer(r"(?=(20\d{2})(20\d{2}))", str(filename)):
            start_year, end_year = match.group(1), match.group(2)
            if int(end_year) == int(start_year) + 1:
                rok_szkolny = f"{start_year}/{end_year}"
                break

        print(f"Przetwarzanie pliku: {filename}, rok szkolny: {rok_szkolny}")

        df = pd.read_excel(filename)

        df.columns = [str(col).strip().lower().replace(" ", "_") for col in df.columns]

        # A year column already present in the file takes precedence over the
        # one derived from the filename.
        if "rok_szkolny" not in df.columns:
            df["rok_szkolny"] = rok_szkolny

        # reindex keeps the requested order, drops extra columns and adds
        # missing ones as NaN in one step, with no chained assignment on a slice.
        df = df.reindex(columns=output_cols)

        for col in ("liczba_absolwentów", "liczba_młodocianych_pracowników", "w_tym_dziewczęta"):
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")

        print(f"  Przetworzono poprawnie. Wymiar: {df.shape}")
        return df
    except Exception as e:
        print(f"Błąd podczas przetwarzania {filename}: {e}")
        return pd.DataFrame(columns=output_cols)

In [58]:
# MAP step: one lazy task per Excel file. Nothing is read until a result is computed.
if not excel_files:
    # With no files there is nothing to build a frame from. Fail early with a
    # clear message instead of an opaque error from dd.from_delayed.
    raise FileNotFoundError("No .xlsx files found to load")

delayed_dfs = [
    dask.delayed(process_excel_file)(path, common_columns) for path in excel_files
]

# One partition per file. No `meta` is passed, so Dask infers the schema itself.
# NOTE(review): inference may evaluate a partition eagerly; consider passing meta.
dask_df = dd.from_delayed(delayed_dfs)

In [None]:

# Example map-reduce operations.
# 1. Rows per school year. MAP: group by "rok_szkolny", REDUCE: count.
# NOTE(review): .size() counts rows, not a head-count column; confirm that one
# row corresponds to one student.
students_by_year = (
    dask_df
    .groupby("rok_szkolny")
    .size()
    .compute()
)
print("\nStudenci w danych latach:")
print(students_by_year)

In [None]:

# 2. Ten most frequent professions.
#    MAP: group rows by profession, REDUCE: count per group, then rank.
prof_col = "zawód"

top_professions = (
    dask_df
    .groupby(prof_col)
    .size()
    .compute()                      # per-profession counts as a pandas Series
    .sort_values(ascending=False)   # most frequent first
    .head(10)
)
print("\nTop 10 Zawodów:")
print(top_professions)


In [None]:

# Profession trends. MAP: group by (school year, profession), REDUCE: count.
# The counts become a long-format table with a "count" column.
prof_trends = (
    dask_df.groupby(["rok_szkolny", prof_col]).size().compute()
    .rename("count")
    .reset_index()
)
print("\nTrendy zawodów w latach:")
print(prof_trends.head(10))

In [None]:

# Rank professions by their total count over all years; keep the five labels.
top_5_profs = (
    prof_trends
    .groupby(prof_col)["count"].sum()
    .sort_values(ascending=False)
    .head(5)
    .index
)

# Restrict the yearly trend table to those five professions.
top_5_trends = prof_trends.loc[prof_trends[prof_col].isin(top_5_profs)]


In [None]:

# 3. Geographic analysis, run only when the data has a region-like column.
# Headers are only stripped, lower-cased and underscored (diacritics are kept),
# so the Polish spelling "województwo" is checked alongside the ASCII forms.
REGION_CANDIDATES = ["region", "wojewodztwo", "województwo"]
region_col = next(
    (name for name in REGION_CANDIDATES if name in dask_df.columns), None
)

if region_col is not None:
    # MAP: group by region, REDUCE: count, then rank in descending order.
    students_by_region = (
        dask_df.groupby(region_col).size().compute().sort_values(ascending=False)
    )
    print("\nUczniowie w województwach:")
    print(students_by_region)
else:
    print("\nNo region column found; skipping the geographic analysis.")


In [None]:

# Regional trends over time: counts per (school year, region) pair.
# Skipped when the previous cell found no region column. In that case
# region_trends is left undefined, and the plotting cell checks for that.
if region_col is not None:
    region_trends = (
        dask_df.groupby(["rok_szkolny", region_col])
        .size()
        .compute()
        .reset_index(name="count")
    )
else:
    print("No region column found; skipping regional trends.")


In [None]:

# 4. Average number of rows per school in each school year.
#    MAP: count rows per (year, school) pair. REDUCE: average those counts per year.
# NOTE(review): rows are counted, not students; confirm this is the intended metric.
school_col = "nazwa_placówki"

# Despite its name, this holds the per-(year, school) row counts.
avg_by_school = (
    dask_df
    .groupby(["rok_szkolny", school_col])
    .size()
    .compute()
)
avg_students = avg_by_school.groupby(level="rok_szkolny").mean()
print("\nŚrednia liczba studentów na szkołę na rok szkolny:")
print(avg_students)


In [None]:

# 5. Year-over-year percentage change in the yearly counts.
if students_by_year is not None and len(students_by_year) > 1:
    # "YYYY/YYYY" labels sort chronologically as strings. The sorted series is
    # stored back so the bar chart below is also in year order.
    students_by_year = students_by_year.sort_index()
    # Files whose name carried no year are labelled "Unknown". A percentage
    # change into or out of that bucket is meaningless, so it is excluded.
    known_years = students_by_year.drop(index="Unknown", errors="ignore")
    growth = known_years.pct_change() * 100
    print("\nWzrost liczby studentów rok po roku(%):")
    print(growth)


In [None]:

# Directory for the saved figures. savefig does not create missing directories.
output_dir = "../output"
os.makedirs(output_dir, exist_ok=True)

# Figure 1: yearly counts as a bar chart.
fig_years, ax_years = plt.subplots(figsize=(12, 6))
students_by_year.plot(kind="bar", ax=ax_years)
ax_years.set_title("Liczba uczniów szkół zawodowych w latach")
ax_years.set_xlabel("Rok szkolny")
ax_years.set_ylabel("Liczba studentów")
fig_years.tight_layout()
fig_years.savefig(os.path.join(output_dir, "students_by_year.png"))

# Figure 2: regional trends. region_trends exists only when a region column
# was found, and an empty table has nothing to plot.
regional = globals().get("region_trends")
if regional is not None and not regional.empty:
    pivot_regions = regional.pivot(
        index="rok_szkolny", columns=region_col, values="count"
    )

    plt.style.use('seaborn-v0_8-whitegrid')

    import matplotlib.colors as mcolors
    colors = list(mcolors.TABLEAU_COLORS.values()) + list(mcolors.CSS4_COLORS.values())[:10]

    fig_regions, ax = plt.subplots(figsize=(14, 10))
    pivot_regions.plot(
        marker='o',
        markersize=6,
        linewidth=2.5,
        color=colors,
        alpha=0.8,
        ax=ax,
    )

    ax.set_title("Trend na szkoły zawodowe w danych województwach", fontsize=16, pad=20)
    ax.set_xlabel("Rok szkolny", fontsize=12, labelpad=10)
    ax.set_ylabel("Liczba studentów", fontsize=12, labelpad=10)

    # plt.* tick helpers act on the current axes, which is `ax` after plt.subplots.
    plt.xticks(fontsize=10, rotation=45)
    plt.yticks(fontsize=10)

    # Legend sits outside the plot area, to the right.
    ax.legend(
        title="Województwo",
        title_fontsize=12,
        fontsize=10,
        loc="center left",
        bbox_to_anchor=(1.02, 0.5),
        frameon=True,
        facecolor='white',
        edgecolor='lightgray',
    )

    ax.grid(True, linestyle='--', alpha=0.7)

    # 10% headroom above the largest value. The check skips this when every
    # cell is NaN, because a NaN limit would make set_ylim fail.
    y_max = pivot_regions.max().max()
    if pd.notna(y_max):
        ax.set_ylim(0, y_max * 1.1)

    # Shaded areas are readable only for a handful of lines.
    if len(pivot_regions.columns) <= 5:
        for column, color in zip(pivot_regions.columns, colors[:len(pivot_regions.columns)]):
            ax.fill_between(
                pivot_regions.index,
                pivot_regions[column],
                alpha=0.1,
                color=color,
            )

    fig_regions.tight_layout()
    fig_regions.savefig(os.path.join(output_dir, "region_trends.png"), dpi=300, bbox_inches='tight')
else:
    print("No regional trend data available; skipping the region plot.")

plt.show()

# Release the Dask cluster once all results have been computed, whether or not
# the regional plot was drawn.
client.close()