# Input dataset source and database credentials

In [1]:
# Import necessary libraries
import os
import time
import logging
from datetime import datetime

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import skew, boxcox

import sqlite3
import psycopg2
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, inspect, MetaData, Table

In [2]:
def inputer_func():
    """
    Prompt the user to input necessary information for data extraction and transformation.

    Returns:
    - tuple: A tuple containing the folder path where the dataset is located,
             source database connection details (username, password, server, database name),
             and destination database connection details (username, password, server, database name).
    """
    # Dataset folderpath
    folder_path = input('Enter the folder path (without quotes): ')

    # Source database
    while True:
        try:
            SOURCE_USERNAME = input('Enter Source Username: ')
            SOURCE_PASSWORD = input('Enter Source Password: ')
            SOURCE_SERVER = input('Enter Source Server: ')
            SOURCE_DATABASE = input('Enter Source Database: ')
            # Check if all inputs are provided
            if all([SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_SERVER, SOURCE_DATABASE]):
                break
            else:
                print("Please provide all required information.")
        except Exception as e:
            print(f"Error: {e}")

    # Destination database
    while True:
        try:
            DEST_USERNAME = input('Enter Destination Username: ')
            DEST_PASSWORD = input('Enter Destination Password: ')
            DEST_SERVER = input('Enter Destination Server: ')
            DEST_DATABASE = input('Enter Destination Database: ')
            # Check if all inputs are provided
            if all([DEST_USERNAME, DEST_PASSWORD, DEST_SERVER, DEST_DATABASE]):
                break
            else:
                print("Please provide all required information.")
        except Exception as e:
            print(f"Error: {e}")

    return (folder_path, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_SERVER, SOURCE_DATABASE,
            DEST_USERNAME, DEST_PASSWORD, DEST_SERVER, DEST_DATABASE)

EXTRACT & LOAD RAW DATA TO DATABASE (STAGING PHASE)

In [3]:
def source_data_func(folder_path):
    """
    Extract CSV data from the specified folder.
    
    Args:
    - folder_path (str): Path to the folder containing CSV files.
    
    Returns:
    - data_frames (list): List of pandas DataFrames containing data from each of the CSV files.
    """
    data_frames = []
    
    # Check if the folder path exists
    if not os.path.isdir(folder_path):
        print(f"Error: Folder path '{folder_path}' does not exist.")
        return data_frames
    
    # Iterate over files in the folder
    for file_name in os.listdir(folder_path):
        if file_name.endswith('.csv'):
            file_path = os.path.join(folder_path, file_name)
            try:
                # Read CSV file into a DataFrame
                df = pd.read_csv(file_path)
                # Add a column to store the file name
                df['file_name'] = file_name
                # Append the DataFrame to the list
                data_frames.append(df)
            except Exception as e:
                print(f"Error reading file '{file_name}': {e}")
    
    if not data_frames:
        print("No CSV files found in the specified folder.")
    
    return data_frames

In [4]:
def staging_func(data_frames, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_SERVER, SOURCE_DATABASE):
    """
    Stages extracted data for transformation.

    Args:
    - data_frames (list): List of DataFrames containing extracted data.
    - SOURCE_USERNAME (str): Username for the source database connection.
    - SOURCE_PASSWORD (str): Password for the source database connection.
    - SOURCE_SERVER (str): Server address for the source database connection.
    - SOURCE_DATABASE (str): Name of the source database.

    Returns:
    - None
    """
    try:
        # Timer for staging data for transformation
        start_time = time.time()
        print(f"Staging data for transformation...{start_time}")
        
        # Connect to the source database
        source_conn_str = f'postgresql://{SOURCE_USERNAME}:{SOURCE_PASSWORD}@{SOURCE_SERVER}/{SOURCE_DATABASE}'
        source_engine = create_engine(source_conn_str)
        
        # Create a session to handle transactions
        Session = sessionmaker(bind=source_engine)
        session = Session()
        
        # Iterate over data frames and stage data
        for df in data_frames:
            # Add new columns 'fileimportedby' and 'fileimportdatetime' to each DataFrame
            df['fileimportedby'] = SOURCE_USERNAME
            df['fileimportdatetime'] = datetime.now()

            table_name = df['file_name'].iloc[0].split('.')[0]  # Extract table name from file name
            inspector = inspect(source_engine)
            if not inspector.has_table(table_name):
                df.head(0).to_sql(table_name, source_engine, index=False)  # Create table if it doesn't exist
            
            df.to_sql(table_name, source_engine, if_exists='append', index=False)  # Append data to table
        
        # Calculate time taken for staging
        staging_time = time.time() - start_time
        print(f"Data successfully staged. Time taken: {staging_time} seconds")
        
    except Exception as e:
        print(f"An error occurred during staging: {e}")

