In [387]:
import os.path
import pandas as pd
import numpy as np
import timeit

In [388]:
import sys
# sys.path.append('/Users/sjohal/OneDrive/PycharmProjects/cafeday/bjss')

from data import input_data_dir, output_data_dir

# Helper functions

In [389]:
# timer decorator
import time


def _timeit(func):  # decorator function
    def wrapper(*args, **kwargs):  # wrapper function
        start = time.time()
        result = func(*args, **kwargs)  # call the function
        end = time.time()
        duration = round(end - start, 2)
        print(f'{duration} seconds')
        return result  # return the result of the function

    return wrapper


# test timer decorator
@_timeit
def test_timer():
    time.sleep(2)


# test_timer()

# CONSTANTS
DEBUG = True  # upper case to indicate a constant
SYSTEM = 'fitbit'
START_TIME = pd.to_datetime('now', utc=True)


def debug_print(*args):
    if DEBUG:
        print(*args)

# Logger

In [390]:
import logging
from logs import log_dir


class CustomFormatter(logging.Formatter):
    def format(self, record):
        if record.msg.startswith('IMPORTANT'):
            return f"********** {super().format(record)} **********"
        else:
            return super().format(record)


logging.basicConfig(filename=os.path.join(log_dir, 'etl.log'), level=logging.INFO)
# Add a stream handler to the root logger to output to console
console_handler = logging.StreamHandler()
console_handler.setFormatter(CustomFormatter())
console_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(console_handler)

logger = logging.getLogger()

logger.info('IMPORTANT: First Line')

********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********
********** IMPORTANT: First Line **********


# EXTRACT

## EDA - Exploratory Data Analysis

In [391]:
# read from /data
# on_bad_lines example is when e.g. not expected shape (can instead use a callable to handle)
# https://stackoverflow.com/questions/33440805/pandas-dataframe-read-csv-on-bad-data
# no need to have with open as pd.read_csv is a context manager
input_file = os.path.join(input_data_dir, 'activity_data.csv')
input_file_name = os.path.basename(input_file)
input_file_name_without_ext = os.path.splitext(input_file_name)[0]

df = pd.read_csv(input_file, encoding_errors='strict', on_bad_lines='warn')


## Select statements

In [392]:
df[[col for col in df.columns if 'minutes' in col.lower()]]  # select columns based on name

# return a subset of the dataframe's columns 
df.select_dtypes(include=np.number)
df.select_dtypes('int')

df[df.columns[:3]]  # first 3 columns

Unnamed: 0,Id,Date,TotalSteps
0,6117666160,2016-04-20,19542.0
1,5577150313,2016-04-177,12231.0
2,8877689391,2016-04-16,29326.0
3,5577150313,2016-05-01,13368.0
4,5577150313,2016-04-30,12363.0
...,...,...,...
939,8583815059,2016-05-12,0.0
940,1503960366,2016-04-27,
941,8877689391,2016-05-12,8064.0
942,8877689391,2016-04-29,9733.0


## (i)loc filter expressions / slicing

In [393]:
# NB iloc uses index whereas loc uses column names and is ∴ more popular

df1 = pd.DataFrame([[1, 2], [4, 5], [7, 8]],
                   index=['cobra', 'viper', 'sidewinder'],
                   columns=['max_speed', 'shield'])
df1

Unnamed: 0,max_speed,shield
cobra,1,2
viper,4,5
sidewinder,7,8


In [394]:
# series of boolean values which has to be the same length as the index
s = pd.Series([False, True, False],
              index=['viper', 'sidewinder', 'cobra']  # index has to match the index of the dataframe
              )

df1.loc[s]  # returns the rows where s is True

Unnamed: 0,max_speed,shield
sidewinder,7,8


In [395]:
df.iloc[:1, :1]  # first row and first column
df.iloc[1:3, 0:1]  # rows 1 and 2 and column 0
df.iloc[:2]  # first 2 rows


Unnamed: 0,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no
0,6117666160,2016-04-20,19542.0,15.01,579.0,324,4900,1
1,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0


## Filter

