# PySpark Data Joins and DataFrame Conversions

## Learning Goals:
- Load and join multiple datasets with Spark
- Understand different join types and strategies
- Convert between pandas and Spark DataFrames
- Work with JSON and CSV data formats
- Create comprehensive joined datasets for analysis

## Available Datasets:
- `transactions_data.csv` - Transaction records
- `cards_data.csv` - Card information 
- `exchange_rates.csv` - Currency exchange rates
- `mcc.json` - Merchant category codes
- `train_fraud_labels.json` - Fraud labels for training

Let's explore and join all these datasets!

In [18]:
# Essential imports for data joining and conversions
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import json
import numpy as np



# Initialize Spark session with optimized settings for joins
spark = SparkSession.builder \
    .appName("DataJoinsAndConversions") \
    .master("local[4]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print(f"✓ Spark {spark.version} initialized with join optimizations")
print(f"✓ Adaptive Query Execution enabled for optimal join performance")

ConnectionRefusedError: [Errno 61] Connection refused

## 1. Loading All Available Datasets

Let's load each dataset and explore their structure:

In [None]:
# Load all CSV datasets
print("Loading CSV datasets...")

# 1. Transaction data (main dataset)
transactions_df = spark.read.csv("../data/transactions_data.csv", header=True, inferSchema=True)
print(f"✓ Transactions: {transactions_df.count():,} rows, {len(transactions_df.columns)} columns")

# 2. Cards data
cards_df = spark.read.csv("../data/cards_data.csv", header=True, inferSchema=True)  
print(f"✓ Cards: {cards_df.count():,} rows, {len(cards_df.columns)} columns")

# 3. Exchange rates
exchange_rates_df = spark.read.csv("../data/exchange_rates.csv", header=True, inferSchema=True)
print(f"✓ Exchange rates: {exchange_rates_df.count():,} rows, {len(exchange_rates_df.columns)} columns")

print("\nDataset schemas:")
print("="*50)
print("Transactions schema:")
transactions_df.printSchema()
print("\nCards schema:")
cards_df.printSchema()
print("\nExchange rates schema:")
exchange_rates_df.printSchema()

Loading CSV datasets...


                                                                                

✓ Transactions: 13,305,915 rows, 12 columns
✓ Cards: 6,146 rows, 13 columns
✓ Cards: 6,146 rows, 13 columns
✓ Exchange rates: 12,175 rows, 27 columns

Dataset schemas:
Transactions schema:
root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)