In [5]:
def extraction_func(SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_SERVER, SOURCE_DATABASE):
    """
    Extracts data from the source database.

    Args:
    - SOURCE_USERNAME (str): Username for the source database connection.
    - SOURCE_PASSWORD (str): Password for the source database connection.
    - SOURCE_SERVER (str): Server address for the source database connection.
    - SOURCE_DATABASE (str): Name of the source database.

    Returns:
    - list: List of DataFrames containing extracted data from the source database.
    """
    try:
        # Timer for loading data from source database
        start_time = time.time()
        print(f"Loading data from source database...{start_time} seconds")
        
        # Connect to the source database
        source_conn_str = f'postgresql://{SOURCE_USERNAME}:{SOURCE_PASSWORD}@{SOURCE_SERVER}/{SOURCE_DATABASE}'
        source_engine = create_engine(source_conn_str)
        
        # Timer for extracting data for transformation
        start_time = time.time()
        print(f"Extracting data for transformation...{start_time} seconds")
        
        # Extract data for transformation
        metadata = MetaData()
        metadata.reflect(bind=source_engine)

        # Create a session to handle transactions
        Session = sessionmaker(bind=source_engine)
        session = Session()

        extracted_data = []
        for table_name, table in metadata.tables.items():
            query = table.select()
            result = session.execute(query).fetchall()
            df = pd.DataFrame(result, columns=table.columns.keys())
            extracted_data.append(df)
        
        # Calculate time taken for extraction
        extraction_time = time.time() - start_time
        print(f"Data successfully extracted. Time taken: {extraction_time} seconds")
              
        return extracted_data
    
    except Exception as e:
        print(f"An error occurred during data extraction: {e}")
        return None

# Transformation

<b>1. Handle Duplicates:</b>

Are there duplicate rows in the dataset that need to be removed?

How will the removal of duplicates impact the analysis or modeling tasks?

In [6]:
def handle_duplicates(data_frames):
    """
    Remove duplicates from each DataFrame in the list.

    Args:
    - data_frames (list): List of DataFrames.

    Returns:
    - list: List of DataFrames with duplicates removed.
    """
    return [df.drop_duplicates() for df in data_frames]

<b>2. Handle Missing Values:</b>

Are there columns with over 90% missing values and rows with 100% missing values in the dataset?

How will the removal of duplicates impact the analysis or modeling tasks?

In [7]:
def handle_missing_values(data_frames, column_threshold=0.9):
    """
    Handle missing values in a list of DataFrames by dropping columns with over 90% missing values
    and rows that are 100% null.

    Args:
        data_frames (list): List of pandas DataFrames.
        column_threshold (float): Threshold for dropping columns based on the proportion of missing values.

    Returns:
        cleaned_data_frames (list): List of DataFrames with missing values handled.
    """
    cleaned_data_frames = []

    for df in data_frames:
        # Drop columns with over 90% missing values
        df = df.dropna(axis=1, thresh=int(column_threshold * len(df)))
        
        # Drop rows that are 100% null
        df = df.dropna(axis=0, how='all')
        
        # Reset index
        df = df.reset_index(drop=True)

        cleaned_data_frames.append(df)

    return cleaned_data_frames

<b>3. Data Type:</b> 

What are the data types of the values? (Numeric, categorical, or other?)

If numeric, are they continuous or discrete?

