In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
pd.set_option('display.max_columns', 500)
import warnings
import re
warnings.filterwarnings("ignore")
from tqdm import tqdm
import traceback
import itertools
import multiprocessing as mp

In [3]:
liquor_sample = pd.read_csv('../data/Iowa_Liquor_Sales.csv', parse_dates = ['Date'], nrows=5000)

In [4]:
liquor_sample.head()

Unnamed: 0,Invoice/Item Number,Date,Store Number,Store Name,Address,City,Zip Code,County Number,County,Store Location,Category,Category Name,Vendor Number,Vendor Name,Item Number,Item Description,Pack,Bottle Volume (ml),State Bottle Cost,State Bottle Retail,Bottles Sold,Sale (Dollars),Volume Sold (Liters),Volume Sold (Gallons)
0,S13560300005,2013-07-25,4349,Southside Food Mart,1101 ARMY POST RD,DES MOINES,50315,77.0,Polk,1101 ARMY POST RD\nDES MOINES 50315\n(41.52650...,1031080,VODKA 80 PROOF,260,Diageo Americas,37993,Smirnoff Vodka 80 Prf,48,200,$2.46,$3.69,48,$177.12,9.6,2.54
1,S08102400013,2012-10-03,4202,Fareway Stores #829 / Sioux City,4267 SERGEANT RD,SIOUX CITY,51106,97.0,Woodbury,4267 SERGEANT RD\nSIOUX CITY 51106\n(42.451213...,1031080,VODKA 80 PROOF,297,Laird And Company,35916,Five O'clock Vodka,12,750,$3.31,$4.96,24,$119.04,18.0,4.76
2,S22038200026,2014-10-28,4426,LIQUOR AND GROCERY DEPOT,114 CENTRAL ST N Suite 1,MARSHALLTOWN,50158,64.0,Marshall,114 CENTRAL ST N Suite 1\nMARSHALLTOWN 50158\n,1031200,VODKA FLAVORED,205,E AND J GALLO WINERY,40313,New Amsterdam Peach,6,1750,$13.32,$19.98,6,$119.88,10.5,2.77
3,S04667100039,2012-03-21,2576,Hy-Vee Wine and Spirits / Storm Lake,1250 N LAKE ST,STORM LAKE,50588,11.0,Buena Vista,"1250 N LAKE ST\nSTORM LAKE 50588\n(42.653594, ...",1041100,AMERICAN DRY GINS,260,Diageo Americas,30318,Gordon's Gin London Dry - Pet,6,1750,$10.49,$15.74,6,$94.44,10.5,2.77
4,S09447200020,2012-12-12,4193,Hartley Wine And Spirits,361 3RD ST NW,HARTLEY,51346,71.0,O'Brien,"361 3RD ST NW\nHARTLEY 51346\n(43.184878, -95....",1081200,CREAM LIQUEURS,260,Diageo Americas,68037,Bailey's Original Irish Cream,12,1000,$17.00,$25.49,2,$50.98,2.0,0.53


In [47]:
#liquor_sample.head()

### Preprocess Liquor DataFrame

#### Obtain stratified categories

In [6]:
def condense_category(category):
    category = str(category).lower()
    mapping = {
        "vodka" : ["vodka"],
        "gin" : ["gin"],
        "rum" : ["rum"],
        "whiskey" : ["whiskey", "whiskies", "whisky", "bourbon", "scotch", "liqueur", "rock & rye"],
        "tequila" : ["tequila"],
        "brandy" : ["brandy", "brandies"],
        "schnapps" : ["schnapps"],
        "triple sec" : ["triple sec"],
        "cocktail" : ["cocktail"],
        "amaretto" : ["amaretto"],
        "spirit" : ["spirit"],
        "mezcal" : ["mezcal"],
        "anisette" : ["anisette"],
        "creme" : ["creme"],
        "package" : ["package"]
    }
    
    for key in mapping:
        for subkey in mapping[key]:
            if re.search(r"(^|\W)" + subkey + r"($|\W|s|es)", category):
                return key
    return 'other'

In [7]:
def get_price(price_dollars):
    if price_dollars == 'nan':
        return np.nan
    else:
        return float(price_dollars[1:])

def preprocess_liquor_df(liquor_df):
    liquor_df = liquor_df.rename({'Date': 'date', 'County': 'county', 'Category Name': 'category', 
               'Vendor Name': 'vendor_name', 'Bottle Volume (ml)': 'bottle_volume_ml',
               'State Bottle Cost': 'state_bottle_cost', 'State Bottle Retail': 'state_bottle_retail', 
              'Volume Sold (Liters)': 'volume_sold_liters', 'Store Name': 'store_name'}, axis=1)
    liquor_df['category'] = liquor_df['category'].apply(condense_category)
    liquor_df['year'] = liquor_df['date'].dt.year
    liquor_df['quarter'] = liquor_df['date'].dt.quarter
    liquor_df['county'] = liquor_df['county'].str.upper()
    liquor_df['bottle_volume_liters'] = liquor_df['bottle_volume_ml']/1000
    liquor_df['state_bottle_retail'] = liquor_df['state_bottle_retail'].astype(str).apply(get_price)
    liquor_df['state_bottle_cost'] = liquor_df['state_bottle_cost'].astype(str).apply(get_price)
    liquor_df['percent_markup'] = (liquor_df['state_bottle_retail'] - liquor_df['state_bottle_cost']) / liquor_df['state_bottle_cost']
    return liquor_df

#### Load Population .csv

In [9]:
pop = pd.read_csv("../data/County_Population.csv")
pop.rename({'County': 'county', 'Year': 'year', 'Population': 'population'}, axis=1, inplace=True)

pop = pop.drop(["FIPS","Primary Point"], axis=1)
pop["county"] = pop["county"].str[:-7]
pop["year"] = pop["year"].str[-4:].astype(int)
pop = pop[pop['year'] > 2011]
pop = pop.sort_values(by = ["county", "year"])
pop['county'] = pop['county'].str.upper()
pop = pop.set_index("county")

#### Merge with population

In [34]:
def merge_liquor_county_fts(county_population, liquor):
    # Merge with liquor data frame.
    liquor_merged = pd.merge(county_population, liquor, on=['county', 'year'], how='right')
    return liquor_merged

In [35]:
def unique_set(item_lists):
    flat_list =  [el for item in item_lists for el in item]
    return set(flat_list)

In [36]:
def get_aggregates(liquor, groupby_vars):
    liquor = preprocess_liquor_df(liquor)    
    liquor_grouped = liquor.groupby(groupby_vars)
    liquor = pd.DataFrame()
    liquor['num_rows'] = liquor_grouped.size()
    liquor['sum_bottle_volume'] = liquor_grouped['bottle_volume_liters'].sum() 
    liquor['sum_percent_markup'] = liquor_grouped['percent_markup'].sum() 
    liquor['unique_stores'] = liquor_grouped['store_name'].agg(lambda x: list(set(x))) 
    liquor['unique_vendors'] = liquor_grouped['vendor_name'].agg(lambda x: list(set(x))) 
    liquor['sum_volume_sold'] = liquor_grouped['volume_sold_liters'].sum() 
    
    liquor.reset_index(inplace=True)
    return liquor

In [44]:
def post_aggregates(county_population, liquor_agg, groupby_vars):
    # TODO: Add population after merging. 
    # liquor_county_agg = merge_liquor_county_fts(county_population, liquor_agg)
    liquor_county_agg = liquor_agg.groupby(groupby_vars)
    post_liquor_county_agg = pd.DataFrame()
    post_liquor_county_agg['avg_bottle_volume'] = liquor_county_agg['sum_bottle_volume'].sum() / liquor_county_agg['num_rows'].sum()
    post_liquor_county_agg['avg_percent_markup'] = liquor_county_agg['sum_percent_markup'].sum() / liquor_county_agg['num_rows'].sum()
    post_liquor_county_agg['all_unique_stores'] = liquor_county_agg['unique_stores'].agg(lambda x: unique_set(x))
    post_liquor_county_agg['all_unique_vendors'] = liquor_county_agg['unique_vendors'].agg(lambda x: unique_set(x))
    post_liquor_county_agg['volume_sold'] = liquor_county_agg['volume_sold'].sum() / liquor_county_agg['num_rows'].sum()
    post_liquor_county_agg.reset_index(inplace=True)
    post_liquor_county_agg = merge_liquor_county_fts(county_population, post_liquor_county_agg)
    post_liquor_county_agg['num_unique_stores_per_capita'] = post_liquor_county_agg['all_unique_stores'].apply(lambda x: len(x)) / post_liquor_county_agg['population']
    post_liquor_county_agg['num_unique_vendors_per_capita'] = post_liquor_county_agg['all_unique_vendors'].apply(lambda x: len(x)) / post_liquor_county_agg['population']
    post_liquor_county_agg['volume_sold_per_capita'] = post_liquor_county_agg['volume_sold'] / post_liquor_county_agg['population']
    post_liquor_county_agg.drop(['all_unique_stores', 'all_unique_vendors', 'population', 'volume_sold'], axis=1, inplace=True)
    
    return post_liquor_county_agg

#### Obtain Final Aggregate DataFrame

In [45]:
def agg(chunk):
    groupby_vars = ['county', 'year', 'quarter', 'category']
    agg_df = get_aggregates(chunk, groupby_vars)
    return agg_df

In [46]:
liquor_p = preprocess_liquor_df(liquor_sample)
add_df = agg(liquor_p)
post_aggregates(pop, add_df, ['county', 'year', 'quarter', 'category'])

Unnamed: 0,county,year,quarter,category,avg_bottle_volume,avg_percent_markup,num_unique_stores_per_capita,num_unique_vendors_per_capita,avg_bottle_volume_sold_per_capita
0,ADAIR,2012,2,vodka,1.750000,1.586957,0.000134,0.000134,0.002812
1,ADAIR,2013,4,rum,0.500000,-0.500000,0.000135,0.000135,0.000068
2,ADAMS,2012,1,rum,0.750000,2.593750,0.000256,0.000256,0.000192
3,ADAMS,2013,2,rum,1.750000,1.362069,0.000257,0.000257,0.002698
4,ADAMS,2013,3,schnapps,1.000000,5.500000,0.000257,0.000257,0.000514
5,ADAMS,2013,4,whiskey,0.750000,-0.469880,0.000257,0.000257,0.002312
6,ALLAMAKEE,2012,2,rum,0.375000,0.480000,0.000071,0.000071,0.000079
7,ALLAMAKEE,2012,3,brandy,1.000000,0.520000,0.000071,0.000071,0.000848
8,ALLAMAKEE,2012,3,whiskey,0.937500,inf,0.000141,0.000283,0.000676
9,ALLAMAKEE,2013,1,brandy,1.750000,4.255639,0.000071,0.000071,0.000124


### Read .csv

In [15]:
def stream_groupby_csv(path, key, agg, chunk_size=1e6, pool=None, **kwargs):

    # Make sure path is a list
    if not isinstance(path, list):
        path = [path]

    # Chain the chunks
    kwargs['chunksize'] = chunk_size
    chunks = itertools.chain(*[
        pd.read_csv(p, **kwargs)
        for p in path
    ])

    results = []
    orphans = pd.DataFrame()

    for chunk in chunks:

        # Add the previous orphans to the chunk
        chunk = pd.concat((orphans, chunk))

        # Determine which rows are orphans
        last_val = chunk[key].iloc[-1]
        is_orphan = chunk[key] == last_val

        # Put the new orphans aside
        chunk, orphans = chunk[~is_orphan], chunk[is_orphan]

        # If a pool is provided then we use apply_async
        if pool:
            results.append(pool.apply_async(agg, args=(chunk,)))
        else:  
            results.append(agg(chunk))

    # If a pool is used then we have to wait for the results
    if pool:
        try:
            results = [r.get() for r in results]
        except Exception:
            traceback.print_exc()
    res = pd.concat(results)
    res.to_pickle('results.pkl')
    return res

In [48]:
read_params = {'parse_dates': ['Date'], 'usecols': ['Date', 'County', 'Category Name', 'Vendor Name', 'Bottle Volume (ml)',
            'State Bottle Cost', 'State Bottle Retail', 'Volume Sold (Liters)', 'Store Name', 'Invoice/Item Number'], 'dtype': {'County': 'str', 'Category Name': 'str', 'Vendor Name': 'str', 'Bottle Volume (ml)': 'float',
            'State Bottle Cost': 'str', 'State Bottle Retail': 'str', 'Volume Sold (Liters)': 'float', 'Store Name': 'str'}}

vodka_agg = stream_groupby_csv(
    path=
        '../data/Iowa_Liquor_Sales.csv',
    key='Invoice/Item Number',
    agg=agg,
    chunk_size=1e6,
    **read_params
)

vodka_df = post_aggregates(pop, vodka_agg, ['county', 'year', 'quarter', 'category'])

#Save dataframe. 
vodka_df.to_pickle('project_1/data/vodka_df.pkl')
vodka_df.to_csv('project_1/data/vodka_df.csv')

In [50]:
vodka_df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,avg_bottle_volume,avg_percent_markup,num_unique_stores_per_capita,num_unique_vendors_per_capita,avg_bottle_volume_sold_per_capita
county,year,quarter,category,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
ADAIR,2012,1,amaretto,0.916667,0.500742,0.000803,0.000536,0.00154
ADAIR,2012,1,brandy,0.660088,0.368355,0.003053,0.003053,0.004614
ADAIR,2012,1,cocktail,1.638889,0.512525,0.002066,0.001549,0.004969
ADAIR,2012,1,creme,0.75,0.5,0.000134,0.000134,0.000234
ADAIR,2012,1,gin,0.976562,0.499606,0.001224,0.000918,0.001329


In [51]:
vodka_df.reset_index(inplace=True)

In [59]:
vodka_df.to_csv('../data/vodka_df.csv', index=False)