<a href="https://colab.research.google.com/github/sheacon/repeat_customers/blob/main/12_join_full_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Join Data


In [1]:
# check system specs

gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print('Connected to a GPU')

from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9

if ram_gb < 20:
  print('Not using a high-RAM runtime: {:.1f} gigabytes of available RAM'.format(ram_gb))
else:
  print('Using a high-RAM runtime: {:.1f} gigabytes of available RAM'.format(ram_gb))

Not connected to a GPU
Using a high-RAM runtime: 54.8 gigabytes of available RAM


In [2]:
!pip install pyjanitor==0.23.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
# import packages
import pandas as pd
import numpy as np
import os
import janitor

## Load Data, Except Sales

In [4]:
# mount google drive
from google.colab import drive
drive.mount('/content/gdrive')
     
# navigate to directory
%cd /content/gdrive/MyDrive/Projects/repeat_customers/data

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
/content/gdrive/MyDrive/Projects/repeat_customers/data


In [5]:
if not os.path.exists('processed/'):
    os.mkdir('processed/')

In [6]:
# read individuals
# set int32 and category dtypes for memory efficiency
col_list = ['MZB_INDIV_ID','EMAIL_OPTIN_IND','AH1_RES_BUS_INDC','SUPP1_BUS_PANDER']
individuals = pd.read_csv('raw/individual.csv'
                          ,sep=','
                          ,usecols=col_list
                          ,dtype = {'MZB_INDIV_ID':np.int32
                                    ,'EMAIL_OPTIN_IND':'category'
                                    ,'AH1_RES_BUS_INDC':'category'
                                    ,'SUPP1_BUS_PANDER':'category'} 
                         ).clean_names()
individuals.rename(columns={'mzb_indiv_id':'indiv_id'}, inplace=True)

print(individuals.info())
print(individuals.nunique())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16834962 entries, 0 to 16834961
Data columns (total 4 columns):
 #   Column            Dtype   
---  ------            -----   
 0   indiv_id          int32   
 1   email_optin_ind   category
 2   ah1_res_bus_indc  category
 3   supp1_bus_pander  category
dtypes: category(3), int32(1)
memory usage: 112.4 MB
None
indiv_id            16834962
email_optin_ind            3
ah1_res_bus_indc           3
supp1_bus_pander           2
dtype: int64


In [7]:
# read products
col_list = ['ARTICLE_ID', 'PROD_GROUP_CODE', 'PROD_GROUP_DESC', 'CATEGORY_CODE',
            'CATEGORY_DESC', 'SEGMENT_CODE', 'SEGMENT_DESC', 'CLASS_CODE',
            'CLASS_DESC', 'DISCOUNT_FLAG', 'CROSS_SECTION', 'ASPECT_RATIO',
            'RIM_SIZE']
products = pd.read_csv('raw/product.csv'
                        ,sep='|'
                        ,usecols=col_list
                        ,dtype = {'ARTICLE_ID':np.int32, 'PROD_GROUP_CODE':'category'
                                , 'PROD_GROUP_DESC':'category', 'CATEGORY_CODE':'category'
                                ,'CATEGORY_DESC':'category', 'SEGMENT_CODE':'category'
                                , 'SEGMENT_DESC':'category', 'CLASS_CODE':'category'
                                , 'CLASS_DESC':'category', 'DISCOUNT_FLAG':'category'
                                , 'CROSS_SECTION':'category', 'ASPECT_RATIO':'category',
                                  'RIM_SIZE':'category'}
                        ).clean_names()
products.info()
print(products.nunique())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 56842 entries, 0 to 56841
Data columns (total 13 columns):
 #   Column           Non-Null Count  Dtype   
---  ------           --------------  -----   
 0   article_id       56842 non-null  int32   
 1   prod_group_code  56818 non-null  category
 2   prod_group_desc  56818 non-null  category
 3   category_code    56818 non-null  category
 4   category_desc    56818 non-null  category
 5   segment_code     56818 non-null  category
 6   segment_desc     56818 non-null  category
 7   class_code       56818 non-null  category
 8   class_desc       56818 non-null  category
 9   discount_flag    56818 non-null  category
 10  cross_section    35670 non-null  category
 11  aspect_ratio     35665 non-null  category
 12  rim_size         35504 non-null  category
dtypes: category(12), int32(1)
memory usage: 1.1 MB
article_id         56842
prod_group_code        3
prod_group_desc        3
category_code         18
category_desc         18
segment_c