In [8]:
def classify_datatypes(data_frames):
    """
    Classify columns in a list of DataFrames into different data types: numeric, datetime, date, and categorical.

    Parameters:
        dataframes (list): A list of pandas DataFrames.

    Returns:
        numeric_columns_all (list): List of columns classified as numeric across all DataFrames.
        datetime_columns_all (list): List of columns classified as datetime across all DataFrames.
        date_columns_all (list): List of columns classified as date across all DataFrames.
        categorical_columns_all (list): List of columns classified as categorical across all DataFrames.
    """
    numeric_columns_all = []
    datetime_columns_all = []
    date_columns_all = []
    categorical_columns_all = []
    data_frames = data_frames.copy()

    for df in data_frames:
        numeric_columns, datetime_columns, date_columns, categorical_columns = [], [], [], []

        for col in df.columns:
            if df[col].dtype in ['float64', 'int64']:
                numeric_columns.append(col)
            elif df[col].astype(str).str.match(r'\d+/\d+/\d+ \d+:\d+:\d+ [AP]M').all():
                datetime_columns.append(col)
            elif df[col].astype(str).str.match(r'\d+/\d+/\d+').all():
                date_columns.append(col)
            else:
                categorical_columns.append(col)

        numeric_columns_all.extend(numeric_columns)
        datetime_columns_all.extend(datetime_columns)
        date_columns_all.extend(date_columns)
        categorical_columns_all.extend(categorical_columns)

    return numeric_columns_all, datetime_columns_all, date_columns_all, categorical_columns_all, data_frames

<b>4. Data Context:</b>

What do the values represent? (Measurements, counts, percentages, etc.)

What is the context or domain of the data?

In [9]:
def classify_numeric_columns(data_frames, numeric_columns_all, threshold=0.06):
    """
    Classify numeric columns in a list of DataFrames into count columns and measurement columns.
    
    Parameters:
        data_frames (list): List of DataFrames.
        numeric_columns_all (list): List of all numeric columns across all DataFrames.
        threshold (float): Threshold to differentiate count columns from measurement columns based on the proportion of unique values.
                           Columns with a proportion of unique values less than or equal to the threshold are classified as count columns.
    
    Returns:
        count_columns_all (list): List of all count columns across all DataFrames.
        measurement_columns_all (list): List of all measurement columns across all DataFrames.
        data_frames (list): A copy of the original list of DataFrames.
    """

    count_columns_all = []
    measurement_columns_all = []
    data_frames = data_frames.copy()

    for df in data_frames:
        count_columns = []
        measurement_columns = []
        
        for col in df.columns:
            if col in numeric_columns_all: # Check if column exists in list of numeric columns
                unique_ratio = df[col].nunique() / len(df[col])
                
                if df[col].astype(str).str.match(r'0\..*').any():
                    measurement_columns.append(col)
                elif unique_ratio <= threshold or col.endswith('Id'):
                    count_columns.append(col)
                else:
                    measurement_columns.append(col)
        
        count_columns_all.extend(count_columns)
        measurement_columns_all.extend(measurement_columns)

    return count_columns_all, measurement_columns_all, data_frames

<b>5. Handle Outliers:</b> 
    
Are the values within a reasonable range for the context of the project?

Are there any outliers that might skew the analysis?

