
# Williams FinOps Preparation: Data Preparation & Feature Engineering

This notebook is a guide to preparing financial data for predictive analysis. We work through five stages (data loading, extraction, cleaning, consolidation, and feature engineering) to ensure the data is accurate, insightful, and ready for modeling. The intention is to create a process that is transparent, well documented, and reproducible.


## Table of Contents

1. **Data Loading**
2. **Data Extraction**
3. **Data Cleaning**
4. **Data Consolidation**
5. **Final Sanity Check**
6. **Feature Engineering**
7. **Advanced Feature Engineering for Predictive Analysis**
8. **Data Inspection**
9. **Saving the Feature-Engineered DataFrame**


# Data Loading

In this section, the financial data files (10-K reports) from 2012 to 2023 were loaded. Each file was read from DBFS by its path and saved as a table; in later sections the files are also copied to a local temporary directory so that pandas can read them. This keeps the data organized and reduces runtime issues when handling multiple large files. The goal was to establish a robust foundation for further data manipulation, with every relevant file accessed and loaded without errors.
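
The same twelve DBFS paths are written out by hand in several cells of this notebook. A minimal sketch of generating them from the year range instead, assuming the `Williams_10_K_<year>.xls` naming pattern holds for every year; the helper name `build_10k_paths` is hypothetical and not part of the original notebook:

```python
def build_10k_paths(start_year=2012, end_year=2023,
                    base_dir="dbfs:/FileStore/tables"):
    """Return one DBFS path per fiscal year, inclusive of both endpoints."""
    # Assumption: every annual report follows the same file naming pattern.
    return [
        f"{base_dir}/Williams_10_K_{year}.xls"
        for year in range(start_year, end_year + 1)
    ]


file_paths = build_10k_paths()
print(len(file_paths))   # 12
print(file_paths[0])     # dbfs:/FileStore/tables/Williams_10_K_2012.xls
```

Defining the list once also keeps the loading, extraction, and cleaning cells from drifting apart if a year is added later.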

In [0]:
import os
import pandas as pd
import numpy as np

# Helper function to clean column names
def clean_column_names(df):
    for col in df.columns:
        clean_col = col.replace(" ", "_").replace("(", "").replace(")", "").lower()
        df = df.withColumnRenamed(col, clean_col)
    return df

# Function to process each Excel file
def process_excel_file(file_path):
    try:
        # Load the Excel file using Spark's capabilities
        excel_spark_df = spark.read.format("com.crealytics.spark.excel") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("sheetName", "consolidated overview") \
            .load(file_path)
        
        # Clean the column names
        excel_spark_df = clean_column_names(excel_spark_df)
        
        # Get file name for table naming purposes
        file_name = os.path.splitext(os.path.basename(file_path))[0]
        
        # Save the Spark DataFrame as a table in Databricks with the cleaned file name
        table_name = file_name
        excel_spark_df.write.mode("overwrite").saveAsTable(table_name)
        
        # Output the table creation result
        print(f"Table '{table_name}' created successfully.")
    
    except Exception as e:
        print(f"Error reading file {file_path}: {str(e)}")

# List of file paths for the Excel files
file_paths = [
    "dbfs:/FileStore/tables/Williams_10_K_2012.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2013.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2014.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2015.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2016.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2017.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2018.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2019.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2020.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2021.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2022.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2023.xls"
]

# Process all files and create the corresponding tables
for file_path in file_paths:
    process_excel_file(file_path)

# Display the tables in the database after creation
df_tables = spark.sql("SHOW TABLES")
df_tables.show()



Table 'Williams_10_K_2012' created successfully.
Table 'Williams_10_K_2013' created successfully.
Table 'Williams_10_K_2014' created successfully.
Table 'Williams_10_K_2015' created successfully.
Table 'Williams_10_K_2016' created successfully.
Table 'Williams_10_K_2017' created successfully.
Table 'Williams_10_K_2018' created successfully.
Table 'Williams_10_K_2019' created successfully.
Table 'Williams_10_K_2020' created successfully.
Table 'Williams_10_K_2021' created successfully.
Table 'Williams_10_K_2022' created successfully.
Table 'Williams_10_K_2023' created successfully.
+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|feature_engineere...|      false|
| default|  williams_10_k_2012|      false|
| default|  williams_10_k_2013|      false|
| default|  williams_10_k_2014|      false|
| default|  williams_10_k_2015|      false|
| default|  williams_10_k_2016|      false|
| default|  will

In [0]:
%sql

SHOW TABLES

database,tableName,isTemporary
default,feature_engineered_table,False
default,williams_10_k_2012,False
default,williams_10_k_2013,False
default,williams_10_k_2014,False
default,williams_10_k_2015,False
default,williams_10_k_2016,False
default,williams_10_k_2017,False
default,williams_10_k_2018,False
default,williams_10_k_2019,False
default,williams_10_k_2020,False


In [0]:
# List all files in the /FileStore/tables directory
files = dbutils.fs.ls("/FileStore/tables/")

# Print out the paths of all files in the directory
for file in files:
    print(file.path)


dbfs:/FileStore/tables/Q1_Williams_10_Q_2021.xls
dbfs:/FileStore/tables/Q1_Williams_10_Q_2022.xls
dbfs:/FileStore/tables/Q1_Williams_10_Q_2023.xls
dbfs:/FileStore/tables/Q1_Williams_10_Q_2024.xls
dbfs:/FileStore/tables/Q2_Williams_10_Q_2021.xls
dbfs:/FileStore/tables/Q2_Williams_10_Q_2022.xls
dbfs:/FileStore/tables/Q2_Williams_10_Q_2023.xls
dbfs:/FileStore/tables/Q2_Williams_10_Q_2024.xls
dbfs:/FileStore/tables/Q3_Williams_10_Q_2021.xls
dbfs:/FileStore/tables/Q3_Williams_10_Q_2022.xls
dbfs:/FileStore/tables/Q3_Williams_10_Q_2023.xls
dbfs:/FileStore/tables/Williams_10_K_2012.xls
dbfs:/FileStore/tables/Williams_10_K_2013.xls
dbfs:/FileStore/tables/Williams_10_K_2014.xls
dbfs:/FileStore/tables/Williams_10_K_2015.xls
dbfs:/FileStore/tables/Williams_10_K_2016.xls
dbfs:/FileStore/tables/Williams_10_K_2017.xls
dbfs:/FileStore/tables/Williams_10_K_2018.xls
dbfs:/FileStore/tables/Williams_10_K_2019.xls
dbfs:/FileStore/tables/Williams_10_K_2020.xls
dbfs:/FileStore/tables/Williams_10_K_2021.xls
d

# Data Extraction

Once the files were loaded, the relevant data was extracted, focusing exclusively on the "Consolidated Overview" sheet in each report in order to reach the relevant information in the most streamlined way. The data was transposed and cleaned to create a consistent structure for analysis. Because each financial report varied slightly in layout, this step ensured uniformity across all datasets, allowing efficient comparison and consolidation in the subsequent steps.
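
To illustrate the transposition step, here is a minimal sketch on a small hand-made frame that mimics the sheet layout (line-item labels in the first column, one column per reporting period). The sample values and the helper name `labels_to_columns` are made up for illustration and are not data or code from the reports:

```python
import pandas as pd


def labels_to_columns(raw):
    """Turn a sheet with line-item labels in column 0 into one row per period."""
    # Drop rows and columns that are entirely empty.
    trimmed = raw.dropna(how="all").dropna(axis=1, how="all")
    # Keep only rows that actually carry a label in the first column.
    trimmed = trimmed[trimmed.iloc[:, 0].notna()]
    # Use the labels as the index, then transpose so they become column names.
    wide = trimmed.set_index(trimmed.columns[0]).T
    wide.columns = [str(c).strip() for c in wide.columns]
    return wide.reset_index(drop=True)


# Hypothetical sheet fragment: blank spacer rows between labelled rows.
raw = pd.DataFrame([
    [None, None, None],
    ["Year", 2001, 2002],
    ["Total revenues", 100, 120],
    [None, None, None],
    ["Net income (loss)", 10, 15],
])

tidy = labels_to_columns(raw)
print(tidy)
```

Filtering on a non-empty label before transposing avoids creating columns named `NaN`, which is one way to keep later column-name cleaning simple.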

In [0]:
%pip install xlrd --upgrade

Python interpreter will be restarted.


In [0]:
import pandas as pd
import os

# Function to download the files to local /tmp directory
def download_file_to_tmp(dbfs_file_path):
    local_path = "/tmp/" + os.path.basename(dbfs_file_path)
    dbutils.fs.cp(dbfs_file_path, f"file:{local_path}")
    return local_path

# Function to preview the first few rows of the "Consolidated Overview" sheet
def preview_consolidated_overview(file_path):
    try:
        # Assuming all the files are .xls, you can adjust the engine if needed for other formats
        df = pd.read_excel(file_path, sheet_name='consolidated overview', engine='xlrd')
        print(f"Preview of 'Consolidated Overview' from file '{file_path}':")
        print(df.head())  # Show the first few rows of the relevant sheet
    except Exception as e:
        print(f"Error reading 'Consolidated Overview' in file {file_path}: {str(e)}")

# List of file paths for the Excel files in DBFS
file_paths = [
    "dbfs:/FileStore/tables/Williams_10_K_2012.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2013.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2014.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2015.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2016.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2017.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2018.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2019.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2020.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2021.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2022.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2023.xls"
]

# Preview the 'Consolidated Overview' sheet from each 10-K Excel file
for file_path in file_paths:
    local_file = download_file_to_tmp(file_path)
    preview_consolidated_overview(local_file)





Preview of 'Consolidated Overview' from file '/tmp/Williams_10_K_2012.xls':
              Unnamed: 0  Unnamed: 1                Unnamed: 2 Unnamed: 3  \
0  Consolidated Overview         NaN                       NaN        NaN   
1                    NaN         NaN                       NaN        NaN   
2                    NaN         NaN                       NaN        NaN   
3                    NaN         NaN  Years Ended December 31,        NaN   
4                    NaN         NaN                      2012        NaN   

   Unnamed: 4  Unnamed: 5             Unnamed: 6  Unnamed: 7  Unnamed: 8  \
0         NaN         NaN                    NaN         NaN         NaN   
1         NaN         NaN                    NaN         NaN         NaN   
2         NaN         NaN                    NaN         NaN         NaN   
3         NaN         NaN                    NaN         NaN         NaN   
4         NaN         NaN  $ Change\nfrom\n2011*         NaN         NaN   

   U

# Data Cleaning

Here, the emphasis was on removing inconsistencies and ensuring data quality. Issues such as missing headers, duplicated rows, non-numeric characters in financial columns, and NaN values were addressed. Clean data is both usable and accurate, which is critical for financial analysis. I focused heavily on making numeric columns truly numeric, standardizing column names, and imputing missing values with forward filling or default values.
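
Financial statements often write negative amounts in parentheses, which plain numeric coercion would turn into NaN. A minimal sketch of a cell-level parser, assuming that convention applies to these sheets (the notebook itself only strips `$` and `,`); the function name `parse_amount` is hypothetical:

```python
import re


def parse_amount(value):
    """Convert a statement cell such as '$1,065' or '(95)' to a float, or None."""
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    text = str(value).strip()
    # Accounting notation (assumed here): parentheses mean a negative amount.
    negative = text.startswith("(") and text.endswith(")")
    # Remove currency symbols, thousands separators, parentheses and spaces.
    text = re.sub(r"[$,()\s]", "", text)
    try:
        number = float(text)
    except ValueError:
        return None  # Not a number (for example a label or an empty cell).
    return -number if negative else number


print(parse_amount("$1,065"))   # 1065.0
print(parse_amount("(95)"))     # -95.0
print(parse_amount("n/a"))      # None
```

A parser like this could be applied per column with `Series.map` before the numeric conversion, so that unparseable cells are visible as missing values rather than silently forward filled.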

In [0]:
# Function to download the files to local /tmp directory
def download_file_to_tmp(dbfs_file_path):
    local_path = "/tmp/" + os.path.basename(dbfs_file_path)
    dbutils.fs.cp(dbfs_file_path, f"file:{local_path}")
    return local_path

# Function to safely clean column names (handles NaN or float issues)
def safe_clean_column_names(df):
    # Clean column names safely, ensuring no issues with non-string types
    df.columns = [str(col).strip().replace('Unnamed:', '').replace('\n', ' ') if isinstance(col, str) else col for col in df.columns]
    return df

# Function to transpose and clean the "Consolidated Overview" sheet
def transpose_and_clean(df):
    # Drop completely empty rows (all-NaN columns are left in place)
    df.dropna(how='all', inplace=True)
    
    # Transpose the DataFrame to set the first column as headers
    df_transposed = df.T
    df_transposed.columns = df_transposed.iloc[0]  # Set first row as headers
    df_transposed = df_transposed.drop(df_transposed.index[0])  # Drop the original header row from data

    # Clean the column names
    df_transposed = safe_clean_column_names(df_transposed)
    
    return df_transposed

# Function to fill or clean NaN values
def clean_nan_values(df):
    # Forward fill for sequential data
    df.ffill(inplace=True)  # fillna(method='ffill') is deprecated in recent pandas
    
    # Replace any remaining NaNs with 0
    df.fillna(0, inplace=True)
    
    return df

# Function to clean and process the data further
def clean_and_standardize_columns(df):
    df_cleaned = transpose_and_clean(df)
    
    # Drop unnecessary columns or rows with unnamed or empty values
    df_cleaned = df_cleaned.loc[:, ~df_cleaned.columns.str.contains('^Unnamed$', na=False)]
    
    # Remove dollar signs, commas, or other symbols from financial values
    df_cleaned.replace({r'\$': '', ',': ''}, regex=True, inplace=True)
    
    # Clean NaN values
    df_cleaned = clean_nan_values(df_cleaned)
    
    return df_cleaned

# Function to preview, transpose, and clean the "Consolidated Overview" sheet
def preview_and_clean_consolidated_overview(file_path):
    try:
        # Read the "Consolidated Overview" sheet
        df = pd.read_excel(file_path, sheet_name='consolidated overview', engine='xlrd')

        # Clean and standardize the data
        df_cleaned = clean_and_standardize_columns(df)
        
        # Preview the cleaned data
        print(f"Cleaned data from 'Consolidated Overview' of file '{file_path}':")
        print(df_cleaned.head())  # Show the first few rows of the cleaned sheet
        
        return df_cleaned
    except Exception as e:
        print(f"Error processing 'Consolidated Overview' in file {file_path}: {str(e)}")
        return None

# List of file paths for the Excel files in DBFS
file_paths = [
    "dbfs:/FileStore/tables/Williams_10_K_2012.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2013.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2014.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2015.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2016.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2017.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2018.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2019.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2020.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2021.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2022.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2023.xls"
]

# Loop through each file, transpose, clean, and preview the "Consolidated Overview" sheet
for file_path in file_paths:
    local_file = download_file_to_tmp(file_path)
    preview_and_clean_consolidated_overview(local_file)


Cleaned data from 'Consolidated Overview' of file '/tmp/Williams_10_K_2012.xls':
            Consolidated Overview                      NaN   NaN         NaN  \
Unnamed: 1                    0.0                        0     0           0   
Unnamed: 2                    0.0  Years Ended December 31  2012  (Millions)   
Unnamed: 3                    0.0  Years Ended December 31  2012  (Millions)   
Unnamed: 4                    0.0  Years Ended December 31  2012  (Millions)   
Unnamed: 5                    0.0  Years Ended December 31  2012  (Millions)   

            Revenues: Service revenues Product sales  Total revenues  \
Unnamed: 1        0.0                0             0             0.0   
Unnamed: 2        0.0             2729             0             0.0   
Unnamed: 3        0.0             2729          4757          7486.0   
Unnamed: 4        0.0             2729          4757          7486.0   
Unnamed: 5        0.0             2729          4757          7486.0   

     

In [0]:
# Function to download the files to local /tmp directory
def download_file_to_tmp(dbfs_file_path):
    local_path = "/tmp/" + os.path.basename(dbfs_file_path)
    dbutils.fs.cp(dbfs_file_path, f"file:{local_path}")
    return local_path

# Function to clean column names safely
def safe_clean_column_names(df):
    df.columns = [str(col).strip().replace('Unnamed:', '').replace('\n', ' ') for col in df.columns]
    return df

# Function to transpose and clean the "Consolidated Overview" sheet
def transpose_and_clean(df):
    # Drop completely empty rows (all-NaN columns are left in place)
    df.dropna(how='all', inplace=True)
    
    # Transpose the DataFrame to set the first column as headers
    df_transposed = df.T
    
    # Ensure all headers are strings
    df_transposed.columns = df_transposed.iloc[0].astype(str)  # Convert first row to string for headers
    df_transposed = df_transposed.drop(df_transposed.index[0])  # Drop the original header row from data

    # Clean the column names
    df_transposed = safe_clean_column_names(df_transposed)
    
    return df_transposed

# Function to clean NaN values
def clean_nan_values(df):
    # Forward fill for sequential data
    df.ffill(inplace=True)  # fillna(method='ffill') is deprecated in recent pandas
    
    # Replace any remaining NaNs with 0 or an appropriate value for your analysis
    df.fillna(0, inplace=True)
    
    return df

# Function to clean and process the data further
def clean_and_standardize_columns(df):
    df_cleaned = transpose_and_clean(df)
    
    # Drop unnecessary columns or rows with unnamed or empty values
    df_cleaned = df_cleaned.loc[:, ~df_cleaned.columns.str.contains('^Unnamed$', na=False)]
    
    # Remove dollar signs, commas, or other symbols from financial values
    df_cleaned.replace({r'\$': '', ',': ''}, regex=True, inplace=True)
    
    # Clean NaN values
    df_cleaned = clean_nan_values(df_cleaned)
    
    return df_cleaned

# Function to load, transpose, and clean the "Consolidated Overview" sheet
def load_and_clean_file(file_path):
    try:
        df = pd.read_excel(file_path, sheet_name='consolidated overview', engine='xlrd')
        
        # Clean and standardize the data
        df_cleaned = clean_and_standardize_columns(df)
        
        return df_cleaned
    except Exception as e:
        print(f"Error loading file {file_path}: {e}")
        return None

# List of file paths for the Excel files in DBFS
file_paths = [
    "dbfs:/FileStore/tables/Williams_10_K_2012.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2013.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2014.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2015.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2016.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2017.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2018.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2019.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2020.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2021.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2022.xls",
    "dbfs:/FileStore/tables/Williams_10_K_2023.xls"
]

# Step 1: Load and assign each cleaned dataframe to a variable
for idx, file_path in enumerate(file_paths, start=2012):
    local_file = download_file_to_tmp(file_path)
    cleaned_df = load_and_clean_file(local_file)
    
    if cleaned_df is not None:
        # Dynamically create a variable for each year
        globals()[f'df_{idx}'] = cleaned_df
        print(f"Data for year {idx} loaded successfully")
    else:
        print(f"Error processing the file for year {idx}")

# Now, you should have variables named df_2012, df_2013, etc.

# Function to clean non-numeric values and convert columns to numeric
def clean_numeric_columns(df, columns):
    for col in columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')  # Convert to numeric, set errors as NaN for non-numeric data
    return df

# Function to handle column mismatches by renaming columns
def standardize_columns(df, target_columns):
    current_columns = set(df.columns)
    if current_columns != target_columns:
        # Only keep columns that are in the target set
        df = df.loc[:, df.columns.isin(target_columns)]
        # Ensure all necessary columns exist in the DataFrame, adding missing ones with NaN values
        missing_cols = target_columns - current_columns
        for col in missing_cols:
            df[col] = pd.NA
    return df

# Function to remove duplicate rows
def remove_duplicates(df):
    return df.drop_duplicates()

# List of critical columns expected in the DataFrames
critical_columns = ['Total revenues', 'Net income (loss)', 'Interest expense', 'Operating income (loss)', 'Depreciation and amortization expenses', 'Product sales', 'Service revenues']

# Validation for column consistency and numeric conversion
for idx, df in enumerate([df_2012, df_2013, df_2014, df_2015, df_2016, df_2017, df_2018, df_2019, df_2020, df_2021, df_2022, df_2023]):
    print(f"\nValidating DataFrame for year {2012 + idx}")

    # Check if all critical columns are present
    missing_columns = [col for col in critical_columns if col not in df.columns]
    if missing_columns:
        print(f"DataFrame for year {2012 + idx} is missing columns: {missing_columns}")
        # Optionally, add missing columns with default values
        for col in missing_columns:
            df[col] = 0  # or np.nan, depending on the context

    # Proceed with numeric validation for columns that exist
    for col in critical_columns:
        if col in df.columns:
            try:
                df[col] = pd.to_numeric(df[col], errors='coerce')
                print(f"Column '{col}' in DataFrame for year {2012 + idx} is numeric.")
            except Exception as e:
                print(f"Error converting column '{col}' in DataFrame for year {2012 + idx}: {e}")

# Function to rename columns to a standard set
def rename_columns_to_standard(df):
    # Standard column names to enforce consistency
    standard_columns = [
        'Total revenues', 'Net income (loss)', 'Interest expense', 
        'Operating income (loss)', 'Depreciation and amortization expenses', 
        'Product sales', 'Service revenues'
    ]

    # Rename columns using the standard list
    df.columns = [col if col in standard_columns else 'Unknown' for col in df.columns]

    # Filter columns to keep only standard columns
    df = df[[col for col in df.columns if col in standard_columns]]

    return df

# Then call the function as planned
df = rename_columns_to_standard(df)


# Apply cleaning and standardization to each DataFrame
for year in range(2012, 2024):
    df_var_name = f'df_{year}'

    if df_var_name in globals():
        df = globals()[df_var_name]

        # Rename columns for consistency
        df = rename_columns_to_standard(df)

        # Handle missing data
        df.fillna(0, inplace=True)  # Fill with zero or use another method based on your requirements

        # Remove duplicate rows
        df = remove_duplicates(df)

        # Re-assign the cleaned DataFrame to the global variable
        globals()[df_var_name] = df
        print(f"DataFrame {df_var_name} cleaned successfully.")
    else:
        print(f"DataFrame {df_var_name} is missing or was not loaded correctly.")

# List of standard column names to enforce across all DataFrames
standard_columns = [
    'Total revenues', 'Net income (loss)', 'Interest expense', 
    'Operating income (loss)', 'Depreciation and amortization expenses', 
    'Product sales', 'Service revenues'
]

# Apply renaming to ensure consistency across all DataFrames
for year in range(2012, 2024):
    df_var_name = f'df_{year}'

    if df_var_name in globals():
        df = globals()[df_var_name]

        # Rename columns using the standard list
        df.columns = [col if col in standard_columns else 'Unknown' for col in df.columns]

        # Filter columns to keep only standard columns
        df = df[[col for col in df.columns if col in standard_columns]]

        # Re-assign the cleaned DataFrame to the global variable
        globals()[df_var_name] = df
        print(f"DataFrame {df_var_name} columns standardized successfully.")
    else:
        print(f"DataFrame {df_var_name} is missing or was not loaded correctly.")


Data for year 2012 loaded successfully
Data for year 2013 loaded successfully
Data for year 2014 loaded successfully
Data for year 2015 loaded successfully
Data for year 2016 loaded successfully
Data for year 2017 loaded successfully
Data for year 2018 loaded successfully
Data for year 2019 loaded successfully
Data for year 2020 loaded successfully
Data for year 2021 loaded successfully
Data for year 2022 loaded successfully
Data for year 2023 loaded successfully

Validating DataFrame for year 2012
Column 'Total revenues' in DataFrame for year 2012 is numeric.
Column 'Net income (loss)' in DataFrame for year 2012 is numeric.
Column 'Interest expense' in DataFrame for year 2012 is numeric.
Column 'Operating income (loss)' in DataFrame for year 2012 is numeric.
Column 'Depreciation and amortization expenses' in DataFrame for year 2012 is numeric.
Column 'Product sales' in DataFrame for year 2012 is numeric.
Column 'Service revenues' in DataFrame for year 2012 is numeric.

Validating Data

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.fillna(0, inplace=True)  # Fill with zero or use another method based on your requirements

In [0]:
# Validate if column names are consistent across all DataFrames
def validate_column_consistency(dataframes):
    column_set = set(dataframes[0].columns)
    for i, df in enumerate(dataframes):
        if set(df.columns) != column_set:
            print(f"Column mismatch in DataFrame {i + 1}")
        else:
            print(f"DataFrame {i + 1} column names are consistent.")

# Validate if numeric columns contain any non-numeric data
def validate_numeric_columns(df, numeric_columns):
    for col in numeric_columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            print(f"Non-numeric data found in column '{col}'")
        else:
            print(f"Column '{col}' is numeric.")

# Validate if there are any duplicate rows
def validate_duplicates(df):
    duplicate_rows = df.duplicated().sum()
    if duplicate_rows > 0:
        print(f"{duplicate_rows} duplicate rows found.")
    else:
        print("No duplicate rows found.")

# Validate missing data in critical columns
def validate_missing_data(df, critical_columns):
    missing_data = df[critical_columns].isnull().sum()
    if missing_data.any():
        print("Missing data detected in critical columns:")
        print(missing_data[missing_data > 0])
    else:
        print("No missing data in critical columns.")

# List of numeric columns and critical columns to validate
numeric_columns = ['Total revenues', 'Net income (loss)', 'Interest expense', 'Operating income (loss)', 'Depreciation and amortization expenses']
critical_columns = ['Total revenues', 'Net income (loss)', 'Product sales', 'Service revenues']

# Example usage assuming you have a list of DataFrames (from each file)
dataframes = [df_2012, df_2013, df_2014, df_2015, df_2016, df_2017, df_2018, df_2019, df_2020, df_2021, df_2022, df_2023]  # Replace with your actual DataFrames

# Validate consistency and data integrity
validate_column_consistency(dataframes)
for df in dataframes:
    validate_numeric_columns(df, numeric_columns)
    validate_duplicates(df)
    validate_missing_data(df, critical_columns)


DataFrame 1 column names are consistent.
DataFrame 2 column names are consistent.
DataFrame 3 column names are consistent.
DataFrame 4 column names are consistent.
DataFrame 5 column names are consistent.
DataFrame 6 column names are consistent.
DataFrame 7 column names are consistent.
DataFrame 8 column names are consistent.
DataFrame 9 column names are consistent.
DataFrame 10 column names are consistent.
DataFrame 11 column names are consistent.
DataFrame 12 column names are consistent.
Column 'Total revenues' is numeric.
Column 'Net income (loss)' is numeric.
Column 'Interest expense' is numeric.
Column 'Operating income (loss)' is numeric.
Column 'Depreciation and amortization expenses' is numeric.
No duplicate rows found.
No missing data in critical columns.
Column 'Total revenues' is numeric.
Column 'Net income (loss)' is numeric.
Column 'Interest expense' is numeric.
Column 'Operating income (loss)' is numeric.
Column 'Depreciation and amortization expenses' is numeric.
No dupl

# Data Consolidation

After cleaning, the 10-K datasets from 2012 to 2023 were consolidated into a single DataFrame. This allows a holistic view of the financial data across all years. Column consistency checks were run and any mismatches were corrected, adding missing columns where necessary. The result was a well-structured and comprehensive dataset ready for advanced analysis. This step was crucial to enable year-over-year analyses, which often drive business decisions and highlight key trends.
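
The consolidation cell in this section concatenates with `ignore_index=True`, which discards the information about which report each row came from. One possible variant, sketched here on made-up frames, tags every row with its source year through the `keys` argument of `pd.concat`; the variable names and values are illustrative assumptions, not the notebook's data:

```python
import pandas as pd

# Two tiny stand-ins for the cleaned per-year frames (made-up values).
frames_by_year = {
    2001: pd.DataFrame({"Total revenues": [100.0], "Net income (loss)": [10.0]}),
    2002: pd.DataFrame({"Total revenues": [120.0], "Net income (loss)": [15.0]}),
}

# keys= builds an outer index level from the years; names= labels both levels.
combined = pd.concat(
    list(frames_by_year.values()),
    keys=list(frames_by_year.keys()),
    names=["year", "row"],
)

# Move the year level into an ordinary column and discard the per-frame row number.
combined = combined.reset_index(level="year").reset_index(drop=True)
print(combined)
```

With an explicit `year` column, year-over-year comparisons can group or sort on it directly instead of relying on row order.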

In [0]:
# Remove the duplicate rows for the 2012 DataFrame
df_2012 = df_2012.drop_duplicates()

# Create a list of all cleaned DataFrames
dfs = [globals()[f'df_{year}'] for year in range(2012, 2024) if f'df_{year}' in globals()]

# Concatenate all DataFrames into a single DataFrame
consolidated_df = pd.concat(dfs, ignore_index=True)

# Preview the consolidated DataFrame
print("\nConsolidated DataFrame:")
print(consolidated_df.head())



Consolidated DataFrame:
   Service revenues  Product sales  Total revenues  \
0               0.0            0.0             0.0   
1            2729.0            0.0             0.0   
2            2729.0         4757.0          7486.0   
3             197.0         -641.0          7486.0   
4               0.0            0.0          7486.0   

   Depreciation and amortization expenses  Operating income (loss)  \
0                                     0.0                      0.0   
1                                     0.0                      0.0   
2                                   756.0                   1612.0   
3                                   -95.0                   1612.0   
4                                     0.0                   1612.0   

   Interest expense  Net income (loss)  
0               0.0                0.0  
1               0.0                0.0  
2            -509.0             1065.0  
3              64.0             1065.0  
4               0.0     

In [0]:
consolidated_df = consolidated_df.loc[(consolidated_df != 0).any(axis=1)]
print("\nConsolidated DataFrame after removing rows with all zeros:")

consolidated_df.drop_duplicates(inplace=True)
print("\nConsolidated DataFrame after dropping duplicates:")
print(consolidated_df.head())





Consolidated DataFrame after removing rows with all zeros:

Consolidated DataFrame after dropping duplicates:
   Service revenues  Product sales  Total revenues  \
1            2729.0            0.0             0.0   
2            2729.0         4757.0          7486.0   
3             197.0         -641.0          7486.0   
4               0.0            0.0          7486.0   
5            2532.0            0.0          7486.0   

   Depreciation and amortization expenses  Operating income (loss)  \
1                                     0.0                      0.0   
2                                   756.0                   1612.0   
3                                   -95.0                   1612.0   
4                                     0.0                   1612.0   
5                                     0.0                   1612.0   

   Interest expense  Net income (loss)  
1               0.0                0.0  
2            -509.0             1065.0  
3              64.0 

# Final Sanity Check

To ensure everything was ready before feature engineering, a final inspection was performed. Column data types were validated, critical columns were checked for missing values, and the data was checked for duplicate rows. Additionally, I looked for any unrealistic values that could negatively impact model performance. This sanity check serves as a gatekeeping measure, ensuring only quality data moves forward into feature engineering.
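
The checks described above can also be written as a function that returns its findings rather than printing them, which makes the gate easier to enforce programmatically. A minimal sketch under that assumption; `collect_issues` and the sample frame are hypothetical and not part of the notebook:

```python
import pandas as pd


def collect_issues(df, critical_columns):
    """Return a list of readable problems; an empty list means the frame passed."""
    issues = []
    # Presence check: every critical column must exist.
    missing = [c for c in critical_columns if c not in df.columns]
    if missing:
        issues.append(f"missing columns: {missing}")
    # Type and null checks on the critical columns that are present.
    for col in (c for c in critical_columns if c in df.columns):
        if not pd.api.types.is_numeric_dtype(df[col]):
            issues.append(f"non-numeric column: {col}")
        if df[col].isna().any():
            issues.append(f"null values in: {col}")
    # Duplicate check across whole rows.
    n_dupes = int(df.duplicated().sum())
    if n_dupes:
        issues.append(f"{n_dupes} duplicate rows")
    return issues


# Made-up frame that trips each check once.
sample = pd.DataFrame({
    "Total revenues": [100.0, 100.0, None],
    "Net income (loss)": ["10", "10", "15"],
})
issues = collect_issues(
    sample, ["Total revenues", "Net income (loss)", "Interest expense"]
)
for issue in issues:
    print(issue)
```

A caller could then stop the pipeline with something like `assert not issues` before feature engineering starts.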

In [0]:
# Final Sanity Check Function
def final_sanity_check(df, critical_columns):
    # Step 1: Column Consistency Check
    print("\n--- Column Consistency Check ---")
    consistent_columns = all(col in df.columns for col in critical_columns)
    if consistent_columns:
        print("All critical columns are present.")
    else:
        missing_columns = [col for col in critical_columns if col not in df.columns]
        print(f"Missing critical columns: {missing_columns}")

    # Step 2: Data Types Check
    print("\n--- Data Type Check for Critical Columns ---")
    for col in critical_columns:
        if col in df.columns:
            if pd.api.types.is_numeric_dtype(df[col]):
                print(f"Column '{col}' is numeric.")
            else:
                print(f"Column '{col}' is NOT numeric, found type: {df[col].dtype}")

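    # Step 3: Missing Values Check
    # Only inspect critical columns that exist, so a missing column does not raise a KeyError
    print("\n--- Missing Values Check ---")
    present_columns = [col for col in critical_columns if col in df.columns]
    missing_values = df[present_columns].isnull().sum()
    if missing_values.sum() > 0:
        print("Missing data detected in critical columns:")
        print(missing_values[missing_values > 0])
    else:
        print("No missing data in critical columns.")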

    # Step 4: Duplicate Rows Check
    print("\n--- Duplicate Rows Check ---")
    duplicate_rows = df.duplicated().sum()
    if duplicate_rows > 0:
        print(f"{duplicate_rows} duplicate rows found.")
    else:
        print("No duplicate rows found.")

    # Step 5: Range/Value Consistency Check
    print("\n--- Range/Value Consistency Check ---")
    for col in critical_columns:
        if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
            if (df[col] < 0).any() and col != 'Net income (loss)':
                print(f"Warning: Negative values detected in column '{col}' where they are not expected.")
            else:
                print(f"Column '{col}' values are within the expected range.")

# Define critical columns to check
critical_columns = [
    'Total revenues', 'Net income (loss)', 'Interest expense', 
    'Operating income (loss)', 'Depreciation and amortization expenses',
    'Product sales', 'Service revenues'
]

# Apply the final sanity check to the consolidated DataFrame
final_sanity_check(consolidated_df, critical_columns)



--- Column Consistency Check ---
All critical columns are present.

--- Data Type Check for Critical Columns ---
Column 'Total revenues' is numeric.
Column 'Net income (loss)' is numeric.
Column 'Interest expense' is numeric.
Column 'Operating income (loss)' is numeric.
Column 'Depreciation and amortization expenses' is numeric.
Column 'Product sales' is numeric.
Column 'Service revenues' is numeric.

--- Missing Values Check ---
No missing data in critical columns.

--- Duplicate Rows Check ---
No duplicate rows found.

--- Range/Value Consistency Check ---
Column 'Total revenues' values are within the expected range.
Column 'Net income (loss)' values are within the expected range.
Column 'Operating income (loss)' values are within the expected range.
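
Because `final_sanity_check` only prints its findings, a notebook run continues even when a check fails. One way to make the check act as a gate is to collect the problems in a list and stop if the list is not empty. The following is a minimal sketch under that assumption: the helper `collect_sanity_issues`, its `allow_negative` parameter, and the toy data are hypothetical and not part of the original notebook.

```python
import pandas as pd

def collect_sanity_issues(df, critical_columns, allow_negative=()):
    """Return a list of human-readable problems instead of printing them.

    Hypothetical helper for illustration; `allow_negative` names the columns
    where negative values are acceptable (for example a net loss).
    """
    issues = []
    missing = [c for c in critical_columns if c not in df.columns]
    if missing:
        issues.append(f"missing columns: {missing}")
    present = [c for c in critical_columns if c in df.columns]
    for col in present:
        if not pd.api.types.is_numeric_dtype(df[col]):
            issues.append(f"non-numeric column: {col}")
            continue
        if df[col].isna().any():
            issues.append(f"missing values in: {col}")
        if col not in allow_negative and (df[col] < 0).any():
            issues.append(f"unexpected negatives in: {col}")
    if df.duplicated().any():
        issues.append("duplicate rows present")
    return issues

# Toy data (made up) with one negative revenue and one absent column
toy = pd.DataFrame({
    "Total revenues": [100.0, -5.0, 120.0],
    "Net income (loss)": [10.0, -2.0, 12.0],
})
issues = collect_sanity_issues(
    toy,
    ["Total revenues", "Net income (loss)", "Interest expense"],
    allow_negative=("Net income (loss)",),
)
print(issues)
```

Raising an error when `issues` is non-empty (for example `if issues: raise ValueError(issues)`) would stop the notebook before feature engineering starts.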


# Feature Engineering

We created multiple key features to derive insights and enrich our dataset for future analysis. Some of the engineered features include:

- *YoY_Revenue_Growth:* Captures the year-over-year revenue change, providing insight into business growth over time.

- *YoY_Net_Income_Growth:* Similar to YoY_Revenue_Growth but for net income, allowing us to track profitability trends.

- *Profit_Margin and Operating_Margin:* These features were generated to understand the financial health and operational efficiency of the company.

- *Depreciation_Percent_Revenue and Interest_Expense_Percent_Revenue:* These ratios help evaluate the impact of depreciation and interest expense relative to revenue.

These features are critical for understanding both the revenue drivers and expense breakdown, setting up the dataset for more insightful analysis in the future.
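
To make the definitions concrete, the sketch below applies the growth and margin formulas to a small table. The figures are made up for illustration and do not come from the 10-K data; treating zero revenue as missing is an assumption of this sketch.

```python
import numpy as np
import pandas as pd

# Made-up figures for three consecutive years (not from the 10-K data)
toy = pd.DataFrame(
    {
        "Total revenues": [100.0, 110.0, 121.0],
        "Net income (loss)": [10.0, 11.0, 24.2],
        "Operating income (loss)": [20.0, 22.0, 36.3],
    },
    index=[2021, 2022, 2023],
)

# Year-over-year growth: (current - previous) / previous
toy["YoY_Revenue_Growth"] = toy["Total revenues"].pct_change()

# Margins: metric divided by revenue; zero revenue is treated as missing
revenue = toy["Total revenues"].replace(0, np.nan)
toy["Profit_Margin"] = toy["Net income (loss)"] / revenue
toy["Operating_Margin"] = toy["Operating income (loss)"] / revenue

print(toy.round(3))
```

The first year has no predecessor, so its growth rate is NaN until it is filled or dropped. The growth rates are only meaningful year over year if the rows are ordered chronologically with one row per year.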

In [0]:
# Feature Engineering Step
# Growth rates: infinite values (previous value of 0) and the first row are set to 0
df['YoY_Revenue_Growth'] = df['Total revenues'].pct_change().replace([np.inf, -np.inf], np.nan).fillna(0)
df['YoY_Net_Income_Growth'] = df['Net income (loss)'].pct_change().replace([np.inf, -np.inf], np.nan).fillna(0)
# Ratios to revenue: zero revenue is treated as missing so the division cannot yield inf
revenue = df['Total revenues'].replace(0, np.nan)
df['Profit_Margin'] = (df['Net income (loss)'] / revenue).fillna(0)
df['Operating_Margin'] = (df['Operating income (loss)'] / revenue).fillna(0)
df['Depreciation_Percent_Revenue'] = (df['Depreciation and amortization expenses'] / revenue).fillna(0)
df['Interest_Expense_Percent_Revenue'] = (df['Interest expense'] / revenue).fillna(0)

# Print the DataFrame to check if feature-engineered columns are there
print(df.head())


             Service revenues  Product sales  Total revenues  \
Unnamed: 1                0.0            0.0             0.0   
Unnamed: 3             7026.0         2779.0         10907.0   
Unnamed: 9              490.0        -1777.0         10907.0   
Unnamed: 15               0.0            0.0         10907.0   
Unnamed: 21            6536.0         4556.0         10965.0   

             Depreciation and amortization expenses  Operating income (loss)  \
Unnamed: 1                                      0.0                      0.0   
Unnamed: 3                                   2071.0                   4311.0   
Unnamed: 9                                    -62.0                   4311.0   
Unnamed: 15                                     0.0                   4311.0   
Unnamed: 21                                  2009.0                   3018.0   

             Interest expense  Net income (loss)  YoY_Revenue_Growth  \
Unnamed: 1                0.0                0.0            0.

In [0]:
# Handling negative values where they are unexpected
columns_to_correct = [
    'Interest_Expense_Percent_Revenue',
    'Depreciation_Percent_Revenue',
    'Product sales',
    'Service revenues'
]

for col in columns_to_correct:
    if col in df.columns:
        # Vectorized replacement for max(0, x): negative values become 0
        df[col] = df[col].clip(lower=0)

# Print the updated DataFrame
print("\nFeature-Engineered DataFrame after handling unrealistic values:")
print(df.head())




Feature-Engineered DataFrame after handling unrealistic values:
             Service revenues  Product sales  Total revenues  \
Unnamed: 1                0.0            0.0             0.0   
Unnamed: 3             7026.0         2779.0         10907.0   
Unnamed: 9              490.0            0.0         10907.0   
Unnamed: 15               0.0            0.0         10907.0   
Unnamed: 21            6536.0         4556.0         10965.0   

             Depreciation and amortization expenses  Operating income (loss)  \
Unnamed: 1                                      0.0                      0.0   
Unnamed: 3                                   2071.0                   4311.0   
Unnamed: 9                                    -62.0                   4311.0   
Unnamed: 15                                     0.0                   4311.0   
Unnamed: 21                                  2009.0                   3018.0   

             Interest expense  Net income (loss)  YoY_Revenue_Growth 

In [0]:
import numpy as np

# Replace infinite values with NaN so they can be filled below.
df.replace([np.inf, -np.inf], np.nan, inplace=True)

# Fill the NaN values left by the replacement above.
# For YoY_Revenue_Growth and YoY_Net_Income_Growth, NaN is replaced with 0 to indicate no growth.
# The result is assigned back because fillna(inplace=True) on a column selection
# triggers a warning in newer pandas versions and may not update df.
df['YoY_Revenue_Growth'] = df['YoY_Revenue_Growth'].fillna(0)
df['YoY_Net_Income_Growth'] = df['YoY_Net_Income_Growth'].fillna(0)

# Replace negative values in 'Product sales' and 'Service revenues' with 0,
# assuming negative revenues are data errors or conceptually not applicable.
df['Product sales'] = df['Product sales'].clip(lower=0)
df['Service revenues'] = df['Service revenues'].clip(lower=0)

# Optional: clip other columns to prevent unrealistic negative values.
# Caution: this assumes expenses are stored as positive numbers. Where a column stores
# them as negatives (the consolidated output earlier shows an interest expense of -509.0),
# clipping sets those values to 0.
df['Depreciation and amortization expenses'] = df['Depreciation and amortization expenses'].clip(lower=0)
df['Interest expense'] = df['Interest expense'].clip(lower=0)

# After cleaning, print the DataFrame to verify
print("\nFeature-Engineered DataFrame after handling unrealistic values:")
print(df.head())



Feature-Engineered DataFrame after handling unrealistic values:
             Service revenues  Product sales  Total revenues  \
Unnamed: 1                0.0            0.0             0.0   
Unnamed: 3             7026.0         2779.0         10907.0   
Unnamed: 9              490.0            0.0         10907.0   
Unnamed: 15               0.0            0.0         10907.0   
Unnamed: 21            6536.0         4556.0         10965.0   

             Depreciation and amortization expenses  Operating income (loss)  \
Unnamed: 1                                      0.0                      0.0   
Unnamed: 3                                   2071.0                   4311.0   
Unnamed: 9                                      0.0                   4311.0   
Unnamed: 15                                     0.0                   4311.0   
Unnamed: 21                                  2009.0                   3018.0   

             Interest expense  Net income (loss)  YoY_Revenue_Growth 

# Advanced Feature Engineering for Predictive Analysis

In this step, we created advanced features intended for direct use in machine learning models: lagged revenue and net income values, a rolling three-year revenue average, a lagged profit margin (Profit_Margin_Lag_1), and an interaction between revenue and profit margin. These features carry temporal and trend information, which is essential for predictive modeling. We also checked how strongly each column correlates with the target, Net income (loss), and dropped those above a 0.95 threshold to reduce the risk of overfitting.
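
The lag and rolling-window mechanics can be seen on a small table. This is a minimal sketch with made-up revenue figures. It assumes one row per year in chronological order, and dropping the rows without history is shown only as one possible alternative to imputation.

```python
import pandas as pd

# Made-up annual revenue, ordered oldest to newest (not from the 10-K data)
toy = pd.DataFrame(
    {"Total revenues": [100.0, 110.0, 120.0, 130.0]},
    index=[2020, 2021, 2022, 2023],
)

# shift(1) moves each value down one row, so a row sees the previous year's value
toy["Revenue_Lag_1"] = toy["Total revenues"].shift(1)

# rolling(window=3).mean() averages the current row and the two rows before it
toy["Revenue_Rolling_Avg_3Y"] = toy["Total revenues"].rolling(window=3).mean()

print(toy)

# The first rows have no history, so they come out as NaN. Dropping them is one
# alternative to imputing (an assumption of this sketch).
usable = toy.dropna()
print(usable)
```

Both `shift` and `rolling` work on row position, not on dates, so rows that are out of order or that repeat a year make the lagged values refer to the wrong period.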

In [0]:
# Lagged Revenue and Net Income (1-year Lag)
df['Revenue_Lag_1'] = df['Total revenues'].shift(1)
df['Net_Income_Lag_1'] = df['Net income (loss)'].shift(1)

# Year-over-year (YoY) growth rates (recomputed here, replacing the values from the earlier cell)
df['YoY_Revenue_Growth'] = (df['Total revenues'] - df['Total revenues'].shift(1)) / df['Total revenues'].shift(1)
df['YoY_Net_Income_Growth'] = (df['Net income (loss)'] - df['Net income (loss)'].shift(1)) / df['Net income (loss)'].shift(1)

# Rolling 3-Year Average for Revenue
df['Revenue_Rolling_Avg_3Y'] = df['Total revenues'].rolling(window=3).mean()

# Lagged Profit Margin (1-year Lag)
df['Profit_Margin_Lag_1'] = df['Profit_Margin'].shift(1)

# Interaction Feature: Revenue and Profit Margin
df['Revenue_Profit_Margin_Interaction'] = df['Total revenues'] * df['Profit_Margin']


In [0]:
# List of feature-engineered columns
feature_columns = [
    'Revenue_Lag_1', 'Net_Income_Lag_1', 'YoY_Revenue_Growth', 
    'YoY_Net_Income_Growth', 'Revenue_Rolling_Avg_3Y', 
    'Profit_Margin_Lag_1'
]

# Replace infinite values with NaN (np.nan keeps the columns as float64)
df.replace([np.inf, -np.inf], np.nan, inplace=True)

# Impute missing values for feature-engineered columns
for feature in feature_columns:
    if df[feature].isna().any():
        # Median imputation; assign back instead of calling fillna(inplace=True) on a column selection
        df[feature] = df[feature].fillna(df[feature].median())

correlations = df.corr()

# Drop features if they are highly correlated with the target to avoid overfitting
correlation_threshold = 0.95
highly_correlated_features = correlations[correlations['Net income (loss)'].abs() > correlation_threshold].index.tolist()
highly_correlated_features.remove('Net income (loss)')  # Remove the target itself from this list

df.drop(columns=highly_correlated_features, inplace=True)
print(f"Features dropped due to high correlation: {highly_correlated_features}")

# Print updated DataFrame for validation
print("\n--- Updated Feature-Engineered DataFrame ---")
print(df.head())


Features dropped due to high correlation: ['Operating income (loss)', 'Profit_Margin', 'Operating_Margin', 'Revenue_Profit_Margin_Interaction']

--- Updated Feature-Engineered DataFrame ---
             Service revenues  Product sales  Total revenues  \
Unnamed: 1                0.0            0.0             0.0   
Unnamed: 3             7026.0         2779.0         10907.0   
Unnamed: 9              490.0            0.0         10907.0   
Unnamed: 15               0.0            0.0         10907.0   
Unnamed: 21            6536.0         4556.0         10965.0   

             Depreciation and amortization expenses  Interest expense  \
Unnamed: 1                                      0.0               0.0   
Unnamed: 3                                   2071.0               0.0   
Unnamed: 9                                      0.0               0.0   
Unnamed: 15                                     0.0               0.0   
Unnamed: 21                                  2009.0         
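
The cell above drops columns that correlate strongly with the target. Redundancy between predictors is a separate question, and one common way to check it is to look at pairwise correlations among the features themselves. The sketch below illustrates that idea. The helper `redundant_features` and the toy data are hypothetical and not part of the original notebook.

```python
import numpy as np
import pandas as pd

def redundant_features(features, threshold=0.95):
    """List columns whose absolute correlation with an earlier column exceeds threshold.

    Hypothetical helper for illustration; `features` should hold predictors only,
    with the target column excluded.
    """
    corr = features.corr().abs()
    # Keep only the upper triangle so each pair is considered once
    mask = np.triu(np.ones(corr.shape, dtype=bool), k=1)
    upper = corr.where(mask)
    return [col for col in upper.columns if (upper[col] > threshold).any()]

# Made-up predictors: "b" is an exact multiple of "a", "c" is only weakly related
toy = pd.DataFrame({
    "a": [1.0, 2.0, 3.0, 4.0, 5.0],
    "b": [2.0, 4.0, 6.0, 8.0, 10.0],
    "c": [5.0, 1.0, 4.0, 2.0, 3.0],
})
to_drop = redundant_features(toy)
print(to_drop)
```

Of each highly correlated pair, this sketch keeps the column that appears first and flags the later one, so the column order determines which feature survives.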

# Data Inspection

Data Inspection Plan
- Inspect Feature Completeness: Check for NaN values and ensure lagged features and rolling averages have correct data where expected.
- Check Feature Ranges: Validate that the newly created features (e.g., growth rates, lagged values) are within logical ranges.
- Data Types Verification: Verify that all columns are in the appropriate format (e.g., numeric features should be floats or integers).
- Basic Summary Statistics: Summarize statistics for the newly engineered features to check if they make sense.
- Correlation Check: Check the correlation between newly engineered features and key financial metrics.

In [0]:
# 1. Inspect Feature Completeness - Checking for NaN values
print("\n--- NaN Value Count in Feature-Engineered Columns ---")
feature_columns = [
    'Revenue_Lag_1', 'Net_Income_Lag_1', 'YoY_Revenue_Growth', 
    'YoY_Net_Income_Growth', 'Revenue_Rolling_Avg_3Y', 
    'Profit_Margin_Lag_1'
]
nan_counts = df[feature_columns].isna().sum()
print(nan_counts)

# 2. Check Feature Ranges
print("\n--- Range Check for Selected Features ---")
for feature in feature_columns:
    min_value = df[feature].min()
    max_value = df[feature].max()
    print(f"{feature}: Min = {min_value}, Max = {max_value}")

# 3. Data Types Verification
print("\n--- Data Types of Feature-Engineered Columns ---")
data_types = df[feature_columns].dtypes
print(data_types)

# 4. Basic Summary Statistics
print("\n--- Summary Statistics for Feature-Engineered Columns ---")
summary_stats = df[feature_columns].describe()
print(summary_stats)

# 5. Correlation Check
print("\n--- Correlation with 'Net income (loss)' ---")
correlations = df[feature_columns + ['Net income (loss)']].corr()
print(correlations['Net income (loss)'].sort_values(ascending=False))



--- NaN Value Count in Feature-Engineered Columns ---
Revenue_Lag_1             0
Net_Income_Lag_1          0
YoY_Revenue_Growth        0
YoY_Net_Income_Growth     0
Revenue_Rolling_Avg_3Y    0
Profit_Margin_Lag_1       0
dtype: int64

--- Range Check for Selected Features ---
Revenue_Lag_1: Min = 0.0, Max = 10965.0
Net_Income_Lag_1: Min = 0.0, Max = 3303.0
YoY_Revenue_Growth: Min = -0.030825353397172822, Max = 0.0053176858897955445
YoY_Net_Income_Growth: Min = -0.35906751438086587, Max = 0.0
Revenue_Rolling_Avg_3Y: Min = 7271.333333333333, Max = 10965.0
Profit_Margin_Lag_1: Min = 0.0, Max = 0.3028330429999083

--- Data Types of Feature-Engineered Columns ---
Revenue_Lag_1             float64
Net_Income_Lag_1          float64
YoY_Revenue_Growth        float64
YoY_Net_Income_Growth     float64
Revenue_Rolling_Avg_3Y    float64
Profit_Margin_Lag_1       float64
dtype: object

--- Summary Statistics for Feature-Engineered Columns ---
       Revenue_Lag_1  Net_Income_Lag_1  YoY_Revenue_Gr

# Saving the Feature-Engineered DataFrame

In this last section, the feature-engineered DataFrame is converted into a Spark DataFrame (spark_df) for efficient storage and persistence in the Databricks environment. The converted Spark DataFrame is then saved as a managed table using the Hive metastore, ensuring the data remains accessible even after the cluster terminates. This approach provides a reliable way to store the processed data, making it available for future analysis or machine learning without having to redo previous data engineering steps.

The key steps are:

- Convert to Spark DataFrame: Converts the Pandas DataFrame into a Spark DataFrame for distributed storage and processing.
- Save as Managed Table: Saves the DataFrame as a managed table (saveAsTable) so it is persisted across sessions and cluster restarts, ensuring data reliability and ease of access.

In [0]:
# Clean column names to remove invalid characters
# Map spaces to underscores and delete ; ( ) { } = as well as newlines and tabs
char_map = str.maketrans({' ': '_', **dict.fromkeys(';(){}\n\t=')})
df.columns = [col.strip().translate(char_map) for col in df.columns]

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Save as a managed table
spark_df.write.mode('overwrite').saveAsTable("feature_engineered_table")
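
Cleaning column names can make two different names identical, which would cause trouble when the table is written. The sketch below applies the same cleaning rule as the cell above and adds a collision check. The helpers `sanitize_column` and `sanitize_columns` are hypothetical names introduced for illustration.

```python
import re

def sanitize_column(name):
    """Replace spaces with underscores and drop ; ( ) { } = plus newlines and tabs."""
    name = name.strip().replace(" ", "_")
    return re.sub(r"[;(){}\n\t=]", "", name)

def sanitize_columns(names):
    """Sanitize all names and fail loudly if two of them become identical."""
    cleaned = [sanitize_column(n) for n in names]
    duplicates = sorted({c for c in cleaned if cleaned.count(c) > 1})
    if duplicates:
        raise ValueError(f"column names collide after cleaning: {duplicates}")
    return cleaned

print(sanitize_columns(["Net income (loss)", "Total revenues", "YoY_Revenue_Growth"]))
```

Running such a check before `spark.createDataFrame(df)` would surface a naming conflict as an explicit error at the cleaning step.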

