# Cleaning CDDB

This notebook is an annotated walkthrough of cleaning the Compact Disc Database (CDDB) dataset.

In [None]:
import logging

import pandas as pd
import pandera as pa

import clean_cddb
from clean_cddb.utils import (
    get_check_func_descriptions,
    get_failure_cases_summary_as_formatted_table,
    log_df_change,
)

def df_to_var(df, var_name):
    """Publish ``df`` as the module-level global ``var_name`` and return it unchanged.

    Intended for ``DataFrame.pipe`` chains: inserting ``.pipe(df_to_var, "name")``
    between steps snapshots the intermediate frame under ``name`` without
    interrupting the chain.
    """
    module_namespace = globals()
    module_namespace.update({var_name: df})
    return df

# Show full tables: up to 1000 rows, every column, untruncated cell text.
pd.options.display.max_rows = 1000
pd.options.display.max_columns = None
pd.options.display.max_colwidth = None

# Log INFO and above, prefixed with timestamp, process id and level.
logging.basicConfig(
    format="%(asctime)s - %(process)d - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Raw CDDB export: tab-separated, every column read as text (no type inference).
filepath = "../data/input/cddb.tsv"
source_df = pd.read_csv(
    filepath,
    sep="\t",
    dtype="str",
    encoding="latin1",
)

## Apply validation checks (pandera schema) and review failure cases

In [None]:
# Validate the raw data against the pandera schema. `lazy=True` collects every
# failing check into one SchemaErrors instead of stopping at the first failure.
try:
    validated_df = clean_cddb.schema(source_df, lazy=True)
    logging.info("Validation success. No failure cases detected.")
    # No SchemaErrors means no failure cases. Bind an empty frame so the cells
    # below find `failure_cases_df` instead of raising NameError.
    failure_cases_df = pd.DataFrame(
        columns=["schema_context", "column", "check", "failure_case", "index"]
    )
except pa.errors.SchemaErrors as err:
    logging.info("Validation failure. Failure cases detected.")
    logging.debug(err)
    # Annotate each failure case with a description of the check that produced it.
    failure_cases_df = err.failure_cases.pipe(
        get_check_func_descriptions, clean_cddb.schema
    )

`failure_cases_df`
* For each failure, `failure_cases_df` lists the column name, the check that failed, the offending value (`failure_case`), and the index of the failing row in the original dataframe.
* The `index` column supports bulk operations, such as joining to or querying the original dataframe for the failing rows, or rejecting every row whose index appears among the failure cases.

In [None]:
# A reproducible random peek at ten individual failure cases.
failure_cases_df[
    ["schema_context", "column", "check", "failure_case", "index"]
].sample(n=10, random_state=1)

### Summary of failure cases

Here we see aggregated counts of the number of failure cases for each validation check.

In [None]:
# Number of failure cases per (column, check) pair.
failure_cases_summary = (
    failure_cases_df
    .groupby(["column", "check"], as_index=False)
    .size()
    .rename(columns={"size": "counts"})
    .sort_values(by=["column", "check"])
)

failure_cases_summary

We also have a helper utility function that displays the source code alongside each check function name.

In [None]:
# Report per-check failure counts alongside each check function's source code.
failure_cases_summary_table = get_failure_cases_summary_as_formatted_table(
    failure_cases_df
)
print(failure_cases_summary_table)

# Cleaning step

We can use the same checks from the pandera validation schema to trigger cleaning actions such as:
* do nothing / ignore the value
* transform the value; e.g., replace value with a substitute (e.g., 'N/A')
* or reject the entire record

Here we apply several cleaning functions to `source_df` via `.pipe(Callable)`.
* Each function takes a dataframe and returns a dataframe, so we can chain together the cleaning operations like so.
* Later, we will 
  1. compare `source_df` and `clean_df` as a before/after check
  2. re-apply our validation checks (pandera schema) to the new `clean_df` to verify that our transformations improved our data quality

In [None]:
# Apply the cleaning functions in order and log what each one changed
# (`log_df_change` comes from `clean_cddb.utils`, imported in the first cell).
#
# Before every step, `.pipe(df_to_var, '_df_before')` stashes the incoming frame in
# the global `_df_before`. The next `.pipe(log_df_change, before_df=_df_before, ...)`
# sees the updated value: Python evaluates a call's arguments only when that call
# in the chain is reached, i.e. after the preceding `df_to_var` has run.
clean_df = (
    source_df

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_standardize_various_artists)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_standardize_various_artists' procedure")

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_try_to_fix_encoding_errors, "artist")
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_try_to_fix_encoding_errors' procedure")
    # Snapshot after the two artist-only steps; compared against source_df later.
    .pipe(df_to_var, 'clean_df_artist_transforms_only')

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_invalid_symbols)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_invalid_symbols' procedure")

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_invalid_categories)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_invalid_categories' procedure")

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_id_zero_padding)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_id_zero_padding' procedure")

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_genre_invalid)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_genre_invalid' procedure")

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_year)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_year' procedure")

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_title)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_title' procedure")

    .pipe(df_to_var, '_df_before').pipe(clean_cddb.clean_df_genre_coalesce_with_category)
    .pipe(log_df_change, before_df=_df_before, operation_label="Cleaning with 'clean_cddb.clean_df_genre_coalesce_with_category' procedure")

    # Save an intermediate dataframe prior to dropping records
    # so we can compare with source_df later
    .pipe(df_to_var, 'clean_df_before_drops')

    # Drop rows whose 'id' contains the "REJECT_ROW" marker (presumably written by
    # one of the cleaning steps above). na=False keeps rows with a missing id
    # instead of failing when `~` is applied to NaN.
    .loc[lambda _df: ~_df["id"].str.contains("REJECT_ROW", na=False)]
    .drop(columns=['merged_values'])
)