In [10]:
def outlier_detector_and_decider_func(data_frames, measurement_columns_all, significance_threshold=0, removal_threshold=0.05):
    """
    Detects outliers in the given DataFrames and decides on the approach to handle them.

    Parameters:
        data_frames (list): List of pandas DataFrames containing the data.
        measurement_columns_all (list): List of all columns considered as measurement columns.
        significance_threshold (float): Threshold for considering a difference significant in outlier detection.
        removal_threshold (float): Threshold for determining whether to remove outliers or adjust their values.

    Returns:
        outlier_info (dict): Dictionary containing information about outliers and their handling approach for each column.
                             Keys: column name, Values: {'is_outlier': bool, 'correction_approach': str}.
                             'is_outlier': True if the column contains outliers, False otherwise.
                             'correction_approach': Approach to handle outliers ('remove_outliers' or 'adjust_values').
    """
    outlier_info = {}

    def detect_outliers(df, col):
        """
        Detects outliers in a column based on the given DataFrame.

        Args:
            df (pd.DataFrame): DataFrame containing the data.
            col (str): Column name to detect outliers.

        Returns:
            bool: True if outliers are detected, False otherwise.
        """
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
        return not outliers.empty

    def decide_outlier_correction(df_with_outliers, df_without_outliers, col):
        """
        Decides on the approach to handle outliers in a column.

        Args:
            df_with_outliers (pd.DataFrame): DataFrame containing outliers.
            df_without_outliers (pd.DataFrame): DataFrame without outliers.
            col (str): Column name to handle outliers.

        Returns:
            dict: Information about outlier detection and handling approach.
                  {'is_outlier': bool, 'correction_approach': str}.
                  'is_outlier': True if the column contains outliers, False otherwise.
                  'correction_approach': Approach to handle outliers ('remove_outliers' or 'adjust_values').
        """
        column_data_with_outliers = df_with_outliers[col]
        column_data_without_outliers = df_without_outliers[col]

        mean_with_outliers = column_data_with_outliers.mean()
        median_with_outliers = column_data_with_outliers.median()
        mean_without_outliers = column_data_without_outliers.mean()
        median_without_outliers = column_data_without_outliers.median()

        mean_difference = mean_with_outliers - mean_without_outliers
        median_difference = median_with_outliers - median_without_outliers

        std_with_outliers = column_data_with_outliers.std()
        std_without_outliers = column_data_without_outliers.std()

        std_difference = std_with_outliers - std_without_outliers

        q75_with_outliers = column_data_with_outliers.quantile(0.75)
        q75_without_outliers = column_data_without_outliers.quantile(0.75)

        q75_difference = q75_with_outliers - q75_without_outliers

        max_with_outliers = column_data_with_outliers.max()
        min_with_outliers = column_data_with_outliers.min()

        max_without_outliers = column_data_without_outliers.max()
        min_without_outliers = column_data_without_outliers.min()

        max_difference = max_with_outliers - max_without_outliers
        min_difference = min_with_outliers - min_without_outliers

        is_outlier = (abs(mean_difference) > significance_threshold) and \
                     (abs(median_difference) > significance_threshold) and \
                     (abs(std_difference) > significance_threshold) and \
                     (abs(q75_difference) > significance_threshold) and \
                     (abs(max_difference) > significance_threshold) and \
                     (abs(min_difference) > significance_threshold)

        correction_approach = None

        if is_outlier:
            outlier_percentage = len(column_data_with_outliers) / len(column_data_without_outliers)
            if outlier_percentage > removal_threshold:
                correction_approach = 'remove_outliers'
            else:
                correction_approach = 'adjust_values'

        return {'is_outlier': is_outlier, 'correction_approach': correction_approach}

    # Detect outliers and decide on the approach for each column in each DataFrame
    for df in data_frames:
        for col in df.columns:
            if col in measurement_columns_all:
                if detect_outliers(df, col):
                    outlier_info.setdefault(col, {'is_outlier': True, 'correction_approach': None})
                else:
                    outlier_info.setdefault(col, {'is_outlier': False, 'correction_approach': None})

    return outlier_info

In [11]:
def outlier_corrector_func(data_frames, outlier_info):
    """
    Correct outliers in the given DataFrames based on the provided outlier information.

    Args:
    - dfs (list): List of DataFrames to be corrected.
    - outlier_info (dict): Dictionary containing outlier information for each column.
                           Keys: column name, Values: Dictionary with 'is_outlier' key indicating if it's an outlier.

    Returns:
    - corrected_dfs (list): List of corrected DataFrames.
    """
    
    corrected_dfs = []

    for df in data_frames:
        corrected_df = df.copy()  # Make a copy of the original DataFrame to avoid modifying it directly
        for col, col_info in outlier_info.items():
            if col_info['is_outlier']:
                if col_info['correction_approach'] == 'remove_outliers':
                    # Remove outliers from the column
                    corrected_df = corrected_df[~(corrected_df[col].isin(df[col]))]
                elif col_info['correction_approach'] == 'adjust_values':
                    # Correct the outliers in the column
                    # Replace outliers with the median of the column
                    median_value = df[col].median()
                    corrected_df[col] = df[col].apply(lambda x: median_value if col_info['is_outlier'] else x)
        corrected_dfs.append(corrected_df)

    return corrected_dfs

<b>6. Handle Skewness and Kurtosis:</b> 

Are the values within a reasonable range for the context of the project?

Is the data distribution skewed or heavily tailed?

Does the data exhibit high kurtosis?

<b>Skewness:</b>

Skewness measures the asymmetry of the distribution of values in a dataset.

*For a perfectly symmetric distribution, the skewness should be close to 0.

*Positive skewness indicates a longer right tail, so if you're aiming for symmetry, you'd want to reduce positive skewness towards 0.

*Negative skewness indicates a longer left tail, so if you're aiming for symmetry, you'd want to reduce negative skewness towards 0.


<b>Kurtosis:</b>

*Kurtosis measures the tailedness or peakedness of the distribution of values in a dataset.

*For a normal distribution, the kurtosis is 3.

*Kurtosis greater than 3 indicates heavier tails and a more peaked distribution (leptokurtic).

*Kurtosis less than 3 indicates lighter tails and a flatter distribution (platykurtic).

