# EDA

In [0]:
# Define the folders to load (excluding forage)
folders = {
    "daphne": "dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/",
    "hybrid": "dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/hybrid/",
    "kaggle": "dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/kaggle/"
}

# Function to load and align all CSV files in each folder
def load_and_align_files(folder_path):
    # Collect all DataFrames from the folder
    dfs = []
    print(f"Checking files in folder: {folder_path}")
    for file in dbutils.fs.ls(folder_path):
        if file.name.endswith(".csv"):
            print(f"Loading file: {file.path}")
            df = spark.read.csv(file.path, header=True, inferSchema=True)
            dfs.append(df)
        else:
            print(f"Skipping non-CSV file: {file.name}")
    
    # Verify if any DataFrames were loaded
    if not dfs:
        print("No CSV files found in the folder.")
        return None
    
    # Align columns by renaming or filling missing columns as necessary
    common_columns = list(set.intersection(*(set(df.columns) for df in dfs)))
    dfs_aligned = [df.select(*[col for col in common_columns if col in df.columns]) for df in dfs]
    
    # Union all aligned DataFrames
    unified_df = dfs_aligned[0]
    for df in dfs_aligned[1:]:
        unified_df = unified_df.unionByName(df, allowMissingColumns=True)
    return unified_df

# Load and perform EDA for each folder
for folder_name, folder_path in folders.items():
    # Load and align all files in the folder
    df = load_and_align_files(folder_path)

    # Check if DataFrame is None (indicating no data loaded)
    if df is None:
        print(f"No data loaded for {folder_name.capitalize()} folder. Skipping.")
        continue

    # Print schema and show a sample of the data
    print(f"\nSchema and data preview for {folder_name.capitalize()} folder:")
    df.printSchema()
    df.show(5)

Checking files in folder: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 01-06 (also 7.3).csv
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 01-07.csv
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 02-01.csv
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 02-02.csv
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 02-09.csv
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 02-11.csv
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 02-15.csv
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 03-01.csv
L

In [0]:
# Load and inspect each file before union
def load_and_inspect_files(folder_path):
    dfs = []
    print(f"Inspecting files in folder: {folder_path}")
    for file in dbutils.fs.ls(folder_path):
        if file.name.endswith(".csv"):
            print(f"Loading file: {file.path}")
            df = spark.read.csv(file.path, header=True, inferSchema=True)
            print(f"Schema for {file.name}:")
            df.printSchema()
            print(f"Data sample from {file.name}:")
            df.show(5)
            dfs.append(df)
    return dfs

# Inspect each folder
for folder_name, folder_path in folders.items():
    print(f"\nInspecting folder: {folder_name}")
    load_and_inspect_files(folder_path)


Inspecting folder: daphne
Inspecting files in folder: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/
Loading file: dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/Fig. 01-06 (also 7.3).csv
Schema for Fig. 01-06 (also 7.3).csv:
root
 |-- Year: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- Beak length: string (nullable = true)
 |-- Beak depth: double (nullable = true)
 |-- Beak width: double (nullable = true)
 |-- CI Beak length: double (nullable = true)
 |-- CI Beak depth: double (nullable = true)
 |-- CI Beak width: double (nullable = true)

Data sample from Fig. 01-06 (also 7.3).csv:
+----+-------+-----------+----------+----------+--------------+-------------+-------------+
|Year|Species|Beak length|Beak depth|Beak width|CI Beak length|CI Beak depth|CI Beak width|
+----+-------+-----------+----------+----------+--------------+-------------+-------------+
|1973| fortis|      10.76|      9.48|   

# 1. Data Cleaning and Preparation

**Load and display data from each folder**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Initialize Spark session
spark = SparkSession.builder.appName("Data Loading").getOrCreate()

# Define base directory and folders
base_directory = "dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/"
folders = ["daphne", "hybrid", "kaggle"]

# Function to load and display data from each CSV file in a given folder
def load_folder_data(folder_path):
    # List all files in the folder
    files = dbutils.fs.ls(folder_path)
    
    for file in files:
        if file.name.endswith(".csv"):
            # Load the CSV file and handle ellipses or null values
            file_path = os.path.join(folder_path, file.name)
            try:
                df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
                # Replace any ellipsis values with null
                df = df.replace("...", None)
                
                # Show schema and first few rows
                print(f"Schema for {file.name}:")
                df.printSchema()
                print(f"Data sample from {file.name}:")
                df.show(5)
                
            except Exception as e:
                print(f"Error loading file {file.name}: {e}")

