# Extraction and Transformation Pipeline

# Modules

In [16]:
import pandas as pd
import numpy as np
from scipy.stats import skew
from datetime import datetime, timedelta

## Extraction

### Full Extraction

All the rows are pulled from the CSV file and loaded into the dataframe. The number of pulled rows are printed and a snapshot of the data in the rows is displayed for validation.

In [17]:
df_full = pd.read_csv("custom_data.csv", parse_dates=["Timestamp"])
print(f"Pulled {len(df_full)} rows via full extraction.")
df_full.head()

Pulled 736 rows via full extraction.


Unnamed: 0,Timestamp,Age,Primary streaming service,Hours per day,While working,Instrumentalist,Composer,Fav genre,Exploratory,Foreign languages,...,Frequency [R&B],Frequency [Rap],Frequency [Rock],Frequency [Video game music],Anxiety,Depression,Insomnia,OCD,Music effects,Permissions
0,2022-08-27 19:29:02,18.0,Spotify,3.0,Yes,Yes,Yes,Latin,Yes,Yes,...,Sometimes,Very frequently,Never,Sometimes,3.0,0.0,1.0,0.0,,I understand.
1,2022-08-27 19:57:31,63.0,Pandora,1.5,Yes,No,No,Rock,Yes,No,...,Sometimes,Rarely,Very frequently,Rarely,7.0,2.0,2.0,1.0,,I understand.
2,2022-08-27 21:28:18,18.0,Spotify,4.0,No,No,No,Video game music,No,Yes,...,Never,Rarely,Rarely,Very frequently,7.0,7.0,10.0,2.0,No effect,I understand.
3,2022-08-27 21:40:40,61.0,YouTube Music,2.5,Yes,No,Yes,Jazz,Yes,Yes,...,Sometimes,Never,Never,Never,9.0,7.0,3.0,3.0,Improve,I understand.
4,2022-08-27 21:54:47,18.0,Spotify,4.0,Yes,No,No,R&B,Yes,No,...,Very frequently,Very frequently,Never,Rarely,7.0,2.0,5.0,9.0,Improve,I understand.


It is evident that the CSV file has **736 rows**.

### Incremental Extraction

First of all, the last extraction checkpoint is set to August 28th 2022 at Midnight. Afterwards the checkpoint is loaded and converted to a datetime object for use with the pandas dataframe.

The entire the CSV file is then loaded and the checkpoint extracted is used to filter out all entries that were logged before the checkpoint retaining only the entries that appeared after the checkpoint.

The number of columns pulled are then displayed and a snapshot of the pulled dataset is displayed.

Finally, a new checkpoint is configured to the latest date present in the CSV file.

In [18]:
with open("last_extraction.txt", "w") as f:
    f.write("2022-08-28 12:00:00") 

In [19]:
with open("last_extraction.txt", "r") as f:
    last_extraction = f.read().strip()

In [20]:
df = pd.read_csv("custom_data.csv", parse_dates=["Timestamp"])
last_extraction_time = pd.to_datetime(last_extraction)
df_incremental = df[df['Timestamp'] > last_extraction_time]
print(f"Pulled {len(df_incremental)} new/updated rows since {last_extraction}.")
df_incremental.head()

Pulled 692 new/updated rows since 2022-08-28 12:00:00.


Unnamed: 0,Timestamp,Age,Primary streaming service,Hours per day,While working,Instrumentalist,Composer,Fav genre,Exploratory,Foreign languages,...,Frequency [R&B],Frequency [Rap],Frequency [Rock],Frequency [Video game music],Anxiety,Depression,Insomnia,OCD,Music effects,Permissions
44,2022-08-28 12:08:29,18.0,Spotify,4.0,Yes,No,No,Metal,Yes,Yes,...,Sometimes,Very frequently,Very frequently,Very frequently,1.0,0.0,8.0,2.0,No effect,I understand.
45,2022-08-28 12:12:35,17.0,I do not use a streaming service.,1.0,Yes,Yes,Yes,Folk,No,No,...,Rarely,Sometimes,Sometimes,Sometimes,7.0,3.0,1.0,5.0,Improve,I understand.
46,2022-08-28 12:15:02,36.0,Spotify,1.0,No,Yes,Yes,Classical,No,No,...,Never,Never,Sometimes,Never,8.0,9.0,3.0,0.0,No effect,I understand.
47,2022-08-28 12:23:52,24.0,Spotify,3.0,Yes,No,No,Rock,Yes,Yes,...,Never,Never,Very frequently,Sometimes,9.0,6.0,4.0,3.0,Improve,I understand.
48,2022-08-28 12:30:08,18.0,Spotify,5.0,Yes,No,Yes,Metal,Yes,Yes,...,Rarely,Very frequently,Very frequently,Never,7.0,9.0,5.0,0.0,Improve,I understand.


In [21]:
new_checkpoint = df['Timestamp'].max()