*In the output, for example, the skewness value for the StepTotal column is approximately 4.832214, indicating that the distribution is right-skewed.. The kurtosis value for the StepTotal column is approximately 34.200632, indicating a heavily tailed distribution with many values in the tails.

Based on these values, you can infer that the StepTotal column has a right-skewed distribution (positive skewness) and a heavily tailed distribution (high kurtosis). This information provides insights into the shape and characteristics of the distribution of values in the dataset.

Transformations that can be applied to correct skewness and kurtosis:

For Skewness:

*Right-skewed data (positive skewness): square root, logarithm, or reciprocal to reduce the skewness towards 0.

*Left-skewed data (negative skewness), you can apply transformations such as square or cube to reduce the skewness towards 0.

For Kurtosis:

*To reduce excessive kurtosis: square root, logarithm, or reciprocal to flatten the tails and reduce the peakiness of the distribution.

*If the distribution is too flat(low kurtosis): squaring or cubing to increase kurtosis.

In [12]:
def skewness_kurtosis_decider_func(dfs, measurement_column_all, significance_threshold=3):
    """
    Decide whether columns in a list of DataFrames have significant skewness or kurtosis.
    
    Args:
    - dfs (list): List of DataFrames containing the data.
    - measurement_column_all (list): List of column names to process.
    - significance_threshold (float): Threshold value to determine the significance of skewness or kurtosis.
    
    Returns:
    - columns_with_significant_skewness (list): List of columns with significant skewness.
    - columns_with_significant_kurtosis (list): List of columns with significant kurtosis.
    - columns_without_significant_skewness (list): List of columns without significant skewness.
    - columns_without_significant_kurtosis (list): List of columns without significant kurtosis.
    """
    columns_with_significant_skewness = []
    columns_with_significant_kurtosis = []
    columns_without_significant_skewness = []
    columns_without_significant_kurtosis = []
    
    for df in dfs:
        if isinstance(df, pd.DataFrame):  # Check if df is a DataFrame
            for col in measurement_column_all:
                if col in df.columns:
                    # Get the column data
                    column_data = df[col]
                    
                    # Calculate skewness and kurtosis
                    skewness = column_data.skew()
                    kurtosis = column_data.kurtosis()
                    
                    # Determine significant skewness
                    if abs(skewness) > significance_threshold:
                        columns_with_significant_skewness.append((col, skewness))
                    else:
                        columns_without_significant_skewness.append((col, skewness))
                    
                    # Determine significant kurtosis
                    if abs(kurtosis) > significance_threshold:
                        columns_with_significant_kurtosis.append((col, kurtosis))
                    else:
                        columns_without_significant_kurtosis.append((col, kurtosis))
                else:
                    pass  # Column not found in the DataFrame
        else:
            print("Input is not a DataFrame.")
    
    return (columns_with_significant_skewness, columns_with_significant_kurtosis,
            columns_without_significant_skewness, columns_without_significant_kurtosis)

In [13]:
def transform_skewness_kurtosis_func(data_frames, columns_with_significant_skewness, columns_with_significant_kurtosis):
    """
    Transforms the data frames to reduce skewness and kurtosis in specified columns.

    Args:
    - data_frames (list): List of pandas DataFrames to be transformed.
    - columns_with_significant_skewness (list): List of column names with significant skewness.
    - columns_with_significant_kurtosis (list): List of column names with significant kurtosis.

    Returns:
    - list: List of transformed pandas DataFrames.
    """
    transformed_dfs = []
    for df in data_frames:
        transformed_df = df.copy()
        for col in df.columns:
            if col in columns_with_significant_skewness:
                # Plot histogram before skewness transformation
                sns.histplot(df[col], bins=20, color='blue', alpha=0.7)
                plt.title(f'Histogram of {col} before skewness transformation')
                plt.xlabel(col)
                plt.ylabel('Frequency')
                plt.show()

                # Perform skewness transformation
                skewness = skew(df[col])
                if skewness > 0:
                    transformed_df[col] = np.log1p(df[col])
                elif skewness < 0:
                    transformed_df[col] = np.sqrt(df[col])

                # Plot histogram after skewness transformation
                sns.histplot(transformed_df[col], bins=20, color='red', alpha=0.7)
                plt.title(f'Histogram of {col} after skewness transformation')
                plt.xlabel(col)
                plt.ylabel('Frequency')
                plt.show()

            if col in columns_with_significant_kurtosis:
                # Plot histogram before kurtosis transformation
                sns.histplot(df[col], bins=20, color='blue', alpha=0.7)
                plt.title(f'Histogram of {col} before kurtosis transformation')
                plt.xlabel(col)
                plt.ylabel('Frequency')
                plt.show()

                # Perform kurtosis transformation
                transformed_df[col] = boxcox(df[col])[0]

                # Plot histogram after kurtosis transformation
                sns.histplot(transformed_df[col], bins=20, color='red', alpha=0.7)
                plt.title(f'Histogram of {col} after kurtosis transformation')
                plt.xlabel(col)
                plt.ylabel('Frequency')
                plt.show()

        transformed_dfs.append(transformed_df)
    
    return transformed_dfs

