# Data Cleaning and Preprocessing
### Team : Data Crew

This notebook cleans and preprocesses the data so that it is ready for model development. The data used here has the original structure provided by Fingerhut. We are also creating a separate **.py** file that stores all the functions used here, which keeps the code readable and collects the cleaning and preprocessing steps in a single local package.

Once the cleaning and preprocessing steps in this notebook have been tested, we add them to **utils.py**. This file can be imported into the current session from other notebooks with the following command:

```python
from utils import *
```

Update: We also perform some feature engineering here, which is tested and then added to the **utils.py** file.

In [None]:
# Basic Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Own Libraries
from utils import *

# Dask (parallel dataframes)
import dask.dataframe as dd

sns.set_style('whitegrid')

## Data

This section covers everything related to retrieving and reading the data. Initially, the algorithms, cleaning and feature engineering sections were taking a long time to run, so we tried a different approach by parallelizing some tasks. That attempt did not succeed, and we ended up doing it the traditional way.

Results: the functions below returned the final expected dataset in about 2 hours 30 minutes.

### Reading data

We can work with either the smaller sample or the original dataset. Here we use the smaller sample so that tests run faster and debugging is easier.

In [None]:
# DASK APPROACH (PARALLEL)

# #data = dd.read_csv('../../1. Data/smaller_sample.csv', assume_missing=True)
# data = dd.read_csv('../../1. Data/export.csv', assume_missing=True)
# event_defs = dd.read_csv('../../1. Data/Event+Definitions.csv')
# #data = data.compute()
# #event_defs = event_defs.compute()
# #data.head()

# PANDAS
data = pd.read_csv('../../1. Data/smaller_sample.csv')   # this is the smaller sample data set
#data = pd.read_csv('../../1. Data/export.csv')   # this is the original data set
event_defs = pd.read_csv('../../1. Data/Event+Definitions.csv')   # dictionary for the events
data.head()
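When the full export is too slow to load just for a quick look, pandas can read only part of a file. The sketch below is illustrative only: it uses a small in-memory CSV with made-up rows and event names in place of the real files, and shows `usecols`, `nrows` and `chunksize` from `pd.read_csv`.

```python
import io
import pandas as pd

# Toy CSV standing in for the real export (rows and event names are made up)
csv_text = (
    "customer_id,account_id,ed_id,event_name,event_timestamp\n"
    "1,10,19,event_a,2023-01-01 10:00:00\n"
    "1,10,3,event_b,2023-01-01 10:05:00\n"
    "2,20,19,event_a,2023-01-02 09:00:00\n"
)

# Read only the first rows and only the columns needed for a quick check
preview = pd.read_csv(
    io.StringIO(csv_text),
    usecols=['customer_id', 'ed_id', 'event_timestamp'],
    parse_dates=['event_timestamp'],
    nrows=2,
)
print(preview)

# Process the file piece by piece instead of loading it all at once
n_rows = 0
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    n_rows += len(chunk)
print(f'Total rows: {n_rows}')
```

With the real data, the `io.StringIO(...)` argument would be replaced by the file path.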

* The following code needs to be run only when working with the original dataset, with either Dask or pandas.

In [None]:
# DASK
# event_defs = event_defs.rename(columns={'event_definition_id': 'ed_id'})
# data = dd.merge(data, event_defs.drop(columns=['event_name']), on='ed_id', how='left')
#data.compute()

# PANDAS
# event_defs = event_defs.rename(columns={'event_definition_id': 'ed_id'})
# data = pd.merge(data, event_defs.drop(columns=['event_name']), on='ed_id', how='left')
#data.head()
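The rename-and-merge step above can be sketched on toy data. The frames below are made up (only the column names and the stage labels come from this notebook), and `validate='many_to_one'` is an optional safeguard that is not part of the original code: it raises an error if an `ed_id` appears more than once in the event definitions, which would otherwise silently duplicate rows.

```python
import pandas as pd

# Toy stand-ins for the real data and event definitions
data_toy = pd.DataFrame({'customer_id': [1, 1, 2], 'ed_id': [19, 3, 19]})
event_defs_toy = pd.DataFrame({
    'event_definition_id': [3, 19],
    'event_name': ['event_b', 'event_a'],
    'stage': ['Apply for Credit', 'Discover'],
})

# Same steps as in the cell above: align the key name, then left-merge
event_defs_toy = event_defs_toy.rename(columns={'event_definition_id': 'ed_id'})
merged = pd.merge(
    data_toy,
    event_defs_toy.drop(columns=['event_name']),
    on='ed_id',
    how='left',
    validate='many_to_one',  # each ed_id must be unique in the definitions
)
print(merged)
```

A left merge keeps every row of the event data, so the merged frame should have the same number of rows as the input.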

### Results between using DASK and PANDAS
#### Benchmark

pandas:
* Reading the data and showing its head: 38.6 s
* Renaming the event definitions columns and merging the data: 19.8 s
* TOTAL: 58.4 s

* Full cleaning function: 13 min

Dask:
* Reading the data, showing its head, renaming the event definitions columns and merging the data: 53.4 s
* TOTAL: 53.4 s

* Full cleaning function:
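Timings like the ones above can be collected in a repeatable way with a small helper. This is a minimal sketch using only the standard library. The name `timed` is hypothetical and is not part of **utils.py**.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results):
    """Store the wall-clock seconds spent inside the `with` block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}
with timed('sum of squares', timings):
    # Placeholder workload; in the notebook this would be the read or merge step
    total = sum(i * i for i in range(100_000))

for label, seconds in timings.items():
    print(f'{label}: {seconds:.4f}s')
```

For Dask, the timed block has to include the call to `.compute()`, because Dask only builds a task graph until then.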

### Cleaning data

* We apply the cleaning function from the utils package, **fingerhut_data_cleaner**. The cell below defines an experimental variant, **fingerhut_data_cleaner_test**, used to try out the Dask approach.

In [None]:
def fingerhut_data_cleaner_test(og_df, defs):
    """Experimental variant of fingerhut_data_cleaner that accepts a Dask or a pandas DataFrame."""
    # Keep only the columns needed for the cleaning
    df = og_df[['customer_id',
             'account_id',
             'ed_id',
             'event_name',
             'event_timestamp',
             'journey_steps_until_end',
             'milestone_number',
             'journey_id',]]
    
    # Fill missing milestone numbers with 0
    df = df.assign(milestone_number=df['milestone_number'].fillna(0))

    # Dropping duplicates (ignoring the journey steps variable)
    df = df.drop_duplicates(subset=['customer_id', 'account_id', 'ed_id', 'event_name', 'event_timestamp'])
    # Materialize to pandas only if the input was a Dask DataFrame
    if hasattr(df, 'compute'):
        df = df.compute()
    df = df.reset_index(drop=True) # re-indexing after compute so the index is unique
    
    # Re-adding journey_steps_until_end (Axel's way)
    j_steps = df['journey_steps_until_end']
    s_corrected = correct_sequences(j_steps)
    df = df.assign(journey_steps_until_end=s_corrected)
    
    # Convert event_timestamps to datetime objects (df is a pandas DataFrame from here on)
    df = df.assign(event_timestamp=pd.to_datetime(df['event_timestamp'], format='mixed'))
        
    # Adding a `stage` variable based on the event definitions (assumes defs is a pandas DataFrame)
    df_stages = defs[['event_name', 'stage']]
    
    df = pd.merge(df, df_stages, on='event_name', how='left')
    
    # Setting positive values for account_ids
    df = df.assign(account_id=remove_if(df, 'account_id'))
    
    # Setting positive values for customer_ids
    df = df.assign(customer_id=remove_if(df, 'customer_id'))
    
    return df

In [None]:
df = fingerhut_data_cleaner(data, event_defs)

* Verifying that the data is clean and the sequences have been corrected.

In [None]:
has_correct_sequence(df['journey_steps_until_end'])
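`correct_sequences` and `has_correct_sequence` live in **utils.py** and are not shown in this notebook. As a rough illustration only, and assuming that a new journey starts whenever `journey_steps_until_end` equals 1, the hypothetical helpers `renumber_steps` and `is_consecutive` below show one way such a correction and check could work after duplicates have left gaps in the counter. They are not the actual implementations.

```python
def renumber_steps(steps):
    """Rebuild the step counter: restart at 1 where the original value is 1, else previous + 1."""
    out = []
    current = 0
    for s in steps:
        current = 1 if (s == 1 or current == 0) else current + 1
        out.append(current)
    return out

def is_consecutive(steps):
    """Return True if every value is either 1 (journey start) or the previous value + 1."""
    prev = 0
    for s in steps:
        if s != 1 and s != prev + 1:
            return False
        prev = s
    return True

# Toy counter with gaps (3 and 2 are missing), as if duplicate rows had been dropped
raw_steps = [1, 2, 4, 5, 1, 3]
fixed_steps = renumber_steps(raw_steps)

print(fixed_steps)
print(is_consecutive(raw_steps), is_consecutive(fixed_steps))
```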

* Dropping the `promotion_created` events

In [None]:
idxs = list(df[df['event_name'] == 'promotion_created'].index)

# DROP THE INDEX OF THIS QUERY IN THE DATASET
df.drop(idxs, inplace=True)
df.reset_index(drop=True, inplace=True)
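The same filtering can be written with a boolean mask, which avoids building the list of indexes and does not modify the frame in place. A minimal sketch on made-up rows (only the `promotion_created` event name comes from the notebook):

```python
import pandas as pd

# Toy events; the other event names are placeholders
df_toy = pd.DataFrame({
    'event_name': ['promotion_created', 'event_a', 'promotion_created', 'event_b'],
    'ed_id': [1, 2, 1, 3],
})

# Keep every row that is not a promotion_created event, then re-index
filtered = df_toy[df_toy['event_name'] != 'promotion_created'].reset_index(drop=True)
print(filtered)
```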

## Sequences (Obtaining sequences)

### Obtaining the sequences with the correct states

* Function for the sequences and the states

In [None]:
#result_sequences = split_sequences(df)

* Assigning the probabilities for each state

In [None]:
#result_sequences[0:3]

### Distribution of the length of the sequences

In [None]:
# lengths = []
# for seq in result_sequences:
#     lengths.append(len(seq))
    
# plt.figure(figsize=(12, 6))
# sns.histplot(lengths, kde=True)
# plt.title('Length of Sequences')
# plt.show()
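`split_sequences` is defined in **utils.py** and is not shown here. As an illustration of the idea only, the hypothetical `split_on_restart` below splits a list of events into journeys, assuming a new journey begins wherever the step counter equals 1, and then computes the sequence lengths that the histogram above would plot.

```python
def split_on_restart(events, steps):
    """Group events into sequences, opening a new sequence whenever the step equals 1."""
    sequences = []
    for event, step in zip(events, steps):
        if step == 1 or not sequences:
            sequences.append([])
        sequences[-1].append(event)
    return sequences

# Toy event ids and step counters for two journeys
toy_events = [19, 3, 12, 19, 4]
toy_steps = [1, 2, 3, 1, 2]

toy_sequences = split_on_restart(toy_events, toy_steps)
toy_lengths = [len(seq) for seq in toy_sequences]

print(toy_sequences)
print(toy_lengths)
```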

## Creating New Data Set for Binary Classification

* The goal is a dataset suitable for clustering, analysis and classification of the customers without the explicit sequences.

My proposal is to create a new dataset keyed by customer_id, where each row corresponds to exactly one customer and the features describe that customer's sequences and stages.

In [None]:
num_unique_cust = len(df['customer_id'].unique())
print(f'Number of unique customers: {num_unique_cust}')

Ideas of more features to be added:   
* Avg length of the sequence (avg in case has more than one, otherwise the length of the unique sequence) ✅
* Has more than one journey ✅
* Number of journeys ✅
* Has order shipped ✅
* Highest milestone reached ✅
* Has credit approved ✅
* Has first purchase ✅
* Has down payment ✅
* max number of journey_steps_until_end ✅
* Most repeated event in the journey ✅
* Time between the application and first purchase, then time from the first purchase to down payment and then down payment to order shipped

* number of attempts (applying for a credit)
* Returning customer
* Has an ideal journey
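The pending time-between-milestones idea could be sketched as follows. This is a sketch on made-up data, assuming the milestone numbering used in this notebook (1 = credit approved, 2 = first purchase, 4 = down payment received). The feature names are hypothetical.

```python
import pandas as pd

# Toy events for two customers; customer 2 never reaches milestone 4
events = pd.DataFrame({
    'customer_id': [1, 1, 1, 2, 2],
    'milestone_number': [1, 2, 4, 1, 2],
    'event_timestamp': pd.to_datetime([
        '2023-01-01 10:00:00', '2023-01-01 11:00:00', '2023-01-02 11:00:00',
        '2023-01-05 09:00:00', '2023-01-05 09:30:00',
    ]),
})

# First time each customer reaches each milestone: one row per customer,
# one column per milestone number
first_reached = (
    events[events['milestone_number'] > 0]
    .groupby(['customer_id', 'milestone_number'])['event_timestamp']
    .min()
    .unstack('milestone_number')
)

# Differences between milestones, in seconds (NaN if a milestone was not reached)
time_features = pd.DataFrame({
    'secs_approval_to_purchase': (first_reached[2] - first_reached[1]).dt.total_seconds(),
    'secs_purchase_to_down_payment': (first_reached[4] - first_reached[2]).dt.total_seconds(),
})
print(time_features)
```

Customers who never reach a milestone get a missing value for the corresponding feature, which has to be handled before modeling.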

### Trying to use Dask for improving the vectorized operations and the parallelism

This library will be very useful if we need to run operations on the original data.

The following **benchmark** is designed with the original data in mind, not the sample: with very little data, parallelizing the operations may not be worth it and could even be slower.

In [None]:
import dask.dataframe as dd
import time
# import dask.array as da

df_dask = dd.from_pandas(df, npartitions=4)

# Testing the parallel computation
time_init = time.time()
# Dask is lazy: .compute() must run inside the timed region, otherwise only graph building is measured
result = (df_dask['journey_steps_until_end'] * df_dask['ed_id'].sum()).compute()  # x * sum(y) in parallel
time_end = time.time()

print(result)
print(f'Time: {time_end - time_init}')

In [None]:
time_init = time.time()
result = df['journey_steps_until_end'] * df['ed_id'].sum()  # Same x * sum(y) computation in plain pandas
time_end = time.time()

print(result)
print(f'Time: {time_end - time_init}')

### Feature Engineering

* Functions

In [None]:
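# def number_journeys_and_max(cus_df):
#     """Function to count the journeys of a customer and get the largest step number

#     Args:
#         cus_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         tuple: (number of journeys, maximum value of journey_steps_until_end)
#     """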
#     j_steps = cus_df['journey_steps_until_end']
#     ones = [i for i, x in enumerate(j_steps) if x == 1]
#     return len(ones), max(j_steps)

# def has_discover(cust_df):
#     """Function to check if a sequence has the discovery event

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         bool: True if the sequence has the discovery event, False otherwise
#     """
#     return 'Discover' in list(cust_df['stage'])

# def number_accounts(cust_df):
#     """Function to count the number of distinct accounts of a customer

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         int: Number of unique account_ids of the customer
#     """
#     return cust_df['account_id'].nunique()

# def has_more_one_journey(j_steps):
#     """Function to check if a customer has more than one journey, i.e. if a step number is repeated

#     Args:
#         j_steps (pd.Series): journey_steps_until_end values of one customer

#     Returns:
#         bool: True if there are repeated values, False otherwise
#     """
#     return len(j_steps) != len(set(j_steps))

# def most_repeated_event(cust_df):
#     """Function that returns the most repeated event in a sequence

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         str: The most repeated event in the sequence
#     """
#     return cust_df['ed_id'].mode()[0]

# def average_length_seq(cust_df):
#     """Function to compute the average length of the sequences of a customer

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         float: Average length of the customer's sequences
#     """
#     new_df = cust_df.copy()
#     # Split the sequences
#     sequences = split_sequences(new_df)
#     return np.mean([len(seq) for seq in sequences])

# def has_prospecting(cust_df):
#     """Function to check if a sequence has the prospecting event

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         bool: True if the sequence has the prospecting event, False otherwise
#     """
#     evnts = list(cust_df['ed_id'])
#     return 20 in evnts or 21 in evnts or 24 in evnts

# def has_pre_application(cust_df):
#     """Function to check if a sequence has the pre-application event

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         bool: True if the sequence has the pre-application event, False otherwise
#     """
#     return 22 in list(cust_df['ed_id'])

# def initial_device(cust_df):
#     """Function to get the initial device of a customer
#     """
#     events = set(cust_df['event_name'])
#     phone = ['phone' in event for event in events]
#     web = ['web' in event for event in events]
    
#     if np.array(phone).any() and np.array(web).any():
#         return 3
#     elif np.array(phone).any():
#         return 1
#     elif np.array(web).any():
#         return 2
    
# def has_approved(cust_df):
#     """Function to check if a sequence has the approved event

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         bool: True if the sequence has the approved event, False otherwise
#     """
#     x = set(cust_df['ed_id'])
#     return 15 in x or 12 in x

# def get_first_n_events(cust_df, n = 10):
#     """Function that returns the first n events of a sequence

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)
#         n (int): Number of events to keep (default 10)

#     Returns:
#         np.ndarray: The first n events of the sequence (fewer if the sequence is shorter; padding is disabled)
#     """
#     events = cust_df['ed_id'].head(n).tolist()
#     # Pad with np.nan if the sequence has fewer than 10 events
#     #events += [np.nan] * (10 - len(events))
#     return np.array(events)

In [None]:
# def which_milestones(cust_df):
#     """Function that returns a tuple of booleans, in the following order, stating:
#     - If the customer has applied for credit and it has been approved (milestone 1)
#     - If the customer has first purchase (milestone 2)
#     - If the customer has account activation (milestone 3)
#     - If the customer has downpayment received (milestone 4)
#     - If the customer has downpayment cleared (milestone 5)
#     - If the customer has order shipped (milestone 6)
#     together with the highest milestone reached.

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)

#     Returns:
#         tuple: (tuple of six booleans, highest milestone number)
#     """
#     milestones = set(cust_df['milestone_number'].unique())
#     max_milestone = max(milestones)
#     return (1 in milestones, 2 in milestones, 3 in milestones, 4 in milestones, 5 in milestones, 6 in milestones), max_milestone

# # Functions for time
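# def get_idxs(cust_df, stage, milestone = -1):
#     """Function to get the indexes of a certain stage, or of a certain milestone if one is given

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)
#         stage (str): Name of the stage to look for (ignored when milestone != -1)
#         milestone (int): Milestone number to look for; -1 means filter by stage instead

#     Returns:
#         list: List with the indexes of the matching rows
#     """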
#     if milestone != -1:
#         return list(cust_df[cust_df['milestone_number'] == milestone].index)
    
#     return list(cust_df[cust_df['stage'] == stage].index)

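# def time_in_discover(cust_df, seconds_differences):
#     """Function to calculate the total time spent in the Discover stage

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)
#         seconds_differences (list): Seconds between consecutive events of the customer

#     Returns:
#         float: Sum of the seconds elapsed between each Discover event and the next event
#     """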
#     idxs = get_idxs(cust_df, 'Discover')
    
#     time_in = []
#     for idx in idxs:
#         if idx + 1 < len(seconds_differences):
#             time_in.append(seconds_differences[idx + 1])
#         else:
#             time_in.append(0)
#     return sum(time_in)

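# def time_in_apply(cust_df, seconds_differences):
#     """Function to calculate the total time spent in the Apply for Credit stage

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)
#         seconds_differences (list): Seconds between consecutive events of the customer

#     Returns:
#         float: Sum of the seconds elapsed between each Apply for Credit event and the next event
#     """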
#     idxs = get_idxs(cust_df, 'Apply for Credit')
    
#     time_in = []
#     for idx in idxs:
#         if idx + 1 < len(seconds_differences):
#             time_in.append(seconds_differences[idx + 1])
#         else:
#             time_in.append(0)
#     return sum(time_in)

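# def time_reach_milestone1(cust_df, seconds_differences):
#     """Function to calculate the time needed to reach milestone 1

#     Args:
#         cust_df (pd.DataFrame): Dataset of a certain customer (not all the dataset, just one customer)
#         seconds_differences (list): Seconds between consecutive events of the customer

#     Returns:
#         float: Seconds from the first event until milestone 1 is first reached
#         (raises IndexError if the customer never reached milestone 1)
#     """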
#     idxs = get_idxs(cust_df, 'Apply for Credit', 1)
    
#     # sum all the times before the milestone
#     return sum(seconds_differences[1:idxs[0]+1])

* 1. Groupby Approach:     
    This avoids repeated filtering and improves efficiency.
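The idea behind the groupby approach can be shown on toy data: one pass over the groups produces one row per customer, instead of filtering the full frame once per customer. The sketch below uses pandas named aggregation rather than the `apply`-based functions in **utils.py**. The rows are made up, and the feature names mirror some of those used in this notebook.

```python
import pandas as pd

# Toy events for two customers
toy = pd.DataFrame({
    'customer_id': [1, 1, 1, 2, 2],
    'account_id': [10, 10, 11, 20, 20],
    'ed_id': [19, 3, 19, 19, 4],
    'milestone_number': [0, 1, 2, 0, 0],
})

# One row per customer, several descriptive features computed in a single groupby
features = toy.groupby('customer_id').agg(
    n_events=('ed_id', 'size'),
    number_accounts=('account_id', 'nunique'),
    most_repeated_event=('ed_id', lambda s: s.mode()[0]),
    max_milestone=('milestone_number', 'max'),
)
print(features)
```

When several events are tied for most frequent, `mode()[0]` returns the smallest `ed_id`, which is why customer 2 gets event 4 here.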

In [None]:
# def group_by_approach(cust_df):
#     cust_df = cust_df.reset_index(drop=True)
#     # applying all the functions to get the data
#     num_journeys, max_journey = number_journeys_and_max(cust_df)
#     discover = has_discover(cust_df)
#     numb_accs = number_accounts(cust_df)
#     more_one_journey = has_more_one_journey(cust_df['journey_steps_until_end'])
#     repeated_event = most_repeated_event(cust_df)
#     avg_length_journey = average_length_seq(cust_df)
#     has_pros = has_prospecting(cust_df)
#     pre_applic = has_pre_application(cust_df)
#     device = initial_device(cust_df)
#     x = cust_df['event_timestamp'].diff().dt.total_seconds().tolist()
#     time_disc = time_in_discover(cust_df, x)
#     time_apply = time_in_apply(cust_df, x)
#     # time_milestone1 = time_reach_milestone1(cust_df, x)
    
#     milestones, max_milestone = which_milestones(cust_df)
    
#     # Creating the new data frame
#     new_df = pd.DataFrame({'num_journeys': num_journeys,
#                            'max_journey': max_journey,
#                            'discover': discover, 
#                            'number_accounts': numb_accs,
#                            'one_more_journey': more_one_journey,
#                            'most_repeated_event': repeated_event,
#                            'average_length_seq': avg_length_journey,
#                            'approved_credit': milestones[0],
#                            'first_purchase': milestones[1],
#                            'account_activitation': milestones[2],
#                            'downpayment_received': milestones[3],
#                            'downpayment_cleared': milestones[4],
#                            'order_ships': milestones[5],
#                            'max_milestone': max_milestone,
#                             'has_prospecting': has_pros,
#                             'has_pre_application': pre_applic,
#                             'initial_device': device,
#                             'time_in_discover': time_disc,
#                             'time_in_apply': time_apply,
#                             #'time_reach_milestone_1': time_milestone1,
#                            'index':[0]})
#     return new_df    

# def get_classification_dataset(data, event_defs, n_events = 10):
#     df = fingerhut_data_cleaner(data, event_defs)
#     # drop the promotion_created event
#     idxs = list(df[df['event_name'] == 'promotion_created'].index)
#     df.drop(idxs, inplace=True)
#     df.reset_index(drop=True, inplace=True)
    
#     # Grouping by the customer id and gathering the data
#     new_df = df.groupby('customer_id').apply(group_by_approach)
#     new_df.drop(columns=['index'], inplace=True)
    
#     # Adding the first n events
#     x = list(df.groupby('customer_id').apply(get_first_n_events, n = n_events))
#     new_df['first_' + str(n_events) + '_events'] = x
    
#     return new_df

* Approx Time : 1 minute or less

In [None]:
# New dataset
new_df = get_classification_dataset(data, event_defs, n_events = 3)
new_df

* The next command saves the dataset to your local working directory.

In [None]:
new_df.to_csv('new_dataset_original_data.csv', index=False)
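One thing to check before saving: if the version of `get_classification_dataset` in **utils.py** matches the commented-out version above, `customer_id` ends up in the index of `new_df`, and `index=False` would then drop it from the file. The sketch below illustrates this on a made-up frame and writes to an in-memory buffer instead of disk.

```python
import io
import pandas as pd

# Toy feature table with customer_id stored in the index
toy_features = pd.DataFrame(
    {'num_journeys': [2, 1]},
    index=pd.Index([101, 102], name='customer_id'),
)

# index=False drops the customer_id from the output
buf_without_id = io.StringIO()
toy_features.to_csv(buf_without_id, index=False)
header_without_id = buf_without_id.getvalue().splitlines()[0]

# Moving the index into a regular column first keeps it in the file
buf_with_id = io.StringIO()
toy_features.reset_index().to_csv(buf_with_id, index=False)
header_with_id = buf_with_id.getvalue().splitlines()[0]

print(header_without_id)
print(header_with_id)
```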

## Analysis

The following analysis is still pending.

Good ideas for further analysis:

* Look more closely at the customers who applied for credit and did not get it

* Decide whether rows with the same event name and a very small (but nonzero) time difference (e.g. 1 second) should be added to the duplicate-dropping criteria. Example: