In [7]:
import os
import pandas as pd

DATA_DIR = "data"
PH_DAYS_PATH = os.path.join(DATA_DIR, "ph_v1_days.csv")
PHLIST_PATH = os.path.join(DATA_DIR, "phlist.csv")
PRODS_PATH = os.path.join(DATA_DIR, "prods.csv")

ph_days = pd.read_csv(PH_DAYS_PATH)
phlist = pd.read_csv(PHLIST_PATH)
prods = pd.read_csv(PRODS_PATH)

quasi_ident_days_cols = ["Date"]
quasi_ident_phlist_cols = ["date"]
quasi_ident_prods_cols = ["Date"]

clinical_days_base = ["PH_morning", "PH_midday", "PH_evening", "PH_avg", "Ailment", "Gym", "Liquid"]
clinical_days_extra = [col for col in ph_days.columns if col not in quasi_ident_days_cols + clinical_days_base]
clinical_days_cols = clinical_days_base + clinical_days_extra

clinical_phlist_cols = [col for col in phlist.columns if col not in quasi_ident_phlist_cols]
clinical_prods_cols = [col for col in prods.columns if col not in quasi_ident_prods_cols]

ph_days_personal = ph_days[quasi_ident_days_cols].copy()
ph_days_clinical = ph_days[clinical_days_cols].copy()

phlist_personal = phlist[quasi_ident_phlist_cols].copy()
phlist_clinical = phlist[clinical_phlist_cols].copy()

prods_personal = prods[quasi_ident_prods_cols].copy()
prods_clinical = prods[clinical_prods_cols].copy()

display(ph_days.head())
display(phlist.head())
display(prods.head())

print("ph_days_personal:", ph_days_personal.columns.tolist())
print("ph_days_clinical:", ph_days_clinical.columns.tolist())
print("phlist_personal:", phlist_personal.columns.tolist())
print("phlist_clinical:", phlist_clinical.columns.tolist())
print("prods_personal:", prods_personal.columns.tolist())
print("prods_clinical:", prods_clinical.columns.tolist())


Unnamed: 0,Date,PH_morning,PH_midday,PH_evening,PH_avg,Ailment,Gym,Liquid,Lemon water,Tea(black/green),...,Cookie,Cake,Bun,Croissant,Chocolate,Candies,Halva,Marshmallow,Red caviar,Jelly
0,06.09.2017,5.41,5.25,5.55,5.4,,,1300,250.0,,...,,,,,,,,,,
1,07.09.2017,7.06,5.7,5.32,6.03,,1.0,1300,250.0,,...,,,,,,,,,,
2,08.09.2017,6.26,5.75,5.43,5.81,,,1350,250.0,,...,,,,,,,,,,
3,09.09.2017,5.35,6.85,6.09,6.1,,,2350,250.0,,...,,,,,,,,,,
4,10.09.2017,6.62,6.34,5.23,6.06,,1.0,1600,250.0,,...,,,150.0,,,,,,,


Unnamed: 0,PH,time,date
0,4.56,8.0,09.04.2018
1,5.04,9.0,09.04.2018
2,4.72,11.5,09.04.2018
3,5.02,12.5,09.04.2018
4,5.11,15.2,09.04.2018


Unnamed: 0,Date,Product,Weight,Time
0,09.04.2018,Water,300,7.0
1,09.04.2018,Water,250,8.0
2,09.04.2018,Lemon water,300,9.4
3,09.04.2018,Oatmeal,220,10.3
4,09.04.2018,Fruit tea,250,10.3