<b>7. Data encoding:</b>
    
Do boolean values need to be encoded to 1 for True and 0 for False?

How will data encoding of boolean/or categorical values impact data interpretation and analysis?

Do you need to add comments to indicate the meaning of True (1) and False (0) in the respective columns?

I encoded the boolean values in the 'boolean_column' as integers, where True is represented as 1 and False as 0."

In [14]:
def data_encoding_func(cleansed_data_frames_skewness_kurtosis, categorical_columns_all):
    """
    Encode categorical columns in the given DataFrames.

    Args:
    - cleansed_data_frames_skewness_kurtosis (list): List of DataFrames to be encoded.
    - categorical_columns_all (list): List of column names to encode as categorical.

    Returns:
    - encoded_data_frames (list): List of encoded DataFrames.
    """
    encoded_data_frames = []

    for df in cleansed_data_frames_skewness_kurtosis:
        encoded_df = df.copy()  # Make a copy of the original DataFrame to avoid modifying it directly
        for col in df.columns:
            if col in categorical_columns_all:
                if df[col].dtype == 'bool':
                    # Convert boolean values to integer (0 or 1)
                    encoded_df[col] = df[col].astype(int)
                else:
                    pass
        encoded_data_frames.append(encoded_df)

    return encoded_data_frames

<b>8. Datetime Decomposition:</b>

This term describes the process of breaking down a datetime variable into its constituent parts, such as date and time components.

Do datetime values need to be split into separate date and time columns?

How will splitting datetime values impact subsequent analysis or modeling tasks?

In [15]:
def datetime_decomposer_func(cleansed_data_frames_encoding, datetime_columns_all, date_columns_all):
    """
    Decompose datetime columns into date, time, and period_of_day components.

    Args:
    - cleansed_data_frames_encoding (list): List of DataFrames to be decomposed.
    - datetime_columns_all (list): List of column names representing datetime values.
    - date_columns_all (list): List of column names representing date values.

    Returns:
    - decomposed_data_frames (list): List of DataFrames with decomposed datetime columns.
    """
    decomposed_data_frames = []

    for df in cleansed_data_frames_encoding:
        decomposed_df = df.copy()  # Make a copy of the original DataFrame to avoid modifying it directly
        for col in df.columns:
            if col in datetime_columns_all:
                # Decompose datetime column into date, time, and period_of_day components
                decomposed_df[['date', 'time', 'period_of_day']] = df[col].astype(str).str.split(' ', expand=True)
                decomposed_df['date'] = pd.to_datetime(decomposed_df['date'])
                decomposed_df['time'] = pd.to_datetime(decomposed_df['time'], format='%H:%M:%S').dt.time
            elif col in date_columns_all:
                # Convert date column to datetime type
                decomposed_df[col] = pd.to_datetime(df[col])
            else:
                pass  # Column not to be decomposed
        decomposed_data_frames.append(decomposed_df)


    return decomposed_data_frames

<b>9. Row Identification:</b>

This term describes the process of assigning a unique identifier to each row in a dataset.

Is it necessary to add a unique identifier to each row in the dataset?

How will the addition of a unique ID affect data manipulation and analysis?

"I performed row identification by adding a unique ID to each row in the DataFrame."