# Loop through each folder and load data
for folder in folders:
    folder_path = os.path.join(base_directory, folder)
    print(f"\nInspecting folder: {folder}")
    load_folder_data(folder_path)


Inspecting folder: daphne
Schema for Fig. 01-06 (also 7.3).csv:
root
 |-- Year: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- Beak length: string (nullable = true)
 |-- Beak depth: double (nullable = true)
 |-- Beak width: double (nullable = true)
 |-- CI Beak length: double (nullable = true)
 |-- CI Beak depth: double (nullable = true)
 |-- CI Beak width: double (nullable = true)

Data sample from Fig. 01-06 (also 7.3).csv:
+----+-------+-----------+----------+----------+--------------+-------------+-------------+
|Year|Species|Beak length|Beak depth|Beak width|CI Beak length|CI Beak depth|CI Beak width|
+----+-------+-----------+----------+----------+--------------+-------------+-------------+
|1973| fortis|      10.76|      9.48|      8.69|         0.097|         0.13|        0.081|
|1974| fortis|      10.72|      9.42|      8.66|         0.144|         0.17|        0.112|
|1975| fortis|      10.57|      9.19|      8.55|         0.075|        0.084|        0.

**Data Cleaning and Standardization**

In [0]:
# Define a dictionary to map inconsistent column names to standardized ones
column_mapping = {
    "Year": "Year", "year": "Year", "Capture_year": "Year", "Years": "Year",
    "Species": "Species", "species": "Species", "Species1": "Species",
    "Beak length": "BeakLength", "Beak length, mm": "BeakLength", "blength": "BeakLength",
    "Beak depth": "BeakDepth", "Beak depth, mm": "BeakDepth", "bdepth": "BeakDepth",
    "Rain, mm": "Rainfall", "Rainfall_CaptureYear": "Rainfall",
    "CI Beak length": "CI_BeakLength", "CI Beak depth": "CI_BeakDepth",
    # Add additional mappings as needed for relevant columns across files
}

# Function to apply standardized column names and replace ellipses or dots with NULL
def clean_and_standardize(df):
    # Rename columns based on the mapping
    for original_col, standard_col in column_mapping.items():
        if original_col in df.columns:
            df = df.withColumnRenamed(original_col, standard_col)
    
    # Replace ellipses or dots with NULL
    df = df.replace("...", None).replace(".", None)
    
    return df

# Apply the cleaning function to each DataFrame in each folder
for folder in folders:
    folder_path = os.path.join(base_directory, folder)
    files = dbutils.fs.ls(folder_path)
    
    for file in files:
        if file.name.endswith(".csv"):
            file_path = os.path.join(folder_path, file.name)
            try:
                # Load the DataFrame
                df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
                
                # Clean and standardize the DataFrame
                cleaned_df = clean_and_standardize(df)
                
                # Display the cleaned schema and first few rows
                print(f"\nCleaned Data for {file.name}:")
                cleaned_df.printSchema()
                cleaned_df.show(5)
                
            except Exception as e:
                print(f"Error processing file {file.name}: {e}")


Cleaned Data for Fig. 01-06 (also 7.3).csv:
root
 |-- Year: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- BeakLength: string (nullable = true)
 |-- BeakDepth: double (nullable = true)
 |-- Beak width: double (nullable = true)
 |-- CI_BeakLength: double (nullable = true)
 |-- CI_BeakDepth: double (nullable = true)
 |-- CI Beak width: double (nullable = true)

+----+-------+----------+---------+----------+-------------+------------+-------------+
|Year|Species|BeakLength|BeakDepth|Beak width|CI_BeakLength|CI_BeakDepth|CI Beak width|
+----+-------+----------+---------+----------+-------------+------------+-------------+
|1973| fortis|     10.76|     9.48|      8.69|        0.097|        0.13|        0.081|
|1974| fortis|     10.72|     9.42|      8.66|        0.144|        0.17|        0.112|
|1975| fortis|     10.57|     9.19|      8.55|        0.075|       0.084|        0.057|
|1976| fortis|     10.64|     9.23|      8.58|        0.048|       0.053|        0.039|

**Data Cleaning: Missing Values and Data Type Correction**

In [0]:
from pyspark.sql.functions import col, when