In [396]:
df.loc[df['TotalActiveMinutes'] > 100]  # filter where TotalActiveMinutes > 100
df.loc[df['TotalActiveMinutes'] > 100, ['Date', 'TotalActiveMinutes']]  # filter and select columns
df.loc[:2, ['Id']]  # first 2 rows
# or use query
df.query('TotalActiveMinutes > 100')[['Date', 'TotalActiveMinutes']]
df.query('Calories > 1000 and TotalDistance > 10')[['Date', 'Calories', 'TotalDistance']]
df.query('TotalActiveMinutes == 0').shape[0]  # count where TotalActiveMinutes == 0
df.query('TotalSteps.isna()')

Unnamed: 0,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no
9,8378563200,2016-04-14,,10.56,699.0,318,4163,0
461,1503960366,2016-04-27,,12.21,1108.0,332,2159,0
940,1503960366,2016-04-27,,12.21,1108.0,332,2159,1


## Aggregation

In [397]:
df['TotalActiveMinutes'].describe()
df[['TotalActiveMinutes', 'TotalDistance']].agg(['mean', 'std', 'min', 'max'])
# quantiles
df['TotalActiveMinutes'].quantile([0.25, 0.5, 0.75])  # 25th, 50th and 75th percentile

0.25    147.0
0.50    247.0
0.75    318.0
Name: TotalActiveMinutes, dtype: float64

## Rank, shift, cumsum, rolling, clip

In [398]:
# Rank the TotalActiveMinutes
# there are 83 tied values of 0, so default rank method is take the average of the 1st and last tie 
# ie 1 + 83 / 2 = 42

# chain methods
df.query('TotalActiveMinutes == 0'). \
    groupby('TotalActiveMinutes') \
    [['Id']]. \
    count()
# or use value_counts which is a shortcut ⭐️
df['TotalActiveMinutes'].value_counts()  # can be extended to include other columns

df['DenseRank'] = df[['TotalActiveMinutes']].rank(method='dense')  # ensures no gaps between ranks
df['DefaultRank'] = df[['TotalActiveMinutes']].rank()
df['FirstRank'] = df[['TotalActiveMinutes']].rank(method='first')  # rank in order of appearance
df[['TotalActiveMinutes', 'DefaultRank', 'DenseRank', 'FirstRank']]. \
    sort_values(by='TotalActiveMinutes') \
    .query('TotalActiveMinutes == 0')
# .reset_index(drop=True)


Unnamed: 0,TotalActiveMinutes,DefaultRank,DenseRank,FirstRank
518,0,42.0,1.0,8.0
357,0,42.0,1.0,6.0
523,0,42.0,1.0,12.0
522,0,42.0,1.0,11.0
772,0,42.0,1.0,53.0
...,...,...,...,...
599,0,42.0,1.0,37.0
693,0,42.0,1.0,42.0
911,0,42.0,1.0,76.0
587,0,42.0,1.0,25.0


In [399]:
# shift -  shift the column down by 1

df['TotalActiveMinutesShift'] = df['TotalActiveMinutes'].shift(1)  # shift the column down by 1
df['TotalActiveMinutesDiff'] = df['TotalActiveMinutes'] - df[
    'TotalActiveMinutesShift']  # calculate the difference
df[['TotalActiveMinutes', 'TotalActiveMinutesShift', 'TotalActiveMinutesDiff']].head()

Unnamed: 0,TotalActiveMinutes,TotalActiveMinutesShift,TotalActiveMinutesDiff
0,324,,
1,396,324.0,72.0
2,552,396.0,156.0
3,444,552.0,-108.0
4,415,444.0,-29.0


In [400]:
# cumsum - cumulative sum
df['TotalActiveMinutesCumSum'] = df['TotalActiveMinutes'].cumsum()
df[['TotalActiveMinutes', 'TotalActiveMinutesCumSum']].head()

Unnamed: 0,TotalActiveMinutes,TotalActiveMinutesCumSum
0,324,324
1,396,720
2,552,1272
3,444,1716
4,415,2131


In [401]:
# cummax - cumulative max (other related functions are cummin, cumprod)
df['TotalActiveMinutesCumMax'] = df['TotalActiveMinutes'].cummax()
df[['TotalActiveMinutes', 'TotalActiveMinutesCumMax']].value_counts()

TotalActiveMinutes  TotalActiveMinutesCumMax
0                   552                         83
258                 552                          9
335                 552                          8
242                 552                          7
268                 552                          7
                                                ..