In [16]:
def generate_unique_id(cleansed_data_frames_datetime):
    """
    Generate a unique ID based on the file name and auto-incrementing serial number for each row
    and move the 'file_name' column to the last position in each DataFrame in the list.

    Args:
    - cleansed_data_frames_datetime (list): List of DataFrames to which unique IDs will be added.

    Returns:
    - list: List of DataFrames with unique IDs added as indices for each row and 'file_name'
    column moved to the last position.
    """
    cleansed_data_frames_unique_id = []
    
    for df in cleansed_data_frames_datetime:
        if 'file_name' in df.columns:
            file_name = df['file_name'].iloc[0].split('.')[0]  # Extract file name
            initials = ''.join(word[:2].upper() for word in file_name.split('_'))
            serial_numbers = range(1, len(df) + 1)
            unique_ids = [f'{initials}{i}' for i in serial_numbers]
            df.insert(0, 'unique_id', unique_ids)  # Insert 'unique_id' column at the first position

        # Rearrange columns to ensure 'file_name' is at the last position
        if 'file_name' in df.columns:
            columns = list(df.columns)  # Get list of current columns
            columns.remove('file_name')  # Remove 'file_name' from the list
            columns.append('file_name')  # Append 'file_name' to the end of the list
            df = df[columns]  # Update DataFrame with the new column order
            
        cleansed_data_frames_unique_id.append(df)

    
    return cleansed_data_frames_unique_id

In [17]:
def transformation_func(extracted_datax):
    """
    Perform data transformation pipeline on the extracted data.

    Args:
    - extracted_datax: Data extracted from the source.

    Returns:
    - transformed_data_frames: Transformed data frames ready for analysis or modeling.
    """
    # Step 1: Clean Duplicates
    cleansed_data_frames_duplicates = handle_duplicates(extracted_datax)

    # Step 2: Clean Missing Values
    cleansed_data_frames_missing_values = handle_missing_values(cleansed_data_frames_duplicates)

    # Step 3: Classify Data Types
    numeric_columns_all, datetime_columns_all, date_columns_all, categorical_columns_all, cleansed_data_frames_datatypes = classify_datatypes(cleansed_data_frames_missing_values)

    # Step 4: Categorize Numeric Columns
    count_columns_all, measurement_columns_all, cleansed_data_frames_classify_numbers = classify_numeric_columns(cleansed_data_frames_datatypes, numeric_columns_all)

    # Step 5: Handle Outliers
    outlier_info = outlier_detector_and_decider_func(cleansed_data_frames_classify_numbers, measurement_columns_all)
    cleansed_data_frames_outlier = outlier_corrector_func(cleansed_data_frames_classify_numbers, outlier_info)

    # Step 6: Handle Skewness and Kurtosis
    columns_with_significant_skewness, columns_with_significant_kurtosis, columns_without_significant_skewness, columns_without_significant_kurtosis = skewness_kurtosis_decider_func(cleansed_data_frames_outlier, measurement_columns_all)
    cleansed_data_frames_skewness_kurtosis = transform_skewness_kurtosis_func(cleansed_data_frames_outlier, columns_with_significant_skewness, columns_with_significant_kurtosis)

    # Step 7: Encode Boolean Values
    cleansed_data_frames_encoding = data_encoding_func(cleansed_data_frames_skewness_kurtosis, categorical_columns_all)

    # Step 8: Decompose Datetime
    cleansed_data_frames_datetime = datetime_decomposer_func(cleansed_data_frames_encoding, datetime_columns_all, date_columns_all)

    # Step 9: Row Identification (if applicable)
    cleansed_data_frames_row_identification = generate_unique_id(cleansed_data_frames_datetime)

    # Step 10: Return Transformed Data Frames
    transformed_data_frames = cleansed_data_frames_datetime

    return transformed_data_frames


# 10. Load data to Warehouse

In [18]:
def load_transformed_data_func(transformed_dataframe_list, DEST_USERNAME, DEST_PASSWORD, DEST_SERVER, DEST_DATABASE):
    """
    Loads transformed data into the destination PostgreSQL database.

    Args:
    - transformed_dataframe_list (list): List of DataFrames containing transformed data.
    - DEST_USERNAME (str): Username for the destination database connection.
    - DEST_PASSWORD (str): Password for the destination database connection.
    - DEST_SERVER (str): Server address for the destination database connection.
    - DEST_DATABASE (str): Name of the destination database.

    Returns:
    - None
    """
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Destination database connection string
    dest_conn_str = f'postgresql://{DEST_USERNAME}:{DEST_PASSWORD}@{DEST_SERVER}/{DEST_DATABASE}'

    try:
        # Establish connection to the destination database
        dest_engine = create_engine(dest_conn_str)

        # Add new columns 'fileimportedby' and 'fileimportdatetime' to each DataFrame
        for df in transformed_dataframe_list:
            df['fileimportedby'] = DEST_USERNAME
            df['fileimportdatetime'] = datetime.now()

        # Create a sessionmaker to handle transactions
        Session = sessionmaker(bind=dest_engine)

        # Loop through each DataFrame in transformed_dataframe_list
        for transformed_data in transformed_dataframe_list:
            # Extract the table name from the DataFrame
            table_name = transformed_data['file_name'].iloc[0].split('.')[0]

            # Check if the table already exists in the database
            table_exists = dest_engine.has_table(table_name)

            if not table_exists:
                # If the table does not exist, create it
                transformed_data.head(0).to_sql(table_name, dest_engine, index=False)

            # Load transformed data into the destination database
            with dest_engine.connect() as conn:
                # Append transformed data to the existing table
                transformed_data.to_sql(table_name, conn, if_exists='append', index=False)

        logger.info("Transformed data loaded successfully.")
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise  # Re-raise the exception for higher-level handling

