# ❄️ Setup

Establishing a Snowpark Session

Before you begin working with DataFrames, you need to establish a session with your Snowflake account.



In [None]:
# Import python packages
import streamlit as st
import pandas as pd
from snowflake.snowpark.functions import col, count, sum, avg, when, max

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Report which role, warehouse, and fully qualified schema this session uses,
# so it is clear up front where the following queries will run.
active_role = session.get_current_role()
active_warehouse = session.get_current_warehouse()
active_schema = session.get_fully_qualified_current_schema()
print(f"role: {active_role} | WH: {active_warehouse} | DB.SCHEMA: {active_schema}")


# Basic DataFrame Operations
1. Creating DataFrames from Tables

You can create a DataFrame by referencing an existing table in your database.



In [None]:
# Reference two existing tables as Snowpark DataFrames
customers_df, accounts_df = map(
    session.table, ("COREBANKING.CUSTOMERS", "COREBANKING.ACCOUNTS")
)

# Print the first 5 rows of the customers DataFrame
customers_df.show(n=5)


2. Selecting Columns

You can select specific columns using the select() method.



In [None]:
# Project the customers DataFrame down to its ID, name, and email columns
customer_info_df = customers_df.select(
    ["CUSTOMER_ID", "FIRST_NAME", "LAST_NAME", "EMAIL"]
)
customer_info_df.show(n=5)


3. Filtering Rows

Use the filter() method to filter rows based on conditions.



In [None]:
# Filter customers who live in a specific state.
# The state is kept in a variable and the result has a state-neutral name,
# so changing target_state never leaves a DataFrame named after a different state.
target_state = "Vermont"
customers_in_state_df = customers_df.filter(col("STATE") == target_state)
customers_in_state_df.show(5)


4. Adding New Columns

You can add new columns to a DataFrame using the with_column() method.



In [None]:
from snowflake.snowpark.functions import lit

# Append a STATUS column that holds the same literal string on every row
status_value = lit("Hi, Im a new Column")
customers_with_status_df = customers_df.with_column("STATUS", status_value)
customers_with_status_df.show(n=5)


5. DataFrame Transformations

Join two DataFrames using the join() method.

In [None]:
# Join customers with their accounts.
# Passing the shared key by name performs a USING-style equi-join. The result then
# carries a single CUSTOMER_ID column instead of one auto-renamed copy from each side,
# which is what an explicit `left["CUSTOMER_ID"] == right["CUSTOMER_ID"]` condition produces.
customers_accounts_df = customers_df.join(accounts_df, on="CUSTOMER_ID", how="inner")
customers_accounts_df.show(5)

6. Aggregations

Aggregate data in a DataFrame using the group_by() and agg() methods.

In [None]:
from snowflake.snowpark.functions import sum, avg

# Per-customer total and average account balance
balance_metrics = [
    sum("BALANCE").alias("TOTAL_BALANCE"),
    avg("BALANCE").alias("AVG_BALANCE"),
]
account_summary_df = accounts_df.group_by("CUSTOMER_ID").agg(*balance_metrics)
account_summary_df.show(n=5)


# ❄️❄️ Data Processing Example (Multi-Step)
This Snowpark example demonstrates how to build complex data processing workflows in Snowflake, using multi-step data transformations and aggregations to derive meaningful insights. By properly structuring joins and leveraging Snowpark's processing capabilities, we can effectively analyze customer risk.

# Set-Up Example

## Scenario: Customer Segmentation and Risk Analysis
In this example, we'll identify high-risk customers by analyzing:

1) Customer Demographics: Extract relevant customer information.
2) Account and Balance Analysis: Assess account balances and transaction activity.
3) Loan Analysis: Evaluate loan status and outstanding balances.
4) Credit Card Usage: Examine credit card balances and credit utilization.
5) Fraud Detection: Identify customers with fraud history.
6) Risk Scoring: Calculate a risk score based on the above factors.

In [None]:
# Load every source table used by the risk analysis as a Snowpark DataFrame.
# Table names are listed in the same order as the variables they unpack into.
(
    customers_df,
    accounts_df,
    transactions_df,
    fraud_df,
    credit_cards_df,
    loans_df,
) = (
    session.table(table_name)
    for table_name in (
        "COREBANKING.CUSTOMERS",
        "COREBANKING.ACCOUNTS",
        "COREBANKING.TRANSACTIONS",
        "FRAUDDETECTION.TRANSACTION_FRAUD",
        "RISKANALYSIS.CREDIT_CARDS",
        "RISKANALYSIS.LOANS",
    )
)


### Step 1: Aggregate Transaction Activity per Customer
Join the TRANSACTIONS table with the ACCOUNTS table to access CUSTOMER_ID and aggregate transaction data.



In [None]:
# Attach CUSTOMER_ID to every transaction by matching it to its account
account_match = transactions_df["ACCOUNT_ID"] == accounts_df["ACCOUNT_ID"]
transactions_with_customers_df = transactions_df.join(accounts_df, account_match, how="inner")