Cards schema:
root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: string (nullable = true)
 |-- cvv: integer (nullable = true)
 |-- has_chip: string (nullable

In [None]:
# Load JSON datasets
print("Loading JSON datasets...")

# Check if Spark session is active
try:
	spark.version
except Exception as e:
	raise RuntimeError("Spark session is not active. Please restart the kernel and re-run the Spark initialization cell.") from e

# 4. MCC (Merchant Category Codes) - JSON format
mcc_df = spark.read.json("../data/mcc.json")
print(f"✓ MCC data: {mcc_df.count():,} rows, {len(mcc_df.columns)} columns")

# 5. Fraud labels - JSON format  
fraud_labels_df = spark.read.json("../data/train_fraud_labels.json")
print(f"✓ Fraud labels: {fraud_labels_df.count():,} rows, {len(fraud_labels_df.columns)} columns")

print("\nJSON schemas:")
print("="*50)
print("MCC schema:")
mcc_df.printSchema()
print("\nFraud labels schema:")
fraud_labels_df.printSchema()

Loading JSON datasets...
✓ MCC data: 111 rows, 1 columns


25/08/12 01:34:33 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 33)8]
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:393)
	at org.apache.spark.sql.catalyst.json.CreateJacksonParser$.internalRow(CreateJacksonParser.scala:84)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$inferFromDataset$4(JsonDataSource.scala:106)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$$Lambda$3764/0x0000000801bbcb58.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$3(JsonInferSchema.scala:90)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$Lambda$3769/0x0000000801bbe910.apply(Unknown Source)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource(SparkErrorUtils.scala:47)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource$(SparkErrorUtils.scala:46)
	at org.apache.spark.util.Utils$.tryWithResource(

ConnectionRefusedError: [Errno 61] Connection refused

## 2. Data Exploration and Preparation

Let's examine sample data from each dataset to understand relationships:

In [None]:
# Explore sample data from each dataset
print("Sample data from each dataset:")
print("="*60)

print("1. TRANSACTIONS (first 5 rows):")
transactions_df.show(5)

print("2. CARDS (first 5 rows):")
cards_df.show(5)

print("3. EXCHANGE RATES (first 5 rows):")
exchange_rates_df.show(5)

print("4. MCC CODES (first 5 rows):")
mcc_df.show(5)

print("5. FRAUD LABELS (first 5 rows):")
fraud_labels_df.show(5)

In [None]:
# Analyze join keys and relationships
print("Analyzing potential join relationships:")
print("="*50)

# Check common columns between datasets
tx_cols = set(transactions_df.columns)
cards_cols = set(cards_df.columns)
rates_cols = set(exchange_rates_df.columns)
mcc_cols = set(mcc_df.columns)
fraud_cols = set(fraud_labels_df.columns)

print("Common columns for joins:")
print(f"Transactions ∩ Cards: {tx_cols.intersection(cards_cols)}")
print(f"Transactions ∩ Exchange Rates: {tx_cols.intersection(rates_cols)}")
print(f"Transactions ∩ MCC: {tx_cols.intersection(mcc_cols)}")
print(f"Transactions ∩ Fraud Labels: {tx_cols.intersection(fraud_cols)}")

# Check unique counts for potential join keys
print(f"\nJoin key analysis:")
if 'client_id' in transactions_df.columns:
    tx_clients = transactions_df.select("client_id").distinct().count()
    print(f"Unique clients in transactions: {tx_clients:,}")

if 'client_id' in cards_df.columns:
    card_clients = cards_df.select("client_id").distinct().count()
    print(f"Unique clients in cards: {card_clients:,}")

if 'mcc' in transactions_df.columns and 'mcc' in mcc_df.columns:
    tx_mccs = transactions_df.select("mcc").distinct().count()
    mcc_codes = mcc_df.select("mcc").distinct().count()
    print(f"Unique MCCs in transactions: {tx_mccs:,}")
    print(f"Unique MCCs in MCC reference: {mcc_codes:,}")

## 3. Pandas to Spark DataFrame Conversion Examples

Let's demonstrate multiple ways to convert between pandas and Spark DataFrames:

In [None]:
# Example 1: Create pandas DataFrame and convert to Spark
print("Example 1: Creating pandas DataFrame and converting to Spark")
print("="*60)

# Create a sample pandas DataFrame with banking data
banking_products_pd = pd.DataFrame({
    'product_id': ['CC_001', 'CC_002', 'SA_001', 'SA_002', 'LN_001'],
    'product_type': ['Credit Card', 'Credit Card', 'Savings Account', 'Savings Account', 'Loan'],
    'interest_rate': [18.99, 15.49, 2.5, 3.0, 8.75],
    'min_balance': [0, 1000, 100, 500, 0],
    'annual_fee': [95, 0, 0, 0, 200],
    'rewards_rate': [1.5, 2.0, 0.0, 0.0, 0.0]
})

print("Original pandas DataFrame:")
print(banking_products_pd)
print(f"pandas DataFrame shape: {banking_products_pd.shape}")

# Convert to Spark DataFrame
banking_products_spark = spark.createDataFrame(banking_products_pd)
print(f"\nConverted to Spark DataFrame:")
banking_products_spark.show()
print(f"Spark DataFrame count: {banking_products_spark.count()}")

In [None]:
# Example 2: Create Spark DataFrame from Python data structures
print("Example 2: Creating Spark DataFrame from Python structures")
print("="*60)

# Method 1: From list of tuples with explicit schema
customer_data = [
    ('CUST_001', 'John Doe', 45, 'Premium', 85000.0, 'New York'),
    ('CUST_002', 'Jane Smith', 32, 'Gold', 62000.0, 'California'), 
    ('CUST_003', 'Bob Johnson', 28, 'Standard', 45000.0, 'Texas'),
    ('CUST_004', 'Alice Brown', 55, 'Premium', 120000.0, 'Florida'),
    ('CUST_005', 'Charlie Wilson', 38, 'Gold', 75000.0, 'Illinois')
]

# Define schema explicitly for better performance
customer_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("full_name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("tier", StringType(), False),
    StructField("annual_income", DoubleType(), True),
    StructField("state", StringType(), True)
])

customers_spark = spark.createDataFrame(customer_data, customer_schema)
print("Spark DataFrame from tuples with explicit schema:")
customers_spark.show()

