In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

In [98]:
def data_prep(columns, filepath="./data/parquet/data-*.parquet"):
    ddf = dd.read_parquet("./data/parquet/data-*.parquet")
    
    return ddf.loc[:, columns]

In [99]:
ddf = dd.read_parquet("./data/parquet/data-*.parquet")

In [3]:
ddf.columns

Index(['index', 'Invoice/Item Number', 'Date', 'Store Number', 'Store Name',
       'Address', 'City', 'Zip Code', 'Store Location', 'County Number',
       'County', 'Category', 'Category Name', 'Vendor Number', 'Vendor Name',
       'Item Number', 'Item Description', 'Pack', 'Bottle Volume (ml)',
       'State Bottle Cost', 'State Bottle Retail', 'Bottles Sold',
       'Sale (Dollars)', 'Volume Sold (Liters)', 'Volume Sold (Gallons)'],
      dtype='object')

In [3]:
ddf.shape[0].compute()

23346088

In [4]:
county_info = ['County Number', 'County']
vendor_info = ['Vendor Name', 'Vendor Number']
product_info = ['Item Number', 'Item Description', 'Pack',
                'Bottle Volume (ml)']
price_info = ['Item Number', 'State Bottle Cost', 'State Bottle Retail', "Date"]
store_info = ['Store Number', 'Store Name', 'Address', 'City', 'Zip Code', 
              'Store Location', 'County Number', "Date"]


In [5]:
ddf = ddf[county_info]

In [6]:
ddf = ddf.drop_duplicates().compute()

In [21]:
# changes to pd dataframe when size is small enough
# ddf[ddf['County'].isna()] = pd.Series(['None', 'None'])

In [15]:
# strange to see EL PASO? not a county in Iowa.
ddf[ddf['County Number'].isna()]

Unnamed: 0,County Number,County
2405,,
15387,,EL PASO


In [8]:
ddf = ddf[~ddf['County Number'].isna()]
ddf = ddf.append(pd.Series({'County Number': 999, 'County': 'Unknown'}), ignore_index=True)
ddf

Unnamed: 0,County Number,County
0,92.0,Washington
1,63.0,Marion
2,17.0,Cerro Gordo
3,82.0,Scott
4,57.0,Linn
...,...,...
196,26.0,DAVIS
197,13.0,CALHOUN
198,93.0,WAYNE
199,2.0,ADAMS


In [12]:
ddf[ddf['County'].str.contains('Cerro')]

Unnamed: 0,County Number,County
2,17.0,Cerro Gordo
158,17.0,Cerro Gord


In [13]:
ddf[ddf['County'].str.contains(' ')]

Unnamed: 0,County Number,County
2,17.0,Cerro Gordo
5,29.0,Des Moines
8,11.0,Buena Vista
15,74.0,Palo Alto
19,7.0,Black Hawk
88,89.0,Van Buren
118,7.0,BLACK HAWK
130,17.0,CERRO GORD
131,74.0,PALO ALTO
139,29.0,DES MOINES


In [11]:
ddf.loc[ddf['County Number'].drop_duplicates().index]

Unnamed: 0,County Number,County
0,92.0,Washington
1,63.0,Marion
2,17.0,Cerro Gordo
3,82.0,Scott
4,57.0,Linn
...,...,...
95,41.0,Hancock
96,61.0,Madison
97,87.0,Taylor
98,36.0,Fremont


In [14]:
ddf = ddf.loc[ddf['County Number'].drop_duplicates().index]

In [16]:
ddf.loc[:, 'County Number'] = ddf['County Number'].astype(int)

In [18]:
ddf.columns = ['CountyNumber', 'County']

In [20]:
ddf.to_csv('./data/county.csv')

Now onto the next sub-dataframe, we repeat a lot of steps but with every one we have to at least take the time to make sure we're not missing anything as we chop the data up.

In [21]:
ddf = dd.read_parquet("./data/parquet/data-*.parquet")

In [22]:
ddf.columns

Index(['index', 'Invoice/Item Number', 'Date', 'Store Number', 'Store Name',
       'Address', 'City', 'Zip Code', 'Store Location', 'County Number',
       'County', 'Category', 'Category Name', 'Vendor Number', 'Vendor Name',
       'Item Number', 'Item Description', 'Pack', 'Bottle Volume (ml)',
       'State Bottle Cost', 'State Bottle Retail', 'Bottles Sold',
       'Sale (Dollars)', 'Volume Sold (Liters)', 'Volume Sold (Gallons)'],
      dtype='object')

In [23]:
vendor_info = ['Vendor Name', 'Vendor Number']
ddf = ddf[vendor_info]

In [24]:
ddf = ddf.drop_duplicates().compute()