In [8]:
# read stores
col_list = ['STORE_ID','STATE_CODE','ZIP_CODE','MSA']
stores = pd.read_csv('raw/store.csv'
                      ,sep='|'
                      ,usecols=col_list
                      ,dtype = {'STORE_ID':'category'
                                ,'STATE_CODE':'category'
                                ,'ZIP_CODE':'category'
                                ,'MSA':'category'}
                      ).clean_names()
stores.info()
print(stores.nunique())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2814 entries, 0 to 2813
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype   
---  ------      --------------  -----   
 0   store_id    2814 non-null   category
 1   state_code  2814 non-null   category
 2   zip_code    2814 non-null   category
 3   msa         2309 non-null   category
dtypes: category(4)
memory usage: 201.0 KB
store_id      2814
state_code      48
zip_code      2374
msa            321
dtype: int64


In [9]:
# read vehicles
col_list = ['VEHICLE_ID','MAKE','MODEL','SUB_MODEL','MODEL_YEAR']
vehicles = pd.read_csv('raw/vehicle.csv'
                        ,sep='|'
                        ,usecols=col_list
                        ,dtype = {'VEHICLE_ID':np.int32
                                  ,'MAKE':'category'
                                  ,'MODEL':'category'
                                  ,'SUB_MODEL':'category'
                                  ,'MODEL_YEAR':np.int16}
                        ).clean_names()
vehicles.info()
print(vehicles.nunique())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27854109 entries, 0 to 27854108
Data columns (total 5 columns):
 #   Column      Dtype   
---  ------      -----   
 0   vehicle_id  int32   
 1   make        category
 2   model       category
 3   sub_model   category
 4   model_year  int16   
dtypes: category(3), int16(1), int32(1)
memory usage: 319.9 MB
vehicle_id    27854109
make              5064
model            22573
sub_model         9206
model_year         196
dtype: int64


## Load Sales Data

In [10]:
# create list of sales files
sales_files = [i for i in os.listdir('raw/') if 'sales_' in i]

# column list
col_list = ['STORE_ID','TRAN_ID','DATE','ARTICLE_ID','INDIV_ID','VEHICLE_ID','UNITS','SALES']

In [11]:
# date_parser = lambda dates : pd.datetime(dates, '%Y-%m-%d')

In [15]:
# load initial sales file
df = pd.read_csv('raw/' + sales_files[0]
                  ,sep='|'
                  ,usecols=col_list
                  #,parse_dates=['DATE']
                  #,date_parser=date_parser
                  ,dtype = {'STORE_ID':'category'
                            ,'TRAN_ID':np.int32
                            ,'DATE':'category'
                            ,'ARTICLE_ID':np.int32
                            ,'VEHICLE_ID':np.int32
                            ,'UNITS':np.int8
                            ,'SALES':np.float16
                            ,'INDIV_ID':np.float16 # int32 throws error claiming float value
                            }
                 ).clean_names()
df.info()
print(df.nunique())
print('df memory: ',round(df.memory_usage(deep=True).sum()/1073741824,2))

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14804703 entries, 0 to 14804702
Data columns (total 8 columns):
 #   Column      Dtype   
---  ------      -----   
 0   store_id    category
 1   tran_id     int32   
 2   date        category
 3   article_id  int32   
 4   indiv_id    float16 
 5   vehicle_id  int32   
 6   units       int8    
 7   sales       float16 
dtypes: category(2), float16(2), int32(3), int8(1)
memory usage: 282.5 MB
store_id         2204
tran_id        309871
date               30
article_id       8533
indiv_id            1
vehicle_id    1321780
units             107
sales           15266
dtype: int64
df memory:  0.28


## Memory Solutions
- loop reads into separate df's, concat all at once: https://www.terality.com/post/pandas-concat-vs-append
- parquet by chunk: https://www.confessionsofadataguy.com/solving-the-memory-hungry-pandas-concat-problem/

In [None]:
# method 1: read all and then append all at once

sales_dfs = []
sales_dfs['0'] = df

for file in sales_files[1:]:

  sales_to_append = pd.read_csv('raw/' + file
                                ,sep='|'
                                ,usecols=col_list
                                #,parse_dates=['DATE']
                                #,date_parser=date_parser
                                ,dtype = {'STORE_ID':'category'
                                          ,'TRAN_ID':np.int32
                                          ,'DATE':'category'
                                          ,'ARTICLE_ID':np.int32
                                          ,'VEHICLE_ID':np.int32
                                          ,'UNITS':np.int8
                                          ,'SALES':np.float16
                                          ,'INDIV_ID':np.float16 # int32 throws error claiming float value
                                          }
                              ).clean_names()
  df = pd.concat([df,sales_to_append], axis = 0)

  print('df records: ',f"{df.shape[0]:,d}")
  print('df memory: ',round(df.memory_usage(deep=True).sum()/1073741824,2))