# Function to clean and standardize data types, handle missing values, and ensure consistency
def clean_and_prepare_data(df):
    # Handle missing values by replacing "NULL" and empty strings with None
    df = df.replace(["NULL", ""], None)

    # Correct data types where possible (convert string columns that should be numeric)
    for column in df.columns:
        if "Year" in column or "year" in column:  # Convert year columns to integer
            df = df.withColumn(column, col(column).cast("int"))
        elif "size" in column or "count" in column or "Number" in column:  # Convert count-related columns to integer
            df = df.withColumn(column, col(column).cast("int"))
        elif "depth" in column or "width" in column or "length" in column:  # Convert measurements to float
            df = df.withColumn(column, col(column).cast("double"))

    return df

# Apply cleaning function to all loaded data
cleaned_data = []
for folder in folders:
    folder_path = os.path.join(base_directory, folder)
    files = dbutils.fs.ls(folder_path)
    
    for file in files:
        if file.name.endswith(".csv"):
            file_path = os.path.join(folder_path, file.name)
            df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
            df = clean_and_prepare_data(df)
            cleaned_data.append((file.name, df))
            print(f"Cleaned data for {file.name}:")
            df.show(5)

Cleaned data for Fig. 01-06 (also 7.3).csv:
+----+-------+-----------+----------+----------+--------------+-------------+-------------+
|Year|Species|Beak length|Beak depth|Beak width|CI Beak length|CI Beak depth|CI Beak width|
+----+-------+-----------+----------+----------+--------------+-------------+-------------+
|1973| fortis|      10.76|      9.48|      8.69|         0.097|         0.13|        0.081|
|1974| fortis|      10.72|      9.42|      8.66|         0.144|         0.17|        0.112|
|1975| fortis|      10.57|      9.19|      8.55|         0.075|        0.084|        0.057|
|1976| fortis|      10.64|      9.23|      8.58|         0.048|        0.053|        0.039|
|1977| fortis|      10.73|      9.35|      8.63|         0.085|        0.092|        0.066|
+----+-------+-----------+----------+----------+--------------+-------------+-------------+
only showing top 5 rows

Cleaned data for Fig. 01-07.csv:
+----+--------+-----------+----------+----------+--------------+------

**Standardizing Column Names Across All DataFrames**

In [0]:
# listing folders
files = dbutils.fs.ls(base_directory)
for file in files:
    print(file.path)

dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/daphne/
dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/forage/
dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/hybrid/
dbfs:/FileStore/Fall_2024_DS625_Team_01/Fall_2024_DS625_Team_01-main/data/kaggle/


In [0]:
# Function to load, clean, and standardize data
def standardize_column_names(df):
    standardized_column_mapping = {
        "Year": "Year", "year": "Year", "Capture_year": "Year", "Years": "Year",
        "Species": "Species", "species": "Species", "Species1": "Species",
        "Beak length": "BeakLength", "Beak length, mm": "BeakLength", "blength": "BeakLength",
        "Beak depth": "BeakDepth", "Beak depth, mm": "BeakDepth", "bdepth": "BeakDepth",
        "Rain, mm": "Rainfall", "Rainfall_CaptureYear": "Rainfall",
        "CI Beak length": "CI_BeakLength", "CI Beak depth": "CI_BeakDepth",
        # Add additional mappings as necessary
    }
    
    # Rename columns based on standardized mapping
    for original_col, standard_col in standardized_column_mapping.items():
        if original_col in df.columns:
            df = df.withColumnRenamed(original_col, standard_col)
    
    # Replace ellipses or dots with NULL
    df = df.replace("...", None).replace(".", None)
    
    return df

# Apply the function to standardize column names for each file in each folder
standardized_data = []
for folder in folders:
    folder_path = os.path.join(base_directory, folder)
    files = dbutils.fs.ls(folder_path)
    
    for file in files:
        if file.name.endswith(".csv"):
            file_path = os.path.join(folder_path, file.name)
            try:
                # Load the DataFrame
                df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
                
                # Standardize the column names
                df = standardize_column_names(df)
                
                # Store standardized DataFrame for further processing
                standardized_data.append((file.name, df))
                
                # Display standardized schema and sample data
                print(f"\nStandardized Data for {file.name}:")
                df.printSchema()
                df.show(5)
                
            except Exception as e:
                print(f"Error processing file {file.name}: {e}")


Standardized Data for Fig. 01-06 (also 7.3).csv:
root
 |-- Year: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- BeakLength: string (nullable = true)
 |-- BeakDepth: double (nullable = true)
 |-- Beak width: double (nullable = true)
 |-- CI_BeakLength: double (nullable = true)
 |-- CI_BeakDepth: double (nullable = true)
 |-- CI Beak width: double (nullable = true)