76                  552                          1
72                  552                          1
71                  552                          1
70                  552                          1
552                 552                          1
Length: 367, dtype: int64

In [402]:
# rolling - moving average
df['TotalActiveMinutesRolling'] = df['TotalActiveMinutes'].rolling(window=2).mean()
df[['TotalActiveMinutes', 'TotalActiveMinutesRolling']].head()

Unnamed: 0,TotalActiveMinutes,TotalActiveMinutesRolling
0,324,
1,396,360.0
2,552,474.0
3,444,498.0
4,415,429.5


In [403]:
# clip - clip values to a threshold
# so any value below 0 is set to 0 and any value above 100 is set to 100
# useful for outliers
df['TotalActiveMinutesClip'] = df['TotalActiveMinutes'].clip(lower=0, upper=100)
df[['TotalActiveMinutes', 'TotalActiveMinutesClip']]

Unnamed: 0,TotalActiveMinutes,TotalActiveMinutesClip
0,324,100
1,396,100
2,552,100
3,444,100
4,415,100
...,...,...
939,0,0
940,332,100
941,161,100
942,233,100


## Group By and Aggregation

- group by a column and aggregate

In [404]:
# from docs for multi index
arrays = [['Falcon', 'Falcon', 'Parrot', 'Parrot'],
          ['Captive', 'Wild', 'Captive', 'Wild']]
index = pd.MultiIndex.from_arrays(arrays, names=('Animal', 'Type'))  # multi index
df_animal = pd.DataFrame({'Max Speed': [390., 350., 30., np.NAN]},
                  index=index)
df_animal

Unnamed: 0_level_0,Unnamed: 1_level_0,Max Speed
Animal,Type,Unnamed: 2_level_1
Falcon,Captive,390.0
Falcon,Wild,350.0
Parrot,Captive,30.0
Parrot,Wild,


In [405]:

# group by the first level of the multi index
df_animal.groupby(level=0).mean()
df_animal.groupby(level='Animal').mean()
df_animal.groupby(level='Type').mean()

# OR group by a column (NB. can use value_counts() to get the count)
df_agg = df_animal.groupby('Type').agg(['mean', 'std', 'min', 'max'])
# df_agg = df_animal.groupby('Type').agg(['mean', 'std', 'min', 'max']).dropna() # might need to drop NaNs
# df_agg.columns is a multi index so now join the column names in the multi index 
df_agg.columns = ['_'.join(col).replace(' ', '_') for col in df_agg.columns.values]
df_agg.sort_values(by='Max_Speed_mean', ascending=False).fillna(-999)

Unnamed: 0_level_0,Max_Speed_mean,Max_Speed_std,Max_Speed_min,Max_Speed_max
Type,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Wild,350.0,-999.0,350.0,350.0
Captive,210.0,254.558441,30.0,390.0


### Check for Nulls / Missing data

In [406]:
# df.info() # can deduce from the non-null count vs the row count if there are any nulls
df['TotalSteps'].isna().sum()  # count the number of NaNs in TotalSteps
df['TotalSteps'].value_counts(dropna=False)  # count the number of NaNs in TotalSteps

df[df['TotalSteps'].isna()]  # show the rows with NaN in TotalSteps
df.isna().sum()  # count the number of NaNs in each column

Id                           0
Date                         0
TotalSteps                   3
TotalDistance                0
SedentaryMinutes             1
TotalActiveMinutes           0
Calories                     0
yes_no                       0
DenseRank                    0
DefaultRank                  0
FirstRank                    0
TotalActiveMinutesShift      1
TotalActiveMinutesDiff       1
TotalActiveMinutesCumSum     0
TotalActiveMinutesCumMax     0
TotalActiveMinutesRolling    1
TotalActiveMinutesClip       0
dtype: int64

In [407]:
## FILLNA

# s = df.isnull().any(axis='columns')
# df[s]
# replace NaN with -1
# df.fillna(-1, inplace=True)
# replace with mean
mean = df['TotalSteps'].mean()
print(f"{mean=}")
df_nan_filled = df.fillna(mean).copy()
df_nan_filled.query('TotalSteps == @mean')  # check that the mean has been replaced

# show the rows with -1
# df[df.eq(-1).any(axis=1)]

