# Best Practices

In [None]:
import pandas as pd

In [None]:
# Load the activity log; 'start_date_local' is parsed as datetimes so that the
# .dt accessor can be used on it in later cells.
activity_df = pd.read_json('data/activities.json', convert_dates=['start_date_local'])

## Some Simple and Useful DataFrame Methods

In [None]:
# Summary statistics (count, mean, std, min/max, quartiles) of the numeric columns.
activity_df.describe()

In [None]:
# First rows of the frame (head() defaults to five).
activity_df.head()

In [None]:
# Last three rows of the frame.
activity_df.tail(3)

In [None]:
# Ten randomly chosen rows; no random_state is given, so the selection
# changes from run to run.
activity_df.sample(10)

In [None]:
# The ten most recent activities: sort newest-first, then keep the top rows.
# Wrapping the chain in parentheses allows one method call per line.
(
    activity_df
    .sort_values('start_date_local', ascending=False)
    .head(10)
)

In [None]:
# Selecting a single column by name yields a Series.
activity_df['type']

In [None]:
# Distinct values found in the 'type' column.
activity_df['type'].unique()

In [None]:
# Number of rows for each distinct 'type' value.
activity_df['type'].value_counts()

In [None]:
# Number of rows in the frame.
len(activity_df)

Q: When the expression above is evaluated, is any `DataFrame` method called?

## Separation of Concerns

In [None]:
def process_activities():
    """Read the activity log, keep the runs in date order, and write them to CSV.

    The input path, the filtering/sorting rules and the output path are all
    hard-coded in this one function, so it cannot be exercised without
    touching the file system.
    """
    all_activities_df = pd.read_json(
        'data/activities.json',
        convert_dates=['start_date_local'],
    )
    # Keep only the rows whose activity type is 'Run'.
    is_run = all_activities_df['type'] == 'Run'
    runs_df = all_activities_df.loc[is_run]
    # Oldest run first, then persist the result.
    runs_df.sort_values('start_date_local').to_csv('data/processed_activities.csv')

In [None]:
# Run the hard-wired pipeline: reads data/activities.json and writes
# data/processed_activities.csv.
process_activities()

**Discussion**:

- What if there are many of these processes, and the data source changes from a .json file to a database?
- How can we test the business logic without access to the file system or a database?
- How can this be improved?

In [None]:
# !rm data/processed_activities.csv

In [None]:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable

In [None]:
class ActivitySource(ABC):
    """Interface for anything that can supply activity data to a pipeline."""

    @abstractmethod
    def load(self):
        """Load and return the activity data; concrete sources must override this."""

@dataclass
class JsonLocalActivitySource(ActivitySource):
    """Activity source backed by a JSON file on the local file system."""

    filename: str = 'data/activities.json'  # will usually come from a config file

    def load(self):
        """Read ``filename`` with pandas, parsing 'start_date_local' as dates."""
        date_columns = ['start_date_local']
        return pd.read_json(self.filename, convert_dates=date_columns)

@dataclass
class PostgressActivitySource(ActivitySource):
    """Activity source that reads from a database connection.

    The query call is illustrative pseudo-code: ``db_connection`` is typed as
    ``Any`` and only needs to expose a ``query(sql)`` method.
    """

    db_connection: Any  # will usually come from a config file

    def load(self):
        """Return the result of selecting every row of the ``activities`` table."""
        # pseudo-code
        # The connection is an instance field, so it must be reached through
        # ``self``; a bare ``db_connection`` is undefined in this scope and
        # would raise NameError as soon as load() is called.
        return self.db_connection.query('SELECT * FROM activities')

class ActivitySink(ABC):
    """Interface for anything that can persist processed activity data."""

    @abstractmethod
    def save(self, activity_df: pd.DataFrame):
        """Persist ``activity_df``; concrete sinks must override this."""

@dataclass
class CsvLocalActivitySink(ActivitySink):
    """Activity sink that writes a CSV file on the local file system."""

    filename: str = 'data/processed_activities.csv'  # will usually come from a config file

    def save(self, activity_df: pd.DataFrame):
        """Write ``activity_df`` to ``filename`` in CSV format."""
        destination = self.filename
        activity_df.to_csv(destination)