+----+-------+----------+---------+----------+-------------+------------+-------------+
|Year|Species|BeakLength|BeakDepth|Beak width|CI_BeakLength|CI_BeakDepth|CI Beak width|
+----+-------+----------+---------+----------+-------------+------------+-------------+
|1973| fortis|     10.76|     9.48|      8.69|        0.097|        0.13|        0.081|
|1974| fortis|     10.72|     9.42|      8.66|        0.144|        0.17|        0.112|
|1975| fortis|     10.57|     9.19|      8.55|        0.075|       0.084|        0.057|
|1976| fortis|     10.64|     9.23|      8.58|        0.048|       0.053|        0

**Saving Cleaned Data**

In [0]:
# Define the new directory for cleaned data
cleaned_data_directory = "/dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data"

# Ensure the directory exists
dbutils.fs.mkdirs(cleaned_data_directory)

# Save each cleaned DataFrame to the new directory
for file_name, df in cleaned_data:
    # Remove file extension from original name to avoid duplicate extensions
    cleaned_file_name = file_name.replace('.csv', '_cleaned.csv')
    cleaned_file_path = os.path.join(cleaned_data_directory, cleaned_file_name)
    
    # Write DataFrame as CSV with overwrite mode
    df.write.mode("overwrite").option("header", "true").csv(cleaned_file_path)
    print(f"Saved cleaned data for {file_name} to {cleaned_file_path}")

Saved cleaned data for Fig. 01-06 (also 7.3).csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 01-06 (also 7.3)_cleaned.csv
Saved cleaned data for Fig. 01-07.csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 01-07_cleaned.csv
Saved cleaned data for Fig. 02-01.csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 02-01_cleaned.csv
Saved cleaned data for Fig. 02-02.csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 02-02_cleaned.csv
Saved cleaned data for Fig. 02-09.csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 02-09_cleaned.csv
Saved cleaned data for Fig. 02-11.csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 02-11_cleaned.csv
Saved cleaned data for Fig. 02-15.csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 02-15_cleaned.csv
Saved cleaned data for Fig. 03-01.csv to /dbfs/FileStore/Fall_2024_DS625_Team_01/cleaned_data/Fig. 03-01_cleaned.csv
Saved cleaned data for Fig. 03-02.csv to /

# 2. Data Exploration

In [0]:
# Use dbutils to list files in Databricks
files = dbutils.fs.ls("dbfs:/FileStore/Fall_2024_DS625_Team_01/")

print("Contents of Parent Directory:")
for file in files:
    print(file.name)

Contents of Parent Directory:
Fall_2024_DS625_Team_01-main/
Fall_2024_DS625_Team_01_main.zip
cleaned_data/


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Initialize Spark session if not already done
spark = SparkSession.builder.appName("EDA").getOrCreate()

# Path to the cleaned data directory
data_directory = "dbfs:/FileStore/Fall_2024_DS625_Team_01/cleaned_data/"

# List all CSV files in the directory
files = [file.name for file in dbutils.fs.ls(data_directory) if file.name.endswith(".csv")]