Inspecting the clean-up of the 'artist' field: (1) standardization to "Various" and (2) fixing encoding issues.

In [None]:
# A few rows changed by the artist-only steps, rendered as a markdown table.
comps_df_sample_markdown: str = (
    source_df
    .compare(clean_df_artist_transforms_only, result_names=("before", "after"))
    .sample(n=5, random_state=0)
    .to_markdown()
)

print("Example diffs between before-and-after")
print(comps_df_sample_markdown)

In [None]:
# Re-apply the same pandera schema to the cleaned data to measure the improvement.
try:
    validated_df = clean_cddb.schema(clean_df, lazy=True)
    logging.info("Validation success. No failure cases detected.")
    # No SchemaErrors means no failure cases. Bind an empty frame so the summary
    # below (and later cells) do not raise NameError.
    after_cleaning_failure_cases_df = pd.DataFrame(
        columns=["schema_context", "column", "check", "failure_case", "index"]
    )
except pa.errors.SchemaErrors as err:
    logging.info("Validation failure. Failure cases detected.")
    logging.debug(err)
    # Annotate each failure case with a description of the check that produced it.
    after_cleaning_failure_cases_df = err.failure_cases.pipe(
        get_check_func_descriptions, clean_cddb.schema
    )

# Number of remaining failure cases per (column, check) pair.
after_cleaning_failure_cases_summary = (
    after_cleaning_failure_cases_df.groupby(["column", "check"], as_index=False)
    .size()
    .sort_values(by=["column", "check"])
    .rename(columns={"size": "counts"})
)

# Evaluation

Counts of Failure Cases Before vs After Data Cleaning

In [None]:
# Outer-join the two summaries so checks failing only before, or only after,
# cleaning still appear; counts missing on one side are shown blank.
failure_cases_summary.merge(
    after_cleaning_failure_cases_summary,
    how="outer",
    on=["column", "check"],
    suffixes=("_before_cleaning", "_after_cleaning"),
).fillna("")

Comparing `source_df` and `clean_df`

* We will actually use an intermediate dataframe, `clean_df_before_drops`, which has the same dimensions as the original dataframe.
    * Because it is captured before dirty records are dropped, rejected records are still present in `clean_df_before_drops`, marked with "REJECT_ROW" (the marker the cleaning step filters on) rather than removed.
    * This makes side-by-side comparison easier.
* The final output, `clean_df`, omits the records we intend to drop.


In [None]:
source_df.head(n=5)  # first rows of the raw input

In [None]:
clean_df_before_drops.head(n=5)  # same rows after cleaning, before rejected rows are dropped

In [None]:
# Cell-level diff of raw vs cleaned values, aligned on the original row index.
# `clean_df_before_drops` is the snapshot taken before rejected rows were dropped,
# so it lines up row-for-row with `source_df` as `DataFrame.compare` requires.
# (No display option is changed here: this cell renders nothing, and the next
# cell sets `display.max_colwidth` itself.)
comps_df = (
    source_df.sort_index()
    .compare(clean_df_before_drops.sort_index(), result_names=('before_cleaning', 'after_cleaning'))
    # compare() leaves NaN where values are equal; blank those so changes stand out.
    .astype('object')
    .fillna('')
)

# Show before/after comparisons

In [None]:
pd.set_option('display.max_colwidth', None)

columns_to_compare = ['artist', 'category', 'genre', 'title', 'tracks', 'year', 'id']

# `DataFrame.compare` keeps only columns that differ somewhere, so a column in
# `columns_to_compare` may be absent from `comps_df`; use only those present.
columns_present = [
    column
    for column in columns_to_compare
    if column in comps_df.columns.get_level_values(0)
]