# Roll transactions up to one row per customer: count, total, average, and latest date
transaction_metrics = [
    count(transactions_with_customers_df["TRANSACTION_ID"]).alias("TOTAL_TRANSACTIONS"),
    sum(transactions_with_customers_df["TRANSACTION_AMOUNT"]).alias("TOTAL_TRANSACTION_AMOUNT"),
    avg(transactions_with_customers_df["TRANSACTION_AMOUNT"]).alias("AVG_TRANSACTION_AMOUNT"),
    max(transactions_with_customers_df["TRANSACTION_DATE"]).alias("LAST_TRANSACTION_DATE"),
]
transaction_agg_df = transactions_with_customers_df.group_by(
    transactions_with_customers_df["CUSTOMER_ID"]
).agg(*transaction_metrics)

transaction_agg_df.show()

### Step 2: Analyze Account Balances and Status
Aggregate account data for each customer to derive balance and status information:

In [None]:
# Per-customer account roll-up: total/average balance plus the number of accounts
# whose ACCOUNT_STATUS is exactly "Active" or "Inactive" (other statuses are not counted).
account_status = accounts_df["ACCOUNT_STATUS"]
account_balance = accounts_df["BALANCE"]
account_agg_df = accounts_df.group_by(accounts_df["CUSTOMER_ID"]).agg(
    sum(account_balance).alias("TOTAL_ACCOUNT_BALANCE"),
    avg(account_balance).alias("AVG_ACCOUNT_BALANCE"),
    # when() with no otherwise() is NULL for non-matching rows, and count() skips NULLs
    count(when(account_status == "Active", 1)).alias("ACTIVE_ACCOUNTS"),
    count(when(account_status == "Inactive", 1)).alias("INACTIVE_ACCOUNTS"),
)

account_agg_df.show()

### Step 3: Evaluate Loan Data
Aggregate loan data for each customer:



In [None]:
# Per-customer loan roll-up: amount borrowed, balance still owed, and how many
# loans have a positive balance versus a balance of exactly zero.
loan_balance = loans_df["LOAN_BALANCE"]
loan_agg_df = loans_df.group_by(loans_df["CUSTOMER_ID"]).agg(
    sum(loans_df["LOAN_AMOUNT"]).alias("TOTAL_LOAN_AMOUNT"),
    sum(loan_balance).alias("OUTSTANDING_LOAN_BALANCE"),
    count(when(loan_balance > 0, 1)).alias("ACTIVE_LOANS"),
    count(when(loan_balance == 0, 1)).alias("PAID_OFF_LOANS"),
)

loan_agg_df.show()

### Step 4: Assess Credit Card Utilization
Aggregate credit card data for each customer:



In [None]:
# Credit card usage analysis: one row per customer with summed balances and limits,
# the average per-card utilization (BALANCE / CREDIT_LIMIT), and the number of cards
# whose CARD_STATUS is "Active".
# Utilization is only computed for cards with a positive CREDIT_LIMIT. A zero limit
# would otherwise abort the entire query with a division-by-zero error. For such
# cards when() yields NULL, which avg() ignores.
credit_card_agg_df = (
    credit_cards_df
    .group_by(credit_cards_df["CUSTOMER_ID"])
    .agg(
        sum(credit_cards_df["BALANCE"]).alias("TOTAL_CREDIT_CARD_BALANCE"),
        sum(credit_cards_df["CREDIT_LIMIT"]).alias("TOTAL_CREDIT_LIMIT"),
        avg(
            when(
                credit_cards_df["CREDIT_LIMIT"] > 0,
                credit_cards_df["BALANCE"] / credit_cards_df["CREDIT_LIMIT"],
            )
        ).alias("AVG_CREDIT_UTILIZATION"),
        count(when(credit_cards_df["CARD_STATUS"] == "Active", 1)).alias("ACTIVE_CARDS"),
    )
)

credit_card_agg_df.show()

### Step 5: Detect Fraud History
Identify customers with fraud incidents:



In [None]:
# Fraud history: resolve each fraud record's TRANSACTION_ID to its customer, then
# count fraud records per customer. Because the join is inner, only customers with
# at least one matching fraud record appear in the result.
fraud_with_customers_df = fraud_df.join(
    transactions_with_customers_df,
    fraud_df["TRANSACTION_ID"] == transactions_with_customers_df["TRANSACTION_ID"],
    how="inner",
)
fraud_agg_df = fraud_with_customers_df.group_by(
    transactions_with_customers_df["CUSTOMER_ID"]
).agg(count(fraud_df["FRAUD_ID"]).alias("FRAUD_INCIDENTS"))

fraud_agg_df.show()

### Step 6: Combine All Insights and Calculate Risk Scores
Combine all data to calculate risk scores:



In [None]:
# Combine all insights and calculate risk scores
from snowflake.snowpark.functions import coalesce, lit

