<a href="https://colab.research.google.com/github/sankardevisharath/amex-default-prediction/blob/master/notebooks/split_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Split Dataset into Multiple Files



## Load Data From Google Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%mkdir data
%cd data
%mkdir raw
%cd raw

/content/data
/content/data/raw


In [3]:
!cp /content/drive/MyDrive/amex-default-prediction/data/amex-default-prediction.zip .

In [4]:
!unzip amex-default-prediction.zip train_data.csv

Archive:  amex-default-prediction.zip
  inflating: train_data.csv          


In [5]:
!unzip amex-default-prediction.zip train_labels.csv

Archive:  amex-default-prediction.zip
  inflating: train_labels.csv        


## Setup Environment

In [6]:
!pip install dask[dataframe]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting partd>=0.3.10
  Downloading partd-1.2.0-py3-none-any.whl (19 kB)
Collecting fsspec>=0.6.0
  Downloading fsspec-2022.5.0-py3-none-any.whl (140 kB)
[K     |████████████████████████████████| 140 kB 15.3 MB/s 
Collecting locket
  Downloading locket-1.0.0-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: locket, partd, fsspec
Successfully installed fsspec-2022.5.0 locket-1.0.0 partd-1.2.0


In [7]:
import gc

import pandas as pd
import numpy as np 
import seaborn as sns
import matplotlib.pyplot as plt

import dask
import dask.dataframe as dd

In [8]:
TRAIN_DATA_PATH = '/content/data/raw/train_data.csv'
TRAIN_LABELS_PATH = '/content/data/raw/train_labels.csv'

In [9]:
def read_cols(cols):
  df = pd.read_csv(TRAIN_DATA_PATH, usecols=cols)
  df = append_label(df)
  return df

def append_label(source_df):
  return pd.merge(left=source_df, right=train_labels, how='inner')

## Load Train Label

In [10]:
train_labels = pd.read_csv(TRAIN_LABELS_PATH)

In [11]:
train_labels.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 458913 entries, 0 to 458912
Data columns (total 2 columns):
 #   Column       Non-Null Count   Dtype 
---  ------       --------------   ----- 
 0   customer_ID  458913 non-null  object
 1   target       458913 non-null  int64 
dtypes: int64(1), object(1)
memory usage: 7.0+ MB


## Load Minimal Columns

In [12]:
cust_id_stmt_date_df = pd.read_csv(TRAIN_DATA_PATH, usecols=['customer_ID', 'S_2'])
print(f'Total number of rows in the dataset is {cust_id_stmt_date_df.shape[0]}')

Total number of rows in the dataset is 5531451


In [13]:
customers = cust_id_stmt_date_df.customer_ID.unique().tolist()
print(f'Total number of unique customers is {len(customers)}')

Total number of unique customers is 458913


In [14]:
cust_id_stmt_date_df['S_2'] = pd.to_datetime(cust_id_stmt_date_df["S_2"])

## Split Data Customer Wise

Read data customerwise and save the result in `parquet` format to google drive.

In [15]:
df = dd.read_csv(TRAIN_DATA_PATH)
print(f'Total number of partitions in the Dask dataframe is {df.npartitions}')

Total number of partitions in the Dask dataframe is 257


In [16]:
start = 0
step = 50000
params = []
for start in range(0, 458913, step):
    if (start + step) < 458913:
        end = start + step
    else:
        end = 458913
    params.append({'start': start, 'end': end, 'dataframe_obj': pd.DataFrame()})
params

[{'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 50000, 'start': 0}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 100000, 'start': 50000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 150000, 'start': 100000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 200000, 'start': 150000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 250000, 'start': 200000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 300000, 'start': 250000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 350000, 'start': 300000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 400000, 'start': 350000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 450000, 'start': 400000}, {'dataframe_obj': Empty DataFrame
  Columns: []
  Index: [], 'end': 458913, 'start': 450000}]

In [17]:
#print(f'### Start {start} end {end}')
for i in range(df.npartitions):
  ddf1 = df.partitions[i].compute()
  
  for param in params:
    rdf = param['dataframe_obj']
    start = param['start']
    end = param['end']
    rdf = rdf.append(ddf1[ddf1.customer_ID.isin(customers[start:end])], ignore_index=True)
    param['dataframe_obj'] = rdf
    print(f'{i}th Partition {start} - {end} shape : {rdf.shape}')
  if( i % 50 == 0):
    print( f'Processed {i} partitions' )
  
  



  

0th Partition 0 - 50000 shape : (21580, 190)
0th Partition 50000 - 100000 shape : (0, 190)
0th Partition 100000 - 150000 shape : (0, 190)
0th Partition 150000 - 200000 shape : (0, 190)
0th Partition 200000 - 250000 shape : (0, 190)
0th Partition 250000 - 300000 shape : (0, 190)
0th Partition 300000 - 350000 shape : (0, 190)
0th Partition 350000 - 400000 shape : (0, 190)
0th Partition 400000 - 450000 shape : (0, 190)
0th Partition 450000 - 458913 shape : (0, 190)
Processed 0 partitions
1th Partition 0 - 50000 shape : (43178, 190)
1th Partition 50000 - 100000 shape : (0, 190)
1th Partition 100000 - 150000 shape : (0, 190)
1th Partition 150000 - 200000 shape : (0, 190)
1th Partition 200000 - 250000 shape : (0, 190)
1th Partition 250000 - 300000 shape : (0, 190)
1th Partition 300000 - 350000 shape : (0, 190)
1th Partition 350000 - 400000 shape : (0, 190)
1th Partition 400000 - 450000 shape : (0, 190)
1th Partition 450000 - 458913 shape : (0, 190)
2th Partition 0 - 50000 shape : (64765, 190

In [18]:
for param in params:
  rdf = param['dataframe_obj']
  start = param['start']
  end = param['end']
  rdf.to_parquet(path='/content/drive/MyDrive/amex-default-prediction/data/customer/'  + str(start) + '_'+ str(end) + '.parquet')
  del rdf

## Generate Aggregate Data

In [25]:
rdf = pd.read_parquet(path='/content/drive/MyDrive/amex-default-prediction/data/customer/0_50000.parquet')

In [19]:
all_cols = [c for c in list(df.columns) if c not in ['customer_ID','S_2']]
cat_features = ["B_30","B_38","D_114","D_116","D_117","D_120","D_126","D_63","D_64","D_66","D_68"]
num_features = [col for col in all_cols if col not in cat_features]


In [29]:
test_num_agg = pd.DataFrame()

for param in params:
  start = param['start']
  end = param['end']
  rdf = pd.read_parquet(path='/content/drive/MyDrive/amex-default-prediction/data/customer/'  + str(start) + '_'+ str(end) + '.parquet')  
  test_num_agg = test_num_agg.append(rdf.groupby("customer_ID")[num_features].agg(['mean', 'std', 'min', 'max', 'last']))
  del rdf
  gc.collect()

test_num_agg.columns = ['_'.join(x) for x in test_num_agg.columns]

In [30]:
test_num_agg.info()

<class 'pandas.core.frame.DataFrame'>
Index: 458913 entries, 0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a to fffff1d38b785cef84adeace64f8f83db3a0c31e8d92eaba8b115f71cab04681
Columns: 885 entries, P_2_mean to D_145_last
dtypes: float64(882), int64(3)
memory usage: 3.0+ GB


In [31]:
test_cat_agg = pd.DataFrame()

for param in params:
  start = param['start']
  end = param['end']
  rdf = pd.read_parquet(path='/content/drive/MyDrive/amex-default-prediction/data/customer/'  + str(start) + '_'+ str(end) + '.parquet')  
  test_cat_agg = test_cat_agg.append(rdf.groupby("customer_ID")[cat_features].agg(['count', 'last', 'nunique']))
  del rdf
  gc.collect()

test_cat_agg.columns = ['_'.join(x) for x in test_cat_agg.columns]

In [32]:
test_cat_agg.info()

<class 'pandas.core.frame.DataFrame'>
Index: 458913 entries, 0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a to fffff1d38b785cef84adeace64f8f83db3a0c31e8d92eaba8b115f71cab04681
Data columns (total 33 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   B_30_count     458913 non-null  int64  
 1   B_30_last      458882 non-null  float64
 2   B_30_nunique   458913 non-null  int64  
 3   B_38_count     458913 non-null  int64  
 4   B_38_last      458882 non-null  float64
 5   B_38_nunique   458913 non-null  int64  
 6   D_114_count    458913 non-null  int64  
 7   D_114_last     454174 non-null  float64
 8   D_114_nunique  458913 non-null  int64  
 9   D_116_count    458913 non-null  int64  
 10  D_116_last     454174 non-null  float64
 11  D_116_nunique  458913 non-null  int64  
 12  D_117_count    458913 non-null  int64  
 13  D_117_last     454174 non-null  float64
 14  D_117_nunique  458913 non-null  int64  
 15  D_1

In [39]:
test_num_agg_1 = test_num_agg.reset_index()
test_cat_agg_1 = test_cat_agg.reset_index()

In [40]:
test_combiled_df = pd.merge(left = test_num_agg_1, right = test_cat_agg_1, on='customer_ID')

In [41]:
test_combiled_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 458913 entries, 0 to 458912
Columns: 919 entries, customer_ID to D_68_nunique
dtypes: float64(891), int64(25), object(3)
memory usage: 3.1+ GB


In [42]:
test_combined_df = append_label(test_combiled_df)

In [43]:
test_combined_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 458913 entries, 0 to 458912
Columns: 920 entries, customer_ID to target
dtypes: float64(891), int64(26), object(3)
memory usage: 3.1+ GB


In [45]:
test_combined_df.to_parquet(path='/content/drive/MyDrive/amex-default-prediction/data/aggr/customer-agg.parquet')

In [46]:
del test_num_agg_1, test_cat_agg_1, test_num_agg, test_cat_agg

## Split Data Month Wise

In [None]:
periods = list(cust_id_stmt_date_df.S_2.dt.to_period('M').unique().astype(str))

In [None]:
for period in periods:
  print(period)
  rdf = pd.DataFrame()
  for i in range(df.npartitions):
    ddf1 = df.partitions[i].compute()
    ddf1['S_2'] = pd.to_datetime(ddf1["S_2"])
    rdf = rdf.append(ddf1[ddf1.S_2.dt.to_period('M')== period], ignore_index=True)
    if(i%30 == 0):
      print(i)
      print(rdf.shape)
  rdf.to_parquet(path='data_' + period + '.parquet')
  del rdf


In [None]:
!cp /content/data/raw/data_*.parquet /content/drive/MyDrive/amex-default-prediction/data/raw/