ph_days_personal: ['Date']
ph_days_clinical: ['PH_morning', 'PH_midday', 'PH_evening', 'PH_avg', 'Ailment', 'Gym', 'Liquid', 'Lemon water', 'Tea(black/green)', 'Fruit tea', 'Water', 'Latte', 'Mineral water (Esentuki - 4)', 'Mineral water (Borjomi)', 'Mineral water (Prolom)', 'Tequila', 'Red wine', 'White wine', 'Strong alcohol', 'Beer', 'Morse', 'Milk', 'Orange juice', 'Oatmeal', 'Pizza with meat', 'Cheese Pizza', 'Cheesecakes', 'Scrambled eggs', 'Mayonnaise salad', 'Bread', 'Meat', 'Sauce', 'Cutlet', 'Sausage', 'Fish', 'Sushi', 'Cod liver', 'Cheese', 'Curd', 'Sour cream', 'Buckwheat porridge', 'Noodles with vegetables', 'Noodles with shrimp', 'Meat pilaf', 'Meatless pilaf', 'Pea soup with meat', 'Fried vegetables', 'Pyagse with vegetables', 'Dumplings with cabbage', 'Potato dumplings', 'Zucchini Fritters', 'Stewed cabbage', 'Solyanka with meat', 'Eggplant with cheese', 'Pasta', 'Fried potatoes', 'Cooked Potatoes', 'Braised Potatoes', 'Mashed potatoes', 'Apple Pie', 'Patty with cabbage

In [8]:
import os
import pandas as pd
from cryptography.fernet import Fernet
import hashlib

DATA_DIR = "data"
PH_DAYS_PATH = os.path.join(DATA_DIR, "ph_v1_days.csv")

if "ph_days" not in globals():
    ph_days = pd.read_csv(PH_DAYS_PATH)

fernet_key = Fernet.generate_key()
cipher = Fernet(fernet_key)

ph_days_enc = ph_days.copy()
ph_days_enc["Ailment_enc"] = ph_days_enc["Ailment"].astype(str).apply(
    lambda x: cipher.encrypt(x.encode()).decode()
)

print("[Etap 2.1] Klucz szyfrujący wygenerowany.\n")
print("[Etap 2.1] Przykład zaszyfrowanej kolumny Ailment:")
display(ph_days_enc[["Ailment", "Ailment_enc"]].head())

def compute_file_hash(path, algo="sha256"):
    h = hashlib.new(algo)
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            h.update(chunk)
    return h.hexdigest()

original_hash = compute_file_hash(PH_DAYS_PATH, "sha256")
print("\n[Etap 2.2] SHA-256 oryginalnego pliku ph_v1_days.csv:")
print(original_hash)

PH_DAYS_MOD_PATH = os.path.join(DATA_DIR, "ph_v1_days_modified.csv")
ph_days_mod = ph_days.copy()
if "PH_avg" in ph_days_mod.columns:
    ph_days_mod.loc[0, "PH_avg"] = ph_days_mod.loc[0, "PH_avg"] + 0.0001
else:
    first_num_col = ph_days_mod.select_dtypes(include=["number"]).columns[0]
    ph_days_mod.loc[0, first_num_col] = ph_days_mod.loc[0, first_num_col] + 0.0001

ph_days_mod.to_csv(PH_DAYS_MOD_PATH, index=False)
modified_hash = compute_file_hash(PH_DAYS_MOD_PATH, "sha256")

print("\n[Etap 2.3] SHA-256 po zmianie jednego elementu w danych:")
print("Oryginał :", original_hash)
print("Po zmianie:", modified_hash)
print("\n[Etap 2.3] Czy skróty są równe?", original_hash == modified_hash)

[Etap 2.1] Klucz szyfrujący wygenerowany.

[Etap 2.1] Przykład zaszyfrowanej kolumny Ailment:


Unnamed: 0,Ailment,Ailment_enc
0,,gAAAAABpL-NvwjBARyBSrA10KUFehGFcd6vKtnLdR4uI-q...
1,,gAAAAABpL-NvCvkblJj8cDx5ysCI3KA9sB478g7sQps2tn...
2,,gAAAAABpL-NveVCRUaaoyYc4cJD66Dvk8ONBG-azQ-2ymO...
3,,gAAAAABpL-Nvtz_QQVXSgcF8uJI1Ugk93oVkt4APvq-RUS...
4,,gAAAAABpL-NvdwrZOUdVxGDmpkrH0pGiaSG77d2BURgeqf...



[Etap 2.2] SHA-256 oryginalnego pliku ph_v1_days.csv:
44fe47a40f36f280672effdca85a8c595aa6b93a1ff45feeac1fc5caf9f3eb2f

[Etap 2.3] SHA-256 po zmianie jednego elementu w danych:
Oryginał : 44fe47a40f36f280672effdca85a8c595aa6b93a1ff45feeac1fc5caf9f3eb2f
Po zmianie: 16bc76f438f466ef071097f6ef4e85374218b53ad0b62119529693c71c58cf86

[Etap 2.3] Czy skróty są równe? False


In [9]:
import uuid
from datetime import datetime
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

DATA_DIR = "data"
PH_DAYS_PATH = os.path.join(DATA_DIR, "ph_v1_days.csv")

if "ph_days" not in globals():
    ph_days = pd.read_csv(PH_DAYS_PATH)

dfb = ph_days.copy()

base_cols = dfb.columns.tolist()
diet_cols_days = [
    c for c in base_cols
    if c not in [
        "Date",
        "PH_morning",
        "PH_midday",
        "PH_evening",
        "PH_avg",
        "Ailment",
        "Gym",
        "Liquid",
    ]
]

dfb["Date_parsed"] = pd.to_datetime(dfb["Date"], format="%d.%m.%Y")
dfb["Date_ordinal"] = dfb["Date_parsed"].map(lambda x: x.toordinal())

dfb["visit_token"] = [str(uuid.uuid4()) for _ in range(len(dfb))]

print("[Etap 3.1] Przykład pseudonimizacji daty na losowy token:")
display(dfb[["Date", "visit_token"]].head())

dfb["PH_avg_group"] = pd.cut(
    dfb["PH_avg"],
    bins=[0, 6.4, 6.8, 14],
    labels=["low", "optimal", "high"],
    include_lowest=True,
)

ph_days_anonymized = dfb.drop(columns=["Date", "visit_token"])

print("\n[Etap 3.2] Kolumny przed anonimizacją:")
print(sorted(set(ph_days.columns.tolist())))

print("\n[Etap 3.2] Kolumny po anonimizacji (bez Date i visit_token):")
print(sorted(set(ph_days_anonymized.columns.tolist())))

dfb["target_optimal_ph"] = (
    (dfb["PH_avg"] >= 6.4) & (dfb["PH_avg"] <= 6.8)
).astype(int)

feature_cols_before = diet_cols_days + ["Gym", "Liquid", "Date_ordinal"]

Xb = dfb[feature_cols_before].copy()
Xb[diet_cols_days] = Xb[diet_cols_days].fillna(0)

Xb["Gym"] = Xb["Gym"].fillna("No_info").astype(str)
Xb["Liquid"] = Xb["Liquid"].fillna("No_info").astype(str)

Xb = pd.get_dummies(Xb, columns=["Gym", "Liquid"], drop_first=True)

Xb = Xb.dropna()
yb = dfb.loc[Xb.index, "target_optimal_ph"]

if len(Xb) > 1 and yb.nunique() > 1:
    Xb_train, Xb_test, yb_train, yb_test = train_test_split(
        Xb, yb, test_size=0.25, random_state=42
    )
    model_before = LogisticRegression(max_iter=1000)
    model_before.fit(Xb_train, yb_train)
    acc_before = model_before.score(Xb_test, yb_test)
else:
    acc_before = np.nan

feature_cols_after = diet_cols_days + ["Gym", "Liquid"]

Xa = ph_days_anonymized[feature_cols_after].copy()
Xa[diet_cols_days] = Xa[diet_cols_days].fillna(0)
Xa["Gym"] = Xa["Gym"].fillna("No_info").astype(str)
Xa["Liquid"] = Xa["Liquid"].fillna("No_info").astype(str)
Xa = pd.get_dummies(Xa, columns=["Gym", "Liquid"], drop_first=True)

Xa = Xa.dropna()
ya = dfb.loc[Xa.index, "target_optimal_ph"]

common_cols = sorted(set(Xb.columns) & set(Xa.columns))
Xa = Xa[common_cols]
Xb_aligned = Xb[common_cols].loc[Xa.index]

if len(Xa) > 1 and ya.nunique() > 1:
    Xa_train, Xa_test, ya_train, ya_test = train_test_split(
        Xa, ya, test_size=0.25, random_state=42
    )
    model_after = LogisticRegression(max_iter=1000)
    model_after.fit(Xa_train, ya_train)
    acc_after = model_after.score(Xa_test, ya_test)
else:
    acc_after = np.nan

print("\n[Etap 3.3] Dokładność modelu przed anonimizacją:", acc_before)
print("[Etap 3.3] Dokładność modelu po anonimizacji   :", acc_after)


[Etap 3.1] Przykład pseudonimizacji daty na losowy token:


Unnamed: 0,Date,visit_token
0,06.09.2017,f3e9a152-9f0d-4143-ac4f-c85bd2ca72a4
1,07.09.2017,ac341192-b905-48cf-a122-9eae9815234c
2,08.09.2017,c8bec1d5-9830-4c91-ba59-bb49db9f6bd9
3,09.09.2017,d52c4fbf-4ecc-4528-a520-f19b697cb11f
4,10.09.2017,698e03c6-0a75-40ac-ac94-1a361cca840c



[Etap 3.2] Kolumny przed anonimizacją:
['Ailment', 'Apple', 'Apple Pie', 'Banana', 'Beer', 'Braised Potatoes', 'Bread', 'Buckwheat porridge', 'Bun', 'Cake', 'Candies', 'Cheese', 'Cheese Pie', 'Cheese Pizza', 'Cheesecakes', 'Chocolate', 'Cod liver', 'Cooked Potatoes', 'Cookie', 'Croissant', 'Curd', 'Curd Pie', 'Cutlet', 'Date', 'Dried apricots', 'Dumplings with cabbage', 'Egg Pie', 'Eggplant caviar', 'Eggplant with cheese', 'Fish', 'Fried potatoes', 'Fried vegetables', 'Fruit tea', 'Grapes', 'Gym', 'Halva', 'Ice cream', 'Jelly', 'Khachapuri', 'Kumquat', 'Latte', 'Lemon water', 'Liquid', 'Marshmallow', 'Mashed potatoes', 'Mayonnaise salad', 'Meat', 'Meat pie', 'Meat pilaf', 'Meatless pilaf', 'Melon', 'Milk', 'Mineral water (Borjomi)', 'Mineral water (Esentuki - 4)', 'Mineral water (Prolom)', 'Morse', 'Noodles with shrimp', 'Noodles with vegetables', 'Oatmeal', 'Orange', 'Orange juice', 'PH_avg', 'PH_evening', 'PH_midday', 'PH_morning', 'Pasta', 'Paste', 'Patty with cabbage', 'Pea soup w

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(



[Etap 3.3] Dokładność modelu przed anonimizacją: 0.7358490566037735
[Etap 3.3] Dokładność modelu po anonimizacji   : 0.7169811320754716


In [10]:
from datetime import datetime

try:
    df_anonym = ph_days_anonymized.copy()
except NameError:
    df_anonym = ph_days_clinical.copy()

ROLES = {
    "admin": {
        "columns": df_anonym.columns.tolist()
    },
    "doctor": {
        "columns": [
            "patient_id",
            "Date",
            "PH_morning",
            "PH_midday",
            "PH_evening",
            "PH_avg",
            "Ailment",
            "Gym",
            "Liquid"
        ]
    },
    "analyst": {
        "columns": [
            "PH_avg",
            "Gym",
            "Liquid"
        ]
    }
}

audit_log = []

def audit(user: str, role: str, action: str, status: str):
    audit_log.append({
        "time": datetime.now().isoformat(timespec="seconds"),
        "user": user,
        "role": role,
        "action": action,
        "status": status
    })

def get_data_view(df_in, role: str, user: str = "unknown"):
    if role not in ROLES:
        audit(user, role, "ACCESS_DENIED_UNKNOWN_ROLE", "denied")
        raise ValueError(f"Nieznana rola: {role}")
    allowed_cols = [c for c in ROLES[role]["columns"] if c in df_in.columns]
    view = df_in[allowed_cols].copy()
    audit(user, role, f"ACCESS_GRANTED_{','.join(allowed_cols)}", "granted")
    return view

print("\n[Etap 4] Przykładowe widoki danych dla różnych ról:")

df_admin_view = get_data_view(df_anonym, role="admin", user="alice_admin")
print("\nRola admin (pierwsze 3 wiersze):")
print(df_admin_view.head(3))

df_doctor_view = get_data_view(df_anonym, role="doctor", user="bob_doctor")
print("\nRola doctor (pierwsze 3 wiersze):")
print(df_doctor_view.head(3))

df_analyst_view = get_data_view(df_anonym, role="analyst", user="carol_analyst")
print("\nRola analyst (pierwsze 3 wiersze):")
print(df_analyst_view.head(3))

print("\n[Etap 4] Log audytu:")
for entry in audit_log:
    print(entry)



[Etap 4] Przykładowe widoki danych dla różnych ról:

Rola admin (pierwsze 3 wiersze):
   PH_morning  PH_midday  PH_evening  PH_avg  Ailment  Gym  Liquid  \
0        5.41       5.25        5.55    5.40      NaN  NaN    1300   
1        7.06       5.70        5.32    6.03      NaN  1.0    1300   
2        6.26       5.75        5.43    5.81      NaN  NaN    1350   

   Lemon water  Tea(black/green)  Fruit tea  ...  Croissant  Chocolate  \
0        250.0               NaN      600.0  ...        NaN        NaN   
1        250.0               NaN      600.0  ...        NaN        NaN   
2        250.0               NaN      600.0  ...        NaN        NaN   

   Candies  Halva  Marshmallow  Red caviar  Jelly  Date_parsed  Date_ordinal  \
0      NaN    NaN          NaN         NaN    NaN   2017-09-06        736578   
1      NaN    NaN          NaN         NaN    NaN   2017-09-07        736579   
2      NaN    NaN          NaN         NaN    NaN   2017-09-08        736580   

   PH_avg_grou

In [13]:
import os
import pandas as pd
import numpy as np
import hashlib
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

PATH = os.path.join("data", "ph_v1_days.csv")
df_raw = pd.read_csv(PATH)

def compute_file_hash(path, algo="sha256"):
    h = hashlib.new(algo)
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            h.update(chunk)
    return h.hexdigest()

original_hash = compute_file_hash(PATH, "sha256")
TRAIN_HASH = original_hash

def verify_data_integrity(path: str, expected_hash: str) -> bool:
    current_hash = compute_file_hash(path, "sha256")
    return current_hash == expected_hash

print("\n[Etap 5.1] Weryfikacja integralności pliku wejściowego:")
print("Integralność OK?", verify_data_integrity(PATH, TRAIN_HASH))

df_model = df_raw.copy()
df_model["target_optimal_ph"] = ((df_model["PH_avg"] >= 6.4) & (df_model["PH_avg"] <= 6.8)).astype(int)
df_model["Date_parsed"] = pd.to_datetime(df_model["Date"], format="%d.%m.%Y")
df_model["Date_ordinal"] = df_model["Date_parsed"].map(lambda x: x.toordinal())
df_model["Gym"] = df_model["Gym"].fillna(0)
df_model["Liquid"] = df_model["Liquid"].fillna(0)

ALLOWED_PH_RANGE = (4.0, 9.0)
ALLOWED_LIQUID_RANGE = (0, 6000)

def validate_input_row(row: pd.Series) -> bool:
    try:
        if not (ALLOWED_PH_RANGE[0] <= row["PH_morning"] <= ALLOWED_PH_RANGE[1]):
            return False
        if not (ALLOWED_PH_RANGE[0] <= row["PH_midday"] <= ALLOWED_PH_RANGE[1]):
            return False
        if not (ALLOWED_PH_RANGE[0] <= row["PH_evening"] <= ALLOWED_PH_RANGE[1]):
            return False
        if not (ALLOWED_PH_RANGE[0] <= row["PH_avg"] <= ALLOWED_PH_RANGE[1]):
            return False
        if not (ALLOWED_LIQUID_RANGE[0] <= row["Liquid"] <= ALLOWED_LIQUID_RANGE[1]):
            return False
        gym_val = 0 if pd.isna(row["Gym"]) else row["Gym"]
        if gym_val not in [0, 1]:
            return False
    except KeyError:
        return False
    return True

print("\n[Etap 5.2] Walidacja przykładowych wierszy:")
for i in range(3):
    print(i, "=>", validate_input_row(df_raw.iloc[i]))

feature_cols = ["PH_morning", "PH_midday", "PH_evening", "PH_avg", "Liquid", "Gym", "Date_ordinal"]
X = df_model[feature_cols].copy()
X = X.fillna(0)
y = df_model["target_optimal_ph"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42
)

model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)

print("\n[Etap 5.3] Trenowanie modelu zakończone.")
print("Accuracy (train):", round(model.score(X_train, y_train), 3))
print("Accuracy (test):", round(model.score(X_test, y_test), 3))

def secure_predict(model, row: pd.Series, data_path: str, expected_hash: str):
    if not verify_data_integrity(data_path, expected_hash):
        raise RuntimeError("Dane zostały zmodyfikowane – przerwij predykcję!")
    if not validate_input_row(row):
        raise ValueError("Nieprawidłowe dane wejściowe – odrzucono wiersz.")
    r = row.copy()
    r["Gym"] = 0 if pd.isna(r["Gym"]) else r["Gym"]
    r["Liquid"] = 0 if pd.isna(r["Liquid"]) else r["Liquid"]
    date_parsed = datetime.strptime(r["Date"], "%d.%m.%Y")
    r["Date_ordinal"] = date_parsed.toordinal()
    row_df = pd.DataFrame([{c: r[c] for c in feature_cols}])
    row_df = row_df.fillna(0)
    proba_optimal = model.predict_proba(row_df)[0, 1]
    pred = model.predict(row_df)[0]
    return pred, proba_optimal

print("\n[Etap 5.4] Bezpieczna predykcja dla wybranego dnia:")
sample_row = df_raw.iloc[0]
pred, proba = secure_predict(model, sample_row, PATH, TRAIN_HASH)
print("Dzień 0 -> predykcja target_optimal_ph:", int(pred), "prawdopodobieństwo:", round(proba, 3))

print("\n[Etap 5.5] Przykładowe kategorie ryzyk w systemach SI na danych medycznych:")
print("- Modyfikacja danych treningowych (data poisoning)")
print("- Manipulacja danymi wejściowymi (data tampering, adversarial examples)")
print("- Wycieki prywatności (membership inference, model inversion)")
print("- Błędne etykiety i bias w danych (stronnicze decyzje kliniczne)")
print("- Brak kontroli integralności i audytu dostępu do danych")



[Etap 5.1] Weryfikacja integralności pliku wejściowego:
Integralność OK? True

[Etap 5.2] Walidacja przykładowych wierszy:
0 => True
1 => True
2 => True

[Etap 5.3] Trenowanie modelu zakończone.
Accuracy (train): 0.925
Accuracy (test): 0.868

[Etap 5.4] Bezpieczna predykcja dla wybranego dnia:
Dzień 0 -> predykcja target_optimal_ph: 0 prawdopodobieństwo: 0.008

[Etap 5.5] Przykładowe kategorie ryzyk w systemach SI na danych medycznych:
- Modyfikacja danych treningowych (data poisoning)
- Manipulacja danymi wejściowymi (data tampering, adversarial examples)
- Wycieki prywatności (membership inference, model inversion)
- Błędne etykiety i bias w danych (stronnicze decyzje kliniczne)
- Brak kontroli integralności i audytu dostępu do danych


In [14]:
from cryptography.fernet import Fernet
import os
import hashlib

DATA_DIR = "data"
PATH = os.path.join(DATA_DIR, "ph_v1_days.csv")
KEY_PATH = "file_key.key"
ENC_PATH = PATH + ".enc"

if os.path.exists(KEY_PATH):
    with open(KEY_PATH, "rb") as f:
        key = f.read()
    print("[Etap 6] Istniejący klucz wczytany z:", KEY_PATH)
else:
    key = Fernet.generate_key()
    with open(KEY_PATH, "wb") as f:
        f.write(key)
    print("[Etap 6] Nowy klucz wygenerowany i zapisany w:", KEY_PATH)

cipher = Fernet(key)

with open(PATH, "rb") as f:
    plaintext = f.read()

encrypted = cipher.encrypt(plaintext)
with open(ENC_PATH, "wb") as f:
    f.write(encrypted)

print("[Etap 6] Plik zaszyfrowany zapisano jako:", ENC_PATH)

def sha256_bytes(b: bytes) -> str:
    return hashlib.sha256(b).hexdigest()

decrypted = cipher.decrypt(encrypted)

hash_orig = sha256_bytes(plaintext)
hash_dec = sha256_bytes(decrypted)

print("[Etap 6] SHA-256 oryginału:      ", hash_orig)
print("[Etap 6] SHA-256 po odszyfrowaniu:", hash_dec)
print("[Etap 6] Integralność poprawna?  ", hash_orig == hash_dec)


[Etap 6] Nowy klucz wygenerowany i zapisany w: file_key.key
[Etap 6] Plik zaszyfrowany zapisano jako: data\ph_v1_days.csv.enc
[Etap 6] SHA-256 oryginału:       44fe47a40f36f280672effdca85a8c595aa6b93a1ff45feeac1fc5caf9f3eb2f
[Etap 6] SHA-256 po odszyfrowaniu: 44fe47a40f36f280672effdca85a8c595aa6b93a1ff45feeac1fc5caf9f3eb2f
[Etap 6] Integralność poprawna?   True


In [15]:
import pandas as pd

print("\n[Etap 7] Analiza logów audytu:")

if "audit_log" not in globals() or len(audit_log) == 0:
    print("Brak wpisów w audit_log – najpierw wywołaj get_data_view dla różnych ról.")
else:
    audit_df = pd.DataFrame(audit_log)

    print("\n[Etap 7.1] Podgląd logów (pierwsze wiersze):")
    display(audit_df.head())

    print("\n[Etap 7.2] Statystyki ogólne:")
    print("Liczba zdarzeń:", len(audit_df))
    print("\nZdarzenia wg statusu:")
    print(audit_df["status"].value_counts())
    print("\nZdarzenia wg użytkownika:")
    print(audit_df["user"].value_counts())
    print("\nZdarzenia wg roli:")
    print(audit_df["role"].value_counts())

    if audit_df["action"].str.contains("ACCESS_DENIED").any():
        print("\nWpisy z odmową dostępu (ACCESS_DENIED):")
        display(audit_df[audit_df["action"].str.contains("ACCESS_DENIED")])

    improvements = [
        "Wprowadzić limit nieudanych prób dostępu i blokadę konta po przekroczeniu progu.",
        "Logować dodatkowo adres IP i identyfikator urządzenia użytkownika.",
        "Regularnie przeglądać logi pod kątem nietypowych wzorców (np. częste zmiany ról).",
        "Ograniczyć rolę 'admin' wyłącznie do dedykowanych kont technicznych.",
        "Włączyć alerty w czasie zbliżonym do rzeczywistego dla zdarzeń ACCESS_DENIED.",
    ]

    print("\n[Etap 7.3] Propozycje ulepszeń polityk bezpieczeństwa:")
    for rec in improvements:
        print("-", rec)



[Etap 7] Analiza logów audytu:

[Etap 7.1] Podgląd logów (pierwsze wiersze):


Unnamed: 0,time,user,role,action,status
0,2025-12-03T08:19:50,alice_admin,admin,"ACCESS_GRANTED_PH_morning,PH_midday,PH_evening...",granted
1,2025-12-03T08:19:50,bob_doctor,doctor,"ACCESS_GRANTED_PH_morning,PH_midday,PH_evening...",granted
2,2025-12-03T08:19:50,carol_analyst,analyst,"ACCESS_GRANTED_PH_avg,Gym,Liquid",granted



[Etap 7.2] Statystyki ogólne:
Liczba zdarzeń: 3

Zdarzenia wg statusu:
status
granted    3
Name: count, dtype: int64

Zdarzenia wg użytkownika:
user
alice_admin      1
bob_doctor       1
carol_analyst    1
Name: count, dtype: int64

Zdarzenia wg roli:
role
admin      1
doctor     1
analyst    1
Name: count, dtype: int64

[Etap 7.3] Propozycje ulepszeń polityk bezpieczeństwa:
- Wprowadzić limit nieudanych prób dostępu i blokadę konta po przekroczeniu progu.
- Logować dodatkowo adres IP i identyfikator urządzenia użytkownika.
- Regularnie przeglądać logi pod kątem nietypowych wzorców (np. częste zmiany ról).
- Ograniczyć rolę 'admin' wyłącznie do dedykowanych kont technicznych.
- Włączyć alerty w czasie zbliżonym do rzeczywistego dla zdarzeń ACCESS_DENIED.