In [None]:
def select_runs_and_sort_by_date(activity_df: pd.DataFrame) -> pd.DataFrame:
    """Return only the 'Run' activities, ordered by ascending 'start_date_local'.

    Pure transformation: the input frame is not modified and no I/O happens,
    so it can be tested with a small in-memory DataFrame.
    """
    is_run = activity_df['type'] == 'Run'
    runs_df = activity_df.loc[is_run]
    return runs_df.sort_values('start_date_local')

def process_activities(
    source: ActivitySource,
    processing_fn: Callable[[pd.DataFrame], pd.DataFrame],
    sink: ActivitySink
) -> None:
    """Load activities from ``source``, transform them, and hand them to ``sink``.

    Args:
        source: Provides the input frame through ``load()``.
        processing_fn: Business logic mapping a DataFrame to a DataFrame.
        sink: Receives the transformed frame through ``save()``.
    """
    raw_df = source.load()
    processed_df = processing_fn(raw_df)
    sink.save(processed_df)

In [None]:
# Wire the pipeline together: default local JSON source, the run-selection
# logic, and the default local CSV sink.
process_activities(JsonLocalActivitySource(), select_runs_and_sort_by_date, CsvLocalActivitySink())

TODO: architecture picture

Hint: for large/complex workflows, consider tools such as [Apache Airflow](https://airflow.apache.org/)

## Decluttering

In [None]:
# Derive the calendar year through the .dt accessor. Note that this adds a
# 'year' column to activity_df itself rather than to a copy.
activity_df['year'] = activity_df['start_date_local'].dt.year
activity_df.head()

https://pandas.pydata.org/pandas-docs/stable/user_guide/basics.html#dt-accessor
https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#time-date-components

In [None]:
# Keep only the 2018 rows, then group them by activity type.
activities_2018_df = activity_df.loc[activity_df['year'] == 2018]
activities_2018_by_type = activities_2018_df.groupby('type')

In [None]:
# Average 'velocity_mean' within each activity type; the bare name on the
# last line makes the notebook display the result.
mean_velocity_by_type = activities_2018_by_type['velocity_mean'].mean()
mean_velocity_by_type

In [None]:
# Reload the data so the frame no longer carries the helper 'year' column,
# then express the same analysis as one chain: no helper column and no
# intermediate variables left behind in the notebook namespace.
activity_df = pd.read_json('data/activities.json', convert_dates=['start_date_local'])
(
    activity_df
    .loc[lambda df: df['start_date_local'].dt.year == 2018]
    .groupby('type')['velocity_mean'].mean()
)

Introducing ... `GroupBy.transform()`, TODO link to docs of agg/transform

In [None]:
# Unlike .mean(), transform('mean') keeps one value per input row: every row
# receives the mean of the group ('type') it belongs to.
(
    activity_df
    .loc[lambda df: df['start_date_local'].dt.year == 2018]
    .groupby('type')['velocity_mean'].transform('mean')
)

In [None]:
# For the 2018 activities, add 'v_mean_diff': each row's 'velocity_mean'
# minus the mean of its own activity type, then show ten random rows.
(
    activity_df
    .loc[lambda df: df['start_date_local'].dt.year == 2018]
    .assign(v_mean_diff=lambda df: (
        df.groupby('type')['velocity_mean']
        .transform(lambda type_v: type_v - type_v.mean())))
    .sample(10)
)

## Readable, Testable Pipelines

In [None]:
def is_non_warmup(activity_df):
    """Return a boolean mask marking the rows that do not look like a warm-up.

    A row is flagged when its 'elapsed_time' exceeds 600 or its
    'heartrate_mean' exceeds 150 (either condition is sufficient).
    """
    long_enough = activity_df['elapsed_time'] > 600
    high_heartrate = activity_df['heartrate_mean'] > 150
    return long_enough | high_heartrate

def select_non_warmup_runs(activity_df):
    """Return the rows that pass ``is_non_warmup`` and whose 'type' is 'Run'."""
    # Passing the function itself lets .loc call it on the frame to get the mask.
    non_warmup_df = activity_df.loc[is_non_warmup]
    is_run = non_warmup_df['type'] == 'Run'
    return non_warmup_df.loc[is_run]

def to_z_score(series):
    """Standardize ``series``: subtract its mean and divide by its standard deviation."""
    # What if series has a length of 1?
    centered = series - series.mean()
    return centered / series.std()

def add_z_score(activity_df, column):
    """Return a copy of ``activity_df`` with an extra ``'<column>_z'`` column.

    The new column holds ``to_z_score`` applied to ``activity_df[column]``.
    """
    z_column = f'{column}_z'
    z_values = to_z_score(activity_df[column])
    return activity_df.assign(**{z_column: z_values})

def best_n_years(activity_df, metrics=None, n_years=1):
    """Return the ``n_years`` calendar years with the highest mean metrics.

    Args:
        activity_df: Frame with a datetime 'start_date_local' column and one
            column per requested metric.
        metrics: Column name or sequence of column names to average per year.
            Rows are ranked by the first metric, with later ones breaking
            ties. Defaults to ``['velocity_mean']``.
        n_years: Number of top-ranked years to return.

    Returns:
        A DataFrame indexed by year with one column per metric, best year first.
    """
    # A list literal as the default would be a single object shared by all
    # calls; use None as the sentinel and build a fresh list each time.
    if metrics is None:
        metrics = ['velocity_mean']
    elif isinstance(metrics, str):
        # A bare column name would make the groupby selection a Series and
        # break sort_values(by=...); normalize it to a one-element list.
        metrics = [metrics]
    else:
        metrics = list(metrics)
    return (
        activity_df
        .assign(year=lambda df: df['start_date_local'].dt.year)
        .groupby('year')[metrics].mean()
        .sort_values(metrics, ascending=False)
        .head(n_years)
    )

In [None]:
# Each .pipe() step receives the frame produced by the previous one, so every
# stage is a plain named function. The z-score column must be added before
# best_n_years, which is asked to average 'velocity_mean_z'.
(
    activity_df
    .pipe(select_non_warmup_runs)
    .pipe(add_z_score, column='velocity_mean')
    .pipe(best_n_years, metrics=['velocity_mean_z', 'velocity_mean'], n_years=5)
)

Q: Is this a reliable analysis? What can be improved?

## [Don't Repeat Yourself](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself)

In [None]:
import logging
import time
from functools import wraps

In [None]:
# basicConfig() does nothing when the root logger already has handlers
# configured, which is why a restart may be needed for it to take effect.
logging.basicConfig(level=logging.DEBUG)  # You may need to restart the notebook to make this work
# Module-level logger used by the pipeline logging decorator.
logger = logging.getLogger(__name__)

In [None]:
def df_info(df):
    """Return a one-line summary of ``df`` for log messages.

    The summary lists the row count and, for every column, how many values
    are missing. ``None`` is rendered as ``'<None>'``.
    """
    if df is None:
        return '<None>'
    per_column = (
        f'{col} ({df[col].isnull().sum()} missing)'
        for col in df
    )
    column_info = ','.join(per_column)
    return f'rows: {len(df)}, columns: {column_info}'

def pandas_pipe_logging(func):
    """Decorate a pipeline step so that every call is logged at DEBUG level.

    Three messages are emitted per call: a summary of the input (the first
    positional argument), a summary of the returned value, and the time the
    call took. The wrapped function's return value is passed through
    unchanged.

    Args:
        func: The pipeline function to wrap; it is expected to take the
            input frame as its first positional argument.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.
    """
    @wraps(func)
    def logging_wrapper(*args, **kwargs):
        # df_info() scans every column for missing values, so only build the
        # summaries when a DEBUG record would actually be emitted.
        debug_enabled = logger.isEnabledFor(logging.DEBUG)
        if debug_enabled:
            # The frame may have been passed by keyword; fall back to None
            # (rendered as '<None>') instead of raising IndexError.
            in_df = args[0] if args else None
            logger.debug(
                'Calling pipeline function %s with input %s',
                func.__name__, df_info(in_df),
            )
        # perf_counter() is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        start = time.perf_counter()
        out = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        if debug_enabled:
            logger.debug('Returning dataframe with %s', df_info(out))
            logger.debug('Took %.4fs', elapsed)
        return out

    return logging_wrapper