From 3d99081758d51421f05cf5f48338d7100b903ea7 Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Mon, 29 Nov 2021 00:52:59 +0100
Subject: [PATCH] TYP: Typ part of python_parser (#44406)

---
 pandas/io/parsers/arrow_parser_wrapper.py |   7 +-
 pandas/io/parsers/base_parser.py          |  63 ++++++++++--
 pandas/io/parsers/c_parser_wrapper.py     |   8 +-
 pandas/io/parsers/python_parser.py        | 112 ++++++++++++++--------
 4 files changed, 133 insertions(+), 57 deletions(-)

diff --git a/pandas/io/parsers/arrow_parser_wrapper.py b/pandas/io/parsers/arrow_parser_wrapper.py
index 9fbeeb74901ef..98d1315c6212c 100644
--- a/pandas/io/parsers/arrow_parser_wrapper.py
+++ b/pandas/io/parsers/arrow_parser_wrapper.py
@@ -110,7 +110,12 @@ def _finalize_output(self, frame: DataFrame) -> DataFrame:
             multi_index_named = False
         frame.columns = self.names
         # we only need the frame not the names
-        frame.columns, frame = self._do_date_conversions(frame.columns, frame)
+        # error: Incompatible types in assignment (expression has type
+        # "Union[List[Union[Union[str, int, float, bool], Union[Period, Timestamp,
+        # Timedelta, Any]]], Index]", variable has type "Index") [assignment]
+        frame.columns, frame = self._do_date_conversions(  # type: ignore[assignment]
+            frame.columns, frame
+        )
         if self.index_col is not None:
             for i, item in enumerate(self.index_col):
                 if is_integer(item):
diff --git a/pandas/io/parsers/base_parser.py b/pandas/io/parsers/base_parser.py
index 4f5ba3460a3c8..5d03529654b0d 100644
--- a/pandas/io/parsers/base_parser.py
+++ b/pandas/io/parsers/base_parser.py
@@ -10,10 +10,13 @@
     Any,
     Callable,
     DefaultDict,
+    Hashable,
     Iterable,
+    Mapping,
     Sequence,
     cast,
     final,
+    overload,
 )
 import warnings
 
@@ -56,6 +59,7 @@
 from pandas.core.dtypes.dtypes import CategoricalDtype
 from pandas.core.dtypes.missing import isna
 
+from pandas import DataFrame
 from pandas.core import algorithms
 from pandas.core.arrays import Categorical
 from pandas.core.indexes.api import (
@@ -241,7 +245,7 @@ def _open_handles(
             errors=kwds.get("encoding_errors", "strict"),
         )
 
-    def _validate_parse_dates_presence(self, columns: list[str]) -> Iterable:
+    def _validate_parse_dates_presence(self, columns: Sequence[Hashable]) -> Iterable:
         """
         Check if parse_dates are in columns.
 
@@ -337,11 +341,24 @@ def _should_parse_dates(self, i: int) -> bool:
 
     @final
     def _extract_multi_indexer_columns(
-        self, header, index_names, passed_names: bool = False
+        self,
+        header,
+        index_names: list | None,
+        passed_names: bool = False,
     ):
         """
-        extract and return the names, index_names, col_names
-        header is a list-of-lists returned from the parsers
+        Extract and return the names, index_names, col_names if the column
+        names are a MultiIndex.
+
+        Parameters
+        ----------
+        header: list of lists
+            The header rows
+        index_names: list, optional
+            The names of the future index
+        passed_names: bool, default False
+            A flag specifying if names were passed
+
         """
         if len(header) < 2:
             return header[0], index_names, None, passed_names
@@ -400,7 +417,7 @@ def extract(r):
         return names, index_names, col_names, passed_names
 
     @final
-    def _maybe_dedup_names(self, names):
+    def _maybe_dedup_names(self, names: Sequence[Hashable]) -> Sequence[Hashable]:
         # see gh-7160 and gh-9424: this helps to provide
         # immediate alleviation of the duplicate names
         # issue and appears to be satisfactory to users,
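# A minimal sketch (not part of the patch) of what "header is a list of lists"
# means for _extract_multi_indexer_columns above: one inner list per header
# row, zipped column-wise into the tuples a MultiIndex is built from. The
# helper name `extract_mi_names` is hypothetical; the real method also pops
# index levels off the header and handles passed_names.
from __future__ import annotations


def extract_mi_names(header: list[list[str]]) -> list[tuple[str, ...]]:
    # transpose the header rows into per-column name tuples
    return list(zip(*header))


# extract_mi_names([["a", "a", "b"], ["x", "y", "z"]])
# -> [("a", "x"), ("a", "y"), ("b", "z")]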
@@ -408,7 +425,7 @@ def _maybe_dedup_names(self, names):
         # would be nice!
         if self.mangle_dupe_cols:
             names = list(names)  # so we can index
-            counts: DefaultDict[int | str | tuple, int] = defaultdict(int)
+            counts: DefaultDict[Hashable, int] = defaultdict(int)
             is_potential_mi = _is_potential_multi_index(names, self.index_col)
 
             for i, col in enumerate(names):
@@ -418,6 +435,8 @@ def _maybe_dedup_names(self, names):
                     counts[col] = cur_count + 1
 
                     if is_potential_mi:
+                        # for mypy
+                        assert isinstance(col, tuple)
                         col = col[:-1] + (f"{col[-1]}.{cur_count}",)
                     else:
                         col = f"{col}.{cur_count}"
@@ -572,7 +591,7 @@ def _agg_index(self, index, try_parse_dates: bool = True) -> Index:
     @final
     def _convert_to_ndarrays(
         self,
-        dct: dict,
+        dct: Mapping,
         na_values,
         na_fvalues,
         verbose: bool = False,
@@ -664,7 +683,7 @@ def _convert_to_ndarrays(
 
     @final
     def _set_noconvert_dtype_columns(
-        self, col_indices: list[int], names: list[int | str | tuple]
+        self, col_indices: list[int], names: Sequence[Hashable]
     ) -> set[int]:
         """
         Set the columns that should not undergo dtype conversions.
@@ -848,7 +867,27 @@ def _cast_types(self, values, cast_type, column):
             ) from err
         return values
 
-    def _do_date_conversions(self, names, data):
+    @overload
+    def _do_date_conversions(
+        self,
+        names: Index,
+        data: DataFrame,
+    ) -> tuple[Sequence[Hashable] | Index, DataFrame]:
+        ...
+
+    @overload
+    def _do_date_conversions(
+        self,
+        names: Sequence[Hashable],
+        data: Mapping[Hashable, ArrayLike],
+    ) -> tuple[Sequence[Hashable], Mapping[Hashable, ArrayLike]]:
+        ...
+
+    def _do_date_conversions(
+        self,
+        names: Sequence[Hashable] | Index,
+        data: Mapping[Hashable, ArrayLike] | DataFrame,
+    ) -> tuple[Sequence[Hashable] | Index, Mapping[Hashable, ArrayLike] | DataFrame]:
         # returns data, columns
 
         if self.parse_dates is not None:
@@ -864,7 +903,11 @@ def _do_date_conversions(self, names, data):
 
         return names, data
 
-    def _check_data_length(self, columns: list[str], data: list[ArrayLike]) -> None:
+    def _check_data_length(
+        self,
+        columns: Sequence[Hashable],
+        data: Sequence[ArrayLike],
+    ) -> None:
         """Checks if length of data is equal to length of column names.
 
         One set of trailing commas is allowed. self.index_col not False
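# A standalone sketch (an assumption, not code from the patch) of the
# @overload pattern introduced for _do_date_conversions above: two typed
# stubs let mypy pair each input flavour with its matching return type,
# while a single implementation handles both at runtime. `convert` and its
# signatures are hypothetical stand-ins for the pandas method.
from __future__ import annotations

from typing import Any, Hashable, Mapping, Sequence, overload


@overload
def convert(
    names: Sequence[Hashable], data: Mapping[Hashable, Any]
) -> Mapping[Hashable, Any]:
    ...


@overload
def convert(names: Sequence[Hashable], data: list[Any]) -> list[Any]:
    ...


def convert(names, data):
    # shared runtime body; only the stubs above matter to mypy
    return data


mapping_result = convert(["a"], {"a": [1, 2]})  # mypy infers Mapping[Hashable, Any]
list_result = convert(["a"], [[1, 2]])          # mypy infers list[Any]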
diff --git a/pandas/io/parsers/c_parser_wrapper.py b/pandas/io/parsers/c_parser_wrapper.py
index e96df3b3f3782..05c963f2d2552 100644
--- a/pandas/io/parsers/c_parser_wrapper.py
+++ b/pandas/io/parsers/c_parser_wrapper.py
@@ -279,7 +279,7 @@ def read(self, nrows=None):
             data_tups = sorted(data.items())
             data = {k: v for k, (i, v) in zip(names, data_tups)}
 
-            names, data = self._do_date_conversions(names, data)
+            names, date_data = self._do_date_conversions(names, data)
 
         else:
             # rename dict keys
@@ -302,13 +302,13 @@
 
             data = {k: v for k, (i, v) in zip(names, data_tups)}
 
-            names, data = self._do_date_conversions(names, data)
-            index, names = self._make_index(data, alldata, names)
+            names, date_data = self._do_date_conversions(names, data)
+            index, names = self._make_index(date_data, alldata, names)
 
         # maybe create a mi on the columns
         names = self._maybe_make_multi_index_columns(names, self.col_names)
 
-        return index, names, data
+        return index, names, date_data
 
     def _filter_usecols(self, names):
         # hackish
diff --git a/pandas/io/parsers/python_parser.py b/pandas/io/parsers/python_parser.py
index 0ef8d41c58f71..27d0944572024 100644
--- a/pandas/io/parsers/python_parser.py
+++ b/pandas/io/parsers/python_parser.py
@@ -10,7 +10,10 @@
 import sys
 from typing import (
     DefaultDict,
+    Hashable,
     Iterator,
+    Mapping,
+    Sequence,
     cast,
 )
 import warnings
@@ -19,6 +22,7 @@
 
 import pandas._libs.lib as lib
 from pandas._typing import (
+    ArrayLike,
     FilePath,
     ReadCsvBuffer,
     Scalar,
@@ -110,9 +114,10 @@ def __init__(
         # Get columns in two steps: infer from data, then
         # infer column indices from self.usecols if it is specified.
         self._col_indices: list[int] | None = None
+        columns: list[list[Scalar | None]]
         try:
             (
-                self.columns,
+                columns,
                 self.num_original_columns,
                 self.unnamed_cols,
             ) = self._infer_columns()
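# A toy sketch (an assumption, not patch code) of why the patch binds
# results to fresh names (`columns`, `conv_data`, `date_data`) instead of
# reusing a variable: mypy fixes one type per name, so re-assigning a value
# of a different type needs a type: ignore, while a new name checks cleanly.
# `to_floats` is a hypothetical stand-in.
from __future__ import annotations

from typing import Mapping


def to_floats(data: dict[str, int]) -> Mapping[str, float]:
    # data = {k: float(v) for k, v in data.items()}  # mypy: incompatible types
    conv_data: Mapping[str, float] = {k: float(v) for k, v in data.items()}
    return conv_data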
@@ -123,18 +128,19 @@ def __init__(
         # Now self.columns has the set of columns that we will process.
         # The original set is stored in self.original_columns.
         # error: Cannot determine type of 'index_names'
+        self.columns: list[Hashable]
         (
             self.columns,
             self.index_names,
             self.col_names,
             _,
         ) = self._extract_multi_indexer_columns(
-            self.columns,
+            columns,
             self.index_names,  # type: ignore[has-type]
         )
 
         # get popped off for index
-        self.orig_names: list[int | str | tuple] = list(self.columns)
+        self.orig_names: list[Hashable] = list(self.columns)
 
         # needs to be cleaned/refactored
         # multiple date column thing turning into a real spaghetti factory
@@ -172,7 +178,7 @@ def __init__(
             )
             self.num = re.compile(regex)
 
-    def _make_reader(self, f):
+    def _make_reader(self, f) -> None:
         sep = self.delimiter
 
         if sep is None or len(sep) == 1:
@@ -238,7 +244,7 @@ def _read():
         # TextIOWrapper, mmap, None]")
         self.data = reader  # type: ignore[assignment]
 
-    def read(self, rows=None):
+    def read(self, rows: int | None = None):
         try:
             content = self._get_lines(rows)
         except StopIteration:
@@ -251,7 +257,7 @@ def read(self, rows=None):
         # done with first read, next time raise StopIteration
         self._first_chunk = False
 
-        columns = list(self.orig_names)
+        columns: Sequence[Hashable] = list(self.orig_names)
         if not len(content):  # pragma: no cover
             # DataFrame with the right metadata, even though it's length 0
             names = self._maybe_dedup_names(self.orig_names)
@@ -275,14 +281,17 @@ def read(self, rows=None):
         alldata = self._rows_to_cols(content)
         data, columns = self._exclude_implicit_index(alldata)
 
-        data = self._convert_data(data)
-        columns, data = self._do_date_conversions(columns, data)
+        conv_data = self._convert_data(data)
+        columns, conv_data = self._do_date_conversions(columns, conv_data)
 
-        index, columns = self._make_index(data, alldata, columns, indexnamerow)
+        index, columns = self._make_index(conv_data, alldata, columns, indexnamerow)
 
-        return index, columns, data
+        return index, columns, conv_data
 
-    def _exclude_implicit_index(self, alldata):
+    def _exclude_implicit_index(
+        self,
+        alldata: list[np.ndarray],
+    ) -> tuple[Mapping[Hashable, np.ndarray], Sequence[Hashable]]:
         names = self._maybe_dedup_names(self.orig_names)
 
         offset = 0
@@ -304,7 +313,10 @@ def get_chunk(self, size=None):
             size = self.chunksize  # type: ignore[attr-defined]
         return self.read(rows=size)
 
-    def _convert_data(self, data):
+    def _convert_data(
+        self,
+        data: Mapping[Hashable, np.ndarray],
+    ) -> Mapping[Hashable, ArrayLike]:
         # apply converters
         clean_conv = self._clean_mapping(self.converters)
         clean_dtypes = self._clean_mapping(self.dtype)
@@ -336,11 +348,13 @@ def _convert_data(self, data):
             clean_dtypes,
         )
 
-    def _infer_columns(self):
+    def _infer_columns(
+        self,
+    ) -> tuple[list[list[Scalar | None]], int, set[Scalar | None]]:
         names = self.names
         num_original_columns = 0
         clear_buffer = True
-        unnamed_cols: set[str | int | None] = set()
+        unnamed_cols: set[Scalar | None] = set()
         self._header_line = None
 
         if self.header is not None:
@@ -355,7 +369,7 @@ def _infer_columns(self):
                 have_mi_columns = False
                 header = [header]
 
-            columns: list[list[int | str | None]] = []
+            columns: list[list[Scalar | None]] = []
             for level, hr in enumerate(header):
                 try:
                     line = self._buffered_line()
@@ -384,7 +398,7 @@ def _infer_columns(self):
 
                     line = self.names[:]
 
-                this_columns: list[int | str | None] = []
+                this_columns: list[Scalar | None] = []
                 this_unnamed_cols = []
 
                 for i, c in enumerate(line):
@@ -459,6 +473,7 @@ def _infer_columns(self):
             if clear_buffer:
                 self._clear_buffer()
 
+            first_line: list[Scalar] | None
             if names is not None:
                 # Read first row after header to check if data are longer
                 try:
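# A small sketch (hypothetical code, not from the patch) of the bare
# annotations added above (`self.columns: list[Hashable]`,
# `first_line: list[Scalar] | None`): declaring the type on its own line
# covers names later bound by tuple unpacking or on only some branches,
# where an inline annotation is not possible.
from __future__ import annotations

from typing import Hashable


def split_header(header: list[list[Hashable]]) -> tuple[list[Hashable], int]:
    return list(header[0]), len(header)


columns: list[Hashable]  # annotation only; no value yet
columns, num_levels = split_header([["a", "b"]])  # unpacking cannot be annotated inline

first_line: list[Hashable] | None  # bound on only one branch below
if num_levels > 1:
    first_line = columns
else:
    first_line = None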
@@ -534,10 +549,10 @@ def _infer_columns(self):
 
     def _handle_usecols(
         self,
-        columns: list[list[str | int | None]],
-        usecols_key: list[str | int | None],
+        columns: list[list[Scalar | None]],
+        usecols_key: list[Scalar | None],
         num_original_columns: int,
-    ):
+    ) -> list[list[Scalar | None]]:
         """
         Sets self._col_indices
 
@@ -590,7 +605,7 @@ def _buffered_line(self):
         else:
             return self._next_line()
 
-    def _check_for_bom(self, first_row):
+    def _check_for_bom(self, first_row: list[Scalar]) -> list[Scalar]:
         """
         Checks whether the file begins with the BOM character.
         If it does, remove it. In addition, if there is quoting
@@ -621,6 +636,7 @@ def _check_for_bom(self, first_row):
             return first_row
 
         first_row_bom = first_row[0]
+        new_row: str
 
         if len(first_row_bom) > 1 and first_row_bom[1] == self.quotechar:
             start = 2
@@ -639,9 +655,11 @@ def _check_for_bom(self, first_row):
 
             # No quotation so just remove BOM from first element
             new_row = first_row_bom[1:]
-        return [new_row] + first_row[1:]
-
-    def _is_line_empty(self, line):
+
+        new_row_list: list[Scalar] = [new_row]
+        return new_row_list + first_row[1:]
+
+    def _is_line_empty(self, line: list[Scalar]) -> bool:
         """
         Check if a line is empty or not.
 
@@ -656,7 +674,7 @@ def _is_line_empty(self, line):
         """
         return not line or all(not x for x in line)
 
-    def _next_line(self):
+    def _next_line(self) -> list[Scalar]:
         if isinstance(self.data, list):
             while self.skipfunc(self.pos):
                 self.pos += 1
@@ -710,7 +728,7 @@ def _next_line(self):
             self.buf.append(line)
         return line
 
-    def _alert_malformed(self, msg, row_num):
+    def _alert_malformed(self, msg: str, row_num: int) -> None:
         """
         Alert a user about a malformed row, depending on value
         of `self.on_bad_lines` enum.
@@ -720,10 +738,12 @@ def _alert_malformed(self, msg, row_num):
 
         Parameters
         ----------
-        msg : The error message to display.
-        row_num : The row number where the parsing error occurred.
-            Because this row number is displayed, we 1-index,
-            even though we 0-index internally.
+        msg: str
+            The error message to display.
+        row_num: int
+            The row number where the parsing error occurred.
+            Because this row number is displayed, we 1-index,
+            even though we 0-index internally.
         """
         if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
             raise ParserError(msg)
@@ -731,7 +751,7 @@ def _alert_malformed(self, msg, row_num):
             base = f"Skipping line {row_num}: "
             sys.stderr.write(base + msg + "\n")
 
-    def _next_iter_line(self, row_num):
+    def _next_iter_line(self, row_num: int) -> list[Scalar] | None:
         """
         Wrapper around iterating through `self.data` (CSV source).
 
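# An illustrative, simplified sketch (not the patch's code) of the BOM
# handling typed above in _check_for_bom: drop a UTF-8 BOM from the first
# parsed field, unquoting when the BOM precedes the opening quote. The real
# method also re-attaches any text that follows the closing quote.
from __future__ import annotations

_BOM = "\ufeff"


def strip_bom(first_row: list[str], quotechar: str = '"') -> list[str]:
    first = first_row[0]
    if not first.startswith(_BOM):
        return first_row
    if len(first) > 1 and first[1] == quotechar:
        # BOM sits before the opening quote: strip BOM and both quotes
        new_first = first[2:-1] if first.endswith(quotechar) else first[2:]
    else:
        # no quoting: just strip the BOM
        new_first = first[1:]
    return [new_first] + first_row[1:]


# strip_bom(["\ufeffa", "b"]) -> ["a", "b"]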
""" try: # assert for mypy, data is Iterator[str] or None, would error in next assert self.data is not None - return next(self.data) + line = next(self.data) + # for mypy + assert isinstance(line, list) + return line except csv.Error as e: if ( self.on_bad_lines == self.BadLineHandleMethod.ERROR @@ -775,7 +799,7 @@ def _next_iter_line(self, row_num): self._alert_malformed(msg, row_num) return None - def _check_comments(self, lines): + def _check_comments(self, lines: list[list[Scalar]]) -> list[list[Scalar]]: if self.comment is None: return lines ret = [] @@ -796,19 +820,19 @@ def _check_comments(self, lines): ret.append(rl) return ret - def _remove_empty_lines(self, lines): + def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]: """ Iterate through the lines and remove any that are either empty or contain only one whitespace value Parameters ---------- - lines : array-like + lines : list of list of Scalars The array of lines that we are to filter. Returns ------- - filtered_lines : array-like + filtered_lines : list of list of Scalars The same array of lines with the "empty" ones removed. """ ret = [] @@ -822,7 +846,7 @@ def _remove_empty_lines(self, lines): ret.append(line) return ret - def _check_thousands(self, lines): + def _check_thousands(self, lines: list[list[Scalar]]) -> list[list[Scalar]]: if self.thousands is None: return lines @@ -830,7 +854,9 @@ def _check_thousands(self, lines): lines=lines, search=self.thousands, replace="" ) - def _search_replace_num_columns(self, lines, search, replace): + def _search_replace_num_columns( + self, lines: list[list[Scalar]], search: str, replace: str + ) -> list[list[Scalar]]: ret = [] for line in lines: rl = [] @@ -847,7 +873,7 @@ def _search_replace_num_columns(self, lines, search, replace): ret.append(rl) return ret - def _check_decimal(self, lines): + def _check_decimal(self, lines: list[list[Scalar]]) -> list[list[Scalar]]: if self.decimal == parser_defaults["decimal"]: return lines @@ -855,12 +881,12 @@ def _check_decimal(self, lines): lines=lines, search=self.decimal, replace="." ) - def _clear_buffer(self): + def _clear_buffer(self) -> None: self.buf = [] _implicit_index = False - def _get_index_name(self, columns): + def _get_index_name(self, columns: list[Hashable]): """ Try several cases to get lines: @@ -875,6 +901,7 @@ def _get_index_name(self, columns): orig_names = list(columns) columns = list(columns) + line: list[Scalar] | None if self._header_line is not None: line = self._header_line else: @@ -883,6 +910,7 @@ def _get_index_name(self, columns): except StopIteration: line = None + next_line: list[Scalar] | None try: next_line = self._next_line() except StopIteration: @@ -929,7 +957,7 @@ def _get_index_name(self, columns): return index_name, orig_names, columns - def _rows_to_cols(self, content): + def _rows_to_cols(self, content: list[list[Scalar]]) -> list[np.ndarray]: col_len = self.num_original_columns if self._implicit_index: @@ -1012,7 +1040,7 @@ def _rows_to_cols(self, content): ] return zipped_content - def _get_lines(self, rows=None): + def _get_lines(self, rows: int | None = None): lines = self.buf new_rows = None