# Drop duplicates from sales data

An estimated 10% of the rows in the raw sales data are duplicates. This notebook removes them and writes new datasets for later use in EDA and modeling. Runtimes are recorded and printed to test and track efficiency.

The output of this notebook is 43 CSV files, stored in the team7/remove-duplicates directory.

## Import packages

In [83]:
import glob
import numpy as np
import os
import pandas as pd

## Load data

In [105]:
# column names only
header_only = pd.read_csv("sales_20180228.csv", sep="|", nrows=0)
cols = header_only.columns.to_list()
cols

['STORE_ID',
 'TRAN_ID',
 'DATE',
 'ARTICLE_ID',
 'INDIV_ID',
 'VEHICLE_ID',
 'UNITS',
 'SALES']

In [65]:
# nrows = 1M for testing
%cd /data/p_dsi/teams2023/bridgestone_data/data

dat_test_tiny = pd.read_csv("sales_20180228.csv", sep="|", nrows=1000000, parse_dates=["DATE"])
dat_test_tiny.shape

/gpfs52/data/p_dsi/teams2023/bridgestone_data/data


(1000000, 8)

In [78]:
%%time
%cd /data/p_dsi/teams2023/bridgestone_data/data

dat_test_full = pd.read_csv("sales_20180228.csv", sep="|", parse_dates=["DATE"])
dat_test_full.shape

/gpfs52/data/p_dsi/teams2023/bridgestone_data/data
CPU times: user 6.21 s, sys: 602 ms, total: 6.81 s
Wall time: 1min 34s


(13247533, 8)

In [79]:
%%time
dat_test_full = pd.read_csv("sales_20180228.csv", sep="|", parse_dates=["DATE"])

CPU times: user 6.22 s, sys: 605 ms, total: 6.82 s
Wall time: 1min 48s
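In the timings above, CPU time is a few seconds while wall time is over a minute, which suggests most of the time is spent waiting (possibly on file-system I/O) rather than computing. To record both numbers outside of `%%time`, a minimal sketch follows; `timed` is a hypothetical helper, and `time.sleep` only stands in for a slow read.

```python
import time


def timed(func, *args, **kwargs):
    """Run func and return (result, wall_seconds, cpu_seconds)."""
    wall_start = time.perf_counter()   # elapsed real time
    cpu_start = time.process_time()    # CPU time used by this process
    result = func(*args, **kwargs)
    wall = time.perf_counter() - wall_start
    cpu = time.process_time() - cpu_start
    return result, wall, cpu


# time.sleep waits without using CPU, similar to waiting on a slow disk
_, wall_s, cpu_s = timed(time.sleep, 0.2)
print(f"wall: {wall_s:.2f} s, cpu: {cpu_s:.2f} s")
```

A large gap between the two values points at waiting time, so a faster parser alone may not shorten the read much.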


## Custom Functions


In [28]:
def duplicates_test(data_raw, data_dropped):
    '''
    Check whether duplicates are properly removed using drop_duplicates out-of-the-box on the raw sales data.
    
    data_raw = original sales dataframe
    data_dropped = dataframe of month of sales data after dropping duplicates
    '''
    # group data and add count column
    cols = data_raw.columns.tolist()
    temp = data_raw.groupby(cols).size().reset_index(name="count")
    target_nrows = temp.shape[0]   # target = nrows after grouping
    
    # run test:
    print(". . . \n")
    print("Results from duplicates_test \n")
    if data_dropped.shape[0] == target_nrows: print("PASS")
    else: print("FAIL: nrows do not match")
    
    # sanity check - print descriptive info
    print("Original nrows: ", data_raw.shape[0])
    print("Target nrows: ", target_nrows) 
    print(". . . \n")
    print("Nrows after drop_duplicates: ", data_dropped.shape[0])
    
    return
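`duplicates_test` derives its target row count from the same group-and-count idea that `drop_duplicates_big_data` uses, so the two would agree even if the shared approach were off. A cross-check that does not go through `groupby` can help. The sketch below uses `DataFrame.duplicated`; the helper name `count_distinct_rows` and the toy data are made up for illustration.

```python
import pandas as pd


def count_distinct_rows(df):
    """Count distinct rows; each group of identical rows is counted once."""
    # duplicated() marks every repeat of an earlier row as True
    return int((~df.duplicated()).sum())


# toy data (invented values): rows 0 and 1 are identical
toy = pd.DataFrame({
    "STORE_ID": [27, 27, 27, 31],
    "TRAN_ID": [100, 100, 101, 100],
    "SALES": [20.0, 20.0, 17.99, 20.0],
})

n_distinct = count_distinct_rows(toy)
n_after_drop = len(toy.drop_duplicates())
print(n_distinct, n_after_drop)
```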

In [111]:
def drop_duplicates_big_data(bigdata, cols=None, filename=None): 
    '''
    Given a big data dataframe, removes duplicate rows. Returns 'new_df'.
    
    bigdata = pandas dataframe of the big dataset
    cols = list of sales columns (defaults to all columns of bigdata)
    filename = passed in from looping over csv_files (not used inside the function)
    '''
    if cols is None:
        cols = bigdata.columns.to_list()
    
    # one row per distinct combination of cols, with its number of occurrences
    temp = bigdata.groupby(cols).size().reset_index(name="count")
    temp_nonduplicate_obs = temp.query("count == 1").drop(["count"], axis=1)
    temp_duplicate_obs = temp.query("count > 1").drop(["count"], axis=1)
    
    # df with duplicates removed
    temp_drop_duplicates = temp_duplicate_obs.drop_duplicates()
    # concat dfs with distinct obs
    new_df = pd.concat([temp_nonduplicate_obs, temp_drop_duplicates], axis=0)

    # print message for testing only:    
    #     print(". . . \n")
    #     print("Results from drop_duplicates_big_data \n")
    #     print("Shape of new df (after dropping duplicates): ", new_df.shape)
    #     print(". . . \n")
    
    return new_df 
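One caveat with the groupby approach: by default (`dropna=True`), `groupby` drops rows that have NaN in any grouping column, so such rows would be missing from `new_df` entirely instead of being deduplicated. Whether the sales files contain missing values is not shown here, so treat this as something to check. A minimal sketch on invented toy data, comparing the default groupby with `dropna=False` (available from pandas 1.1) and plain `drop_duplicates`:

```python
import numpy as np
import pandas as pd

# toy data (invented): rows 0 and 1 are duplicates, row 2 has a missing INDIV_ID
toy = pd.DataFrame({
    "TRAN_ID": [1, 1, 2, 3],
    "INDIV_ID": [10.0, 10.0, np.nan, 30.0],
    "SALES": [5.0, 5.0, 7.5, 9.0],
})
cols = toy.columns.to_list()

# default groupby: the row with NaN in a key column is silently dropped
default_groupby = toy.groupby(cols).size().reset_index(name="count")

# dropna=False keeps NaN keys as their own group
keep_nan_groupby = toy.groupby(cols, dropna=False).size().reset_index(name="count")

# drop_duplicates keeps the NaN row as well
plain_drop = toy.drop_duplicates()

print(len(default_groupby), len(keep_nan_groupby), len(plain_drop))
```

If the counts differ on real data, passing `dropna=False` or using `drop_duplicates` directly would be two ways to keep the rows with missing values.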

## Test drop_duplicates_big_data on 1M rows of data

In [30]:
%%time
drop_test = drop_duplicates_big_data(dat_test_tiny, cols=cols, filename="sales_20180228.csv")

duplicates_test(data_raw=dat_test_tiny, data_dropped=drop_test)

. . . 

Results from drop_duplicates_big_data: 

Shape of new df (after dropping duplicates):  (927393, 8)
. . . 

. . . 

Results from duplicates_test: 

PASS
Original nrows:  1000000
Target nrows:  927393
. . . 

Nrows after drop_duplicates:  927393
CPU times: user 1.14 s, sys: 0 ns, total: 1.14 s
Wall time: 1.14 s


## Test drop_duplicates_big_data on one sales file

Test on pre-loaded dataframe

In [39]:
%%time
drop_test = drop_duplicates_big_data(dat_test_full, cols=cols, filename="sales_20180228.csv")

duplicates_test(data_raw=dat_test_full, data_dropped=drop_test)

. . . 

Results from drop_duplicates_big_data: 

Shape of new df (after dropping duplicates):  (11897920, 8)
. . . 

. . . 

Results from duplicates_test: 

PASS
Original nrows:  13247533
Target nrows:  11897920
. . . 

Nrows after drop_duplicates:  11897920
CPU times: user 38.6 s, sys: 2.9 s, total: 41.5 s
Wall time: 41.4 s


Now write a for loop to process all 43 sales CSV files. First, confirm that every row in drop_test is unique.

In [44]:
%%time
cols = drop_test.columns.to_list()
drop_test.sort_values(cols).groupby(cols).size().reset_index(name="count").query("count == 1").shape[0]

CPU times: user 22.5 s, sys: 2.28 s, total: 24.8 s
Wall time: 24.8 s


11897920

In [46]:
%cd /data/p_dsi/teams2023/team7

/gpfs52/data/p_dsi/teams2023/team7


In [59]:
csv_files = []
for file in os.listdir("/data/p_dsi/teams2023/bridgestone_data/data"):
    if file.startswith('sales_2'):
        csv_files.append(file)

if len(csv_files) != 43: print("ERROR - csv_files length = ", len(csv_files))
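`os.listdir` returns names in arbitrary order, which is why the files are processed out of date order further down. As an alternative, a small sketch using `glob` (already imported) with sorting; `list_sales_files` is a hypothetical helper, and the temporary directory only stands in for the real data directory.

```python
import glob
import os
import tempfile


def list_sales_files(data_dir, pattern="sales_2*.csv"):
    """Return matching file names in sorted (here: chronological) order."""
    paths = glob.glob(os.path.join(data_dir, pattern))
    return sorted(os.path.basename(p) for p in paths)


# demo on a throwaway directory with two sales files and one unrelated file
with tempfile.TemporaryDirectory() as tmp:
    for name in ["sales_20180731.csv", "sales_20150731.csv", "notes.txt"]:
        open(os.path.join(tmp, name), "w").close()
    found = list_sales_files(tmp)

print(found)
```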

In [68]:
%%time
# store col names to be accessed in the loop
cols = dat_test_tiny.columns.to_list()
# loop counter variable
count = 1

for filename in csv_files:
    # store in and out file paths as variables
    path_in = "/data/p_dsi/teams2023/bridgestone_data/data/" + filename
    path_out = "/data/p_dsi/teams2023/team7/remove-duplicates/" + filename
    
    # read in one sales csv as panda dataframe
    df_read = pd.read_csv(path_in, sep = "|", parse_dates=["DATE"])
    
    # drop duplicates and store in new df
    temp = drop_duplicates_big_data(bigdata=df_read, cols=cols, filename=filename)
    
    # write new df to csv
    temp.to_csv(path_out, index=False)
    
    # status message with loop count
    print(f"Parsing complete: #{count}, {filename}")
    count += 1
    
    break

. . . 

Results from drop_duplicates_big_data 

Shape of new df (after dropping duplicates):  (13640515, 8)
. . . 

Parsing complete: #1, sales_20180731.csv
CPU times: user 2min 17s, sys: 3.45 s, total: 2min 20s
Wall time: 6min


In [92]:
from pyarrow import csv

In [94]:
parse_opts = csv.ParseOptions(delimiter="|")

In [100]:
%%time
test_pa = csv.read_csv("sales_20180228.csv", parse_options=parse_opts)

CPU times: user 5.62 s, sys: 1.05 s, total: 6.67 s
Wall time: 1min 22s


In [102]:
%%time
test_convert_pd = test_pa.to_pandas()

CPU times: user 540 ms, sys: 333 ms, total: 873 ms
Wall time: 134 ms


Test run the for loop...

In [112]:
%%time
# set options to read csv w/pyarrow
parse_opts = csv.ParseOptions(delimiter="|")
read_opts = csv.ReadOptions(use_threads=True)  # use_threads is a boolean flag (True is the default)
# loop counter variable
count = 1
# dict to store nrows of each iteration
results_nrows = {}

for filename in csv_files:
    # store in and out file paths as variables
    path_in = "/data/p_dsi/teams2023/bridgestone_data/data/" + filename
    path_out = "/data/p_dsi/teams2023/team7/remove-duplicates/" + filename
    
    # read in one sales csv as Arrow table
    arrow_table = csv.read_csv(path_in, read_options=read_opts, parse_options=parse_opts)
    # convert Arrow table to pandas df
    df = arrow_table.to_pandas()
    
    # drop duplicates and store in new df
    temp = drop_duplicates_big_data(bigdata=df, cols=cols, filename=filename)
    
    # store the nrows from each df in a dict
    results_nrows[filename] = temp.shape[0]
    
    # write new df to csv
    temp.to_csv(path_out, index=False)
    
    # status message with loop count
    print(f"Parsing complete: #{count}, {filename}")
    count += 1
    print(results_nrows)
    break

Parsing complete: #1, sales_20180731.csv
{'sales_20180731.csv': 13640515}
CPU times: user 1min 27s, sys: 4.78 s, total: 1min 32s
Wall time: 4min 27s


In [113]:
%%time
# sanity check on above loop test
duplicates_test(data_raw=df, data_dropped=temp)

. . . 

Results from duplicates_test: 

PASS
Original nrows:  15197671
Target nrows:  13640515
. . . 

Nrows after drop_duplicates:  13640515
CPU times: user 20.7 s, sys: 1.32 s, total: 22.1 s
Wall time: 22.1 s


Run full loop

In [114]:
%%time
# set options to read csv w/pyarrow
parse_opts = csv.ParseOptions(delimiter="|")
read_opts = csv.ReadOptions(use_threads=True)  # use_threads is a boolean flag (True is the default)
# loop counter variable
count = 1
# dict to store nrows of each iteration
results_nrows = {}

for filename in csv_files:
    # store in and out file paths as variables
    path_in = "/data/p_dsi/teams2023/bridgestone_data/data/" + filename
    path_out = "/data/p_dsi/teams2023/team7/remove-duplicates/" + filename
    
    # read in one sales csv as Arrow table
    arrow_table = csv.read_csv(path_in, read_options=read_opts, parse_options=parse_opts)
    # convert Arrow table to pandas df
    df = arrow_table.to_pandas()
    
    # drop duplicates and store in new df
    temp = drop_duplicates_big_data(bigdata=df, cols=cols, filename=filename)
    
    # write new df to csv
    temp.to_csv(path_out, index=False)
    
    # status message with loop count
    print(f"Processed: File no. {count}, File name: {filename}")
    count += 1
print("Parsing complete, check directory for new csvs.")

Processed: File no. 1, File name: sales_20180731.csv
Processed: File no. 2, File name: sales_20150731.csv
Processed: File no. 3, File name: sales_20170831.csv
Processed: File no. 4, File name: sales_20170731.csv
Processed: File no. 5, File name: sales_20150531.csv
Processed: File no. 6, File name: sales_20160131.csv
Processed: File no. 7, File name: sales_20151130.csv
Processed: File no. 8, File name: sales_20170531.csv
Processed: File no. 9, File name: sales_20160331.csv
Processed: File no. 10, File name: sales_20170228.csv
Processed: File no. 11, File name: sales_20160930.csv
Processed: File no. 12, File name: sales_20170131.csv
Processed: File no. 13, File name: sales_20180831.csv
Processed: File no. 14, File name: sales_20160430.csv
Processed: File no. 15, File name: sales_20170331.csv
Processed: File no. 16, File name: sales_20171031.csv
Processed: File no. 17, File name: sales_20180531.csv
Processed: File no. 18, File name: sales_20160831.csv
Processed: File no. 19, File name: sa
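The loop above could also be wrapped in a function so it can be tried on a small directory first. This is only a sketch under assumptions: `dedupe_directory` is a hypothetical name, it swaps in plain `drop_duplicates` for `drop_duplicates_big_data`, reads with pandas instead of pyarrow, and runs on an invented sample file.

```python
import os
import tempfile

import pandas as pd


def dedupe_directory(dir_in, dir_out, filenames, sep="|"):
    """Read each file, drop exact duplicate rows, write it out, return row counts."""
    os.makedirs(dir_out, exist_ok=True)
    nrows = {}
    for count, filename in enumerate(filenames, start=1):
        df = pd.read_csv(os.path.join(dir_in, filename), sep=sep)
        deduped = df.drop_duplicates()
        # as in the loop above, the output is written comma-delimited
        deduped.to_csv(os.path.join(dir_out, filename), index=False)
        nrows[filename] = len(deduped)
        print(f"Processed: File no. {count}, File name: {filename}")
    return nrows


with tempfile.TemporaryDirectory() as tmp:
    dir_in = os.path.join(tmp, "raw")
    dir_out = os.path.join(tmp, "remove-duplicates")
    os.makedirs(dir_in)
    # invented sample: the first two data rows are identical
    with open(os.path.join(dir_in, "sales_sample.csv"), "w") as fh:
        fh.write("STORE_ID|TRAN_ID|SALES\n27|100|20.0\n27|100|20.0\n27|101|17.99\n")

    row_counts = dedupe_directory(dir_in, dir_out, ["sales_sample.csv"])
    written = pd.read_csv(os.path.join(dir_out, "sales_sample.csv"))

print(row_counts, written.shape)
```

Returning the row counts makes it easy to compare each file against `duplicates_test` afterwards.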

## Check one of the new files

In [116]:
df = pd.read_csv("/data/p_dsi/teams2023/team7/remove-duplicates/sales_20181031.csv", nrows=10)
df

Unnamed: 0,STORE_ID,TRAN_ID,DATE,ARTICLE_ID,INDIV_ID,VEHICLE_ID,UNITS,SALES
0,27,991724790,2018-10-30,7003186,266764026.0,933927006,0.0,0.0
1,27,991724790,2018-10-30,7003189,266764026.0,933927006,1.0,0.0
2,27,991724790,2018-10-30,7003348,266764026.0,933927006,0.0,0.0
3,27,991724790,2018-10-30,7046930,266764026.0,933927006,0.0,0.0
4,27,991726170,2018-10-01,7004228,277240279.0,965350926,0.0,20.0
5,27,991726170,2018-10-01,7005229,277240279.0,965350926,0.0,199.99
6,27,991726170,2018-10-01,7005537,277240279.0,965350926,1.0,29.99
7,27,991726170,2018-10-01,7008209,277240279.0,965350926,0.0,20.0
8,27,991726170,2018-10-01,7008406,277240279.0,965350926,0.0,17.99
9,27,991726170,2018-10-01,7008409,277240279.0,965350926,0.0,17.99
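The `Unnamed: 0` label in the preview may simply be the dataframe's display index as rendered by the notebook export. To check whether a stray index column was actually written to a file, it can help to look at the column names after reading it back. A minimal sketch with invented toy data and in-memory buffers instead of real files:

```python
import io

import pandas as pd

toy = pd.DataFrame({"STORE_ID": [27, 27], "SALES": [0.0, 20.0]})

# written WITH the index (the to_csv default): an extra unnamed column appears
with_index = io.StringIO()
toy.to_csv(with_index)
with_index.seek(0)
cols_with_index = pd.read_csv(with_index).columns.to_list()

# written with index=False, as in the loops above: only the data columns
without_index = io.StringIO()
toy.to_csv(without_index, index=False)
without_index.seek(0)
cols_without_index = pd.read_csv(without_index).columns.to_list()

print(cols_with_index)
print(cols_without_index)
```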
