In [1]:
from abc import abstractmethod, ABC
from json import load
from numbers import Real
from pathlib import Path
from typing import Dict, Iterable, Iterator, Tuple, Union, Any, List, Callable
from enum import Enum
from collections.abc import MutableSequence

# třída Typ = výčtový typ = Enum - může být buď Float (desetinné číslo) nebo String (řetězec). přiřadí se jim hodnoty 0 a 1
class Type(Enum):
    Float = 0
    String = 1

def to_float(objekt) -> float:
    """
převede objekt na dat. typ Float (typ None se převede na None)
    """
    return float(objekt) if objekt is not None else None

def to_str(objekt) -> str:
    """
převede objekt na dat. typ String. (objekt None to ignoruje)
    """
    return str(objekt) if objekt is not None else None

# iterovatelný objekt = iterable = objekt, který umožňuje procházet své prvky jeden po druhém (umožňuje např. for cyklus)
def common(iterable):
    """
    vrátí "True", pokud jsou všechny hodnoty iterovatelného objektu stejné
    """
    try:
        iterátor = iter(iterable)
        prvni_hodnota = next(iterátor)
    except StopIteration:
        raise ValueError("Iterable is empty")

    for hodnota in iterátor:
        if hodnota != prvni_hodnota:
            raise ValueError("Ne všechny hodnoty jsou stejné.")
    return prvni_hodnota

# MutableSequence = knihovna, která vytváří rozhraní pro měnitelné sekvence (př. list)
# dědíme třídu MutableSequence - vyžaduje, abychom implementovali metody: __init__(), __getitem__(), __setitem__(), __delitem__(), __len__(), insert()
class Column(MutableSequence):
    """
    Reprezentace sloupce DataFramu. Datové typy jednotlivých objektů (sloupců): Float (a None), String (a None)
    """
    def __init__(self, data: Iterable, dtype: Type):
        self.dtype = dtype
        self._cast = to_float if self.dtype == Type.Float else to_str     
        # self._cast = převedení dat na vybraný dat. typ (dtype = Float -> to_float / dtype = String -> to_str
        self._data = [self._cast(hodnota) for hodnota in data]          
        # data = iterovatelný objekt - list, tuple, dict 
        # self._data - prochází všechny hodnoty v iterovatelném objektu data a přetypovává je pomocí self._cast

    def __len__(self) -> int:                                   # umožní použití funkce len() na objekt třídy Column
        """
        Vrátí počet řádků sloupce (objektu)
        """
        return len(self._data)

# Union[int, slice] = datový typ může být buď int (index) nebo slice (výřez indexů, př. od 1 do 3 - 1:3)
# Union[float, str, list[str], list[float]] = výstup může být buď float, string (pokud i: int) nebo list[float], list[str] (pokud i: slice)
    def __getitem__(self, i: Union[int, slice]) -> Union[float, str, list[str], list[float]]:
        """
        Získání hodnoty pomocí indexu, vrací položku nebo list položek na daném indexu, rozsahu indexů
        """
        return self._data[i]

    def __setitem__(self, i: Union[int, slice], hodnota: Any) -> None:
        """
        Nastavení hodnoty pomocí indexu, i: na jaký index/výřez indexů , hodnota: co přiřadit (hodnota /list hodnot)
        """
        self._data[i] = self._cast(hodnota)

    def __repr__(self):
        """
        Vrátí řetězcovou reprezentaci objektu = sloupce
        """
        return "\n".join(str(hodnota) for hodnota in self._data)

    def append(self, polozka: Any) -> None:
        """
        Přidá položku do sloupce - objektu (položka je převedena na float nebo na string, pokud to není číslo)
        """
        self._data.append(self._cast(polozka))

    def insert(self, index: int, polozka: Any) -> None:
        """
        Přidání položky do sloupce na daný index (položka je přetypována na float, nebo na řetězec, pokud položka není číslo pomocí self._cast)
        """
        self._data.insert(index, self._cast(polozka))

    def __delitem__(self, i: Union[int, slice]) -> None:
        """
        Odebrání položek z indexu, nebo podmnožiny sloupce definované jako výřez indexů, př 1:3
        """
        del self._data[i]

    def permute(self, indexy: List[int]) -> 'Column':
        """
        Změní a vrátí objekt, jehož položky jsou definované seznamem indexů. Indexy jsou mezi 0 a len(self) - 1
        """
        assert len(indexy) == len(self)
        permutovany_sloupec = Column([], self.dtype)
        for i in indexy:
            permutovany_sloupec.append(self._data[i])
        return permutovany_sloupec
    
    def copy(self) -> 'Column':
        """
        Vrátí nový sloupec se stejnými položkami
        """
        return Column(self._data, self.dtype)