# Every per-customer aggregate is LEFT-joined onto the customer list. A customer
# missing from an aggregate (e.g. no fraud records, no loans, no cards) therefore
# gets NULLs in that aggregate's columns.
# - Each risk factor is written as when(cond, 1).otherwise(0): a NULL condition
#   falls through to 0 instead of turning the whole RISK_SCORE into NULL
#   (in SQL, NULL + 1 is NULL).
# - Count-style metrics are coalesced to 0, since "no matching rows" means zero of them.
customer_risk_df = (
    customers_df
    .join(transaction_agg_df, customers_df["CUSTOMER_ID"] == transaction_agg_df["CUSTOMER_ID"], "left_outer")
    .join(account_agg_df, customers_df["CUSTOMER_ID"] == account_agg_df["CUSTOMER_ID"], "left_outer")
    .join(loan_agg_df, customers_df["CUSTOMER_ID"] == loan_agg_df["CUSTOMER_ID"], "left_outer")
    .join(credit_card_agg_df, customers_df["CUSTOMER_ID"] == credit_card_agg_df["CUSTOMER_ID"], "left_outer")
    .join(fraud_agg_df, customers_df["CUSTOMER_ID"] == fraud_agg_df["CUSTOMER_ID"], "left_outer")
    .select(
        customers_df["CUSTOMER_ID"].alias("CUSTOMER_ID"),
        customers_df["FIRST_NAME"],
        customers_df["LAST_NAME"],
        account_agg_df["TOTAL_ACCOUNT_BALANCE"],
        account_agg_df["AVG_ACCOUNT_BALANCE"],
        coalesce(transaction_agg_df["TOTAL_TRANSACTIONS"], lit(0)).alias("TOTAL_TRANSACTIONS"),
        transaction_agg_df["TOTAL_TRANSACTION_AMOUNT"],
        transaction_agg_df["AVG_TRANSACTION_AMOUNT"],
        loan_agg_df["OUTSTANDING_LOAN_BALANCE"],
        coalesce(loan_agg_df["ACTIVE_LOANS"], lit(0)).alias("ACTIVE_LOANS"),
        credit_card_agg_df["TOTAL_CREDIT_CARD_BALANCE"],
        credit_card_agg_df["AVG_CREDIT_UTILIZATION"],
        coalesce(fraud_agg_df["FRAUD_INCIDENTS"], lit(0)).alias("FRAUD_INCIDENTS"),
        # Simple additive risk score: one point for each factor that applies (0-5)
        (
            when(account_agg_df["AVG_ACCOUNT_BALANCE"] < 1000, 1).otherwise(0)
            + when(credit_card_agg_df["AVG_CREDIT_UTILIZATION"] > 0.5, 1).otherwise(0)
            + when(loan_agg_df["OUTSTANDING_LOAN_BALANCE"] > 5000, 1).otherwise(0)
            + when(fraud_agg_df["FRAUD_INCIDENTS"] > 0, 1).otherwise(0)
            + when(transaction_agg_df["AVG_TRANSACTION_AMOUNT"] < 50, 1).otherwise(0)
        ).alias("RISK_SCORE"),
    )
    .order_by(col("RISK_SCORE").desc())
)

# Show the highest-risk customers first (show() returns rows; explain() would only
# print the query plan)
customer_risk_df.show()


# Save Results to Snowflake Table

In [None]:
risk_table_name = "PUBLIC.CUSTOMER_RISK_ANALYSIS"

# Persist the scored customers using overwrite mode
risk_writer = customer_risk_df.write.mode("overwrite")
risk_writer.save_as_table(risk_table_name)
print(f"DataFrame successfully written to {risk_table_name}")

# Streamlit Examples

In [None]:
import streamlit as st
import pandas as pd
import numpy as np

st.title('Streamlit Object Example')

greeting = st.text_input("Say Hi", "")

# Echo the typed text only when the primary button is clicked;
# otherwise write an empty string in the same place.
echo_clicked = st.button("Echo", type="primary")
st.write(greeting if echo_clicked else "")

In [None]:
import streamlit as st
from snowflake.snowpark.functions import col, avg, sum
import pandas as pd



# Average risk score and summed account balance for each ACTIVE_LOANS value,
# computed in Snowflake and ordered by ACTIVE_LOANS
risk_table = session.table("PUBLIC.CUSTOMER_RISK_ANALYSIS")
aggregation_df = (
    risk_table.group_by("ACTIVE_LOANS")
    .agg(
        avg("RISK_SCORE").alias("AVG_RISK_SCORE"),
        sum("TOTAL_ACCOUNT_BALANCE").alias("TOTAL_ACCOUNT_BALANCE"),
    )
    .order_by("ACTIVE_LOANS")
)

# Bring the aggregated result to the client as a pandas DataFrame
df = aggregation_df.to_pandas()

st.title("Aggregated Customer Risk Analysis")
st.write("### Average Risk Score and Total Account Balance by Number of Active Loans")
st.dataframe(df)

# Index by ACTIVE_LOANS once, then chart each metric from the indexed frame
by_active_loans = df.set_index("ACTIVE_LOANS")
st.bar_chart(by_active_loans["AVG_RISK_SCORE"])
st.bar_chart(by_active_loans["TOTAL_ACCOUNT_BALANCE"])
