# Setup

### Table of Contents
1. [SQL Interface](#sql-interface)
2. [Helper Methods](#helper-methods)
2. [Pandas, extended inplace](#pandas-extended-inplace)
2. [Format District](#format-district)
2. [GeoDF](#geodf)
2. [GroupedDF](#groupeddf)

In [None]:
# pip install numpy pandas

#### `extend-inplace` - a simple library I built, inspired by this project.
- Docs & source here: [github.com/ryayoung/extend-inplace](https://github.com/ryayoung/extend-inplace)

In [None]:
# pip install extend-inplace

In [3]:
from typing import Iterable, Any
import pandas as pd
import numpy as np
import sqlite3 as sqlite
import re
from extend_inplace import Extend

## SQL Interface

- End-user will use `read_raw()`, `read_main()`, `write_raw()`, and `write_main()` to communicate with sqlite

In [None]:
def get_con_raw() -> sqlite.Connection:
    """Open and return a new connection to the raw-data sqlite file."""
    return sqlite.connect("data_raw.db")


def get_con_main() -> sqlite.Connection:
    """Open and return a new connection to the main-data sqlite file."""
    return sqlite.connect("data_main.db")


# Registry keyed by database alias ("raw" / "main"). Each entry holds:
#   con   - zero-argument factory that opens a fresh connection
#   cache - dict used by the read helper to memoize query results per database
sql = {
    "raw": {"con": get_con_raw, "cache": {}},
    "main": {"con": get_con_main, "cache": {}},
}

Helper methods for the end-user methods

In [None]:
def _to_sql(
    df: pd.DataFrame,
    name: str,
    con: str
) -> None | int:
    """
    Save/replace `df` under `name` in sqlite. Database depends on `con`

    df
        dataframe to store (its index is not written)
    name
        table name; an existing table with this name is replaced
    con
        key into the global `sql` registry ("raw" or "main")

    Returns whatever `DataFrame.to_sql` returns.
    """
    from contextlib import closing

    # Close the connection explicitly instead of leaving it to the garbage collector.
    with closing(sql[con]['con']()) as connection:
        res = df.to_sql(name, con=connection, index=False, if_exists='replace')

    # Any cached query on this database may have read the table that was just
    # replaced, so drop the cache rather than serve stale results.
    sql[con]['cache'].clear()
    return res


def _format_query(
    *args: tuple[str], # only one string arg processed
    **kwargs: dict[str, str],
) -> str:
    """
    Allow for more pythonic style of writing sql queries, for better
    readability, user experience, and user-error prevention. Underscores
    can be used to prefix/suffix python reserved word kwargs like FROM, IF, etc.
    --
    >>> _format_query("SELECT name, age FROM my_table")
    SELECT name, age FROM my_table
    >>> _format_query("my_table")
    SELECT * FROM my_table
    >>> _format_query("my_table", WHERE = "something = something")
    SELECT * FROM my_table WHERE something = something
    >>> _format_query("name, age", from_ = "my_table")
    SELECT name, age FROM my_table
    """
    kwarg_query = " ".join([f'{k.strip("_").upper()} {v}' for k,v in kwargs.items()])
    if len(args) == 1:
        if len(args[0]) == len(re.sub(r"\s+", "", args[0])):
            query = f"SELECT * FROM {args[0]} "
        elif "select" not in args[0].lower():
            query = f"SELECT {args[0]} "
        else:
            query = args[0] + " "
        return query + kwarg_query
    else:
        return kwarg_query


def _read_sql(
    *args: str,
    con: str,
    **kwargs: str,
) -> pd.DataFrame:
    """
    This method is specific to this script. It references global variables and funcs

    Builds a query from `args`/`kwargs` with `_format_query`, then returns the
    result from the per-database cache in the global `sql` registry, running
    the query against sqlite only on a cache miss.

    con
        key into the global `sql` registry ("raw" or "main")

    Always returns a copy, so callers can mutate the result without
    altering the cached dataframe.
    """
    from contextlib import closing

    query = _format_query(*args, **kwargs)
    cache = sql[con]['cache']

    if (cached := cache.get(query, None)) is not None:
        return cached.copy()

    # Close the connection explicitly instead of leaving it to the garbage collector.
    with closing(sql[con]['con']()) as connection:
        df = pd.read_sql(query, con=connection)

    cache[query] = df
    # Hand out a copy on the first read as well; returning the cached object
    # itself would let the caller's edits leak into later reads.
    return df.copy()

End-user will use these read/write methods to communicate with sqlite

In [6]:
def read_raw(*args, **kwargs):
    """Read from the raw database; arguments are forwarded to `_read_sql`."""
    return _read_sql(*args, **kwargs, con='raw')


def read_main(*args, **kwargs):
    """Read from the main database; arguments are forwarded to `_read_sql`."""
    return _read_sql(*args, **kwargs, con='main')


def write_raw(*args, **kwargs):
    """Write to the raw database; arguments are forwarded to `_to_sql`."""
    return _to_sql(*args, **kwargs, con='raw')


def write_main(*args, **kwargs):
    """Write to the main database; arguments are forwarded to `_to_sql`."""
    return _to_sql(*args, **kwargs, con='main')

## Helper methods

In [None]:
def head(
    *dfs: pd.DataFrame,
    n: int = 3,
    with_tail: bool = False
) -> None:
    '''
    Like display() and pd.DataFrame.head got married and had a kid.
    - Prints each dataframe's shape, then displays its first `n` rows.
    - `with_tail` will concat head and tail together.

    Relies on the notebook-provided `display` function.
    '''
    for df in dfs:
        print(f'{df.shape[1]} cols x {df.shape[0]} rows')
        if with_tail:
            # NOTE(review): shows n-1 rows from each end (2*(n-1) rows in total),
            # kept as written - confirm this is the intended count.
            display(pd.concat([df.head(n-1), df.tail(n-1)], axis=0))
        else:
            # Honour `n`; this branch previously always showed 3 rows.
            display(df.head(n))


def _flatten_iterable(
    args: Any
) -> tuple[Any, ...]:
    """
    Turns anything into a flattened tuple of non-iterable (str, bytes excluded) values

    >>> _flatten_iterable(int)
    (<class 'int'>,)
    >>> _flatten_iterable(['hi'])
    ('hi',)
    >>> _flatten_iterable(['hi', (1,3, [8]), [((3,3,3))]])
    ('hi', 1, 3, 8, 3, 3, 3)
    """

    def valid_iterable(e: Any) -> bool:
        if isinstance(e, Iterable) and not isinstance(e, (str, bytes)):
            return True
        return False

    def flatten(elems: Iterable[Any]):
        for e in elems:
            if valid_iterable(e):
                yield from flatten(e)
            else:
                yield e

    args = args if valid_iterable(args) else (args,)

    return tuple(flatten(args))


## Pandas, extended inplace
- Modifying pandas `DataFrame` and `Series` classes in-place with the following extra instance methods

In [None]:
@Extend(pd.Series)
def rename_vals_from_df(self, changes:pd.DataFrame) -> pd.Series:
    """
    Map the values of this series through a lookup built from `changes`,
    whose first column holds the old values and second column the new ones.
    Delegates to `Series.map` with a dict, so values with no entry in the
    lookup are not carried over unchanged.
    """
    labels = changes.columns
    old_label, new_label = labels[0], labels[1]
    lookup = dict(zip(changes[old_label], changes[new_label]))
    return self.map(lookup)

In [None]:
@Extend(pd.DataFrame)
class _:
    # Container for extra DataFrame instance methods; `Extend` (from
    # extend_inplace) is applied to it with `pd.DataFrame` as the target.
    # Inside every method `self` is therefore expected to be a DataFrame.

    def set_columns(self, *new) -> pd.DataFrame:
        """
        Replace all column labels with `new` (any nesting of names is flattened).
        NOTE: assigns `self.columns`, so the dataframe is modified in place
        and the same object is returned.
        """
        self.columns = _flatten_iterable(new)
        return self


    def rename_col(self, old: str, new: str) -> pd.DataFrame:
        """
        Simplify syntax for renaming one column
        """
        return self.rename(columns={old: new})


    def drop_cols(self, *columns) -> pd.DataFrame:
        """
        Simplify syntax of dropping columns.
        Names that are not present in the dataframe are silently skipped.
        """
        present = [c for c in _flatten_iterable(columns) if c in self.columns]
        return self.drop(columns=present)


    def prefix_cols(self, cols: str | list, prefix: str) -> pd.DataFrame:
        """
        Like df.add_prefix(), but takes a subset of columns as first positional
        """
        if isinstance(cols, str):
            cols = [cols]
        return self.rename(columns={col: f'{prefix}{col}' for col in cols})


    def reset_multilevel_columns(
        self,
        *new_columns: str | Iterable[str],
    ) -> pd.DataFrame:
        """
        Use this after df.pivot() to flatten and rename columns.
        Resets the index, drops the top column level, clears the columns'
        name and then applies `new_columns` as the new labels.
        """
        new_columns = _flatten_iterable(new_columns)
        # reset_index() returns a new frame, so the caller's object is untouched
        df = self.reset_index()
        df.columns = df.columns.droplevel()
        df.columns.name = None
        return df.set_columns(new_columns)


    def col_replace(
        self,
        text: str | dict,
        replacement: str | None = None
    ) -> pd.DataFrame:
        """
        Replace a substring in each column NAME.

        text
            substring to replace, or a dict of {old: new} pairs
        replacement
            new substring; must be omitted when `text` is a dict

        Raises ValueError when `replacement` is omitted and `text` is not a dict.
        A column whose whole name equals the searched text is left untouched.
        """
        if replacement is None:
            if not isinstance(text, dict):
                raise ValueError("Multiple values must be passed as dict")
            to_replace = text
        else:
            to_replace = {text: replacement}

        df = self
        for old, new in to_replace.items():
            # One rename per pattern: cheaper than one rename per column, and a
            # freshly renamed column cannot be renamed a second time.
            mapping = {c: c.replace(old, new) for c in df.columns if c != old}
            df = df.rename(columns=mapping)
        return df


    def coerce_type(
        self,
        dtype: str | type,
        subset: list | None = None,
        exclude: list | None = None
    ) -> pd.DataFrame:
        """
        Iteratively try to set all columns to type.

        subset
            columns to try (defaults to every column)
        exclude
            columns to leave alone

        Returns a copy; columns that cannot be converted keep their dtype.
        """
        df = self.copy()
        cols = tuple(subset) if subset else tuple(self.columns)
        if exclude:
            cols = [c for c in cols if c not in exclude]
        for c in cols:
            try:
                df[c] = df[c].astype(dtype)
            except Exception:
                # Deliberate best-effort: a column that cannot be cast is skipped.
                continue
        return df


    def insert_at(
        self,
        target: str | int,
        name: str,
        col: pd.Series
    ) -> pd.DataFrame:
        """
        Insert col before target col name, or to index.
        Like df.insert(), but takes a column name as location, instead of int.
        Works on a copy and returns it.
        """
        df = self.copy()
        idx = target if isinstance(target, int) else list(df.columns).index(target)
        df.insert(idx, name, col)
        return df


    def move_col(
        self,
        name: str,
        target: str | int
    ) -> pd.DataFrame:
        """
        Move col to before target col name, or to index.
        - Must not mutate original dataframe (so we can chain the func
        and re-run cells).
        - Placement must be correct: if target column is string, always
        place our column before the target column. If target is an index,
        the RESULTING dataframe must have our new column in the specified index.

        Raises ValueError when `name` (or a string `target`) is not a column.
        """
        cols = list(self.columns)

        if isinstance(target, int):
            cols.remove(name)
            cols.insert(target, name)
        elif isinstance(target, str):
            cols.index(target)  # validates that the target column exists
            if name != target:
                # Remove first, then look the target up again, so the insert
                # position is right whichever side `name` started on.
                cols.remove(name)
                cols.insert(cols.index(target), name)

        return self[cols]


    def separate_by(
        self,
        to_match: list | str,
        index: list | None = None,
        keep: list | None = None,
        start: bool = False,
        end: bool = False,
        mode: str = "include",
    ) -> pd.DataFrame:
        """
        Given a df and a substring, filter for columns whose name does, or
        does not, contain a substring

        to_match
            text(s) to match
        index
            key columns to ignore (year, county, district, etc.)
        keep
            columns to ignore in matching. If "exclude", these columns will be removed
        start
            match only if column starts with `to_match` element, instead of contains
        end
            match only if column ends with `to_match` element, instead of contains
        mode
            If "include", returned df will include columns in `index`, `keep`, and matches
            If "exclude", returned df will exclude columns in `keep`, and matches

        Raises ValueError for any other `mode`.
        """
        if mode not in ("include", "exclude"):
            raise ValueError(f"mode must be 'include' or 'exclude', got {mode!r}")

        if not isinstance(to_match, list):
            to_match = [to_match]
        index = [] if index is None else list(index)
        keep = [] if keep is None else list(keep)

        def is_match(column: str, text: str) -> bool:
            # `start` takes precedence over `end`; otherwise plain containment
            if start:
                return column.startswith(text)
            if end:
                return column.endswith(text)
            return text in column

        names = [c for txt in to_match for c in self.columns if is_match(c, txt)]

        if mode == 'include':
            return self[index + keep + names]
        return self.drop(columns=keep + names)


    def display(self, text: str | None = None, head: bool = True) -> pd.DataFrame:
        """
        Display while chaining methods. Return self

        text
            optional caption printed before the dataframe
        head
            show only the first 3 rows when True, the whole dataframe otherwise

        Relies on the notebook-provided `display` function.
        """
        if text is not None:
            print(text)
        display(self.head(3) if head else self)
        return self


    def rename_vals_from_df(self, column: str, changes: pd.DataFrame) -> pd.DataFrame:
        """
        Renames values in a column using a 2-column dataframe (old, new)
        instead of dictionary. Works on a copy and returns it.
        """
        df = self.copy()
        df[column] = df[column].rename_vals_from_df(changes)
        return df


    def rename_cols_from_df(self, changes: pd.DataFrame) -> pd.DataFrame:
        """
        Renames columns in df, mapping changes from a 2-column df (old, new)
        """
        old_label, new_label = changes.columns[0], changes.columns[1]
        mapping = dict(zip(changes[old_label], changes[new_label]))
        return self.rename(columns=mapping)

## Format District
- Apply the below function to all school district columns from source, to standardize their naming conventions to allow joins

In [None]:
def standardize_district_name(name: str) -> str:
    """
    Apply this iteratively (with pd.Series.apply())
    to school district name columns to standardize their naming conventions
    as best as possible prior to merging datasets with potentially
    very different naming conventions.

    The result is upper-case with all whitespace removed; when the name
    contains a number, the first run of digits is moved to the end after a
    single space (e.g. "Weld County RE-1" -> "WELDCOUNTY 1").
    The order of the substitutions below matters.
    """
    name = name.upper()
    name = name.replace('S/D', '')
    name = re.sub(r'[-.()/:]', '', name)
    name = name.replace(' CONSOLIDATED', '')
    name = re.sub(r'\s?SCHOOL DISTRICT', '', name)

    # Number patterns: drop a designator word in front of the district
    # number, keeping only the digits.
    designators = (
        r' RENO\s?(\d+)',
        r' NO\s?(\d+)',
        r' RD\s?(\d+)',
        r' RJ\s?(\d+)',
        r' RE\s?(\d+)J?T?',
        r' R\s?(\d+)J?',
        r' C\s?(\d+)',
    )
    for pattern in designators:
        name = re.sub(pattern, r'\1', name)

    # Remove spaces
    name = re.sub(r'\s', '', name)

    # Number patterns (text at end). "JT" is handled before "J"; in the
    # opposite order the "J" rule consumes the J and leaves a stray "T".
    for pattern in (r'(\d+)R', r'(\d+)JT', r'(\d+)J'):
        name = re.sub(pattern, r'\1', name)

    # Delete text parts
    for fragment in ('RURAL', 'SCHOOLS', 'SCHOOLDIST', 'WATERSHED'):
        name = name.replace(fragment, '')
    name = name.strip()

    # Replace full
    name = name.replace('GILCREST', 'WELDCOUNTY')
    name = name.replace('FLORENCE', 'FREMONT')
    name = name.replace('CONSOLIDATED1', 'CUSTERCOUNTY1')
    name = re.sub(r'(PUEBLOCITY)(\d+)', r'\1', name)
    name = re.sub(r'^CREEDE$', r'CREEDE1', name)

    name = name.strip()
    # Push number out
    name = re.sub(r'(.*?)(\d+)(.*)', r'\1\3 \2', name)
    return name


def join_conflicts(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    col: str
) -> pd.DataFrame:
    '''
    Use when trying to join columns and see values that aren't shared.

    Returns a two-column dataframe (columns 0 and 1): column 0 lists the
    sorted values of `df1[col]` that never occur in `df2[col]`, column 1
    the reverse. The shorter side is padded with None.
    Values must be sortable and hashable.
    '''
    from itertools import zip_longest

    left = sorted(df1[col])
    right = sorted(df2[col])

    # Sets give constant-time membership tests; the sorted lists keep the
    # output order (and any repeated values) intact.
    left_seen, right_seen = set(left), set(right)
    only_left = [v for v in left if v not in right_seen]
    only_right = [v for v in right if v not in left_seen]

    # zip_longest pads the shorter list with None
    return pd.DataFrame(list(zip_longest(only_left, only_right)))

## GeoDF

- Child class of `geopandas.GeoDataFrame` to provide ease of use and additional functionality

In [None]:
import geopandas as gp
from shapely import wkt

class GeoDF(gp.GeoDataFrame):
    """
    GeoDataFrame that can be built straight from a csv path or a plain
    pandas dataframe whose `geo_*` columns hold WKT strings.
    Missing geometries are stored as 'GEOMETRYCOLLECTION EMPTY'.
    """

    def __init__(self, df, geo=None, crs='epsg:4326'):
        """
        df
            csv path, plain `pd.DataFrame`, or anything GeoDataFrame accepts
        geo
            name of the `geo_*` column to use as the active geometry;
            defaults to the first `geo_*` column
        crs
            coordinate reference system passed to GeoDataFrame

        Raises ValueError when a plain dataframe has no `geo_*` column and
        `geo` is not given.
        """
        if isinstance(df, str):
            df = pd.read_csv(df)

        # Exact type check on purpose: GeoDataFrame subclasses pd.DataFrame, and
        # its geometry columns are already parsed, so they must skip this branch.
        if type(df) is pd.DataFrame:
            df = df.copy()

            cols = [c for c in df.columns if c.startswith('geo_')]
            for c in cols:
                df[c] = df[c].fillna('GEOMETRYCOLLECTION EMPTY')
                df[c] = gp.GeoSeries(df[c].apply(wkt.loads))

            if not geo:
                if not cols:
                    raise ValueError(
                        "No column starting with 'geo_' found and no `geo` column given"
                    )
                geo = cols[0]

            df['geometry'] = df[geo]

        super(GeoDF, self).__init__(df, crs=crs)


    def explore(self, tooltip=None, geo=None, **kwargs):
        """
        Interactive map of the rows that have a non-empty geometry.

        tooltip
            column(s) shown on hover; defaults to ['county', 'dist'] when both
            columns exist, otherwise the first column
        geo
            if given, becomes the active geometry column first (this
            modifies the dataframe, see `set_geo`)
        """
        if geo:
            self.set_geo(geo)

        if not tooltip:
            tooltip = self.columns[0]
            if 'county' in self.columns and 'dist' in self.columns:
                tooltip = ['county', 'dist']

        has_shape = self['geometry'].astype(str) != 'GEOMETRYCOLLECTION EMPTY'
        return self.loc[has_shape].explore(tooltip=tooltip, **kwargs)


    def df(self):
        """Return only the rows whose active geometry is not empty."""
        return self[self.geometry.astype(str) != 'GEOMETRYCOLLECTION EMPTY']


    def set_geo(self, geo, crs='epsg:4326'):
        """
        Copy column `geo` into the 'geometry' column, in place.
        NOTE: `crs` is accepted but not used.
        """
        self['geometry'] = self[geo]


    def copy(self, deep=True):
        """
        Return a copy wrapped as GeoDF again.
        `deep` is forwarded to the parent copy so calls passing `deep=` work.
        """
        return GeoDF(super().copy(deep=deep))

## GroupedDF

- Lets us split a dataframe into organized groups based on column names.
- Useful for massive dataframes with too many columns to conveniently select columns manually

In [None]:
from copy import deepcopy

class GroupedDF:
    default_index = []
    groups: dict = None

    def __init__(self, df, index=[], custom={}, show_g_names=True):
        self.index = index
        if self.index == []: self.index = GroupedDF.default_index

        self.index = index
        self._df = deepcopy(df)
        self._show_g_names = show_g_names

        self._custom = custom
        self.refresh_groups()
    

    def refresh_groups(self):
        self._dict = {g: self._df.separate_by(g, self.index, start=True, mode='include') for g in GroupedDF.groups.keys()}

        if self._show_g_names == False:
            for k, v in self._dict.items():
                self._dict[k] = v.col_replace(f'{k}_', '')

        for name, cols in self._custom.items():
            self._dict[name] = self._df[cols]

        for k, v in self._dict.items():
            setattr(self, k, v)
    

    @classmethod
    def set_groups(cls, items: dict or list):

        if type(items) == list:
            cls.groups = {k: "" for k in items}
            return
        
        cls.groups = items


    @property
    def df(self):
        return self._df
    
    @df.setter
    def df(self, new):
        self._df = new
        self.refresh_groups()


    def __getattr__(self, name):
        return self._dict.get(name)

    def __getitem__(self, name):
        return self._dict[name]
    

    @property
    def dict(self):
        return self._dict
    
    @property
    def show_g_names(self):
        return self._show_g_names
    
    @show_g_names.setter
    def show_g_names(self, val:bool):
        self._show_g_names = val
        self.refresh_groups()
    

    def display(self, rows=3, exclude=[]):
        for k, v in self._dict.items():
            print(k, GroupedDF.groups[k], sep=': ')
            display(v.drop(columns=exclude).head(rows))
            print()