# Method 2: From list of dictionaries (inferred schema)
account_data = [
    {'account_id': 'ACC_001', 'customer_id': 'CUST_001', 'account_type': 'Checking', 'balance': 5420.50},
    {'account_id': 'ACC_002', 'customer_id': 'CUST_001', 'account_type': 'Savings', 'balance': 25600.75},
    {'account_id': 'ACC_003', 'customer_id': 'CUST_002', 'account_type': 'Checking', 'balance': 8950.25},
    {'account_id': 'ACC_004', 'customer_id': 'CUST_003', 'account_type': 'Savings', 'balance': 15750.00},
    {'account_id': 'ACC_005', 'customer_id': 'CUST_004', 'account_type': 'Investment', 'balance': 95400.80}
]

accounts_spark = spark.createDataFrame(account_data)
print("Spark DataFrame from dictionaries (inferred schema):")
accounts_spark.show()

In [None]:
# Example 3: Converting Spark DataFrame back to pandas
print("Example 3: Converting Spark DataFrame to pandas")
print("="*60)

# Convert small Spark DataFrame to pandas (be careful with large datasets!)
customers_pd = customers_spark.toPandas()
print("Converted back to pandas:")
print(customers_pd)
print(f"pandas DataFrame info:")
print(customers_pd.info())

# Convert with specific columns only
accounts_subset_pd = accounts_spark.select("customer_id", "account_type", "balance").toPandas()
print(f"\nPandas DataFrame from selected Spark columns:")
print(accounts_subset_pd)

## 4. Comprehensive Data Joins

Now let's join our banking datasets using different join strategies:

In [None]:
# Prepare data for joins - clean and standardize
print("Preparing datasets for joins...")
print("="*50)

# Clean transactions data
transactions_clean = transactions_df \
    .withColumn("amount_numeric", regexp_replace(col("amount"), "[\$,]", "").cast("double")) \
    .withColumn("transaction_date", to_date(col("date"))) \
    .filter(col("amount_numeric") > 0)

print(f"✓ Transactions cleaned: {transactions_clean.count():,} records")

# Cache datasets that will be used multiple times
transactions_clean.cache()
cards_df.cache()
mcc_df.cache()
if fraud_labels_df.count() > 0:
    fraud_labels_df.cache()

print("✓ Key datasets cached for join performance")

In [None]:
# Join 1: Transactions with Card Information
print("Join 1: Transactions ⟕ Cards (LEFT JOIN)")
print("="*50)

# Left join to keep all transactions, add card info where available
tx_with_cards = transactions_clean.alias("tx") \
    .join(cards_df.alias("cards"), 
          col("tx.client_id") == col("cards.client_id"), 
          "left")

print(f"Result: {tx_with_cards.count():,} records")
print("Sample joined data:")
tx_with_cards.select("tx.client_id", "tx.amount_numeric", "cards.card_type", "cards.credit_limit").show(5)

# Analyze join effectiveness
cards_matched = tx_with_cards.filter(col("cards.client_id").isNotNull()).count()
join_rate = (cards_matched / tx_with_cards.count()) * 100
print(f"Join effectiveness: {cards_matched:,} transactions matched cards ({join_rate:.1f}%)")

In [None]:
# Join 2: Add MCC (Merchant Category Code) information
print("Join 2: Transactions + Cards ⟕ MCC (LEFT JOIN)")
print("="*50)

# Join with MCC data to get merchant category descriptions
tx_cards_mcc = tx_with_cards.alias("base") \
    .join(mcc_df.alias("mcc"),
          col("base.mcc") == col("mcc.mcc"),
          "left")

print(f"Result: {tx_cards_mcc.count():,} records")
print("Sample with MCC info:")
tx_cards_mcc.select(
    "base.client_id", 
    "base.amount_numeric", 
    "base.mcc",
    "mcc.category_description"
).show(5, truncate=False)

# Analyze MCC join coverage
mcc_matched = tx_cards_mcc.filter(col("mcc.mcc").isNotNull()).count()
mcc_join_rate = (mcc_matched / tx_cards_mcc.count()) * 100
print(f"MCC join effectiveness: {mcc_matched:,} transactions have category info ({mcc_join_rate:.1f}%)")

In [None]:
# Join 3: Add Exchange Rate information (if applicable)
print("Join 3: Add Exchange Rates (LEFT JOIN)")
print("="*50)

