<a href="https://colab.research.google.com/github/Tomecek/bigdata_team7/blob/main/Team7_Block3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# installing jdk
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#downloading .tgz installation file for Spache spark
!wget -q https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
# installing apache spark from downloaded file
!tar xf spark-3.5.4-bin-hadoop3.tgz
# installing findspark library
!pip install -q findspark

In [None]:
# importing the libraries needed for this notebook
import os
import findspark
import numpy as np
import pandas as pd
from pyspark.sql.types import StringType # Import StringType from pyspark.sql.types
from pyspark.sql.window import Window # Import the Window class
from pyspark.sql.functions import when, first, col # Import necessary functions



In [None]:
# Point the process environment at the installed JDK and the unpacked Spark
# distribution, then let findspark make that Spark installation importable.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.5.4-bin-hadoop3",
    }
)
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Build the session named "SparkDayOne"; getOrCreate() hands back an already
# running session when there is one instead of starting a second.
session_builder = SparkSession.builder.appName("SparkDayOne")
spark = session_builder.getOrCreate()

In [None]:
# getActiveSession is a method: without the call parentheses the cell only
# displays the bound-method object instead of the active session.
spark.getActiveSession()

# Import train data

In [None]:
# Read train.csv from the working directory; the first line is the header and
# column types are inferred from the data.
train_reader = spark.read.option("header", True).option("inferSchema", True)
df_train = train_reader.csv("train.csv")

# Preview the first rows.
df_train.show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/train.csv.

In [None]:
# Fill missing per-customer attributes (Name, Occupation, SSN, Age) with a
# non-null value taken from another row of the same Customer_ID.

# Placeholder string that is treated like a missing SSN.
prepsat_ssn_value = "#F%$D@*&8"
colums_doplnit = ["Customer_ID", "Name", "Occupation", "SSN", "Age"]
window_part = Window.partitionBy("Customer_ID")

# Name / Occupation / Age: keep the value unless it is null, in which case use
# the first non-null value found within the customer's partition.
name_filled = when(
    col("Name").isNull(), first("Name", ignorenulls=True).over(window_part)
).otherwise(col("Name"))

occupation_filled = when(
    col("Occupation").isNull(), first("Occupation", ignorenulls=True).over(window_part)
).otherwise(col("Occupation"))

age_filled = when(
    col("Age").isNull(), first("Age", ignorenulls=True).over(window_part)
).otherwise(col("Age"))

# SSN: both null and the placeholder count as missing. The replacement is the
# first SSN in the partition that is neither null nor the placeholder.
ssn_is_missing = (col("SSN").isNull()) | (col("SSN") == prepsat_ssn_value)
ssn_without_placeholder = when(col("SSN") != prepsat_ssn_value, col("SSN"))
ssn_filled = when(
    ssn_is_missing, first(ssn_without_placeholder, ignorenulls=True).over(window_part)
).otherwise(col("SSN"))

df_filled = (
    df_train
    .withColumn("Name", name_filled)
    .withColumn("Occupation", occupation_filled)
    .withColumn("SSN", ssn_filled)
    .withColumn("Age", age_filled)
)

df_filled.show()

In [None]:
# Goal: show the Customer_ID / SSN combinations where a single SSN is mapped to
# multiple different Customer_ID values.

from pyspark.sql.functions import count, countDistinct

# Count the DISTINCT Customer_IDs per SSN. A plain count() counts rows, so every
# SSN that merely appears on more than one row would be reported as shared.
ssn_counts = df_filled.groupBy("SSN").agg(countDistinct("Customer_ID").alias("customer_count"))

# Keep only the SSNs that belong to more than one distinct Customer_ID.
multiple_customer_ids = ssn_counts.filter("customer_count > 1")

# Join back to the filled DataFrame to list the Customer_ID / SSN combinations.
result_df = multiple_customer_ids.join(df_filled, "SSN", "inner").select("Customer_ID", "SSN")

# Show the results
result_df.show()

In [None]:
# Goal: find the unique Customer_ID / SSN combinations where a single SSN is
# mapped to multiple Customer_ID values.

from pyspark.sql.functions import count, col, countDistinct

# Count the DISTINCT Customer_IDs per SSN. A plain count() counts rows, so every
# SSN that merely appears on more than one row would be reported as shared.
ssn_counts = df_filled.groupBy("SSN").agg(countDistinct("Customer_ID").alias("customer_count"))