#   , * , = parametr po musí být zadán při volání funkce, př. get_formatted_item(0, width = 8)
#   format(hodnota, která má být formátována; formátovací řetězec - jak má být hodnota formátována) = vestavěná funkce pythonu
    def get_formatted_item(self, index: int, *, width: int):
        """
        Formátování položek sloupce do řetězců s danou šířkou. Float - zarovnání doprava, String - zarovnání doleva, None - "n/a"
        """
        assert width > 0
        if self._data[index] is None:
            if self.dtype == Type.Float:
                return "n/a".rjust(width)
            else: # self.dtype == Type.String
                return "n/a".ljust(width)
        return format(self._data[index], f"{width}s" if self.dtype == Type.String else f"-{width}.2g")


class DataFrame:
    """
    Třída datových rámců s pojmenovanými sloupci urč. dat. typu
    """
    def __init__(self, sloupce: Dict[str, Column]):
        """
        Slovník, jehož keys jsou názvy sloupců (String) a values jsou instance třídy Column, délky všech sloupců musí být stejné!
        """
        assert len(sloupce) > 0, "Dataframe z prázdného slovníku nepůjde magore"
        self._size = common(len(sloupec) for sloupec in sloupce.values())        # velikost DataFramu - jako velikost jednoho ze sloupců
        self._columns = {nazev: sloupec.copy() for nazev, sloupec in sloupce.items()}   # vytvoření deep kopie slovníku 

    def __getitem__(self, index: int) -> Tuple[Union[str,float]]:
        """
        Getter získá řádek DataFramu podle indexu a vrátí ho jako tuple
        """
        položky_řádku = []
        for sloupec in self._columns.values():
            položky_řádku.append(sloupec[index])
        return tuple(položky_řádku)

    def __iter__(self) -> Iterator[Tuple[Union[str, float]]]:
        """
        Definuje iterátor, který umožňuje iterovat přes řádky DataFramu pomocí: for řádek in df
        """
        for i in range(len(self)):
            yield tuple(sloupec[i] for sloupec in self._columns.values())

    def __len__(self) -> int:
        """
        Vrátí počet řádků DataFramu
        """
        return self._size

    # getter, slouží k získání názvů sloupců v objektu = DataFramu
    @property
    def columns(self) -> Iterable[str]:
        """
        Vrátí názvy sloupců jako iterovatelný objekt
        """
        return self._columns.keys()

    def __repr__(self) -> str:
        """
        Vrátí reprezentaci objektu (v řetězci), formátovanou v řádcích
        """
        reprezentace = []
        reprezentace.append(" ".join(f"{hodnoty_sloupcu:20s}" for hodnoty_sloupcu in self.columns))
        for i in range(len(self)):
            reprezentace.append(" ".join(self._columns[nazev_sloupce].get_formatted_item(i, width=20)
                                     for nazev_sloupce in self.columns))
        return "\n".join(reprezentace)

    def append_column(self, col_name:str, column: Column) -> None:
        """
        Přidá nový sloupec do objektu. Název musí být unikátní
        """
        if col_name in self._columns:
            raise ValueError("Duplicate column name")
        self._columns[col_name] = column.copy()

    # zip(x,y) - vytváří iterátor, kombinující odpovídající položky z více iterovatelných objektů ... podle indexů
    def append_row(self, row: Iterable) -> None:
        """
        Přidá nový řádek do objektu. Parametrem je tuple s hodnotami pro každý sloupec
        """
        for sloupec, hodnota in zip(self._columns.values(), row):
            sloupec.append(hodnota)
        self._size += 1                                                             # zvětšení dataframu o 1, aby ukazoval nově přidaný řádek

    def filter(self, col_name: str, predicate: Callable[[Union[float, str]], bool]) -> 'DataFrame':
        """
        Vrátí nový objekt - DataFrame s řádky, jejichž hodnoty ve sloupci z parametru (col_name) vrací True ve funkci predicate()
        Je potřeba vytvořit funkci def predicate(hodnota): ... př. return hodnota >=5, hodnota == "Majda"
        """
        if not isinstance(self._columns[col_name], Column):
            raise ValueError(f"Sloupec '{col_name}' není instancí třídy Column.")
        
        filtrujici_sloupec = self._columns[col_name]
        filtrovany_df = {name: Column([],dtype= Type.String) for name in self._columns}
        for i in range(self._size):
            if predicate(filtrujici_sloupec[i]):
                for název_sloupce, sloupec in self._columns.items(): 
                    filtrovany_df[název_sloupce].append(sloupec[i])  
        return DataFrame(filtrovany_df)

    def sort(self, col_name: str, ascending=True) -> 'DataFrame':
        """
        Vytvoří nový objekt - DataFrame, seřazený podle sloupce s názvem `col_name` vzestupně nebo sestupně.
        ascending = True ... vzestupně, ascending = False ... sestupně
        """
        if not isinstance(self._columns[col_name], Column):
            raise ValueError(f"Sloupec '{col_name}' není instancí třídy Column.")
        
        radici_sloupec = self._columns[col_name]
        hodnoty_indexy = [(hodnota, index) for index, hodnota in enumerate(radici_sloupec)]
        serazene_hodnoty_indexy = sorted(hodnoty_indexy, reverse=not ascending)
        serazene_indexy = [i for index, i in serazene_hodnoty_indexy]
        df_serazeny = {nazev_sloupce: Column([sloupec[i] for i in serazene_indexy], self._columns[nazev_sloupce].dtype) for nazev_sloupce, sloupec in self._columns.items()}
        return DataFrame(df_serazeny)

    def describe(self) -> str:
        """
    Generuje formátovaný popis datového rámce - objektu podobně jako funkce describe v knihovně pandas,
    ale pouze s minimem, maximem a průměrem pro číselné hodnoty a počtem prvků.
        """
        popis_df = []
        for nazev_sloupce, sloupec in self._columns.items():
            if sloupec.dtype == Type.Float:
                hodnoty_ve_sloupci = [hodnota for hodnota in sloupec if hodnota is not None]
                if hodnoty_ve_sloupci:                                                                              # Float
                    min_hodnota = min(hodnoty_ve_sloupci)
                    max_hodnota = max(hodnoty_ve_sloupci)
                    prumer = sum(hodnoty_ve_sloupci) / len(hodnoty_ve_sloupci)
                    popis_df.append(f"{nazev_sloupce}: min={min_hodnota}, max={max_hodnota}, avg={prumer:.2f}")
                else:
                    popis_df.append(f"{nazev_sloupce}: min=None, max=None, avg=None")                               # Float s None
            elif sloupec.dtype == Type.String:
                popis_df.append(f"{nazev_sloupce}: počet prvků ={len(sloupec)}")                                    # String
        return "\n".join(popis_df)

    def setvalue(self, col_name: str, row_index: int, value: Any) -> None:
        """
        Nastaví novou hodnotu v objektu, podle parametru název sloupce a index řádku
        """
        sloupec = self._columns[col_name]
        sloupec[row_index] = sloupec._cast(value)

    def remove_column(self, col_name: str) -> None:
        """
        Smaže z objektu - DataFramu sloupec, podle jeho názvu.
        """
        if col_name in self._columns:
            del self._columns[col_name]
            self._size = len(next(iter(self._columns.values())))  # updatuje velikost na velikost jiného zbývajícího sloupce
        else:
            raise ValueError(f"Sloupec '{col_name}' v tomto objektu DataFrame neexistuje.")

    # statické metody třídy - nepotřebují parametr self, nepoužívá instanční proměnné (př. self._size)
    @staticmethod
    def read_csv(path: Union[str, Path]) -> 'DataFrame':
        """
        Přečte cizí DataFrame CSV Readerem
        """
        return CSVReader(path).read()

    @staticmethod
    def read_json(path: Union[str, Path]) -> 'DataFrame':
        """
        Přečte cizí DataFrame JSONReaderem
        """
        return JSONReader(path).read()

    def cumsum(self, název_sloupce: str) :
        if název_sloupce not in self._columns:
            raise ValueError("Nemůžeš dělat cumsum sloupce, co v objektu není.")
        sloupec = self._columns[název_sloupce]._data
        kumulativní_suma = []
        cumsum = 0
        for hodnota in sloupec:
            cumsum += hodnota
            kumulativní_suma.append(cumsum)
        self._columns[f"Kumulativní suma sloupce {název_sloupce}"] = Column(kumulativní_suma, dtype= Type.Float)
        return self
        # return self.append_column("Kumulativní suma", Column(kumulativní_suma, dtype= Type.Float))


    def rozdil(self, sloupec_a: str, sloupec_b: str):
        if sloupec_a not in self._columns or sloupec_b not in self._columns:
            raise ValueError("Jeden nebo oba sloupce se nenachází v objektu.")
        hodnoty_a = self._columns[sloupec_a]._data
        hodnoty_b = self._columns[sloupec_b]._data
        rozdily = [abs(a - b) for a, b in zip(hodnoty_a, hodnoty_b)]
        self._columns["Rozdíl"] = Column(rozdily, dtype = Type.Float)
        return self
    



    # ÚKOL 1 Z.T.
    def product(self, axis) -> 'DataFrame':

        if axis == 0:
            součiny_sloupců = []
            názvy_sloupců = []
            for název_sloupce, sloupec in self._columns.items():
                součin = 1
                data = self._columns[název_sloupce]._data
                for hodnota in data:
                    součin *= hodnota
                součiny_sloupců.append(součin)
                názvy_sloupců.append(název_sloupce)

                df = DataFrame({"Sloupec": Column(názvy_sloupců, dtype = Type.String), 
                                "Součin hodnot ve sloupci": Column(součiny_sloupců, dtype=Type.Float)})
                
        if axis == 1:
            součiny_řádků = []
            indexy_řádků = []
            
            for i in range(self._size): 
                položky_řádku = []
                for sloupec in self._columns.values():
                    položky_řádku.append(sloupec[i])
                    položky_řádku = list(položky_řádku)
                for řádek in self:
                    součin = 1
                    for hodnota in položky_řádku:
                        součin *= hodnota 
                součiny_řádků.append(součin)
                indexy_řádků.append(i)

                df = DataFrame({"Index řádku": Column(indexy_řádků, dtype=Type.String),
                "Součin hodnot v řádku": Column(součiny_řádků, dtype= Type.Float)})
        return df

    # Úkol 2
    def replace(self, to_replace: Union[float, str, list[float], list[str]], value: Union[float, str]) -> 'DataFrame':
        
        for sloupec in self._columns.values():
            for položka in sloupec:
                if položka == to_replace:
                    položka = value 
                # if položka == to_replace or položka in to_replace:
                    # sloupec[položka] = value  
        return self


