In [None]:
# import pandas as pd

# RSV = pd.read_csv(
#     "07_RSV_vlastnik_provozovatel_ico.csv",nrows=8000000,sep=",",
#     on_bad_lines="skip", quoting=0, encoding="utf-8",low_memory=False)

# RSV.head(2)

#Typ subjektu 1 FO s trvalým v CR, 2 PO nebo podnikající FO 3 neztotožněný (zahraniční osoby, historická data)
#Vztah k vozidlu: 1-vlastník, 2-provozovatel

Unnamed: 0,PČV,Typ subjektu,Vztah k vozidlu,IČO,Datum od,Datum do
0,8766865,3,2,,24.05.1995,05.03.1998
1,10240842,1,2,,29.10.2001,19.12.2011


##### jsou tu nevalidní data v Datum od a Datum do, projít po chuncích řádky těchto dvou sloupců a hodnoty, které nejsou ve formátu dd.mm.rrrr a nebo nejsou v rozmezí 1950 až 2030, tak dát do nových csv. nevalidní, kde je pak upravíme

In [2]:
import pandas as pd
import csv
import numpy as np

# ---------- Automatická detekce oddělovače ----------
with open("07_RSV_vlastnik_provozovatel_ico.csv", encoding="utf-8-sig") as f:
    sample = f.read(2048)
    dialect = csv.Sniffer().sniff(sample)
    delimiter = dialect.delimiter
    print("Zjištěný oddělovač:", repr(delimiter))

# ---------- Cesty k souborům ----------
chunk_size = 100_000
input_file = "07_RSV_vlastnik_provozovatel_ico.csv"
output_file = "07_RSV_vlastnik_final.csv"

error_od_file = "07_RSV_nevalidni_datum_od.csv"
error_do_file = "07_RSV_nevalidni_datum_do.csv"

od_txt = "nevalidni_datum_od.txt"
do_txt = "nevalidni_datum_do.txt"

# ---------- Stavy ----------
bad_od_count = 0
bad_do_count = 0

is_first_chunk = True
is_first_error_od = True
is_first_error_do = True

unique_invalid_od = set()
unique_invalid_do = set()

# ---------- Zpracování po částech ----------
for chunk in pd.read_csv(
    input_file,
    sep=delimiter,
    quotechar='"',
    on_bad_lines="warn",
    quoting=csv.QUOTE_MINIMAL,
    encoding="utf-8-sig",
    low_memory=False,
    chunksize=chunk_size
):
    if is_first_chunk:
        print("Nalezené sloupce:", chunk.columns.tolist())

    expected_cols = ["PČV", "Typ subjektu", "Vztah k vozidlu", "IČO", "Datum od", "Datum do"]
    if not all(col in chunk.columns for col in expected_cols):
        print("⚠️ Některé očekávané sloupce chybí. Přeskakuji chunk.")
        continue

    # Parsování datumů
    chunk["Datum od parsed"] = pd.to_datetime(chunk["Datum od"], format="%d.%m.%Y", errors="coerce")
    chunk["Datum do parsed"] = pd.to_datetime(chunk["Datum do"], format="%d.%m.%Y", errors="coerce")

    # Detekce nevalidních formátů (např. špatný zápis)
    invalid_od_mask = chunk["Datum od"].notna() & chunk["Datum od parsed"].isna()
    invalid_do_mask = chunk["Datum do"].notna() & chunk["Datum do parsed"].isna()

    # Přidání kontroly rozsahu roku
    invalid_od_year_mask = (
        chunk["Datum od parsed"].notna() &
        ~chunk["Datum od parsed"].dt.year.between(1951, 2029)
    )

    invalid_do_year_mask = (
        chunk["Datum do parsed"].notna() &
        ~chunk["Datum do parsed"].dt.year.between(1951, 2029)
    )

    # Spojení masek (špatný formát nebo špatný rok)
    invalid_od_mask |= invalid_od_year_mask
    invalid_do_mask |= invalid_do_year_mask

    # Uložení nevalidních řádků
    od_chunk = chunk[invalid_od_mask]
    do_chunk = chunk[invalid_do_mask]

    if not od_chunk.empty:
        od_chunk.drop(columns=["Datum od parsed", "Datum do parsed"]).to_csv(
            error_od_file,
            mode="w" if is_first_error_od else "a",
            header=is_first_error_od,
            index=False,
            sep=";"
        )
        is_first_error_od = False
        bad_od_count += len(od_chunk)
        unique_invalid_od.update(od_chunk["Datum od"].dropna().unique())

    if not do_chunk.empty:
        do_chunk.drop(columns=["Datum od parsed", "Datum do parsed"]).to_csv(
            error_do_file,
            mode="w" if is_first_error_do else "a",
            header=is_first_error_do,
            index=False,
            sep=";"
        )
        is_first_error_do = False
        bad_do_count += len(do_chunk)
        unique_invalid_do.update(do_chunk["Datum do"].dropna().unique())

    # Výběr platných řádků
    valid_mask = ~invalid_od_mask & ~invalid_do_mask
    valid_chunk = chunk[valid_mask]

    if not valid_chunk.empty:
        valid_chunk.drop(columns=["Datum od parsed", "Datum do parsed"]).to_csv(
            output_file,
            mode="w" if is_first_chunk else "a",
            header=is_first_chunk,
            index=False,
            sep=";"
        )
        is_first_chunk = False