mean=7628.280552603614


Unnamed: 0,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no,DenseRank,DefaultRank,FirstRank,TotalActiveMinutesShift,TotalActiveMinutesDiff,TotalActiveMinutesCumSum,TotalActiveMinutesCumMax,TotalActiveMinutesRolling,TotalActiveMinutesClip
9,8378563200,2016-04-14,7628.280553,10.56,699.0,318,4163,0,259.0,711.0,708.0,298.0,20.0,3912,552,308.0,100
461,1503960366,2016-04-27,7628.280553,12.21,1108.0,332,2159,0,272.0,758.5,759.0,370.0,-38.0,126802,552,351.0,100
940,1503960366,2016-04-27,7628.280553,12.21,1108.0,332,2159,1,272.0,758.5,760.0,0.0,332.0,214222,552,166.0,100


In [408]:
pd.to_datetime(df['Date'], errors='coerce')
# use query to filer where TotalSteps is NaN


0     2016-04-20
1            NaT
2     2016-04-16
3     2016-05-01
4     2016-04-30
         ...    
939   2016-05-12
940   2016-04-27
941   2016-05-12
942   2016-04-29
943          NaT
Name: Date, Length: 944, dtype: datetime64[ns]

### CHECKPOINT 1

- check the row count

In [409]:
# validate that the number of rows is as expected
_df_row_count = df.shape[0]

# count the number of rows in the source
with open(input_file) as f:
    _src_row_count = sum(1 for line in f) - 1  # -1 to exclude the header
    logger.info(f'Number of rows in source: {_src_row_count}')

assert _df_row_count == _src_row_count, 'Row count mismatch'  # assert that the row count is as expected

Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944
Number of rows in source: 944


In [410]:
df.head()  # first 5 rows
# df.shape # number of rows and columns
df.describe()  # summary statistics
# df.dtypes # data types for each column

logger.info(f'Number of rows in df: {_df_row_count}')


Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944
Number of rows in df: 944


In [411]:
# create a copy of df without rows ie same number of cols
df_dq_issues = df.head(0)
df_dq_column_name = '_dq_reason'
df_dq_issues.insert(0, df_dq_column_name, '')


In [412]:
# log each column and dtype using logger
logger.info('>>> Columns and data types')
for col in df.columns:
    logger.info(f'{col} - {df[col].dtype}')
logger.info('<<< Columns and data types')



>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
>>> Columns and data types
Id - int64
Id - int64
Id - int64
Id - int64
Id - int64
Id - int64
Id - int64
Id - int64
Id - int64
Id - int64
Id - int64
Date - object
Date - object
Date - object
Date - object
Date - object
Date - object
Date - object
Date - object
Date - object
Date - object
Date - object
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalSteps - float64
TotalDistance - float64
TotalDistance - float64
TotalDistance - float64
TotalDistance - float64
TotalDistance - float64
TotalDistance - float64
TotalDistance - float64
TotalDistance - float64
Total

## Get PK - Primary Key

In [413]:
# Log start of extract
logger.info(f'EXTRACT STARTED at {pd.to_datetime("now")}')

  logger.info(f'EXTRACT STARTED at {pd.to_datetime("now")}')
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738
EXTRACT STARTED at 2024-04-19 15:25:35.677738


In [414]:

# check for primary key candidate by iterating over each column
pk_candidate = [col for col in df.columns if df[col].nunique() == _df_row_count]  # ⭐️
pk_not_candidate = [col for col in df.columns if df[col].nunique() != _df_row_count]

# if no primary key candidate, create a surrogate key
if len(pk_candidate) == 0:
    print('Adding a surrogate key')
    df.insert(0, '_id', range(1, 1 + _df_row_count))

logger.info(f'PKs {pk_candidate}')
logger.info(f'NOT PKs {pk_not_candidate}')
df.head()

PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
PKs ['FirstRank']
NOT PKs ['Id', 'Date', 'TotalSteps', 'TotalDistance', 'SedentaryMinutes', 'TotalActiveMinutes', 'Calories', 'yes_no', 'DenseRank', 'DefaultRank', 'TotalActiveMinutesShift', 'TotalActiveMinutesDiff', 'TotalActiveMinutesCumSum', 'TotalActiveMinutesCumMax', 'TotalActiveMinutesRolling', 'TotalActiveMinutesClip']
NOT PKs ['Id', 'Date', 'TotalSteps', 'TotalDistance', 'SedentaryMinutes', 'TotalActiveMinutes', 'Calories', 'yes_no', 'DenseRank', 'DefaultRank', 'TotalActiveMinutesShift', 'TotalActiveMinutesDiff', 'TotalActiveMinutesCumSum', 'TotalActiveMinutesCumMax', 'TotalActiveMinutesRolling', 'TotalActiveMinutesClip']
NOT PKs ['Id', 'Date', 'TotalSteps', 'TotalDistance', 'SedentaryMinutes', 'TotalActiveMinutes', 'Calories', 'yes_no', 'DenseRank', 'DefaultRank', 'TotalActiveMinutesShift', 'TotalAct

Unnamed: 0,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no,DenseRank,DefaultRank,FirstRank,TotalActiveMinutesShift,TotalActiveMinutesDiff,TotalActiveMinutesCumSum,TotalActiveMinutesCumMax,TotalActiveMinutesRolling,TotalActiveMinutesClip
0,6117666160,2016-04-20,19542.0,15.01,579.0,324,4900,1,264.0,732.0,729.0,,,324,324,,100
1,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0,325.0,890.0,889.0,324.0,72.0,720,396,360.0,100
2,8877689391,2016-04-16,29326.0,25.290001,888.0,552,4547,1,365.0,944.0,944.0,396.0,156.0,1272,552,474.0,100
3,5577150313,2016-05-01,13368.0,9.99,499.0,444,4546,0,347.0,926.0,926.0,552.0,-108.0,1716,552,498.0,100
4,5577150313,2016-04-30,12363.0,9.24,621.0,415,4501,1,338.0,913.0,913.0,444.0,-29.0,2131,552,429.5,100


## Check for Data Quality

- 1. Invalid data types
- 2. Missing Values: `df[df.isnull().any(axis=1)]`
- 3. Duplicates

In [415]:
# empty df_dq_issues
# df_dq_issues = df.head(0)

### Invalid Data Types

In [416]:
# validate that Date is a datetime

# df.astype({'Date': 'datetime64[ns]'})


In [417]:
# check for invalid dates
def check_date_cast(column: pd.Series):
    try:
        pd.to_datetime(column)
        return False
    except ValueError:
        return True


invalid_dates = df['Date'].apply(check_date_cast)  # apply function to each row
df_invalid_dates = df[invalid_dates]  # apply boolean indexing
df_invalid_dates.insert(0, df_dq_column_name, 'Invalid date')

print(df_invalid_dates.index.to_list())
logger.info(f'DQ::records with invalid dates {df_invalid_dates.index.to_list()}')
df_invalid_dates

DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]
DQ::records with invalid dates [1, 8, 943]


[1, 8, 943]


Unnamed: 0,_dq_reason,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no,DenseRank,DefaultRank,FirstRank,TotalActiveMinutesShift,TotalActiveMinutesDiff,TotalActiveMinutesCumSum,TotalActiveMinutesCumMax,TotalActiveMinutesRolling,TotalActiveMinutesClip
1,Invalid date,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0,325.0,890.0,889.0,324.0,72.0,720,396,360.0,100
8,Invalid date,8378563200,2016-04-211,15148.0,12.01,677.0,298,4236,1,240.0,646.0,644.0,398.0,-100.0,3594,552,348.0,100
943,Invalid date,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0,325.0,890.0,891.0,233.0,163.0,215012,552,314.5,100


In [418]:
# add invalid dates to df_dq_issues and update _dq_reason
df_dq_issues = pd.concat([df_invalid_dates, df_dq_issues])
df_dq_issues


Unnamed: 0,_dq_reason,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no,DenseRank,DefaultRank,FirstRank,TotalActiveMinutesShift,TotalActiveMinutesDiff,TotalActiveMinutesCumSum,TotalActiveMinutesCumMax,TotalActiveMinutesRolling,TotalActiveMinutesClip
1,Invalid date,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0,325.0,890.0,889.0,324.0,72.0,720,396,360.0,100
8,Invalid date,8378563200,2016-04-211,15148.0,12.01,677.0,298,4236,1,240.0,646.0,644.0,398.0,-100.0,3594,552,348.0,100
943,Invalid date,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0,325.0,890.0,891.0,233.0,163.0,215012,552,314.5,100