# One row per changed source row; each cell reads "before  =>  after".
comps_df_formatted = (
    comps_df
    .astype(str)
    # Move the before/after column level into the row index.
    .stack()
    .reset_index()
    .rename(columns={'level_0': 'row_id', 'level_1': 'before_or_after'})
    # 'merged_values' is not compared; ignore it if it never changed (absent).
    .drop(columns=['merged_values'], errors='ignore')
    .groupby(['row_id'], as_index=False)
    [columns_present]
    .agg(lambda values: '  =>  '.join(values))
    # Unchanged cells join two blanks; show them as empty instead.
    .replace('^(  =>  )$', '', regex=True)
)

# Guard against fewer than 5 changed rows (sample would raise ValueError).
comps_df_formatted.sample(min(5, len(comps_df_formatted)), random_state=1)

In [None]:
print("Number of rows changed per column")

# `comps_df` omits columns that never changed, so a missing column means 0 changes
# (indexing it directly would raise KeyError).
changed_columns = set(comps_df.columns.get_level_values(0))
for column in clean_df.columns:
    if column in changed_columns:
        n_rows_changed = int(
            (comps_df[column]['before_cleaning'] != comps_df[column]['after_cleaning']).sum()
        )
    else:
        n_rows_changed = 0
    print(f"{column:.<15}{n_rows_changed}")

### Sample transformations

* Here we can see that we transform "Various Artists" and "<various>" to "Various". 
* We also fixed invalid characters converting text from "JÃ¶rg Hilbert & Felix Janosa" to "Jörg Hilbert & Felix Janosa".
* Later, we will do a more comprehensive before/after analysis after applying all the cleaning transformations

In [None]:
# Row labels chosen to illustrate the artist fixes described above.
example_idxs = [7629, 1822, 117, 4129]
(
    source_df
    .compare(clean_df_artist_transforms_only)
    .loc[example_idxs]
    .fillna(pd.NA)
)

In [None]:
comps_df.sample(n=10, random_state=3)  # random before/after diffs across all columns

In [None]:
comps_df.sample(n=5, random_state=4)  # a second, smaller random sample of diffs

Smaller before/after example
* Here we can see that we transform "Various Artists" and "<various>" to "Various". 
* We also fixed invalid characters converting text from "JÃ¶rg Hilbert & Felix Janosa" to "Jörg Hilbert & Felix Janosa".
* Later, we will do a more comprehensive before/after analysis after applying all the cleaning transformations

In [None]:
# NOTE(review): this cell repeats the earlier "Sample transformations" cell
# verbatim; consider deleting the duplicate.
example_idxs = [7629, 1822, 117, 4129]
(
    source_df
    .compare(clean_df_artist_transforms_only)
    .loc[example_idxs]
    .fillna(pd.NA)
)

### Sample transformations

* Here we can see that we transform "Various Artists" and "<various>" to "Various". 
* We also fixed invalid characters converting text from "JÃ¶rg Hilbert & Felix Janosa" to "Jörg Hilbert & Felix Janosa".
* Later, we will do a more comprehensive before/after analysis after applying all the cleaning transformations

In [None]:
# NOTE(review): this cell repeats the earlier "Sample transformations" cell
# verbatim; consider deleting the duplicate.
example_idxs = [7629, 1822, 117, 4129]
(
    source_df
    .compare(clean_df_artist_transforms_only)
    .loc[example_idxs]
    .fillna(pd.NA)
)

# Transform to track-level data

We want a separate track-level dataset that can be joined to the album-level data in `clean_df`.

In [None]:
# Build a track-level table: one row per non-empty track name, linked back to
# the album-level row in `clean_df` through its 'id'.
track_level_df = (
    # Start from the cleaned album-level dataframe
    clean_df
    # 'tracks' is pipe-delimited text; turn it into a list per album
    .assign(tracks=lambda albums: albums["tracks"].str.split("|"))
    # Self-join on the index against the exploded list so album fields repeat per
    # track; the list column becomes 'tracks_x', the individual track 'tracks_y'
    .pipe(
        lambda albums: albums.merge(
            albums["tracks"].explode(), left_index=True, right_index=True
        )
    )
    .pipe(df_to_var, "df_after_explode")
    # Trim surrounding whitespace so blank track names collapse to ''
    .assign(tracks=lambda rows: rows["tracks_y"].str.strip())
    .drop(columns=["tracks_x", "tracks_y"])
    # Keep only rows whose track name is not the empty string
    .query("tracks != ''")
    .pipe(df_to_var, "df_after_empty_track_name_filter")
    # Renumber rows 0..n-1; that position becomes the track_id
    .reset_index(drop=True)
    .loc[:, ["id", "tracks"]]
    .reset_index()
    .rename(
        columns={
            "id": "album_row_id",
            "index": "track_id",
            "tracks": "track_name",
        }
    )
)

In [None]:
# Demo: attach album-level fields to the first 15 tracks (album_row_id -> id).
(
    track_level_df.head(15)
    .merge(clean_df, how="inner", left_on="album_row_id", right_on="id")[
        ["album_row_id", "track_id", "title", "track_name"]
    ]
)

# End