In [None]:
# method 2: read and append one at a time
for file in sales_files[1:]:

  sales_to_append = pd.read_csv('raw/' + file
                                ,sep='|'
                                ,usecols=col_list
                                #,parse_dates=['DATE']
                                #,date_parser=date_parser
                                ,dtype = {'STORE_ID':'category'
                                          ,'TRAN_ID':np.int32
                                          ,'DATE':'category'
                                          ,'ARTICLE_ID':np.int32
                                          ,'VEHICLE_ID':np.int32
                                          ,'UNITS':np.int8
                                          ,'SALES':np.float16
                                          ,'INDIV_ID':np.float16 # int32 throws error claiming float value
                                          }
                              ).clean_names()
  df = pd.concat([df,sales_to_append], axis = 0)

  print('df records: ',f"{df.shape[0]:,d}")
  print('df memory: ',round(df.memory_usage(deep=True).sum()/1073741824,2))

df records:  30,236,979
df memory:  4450
df records:  45,867,220
df memory:  6750
df records:  61,064,891
df memory:  8987
df records:  76,953,359
df memory:  11325
df records:  92,895,599
df memory:  13672
df records:  107,344,695
df memory:  15798
df records:  123,402,420
df memory:  18161
df records:  136,649,953
df memory:  20111
df records:  151,636,603
df memory:  22317
df records:  166,179,523
df memory:  24457
df records:  181,771,314
df memory:  26751
df records:  197,572,120
df memory:  29077
df records:  212,448,811
df memory:  31266
df records:  228,051,169
df memory:  33562
df records:  243,439,535
df memory:  35827
df records:  259,325,147
df memory:  38165
df records:  275,085,345
df memory:  40484
df records:  289,998,142
df memory:  42679
df records:  305,879,156
df memory:  45016
df records:  319,921,142
df memory:  47083
df records:  334,584,443
df memory:  49241
df records:  349,604,384
df memory:  51451
df records:  364,486,886
df memory:  53641
df records:  379,94

In [None]:
df.to_csv('processed/combined_sales.csv')

---
---
---

In [None]:
def join_data(sales_name_list):
    new_list = []
    for name in sales_name_list:
        # read data files and clean names
        sale = pd.read_csv('raw/' + name, sep='|', skiprows=[1]).clean_names()
        
        # convert store id to string
        sale['store_id'] = sale['store_id'].apply(str)
     
        # merging the data sets together
        mega_table = sale.merge(product, on = 'article_id', how = 'left').\
            merge(store, on = 'store_id', how = 'left').\
            merge(individual, on = 'indiv_id', how = 'left').\
            merge(vehicle, on = 'vehicle_id', how = 'left')
        
        # extracting name for storing data sets
        new_name = name[6:]
        new_list.append(new_name)
        mega_table["year"] = new_name[:4]
        mega_table['month'] = new_name[4:-4]
        mega_table = mega_table[(mega_table['ah1_res_bus_indc'] == 'R') & (mega_table['supp1_bus_pander'] == 'N') & (mega_table['email_optin_ind'] == 'Y')]
        mega_table = mega_table.drop(['ah1_res_bus_indc', 'supp1_bus_pander', 'email_optin_ind'], axis=1)
        col_list = list(mega_table.columns)
        mega_table.to_csv("/data/p_dsi/teams2022/team_1/new_data/" + new_name)
    return new_list, col_list

In [None]:
def combine_data(sales_list):
    data_list, col_list = join_data(sales_list)
    df = pd.DataFrame(columns = col_list)
    for data_name in data_list: 
        if os.path.isfile('processed/' + data_name + ".csv"):
            df1 = pd.read_csv('processed/' + data_name + ".csv")
            df = pd.concat([df1, df], axis = 0)
            df = df.reset_index(drop = True)
    return (df)

In [None]:
combine_df = combine_data(sales_name_list)

In [None]:
combine_df.to_csv("/data/p_dsi/teams2022/team_1/new_data/total_dataset.csv", index = False)

There is a high probability for ACCRE to break down during the final combination process. So when you run this notebook, it will be better to use a 4GPU (24 cores) server.