with open("last_extraction.txt", "w") as f:
    f.write(new_checkpoint.isoformat())
print(f"Updated last_extraction.txt to {new_checkpoint}")

Updated last_extraction.txt to 2022-11-09 01:55:20


There are **692 rows** that were recorded after 8th August 2022. The very last logged record was on 9th November 2022 at 1:55 AM.

## Transformation

### Helper Functions

In [25]:
def save_to_csv(df, filename):
    """
    Write the pandas DataFrame to a CSV file.
    """
    df.to_csv(filename, index=False)

def range_to_text(df, columns):
    """
    Map numeric values in specified columns onto intensity levels.
    [1-2: Minimal, 3-4: Mild, 5-6: Moderate, 7-8: Severe, 9-10: Critical]
    """
    df = df.copy()

    def classify(score):
        if pd.isna(score):
            return np.nan
        elif 0 <= score <= 2.5:
            return "Minimal"
        elif 2.5 < score <= 4.5:
            return "Mild"
        elif 4.5 < score <= 6.5:
            return "Moderate"
        elif 6.5 < score <= 8.5:
            return "Severe"
        elif 8.5 < score <= 10:
            return "Critical"
        else:
            return "Invalid"

    for col in columns:
        df.loc[:, col + "_label"] = df[col].apply(classify)

    return df

def handling_missing_values(df):
    """
    Impute missing values:
    - For numerical columns: use mean if symmetric, median if skewed.
    - For non-numerical/text columns: use mode.
    """
    df = df.copy()

    for col in df.columns:
        if df[col].isnull().sum() > 0:
            if pd.api.types.is_numeric_dtype(df[col]):
                col_skew = df[col].skew(skipna=True)
                if abs(col_skew) < 0.5:
                    # Use mean for symmetric distribution
                    df[col].fillna(df[col].mean())
                else:
                    # Use median for skewed distribution
                    df[col].fillna(df[col].median())
            else:
                # Use mode for non-numeric/text columns
                if not df[col].mode().empty:
                    df[col].fillna(df[col].mode()[0])
    return df

def handle_duplicates(df):
    """
    Locate and drop duplicate rows.
    """
    df = df.drop_duplicates().copy()
    return df

def drop_irrelevant_columns(df, columns):
    """
    Drop specified columns from the DataFrame.
    """
    df = df.drop(columns=columns, errors='ignore').copy()
    return df

### Transform Full Data

The following columns are irrelevant and are dropped:

- Timestamp
- Permissions

Afterwards, missing values are handled using either mean, median or mode imputation. Mean imputation is used for numerical data is normally distributed. Otherwise, if there is skewness median imputation is employed. Mode imputation is used for textual values. Duplicate rows are then dropped.

The columns of:

- Anxiety
- Depression
- Insomnia
- OCD

use a range system of 0-10. These values are not very descriptive. As a result the values are mapped onto five values:

- Minimal (0 <= x <= 2.5)
- Mild (2.5 < x <= 4.5)
- Moderate (4.5 < x <= 6.5)
- Severe (6.5 < x <= 8.5)
- Critical (8.5 < x <= 10)

Finally, the dataframe is saved to the `transformed_full.csv` file.

In [None]:
df_full = drop_irrelevant_columns(df_full, ["Timestamp", "Permissions"])
df_full = handling_missing_values(df_full)
df_full = handle_duplicates(df_full)
df_full = range_to_text(df_full, ["Anxiety", "Depression", "Insomnia", "OCD"])

full_file = "transformed_full.csv"
save_to_csv(df_full, full_file)
print(f"Saved dataframe to {full_file}")

### Transform Incremental Data

The following columns are irrelevant and are dropped:

- Timestamp
- Permissions

Afterwards, missing values are handled using either mean, median or mode imputation. Mean imputation is used for numerical data is normally distributed. Otherwise, if there is skewness median imputation is employed. Mode imputation is used for textual values. Duplicate rows are then dropped.

The columns of:

- Anxiety
- Depression
- Insomnia
- OCD

use a range system of 0-10. These values are not very descriptive. As a result the values are mapped onto five values:

- Minimal (0 <= x <= 2.5)
- Mild (2.5 < x <= 4.5)
- Moderate (4.5 < x <= 6.5)
- Severe (6.5 < x <= 8.5)
- Critical (8.5 < x <= 10)

Finally, the dataframe is saved to the `transformed_incremental.csv` file.

In [None]:
df_incremental = drop_irrelevant_columns(df_incremental, ["Timestamp", "Permissions"])
df_incremental = handling_missing_values(df_incremental)
df_incremental = handle_duplicates(df_incremental)
df_incremental = range_to_text(df_incremental, ["Anxiety", "Depression", "Insomnia", "OCD"])

incremental_file = "transformed_incremental.csv"
save_to_csv(df_incremental, incremental_file)
print(f"Saved dataframe to {incremental_file}")