# Loop through each file for EDA
for file in files:
    # Load data
    file_path = f"{data_directory}{file}"
    df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Display summary statistics
    print(f"\nSummary Statistics for {file}:")
    df.describe().show()

    # Convert to Pandas DataFrame for plotting
    pd_df = df.toPandas()
    
    # Plot distribution of numerical columns
    numerical_columns = [col for col in pd_df.columns if pd_df[col].dtype != 'object']
    if numerical_columns:
        plt.figure(figsize=(12, 8))
        for i, col in enumerate(numerical_columns, 1):
            plt.subplot(2, (len(numerical_columns) + 1) // 2, i)
            sns.histplot(pd_df[col].dropna(), kde=True)
            plt.title(f"Distribution of {col}")
        plt.suptitle(f"Distribution Analysis for {file}")
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()
    else:
        print(f"No numerical columns for distribution analysis in {file}")

**Summary Statistics and Distribution Analysis**

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Initialize Spark session if not already done
spark = SparkSession.builder.appName("EDA").getOrCreate()

# Path to the cleaned data directory
data_directory = "dbfs:/FileStore/Fall_2024_DS625_Team_01/cleaned_data/"
files = [file.name for file in dbutils.fs.ls(data_directory) if file.name.endswith(".csv")]

# Loop through each file for summary statistics and distribution analysis
for file in files:
    # Load data
    file_path = f"{data_directory}{file}"
    print(f"\nLoading data for {file}...")
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    # Display summary statistics
    print(f"\nSummary Statistics for {file}:")
    df.describe().show()
    
    # Convert to Pandas DataFrame for distribution analysis
    pd_df = df.toPandas()
    
    # Plot distribution of numerical columns
    numerical_columns = [col for col in pd_df.columns if pd_df[col].dtype != 'object']
    if numerical_columns:
        print(f"\nPlotting distributions for {file}...")
        plt.figure(figsize=(12, 8))
        for i, col in enumerate(numerical_columns, 1):
            plt.subplot(2, (len(numerical_columns) + 1) // 2, i)
            sns.histplot(pd_df[col].dropna(), kde=True)
            plt.title(f"Distribution of {col}")
        plt.suptitle(f"Distribution Analysis for {file}")
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()  # Show each plot explicitly
    else:
        print(f"No numerical columns for distribution analysis in {file}")

**Trend Analysis Over Time**

In [0]:
import matplotlib.dates as mdates

# Loop through files with 'Year' or similar time-related columns
for file in files:
    file_path = f"{data_directory}{file}"
    print(f"\nLoading data for {file}...")
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    # Convert to Pandas for plotting
    pd_df = df.toPandas()
    
    # Check for 'Year' or other time-related columns
    time_columns = [col for col in pd_df.columns if 'year' in col.lower() or 'date' in col.lower()]
    numerical_columns = [col for col in pd_df.columns if pd_df[col].dtype != 'object' and col not in time_columns]
    
    if time_columns and numerical_columns:
        for time_col in time_columns:
            for num_col in numerical_columns:
                print(f"\nPlotting trend of {num_col} over time in {file}...")
                plt.figure(figsize=(10, 6))
                sns.lineplot(data=pd_df, x=time_col, y=num_col)
                plt.title(f"Trend of {num_col} Over Time in {file}")
                plt.xlabel(time_col)
                plt.ylabel(num_col)
                plt.gca().xaxis.set_major_locator(mdates.YearLocator())
                plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
                plt.xticks(rotation=45)
                plt.tight_layout()
                plt.show()

**Comparative Analysis Between Species**

In [0]:
# Loop through files with 'Species' or similar categorical columns
for file in files:
    file_path = f"{data_directory}{file}"
    print(f"\nLoading data for {file}...")
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    pd_df = df.toPandas()
    
    # Check for 'Species' column and relevant numerical columns
    if 'species' in pd_df.columns:
        numerical_columns = [col for col in pd_df.columns if pd_df[col].dtype != 'object' and col != 'species']
        for num_col in numerical_columns:
            print(f"\nPlotting comparative analysis for {num_col} by species in {file}...")
            plt.figure(figsize=(10, 6))
            sns.boxplot(data=pd_df, x='species', y=num_col)
            plt.title(f"Comparative Analysis of {num_col} by Species in {file}")
            plt.xlabel("Species")
            plt.ylabel(num_col)
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.show()

**Weather and Environmental Impact Analysis**

In [0]:
# Correlate environmental factors (e.g., rainfall) with morphological traits
for file in files:
    file_path = f"{data_directory}{file}"
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    pd_df = df.toPandas()
    
    if 'rainfall' in pd_df.columns:
        numerical_columns = [col for col in pd_df.columns if pd_df[col].dtype != 'object' and col != 'rainfall']
        for num_col in numerical_columns:
            plt.figure(figsize=(10, 6))
            sns.scatterplot(data=pd_df, x='rainfall', y=num_col)
            plt.title(f"Impact of Rainfall on {num_col} in {file}")
            plt.xlabel("Rainfall")
            plt.ylabel(num_col)
            plt.tight_layout()
            plt.show()

**Correlations and Relationships**

In [0]:
# Calculate and plot correlation matrices for each dataset
for file in files:
    file_path = f"{data_directory}{file}"
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    pd_df = df.toPandas()
    
    # Select numerical columns for correlation
    numerical_columns = [col for col in pd_df.columns if pd_df[col].dtype != 'object']
    if len(numerical_columns) > 1:
        plt.figure(figsize=(12, 8))
        sns.heatmap(pd_df[numerical_columns].corr(), annot=True, cmap="coolwarm", vmin=-1, vmax=1)
        plt.title(f"Correlation Matrix for Numerical Features in {file}")
        plt.tight_layout()
        plt.show()

# 3. Data Integration for Modeling

# 4. Model Selection and Training

# 5. Evaluation and Visualization

# 6. Interpret Findings and Project Goals Alignment