In [26]:
ddf.columns = ['VendorName', 'VendorNumber']

In [60]:
idx_na = ddf['VendorNumber'].isna()
name_na = ddf['VendorName'].isna()

In [61]:
ddf[idx_na]

Unnamed: 0,VendorName,VendorNumber
10224,Reservoir Distillery,


In [62]:
orderly_idx = ddf[~idx_na]['VendorNumber'].sort_values()

orderly_idx

19765     10.0
26872     10.0
22376     14.0
37747     27.0
36267     33.0
         ...  
4760     977.0
8        978.0
29948    978.0
38618    987.0
6001     999.0
Name: VendorNumber, Length: 555, dtype: float64

In [65]:
ddf[idx_na] = pd.Series(['Reservoir Distillery', 20])
ddf[idx_na]

Unnamed: 0,VendorName,VendorNumber
10224,Reservoir Distillery,20.0


In [66]:
ddf['VendorNumber'] = ddf['VendorNumber'].astype(int)

In [69]:
len(ddf['VendorName'].unique())

554

In [70]:
len(ddf['VendorNumber'].unique())

402

In [None]:
ddf['VendorNumber']

In [83]:
np.where(ddf.VendorNumber.value_counts() > 1)
ddf.VendorNumber.value_counts().index[np.where(ddf.VendorNumber.value_counts() > 1)]

Int64Index([803, 214, 391, 192,  79, 114, 478, 389, 255, 154,
            ...
            208, 977, 198, 459, 107, 226, 381, 978, 495, 346],
           dtype='int64', length=126)

In [84]:
dup_idx = ddf.VendorNumber.value_counts().index[np.where(ddf.VendorNumber.value_counts() > 1)]

In [88]:
sorted_ddf = ddf[ddf['VendorNumber'].isin(dup_idx)].sort_values('VendorNumber')

In [94]:
sorted_ddf.iloc[100:120,:]

Unnamed: 0,VendorName,VendorNumber
27644,RUSSIAN STANDARD VODKA,239
23,"WILLIAM GRANT AND SONS, INC.",240
10594,"William Grant and Sons, Inc.",240
27053,William Grant & Sons Inc,240
24858,Filibuster Barrels LLC,244
41545,Dilawri Barrels LLC,244
207,Wilson Daniels Ltd.,255
22023,WILSON DANIELS LTD,255
28722,Infinium Spirits,255
14985,HAAS BROTHERS,256


In [95]:
ddf = ddf.drop_duplicates('VendorNumber')

In [35]:
ddf[name_na] = pd.Series(['Unknown', 999])

In [36]:
ddf[name_na]

Unnamed: 0,VendorName,VendorNumber


In [49]:
ddf[ddf['VendorName'].str.contains('Llc')]

Unnamed: 0,VendorName,VendorNumber


In [48]:
ddf[ddf['VendorName'].str.contains('Llc')] = pd.Series(['Fire Tail Brands, LLC', 194])

In [97]:
ddf.to_csv('./data/vendor.csv')

And onto the next one!

In [100]:
ddf = data_prep(price_info)

In [101]:
ddf.columns

Index(['Item Number', 'State Bottle Cost', 'State Bottle Retail', 'Date'], dtype='object')

In [111]:
ddf.columns = ['ItemNumber', 'StateBottleCost', 'StateBottleRetail', 'Date']

#ddf.drop_duplicates()

want unique prices for each item
then for each unique row look back and get a subset of the data that has that price
look for earliest and latest date to give a range / idea of what the price was
update again with range over the value in date

In [107]:
sub_ddf = ddf[ddf.columns[:3]].drop_duplicates().compute()

In [112]:
sub_ddf.columns = ['ItemNumber', 'StateBottleCost', 'StateBottleRetail']

In [113]:
sub_ddf.sort_values('ItemNumber')

Unnamed: 0,ItemNumber,StateBottleCost,StateBottleRetail
43993,100001,8.00,12.00
9973,100005,21.98,32.97
38559,100006,23.32,34.98
28373,100015,30.00,45.00
14729,100017,23.62,35.43
...,...,...,...
2798,999992,45.00,67.50
12284,999993,21.25,31.88
23430,999994,521.66,782.49
34718,999995,17.00,25.50


In [114]:
sub_ddf.drop_duplicates('ItemNumber')

Unnamed: 0,ItemNumber,StateBottleCost,StateBottleRetail
0,35926,3.37,5.06
1,23824,2.00,2.99
2,12888,8.98,13.47
3,48106,18.99,28.49
4,53216,6.29,9.44
...,...,...,...
34894,933291,5.99,8.99
35715,903780,3.85,5.78
9912,903245,3.38,5.07
18427,904664,59.00,88.50