class Reader(ABC):
    def __init__(self, path: Union[Path, str]):
        self.path = Path(path)

    @abstractmethod
    def read(self) -> DataFrame:
        raise NotImplemented("Abstract method")

class JSONReader(Reader):
    """
    Factory class for creation of dataframe by CSV file. CSV file must contain
    header line with names of columns.
    The type of columns should be inferred from types of their values (columns which
    contains only value has to be floats columns otherwise string columns),
    """
    def read(self) -> DataFrame:
        with open(self.path, "rt") as f:
            json = load(f)
        columns = {}
        for cname in json.keys(): # cyklus přes sloupce (= atributy JSON objektu)
            dtype = Type.Float if all(value is None or isinstance(value, Real)
                                      for value in json[cname]) else Type.String
            columns[cname] = Column(json[cname], dtype)
        return DataFrame(columns)


class CSVReader(Reader):
    def read(self) -> 'DataFrame':
            with open(self.path, "r", encoding="utf-8-sig") as soubor:
                řádky = list(soubor)
                oddělovač = ';'  
                nazvy_sloupcu = řádky[0].strip().split(oddělovač)
                data_tabulky = [řádek.strip().split(oddělovač) for řádek in řádky[1:]]
            datove_typy_sloupcu = [Type.Float if all(self.is_float(hodnota) for hodnota in sloupec) else Type.String for sloupec in zip(*data_tabulky)]
            sloupce = {}
            for název, dtype, data in zip(nazvy_sloupcu, datove_typy_sloupcu, zip(*data_tabulky)):
                    sloupce[název] = Column(data, dtype)
            return DataFrame(sloupce)
    
    @staticmethod
    def is_float(hodnota: str) -> bool:
        """
        Metoda zjišťuje, zda je zadaná hodnota číselná.
        """
        try:
            float(hodnota)
            return True
        except ValueError:
            return False
        
    @staticmethod
    def parse_value(hodnota: str) -> Union[float, str]:
        """
        Metoda převede měnnou hodnotu na Float
        """
        try:
            hodnota = hodnota.replace(' Kč', '').replace(',', '.').strip()
            hodnota = hodnota.replace(' ', '')
            return float(hodnota)
        except ValueError:
            return hodnota.strip()