# Check if we have currency information to join on
if "currency" in exchange_rates_df.columns:
    # If transactions have currency info, join on currency and date
    if "currency" in transactions_clean.columns:
        tx_full = tx_cards_mcc.alias("main") \
            .join(exchange_rates_df.alias("rates"),
                  (col("main.currency") == col("rates.currency")) & 
                  (col("main.transaction_date") == col("rates.date")),
                  "left")
        print("Joined on currency and date")
    else:
        # If no currency column, add USD rates by date
        tx_full = tx_cards_mcc.alias("main") \
            .join(exchange_rates_df.alias("rates"),
                  col("main.transaction_date") == col("rates.date"),
                  "left")
        print("Joined on date only (assuming USD)")
else:
    # If no proper join key, just keep the previous result
    tx_full = tx_cards_mcc
    print("Exchange rates data doesn't have suitable join keys - skipped")

print(f"Final joined dataset: {tx_full.count():,} records")

In [None]:
# Join 4: Add Fraud Labels (INNER JOIN for training data)
print("Join 4: Add Fraud Labels (INNER JOIN)")
print("="*50)

# Create training dataset by inner joining with fraud labels
if fraud_labels_df.count() > 0 and "client_id" in fraud_labels_df.columns:
    training_data = tx_full.alias("main") \
        .join(fraud_labels_df.alias("fraud"),
              col("main.client_id") == col("fraud.client_id"),
              "inner")
    
    print(f"Training dataset: {training_data.count():,} records with fraud labels")
    
    # Analyze fraud distribution
    fraud_stats = training_data.groupBy("fraud.is_fraud").count()
    print("Fraud label distribution:")
    fraud_stats.show()
else:
    print("No suitable fraud labels found - creating synthetic labels for demo")
    training_data = tx_full.withColumn("is_fraud", (rand(42) < 0.05).cast("int"))
    print(f"Training dataset with synthetic fraud labels: {training_data.count():,} records")

## 5. Advanced Join Patterns and Analysis

In [None]:
# Demonstrate different join types with our sample data
print("Advanced Join Patterns Demo:")
print("="*50)

# Create sample datasets for join demonstrations
customers_sample = customers_spark.select("customer_id", "full_name", "tier")
accounts_sample = accounts_spark.select("customer_id", "account_type", "balance")

# Add a customer without accounts and account without customer for demo
additional_customer = spark.createDataFrame([("CUST_999", "No Account User", "Standard")], 
                                          ["customer_id", "full_name", "tier"])
customers_extended = customers_sample.union(additional_customer)

additional_account = spark.createDataFrame([("CUST_888", "Orphaned", 1000.0)],
                                         ["customer_id", "account_type", "balance"])
accounts_extended = accounts_sample.union(additional_account)

print("1. INNER JOIN - Only matching records:")
inner_result = customers_extended.join(accounts_extended, "customer_id", "inner")
print(f"Inner join result: {inner_result.count()} records")
inner_result.show()

print("2. LEFT JOIN - All customers, with account info where available:")
left_result = customers_extended.join(accounts_extended, "customer_id", "left")
print(f"Left join result: {left_result.count()} records")
left_result.show()

print("3. RIGHT JOIN - All accounts, with customer info where available:")
right_result = customers_extended.join(accounts_extended, "customer_id", "right")
print(f"Right join result: {right_result.count()} records")
right_result.show()

print("4. FULL OUTER JOIN - All records from both sides:")
full_result = customers_extended.join(accounts_extended, "customer_id", "full_outer")
print(f"Full outer join result: {full_result.count()} records")
full_result.show()

In [None]:
# Create final comprehensive dataset with all joins
print("Creating Final Comprehensive Dataset:")
print("="*60)

# Create the most complete dataset possible
final_dataset = tx_full \
    .withColumn("transaction_year", year(col("transaction_date"))) \
    .withColumn("transaction_month", month(col("transaction_date"))) \
    .withColumn("is_weekend", dayofweek(col("transaction_date")).isin([1, 7]).cast("int")) \
    .withColumn("is_high_value", (col("amount_numeric") > 1000).cast("int"))

# Add our synthetic customers and accounts data
final_with_customers = final_dataset.alias("main") \
    .join(customers_spark.alias("cust"), 
          col("main.client_id") == col("cust.customer_id"), 
          "left") \
    .join(accounts_spark.alias("acc"),
          col("main.client_id") == col("acc.customer_id"),
          "left")