In [115]:
dup_idx = sub_ddf.ItemNumber.value_counts().index[np.where(sub_ddf.ItemNumber.value_counts() > 1)]

In [117]:
sorted_ddf = sub_ddf[sub_ddf['ItemNumber'].isin(dup_idx)].sort_values('ItemNumber')

In [119]:
sub_ddf.ItemNumber.value_counts()

647       78
100762    38
65144     36
42666     17
21598     15
          ..
4750       1
967388     1
916839     1
903131     1
903367     1
Name: ItemNumber, Length: 11013, dtype: int64

In [118]:
sorted_ddf

Unnamed: 0,ItemNumber,StateBottleCost,StateBottleRetail
14729,100017,23.62,35.43
12019,100017,22.04,33.06
39977,100040,23.31,34.97
13497,100040,14.50,34.97
13401,100040,15.46,34.97
...,...,...,...
7014,998071,23.32,34.98
3143,998074,23.32,34.98
17179,998074,24.00,36.00
23244,998546,22.50,33.75


In [120]:
unique_itemId = sorted_ddf.ItemNumber.unique()

In [156]:
len(unique_itemId)

3931

In [125]:
replace = ddf[ddf.ItemNumber == '100017'].compute()

In [130]:
# once we have it in datetime we can use the min and max functions to aggregate.
replace.Date = pd.to_datetime(replace['Date'])

In [137]:
reduce = replace.groupby(["ItemNumber", "StateBottleCost", "StateBottleRetail"]).agg(['min', 'max']).reset_index()

In [138]:
empty = pd.DataFrame(columns = reduce.columns)

In [146]:
empty = empty.append(reduce)

In [147]:
empty

Unnamed: 0_level_0,ItemNumber,StateBottleCost,StateBottleRetail,Date,Date
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,min,max
0,100017,22.04,33.06,2016-06-01,2016-08-23
1,100017,23.62,35.43,2016-09-08,2018-04-17


Alright, we're done figuring it out for a single case, so lets build it out into a for loop in order to process the rest of the data quickly.

In [149]:
# making sure it runs without any initialization besides the idx list
pd.DataFrame().append(reduce)

Unnamed: 0_level_0,ItemNumber,StateBottleCost,StateBottleRetail,Date,Date
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,min,max
0,100017,22.04,33.06,2016-06-01,2016-08-23
1,100017,23.62,35.43,2016-09-08,2018-04-17


In [None]:
# we want to make sure that it runs for a small section before unleashing it on the larger db.
# with so much data this cell will run for a while, but for the extra date information we can
# wait for the code to resolve.
%time
empty = pd.DataFrame()

for idx in unique_itemId:
    replace = ddf[ddf.ItemNumber == idx].compute()
    replace.Date = pd.to_datetime(replace['Date'])
    reduce = replace.groupby(["ItemNumber", "StateBottleCost", "StateBottleRetail"]).agg(['min', 'max']).reset_index()
    
    empty = empty.append(reduce)

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 5.96 µs


In [155]:
len(empty.ItemNumber.unique())

54

In [157]:
trimmed = [x for x in unique_itemId if x not in empty.ItemNumber.unique()]

In [159]:
len(trimmed) + len(empty.ItemNumber.unique()) == len(unique_itemId)

True

In [163]:
for idx in trimmed[20:]:
    replace = ddf[ddf.ItemNumber == idx].compute()
    replace.Date = pd.to_datetime(replace['Date'])
    reduce = replace.groupby(["ItemNumber", "StateBottleCost", "StateBottleRetail"]).agg(['min', 'max']).reset_index()
    
    empty = empty.append(reduce)

KeyboardInterrupt: 

In [164]:
empty

Unnamed: 0_level_0,ItemNumber,StateBottleCost,StateBottleRetail,Date,Date
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,min,max
0,100017,22.04,33.06,2016-06-01,2016-08-23
1,100017,23.62,35.43,2016-09-08,2018-04-17
0,100040,5.23,34.97,2019-09-24,2019-09-24
1,100040,14.50,34.97,2019-09-25,2019-09-25
2,100040,15.46,34.97,2019-09-26,2019-09-26
...,...,...,...,...,...
1,433,9.97,14.96,2012-11-05,2012-12-31
0,43302,10.00,15.00,2016-08-01,2022-02-28
1,43302,11.00,16.50,2012-03-12,2016-07-29
0,43308,4.80,7.20,2015-11-03,2016-03-30


In [168]:
empty.to_csv('./data/price_part_1.csv')

In [165]:
trimmed = [x for x in unique_itemId if x not in empty.ItemNumber.unique()]

In [169]:
len(empty.ItemNumber.unique())

1368

In [167]:
len(trimmed)

2563