# Keep only the SSNs that belong to more than one distinct Customer_ID.
multiple_customer_ssns = ssn_counts.filter(col("customer_count") > 1)

# Join back to df_filled, the same DataFrame the counts were computed from.
# Joining the raw df_train would miss rows whose SSN was only filled in later.
# distinct() reduces the output to one row per Customer_ID / SSN combination.
result_df = (
    multiple_customer_ssns
    .join(df_filled, "SSN", "inner")
    .select("Customer_ID", "SSN")
    .distinct()
)

# Show the results, ordered so the customers sharing an SSN appear together.
result_df.orderBy(col("SSN"), col("Customer_ID")).show()

# Clean up underscores

In [None]:
def find_underscore_columns(spark_df):
    """Return the names of the string columns that contain an underscore.

    Args:
        spark_df: Spark DataFrame to inspect.

    Returns:
        list[str]: Names of the StringType columns, in schema order, in which at
        least one value contains "_". Null values never count as a match.
    """
    # Imported locally under an alias so the helper neither depends on nor
    # shadows the notebook-level `col` import.
    from pyspark.sql import functions as F

    string_columns = [
        field.name
        for field in spark_df.schema.fields
        if isinstance(field.dataType, StringType)
    ]
    if not string_columns:
        return []

    # A single Spark aggregation computes one 0/1 flag per string column, instead
    # of collecting every column to the driver as pandas and scanning it in Python.
    # max() skips nulls, so an all-null column (or an empty DataFrame) yields
    # None, which is not reported.
    flags = spark_df.agg(
        *[
            F.max(F.col(name).contains("_").cast("int")).alias(name)
            for name in string_columns
        ]
    ).first()

    return [name for name in string_columns if flags[name] == 1]

# List the string columns of df_filled that contain underscores, excluding
# 'Customer_ID' from the result.
underscore_cols = [
    name for name in find_underscore_columns(df_filled) if name != 'Customer_ID'
]

underscore_cols

In [None]:
from pyspark.sql.functions import regexp_replace

def remove_underscores_from_columns(spark_df, cols_to_modify):
    """Return a DataFrame in which every "_" is stripped from the given columns.

    Args:
        spark_df: Source Spark DataFrame; it is not modified.
        cols_to_modify: Iterable of column names to clean.

    Returns:
        A new DataFrame where each listed column is replaced by its value with
        all underscores removed.
    """
    cleaned_df = spark_df
    for column_name in cols_to_modify:
        without_underscores = regexp_replace(column_name, "_", "")
        cleaned_df = cleaned_df.withColumn(column_name, without_underscores)
    return cleaned_df

# Strip underscores from the columns detected earlier and preview the result.
new_df_train = remove_underscores_from_columns(
    spark_df=df_filled,
    cols_to_modify=underscore_cols,
)
new_df_train.show()

The data is now cleaned of underscores.

# Statistics for individual columns

In [None]:
# Identify the numerical columns: those whose Spark data type is reported as
# IntegerType or DoubleType.
numerical_cols = []
for field in new_df_train.schema.fields:
    type_text = str(field.dataType)
    if "IntegerType" in type_text or "DoubleType" in type_text:
        numerical_cols.append(field.name)

# Describe each numerical column separately.
for col_name in numerical_cols:
    header = f"Statistiky pro sloupec: {col_name}"
    print(header)
    column_summary = new_df_train.select(col_name).describe()
    column_summary.show()

# Identify data types

In [None]:
# Print the column names and Spark data types of the underscore-cleaned DataFrame.
print("Data Types:")
new_df_train.printSchema()

# Basic statistics for numerical columns

In [None]:
# Summary statistics for new_df_train. describe() is called without arguments,
# so it is not limited to the numerical_cols list computed earlier.
new_df_train.describe().show()

# Find duplicates

In [None]:
# Fully duplicated rows: group by every column of the DataFrame being checked
# (its own column list, not df_train's) and keep the groups that occur more
# than once.
df_duplicates = new_df_train.groupBy(new_df_train.columns).count().filter("count > 1")
df_duplicates.show()

# Find missing values

In [None]:
# Spark's sum is imported under an alias: importing it by its bare name would
# shadow Python's built-in sum() for every later cell of the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# One output column per input column, holding that column's number of nulls.
# The column list comes from the DataFrame being inspected, not from df_train.
missing_values = new_df_train.select(
    [spark_sum(col(c).isNull().cast("int")).alias(c) for c in new_df_train.columns]
)
missing_values.show()

In [None]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType

# Columns that should be converted to numeric values.
numeric_columns = ["Age", "Annual_Income", "Monthly_Inhand_Salary",
                   "Num_Bank_Accounts", "Num_Credit_Card", "Interest_Rate",
                   "Num_of_Loan", "Num_of_Delayed_Payment",
                   "Changed_Credit_Limit", "Num_Credit_Inquiries",
                   "Outstanding_Debt", "Credit_Utilization_Ratio",
                   "Credit_History_Age", "Total_EMI_per_month",
                   "Amount_invested_monthly", "Monthly_Balance"]

# `data` was used here without ever being defined, so a fresh
# "Restart & Run All" failed with a NameError. Start from the
# underscore-cleaned DataFrame produced by the previous steps.
data = new_df_train

# Cast every listed column to double.
# NOTE(review): with Spark's default (non-ANSI) settings, values that cannot be
# parsed as a number become null rather than raising - confirm this is intended.
for column in numeric_columns:
    data = data.withColumn(column, col(column).cast("double"))

data.show()

# Data types after type casting

In [None]:
# Print the schema of `data` to verify the column types after the casts.
data.printSchema()

In [None]:
# Feature engineering: derive eight additional features from the existing
# columns. `data` must already be defined by an earlier cell.

from pyspark.sql.functions import rand, when, lit

# Features 1-7 are added to `data` itself.
data = (
    data
    # 1. Debt to Income Ratio: outstanding debt divided by annual income.
    .withColumn("Debt_to_Income_Ratio", col("Outstanding_Debt") / col("Annual_Income"))
    # 2. Savings_Rate: monthly balance as a fraction of the monthly in-hand
    #    salary (a plain ratio, not multiplied by 100). Left null unless the
    #    salary is greater than zero.
    .withColumn(
        "Savings_Rate",
        when(
            col("Monthly_Inhand_Salary") > 0,
            col("Monthly_Balance") / col("Monthly_Inhand_Salary"),
        ).otherwise(None),
    )
    # 3. Total Delayed Days: number of delayed payments times the delay from
    #    the due date.
    .withColumn(
        "Total_Delayed_Days",
        (col("Num_of_Delayed_Payment") * col("Delay_from_due_date")).cast("float"),
    )
    # 4. EMI to Income Ratio: monthly EMI divided by monthly in-hand salary.
    .withColumn(
        "EMI_to_Income_Ratio",
        (col("Total_EMI_per_month") / col("Monthly_Inhand_Salary")).cast("float"),
    )
    # 5. Loan Type Factor: loan count weighted 1.5 when the loan types mention
    #    "Home", otherwise 1.2 when they mention "Car", otherwise unweighted.
    .withColumn(
        "Loan_Type_Factor",
        when(col("Type_of_Loan").like("%Home%"), col("Num_of_Loan") * 1.5)
        .when(col("Type_of_Loan").like("%Car%"), col("Num_of_Loan") * 1.2)
        .otherwise(col("Num_of_Loan")),
    )
    # 6. Financial Health Score: annual income minus outstanding debt minus
    #    twelve months of EMI.
    .withColumn(
        "Financial_Health_Score",
        (
            col("Annual_Income")
            - col("Outstanding_Debt")
            - (col("Total_EMI_per_month") * 12)
        ).cast("float"),
    )
    # 7. Income Per Credit Card: annual income divided by number of credit cards.
    .withColumn(
        "Income_Per_Credit_Card",
        (col("Annual_Income") / col("Num_Credit_Card")).cast("float"),
    )
)

# 8. Delays Per Loan: delayed payments divided by number of loans. This last
#    feature is stored in `new_data`; `data` keeps features 1-7 only.
delays_per_loan = (col("Num_of_Delayed_Payment") / col("Num_of_Loan")).cast("float")
new_data = data.withColumn("Delays_Per_Loan", delays_per_loan)

new_data.show()

In [None]:
# Names of the engineered feature columns, reused by the statistics, plotting
# and correlation cells below.
new_features = [
    "Debt_to_Income_Ratio",
    "Savings_Rate",
    "Total_Delayed_Days",
    "EMI_to_Income_Ratio",
    "Loan_Type_Factor",
    "Financial_Health_Score",
    "Income_Per_Credit_Card",
    "Delays_Per_Loan"
]

# Statistical analysis of new features

In [None]:
# Print summary statistics for each engineered feature, one table per feature,
# separated by a dashed rule.
separator_line = "-" * 40
for col_name in new_features:
    print(f"Description for column: {col_name}")
    feature_summary = new_data.select(col_name).describe()
    feature_summary.show()
    print(separator_line)