print(f"Final comprehensive dataset: {final_with_customers.count():,} records")
print(f"Columns: {len(final_with_customers.columns)}")

# Show sample of final dataset
print("\nSample of final comprehensive dataset:")
final_with_customers.select(
    "client_id", "amount_numeric", "transaction_date",
    "category_description", "cust.tier", "acc.account_type", "acc.balance"
).show(10, truncate=False)

# Dataset statistics
print("\nDataset Statistics:")
print("="*50)
stats = final_with_customers.agg(
    count("*").alias("total_records"),
    countDistinct("client_id").alias("unique_customers"),
    sum("amount_numeric").alias("total_volume"),
    avg("amount_numeric").alias("avg_transaction"),
    min("transaction_date").alias("earliest_date"),
    max("transaction_date").alias("latest_date")
).collect()[0]

print(f"Total records: {stats.total_records:,}")
print(f"Unique customers: {stats.unique_customers:,}")
print(f"Total transaction volume: ${stats.total_volume:,.2f}")
print(f"Average transaction: ${stats.avg_transaction:,.2f}")
print(f"Date range: {stats.earliest_date} to {stats.latest_date}")

## 6. Performance Optimization for Joins

In [None]:
# Join performance optimization techniques
print("Join Performance Optimization Techniques:")
print("="*60)

# 1. Broadcast joins for small datasets
print("1. Broadcast Join Optimization:")
small_mcc = mcc_df.filter(col("mcc").isNotNull())  # Ensure clean data
if small_mcc.count() < 10000:  # Broadcast if small enough
    broadcast_join = transactions_clean \
        .join(broadcast(small_mcc), "mcc", "left")
    print(f"✓ MCC data broadcasted for efficient joins ({small_mcc.count()} records)")
else:
    print("✓ MCC data too large for broadcast, using regular join")

# 2. Bucketing for repeated joins (demonstration)
print("\n2. Partitioning Strategy:")
partitioned_tx = transactions_clean.repartition(4, "client_id")
print(f"✓ Transactions repartitioned by client_id for better join locality")

# 3. Join hints and strategies
print("\n3. Join Performance Analysis:")
print("Current Spark configurations for joins:")
print(f"  - Adaptive Query Execution: {spark.conf.get('spark.sql.adaptive.enabled')}")
print(f"  - Adaptive Coalesce: {spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled')}")
print(f"  - Skew Join Detection: {spark.conf.get('spark.sql.adaptive.skewJoin.enabled')}")

# 4. Analyze join performance
print("\n4. Join Execution Analysis:")
# This would show the query plan (commented out to avoid verbose output)
# tx_with_cards.explain(True)
print("✓ Use .explain(True) on DataFrames to see detailed execution plans")
print("✓ Monitor Spark UI for join shuffle operations and data skew")

## Summary

### What we accomplished:

1. **Multi-format Data Loading**: CSV and JSON files
2. **Comprehensive Joins**: LEFT, RIGHT, INNER, and FULL OUTER joins
3. **Pandas ↔ Spark Conversions**: Multiple methods and examples
4. **Real-world Join Scenarios**: Transactions with cards, MCC codes, exchange rates
5. **Performance Optimization**: Broadcast joins, partitioning, and caching
6. **Data Quality Analysis**: Join effectiveness and coverage metrics

### Key Learning Points:

- **Join Strategy Selection**: Choose appropriate join types based on data relationships
- **Performance Considerations**: Use broadcast joins for small datasets, cache frequently accessed data
- **Schema Management**: Explicit schemas vs. inference for better performance
- **Data Conversion**: When to use pandas vs Spark based on data size
- **Real-world Applications**: Building comprehensive datasets for ML and analytics

### Next Steps:
- Experiment with different join conditions and strategies
- Practice with window functions over joined datasets
- Explore advanced optimization techniques like bucketing
- Build machine learning pipelines on joined datasets

🎉 **Successfully joined multiple datasets and demonstrated DataFrame conversions!**

In [17]:
# Clean up resources
print("Cleaning up Spark resources...")
spark.stop()
print("✅ Spark session terminated successfully!")
print("🎊 Data joins and conversions workshop completed!")

Cleaning up Spark resources...


ConnectionRefusedError: [Errno 61] Connection refused