In [None]:
import logging
from pathlib import Path
import sys
from typing import List

from sqlalchemy import create_engine, exc, select, Table, MetaData, and_
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.selectable import Select

import numpy as np
import pandas as pd

def add_path(p: Path) -> None:
    path_str = str(p.expanduser())
    if path_str not in sys.path:
        sys.path.insert(0, path_str)
        
add_path(Path('~/Documents/codes/python/gis'))
import pyogeo as pg

In [None]:
SiteList = List[int]

In [None]:
epd_engine = create_engine('postgresql://andrew:password@localhost:5432/epd95')
not_epd_engine = create_engine('postgresql://andrew:password@localhost:5432/gis')

In [None]:
def _check_EPD_con(con: Engine) -> None:
    """Check EPD connection object works.

    Use simple example query which should be achievable with any instance of
    the EPD.
    """
    try:
        pd.read_sql_query('select * from entity limit10;', con=con)
    except exc.SQLAlchemyError as e:
        logging.error('Could not connect to the EPD with the supplied engine.')
        raise exc.SQLAlchemyError(e)
    logging.info('Test query successfully run against EPD.Sophia Di Martino')

In [None]:
_check_EPD_con(epd_engine)

In [None]:
_check_EPD_con(not_epd_engine)

In [None]:
site_dict = {
    # original selection presented in checkpoint report
    'Sanabria Marsh' : 44,
    'Albufera Alcudia' : 759,
    'Laguna Guallar' : 761,
    'San Rafael' : 486,
    'Navarrés' : 396,
    'Monte Areo mire' : 1252,
    # additional sites Carrion2010 called outstanding
    # examples of sites with anthropogenic disturbance             
    'Atxuri' : 76, # neolithic
    'Puerto de Los Tornos' : 560, # neolithic
    'Charco da Candieira' : 762, # neolithic
    'Bajondillo' : 1260, # neolithic
    'Algendar' : 55 # Bronze age, Minorca
}

Have a policy of leaving column names as they are in the EPD to assist in understanding subsequent analysis. See documentation on the [EPD website](http://europeanpollendatabase.net/data/downloads/) for more information about the dataset.

In [None]:
def site_loc_info(con: Engine, sites: SiteList) -> pd.DataFrame:
    """Return DataFrame providing location information for study sites.
    
    Parameters
    ----------
    con:
        SQLAlchemy database connection engine pointing at an instance of the
        EPD.
    sites:
        List of integer site numbers corresponding to the field 'site_' in the
        siteloc and entity tables in the EPD.
        
    Returns
    -------
    DataFrame with the following row index and fields:
        - sitename (index): Name of site where core was collected
        - latdd: Decimal latitude
        - londd: Decimal longitude
        - elevation: Elevation of site above sea level
        - site_: Site number, primary key of siteloc table
    """
    siteloc = Table('siteloc', MetaData(), autoload_with=con)
    sites_select = (
        select([siteloc.c.sitename, siteloc.c.latdd, siteloc.c.londd,
                siteloc.c.elevation, siteloc.c.site_])
        .where(siteloc.c.site_.in_(sites))
    )
    return pd.read_sql_query(sites_select, con=con).set_index('sitename')

In [None]:
site_loc_info(epd_engine, site_dict.values())

In [None]:
def site_pollen_abundance(con: Engine, sites: SiteList) -> pd.DataFrame:
    """Return DataFrame of pollen abundance time series for study sites.
    
    Parameters
    ----------
    con:
        SQLAlchemy database connection engine pointing at an instance of the
        EPD.
    sites:
        List of integer site numbers corresponding to the field 'site_' in the
        siteloc and entity tables in the EPD.
        
    Returns
    -------
    DataFrame with the following row (multi)index and fields:
        - sitename (index): Name of site where core was collected
        - sigle (index): Code uniquely identifying a sediment core
        - sample_ (index): Identifies sample extracted from sediment core
        - varcode (index): Species identifier
        - agebp: Age BP of sample according to default chronology
        - count: Number of `varcode` pollen grains found in sample
        - varname: Name of species corresponding to `varcode`
        - site_: Site number, primary key of siteloc table
        - e_: Entity (sediment core) numner, primary key of entities table
        - chron_: Chronology numner, primary key of chrons table
        - var_: Variable number, primary key of p_vars table    
    """
    metadata = MetaData()
    # Create Python objects representing database tables
    pol_tables = [
        Table(t_name, metadata, autoload_with=con)
        for t_name in ['entity', 'p_counts', 'p_vars',
                       'p_agedpt', 'chron', 'siteloc']
    ]
    
    df = (
        pd.read_sql_query(_pollen_abundance_select(*pol_tables, sites),
                          con=con)
        .set_index(['sitename', 'sigle', 'sample_', 'varcode'])
        .fillna(np.nan)
    )
    
    assert (((df.index.value_counts() > 1).sum() == 0)
            and (df.index.is_unique)), (
        'site/ entity (sediment core)/ sample (date)/ species combinations '
        'should be unique.'
    )
    
    return df

In [None]:
def _pollen_abundance_select(entity: Table, p_counts: Table, p_vars: Table,
                             p_agedpt: Table, chron: Table,
                             siteloc: Table, sites: SiteList) -> Select:
    """Construct SQL query needed to obtain pollen abundance data.
    
    Only data for the site codes given in `sites` will be selected.
    
    Note that we extract only the default chronology for each sample.
    """
    return (
        select([p_agedpt.c.agebp, p_counts.c.count, p_vars.c.varcode,
                p_vars.c.varname, siteloc.c.sitename, entity.c.site_,
                entity.c.e_, entity.c.sigle, p_counts.c.sample_,
                chron.c.chron_, p_counts.c.var_])
        .select_from(
            p_counts
            .join(entity, p_counts.c.e_ == entity.c.e_)
            .join(p_agedpt, and_(p_counts.c.e_ == p_agedpt.c.e_,
                                 p_counts.c.sample_ == p_agedpt.c.sample_))
            .join(chron, and_(p_agedpt.c.e_ == chron.c.e_,
                              p_agedpt.c.chron_ == chron.c.chron_))
            .join(p_vars, p_counts.c.var_ == p_vars.c.var_)
            .join(siteloc, entity.c.site_ == siteloc.c.site_)
        )
        .where(and_(entity.c.site_.in_(sites),
                    chron.c.defaultchron == 'Y'))
    )

In [None]:
site_pollen_abundance(epd_engine, site_dict.values())