# core

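> Pydantic-configured SQLAlchemy database connections, plus generic, typed query models for turning query results into validated records and DataFrames.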

In [None]:
#| default_exp core

In [None]:
#| exporti 

import inspect
import logging
from abc import ABC, abstractproperty,abstractmethod
from contextlib import contextmanager
from typing import Optional,Union

import pandas as pd
import sqlalchemy
from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy import create_engine, URL, Engine
from sqlalchemy import text
from sqlalchemy.orm import Session

In [None]:
#| hide 

from nbdev.showdoc import show_doc

## Delegates

In [None]:

#|export

def delegates(to=None, keep=False):
    """Decorator: replace `**kwargs` in signature with params from `to`.

    Only the reported signature (`__signature__`) is rewritten; the decorated
    callable itself is returned unchanged.

    Parameters:
        to: callable whose defaulted parameters are advertised in place of
            `**kwargs`. When `None`, the decorated object is treated as a
            class and its `__init__` delegates to its base class `__init__`.
        keep: when true, `**kwargs` stays in the signature after the
            delegated parameters.

    Raises:
        KeyError: if the decorated callable has no parameter named `kwargs`.
    """
    def _f(f):
        if to is None:
            to_f, from_f = f.__base__.__init__, f.__init__
        else:
            to_f, from_f = to, f
        sig = inspect.signature(from_f)
        sigd = dict(sig.parameters)
        kwargs_param = sigd.pop('kwargs')
        # Delegated parameters can only ever arrive through `**kwargs`, so they
        # are advertised as keyword-only. This also keeps the parameter order
        # valid when the wrapper already declares `*args` or keyword-only
        # parameters of its own.
        delegated = {
            name: param.replace(kind=inspect.Parameter.KEYWORD_ONLY)
            for name, param in inspect.signature(to_f).parameters.items()
            # identity test: `!=` may be overloaded by the default value
            if param.default is not inspect.Parameter.empty and name not in sigd
        }
        sigd.update(delegated)
        if keep:
            sigd['kwargs'] = kwargs_param
        from_f.__signature__ = sig.replace(parameters=sigd.values())
        return f
    return _f

## SQL Alchemy Connection

In [None]:
# Render the documentation of `URL.create`; its parameters are the fields that
# `DatabaseSettings` declares below.
show_doc(URL.create)

---

### URL.create

>      URL.create (drivername:str, username:Optional[str]=None,
>                  password:Optional[str]=None, host:Optional[str]=None,
>                  port:Optional[int]=None, database:Optional[str]=None, query:M
>                  apping[str,Union[Sequence[str],str]]=immutabledict({}))

Create a new :class:`_engine.URL` object.

.. seealso::

    :ref:`database_urls`

:param drivername: the name of the database backend. This name will
  correspond to a module in sqlalchemy/databases or a third party
  plug-in.
:param username: The user name.
:param password: database password.  Is typically a string, but may
  also be an object that can be stringified with ``str()``.

  .. note::  A password-producing object will be stringified only
     **once** per :class:`_engine.Engine` object.  For dynamic password
     generation per connect, see :ref:`engines_dynamic_tokens`.

:param host: The name of the host.
:param port: The port number.
:param database: The database name.
:param query: A dictionary of string keys to string values to be passed
  to the dialect and/or the DBAPI upon connect.   To specify non-string
  parameters to a Python DBAPI directly, use the
  :paramref:`_sa.create_engine.connect_args` parameter to
  :func:`_sa.create_engine`.   See also
  :attr:`_engine.URL.normalized_query` for a dictionary that is
  consistently string->list of string.
:return: new :class:`_engine.URL` object.

.. versionadded:: 1.4

    The :class:`_engine.URL` object is now an **immutable named
    tuple**.  In addition, the ``query`` dictionary is also immutable.
    To create a URL, use the :func:`_engine.url.make_url` or
    :meth:`_engine.URL.create` function/ method.  To modify a
    :class:`_engine.URL`, use the :meth:`_engine.URL.set` and
    :meth:`_engine.URL.update_query` methods.

In [None]:
#| export 

class DatabaseSettings(BaseSettings):
    """Connection settings whose fields mirror the parameters of `sqlalchemy.URL.create`.

    Only `drivername` is required; every other field defaults to `None`
    (or to an empty dict for `query`).
    """
    # Name of the database backend, e.g. 'sqlite'.
    drivername:str
    # Login user name, if the backend needs one.
    username: Optional[str]=None
    # Password, held as a `SecretStr`.
    password: Optional[SecretStr]=None
    # Host name of the database server.
    host: Optional[str]=None
    # Port number of the database server.
    port: Optional[int]=None
    # Database name (a file path in the SQLite examples).
    database: Optional[str]=None
    # Extra string key/value pairs passed as the URL query.
    query: dict[str,str]={}
    


In [None]:
# Minimal settings for a local SQLite file; the unspecified fields keep their defaults.
settings = DatabaseSettings(
    drivername='sqlite',
    database='test.db',
)
settings

DatabaseSettings(drivername='sqlite', username=None, password=None, host=None, port=None, database='test.db', query={})

In [None]:
# Expand the settings into keyword arguments for `URL.create`.
url = URL.create(
    **settings.model_dump()
)
url

sqlite:///test.db

In [None]:
# Build a SQLAlchemy engine from the URL and display it.
engine = create_engine(url)
engine

Engine(sqlite:///test.db)

In [None]:
#| exporti 

class AbstractDatabaseClass(ABC):

    """
    Abstract Base Class for all Database Connections.

    Every method reads `self._engine`; this class does not create it, so a
    concrete subclass must set `_engine` to a SQLAlchemy `Engine`.

    <br><br>
    From [SQLAlchemy docs](https://docs.sqlalchemy.org/en/13/core/engines.html):
    > The Engine is the starting point for any SQLAlchemy application. It’s “home base” for the actual database and its DBAPI.
    > An Engine references both a Dialect and a Pool, which together interpret the DBAPI’s module functions as well as the behavior of the database <br><br>
    > Pool object which will establish a DBAPI connection at localhost:5432 when a connection request is first received
    > - Note that the Engine and its underlying Pool do **not** establish the first actual DBAPI connection until the Engine.connect() method is called, or an operation which is dependent on this method such as Engine.execute() is invoked.
    > - In this way, Engine and Pool can be said to have a lazy initialization behavior.
    >
    > The Engine, once created, can either be used directly to interact with the database, or can be passed to a Session object to work with the ORM.

    """

    def query_to_records(
        self,
        query_string:str,
    ):
        """Run `query_string` on a fresh connection and return all rows as a list of mappings keyed by column name."""
        with self._engine.connect() as conn:
            # Materialise the rows before the connection is closed.
            results = list(conn.execute(text(query_string)).mappings())
        return results

    @delegates(pd.read_sql_query)
    def query_to_df(
        self,
        query_string,
        **kwargs
    ):
        """Run `query_string` through `pandas.read_sql_query` on a fresh connection and return the DataFrame.

        Extra keyword arguments are forwarded unchanged to `pandas.read_sql_query`.
        """
        with self._engine.connect() as conn:
            df = pd.read_sql_query(query_string,conn,**kwargs)
        return df

    @contextmanager
    def session_scope(self,bind=None,**kwargs):
        """Provide a transactional scope around a series of operations.

        Yields a `Session` bound to `bind`, or to this object's engine when
        `bind` is None. The session is committed when the block exits
        normally, rolled back (and the exception re-raised) when it raises,
        and closed in every case. Extra keyword arguments go to `Session`.
        """

        session = Session(bind=self._engine if bind is None else bind,**kwargs)
        try:
            yield session
            session.commit()
        except BaseException:
            # Deliberately broad: roll back on interrupts as well, then re-raise.
            session.rollback()
            raise
        finally:
            session.close()


In [None]:
#| export 

class Database(DatabaseSettings,AbstractDatabaseClass):
    """Database settings that also build and hold a SQLAlchemy `Engine` for themselves.

    Accepts the same keyword arguments as `DatabaseSettings`. On construction
    the validated settings are turned into a `URL` (stored on `_engine_url`)
    and an engine for that URL (stored on `_engine`), which the query and
    session helpers of `AbstractDatabaseClass` use.
    """

    model_config = SettingsConfigDict(
        #allows for attributes of `database settings` to be set as defaults in subclasses without type annotation
        ignored_types=(int,str,dict),
        arbitrary_types_allowed=True
    )

    _engine:Engine = None
    _engine_url:URL = None

    def __init__(
        self,
        **kwargs
    ):
        # settings __init__
        super().__init__(**kwargs)
        # Unwrap the secret only to hand it to SQLAlchemy; it must never be
        # printed or logged.
        secret = self.password
        if hasattr(secret,'get_secret_value'):
            password = secret.get_secret_value()
        else:
            password = secret
        url = URL.create(
            drivername=self.drivername,
            username=self.username,
            password=password,
            host=self.host,
            port=self.port,
            database=self.database,
            query=self.query
        )
        self._engine_url=url
        self._engine=create_engine(url)
    

In [None]:
# Construct a `Database` for a local SQLite file and display its settings.
database = Database(drivername='sqlite',database='test.db')
database

Database(drivername='sqlite', username=None, password=None, host=None, port=None, database='test.db', query={})

## Example

In [None]:
import pandas as pd
import os
from sqlalchemy import text
from urllib.request import urlretrieve

In [None]:
# Round-trip check: write a small frame to SQLite and read it back.
db = Database(drivername='sqlite',database='test.db')

users = pd.DataFrame({
    'id':[1,2,3],
    'user':['larry','moe','curly']
})

# Overwrite any existing `users` table; the DataFrame index is not stored.
users.to_sql('users',db._engine,if_exists='replace',index=False)

queried = db.query_to_df("select * from users")

assert queried.equals(users)
# Remove the scratch database file (only reached when the assertion passes).
os.remove('test.db')

In [None]:
# Download the sample ACS 2015 SQLite file to `acs.db` and point `db` at it.
# NOTE(review): the file is fetched over plain HTTP each time this cell runs.
urlretrieve(
    "http://2016.padjo.org/files/data/starterpack/census-acs-1year/acs-1-year-2015.sqlite",
    filename='acs.db'
)
db = Database(drivername='sqlite',database = 'acs.db')

In [None]:
# List the tables and indexes recorded in the SQLite schema table.
db.query_to_df("select * from sqlite_schema")

Unnamed: 0,type,name,tbl_name,rootpage,sql
0,table,states,states,2,"CREATE TABLE states (\n year INTEGER , \n ..."
1,table,congressional_districts,congressional_districts,3,CREATE TABLE congressional_districts (\n ye...
2,table,places,places,4,"CREATE TABLE places (\n year INTEGER , \n ..."
3,index,state_on_states,states,5,"CREATE INDEX ""state_on_states"" ON states(state)"
4,index,state_cd_on_cdistricts,congressional_districts,6,"CREATE INDEX ""state_cd_on_cdistricts"" ON congr..."
5,index,state_on_places,places,7,"CREATE INDEX ""state_on_places"" ON places(state)"
6,index,name_on_states,states,8,"CREATE INDEX ""name_on_states"" ON states(name)"
7,index,name_on_cdistricts,congressional_districts,9,"CREATE INDEX ""name_on_cdistricts"" ON congressi..."
8,index,name_on_places,places,10,"CREATE INDEX ""name_on_places"" ON places(name)"


In [None]:
# Preview the first five rows of the `states` table as a DataFrame.
db.query_to_df("""select * from states limit 5""")

Unnamed: 0,year,name,geo_id,total_population,white,black,hispanic,asian,american_indian,pacific_islander,other_race,median_age,total_households,owner_occupied_homes_median_value,per_capita_income,median_household_income,below_poverty_line,foreign_born_population,state
0,2015,Alabama,04000US01,4858979,3204076,1296681,192870,58918,19069,2566,5590,38.7,1846390,134100,44765,44765,876016,169972,1
1,2015,Alaska,04000US02,738432,452472,24739,51825,45753,98300,6341,2201,33.3,250185,259600,73355,73355,74532,58544,2
2,2015,Arizona,04000US04,6828065,3802263,282718,2098411,210922,276132,9963,6951,37.4,2463008,194300,51492,51492,1159043,914400,4
3,2015,Arkansas,04000US05,2978204,2174934,466486,207743,41932,18221,7551,3826,37.9,1144663,120700,41995,41995,550508,142841,5
4,2015,California,04000US06,39144818,14815122,2192844,15184545,5476958,135866,143408,87813,36.2,12896357,449100,64500,64500,5891678,10688336,6


In [None]:
# Same table through `query_to_records`; show the first row as a mapping.
db.query_to_records(
    "select * from states limit 2",
)[0]

{'year': 2015, 'name': 'Alabama', 'geo_id': '04000US01', 'total_population': 4858979, 'white': 3204076, 'black': 1296681, 'hispanic': 192870, 'asian': 58918, 'american_indian': 19069, 'pacific_islander': 2566, 'other_race': 5590, 'median_age': 38.7, 'total_households': 1846390, 'owner_occupied_homes_median_value': 134100, 'per_capita_income': 44765, 'median_household_income': 44765, 'below_poverty_line': 876016, 'foreign_born_population': 169972, 'state': '01'}

## Generic Query Model

In [None]:
#| exporti 

from pydantic import BaseModel,ConfigDict,Field
from typing import Generic, List, TypeVar

In [None]:
#| exporti

# Type variable for the per-row type held in the `data` list of the generic models.
DataModelT = TypeVar('DataModelT')

In [None]:
#| export 

class DataModel(BaseModel,Generic[DataModelT]):
    """Generic container for a list of records of type `DataModelT`."""
    data: List[DataModelT]

    @delegates(BaseModel.model_dump)
    def to_dataframe(self,**kwargs):
        """turns `data` into a DataFrame. Delegates to `pydantic.BaseModel.model_dump` to control model serialization"""
        # Each item must provide `model_dump`; `kwargs` are applied per item.
        return pd.DataFrame([item.model_dump(**kwargs) for item in self.data])

    def _repr_json_(self):
        """Return the JSON-mode dump of the model for notebook display.

        Best effort: if serialization fails, the error is logged as a warning
        and `None` is returned instead of raising.
        """
        try:
            return self.model_dump(mode='json')
        except Exception as e:
            logging.warning(e)
            return None

In [None]:
#| export

class Query(DataModel,Generic[DataModelT]):
    """A SQL query together with the records it returned.

    Calling an instance with a `Database` runs the query and returns a new,
    validated instance whose `data` holds the resulting rows.
    """
    query: str
    # Empty list (not a dict) until the query has been run.
    data: List[DataModelT] = Field(default_factory=list)

    def __call__(
        self,
        database: Database,
        query: Optional[str] = None
    ):
        """Run the query against `database` and return a populated model.

        Parameters:
            database: the `Database` whose `query_to_records` executes the SQL.
            query: optional SQL that replaces this instance's `query` for
                this call; an empty string or `None` falls back to `self.query`.

        Returns:
            A new instance of this model's class, validated from the query
            text that was run and the rows it returned.
        """
        # allows optional query overwrite
        query_to_run = query or self.query
        data = database.query_to_records(query_to_run)
        return type(self).model_validate(dict(query=query_to_run,data=data))

In [None]:
class State(BaseModel):
    """Name, ID and Total Population for a State"""
    # Aliases are the title-cased field names with underscores replaced by spaces
    # (e.g. `total_population` -> "Total Population"); `populate_by_name` lets
    # input use the plain field names as well.
    model_config = ConfigDict(
        alias_generator=lambda x: x.replace('_',' ').title(),
        populate_by_name=True
    )
    
    name: str = Field(description='The Name of the State')
    state: str = Field(description='The ID of the State')
    total_population: int = Field(description='Total Population of the State')

In [None]:
# Validate one raw row; only the three fields declared on `State` appear in the result.
first_row = db.query_to_records(
    "select * from states limit 2"
)[0]
State.model_validate(first_row)

State(name='Alabama', state='01', total_population=4858979)

In [None]:
# Parametrise `Query` with `State` so each returned row is validated as a `State`.
preview_states = Query[State](
    query='select * from states limit 10'
)
# Calling the query with a database runs it and returns a populated model.
result = preview_states(db)
result

Query[State](data=[State(name='Alabama', state='01', total_population=4858979), State(name='Alaska', state='02', total_population=738432), State(name='Arizona', state='04', total_population=6828065), State(name='Arkansas', state='05', total_population=2978204), State(name='California', state='06', total_population=39144818), State(name='Colorado', state='08', total_population=5456574), State(name='Connecticut', state='09', total_population=3590886), State(name='Delaware', state='10', total_population=945934), State(name='District of Columbia', state='11', total_population=672228), State(name='Florida', state='12', total_population=20271272)], query='select * from states limit 10')

In [None]:
# `model_dump` options pass through: keep two fields and use their aliases as column names.
result.to_dataframe(
    include={'name','total_population'},
    by_alias=True,
    
)

Unnamed: 0,Name,Total Population
0,Alabama,4858979
1,Alaska,738432
2,Arizona,6828065
3,Arkansas,2978204
4,California,39144818
5,Colorado,5456574
6,Connecticut,3590886
7,Delaware,945934
8,District of Columbia,672228
9,Florida,20271272


In [None]:
# Without a type parameter the rows keep every column of the table.
Query(query='select * from states limit 5')(db).data[0]

{'year': 2015, 'name': 'Alabama', 'geo_id': '04000US01', 'total_population': 4858979, 'white': 3204076, 'black': 1296681, 'hispanic': 192870, 'asian': 58918, 'american_indian': 19069, 'pacific_islander': 2566, 'other_race': 5590, 'median_age': 38.7, 'total_households': 1846390, 'owner_occupied_homes_median_value': 134100, 'per_capita_income': 44765, 'median_household_income': 44765, 'below_poverty_line': 876016, 'foreign_born_population': 169972, 'state': '01'}

In [None]:
from typing import ClassVar
import datetime as dt
from IPython.display import HTML,JSON
import json
import logging

In [None]:
class QueryStates(Query[State]):
    """Query States"""

    query: str = "select * from states"
    # Timestamp taken when the instance is created.
    executed_at: dt.datetime = Field(default_factory=dt.datetime.now)

    def _repr_html_(self):
        """Render the model for notebooks.

        Output is one header line each for the schema title and description
        (when present), one per field other than `data`, then the HTML table
        of `to_dataframe()`. Interpolated values are HTML-escaped so that SQL
        containing `<`, `>` or `&` is displayed literally.
        """
        from html import escape

        schema = self.model_json_schema()
        html_fields = [
            f"<header><b>{schema_field}</b>: {escape(str(schema[schema_field]))}\n</header>"
            for schema_field in ['title','description']
            # 'description' is absent when a subclass has no docstring
            if schema_field in schema
        ]
        # Read the field map from the class; instance access is deprecated in
        # recent pydantic releases.
        for field in type(self).model_fields:
            if field!='data':
                html_fields.append(
                    f'<header><b>{field}</b>: {escape(str(getattr(self,field)))}</header>'
                )
        df_html = self.to_dataframe()._repr_html_()
        return ''.join(html_fields + ['<header><b>DataFrame</b>: </header>',df_html])



In [None]:
# Display the JSON schema of `QueryStates` as an interactive JSON widget.
JSON(QueryStates.model_json_schema())

<IPython.core.display.JSON object>

In [None]:
# Instantiate with the default query, run it against `db`, and display the result.
QueryStates()(db)

Unnamed: 0,name,state,total_population
0,Alabama,1,4858979
1,Alaska,2,738432
2,Arizona,4,6828065
3,Arkansas,5,2978204
4,California,6,39144818
5,Colorado,8,5456574
6,Connecticut,9,3590886
7,Delaware,10,945934
8,District of Columbia,11,672228
9,Florida,12,20271272


#### To Do: Allow for Query Descriptors

In [None]:
class ACSDatabase(Database):
    """`Database` subclass that pins the connection values for the downloaded ACS SQLite file."""
    # Unannotated class attributes; `Database.model_config` lists `str` in
    # `ignored_types` so these can be written without type annotations.
    # NOTE(review): confirm that these values are actually picked up as field
    # defaults when `ACSDatabase()` is called with no arguments.
    drivername='sqlite'
    database = 'acs.db'
    
    #get_states = Query[State](query='select * from states')    


# acs = ACSDatabase()
# acs.get_states()

## ORM Example

### SQL Alchemy Models

In [None]:
from sqlalchemy import ForeignKey
from sqlalchemy.orm import DeclarativeBase,Mapped, mapped_column,relationship



In [None]:

class Base(DeclarativeBase):
    """Declarative base carrying the columns shared by every ACS table mapped in this notebook."""
    year:  Mapped[int]
    name: Mapped[str]
    geo_id: Mapped[str]
    total_population: Mapped[int]
    white: Mapped[int]
    black: Mapped[int]
    hispanic: Mapped[int]
    asian: Mapped[int]
    american_indian: Mapped[int]
    pacific_islander: Mapped[int]
    other_race: Mapped[int]
    # Median age is fractional in the data (e.g. 38.7), so it is mapped as a float.
    median_age: Mapped[float]
    total_households: Mapped[int]
    owner_occupied_homes_median_value: Mapped[int]
    per_capita_income: Mapped[int]
    median_household_income: Mapped[int]
    below_poverty_line: Mapped[int]
    foreign_born_population: Mapped[int]

# NOTE: this ORM class reuses the name `State`, replacing the pydantic `State`
# model defined earlier in the notebook from this point on.
class State(Base):
    """ORM mapping of the `states` table, keyed by the `state` column."""
    __tablename__ = 'states'
    state: Mapped[str] = mapped_column(primary_key=True)
    total_population: Mapped[int]

    # Collections paired with the `state_` relationship on `Place` / `CongressionalDistrict`.
    places: Mapped[List['Place']] = relationship(back_populates='state_')
    congressional_districts: Mapped[List['CongressionalDistrict']] = relationship(back_populates='state_')

class Place(Base):
    """ORM mapping of the `places` table, keyed by the `place` column."""
    __tablename__ = 'places'
    place: Mapped[str] = mapped_column(primary_key=True)
    total_population: Mapped[int]
    # Foreign key to the owning state's `state` column.
    state: Mapped[str] = mapped_column(ForeignKey("states.state"))
    
    # Owning `State`; named `state_` because `state` is already the key column.
    state_: Mapped['State'] = relationship(back_populates='places')

class CongressionalDistrict(Base):
    """ORM mapping of the `congressional_districts` table, keyed by the `congressional_district` column."""
    __tablename__ = 'congressional_districts'
    
    congressional_district: Mapped[str] = mapped_column(primary_key=True)
    # Foreign key to the owning state's `state` column.
    state: Mapped[str] = mapped_column(ForeignKey("states.state"))
    
    # Owning `State`; named `state_` because `state` is already the key column.
    state_: Mapped['State'] = relationship(back_populates='congressional_districts')

In [None]:
# Load the first state through the ORM and walk its `places` relationship.
with db.session_scope() as session:
    s = session.query(State).first()
    print(s,'\n')
    for place in s.places:
        print(place.name,'::',place.median_household_income)

<__main__.State object> 

Birmingham city, Alabama :: 32378
Dothan city, Alabama :: 44208
Hoover city, Alabama :: 77365
Huntsville city, Alabama :: 46769
Mobile city, Alabama :: 38678
Montgomery city, Alabama :: 41836
Tuscaloosa city, Alabama :: 44125


In [None]:
# For the first seven states, print the number of related places and congressional districts.
with db.session_scope() as session:
    result = session.query(State).limit(7).all()
    for state in result:
        print(
            state.name,
            len(state.places),
            len(state.congressional_districts)
        )

Alabama 7 7
Alaska 1 1
Arizona 16 9
Arkansas 6 4
California 137 53
Colorado 16 7
Connecticut 8 5


In [None]:
# Column names of the `states` table, used as the field list for the pydantic models below.
db.query_to_df("""select * from states""").head().columns

Index(['year', 'name', 'geo_id', 'total_population', 'white', 'black',
       'hispanic', 'asian', 'american_indian', 'pacific_islander',
       'other_race', 'median_age', 'total_households',
       'owner_occupied_homes_median_value', 'per_capita_income',
       'median_household_income', 'below_poverty_line',
       'foreign_born_population', 'state'],
      dtype='object')

## Pydantic Models

In [None]:
from pydantic import computed_field,field_validator

In [None]:
class ACSBase(BaseModel):
    """Fields shared by the pydantic models of the ACS tables."""
    # `from_attributes` lets instances be validated from objects by attribute
    # access, which is how the ORM rows are converted further down.
    model_config = ConfigDict(from_attributes=True)
    
    # Fields declared with a bare `Field()` are required; those declared with
    # `Field(None)` are optional and default to None.
    year: int = Field()
    name: str = Field()
    geo_id: str = Field()
    total_population: Optional[int] = Field(None)
    white: Optional[int] = Field(None)
    black: Optional[int] = Field(None)
    hispanic: Optional[int] = Field(None)
    asian: Optional[int] = Field(None)
    american_indian: Optional[int] = Field(None)
    pacific_islander: Optional[int] = Field(None)
    other_race: Optional[int] = Field(None)
    median_age: float = Field()
    total_households: Optional[int] = Field(None)
    owner_occupied_homes_median_value: int = Field()
    per_capita_income: int = Field()
    median_household_income: int = Field()
    below_poverty_line: Optional[int] = Field(None)
    foreign_born_population: Optional[int] = Field(None)    
    # NOTE(review): typed as int here although the raw records show the value
    # as the string '01'; validated models therefore display `state=1`.
    state: int = Field()

class PlaceModel(ACSBase):
    """A Model for a record from the 'places' table"""
    # Value of the table's `place` column (its primary key in the ORM mapping).
    place: str

class CDModel(ACSBase):
    """A Model for a record from the 'congressional_districts' table"""
    # Value of the table's `congressional_district` column (its primary key in the ORM mapping).
    congressional_district: str 

class StateModel(ACSBase):
    """A Model for a record from the 'states' table"""

    # Related rows, nested as their own models.
    places: List[PlaceModel]
    congressional_districts: List[CDModel]

    @computed_field(return_type=float,title='People per District',)
    def avg_people_per_cd(self) -> float:
        """Average `total_population` over this state's congressional districts.

        Districts whose `total_population` is None are left out of the
        average. Returns 0.0 when there is no district with a known
        population, so serialization never fails on an empty list.
        """
        populations = [
            cd.total_population
            for cd in self.congressional_districts
            if cd.total_population is not None
        ]
        if not populations:
            return 0.0
        return sum(populations) / len(populations)


In [None]:
# mode = serialization includes computed fields
# Display the serialization-mode JSON schema of `StateModel`.
JSON(StateModel.model_json_schema(mode='serialization'))

<IPython.core.display.JSON object>

In [None]:
# Concrete container type: a `DataModel` whose `data` items are `StateModel`s.
ACSDataModel = DataModel[StateModel]
JSON(ACSDataModel.model_json_schema(mode='serialization'))

<IPython.core.display.JSON object>

In [None]:
# Convert every ORM `State` row (with its related places and districts) into pydantic models.
# The conversion runs inside the session block — presumably so the relationship
# attributes can still be loaded while the session is open; confirm.
with db.session_scope() as session:
    orm_result = session.query(State).all()
    result = ACSDataModel(data=orm_result)

result

DataModel[StateModel](data=[StateModel(year=2015, name='Alabama', geo_id='04000US01', total_population=4858979, white=3204076, black=1296681, hispanic=192870, asian=58918, american_indian=19069, pacific_islander=2566, other_race=5590, median_age=38.7, total_households=1846390, owner_occupied_homes_median_value=134100, per_capita_income=44765, median_household_income=44765, below_poverty_line=876016, foreign_born_population=169972, state=1, places=[PlaceModel(year=2015, name='Birmingham city, Alabama', geo_id='16000US0107000', total_population=214911, white=None, black=None, hispanic=8940, asian=None, american_indian=None, pacific_islander=None, other_race=None, median_age=35.6, total_households=93467, owner_occupied_homes_median_value=93000, per_capita_income=32378, median_household_income=32378, below_poverty_line=60868, foreign_born_population=8258, state=1, place='07000'), PlaceModel(year=2015, name='Dothan city, Alabama', geo_id='16000US0121184', total_population=67536, white=None,

In [None]:
#| hide 
# Run nbdev's export step for this project.
import nbdev; nbdev.nbdev_export()