# ---------- Uložení unikátních nevalidních hodnot ----------
with open(od_txt, "w", encoding="utf-8") as f:
    f.write("Unikátní nevalidní hodnoty ve sloupci 'Datum od':\n")
    for val in sorted(unique_invalid_od):
        f.write(f"{val}\n")

with open(do_txt, "w", encoding="utf-8") as f:
    f.write("Unikátní nevalidní hodnoty ve sloupci 'Datum do':\n")
    for val in sorted(unique_invalid_do):
        f.write(f"{val}\n")

# ---------- Výstup ----------
print("✅ Hotovo.")
print(f"Nevalidních 'Datum od': {bad_od_count}")
print(f"Nevalidních 'Datum do': {bad_do_count}")
print("Výstupy:")
print("-", output_file)
print("-", error_od_file)
print("-", error_do_file)
print("-", od_txt)
print("-", do_txt)

Zjištěný oddělovač: ','
Nalezené sloupce: ['PČV', 'Typ subjektu', 'Vztah k vozidlu', 'IČO', 'Datum od', 'Datum do']
✅ Hotovo.
Nevalidních 'Datum od': 18886
Nevalidních 'Datum do': 2467
Výstupy:
- 07_RSV_vlastnik_final.csv
- 07_RSV_nevalidni_datum_od.csv
- 07_RSV_nevalidni_datum_do.csv
- nevalidni_datum_od.txt
- nevalidni_datum_do.txt


#### níže je kód, kde se rovnou, když se narazí na nevalidní hodnotu, tak se přepíše na NaN

In [1]:
import pandas as pd
import csv
import numpy as np

# ---------- Nastavení ----------
chunk_size = 100_000
input_file = "07_RSV_vlastnik_provozovatel_ico.csv"
output_file = "07_RSV_vlastnik_finalni_verze.csv"

bad_od_count = 0
bad_do_count = 0
is_first_chunk = True

# ---------- Detekce oddělovače ----------
with open(input_file, encoding="utf-8-sig") as f:
    sample = f.read(2048)
    dialect = csv.Sniffer().sniff(sample)
    delimiter = dialect.delimiter
    print("Zjištěný oddělovač:", repr(delimiter))

# ---------- Zpracování po částech ----------
for chunk in pd.read_csv(
    input_file,
    sep=delimiter,
    quotechar='"',
    on_bad_lines="warn",
    quoting=csv.QUOTE_MINIMAL,
    encoding="utf-8-sig",
    low_memory=False,
    chunksize=chunk_size
):
    if is_first_chunk:
        print("Nalezené sloupce:", chunk.columns.tolist())

    expected_cols = ["PČV", "Typ subjektu", "Vztah k vozidlu", "IČO", "Datum od", "Datum do"]
    if not all(col in chunk.columns for col in expected_cols):
        print("⚠️ Některé očekávané sloupce chybí. Přeskakuji chunk.")
        continue

    # ---------- Parsování datumů ----------
    chunk["Datum od parsed"] = pd.to_datetime(chunk["Datum od"], format="%d.%m.%Y", errors="coerce")
    chunk["Datum do parsed"] = pd.to_datetime(chunk["Datum do"], format="%d.%m.%Y", errors="coerce")

    # ---------- Detekce nevalidních hodnot ----------
    invalid_od_mask = chunk["Datum od"].notna() & chunk["Datum od parsed"].isna()
    invalid_do_mask = chunk["Datum do"].notna() & chunk["Datum do parsed"].isna()

    # ---------- Kontrola rozsahu let ----------
    invalid_od_year_mask = (
        chunk["Datum od parsed"].notna() &
        ~chunk["Datum od parsed"].dt.year.between(1951, 2029)
    )
    invalid_do_year_mask = (
        chunk["Datum do parsed"].notna() &
        ~chunk["Datum do parsed"].dt.year.between(1951, 2029)
    )

    # ---------- Kombinace masek ----------
    invalid_od_mask |= invalid_od_year_mask
    invalid_do_mask |= invalid_do_year_mask

    # ---------- Přepis na NaN ----------
    chunk.loc[invalid_od_mask, "Datum od"] = np.nan
    chunk.loc[invalid_do_mask, "Datum do"] = np.nan

    # ---------- Statistiky ----------
    bad_od_count += invalid_od_mask.sum()
    bad_do_count += invalid_do_mask.sum()

    # ---------- Uložení bez pomocných sloupců ----------
    chunk.drop(columns=["Datum od parsed", "Datum do parsed"]).to_csv(
        output_file,
        mode="w" if is_first_chunk else "a",
        header=is_first_chunk,
        index=False,
        sep=";"
    )
    is_first_chunk = False

# ---------- Výstup ----------
print("✅ Hotovo.")
print(f"Nevalidních hodnot ve sloupci 'Datum od': {bad_od_count}")
print(f"Nevalidních hodnot ve sloupci 'Datum do': {bad_do_count}")
print("Výstupní soubor:", output_file)


Zjištěný oddělovač: ','
Nalezené sloupce: ['PČV', 'Typ subjektu', 'Vztah k vozidlu', 'IČO', 'Datum od', 'Datum do']
✅ Hotovo.
Nevalidních hodnot ve sloupci 'Datum od': 18886
Nevalidních hodnot ve sloupci 'Datum do': 2467
Výstupní soubor: 07_RSV_vlastnik_finalni_verze.csv
