<a id='table_of_contents'></a>

0. [Import libraries](#imports)
1. [Import data](#import_data)
2. [Initial Cleaning](#initial_cleaning)
3. [Price and Quantity Cleaning](#price_and_quantity_cleaning)
4. [Data Checks](#data_checks)
5. [Export cleaned data](#export_data)

# 0. Import libraries <a id='imports'></a>
[Back to top](#table_of_contents)

In [None]:
%load_ext autoreload
%autoreload 2

from pathlib import Path
import json

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

pd.set_option('display.max_columns', None)
mpl.rcParams['figure.dpi']= 300

# 1. Import raw data <a id='import_data'></a>
[Back to top](#table_of_contents)

In [None]:
# Read in raw data
data_dir = Path('../../data')
file_path = data_dir / 'interim' / '05052024_roast_review_openrefine.csv'
df_raw = pd.read_csv(file_path)

display(df_raw.info())
display(df_raw.sample(5))

# 2. Initial Cleaning <a id='initial_cleaning'></a>
[Back to top](#table_of_contents)

First step is to do some of the basic data checks and cleanup. This includes dropping columns that are not needed, setting datatypes, renaming columns,
combining columns, cleaning up strings, and creating new columns. 

In [None]:

def tweak_df(df: pd.DataFrame) -> pd.DataFrame:
    """Initial data tweaks"""
    return(
        df
        .assign(review_date = lambda df_: pd.to_datetime(df_['review_date'], format="%B %Y"),
                # Combing acidity and acidity/structure into one column, they are the same 
                # field but names used in reviews changed at one point
                acidity = lambda df_: df_['acidity'].fillna(df_['acidity/structure']),
                # Split the agtron column into one for external bean agtron data and ground 
                # bean agtron data
                agtron_external=lambda df_: pd.to_numeric(df_['agtron']
                                    .str.split('/', expand=True)[0]
                                    .str.strip(), errors='coerce'),
                agtron_ground = lambda df_: pd.to_numeric(df_['agtron']
                                        .str.split('/', expand=True)[1]
                                        .str.strip(), errors='coerce'
                                        )
        )
        .dropna(subset=['agtron_external', 'agtron_ground', 'acidity',
                        'review_date', 'est_price', 'coffee_origin',
                        'aroma', 'roast_level', 'aftertaste',]
        )
        .drop(columns=['with_milk', 'acidity/structure',])
        .astype({'acidity': 'float'})
        .replace('', np.nan)
        .replace('United States of America', 'USA')
        # Agtron values must be equalt to or below 100, some entries on website have typos 
        .loc[lambda df_: (df_['agtron_external'] <= 100) & (df_['agtron_ground'] <= 100), :]
        # Run str.strip on every string column
        .applymap(lambda x: x.strip() if isinstance(x, str) else x)
    )

df = df_raw.pipe(tweak_df)

df.info()


# 3. Price and Quantity Cleaning <a id='price_and_quantity_cleaning'></a>
[Back to top](#table_of_contents)

The `est_price` column contains information on price, currency, quantity and its unit of measurement. We will split these up into their own columns and clean them up.

We split on the "/" character to create one column with price and currency information and another with quantity and unit information and then further process these.

The quantities have to be cleaned so they contain a single representation for each unit and so unecessary punctuation and parentheses are removed. We filter the dataset to remove all products that came in units of cans, boxes, capusles, pods, etc. We will only concern ourselves with coffee sold in bags or bulk, ground or whole.

Regular expressions are used to separate the numerical and non-numerical characters from the quantity and price columns.


In [None]:
def price_quantity_split(df: pd.DataFrame) -> pd.DataFrame:
    price_quantity = (
        df
        # Split est_price into columns for price and quantity
        .est_price.str.split("/", n=1, expand=True)
        # Remove any commas from the price and quantity columns
        .replace(',', '', regex=True)
        .rename(columns={0: 'price', 1: 'quantity'})
        .assign(quantity = lambda df_: (df_['quantity']
                                        # Remove parentheses and anything inside them
                                        .str.replace(r"\(.*?\)", "", regex=True)
                                        # Remove anything after a semicolon. This is usually a note, or deal price.
                                        .str.replace(r";.*", "", regex=True)
                                        # Standardize units
                                        .str.replace(r".g$", " grams", regex=True)
                                        .str.replace(r"\sg$", "grams", regex=True)
                                        .str.replace(r"pound$", "1 pounds", regex=True)
                                        .str.replace(r"oz|onces|ounce$|ounces\*", "ounces", regex=True)
                                        # Remove "online" from any quantity
                                        .str.replace("online", "")
                                        .str.strip()
                                        )
            )
        .dropna()
        # Remove rows where coffee is sold in a can, box, pouch, packet, or tin
        .loc[lambda df_: ~df_['quantity'].str.contains('can|box|capsules|K-|cups|pods|pouch|packet|tin'), :]
        # Split quantity into value and unit, and split price into value and currency
        .assign(quantity_value = lambda df_: (df_['quantity']
                                              .str.extract(r'(\d+)')
                                              .astype(float)
                                              ),
                quantity_unit = lambda df_: (df_['quantity']
                                             .str.replace(r"(\d+)", "", regex=True)
                                             .replace("\.", "", regex=True)
                                             .str.strip()
                                             .mask(lambda s: s == 'g', 'grams')
                                             .str.strip()
                                             ),
                price_value = lambda df_: (df_['price']
                                           .str.extract(r'(\d+\.\d+|\d+)')
                                           .astype(float)
                                           ),
                price_currency = lambda df_: (df_['price']
                                              .str.replace(",", "")
                                              .str.replace(r'(\d+\.\d+|\d+)', '', regex=True)
                                              .str.strip()
                                              )
                )
        # Drop the original price and quantity columns
        .drop(columns=['price', 'quantity'])
    )
    # Merge the price_quantity DataFrame with the original DataFrame
    return df.merge(price_quantity, left_index=True, right_index=True)


df = (df_raw
      .pipe(tweak_df)
      .pipe(price_quantity_split)
      )

display(df.info())
display(df.loc[:, ['price_value', 'quantity_value']].describe())
display(df['price_currency'].value_counts())   
display(df['quantity_unit'].value_counts()) 

#### Cleaning Currencies

Normalize the currency column to contain a standardized set of currency symbols. We will use the ISO 4217 codes to make it easier to get foreign exchange data from an external API later on. 


In [None]:
def clean_currency(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize currencies to ISO 4217 codes."""
    price_currency = (
        df.price_currency
        .str.upper()
        .str.replace(r'^\$$', 'USD', regex=True)
        .str.replace('PRICE: $', 'USD')
        .str.replace('$', '')
        .str.replace('#', 'GBP')
        .str.replace('¥', 'JPY')
        .str.replace('£', 'GBP')
        .str.replace('POUND', 'GBP')
        .str.replace('PESOS', 'MXN')
        .str.replace('RMB', 'CNY')
        .str.strip()
        .mask(lambda s: s == "US", "USD")
        .mask(lambda s: s == ' ', "USD")
        .mask(lambda s: s == 'E', 'EUR')
        .mask(lambda s: s == 'NTD', 'TWD')
        .mask(lambda s: s == 'NT', 'TWD')
        .mask(lambda s: s == '', 'USD')
        .mask(lambda s: s == 'HK', 'HKD')
        .str.strip()
    )
    return df.assign(price_currency=price_currency)

df = df_raw.pipe(tweak_df).pipe(price_quantity_split).pipe(clean_currency)
df.loc[:, ["est_price", "price_currency"]].groupby('price_currency').sample(3, replace=True)

### Converting prices to 2024 USD

1. Convert price to USD using historical exchange rates
2. Adjust price to 2024 USD using BLS consumer price index

In [None]:
def convert_row(row):
    date = row['review_date'].strftime('%Y-%m-%d')
    currency = row['price_currency']
    price = row['price_value']
    if currency == 'USD':
        return price
    else:
        return np.round(price / exchange_rates[date][currency], 2)
    
def convert_to_usd(df: pd.DataFrame) -> pd.DataFrame:
    df['price_value_usd_hist'] = df.apply(convert_row, axis=1)
    return df
    
# Read in exchange rates
with open(data_dir / 'external' / 'openex_exchange_rates.json') as f:
    exchange_rates = json.load(f)

df = (df_raw
      .pipe(tweak_df)
      .pipe(price_quantity_split)
      .pipe(clean_currency)
      .pipe(convert_to_usd)
      )

(
    df
    .loc[:, ['price_value','price_currency', 'price_value_usd_hist']]
    .groupby('price_currency')
).sample(3, replace= True)

In [None]:
def load_transform_cpi(file_path: Path) -> pd.DataFrame:
    """Loads and transforms the CPI data."""
    MONTH_MAP = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
    }

    try:
        cpi = pd.read_csv(file_path, usecols=['Year',
                                              'Jan', 'Feb', 'Mar',
                                              'Apr', 'May', 'Jun',
                                              'Jul', 'Aug', 'Sep',
                                              'Oct', 'Nov', 'Dec'
                                              ]
                          )
    except FileNotFoundError:
        raise FileNotFoundError("CPI file is not found in the specified directory.")

    return (cpi
            .melt(id_vars='Year', var_name='Month', value_name='cpi')
            .assign(Month=lambda x: x['Month'].map(MONTH_MAP),
                    date=lambda x: pd.to_datetime(x[['Year', 'Month']].assign(day=1)))
            .dropna()
            .drop(columns=['Year', 'Month'])
            .rename(columns={'cpi': 'consumer_price_index'})
            .sort_values('date')
            .reset_index(drop=True)
           )
    
def create_cpi_adjusted_price(df: pd.DataFrame, file_path: Path, date: str='2024-01-01') -> pd.DataFrame:
    """
    Adjusts historical price data to 2024 prices using CPI data.
    """
    cpi = load_transform_cpi(file_path)
    cpi_baseline = cpi.loc[cpi['date'] == date, 'consumer_price_index'].values[0]
    
    return (df
            .merge(cpi, left_on="review_date", right_on="date")
            .drop(columns='date')
            .assign(price_usd_adj_2024=lambda df_: np.round(
                df_['price_value_usd_hist'] * cpi_baseline / df_['consumer_price_index'], 2)
                    )
            )

data_dir = Path('../../data')
cpi_path = data_dir / 'external' / 'consumer_price_index.csv'

df = (df_raw
      .pipe(tweak_df)
      .pipe(price_quantity_split)
      .pipe(clean_currency)
      .pipe(convert_to_usd)
      .pipe(create_cpi_adjusted_price, file_path=cpi_path)
      )

df.sample(3)    



In [None]:
(
    df.assign(price_diff=lambda df_: (df_['price_usd_adj_2024'] - df_['price_value_usd_hist'])/df_['price_usd_adj_2024'])
).plot(x='review_date',
       y='price_diff',
       title='% Price difference between adjusted and historical prices')


### Converting quantities to lbs

Create a normalized quantity column that converts all quantities to lbs.

In [None]:
def convert_to_lbs(df: pd.DataFrame) -> pd.DataFrame:
    to_lbs_conversion = {"ounces": 1/16, "pounds":1, "kilogram": 2.20462, "grams": 0.00220462}
    df['quantity_in_lbs'] = np.round(df['quantity_value'] * df['quantity_unit'].map(to_lbs_conversion), 2)
    return df

df =(df_raw
     .pipe(tweak_df)
     .pipe(price_quantity_split)
     .pipe(clean_currency)
     .pipe(convert_to_usd)
     .pipe(create_cpi_adjusted_price, file_path=cpi_path)
     .pipe(convert_to_lbs)
)

(df.loc[:, ['quantity_value', 'quantity_unit', 'quantity_in_lbs']]
        .groupby("quantity_unit")
        .sample(3, replace=True)
)

In [None]:
# Create a new column for price per pound
def price_per_lbs(df: pd.DataFrame) -> pd.DataFrame:
    df['price_usd_adj_2024_per_lb'] = np.round(df['price_usd_adj_2024'] / df['quantity_in_lbs'], 2)
    return df

df = (df_raw
      .pipe(tweak_df)
      .pipe(price_quantity_split)
      .pipe(clean_currency)
      .pipe(convert_to_usd)
      .pipe(create_cpi_adjusted_price, file_path=cpi_path)
      .pipe(convert_to_lbs)
      .pipe(price_per_lbs)
)

df.describe()

In [None]:
# list of US states
us_states_and_territories= [
    'Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California',
    'Colorado', 'Connecticut', 'Delaware', 'Florida', 'Georgia',
    'Hawaii', 'Idaho', 'Illinois', 'Indiana', 'Iowa',
    'Kansas', 'Kentucky', 'Louisiana', 'Maine', 'Maryland',
    'Massachusetts', 'Michigan', 'Minnesota', 'Mississippi', 'Missouri',
    'Montana', 'Nebraska', 'Nevada', 'New Hampshire', 'New Jersey',
    'New Mexico', 'New York', 'North Carolina', 'North Dakota', 'Ohio',
    'Oklahoma', 'Oregon', 'Pennsylvania', 'Rhode Island', 'South Carolina',
    'South Dakota', 'Tennessee', 'Texas', 'Utah', 'Vermont',
    'Virginia', 'Washington', 'West Virginia', 'Wisconsin', 'Wyoming',
    'District of Columbia', 'Puerto Rico'
]

def create_us_state(row):
    if row['territorial_entity_2'] in us_states_and_territories:
        return row['territorial_entity_2']
    elif row['territorial_entity_1'] in us_states_and_territories:
        return row['territorial_entity_1']
    elif row['og_roaster_location'].split(",")[-1].strip() in us_states_and_territories:
        return row['og_roaster_location'].split(",")[-1].strip()
    else:
        return np.nan

def create_county_and_state_columns(df: pd.DataFrame) -> pd.DataFrame:
    df['roaster_county'] = np.where(df['territorial_entity_1'].str.contains('County', na=False),
                                    df['territorial_entity_1'],
                                    np.nan)
    df['roaster_us_state'] = df.apply(create_us_state, axis=1)
    return df

df = (df_raw
      .pipe(tweak_df)
      .pipe(price_quantity_split)
      .pipe(clean_currency)
      .pipe(convert_to_usd)
      .pipe(create_cpi_adjusted_price, file_path=cpi_path)
      .pipe(convert_to_lbs)
      .pipe(price_per_lbs)
      .pipe(create_county_and_state_columns) 
)

display(df.loc[df['roaster_country'] == 'USA', ['roaster_country', 'roaster_us_state', 'roaster_county']].info())

# 4. Data Checks <a id='data_checks'></a>
[Back to top](#table_of_contents)

In [None]:
df = (df_raw
        .pipe(tweak_df)
        .pipe(price_quantity_split)
        .pipe(clean_currency)
        .pipe(convert_to_usd)
        .pipe(create_cpi_adjusted_price, file_path=cpi_path)
        .pipe(convert_to_lbs)
        .pipe(price_per_lbs)
        .pipe(create_county_and_state_columns)
    )

df.info()

In [None]:
df_numeric = df.select_dtypes(include=['number']).drop(columns=['price_value'], axis=1)
len(df_numeric.columns)

In [None]:
fig, ax = plt.subplots(3, 5, figsize=(15, 10))

for i, col in enumerate(df_numeric.columns):
    df[col].plot(kind='hist', ax=ax[i//5, i%5], title=col, bins=20, edgecolor='black', alpha=0.7)
    
plt.tight_layout()
plt.show()

In [None]:
df[df['price_usd_adj_2024_per_lb'] < 200].price_usd_adj_2024_per_lb.hist(cumulative=True,
                                                                           density=True,
                                                                           edgecolor='black',
                                                                           alpha=0.7)
plt.title("Cumulative Histogram Price $USD/lbs")

In [None]:
# Checking countries
display(df.roaster_country.sort_values().unique())
display(df.coffee_origin_country.sort_values().unique())

In [None]:
# Checking roast level
df.roast_level.value_counts()

# 5. Export cleaned data <a id='export_data'></a>
[Back to top](#table_of_contents)

In [None]:
fout = data_dir / 'processed' / '05052024_roast_review_cleaned.csv'
df.to_csv(fout, index=False)