if __name__ == "__main__":

    roky = Column([2020,2023,2024,2022,2021], dtype=Type.String)
    počet = Column([4,3,6,7,1], dtype=Type.Float)

    df = DataFrame({"Rok":roky,
                    "Počet požárů":počet})
    
    df = df.sort("Rok", ascending= True)
    df = df.cumsum("Počet požárů")
    df = df.rozdil("Počet požárů", "Kumulativní suma sloupce Počet požárů")


    matice = DataFrame({"A": Column([1,1,2], dtype= Type.Float),
                        "B": Column([2,3,2], dtype=Type.Float),
                        "C": Column([0,1,3], dtype= Type.Float)})

    násobení_řádků = matice.product(1)
    násobení_sloupců = matice.product(0)

    print(f"DataFrame - matice:\n {matice}")
    print(f"\nNásobení sloupců:\n {násobení_sloupců}")
    print(f"\nNásobení řádků:\n {násobení_řádků}")


    matice_replace = matice.replace(1,0)
    print(f"\n{matice_replace}")




DataFrame - matice:
 A                    B                    C                   
                   1                    2                    0
                   1                    3                    1
                   2                    2                    3

Násobení sloupců:
 Sloupec              Součin hodnot ve sloupci
A                                       2
B                                      12
C                                       0

Násobení řádků:
 Index řádku          Součin hodnot v řádku
0                                       0
1                                       3
2                                      12

A                    B                    C                   
                   1                    2                    0
                   1                    3                    1
                   2                    2                    3