### Missing Values

Breakdown:  
- `df.isnull().any(axis=1)` returns a *boolean Series* that is True for each row in df that contains at least one null value and False otherwise. The axis=1 parameter means that the operation is performed across columns (i.e., for each row).    
- Finally, `df[df.isnull().any(axis=1)]` uses boolean indexing to select only the rows in df for which the Series is True. This results in a new DataFrame that only includes the rows from df that do not have any null values.
- `df[df.isnull().any(axis=1)].index.to_list()` returns the index as a list
- Once stored in a list e.g. `idx_null_recs` then use it `df.loc[_idx_null_recs]` to return the rows with missing values

NB. Could use the inverse `df[~df.isnull().any(axis=1)]` to get rows without missing values

#### DropNA

In [419]:
# drop rows with missing values
df_nan = df.dropna()
df_nan.shape

# instead fill missing values with 'XXX' if string or 0 if numeric
df.fillna('xxxx', inplace=True)

# check where any column has 0 in it
df[df.eq('xxxx').any(axis=1)]





Unnamed: 0,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no,DenseRank,DefaultRank,FirstRank,TotalActiveMinutesShift,TotalActiveMinutesDiff,TotalActiveMinutesCumSum,TotalActiveMinutesCumMax,TotalActiveMinutesRolling,TotalActiveMinutesClip
0,6117666160,2016-04-20,19542.0,15.01,579.0,324,4900,1,264.0,732.0,729.0,xxxx,xxxx,324,324,xxxx,100
9,8378563200,2016-04-14,xxxx,10.56,699.0,318,4163,0,259.0,711.0,708.0,298.0,20.0,3912,552,308.0,100
461,1503960366,2016-04-27,xxxx,12.21,1108.0,332,2159,0,272.0,758.5,759.0,370.0,-38.0,126802,552,351.0,100
936,1503960366,2016-05-12,0.0,0.0,xxxx,0,0,1,1.0,42.0,80.0,13.0,-13.0,213890,552,6.5,0
940,1503960366,2016-04-27,xxxx,12.21,1108.0,332,2159,1,272.0,758.5,760.0,0.0,332.0,214222,552,166.0,100


#### Instead trap missing values

In [420]:
# create a dataframe to store rows with missing values for those columns in null_cols
missing_values = df[df.isnull().any(axis=1)]  # see above explanation
missing_values.insert(0, df_dq_column_name, 'Missing values')
# missing_values.index.to_list()

# add missing values to df_dq_issues
df_dq_issues = pd.concat([df_dq_issues, missing_values])

logger.info(f'DQ::Records with missing NaN values: {missing_values.index.to_list()}')
df_dq_issues

DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []
DQ::Records with missing NaN values: []


Unnamed: 0,_dq_reason,Id,Date,TotalSteps,TotalDistance,SedentaryMinutes,TotalActiveMinutes,Calories,yes_no,DenseRank,DefaultRank,FirstRank,TotalActiveMinutesShift,TotalActiveMinutesDiff,TotalActiveMinutesCumSum,TotalActiveMinutesCumMax,TotalActiveMinutesRolling,TotalActiveMinutesClip
1,Invalid date,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0,325.0,890.0,889.0,324.0,72.0,720,396,360.0,100
8,Invalid date,8378563200,2016-04-211,15148.0,12.01,677.0,298,4236,1,240.0,646.0,644.0,398.0,-100.0,3594,552,348.0,100
943,Invalid date,5577150313,2016-04-177,12231.0,9.14,525.0,396,4552,0,325.0,890.0,891.0,233.0,163.0,215012,552,314.5,100


### Duplicates

- ignore any derived columns that begin with `_`
- `df.loc[:, df.columns != '_id'].duplicated(keep=False)` returns a boolean Series that is True for every row that is a duplicate. 
- By using this Series to index into df, we get a new DataFrame that only contains the duplicate rows. 
- The `keep=False` argument means that all duplicates are marked as True, not just the second and subsequent occurrences.


In [421]:
df.duplicated(keep=False)

0      False
1      False
2      False
3      False
4      False
       ...  
939    False
940    False
941    False
942    False
943    False
Length: 944, dtype: bool

In [None]:
# check for duplicates and ignore column: _id
columns_to_sort = [col for col in df.columns if col != '_id']  # ⭐️

df_dq_issue_duplicated = df[df.loc[:, df.columns != '_id'].duplicated(keep=False)].sort_values(
    by=list(columns_to_sort)).copy()
df_dq_issue_duplicated.insert(0, df_dq_column_name, 'Duplicated rows')

# add df_dq_issue_duplicated to df_dq_issues
df_dq_issues = pd.concat([df_dq_issues, df_dq_issue_duplicated])  # ⭐️

logger.info(f'DQ::Records with duplicates: {df_dq_issue_duplicated.index.to_list()}')
df_dq_issues

## Extract Clean DF

- Remove rows from df using df_dq_issues 
- Group by example: `df_dq_issues.groupby(by='_dq_reason')['_dq_reason'].count()`
- Outer join example: `df_temp = pd.merge(df, df_dq_issues, how='outer', indicator=True)`
- Get distinct values: `df_dq_issues['_id'].unique()`

### DQ Report

In [None]:
# how many per DQ group
df_dq_issues.groupby(by='_dq_reason')['_dq_reason'].count()
_dq_reason_count = df_dq_issues.groupby(by='_dq_reason')['_dq_reason'].count()

# log that per row
for reason, count in _dq_reason_count.items():
    logger.info(f'DQ::{reason}::{count}')


### Build Clean DF

In [None]:
# get distinct _id values
_dq_ids = df_dq_issues['_id'].unique()
_dq_row_count = len(_dq_ids)

# https://stackoverflow.com/questions/39880627/in-pandas-how-to-delete-rows-from-a-data-frame-based-on-another-data-frame
# use a database style join ⭐️ 
df_temp = pd.merge(df, df_dq_issues, how='outer', indicator=True)
df_temp.groupby(by='_merge', observed=False)['_merge'].count()

# filter using a scalar ie where clause
df_clean = df_temp.loc[df_temp['_merge'] == 'left_only'].copy()
df_clean.drop(columns=['_dq_reason', '_merge'], inplace=True)
# df_extract.columns.to_list()

### CHECKPOINT 2

In [None]:
# assert counts match
assert _df_row_count == _dq_row_count + df_clean.shape[0], 'Row count mismatch'

In [None]:
# log
logger.info(f'Extract::Source row count: {_df_row_count}')
logger.info(f'Extract::DQ row count: {_dq_row_count}')
logger.info(f'Extract::Clean row count: {df_clean.shape[0]}')

logger.info(f'EXTRACT COMPLETE at {pd.to_datetime("now")}')


# TRANSFORM

- add runtime metrics (system_id, run_datetime)
- Normalise truth values `df_extract['yes_no'] = df_extract['yes_no'].apply(lambda x: True if x == 1 else False)`
- derive new columns `df_extract.insert(0, '_avgdistanceperstep', df_extract['TotalDistance'] / df_extract['TotalSteps'])`
- Conform dates to UTC where there's time element ❌

In [None]:
logger.info(f'TRANSFORM STARTED at {pd.to_datetime("now")}')

## Cast Data Types

- convert any columns to datetime. Once converted we can then perform date operations and do timeseries analysis

In [None]:
# store the data types before transformation
df_clean_dtypes_b4 = df_clean.dtypes

# instead create some methods to do these 3 conversions
# df_clean['Date'] = pd.to_datetime(df_clean['Date'])
a = df_clean[pd.to_datetime(df_clean['Date'], errors='coerce').notnull()]
a.shape
# df_clean = df_clean.astype({'SedentaryMinutes': 'int64', 'TotalActiveMinutes': 'int64'}) 

# df_clean_dtypes_after = df_clean.dtypes

### CHECKPOINT 3 - Data Types

In [None]:
# loop through the columns and print the data types before and after
for col in df_clean_dtypes_b4.index:
    if df_clean_dtypes_b4[col] != df_clean_dtypes_after[col]:
        logger.info(f'CAST::{col}::{df_clean_dtypes_b4[col]}::{df_clean_dtypes_after[col]}')


## Add Runtime Metrics