# Data types of new features

In [None]:
# Report the Spark data type of every engineered feature.
for col_name in new_features:
    feature_type = new_data.schema[col_name].dataType
    print(f"Data type of column '{col_name}': {feature_type}")

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Function to plot distributions
def plot_feature_distributions(data, new_features, bins=30):
    """Plot one histogram (with KDE overlay) per feature.

    Args:
        data: Spark DataFrame containing the feature columns.
        new_features: List of column names to plot.
        bins: Number of histogram bins used for every plot.

    Returns:
        None. One figure per feature is shown and then closed.
    """
    # Convert the selected columns to pandas once, before plotting.
    pandas_df = data.select(new_features).toPandas()
    for feature in new_features:
        # Explicit figure/axes instead of the implicit pyplot "current figure".
        fig, ax = plt.subplots(figsize=(8, 6))
        sns.histplot(pandas_df[feature], kde=True, bins=bins, color='blue', alpha=0.6, ax=ax)
        ax.set_title(f'Distribution of {feature}', fontsize=14)
        ax.set_xlabel(feature, fontsize=12)
        ax.set_ylabel('Frequency', fontsize=12)
        ax.grid(True, alpha=0.3)
        plt.show()
        # Release the figure so a long feature list does not accumulate open figures.
        plt.close(fig)

# Plot the distribution of every engineered feature in new_data.
plot_feature_distributions(data=new_data, new_features=new_features)

# Target leakage

In [None]:
target_column = "Credit_Score"  # Replace with your target variable

# Read the target's type from the Spark schema. Collecting the whole column to
# pandas just to look at its dtype would trigger a full Spark job for nothing.
target_column_type = new_data.schema[target_column].dataType

if isinstance(target_column_type, StringType):
    # Categorical (string) target: encode it numerically so it can take part in
    # the correlation. "Good" -> 1, "Standard" -> 0, every other label -> null.
    # TODO: decide how the remaining categories should be encoded; as null they
    # are left out of the target's correlations.
    from pyspark.sql.functions import when

    new_data = new_data.withColumn(
        target_column,
        when(col(target_column) == "Good", 1)
        .when(col(target_column) == "Standard", 0)
        .otherwise(None)
    )

# 1. Correlation analysis between the engineered features and the target.
# A single code path now serves both an already-numeric and a freshly encoded
# target (the original duplicated this section in both branches).
pandas_df = new_data.select(new_features + [target_column]).toPandas()
correlation_matrix = pandas_df.corr()
print("Correlation matrix:\n", correlation_matrix)

# Plot heatmap
plt.figure(figsize=(10, 6))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", fmt=".2f")
plt.title("Correlation Heatmap")
plt.show()

# 2. TODO: distribution of the features across target categories (not implemented).

In [None]:
# Every feature used in the full correlation analysis: the columns cast to
# numeric earlier followed by the engineered features. Derived from the two
# existing lists so the three lists cannot drift apart (the previous
# hand-written copy listed exactly these names in this order).
all_features = numeric_columns + new_features

In [None]:
target_column = "Credit_Score"  # Replace with your target variable

# Read the target's type from the Spark schema. Collecting the whole column to
# pandas just to look at its dtype would trigger a full Spark job for nothing.
target_column_type = new_data.schema[target_column].dataType

if isinstance(target_column_type, StringType):
    # Categorical (string) target: encode it numerically so it can take part in
    # the correlation. "Good" -> 1, "Standard" -> 0, every other label -> null.
    # TODO: decide how the remaining categories should be encoded; as null they
    # are left out of the target's correlations.
    from pyspark.sql.functions import when

    new_data = new_data.withColumn(
        target_column,
        when(col(target_column) == "Good", 1)
        .when(col(target_column) == "Standard", 0)
        .otherwise(None)
    )

# 1. Correlation analysis between all features (original numeric columns plus
# engineered ones) and the target. A single code path now serves both an
# already-numeric and a freshly encoded target (the original duplicated this
# section in both branches).
pandas_df = new_data.select(all_features + [target_column]).toPandas()
correlation_matrix = pandas_df.corr()
print("Correlation matrix:\n", correlation_matrix)

# Plot heatmap
plt.figure(figsize=(10, 6))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", fmt=".2f")
plt.title("Correlation Heatmap")
plt.show()

# 2. TODO: distribution of the features across target categories (not implemented).

Zmena - test