In [19]:
import logging

def fitbit_etl_pipeline():
    """
    Executes the Fitbit ETL (Extract, Transform, Load) pipeline.

    1. Input dataset folder path and database credentials.
    2. Extracts data from the source dataset folder.
    3. Extracts data from the source database and stages it for transformation.
    4. Transforms the extracted data.
    5. Loads the transformed data into the destination database.

    Returns:
    - None
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    try:
        # 1. Input dataset folder path and database credentials
        logger.info("Step 1: Input dataset folder path and database credentials.")
        input_result = inputer_func()
        folder_path, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_SERVER, SOURCE_DATABASE, DEST_USERNAME, DEST_PASSWORD, DEST_SERVER, DEST_DATABASE = input_result
        
        # Validate input
        if not all(input_result):
            raise ValueError("Invalid input. Please provide all required parameters.")

        # 2. Sourcing Data phase
        logger.info("Step 2: Data sourcing phase.")
        sourced_dataframe_list = source_data_func(folder_path)

        # 3. Staging phase
        logger.info("Step 3: Staging phase.")
        staged_dataframe_list = staging_func(sourced_dataframe_list, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_SERVER, SOURCE_DATABASE)
        
        # 4. Extraction phase
        logger.info("Step 4: Extraction phase.")
        extracted_data_list = extraction_func(SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_SERVER, SOURCE_DATABASE)

        # 5. Transformation phase
        logger.info("Step 5: Transformation phase.")
        transformed_dataframe_list = transformation_func(extracted_data_list)

        # 6. Loading phase
        logger.info("Step 6: Loading phase.")
        load_transformed_data_func(transformed_dataframe_list, DEST_USERNAME, DEST_PASSWORD, DEST_SERVER, DEST_DATABASE)
        
        logger.info("ETL process completed successfully.")
    except Exception as e:
        logger.error(f"An error occurred during ETL process: {str(e)}")
        raise

In [20]:
fitbit_etl_pipeline()

INFO:__main__:Step 1: Input dataset folder path and database credentials.


Enter the folder path (without quotes): C:\Users\admin\Desktop\Projects\Project FitBit\Fitabase Dataset
Enter Source Username: fitbase
Enter Source Password: fitbase
Enter Source Server: localhost
Enter Source Database: projectfitbase
Enter Destination Username: fitbase
Enter Destination Password: fitbase
Enter Destination Server: localhost
Enter Destination Database: projectfitbasetransformed


INFO:__main__:Step 2: Data sourcing phase.
INFO:__main__:Step 3: Staging phase.


Staging data for transformation...1709083421.5229754


INFO:__main__:Step 4: Extraction phase.


Data successfully staged. Time taken: 172.3397753238678 seconds
Loading data from source database...1709083593.8627508 seconds
Extracting data for transformation...1709083593.8627508 seconds


INFO:__main__:Step 5: Transformation phase.


Data successfully extracted. Time taken: 139.68007040023804 seconds


INFO:__main__:Step 6: Loading phase.
  table_exists = dest_engine.has_table(table_name)
INFO:__main__:Transformed data loaded successfully.
INFO:__main__:ETL process completed successfully.


In [21]:
Enter the folder path: "C:\Users\admin\Desktop\Projects\Project FitBit\Fitabase Dataset"
Enter Source Username: fitbase
Enter Source Password: fitbase
Enter Source Server: localhost
Enter Source Database: projectfitbase
Enter Destination Username: fitbase
Enter Destination Password: fitbase
Enter Destination Server: localhost
Enter Destination Database: projectfitbasetransformed

SyntaxError: invalid syntax (1150518328.py, line 1)