In [None]:
# add runtime metrics
_extract_datetime = pd.to_datetime('now')
df_clean.insert(0, '_system_name', SYSTEM)
df_clean.insert(1, '_extract_datetime', _extract_datetime)

## Normalise Truth Values

In [None]:
# first check the distribution of truth values
df_clean.groupby('yes_no')['yes_no'].count().sort_values(ascending=False).rename('count')

In [None]:
# normalise truth values
df_clean['yes_no'] = df_clean['yes_no'].apply(lambda x: True if x == 1 else False)
# check the distribution of truth values
df_clean.groupby('yes_no')['yes_no'].count().sort_values(ascending=False).rename('count')

## Derive New Columns

In [None]:
# derive average distance per steps and put next to where column is TotalDistance
df_clean.insert(df_clean.columns.get_loc('TotalDistance') + 1, '_avgdistanceperstep',
                df_clean['TotalDistance'] / df_clean['TotalSteps'])
df_clean

## Dates

### Timeseries Analysis

In [None]:
# Date aggregates
df_clean.aggregate(
    {
        'Date': ['max', 'min']
    }
)

df_clean['Date'].max() - df_clean['Date'].min()  # pandas.Timedelta object
# use the dt. accessor to derive date parts
# https://pandas.pydata.org/docs/reference/api/pandas.Series.dt.date.html

# df_clean['Date'].dt.year
# df_clean['Date'].dt.month.unique()
# df_clean['Date'].dt.day
# df_clean['Date'].dt.weekday # (with Monday=0 and Sunday=6)
# df_clean['Date'].dt.day_name()

### Plotting

- use a log scale because it helps to see the differences

In [None]:
# group by day and get the mean calories
df_clean_groupby = df_clean.groupby(
    (
        df_clean['Date'].dt.day_name()
    )
)['Calories'].mean().sort_values(ascending=False).rename('mean_calories')

# plot using a log scale because it helps to see the differences
df_clean_groupby.plot(kind='bar', logy=True)

### Set date as the index

In [None]:
# set date as the index
# https://pandas.pydata.org/docs/getting_started/intro_tutorials/09_timeseries.html#datetime-as-index
# https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-datetimeindex (
# df_clean.set_index('Date', inplace=True)
df_clean.index.day_name()

#  get for where dayname is Saturday
df_clean[df_clean.index.day_name() == 'Saturday']

# get where date is 2016-04-30
df_clean[df_clean.index == '2016-04-30']
# df_clean.loc['2016-04-30']


### Resample

- akin to groupby but for dates ie rolling up to a higher level

In [None]:
# resample by month
df_clean['Calories'].resample('ME').mean()

# resample by dayname of the week
df_clean['Calories'].resample('D').mean()

In [None]:
logger.info(f'TRANSFORM COMPLETE at {pd.to_datetime("now")}')
df_clean.shape

# LOAD

## Write to a local sqlite database

In [None]:
# write df to a local sqlite database
from sqlalchemy import create_engine

# create a connection to the database using output_data_dir
db_path = os.path.join(output_data_dir, f'{SYSTEM}.db')
engine = create_engine(f'sqlite:///{db_path}', echo=False)

In [None]:
# write to the database and handle errors
try:
    df_clean.to_sql(input_file_name_without_ext, con=engine, if_exists='replace', index=False)
except Exception as e:
    logger.error(f'LOAD::Error::{e}')

## Read from the database

In [None]:
# read from the database and handle errors
df_db = pd.read_sql(input_file_name_without_ext, con=engine)
df_db.head()

## CHECKPOINT 4

In [None]:
import sqlite3

query_string = f'SELECT COUNT(*) FROM {input_file_name_without_ext}'

# Establish a connection to the SQLite database 
# use the with statement to handle the connection
with sqlite3.connect(db_path) as connection:
    cursor = connection.cursor()
    cursor.execute(query_string)
    _db_row_count = cursor.fetchone()[0]

logger.info(f'LOAD::Row count in database: {_db_row_count}')
logger.info(f'LOAD::Row count in df_clean: {df_clean.shape[0]}')

assert df_clean.shape[0] == _db_row_count, 'Row count mismatch'

In [None]:
logger.info(f'ETL LOAD COMPLETE at {pd.to_datetime("now")}')

# Extras

## Other reads