# Snowpark JSON Flattening Demo

This notebook demonstrates how to use Snowpark Python to flatten JSON data from the `raw_data.security_trades` table.

## Overview
- Connect to Snowflake using the active session
- Query the raw JSON data
- Use Snowpark DataFrame operations to flatten nested JSON structures
- Display the results


In [None]:
# Import required libraries
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, flatten, parse_json
import pandas as pd

# Get the active Snowflake session
session = get_active_session()
print(f"Connected to Snowflake as: {session.get_current_user()}")
print(f"Current Database: {session.get_current_database()}")
print(f"Current Schema: {session.get_current_schema()}")
print(f"Current Warehouse: {session.get_current_warehouse()}")


## Step 1: Explore the Raw JSON Data

Let's first look at the structure of our raw security trades data.


In [None]:
# Query the raw security trades table
raw_trades_df = session.table("raw_data.security_trades")

# Show the schema
print("Schema of raw_data.security_trades:")
print(raw_trades_df.schema)

# Display first few rows
print("\nFirst 3 rows of raw data:")
raw_trades_df.limit(3).show()


## Step 2: Examine the JSON Structure

Let's look at the JSON payload structure to understand what we need to flatten.


In [None]:
import json

# Fetch one payload; Snowpark returns VARIANT values as JSON strings
sample_json = raw_trades_df.select("transaction_payload").limit(1).collect()[0][0]
print("Sample JSON payload structure:")
print(json.dumps(json.loads(sample_json), indent=2))


## Step 3: Flatten JSON Data Using Snowpark

Now we'll use Snowpark DataFrame operations to flatten the nested JSON structure. The `transaction_payload` contains:
- `customer_id`
- `account_type`
- `trades` (array of trade objects, each with `action`, `ticker`, `shares`, `trade_price`, and `total_trade_value`)
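To make the shape of the flattened output concrete, here is a plain-Python sketch of what joining the table to `FLATTEN` over `trades` produces. It is an illustration, not the Snowpark implementation, and it assumes a hypothetical payload: the field names match those extracted in the Snowpark cell below, but the values and the helper name `flatten_trades` are invented.

```python
import json
from decimal import Decimal

# Hypothetical payload: field names match the ones extracted in the Snowpark
# cell below, but the values are invented for illustration.
SAMPLE_PAYLOAD = json.dumps({
    "customer_id": 101,
    "account_type": "Brokerage",
    "trades": [
        {"action": "Buy", "ticker": "AAPL", "shares": 10,
         "trade_price": 150.25, "total_trade_value": 1502.50},
        {"action": "Sell", "ticker": "MSFT", "shares": 5,
         "trade_price": 300.00, "total_trade_value": 1500.00},
    ],
})


def flatten_trades(transaction_id, payload_json):
    """Emit one flat row per element of payload["trades"].

    Parent-level fields are repeated on every row, which is what joining a
    table to FLATTEN over the array produces. A missing or empty "trades"
    array yields no rows, like FLATTEN with its default OUTER => FALSE.
    """
    # parse_float=Decimal keeps money values exact instead of binary floats
    payload = json.loads(payload_json, parse_float=Decimal)
    rows = []
    for trade in payload.get("trades") or []:
        rows.append({
            "transaction_id": transaction_id,
            "customer_id": int(payload["customer_id"]),
            "account_type": str(payload["account_type"]),
            "trade_action": str(trade["action"]),
            "ticker": str(trade["ticker"]),
            "shares": int(trade["shares"]),
            "trade_price": trade["trade_price"],
            "total_trade_value": trade["total_trade_value"],
        })
    return rows


for row in flatten_trades("T-1", SAMPLE_PAYLOAD):
    print(row)
```

Each trade becomes its own row while `customer_id` and `account_type` are repeated, which is why the Snowpark version reads parent fields from `transaction_payload` and trade fields from the `VALUE` column.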


In [None]:
from snowflake.snowpark.types import DecimalType

# Flatten the trades array: the join emits one row per trade and repeats the parent columns
flattened_df = raw_trades_df.join_table_function(
    flatten(col("transaction_payload")["trades"])
).select(
    col("transaction_date"),
    col("transaction_id"),
    col("transaction_payload")["customer_id"].cast("INT").alias("customer_id"),
    col("transaction_payload")["account_type"].cast("STRING").alias("account_type"),
    # VALUE is the FLATTEN output column holding the current trade object
    col("value")["action"].cast("STRING").alias("trade_action"),
    col("value")["ticker"].cast("STRING").alias("ticker"),
    col("value")["shares"].cast("INT").alias("shares"),
    # A bare NUMBER is NUMBER(38, 0) and would drop the cents; (38, 4) is an assumed precision/scale
    col("value")["trade_price"].cast(DecimalType(38, 4)).alias("trade_price"),
    col("value")["total_trade_value"].cast(DecimalType(38, 4)).alias("total_trade_value"),
)

# Show the flattened data
print("Flattened security trades data:")
flattened_df.show(10)


## Step 4: Add Calculated Columns

Let's add some calculated columns that would be useful for portfolio analysis:
- `share_change`: Positive for buys, negative for sells
- `cash_flow_impact`: Negative for buys (cash out), positive for sells (cash in)
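The sign convention can be written as a small pure-Python function, shown only to make the rule explicit; `add_calculated_columns` is a hypothetical helper, not part of Snowpark. Like the `when(...).otherwise(...)` expressions in the next cell, it treats every action that is not exactly `"Buy"` as a sell, including a missing action (in SQL a `NULL` comparison is not true, so such rows also fall through to `otherwise`).

```python
from decimal import Decimal


def add_calculated_columns(trade):
    """Return a copy of a flattened trade row with the two derived columns.

    Any action other than exactly "Buy" (including None) is treated as a
    sell, mirroring when(trade_action == "Buy", ...).otherwise(...).
    """
    is_buy = trade["trade_action"] == "Buy"
    enriched = dict(trade)
    # Shares held go up on a buy and down on a sell
    enriched["share_change"] = trade["shares"] if is_buy else -trade["shares"]
    # Cash leaves the account on a buy and comes in on a sell
    enriched["cash_flow_impact"] = (
        -trade["total_trade_value"] if is_buy else trade["total_trade_value"]
    )
    return enriched


buy = add_calculated_columns(
    {"trade_action": "Buy", "shares": 10, "total_trade_value": Decimal("1502.50")}
)
sell = add_calculated_columns(
    {"trade_action": "Sell", "shares": 5, "total_trade_value": Decimal("1500.00")}
)
print(buy["share_change"], buy["cash_flow_impact"])    # 10 -1502.50
print(sell["share_change"], sell["cash_flow_impact"])  # -5 1500.00
```

If actions other than `Buy` and `Sell` can occur in the data, an explicit `Sell` branch (with a `NULL` otherwise) would surface them instead of silently counting them as sells.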


In [None]:
from snowflake.snowpark.functions import when, lit

# Add calculated columns using with_column (one at a time)
enriched_df = flattened_df.with_column(
    "share_change",
    when(col("trade_action") == "Buy", col("shares"))
    .otherwise(-1 * col("shares"))
).with_column(
    "cash_flow_impact", 
    when(col("trade_action") == "Buy", -1 * col("total_trade_value"))
    .otherwise(col("total_trade_value"))
)

# Display enriched data
print("Enriched flattened data with calculated columns:")
enriched_df.select(
    "transaction_date", "customer_id", "ticker", 
    "trade_action", "shares", "share_change", 
    "total_trade_value", "cash_flow_impact"
).show(10)


## Step 5: Aggregate Data by Customer

Let's create a summary of net positions by customer, ticker, and account type, keeping only positions in which the customer still holds shares.
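As a reference for what the aggregation computes, the sketch below reproduces it in plain Python over a handful of hypothetical, already-enriched rows; the helper `summarize_positions` and the sample values are illustrative assumptions. The `net_shares > 0` filter is applied after grouping, so positions that were fully sold or are net short are dropped.

```python
from collections import defaultdict
from decimal import Decimal


def summarize_positions(enriched_rows):
    """Aggregate enriched trade rows per (customer_id, ticker, account_type).

    Mirrors the group_by/agg/filter chain: conditional sums for bought and
    sold value, then keep only groups whose net_shares is positive.
    """
    totals = defaultdict(lambda: {
        "net_shares": 0,
        "total_bought_value": Decimal("0"),
        "total_sold_value": Decimal("0"),
        "net_cash_impact": Decimal("0"),
    })
    for r in enriched_rows:
        acc = totals[(r["customer_id"], r["ticker"], r["account_type"])]
        acc["net_shares"] += r["share_change"]
        # when(...).otherwise(lit(0)): each row adds its value to at most one bucket
        if r["trade_action"] == "Buy":
            acc["total_bought_value"] += r["total_trade_value"]
        elif r["trade_action"] == "Sell":
            acc["total_sold_value"] += r["total_trade_value"]
        acc["net_cash_impact"] += r["cash_flow_impact"]
    # The filter runs after aggregation, like a SQL HAVING clause
    return {key: acc for key, acc in totals.items() if acc["net_shares"] > 0}


# Hypothetical rows: (customer, ticker, account, action, share_change, value)
SAMPLE = [
    (101, "AAPL", "Brokerage", "Buy", 10, Decimal("1500.00")),
    (101, "AAPL", "Brokerage", "Sell", -4, Decimal("640.00")),
    (102, "MSFT", "IRA", "Buy", 5, Decimal("1500.00")),
    (102, "MSFT", "IRA", "Sell", -5, Decimal("1550.00")),
]
enriched_rows = [
    {"customer_id": c, "ticker": t, "account_type": a, "trade_action": act,
     "share_change": chg, "total_trade_value": v,
     # Buys are cash out (negative), sells are cash in (positive)
     "cash_flow_impact": -v if act == "Buy" else v}
    for c, t, a, act, chg, v in SAMPLE
]

# Customer 102 closed out MSFT (net 0 shares), so only 101/AAPL remains
for key, acc in sorted(summarize_positions(enriched_rows).items()):
    print(key, acc)
```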


In [None]:
from snowflake.snowpark.functions import sum as sum_

# Aggregate by customer and ticker
position_summary = enriched_df.group_by("customer_id", "ticker", "account_type").agg([
    sum_("share_change").alias("net_shares"),
    sum_(when(col("trade_action") == "Buy", col("total_trade_value"))
        .otherwise(lit(0))).alias("total_bought_value"),
    sum_(when(col("trade_action") == "Sell", col("total_trade_value"))
        .otherwise(lit(0))).alias("total_sold_value"),
    sum_("cash_flow_impact").alias("net_cash_impact")
]).filter(col("net_shares") > 0)  # Only show positions with positive shares

print("Current positions summary by customer:")
position_summary.order_by("customer_id", "ticker").show(20)


## Step 6: Convert to Pandas for Visualization

Let's pull small, aggregated results back into pandas for visualization or further analysis.
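Both queries in the next cell run in the Snowflake warehouse, and `to_pandas()` only transfers the small aggregated result to the notebook. The ranking itself amounts to counting flattened trade rows per customer; a plain-Python sketch of that logic follows, using a hypothetical helper `top_n_traders` and made-up customer IDs.

```python
from collections import Counter


def top_n_traders(enriched_rows, n=5):
    """Return up to n (customer_id, trade_count) pairs, most active first.

    Counting flattened rows means each element of a payload's trades array
    counts as one trade, matching group_by("customer_id").count().
    """
    counts = Counter(row["customer_id"] for row in enriched_rows)
    # most_common sorts by count descending; ties keep first-seen order here,
    # whereas ORDER BY on the count alone leaves tie order unspecified
    return counts.most_common(n)


# Hypothetical customer IDs, one entry per flattened trade row
rows = [{"customer_id": cid} for cid in [101, 102, 101, 103, 101, 102]]
print(top_n_traders(rows, n=2))  # [(101, 3), (102, 2)]
```

If a stable ranking matters, adding `customer_id` as a secondary sort key in the Snowpark `order_by` removes the tie ambiguity.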


In [None]:
# Get the top 5 customers by number of trades (one flattened row per trade),
# keeping the count column alongside the customer ID
top_traders = (
    enriched_df.group_by("customer_id").count()
    .order_by(col("count").desc())
    .limit(5)
    .to_pandas()
)

print("Top 5 most active traders:")
print(top_traders)

# Get trade distribution by action
trade_distribution = enriched_df.group_by("trade_action").agg([
    sum_("shares").alias("total_shares"),
    sum_("total_trade_value").alias("total_value")
]).to_pandas()

print("\nTrade distribution by action:")
print(trade_distribution)


## Summary

This notebook demonstrated how to:
1. Connect to Snowflake using the active session context
2. Query raw JSON data from a table
3. Use Snowpark's `flatten` function to unnest JSON arrays
4. Extract and cast JSON fields to appropriate data types
5. Add calculated columns for business logic
6. Aggregate data for analysis
7. Convert results to pandas for further processing

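None of these steps write data back to Snowflake. Snowpark DataFrames are evaluated lazily: each transformation only builds up a SQL query, which runs in the Snowflake warehouse when an action such as `show()`, `collect()`, or `to_pandas()` is called, and only the results of those actions are transferred to the notebook. This makes the approach well suited to exploratory data analysis.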


