## 🔧 Quick Setup: Change to Python 3.11 Environment

### Option 1: Use the Setup Script (Easiest!)

**In PowerShell terminal (in VS Code or standalone):**

```powershell
# Navigate to this directory
cd "d:\project zip flies\scala project\scala project"

# Run the setup script
.\setup-spark-env.bat
```

This will automatically:
- ✅ Create a Python 3.11 environment
- ✅ Install all required packages
- ✅ Offer to start Jupyter for you

---

### Option 2: Manual Commands

**In PowerShell/Terminal:**

```powershell
# Create environment
conda create -n spark-env python=3.11 -y

# Activate it
conda activate spark-env

# Install packages
pip install pyspark==3.5.0 findspark pandas numpy matplotlib seaborn plotly textblob jupyter

# Start Jupyter
jupyter notebook
```

---

### Option 3: Check Current Environment

**Run this in a cell to see your current Python version:**

```python
import sys
print(f"Python: {sys.version}")
print(f"Executable: {sys.executable}")
```
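The Python version alone does not show which packages the running kernel can actually import. The sketch below is a minimal example that assumes Python 3.8+ (for `importlib.metadata`). It reports installed versions for the packages this notebook uses. `report_versions` is a hypothetical helper name, not part of any library.

```python
import sys
from importlib import metadata


def report_versions(packages):
    """Return {package: installed version or None} for the running interpreter."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # not installed in this kernel's environment
    return versions


print(f"Python: {sys.version.split()[0]} ({sys.executable})")
for pkg, ver in report_versions(["pyspark", "findspark", "pandas", "jupyter"]).items():
    print(f"  {pkg:<10} {ver or 'NOT INSTALLED'}")
```

If `sys.executable` does not point into `spark-env`, the notebook is still running on the old kernel.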

---

**After setup, reopen this notebook in a Jupyter session started from the new environment; the Spark job cells should then run.**

# 📊 Dataset Analysis Prototype - Spark Web UI Edition

## 🚀 Quick Start (Windows Local Jupyter)

### ⚠️ CRITICAL: Python Version for Spark Jobs

**Tested & Working:** Python 3.10, Python 3.11  
**Issues Reported:** Python 3.12 (worker crashes), Python 3.13 (not compatible)

**Current Status:**
- ✅ Python 3.10/3.11 → Spark jobs work
- ⚠️ Python 3.12 → May have worker crash issues
- ❌ Python 3.13 → Not supported by PySpark 3.5.0
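A small lookup can encode the status list above, so a cell can warn before any Spark job runs. This sketch only mirrors this notebook's own notes and is not an official PySpark compatibility matrix. `spark_python_status` is a hypothetical helper.

```python
import sys


def spark_python_status(version_info=sys.version_info):
    """Classify a Python version according to this notebook's PySpark 3.5.0 notes."""
    major_minor = (version_info[0], version_info[1])
    if major_minor >= (3, 13):
        return "unsupported"  # ❌ Not supported by PySpark 3.5.0
    if major_minor == (3, 12):
        return "risky"        # ⚠️ May have worker crash issues
    if major_minor in {(3, 10), (3, 11)}:
        return "tested"       # ✅ Tested & working
    return "untested"         # not covered by the notes above


print(f"Python {sys.version_info[0]}.{sys.version_info[1]}: {spark_python_status()}")
```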

### 🔧 Setup Python 3.11 (Recommended for Spark)

**Method 1: Anaconda/Miniconda (Easiest)**
```powershell
# Create environment with Python 3.11
conda create -n spark-env python=3.11 -y
conda activate spark-env

# Install packages
pip install pyspark==3.5.0 findspark pandas numpy matplotlib seaborn plotly textblob jupyter

# Start Jupyter
jupyter notebook
```

**Method 2: Manual Installation**
1. Download Python 3.11: https://www.python.org/downloads/release/python-3119/
2. Install to: `C:\Python311`
3. Create virtual environment:
```powershell
C:\Python311\python.exe -m venv spark-venv
.\spark-venv\Scripts\activate
pip install pyspark==3.5.0 findspark pandas numpy matplotlib seaborn plotly textblob jupyter
jupyter notebook
```

**Alternative: Try older PySpark with Python 3.12**
```powershell
pip uninstall pyspark -y
pip install pyspark==3.4.1
```

### 📋 Workflow

**Setup:**
1. ✅ Run Step 1 - Install packages
2. ✅ Run the 🪟 Windows Local Jupyter - Setup Spark cell
3. ✅ Run the Verify Java Installation cell (optional)
4. ✅ Run the 🎯 Spark jobs readiness check

**Analysis:**
5. ✅ Run Step 2 (or Quick Load) and Step 3 - Load and explore your CSV
6. ✅ Run Step 3.5 - Generate Spark jobs (Python 3.10/3.11 only)
   - Or skip it and use the pandas analysis
7. ✅ Run Steps 4 to 7 - Complete data analysis

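**Spark Web UI:** http://localhost:4040 (or the next free port, such as 4041, if 4040 is already in use; the setup cell prints the actual URL)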
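When several Spark sessions run at once, each UI takes the next port after 4040. The sketch below is a minimal stdlib check for which of those local ports accept connections. `listening_ports` is a hypothetical helper. An open port only means that something is bound there, not necessarily Spark.

```python
import socket


def listening_ports(host="127.0.0.1", start=4040, count=10, timeout=0.2):
    """Return the ports in [start, start + count) that accept a TCP connection."""
    open_ports = []
    for port in range(start, start + count):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            # connect_ex returns 0 when something is listening on the port
            if s.connect_ex((host, port)) == 0:
                open_ports.append(port)
    return open_ports


for port in listening_ports():
    print(f"Something is listening on http://localhost:{port}")
```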

### 💡 If Spark Jobs Fail

**Don't worry!** You can still get the complete analysis:
- Skip Step 3.5 (Spark jobs)
- Run Steps 4 to 7 (pandas analysis)
- Get all visualizations, statistics, and insights
- The Spark UI won't show jobs, but everything else works!

---

## 📁 Sample CSV Files Available
- `BoatProduct.csv`
- `sample-products.csv`
- `flipkart-phones-fresh.csv`

## 📦 Step 1: Install Dependencies

In [2]:
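# Install required packages into the kernel's own environment
# (%pip targets the running kernel; !pip may resolve to a different Python)
%pip install pandas numpy matplotlib seaborn plotly textblob wordcloud scikit-learn -q

print("✅ All packages installed successfully!")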


## ⚠️ Step 1.5: Setup Spark with Ngrok (GOOGLE COLAB ONLY)

**🚫 SKIP THIS CELL if using local Jupyter on Windows!**

**For Google Colab users only:**
- ⚡ Installs Apache Spark
- 🌍 Public Spark Web UI access via ngrok
- 📊 Monitor jobs from anywhere

**For local Windows Jupyter users:**
- ✅ Use the "🪟 Windows Local Jupyter - Setup Spark" cell instead
- ✅ No ngrok needed; access the UI at http://localhost:4040
- ✅ Skipping this cell avoids a ModuleNotFoundError

---

**If you already ran the Windows setup cell, skip this cell!**

## 🪟 Windows Local Jupyter - Setup Spark

**For a local Jupyter notebook on Windows.** This version needs no apt-get or other Linux commands.

**Prerequisites:**
- Java 8, 11, or 17 installed (the Java versions Spark 3.5 supports)
- PySpark installed: `pip install pyspark==3.5.0 findspark`

In [3]:
# Windows-compatible Spark setup for Jupyter
import os
import sys
import socket

print("🔧 Setting up Spark for Windows Jupyter...")

# Install PySpark if not already installed
try:
    import findspark
    from pyspark.sql import SparkSession
    print("✅ PySpark already installed")
except ImportError:
    print("📦 Installing PySpark...")
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pyspark==3.5.0", "findspark", "-q"])
    import findspark
    from pyspark.sql import SparkSession
    print("✅ PySpark installed")

# Initialize findspark (locates a Spark installation, using SPARK_HOME if it is set)
try:
    findspark.init()
    print("✅ Findspark initialized")
except Exception:
    print("⚠️  Findspark init skipped (using pip-installed PySpark)")

# The Spark release under SPARK_HOME must match the pip-installed PySpark;
# a mismatch is a common cause of Py4J "Constructor ... does not exist" errors
import pyspark
print(f"   PySpark package: {pyspark.__version__}")
print(f"   SPARK_HOME: {os.environ.get('SPARK_HOME', '(not set)')}")

# Import Spark functions (note: this star import shadows Python builtins
# such as sum, min, max, and round in this kernel)
from pyspark.sql.functions import *

# Stop existing Spark session if any
try:
    spark.stop()
    print("🔄 Stopped existing Spark session")
except Exception:
    pass

# Find a free port for the Spark UI (Spark also retries successive ports on
# its own, up to spark.port.maxRetries, if the requested port is busy)
def find_free_port(start_port=4040):
    """Return the first bindable port in [start_port, start_port + 100), else start_port."""
    port = start_port
    while port < start_port + 100:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(('', port))
                return port
        except OSError:
            port += 1
    return start_port

# Find available port for Spark UI
spark_ui_port = find_free_port(4040)

print(f"\n⚡ Creating Spark session on port {spark_ui_port}...")

# Create a local Spark session with modest memory settings.
# In local mode the executors run inside the driver JVM, so spark.driver.memory
# is the setting that matters; spark.executor.memory has no effect here.
spark = SparkSession.builder \
    .appName("Dataset Analysis - Jupyter") \
    .master("local[*]") \
    .config("spark.ui.port", str(spark_ui_port)) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.ui.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()

# Get the actual Web UI URL
spark_ui_url = spark.sparkContext.uiWebUrl
if not spark_ui_url:
    spark_ui_url = f"http://localhost:{spark_ui_port}"

print("\n" + "="*70)
print("✅ SPARK SESSION CREATED!")
print("="*70)
print(f"   Spark Version: {spark.version}")
print(f"   Spark UI Port: {spark_ui_port}")
print(f"   Local Web UI: {spark_ui_url}")
print("="*70)

# Display clickable link
from IPython.display import HTML, display

html = f'''
<div style="background: linear-gradient(135deg, #11998e 0%, #38ef7d 100%); 
            padding: 25px; border-radius: 12px; text-align: center; 
            box-shadow: 0 4px 6px rgba(0,0,0,0.1); margin: 20px 0;">
    <h2 style="color: white; margin: 0 0 15px 0;">
        ⚡ Spark Web UI Ready for Jupyter!
    </h2>
    <div style="background: white; padding: 15px; border-radius: 8px; margin: 15px 0;">
        <a href="{spark_ui_url}" target="_blank" 
           style="color: #11998e; font-size: 18px; font-weight: bold; text-decoration: none;">
            🚀 {spark_ui_url}
        </a>
    </div>
    <p style="color: white; margin: 15px 0 5px 0; font-size: 14px;">
        Click the link above to open Spark Web UI in browser
    </p>
    <p style="color: rgba(255,255,255,0.9); margin: 5px 0 0 0; font-size: 12px;">
        ✅ Jobs • ✅ Stages • ✅ DAG Visualizations • ✅ SQL Queries
    </p>
</div>
'''
display(HTML(html))

print("\n💡 Next Steps:")
print("   1. ✅ Spark is running - click the green box above")
print("   2. 📂 Upload your dataset (run Step 2)")
print("   3. ⚡ Generate Spark jobs (run Step 3.5)")
print("   4. 🔄 Refresh the browser to see jobs in Spark UI!")
print(f"\n📊 Bookmark this URL: {spark_ui_url}")
print("\n✅ Ready for data analysis with Spark!")

🔧 Setting up Spark for Windows Jupyter...
✅ PySpark already installed
✅ Findspark initialized

⚡ Creating Spark session on port 4041...


Py4JError: An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:180)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:197)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:833)
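The `Py4JError` above (`Constructor org.apache.spark.sql.SparkSession(...) does not exist`) typically means the Python `pyspark` package and the Spark JVM it launched come from different releases. One common case is `SPARK_HOME` pointing at an older standalone Spark download. The sketch below compares the two versions. It assumes the Spark directory contains `jars/spark-core_<scala>-<version>.jar`, which holds for Spark binary distributions and for pip-installed PySpark. `spark_home_version` is a hypothetical helper.

```python
import os
import re
from pathlib import Path


def spark_home_version(spark_home):
    """Return the Spark version found under SPARK_HOME/jars, or None."""
    if not spark_home:
        return None
    jars_dir = Path(spark_home, "jars")
    if not jars_dir.is_dir():
        return None
    for jar in jars_dir.glob("spark-core_*.jar"):
        # e.g. "spark-core_2.12-3.5.0.jar" -> "3.5.0"
        match = re.fullmatch(r"spark-core_[\d.]+-(\d+\.\d+\.\d+)\.jar", jar.name)
        if match:
            return match.group(1)
    return None


try:
    import pyspark
    python_side = pyspark.__version__
except ImportError:
    python_side = None  # pyspark is not installed in this kernel

jvm_side = spark_home_version(os.environ.get("SPARK_HOME"))
print(f"pyspark package: {python_side}  |  Spark under SPARK_HOME: {jvm_side}")
if python_side and jvm_side and python_side != jvm_side:
    print("⚠️  Mismatch: unset SPARK_HOME (or point it at a matching release), then restart the kernel")
```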



## ✅ Verify Java Installation (Windows)

Run this cell to check whether Java is installed and on your `PATH`; Spark cannot start a session without it.

In [None]:
# Check Java installation
import os
import subprocess

print("🔍 Checking Java installation...\n")

try:
    # Call java directly (no shell=True): a missing executable then raises
    # FileNotFoundError instead of cmd.exe reporting it as ordinary output
    result = subprocess.run(['java', '-version'],
                            capture_output=True,
                            text=True)

    # `java -version` writes its banner to stderr
    java_output = result.stderr if result.stderr else result.stdout

    if 'version' in java_output.lower():
        print("✅ Java is installed!")
        print("\n" + java_output)
        print(f"JAVA_HOME: {os.environ.get('JAVA_HOME', '(not set)')}")
        print("\n🎉 You're ready to use Spark!")
    else:
        print("⚠️  Java check returned unexpected output")
        print(java_output)

except FileNotFoundError:
    print("❌ Java is NOT installed (or not on PATH)!")
    print("\n📥 Please install Java:")
    print("   1. Download from: https://www.oracle.com/java/technologies/downloads/")
    print("   2. Or install OpenJDK: https://adoptium.net/")
    print("   3. Recommended: Java 8, 11, or 17 (all supported by Spark 3.5)")
    print("\n   After installation, restart Jupyter and try again.")
except Exception as e:
    print(f"❌ Error checking Java: {e}")
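`java -version` reports versions in two styles: the legacy `1.8.0_392` form for Java 8, and `17.0.9` for Java 9 and later. The sketch below extracts the major version from that banner so it can be compared against the versions Spark 3.5 supports (8, 11, 17). `java_major_version` is a hypothetical helper. The sample banners are illustrative, not captured output.

```python
import re


def java_major_version(banner):
    """Extract the Java major version from `java -version` output, or None."""
    # First line looks like: openjdk version "17.0.9" 2023-10-17
    #                    or: java version "1.8.0_392"
    match = re.search(r'version "(\d+)(?:\.(\d+))?', banner)
    if not match:
        return None
    first = int(match.group(1))
    if first == 1 and match.group(2):
        return int(match.group(2))  # legacy "1.x" scheme: 1.8 -> 8
    return first


SPARK_35_JAVA = {8, 11, 17}
for sample in ['java version "1.8.0_392"', 'openjdk version "17.0.9" 2023-10-17']:
    major = java_major_version(sample)
    print(major, "supported" if major in SPARK_35_JAVA else "check the Spark docs")
```

In the cell above, the same check could run on the captured banner as `java_major_version(java_output)`.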

## 📤 Step 2: Upload Your Dataset

**Supported formats:** CSV files

**Expected columns (flexible):**
- Product name/title
- Price
- Rating/Rate
- Reviews/Review text
- Category (optional)
- Discount (optional)

## 📁 Quick Load (Alternative for Jupyter)

If you prefer, specify the CSV path directly in code:

In [None]:
# Quick Load - Directly specify CSV path (easier for Jupyter)
import os
import pandas as pd
from IPython.display import display

# Change this to your CSV file path
csv_path = "BoatProduct.csv"  # Or use full path like: r"D:\project zip flies\scala project\scala project\BoatProduct.csv"

print(f"📂 Current directory: {os.getcwd()}")
print(f"📄 Loading: {csv_path}\n")

try:
    df = pd.read_csv(csv_path)
    print("✅ Successfully loaded!")
    print(f"📊 Shape: {df.shape[0]:,} rows × {df.shape[1]} columns")
    # map(str, ...) keeps the join working for non-string column labels
    print(f"\n📋 Columns: {', '.join(map(str, df.columns[:5]))}{'...' if len(df.columns) > 5 else ''}")

    # Show first few rows
    print("\n📑 First 3 rows:")
    display(df.head(3))

except FileNotFoundError:
    print(f"❌ File not found: {csv_path}")
    print("\n💡 Available CSV files in current directory:")
    csv_files = [f for f in os.listdir('.') if f.lower().endswith('.csv')]
    for i, f in enumerate(csv_files, 1):
        print(f"   {i}. {f}")
    if csv_files:
        print("\n👉 Update csv_path to one of these files")
    else:
        print("   (No CSV files found)")

except Exception as e:
    print(f"❌ Error: {e}")

In [None]:
# Upload CSV file - Works in both Jupyter and Colab
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
import os
warnings.filterwarnings('ignore')

# Set styling
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 6)

print("📁 Please select your CSV file...\n")

# Check if running in Colab or local Jupyter
try:
    from google.colab import files
    print("🌐 Google Colab detected - using file upload widget")
    uploaded = files.upload()
    filename = list(uploaded.keys())[0]

except ImportError:
    # Running in local Jupyter
    print("💻 Local Jupyter detected")
    print("\n📋 Options:")
    print("   1. Enter full path to your CSV file")
    print("   2. Or place CSV in notebook directory and enter filename")
    print(f"\n📂 Current directory: {os.getcwd()}")

    # Show available CSV files in current directory (case-insensitive match)
    csv_files = [f for f in os.listdir('.') if f.lower().endswith('.csv')]
    if csv_files:
        print("\n📄 CSV files found in current directory:")
        for i, f in enumerate(csv_files, 1):
            print(f"   {i}. {f}")

    filename = input("\n👉 Enter CSV filename or path: ").strip().strip('"')

    # If user entered a number, use that file from the list
    if filename.isdigit() and csv_files:
        idx = int(filename) - 1
        if 0 <= idx < len(csv_files):
            filename = csv_files[idx]
            print(f"✅ Selected: {filename}")

# Load the CSV file
try:
    df = pd.read_csv(filename)
    print(f"\n✅ Successfully loaded: {filename}")
    print(f"📊 Shape: {df.shape[0]:,} rows × {df.shape[1]} columns")
except FileNotFoundError:
    print(f"\n❌ File not found: {filename}")
    print("\n💡 Tips:")
    print("   • Use full path: C:\\Users\\YourName\\Documents\\data.csv")
    print(f"   • Or copy file to: {os.getcwd()}")
    raise
except Exception as e:
    print(f"\n❌ Error loading file: {e}")
    raise

## 🔍 Step 3: Load and Explore Data

In [None]:
# Load the dataset, skipping malformed lines. Works whether the file was
# chosen in Step 2 (`filename`) or in Quick Load (`csv_path`).
from IPython.display import display

source_path = globals().get('filename') or globals().get('csv_path')
df = pd.read_csv(source_path, encoding='utf-8', on_bad_lines='skip')

print("="*70)
print("📊 DATASET OVERVIEW")
print("="*70)
print(f"\n📈 Total Records: {len(df):,}")
print(f"📋 Total Columns: {len(df.columns)}")
print(f"💾 Memory Usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print("\n📑 Column Names:")
for i, col in enumerate(df.columns, 1):
    print(f"   {i}. {col} ({df[col].dtype})")

print("\n" + "="*70)
print("🔍 FIRST 5 ROWS")
print("="*70)
display(df.head())

print("\n" + "="*70)
print("📊 DATA TYPES & MISSING VALUES")
print("="*70)
info_df = pd.DataFrame({
    'Column': df.columns,
    'Data Type': df.dtypes.values,
    'Non-Null Count': df.count().values,
    'Null Count': df.isnull().sum().values,
    'Null %': (df.isnull().sum() / len(df) * 100).values
})
display(info_df)

print("\n" + "="*70)
print("📈 STATISTICAL SUMMARY")
print("="*70)
display(df.describe())

## 🧹 Step 4: Data Cleaning & Preprocessing

## ⚡ Step 3.5: Process Data with Spark (Generate Jobs)

This step converts your pandas data into a Spark RDD and runs several actions on it (count, take, filter, reduce). Each action generates a job that is visible in the Spark Web UI.
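Each action in the Step 3.5 cell (`count()`, `take()`, `reduce()`) triggers its own Spark job. That is why several jobs appear in the Web UI. The plain-Python sketch below is a conceptual model only, not how Spark schedules work, and its contiguous partitioning is a simplifying assumption. It shows how `reduce` applies the function inside each partition and then combines the partial results on the driver. This is also why Spark requires the function to be associative and commutative.

```python
from functools import reduce


def split_into_partitions(rows, num_partitions):
    """Split rows into roughly equal contiguous slices (simplified model)."""
    size, extra = divmod(len(rows), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(rows[start:end])
        start = end
    return partitions


def model_reduce(rows, func, num_partitions=4):
    """Reduce within each non-empty partition, then combine the partial results."""
    partials = [reduce(func, part)
                for part in split_into_partitions(rows, num_partitions) if part]
    return reduce(func, partials)


# Mirrors rdd.map(lambda x: 1).reduce(lambda a, b: a + b) on a 10-row RDD
print(model_reduce([1] * 10, lambda a, b: a + b))  # 10
```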

## ⚠️ Python 3.13 Compatibility Issue

**If you are running Python 3.13:** it is incompatible with PySpark 3.5.0.

**Impact:**
- ❌ Step 3.5 & 3.6 will crash
- ❌ No Spark jobs will appear in Web UI
- ❌ PySpark DataFrame operations fail

**🎯 TO SEE SPARK JOBS - YOU MUST:**

### Switch to Python 3.11 or 3.10

**Fastest Method (if you have Anaconda):**

Open PowerShell terminal and run:
```powershell
conda create -n spark-env python=3.11 -y
conda activate spark-env
pip install pyspark==3.5.0 findspark pandas numpy matplotlib seaborn plotly textblob jupyter
jupyter notebook
```

Then reopen this notebook in the new Jupyter session.

**Verification:**
After switching Python versions, run this in a cell:
```python
import sys
print(f"Python version: {sys.version}")
# Should show 3.11.x
```

**Once on Python 3.11:**
- ✅ Run Step 3.5 - Spark jobs should now run
- ✅ The Spark Web UI will show all jobs
- ✅ DAG visualizations are visible
- ✅ Complete Spark functionality

---

**Alternative:** See the Quick Setup section at the top of this notebook for detailed setup instructions

In [None]:
# 🎯 Check if Ready for Spark Jobs

import sys

print("="*70)
print("🔍 SPARK JOBS READINESS CHECK")
print("="*70)

# Check 1: Python Version (thresholds follow the compatibility notes at the top)
python_ver = f"{sys.version_info.major}.{sys.version_info.minor}"
print(f"\n1. Python Version: {python_ver}")

if sys.version_info >= (3, 13):
    print("   ❌ Python 3.13 - NOT compatible with PySpark 3.5.0")
    print("   ⚠️  Spark jobs WILL crash")
    print("   ✅ Solution: Switch to Python 3.11")
    ready = False
elif sys.version_info >= (3, 12):
    print("   ⚠️  Python 3.12 - May have worker crash issues (3.11 recommended)")
    ready = True
elif sys.version_info >= (3, 8):
    print("   ✅ Python 3.8-3.11 - Compatible!")
    ready = True
else:
    print("   ⚠️  Old Python version - consider upgrading")
    ready = False

# Check 2: Spark Session
print("\n2. Spark Session:")
try:
    if spark:
        print(f"   ✅ Spark is running (v{spark.version})")
        print(f"   📊 Web UI: {spark.sparkContext.uiWebUrl}")
except NameError:
    print("   ❌ Spark not initialized")
    print("   ✅ Solution: Run the 🪟 Windows Local Jupyter - Setup Spark cell")
    ready = False

# Check 3: Data Loaded
print("\n3. Dataset:")
try:
    if df is not None:
        print(f"   ✅ Data loaded: {len(df):,} rows")
except NameError:
    print("   ❌ No data loaded")
    print("   ✅ Solution: Run Step 2 or Quick Load (load CSV)")
    ready = False

# Final verdict
print("\n" + "="*70)
if ready:
    print("🎉 ALL SYSTEMS GO! Ready to run Spark jobs!")
    print("="*70)
    print("\n✅ Next step: Run Step 3.5")
    print("   Spark jobs will appear in Web UI!")
else:
    print("⚠️  NOT READY - Fix issues above first")
    print("="*70)
    if sys.version_info >= (3, 13):
        print("\n🔧 CRITICAL: Python 3.13 detected!")
        print("\n   Quick fix commands:")
        print("   conda create -n spark-env python=3.11 -y")
        print("   conda activate spark-env")
        print("   pip install pyspark==3.5.0 findspark pandas numpy matplotlib seaborn plotly textblob jupyter")
        print("   jupyter notebook")
        print("\n   Then reopen this notebook")

print("="*70)

In [None]:
import sys

print("\n" + "="*70)
print("🔍 PYTHON VERSION CHECK")
print("="*70)

python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
print(f"Current Python: {python_version}")

# Classify the Python version: True (compatible), "maybe" (3.12), False (3.13+)
if sys.version_info >= (3, 13):
    print("\n⚠️  Python 3.13+ detected - NOT compatible with PySpark 3.5.0")
    print("❌ Spark jobs will crash")
    compatible = False
elif sys.version_info >= (3, 12):
    print("\n⚠️  Python 3.12 detected - May have worker crash issues")
    print("💡 Recommended: Use Python 3.11 or 3.10 for best results")
    compatible = "maybe"
else:
    print("\n✅ Python version compatible with PySpark")
    compatible = True

print("="*70)

# Only attempt Spark jobs if compatible
if compatible is False:
    print("\n❌ SKIPPING SPARK JOBS - Incompatible Python version")
    print("\n📊 GOOD NEWS: You can still complete the full analysis!")
    print("   ✅ Skip this cell")
    print("   ✅ Run Step 4 onwards for the complete pandas analysis")
    print("   ✅ Get all visualizations, statistics, and insights")
    print("\n💡 To see Spark jobs in Web UI:")
    print("   • Install Python 3.11 (not 3.12 or 3.13)")
    print("   • Create new environment: py -3.11 -m venv spark-venv")
    print("   • Activate: .\\spark-venv\\Scripts\\activate")
    print("   • Install packages: pip install pyspark==3.5.0 jupyter pandas matplotlib seaborn plotly textblob")
    print("   • Start: jupyter notebook")
    print("="*70)

elif compatible == "maybe":
    print("\n⚠️  ATTEMPTING SPARK JOBS (May fail with Python 3.12)")
    print("="*70)

    try:
        print("\n📊 Converting pandas DataFrame to Spark RDD...")
        print("   Using RDD operations (better compatibility)...\n")

        # Convert DataFrame to list of rows
        data_rows = df.values.tolist()
        column_names = df.columns.tolist()

        # Create RDD from the data
        rdd = spark.sparkContext.parallelize(data_rows)

        print("✅ Created Spark RDD successfully")
        print(f"   Partitions: {rdd.getNumPartitions()}")

        # Run simplified RDD operations; each action below triggers a Spark job
        print("\n🔄 Running Spark RDD operations...")

        # Job 1: Count operation
        print("\n1️⃣ RDD Count...")
        count = rdd.count()
        print(f"   ✅ Total rows: {count:,}")

        # Job 2: Take sample
        print("\n2️⃣ Taking sample...")
        sample = rdd.take(3)
        print(f"   ✅ Sample retrieved: {len(sample)} rows")

        # Job 3: Simple map operation
        print("\n3️⃣ Map operation...")
        mapped = rdd.map(lambda x: 1).take(10)
        print("   ✅ Map completed")

        print("\n" + "="*70)
        print("✅ Spark RDD processing complete!")
        print("="*70)
        print(f"\n📊 CHECK SPARK WEB UI: {spark.sparkContext.uiWebUrl}")
        print("   You should see completed jobs in the Jobs tab")
        print("="*70)

    except Exception as e:
        error_msg = str(e)
        print(f"\n❌ Spark job failed: {type(e).__name__}")

        if "Python worker" in error_msg or "crashed" in error_msg:
            print("\n🔧 PYTHON WORKER CRASHED")
            print("="*70)
            print(f"\nPython {python_version} has compatibility issues with PySpark 3.5.0.")
            print("\n✅ SOLUTION: Skip this cell and use pandas analysis")
            print("   • Step 4 onwards: Complete data analysis with pandas")
            print("   • Get all visualizations, statistics, and insights")
            print("   • No Spark required!")
            print("\n💡 To fix Spark jobs:")
            print("   • Install Python 3.11 (download from python.org)")
            print("   • Create new environment with Python 3.11")
            print("   • Reinstall packages and rerun notebook")
            print("="*70)
        else:
            print(f"   Error: {error_msg[:200]}")

else:
    # Python 3.11 or lower - should work
    print("\n⚡ SPARK RDD PROCESSING")
    print("="*70)

    try:
        print("\n📊 Converting pandas DataFrame to Spark RDD...")

        # Convert DataFrame to list of rows
        data_rows = df.values.tolist()
        column_names = df.columns.tolist()

        # Create RDD from the data
        rdd = spark.sparkContext.parallelize(data_rows)

        print("✅ Created Spark RDD successfully")
        print(f"   Partitions: {rdd.getNumPartitions()}")

        # Run RDD operations; each action below triggers a Spark job
        print("\n🔄 Running Spark RDD operations (check Web UI for jobs)...")

        # Job 1: Count
        print("\n1️⃣ RDD Count...")
        count = rdd.count()
        print(f"   Total rows: {count:,}")

        # Job 2: Sample
        print("\n2️⃣ Taking sample...")
        sample = rdd.take(5)
        print(f"   Sample: {len(sample)} rows")

        # Job 3: Map (first column of each row)
        print("\n3️⃣ Map operation...")
        first_col = rdd.map(lambda row: row[0] if len(row) > 0 else None).take(5)
        print(f"   Mapped: {first_col}")

        # Job 4: Filter
        print("\n4️⃣ Filter operation...")
        filtered = rdd.filter(lambda row: row is not None).count()
        print(f"   Filtered rows: {filtered:,}")

        # Job 5: Reduce
        print("\n5️⃣ Reduce operation...")
        total = rdd.map(lambda x: 1).reduce(lambda a, b: a + b)
        print(f"   Total count: {total:,}")

        print("\n" + "="*70)
        print("✅ Spark RDD processing complete!")
        print("="*70)
        print(f"\n📊 CHECK SPARK WEB UI: {spark.sparkContext.uiWebUrl}")
        print("\n   You should see:")
        print("   ✅ 5+ completed jobs")
        print("   ✅ RDD operations in Jobs tab")
        print("   ✅ DAG visualizations")
        print("   ✅ Task details in Stages tab")
        print("="*70)

    except NameError as e:
        print(f"\n⚠️  NameError: {e}")
        print("\n📋 Make sure you've run:")
        print("   1. The 🪟 Windows Local Jupyter - Setup Spark cell")
        print("   2. Step 2 or Quick Load - Load CSV data")

    except Exception as e:
        error_msg = str(e)
        print(f"\n❌ Error: {type(e).__name__}")
        print(f"   {error_msg[:200]}")

        if "Python worker" in error_msg or "crashed" in error_msg:
            print("\n🔧 Unexpected worker crash")
            print("\n💡 Alternative: Skip this cell and run Step 4 onwards for pandas analysis")

        import traceback
        print("\n📋 Full traceback:")
        traceback.print_exc()

In [None]:
import re
import numpy as np
import pandas as pd
from textblob import TextBlob

# Create a copy for processing
df_clean = df.copy()

print("🧹 Starting data cleaning...\n")

# Detect column names (flexible mapping)
column_mapping = {}

for col in df_clean.columns:
    col_lower = col.lower()
    if 'product' in col_lower and 'name' in col_lower:
        column_mapping['product_name'] = col
    elif 'price' in col_lower:
        column_mapping['price'] = col
    elif 'rate' in col_lower or 'rating' in col_lower:
        column_mapping['rating'] = col
    elif 'review' in col_lower and 'number' not in col_lower and 'count' not in col_lower:
        column_mapping['review_text'] = col
    elif 'review' in col_lower and ('number' in col_lower or 'count' in col_lower):
        column_mapping['review_count'] = col
    elif 'discount' in col_lower:
        column_mapping['discount'] = col
    elif 'summary' in col_lower:
        column_mapping['summary'] = col

print("📋 Detected columns:")
for key, val in column_mapping.items():
    print(f"   {key}: {val}")

# Clean price column
if 'price' in column_mapping:
    def clean_price(price_str):
        if pd.isna(price_str):
            return np.nan
        # Take the first number and drop thousands separators. Stripping every
        # non-digit character instead would turn "Rs. 1,299" into ".1299".
        match = re.search(r'\d[\d,]*(?:\.\d+)?', str(price_str))
        if not match:
            return np.nan
        return float(match.group(0).replace(',', ''))

    df_clean['price_cleaned'] = df_clean[column_mapping['price']].apply(clean_price)
    print("\n✅ Price column cleaned")

# Clean rating column
if 'rating' in column_mapping:
    def clean_rating(rating_str):
        if pd.isna(rating_str):
            return np.nan
        rating_str = str(rating_str)
        # Extract numeric rating
        match = re.search(r'(\d+\.?\d*)', rating_str)
        if match:
            rating = float(match.group(1))
            # Assume values above 5 are on a 10-point scale and rescale to 5
            if rating > 5:
                rating = rating / 10 * 5
            return rating
        return np.nan

    df_clean['rating_cleaned'] = df_clean[column_mapping['rating']].apply(clean_rating)
    print("✅ Rating column cleaned")

# Clean review count
if 'review_count' in column_mapping:
    def clean_review_count(count_str):
        if pd.isna(count_str):
            return 0
        # Drop thousands separators first, so "1,234" becomes 1234 rather than 1
        count_str = str(count_str).replace(',', '')
        numbers = re.findall(r'\d+', count_str)
        if numbers:
            return int(numbers[0])
        return 0

    df_clean['review_count_cleaned'] = df_clean[column_mapping['review_count']].apply(clean_review_count)
    print("✅ Review count cleaned")

# Clean discount (first integer, e.g. "20% off" -> 20)
if 'discount' in column_mapping:
    def clean_discount(discount_str):
        if pd.isna(discount_str):
            return 0
        discount_str = str(discount_str)
        numbers = re.findall(r'\d+', discount_str)
        if numbers:
            return int(numbers[0])
        return 0

    df_clean['discount_cleaned'] = df_clean[column_mapping['discount']].apply(clean_discount)
    print("✅ Discount column cleaned")

# Sentiment analysis on reviews (TextBlob polarity in [-1, 1])
if 'review_text' in column_mapping or 'summary' in column_mapping:
    review_col = column_mapping.get('review_text') or column_mapping.get('summary')

    def get_sentiment(text):
        if pd.isna(text) or str(text).strip() == '':
            return 'Neutral', 0.0
        try:
            analysis = TextBlob(str(text))
            polarity = analysis.sentiment.polarity
            if polarity > 0.1:
                return 'Positive', polarity
            elif polarity < -0.1:
                return 'Negative', polarity
            else:
                return 'Neutral', polarity
        except Exception:
            return 'Neutral', 0.0

    print("\n🤖 Performing sentiment analysis...")
    df_clean[['sentiment', 'sentiment_score']] = df_clean[review_col].apply(
        lambda x: pd.Series(get_sentiment(x))
    )
    print("✅ Sentiment analysis completed")

# Remove duplicates
before_dup = len(df_clean)
df_clean = df_clean.drop_duplicates()
after_dup = len(df_clean)
print(f"\n🗑️  Removed {before_dup - after_dup} duplicate rows")

print("\n✅ Data cleaning completed!")
print(f"📊 Final dataset size: {len(df_clean):,} rows")
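Cleaning rules like these are easiest to sanity-check on a few hand-written strings. The sketch below uses only the standard library and checks for `None` instead of calling `pd.isna`, so it runs without pandas. It parses prices and counts by taking the first number and dropping thousands separators, so `"Rs. 1,299"` becomes 1299 rather than 0.1299. The sample strings are illustrative and not taken from the bundled CSV files.

```python
import math
import re


def parse_price(value):
    """First number in a price string with commas removed; NaN if there is none."""
    if value is None:
        return math.nan
    match = re.search(r'\d[\d,]*(?:\.\d+)?', str(value))
    return float(match.group(0).replace(',', '')) if match else math.nan


def parse_count(value):
    """First integer in a count string, ignoring thousands separators; 0 if none."""
    if value is None:
        return 0
    numbers = re.findall(r'\d+', str(value).replace(',', ''))
    return int(numbers[0]) if numbers else 0


for raw in ["₹1,299", "Rs. 1,299.00", "1,299 - 1,499"]:
    print(f"{raw!r} -> {parse_price(raw)}")
print(parse_count("12,345 Ratings & 1,234 Reviews"))  # 12345
```

Because only the first number is kept, a combined "Ratings & Reviews" string yields the ratings count, and a price range yields its lower bound.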

## 📊 Step 5: Statistical Analysis

In [None]:
print("="*70)
print("📈 STATISTICAL ANALYSIS")
print("="*70)

# Price statistics
if 'price_cleaned' in df_clean.columns:
    print("\n💰 PRICE STATISTICS:")
    price_stats = df_clean['price_cleaned'].describe()
    print(f"   Mean Price: ₹{price_stats['mean']:,.2f}")
    print(f"   Median Price: ₹{price_stats['50%']:,.2f}")
    print(f"   Min Price: ₹{price_stats['min']:,.2f}")
    print(f"   Max Price: ₹{price_stats['max']:,.2f}")
    print(f"   Std Dev: ₹{price_stats['std']:,.2f}")

# Rating statistics
if 'rating_cleaned' in df_clean.columns:
    print("\n⭐ RATING STATISTICS:")
    rating_stats = df_clean['rating_cleaned'].describe()
    print(f"   Mean Rating: {rating_stats['mean']:.2f}/5.0")
    print(f"   Median Rating: {rating_stats['50%']:.2f}/5.0")
    print(f"   Min Rating: {rating_stats['min']:.2f}")
    print(f"   Max Rating: {rating_stats['max']:.2f}")
    print(f"   Std Dev: {rating_stats['std']:.2f}")

    # Rating distribution (five highest rating values)
    print("\n   Rating Distribution:")
    rating_dist = df_clean['rating_cleaned'].value_counts().sort_index(ascending=False)
    for rating, count in rating_dist.head(5).items():
        percentage = (count / len(df_clean)) * 100
        print(f"   {rating:.1f} ⭐: {count:,} ({percentage:.1f}%)")

# Sentiment statistics
if 'sentiment' in df_clean.columns:
    print("\n😊 SENTIMENT DISTRIBUTION:")
    sentiment_counts = df_clean['sentiment'].value_counts()
    for sentiment, count in sentiment_counts.items():
        percentage = (count / len(df_clean)) * 100
        emoji = '😊' if sentiment == 'Positive' else '😐' if sentiment == 'Neutral' else '😞'
        print(f"   {emoji} {sentiment}: {count:,} ({percentage:.1f}%)")

# Review count statistics
if 'review_count_cleaned' in df_clean.columns:
    print("\n💬 REVIEW STATISTICS:")
    review_stats = df_clean['review_count_cleaned'].describe()
    print(f"   Total Reviews: {df_clean['review_count_cleaned'].sum():,}")
    print(f"   Avg Reviews per Product: {review_stats['mean']:.2f}")
    print(f"   Max Reviews: {review_stats['max']:.0f}")

# Product statistics
if 'product_name' in column_mapping:
    print("\n📦 PRODUCT STATISTICS:")
    unique_products = df_clean[column_mapping['product_name']].nunique()
    print(f"   Unique Products: {unique_products:,}")
    print(f"   Total Records: {len(df_clean):,}")
    if unique_products:
        print(f"   Avg Records per Product: {len(df_clean) / unique_products:.2f}")

## 📊 Step 6: Visualizations

In [None]:
# Ensure df_clean is a pandas DataFrame (convert from Spark if needed)
if hasattr(df_clean, 'toPandas'):
    print("🔄 Converting Spark DataFrame to pandas...")
    df_clean = df_clean.toPandas()
    print("✅ Conversion complete")

# Create subplots
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('Price Distribution', 'Rating Distribution', 
                    'Sentiment Analysis', 'Top 10 Products by Reviews'),
    specs=[[{'type': 'histogram'}, {'type': 'histogram'}],
           [{'type': 'pie'}, {'type': 'bar'}]]
)

# 1. Price Distribution
if 'price_cleaned' in df_clean.columns:
    fig.add_trace(
        go.Histogram(x=df_clean['price_cleaned'], name='Price', 
                     marker_color='#3498db', nbinsx=50),
        row=1, col=1
    )

# 2. Rating Distribution
if 'rating_cleaned' in df_clean.columns:
    fig.add_trace(
        go.Histogram(x=df_clean['rating_cleaned'], name='Rating',
                     marker_color='#f39c12', nbinsx=20),
        row=1, col=2
    )

# 3. Sentiment Pie Chart
if 'sentiment' in df_clean.columns:
    sentiment_counts = df_clean['sentiment'].value_counts()
    colors = {'Positive': '#2ecc71', 'Neutral': '#95a5a6', 'Negative': '#e74c3c'}
    pie_colors = [colors.get(s, '#3498db') for s in sentiment_counts.index]
    
    fig.add_trace(
        go.Pie(labels=sentiment_counts.index, values=sentiment_counts.values,
               marker=dict(colors=pie_colors)),
        row=2, col=1
    )

# 4. Top Products by Review Count
if 'product_name' in column_mapping and 'review_count_cleaned' in df_clean.columns:
    top_products = df_clean.groupby(column_mapping['product_name'])['review_count_cleaned'].sum()
    top_products = top_products.nlargest(10).sort_values()
    
    fig.add_trace(
        go.Bar(y=top_products.index, x=top_products.values,
               orientation='h', marker_color='#9b59b6'),
        row=2, col=2
    )

# Update layout
fig.update_layout(
    height=800,
    showlegend=False,
    title_text="📊 Comprehensive Dataset Analysis Dashboard",
    title_font_size=20
)

fig.show()

print("✅ Dashboard generated successfully!")
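The top-products panel sums `review_count_cleaned` per product, keeps the ten largest, and then re-sorts them ascending. Plotly draws the first category of a horizontal bar chart at the bottom of the axis, so the ascending sort puts the largest total at the top. A minimal sketch of that step on toy data; the helper `top_n_by_reviews` and the product names are hypothetical:

```python
import pandas as pd


def top_n_by_reviews(df: pd.DataFrame, name_col: str, n: int = 10) -> pd.Series:
    """Sum review counts per product and return the n largest totals,
    sorted ascending so a horizontal bar chart shows the biggest bar on top."""
    totals = df.groupby(name_col)["review_count_cleaned"].sum()
    return totals.nlargest(n).sort_values()


# Toy data (hypothetical): product A appears twice, so its counts are summed
df_demo = pd.DataFrame({
    "product": ["A", "B", "A", "C", "B", "D"],
    "review_count_cleaned": [10, 5, 20, 50, 1, 0],
})
top3 = top_n_by_reviews(df_demo, "product", n=3)
print(top3)
```

For a large Spark DataFrame, running the same group-and-limit in Spark before calling `toPandas()` would bring only the top rows to the driver instead of the full dataset; that variant is not shown here.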

## 📈 Step 7: Advanced Visualizations

In [None]:
# Ensure df_clean is a pandas DataFrame (convert from Spark if needed).
# Every PySpark DataFrame exposes toPandas(), so a single hasattr() check is enough.
try:
    if hasattr(df_clean, 'toPandas'):
        print("🔄 Converting Spark DataFrame to pandas for visualizations...")
        df_clean = df_clean.toPandas()
        print("✅ Conversion complete\n")
except Exception as e:
    print(f"⚠️  Error during conversion: {e}")

# Verify it's pandas now
print(f"📊 DataFrame type: {type(df_clean)}")
print(f"   Shape: {df_clean.shape if hasattr(df_clean, 'shape') else 'N/A'}")

# Price vs Rating Scatter Plot
if 'price_cleaned' in df_clean.columns and 'rating_cleaned' in df_clean.columns:
    # Keep only rows that have both axis values, so the sample contains plottable points
    df_plot = df_clean.dropna(subset=['price_cleaned', 'rating_cleaned'])
    # Sample up to 1,000 points for faster rendering (fixed seed for reproducibility)
    sample_size = min(1000, len(df_plot))
    df_sample = df_plot.sample(n=sample_size, random_state=42)
    
    fig = px.scatter(
        df_sample,
        x='price_cleaned',
        y='rating_cleaned',
        color='sentiment' if 'sentiment' in df_sample.columns else None,
        size='review_count_cleaned' if 'review_count_cleaned' in df_sample.columns else None,
        title='💰 Price vs Rating Analysis',
        labels={'price_cleaned': 'Price (₹)', 'rating_cleaned': 'Rating'},
        color_discrete_map={'Positive': '#2ecc71', 'Neutral': '#95a5a6', 'Negative': '#e74c3c'},
        height=500
    )
    fig.update_layout(xaxis_title='Price (₹)', yaxis_title='Rating')
    fig.show()

# Box plot for rating distribution by sentiment
if 'rating_cleaned' in df_clean.columns and 'sentiment' in df_clean.columns:
    fig = px.box(
        df_clean,
        x='sentiment',
        y='rating_cleaned',
        color='sentiment',
        title='⭐ Rating Distribution by Sentiment',
        labels={'rating_cleaned': 'Rating', 'sentiment': 'Sentiment'},
        color_discrete_map={'Positive': '#2ecc71', 'Neutral': '#95a5a6', 'Negative': '#e74c3c'},
        height=500
    )
    fig.show()

# Price categories
if 'price_cleaned' in df_clean.columns:
    def categorize_price(price):
        if pd.isna(price):
            return 'Unknown'
        if price < 1000:
            return 'Budget (< ₹1K)'
        elif price < 5000:
            return 'Economy (₹1K-5K)'
        elif price < 15000:
            return 'Mid-Range (₹5K-15K)'
        elif price < 50000:
            return 'Premium (₹15K-50K)'
        else:
            return 'Luxury (> ₹50K)'
    
    df_clean['price_category'] = df_clean['price_cleaned'].apply(categorize_price)
    
    price_cat_counts = df_clean['price_category'].value_counts()
    order = ['Budget (< ₹1K)', 'Economy (₹1K-5K)', 'Mid-Range (₹5K-15K)', 
             'Premium (₹15K-50K)', 'Luxury (> ₹50K)']
    price_cat_counts = price_cat_counts.reindex([o for o in order if o in price_cat_counts.index])
    
    fig = px.bar(
        x=price_cat_counts.index,
        y=price_cat_counts.values,
        title='💵 Product Distribution by Price Category',
        labels={'x': 'Price Category', 'y': 'Number of Products'},
        color=price_cat_counts.values,
        color_continuous_scale='Viridis',
        height=500
    )
    fig.update_layout(xaxis_title='Price Category', yaxis_title='Number of Products')
    fig.show()

print("✅ Advanced visualizations completed!")
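`categorize_price` is applied row by row, and the bar chart then needs a manual `reindex` to get the categories in price order. A minimal sketch of a vectorized alternative using `pd.cut` with the same bin edges and labels; `PRICE_BINS`, `PRICE_LABELS`, and `categorize_prices` are hypothetical names, and `right=False` is chosen so each bin includes its lower edge, matching the `price < 1000` style comparisons:

```python
import numpy as np
import pandas as pd

# Same edges and labels as categorize_price(); right=False means 1000 lands
# in "Economy", exactly like the `price < 1000` check above
PRICE_BINS = [-np.inf, 1000, 5000, 15000, 50000, np.inf]
PRICE_LABELS = ['Budget (< ₹1K)', 'Economy (₹1K-5K)', 'Mid-Range (₹5K-15K)',
                'Premium (₹15K-50K)', 'Luxury (> ₹50K)']


def categorize_prices(prices: pd.Series) -> pd.Series:
    """Bin a whole price column at once into an ordered categorical."""
    cats = pd.cut(prices, bins=PRICE_BINS, labels=PRICE_LABELS, right=False)
    # Missing prices become an explicit 'Unknown' category instead of NaN
    return cats.cat.add_categories(['Unknown']).fillna('Unknown')


# Toy prices (hypothetical), one per bin plus a missing value
prices = pd.Series([499.0, 1000.0, 7999.0, 15000.0, 120000.0, np.nan])
categories = categorize_prices(prices)
print(categories.tolist())
print(categories.value_counts(sort=False))
```

Because the result is an ordered categorical, `value_counts(sort=False)` already returns the categories in bin order (including empty ones), which makes the separate `order` list unnecessary.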

## 🎯 Step 8: Key Insights & Recommendations

In [None]:
# Ensure df_clean is pandas DataFrame
if hasattr(df_clean, 'toPandas'):
    print("🔄 Converting Spark DataFrame to pandas...")
    df_clean = df_clean.toPandas()
    print("✅ Conversion complete\n")

print("="*70)
print("🎯 KEY INSIGHTS & RECOMMENDATIONS")
print("="*70)

insights = []

# Price insights
if 'price_cleaned' in df_clean.columns:
    avg_price = df_clean['price_cleaned'].mean()
    median_price = df_clean['price_cleaned'].median()
    
    if avg_price > median_price * 1.5:
        insights.append(f"📊 The average price (₹{avg_price:,.2f}) is more than 1.5x the median (₹{median_price:,.2f}), indicating that a few high-priced premium products skew the distribution.")
    
    price_range = df_clean['price_cleaned'].max() - df_clean['price_cleaned'].min()
    insights.append(f"💰 Price range spans ₹{price_range:,.2f}, from ₹{df_clean['price_cleaned'].min():,.2f} to ₹{df_clean['price_cleaned'].max():,.2f}.")

# Rating insights
if 'rating_cleaned' in df_clean.columns:
    avg_rating = df_clean['rating_cleaned'].mean()
    high_rated = (df_clean['rating_cleaned'] >= 4.0).sum()
    high_rated_pct = (high_rated / len(df_clean)) * 100
    
    insights.append(f"⭐ Average rating is {avg_rating:.2f}/5.0; {high_rated_pct:.1f}% of products are rated ≥ 4.0.")
    
    if avg_rating < 3.5:
        insights.append("⚠️ Overall ratings are below 3.5, suggesting room for quality improvement.")
    elif avg_rating >= 4.5:
        insights.append("✨ Excellent average rating of 4.5 or higher, indicating high customer satisfaction!")

# Sentiment insights
if 'sentiment' in df_clean.columns:
    sentiment_dist = df_clean['sentiment'].value_counts(normalize=True) * 100
    
    if 'Positive' in sentiment_dist.index:
        pos_pct = sentiment_dist['Positive']
        if pos_pct > 70:
            insights.append(f"😊 Strong positive sentiment ({pos_pct:.1f}%) in customer reviews - excellent product reception!")
        elif pos_pct < 40:
            insights.append(f"😟 Low positive sentiment ({pos_pct:.1f}%) - consider investigating customer concerns.")
    
    if 'Negative' in sentiment_dist.index:
        neg_pct = sentiment_dist['Negative']
        if neg_pct > 20:
            insights.append(f"⚠️ Significant negative sentiment ({neg_pct:.1f}%) detected - review quality issues.")

# Price-Rating correlation
if 'price_cleaned' in df_clean.columns and 'rating_cleaned' in df_clean.columns:
    # Calculate correlation using pandas
    correlation = df_clean[['price_cleaned', 'rating_cleaned']].corr().iloc[0, 1]
    
    # Use Python's built-in abs(): an earlier `from pyspark.sql.functions import *`
    # would shadow abs() with Spark's column function
    import builtins
    if builtins.abs(correlation) > 0.5:
        direction = "positive" if correlation > 0 else "negative"
        insights.append(f"📈 Strong {direction} correlation ({correlation:.2f}) between price and rating - higher-priced products tend to have {'better' if correlation > 0 else 'worse'} ratings.")
    elif builtins.abs(correlation) < 0.2:
        insights.append(f"💡 Weak correlation ({correlation:.2f}) between price and rating - price doesn't strongly predict ratings.")

# Review insights
if 'review_count_cleaned' in df_clean.columns:
    total_reviews = df_clean['review_count_cleaned'].sum()
    insights.append(f"💬 The products carry a combined {total_reviews:,.0f} customer reviews, a substantial pool of feedback.")
    
    no_reviews = (df_clean['review_count_cleaned'] == 0).sum()
    if no_reviews > 0:
        no_reviews_pct = (no_reviews / len(df_clean)) * 100
        insights.append(f"📝 {no_reviews_pct:.1f}% of products have no reviews - opportunity for review collection campaigns.")

# Discount insights
if 'discount_cleaned' in df_clean.columns:
    avg_discount = df_clean['discount_cleaned'].mean()
    max_discount = df_clean['discount_cleaned'].max()
    
    if avg_discount > 20:
        insights.append(f"🏷️ Average discount of {avg_discount:.1f}% indicates an aggressive pricing strategy. Maximum discount: {max_discount:.0f}%.")
    
    if 'rating_cleaned' in df_clean.columns:
        high_discount_products = df_clean[df_clean['discount_cleaned'] > 30]
        if len(high_discount_products) > 0:
            avg_rating_discounted = high_discount_products['rating_cleaned'].mean()
            avg_rating_normal = df_clean[df_clean['discount_cleaned'] <= 30]['rating_cleaned'].mean()
            
            if avg_rating_discounted < avg_rating_normal - 0.5:
                insights.append("⚠️ Heavily discounted products have significantly lower ratings - possible quality concerns.")

# Display insights
print()
for i, insight in enumerate(insights, 1):
    print(f"{i}. {insight}")
    print()

print("="*70)
print("📋 RECOMMENDATIONS:")
print("="*70)

recommendations = []

if 'rating_cleaned' in df_clean.columns and df_clean['rating_cleaned'].mean() < 3.5:
    recommendations.append("🔧 Focus on quality improvement initiatives")
    recommendations.append("📞 Implement customer feedback collection system")

if 'sentiment' in df_clean.columns:
    neg_pct = (df_clean['sentiment'] == 'Negative').sum() / len(df_clean) * 100
    if neg_pct > 20:
        recommendations.append("🛠️ Address common complaints mentioned in negative reviews")
        recommendations.append("📢 Improve customer communication and support")

if 'price_cleaned' in df_clean.columns:
    if df_clean['price_cleaned'].std() / df_clean['price_cleaned'].mean() > 1:
        recommendations.append("💰 Consider segmenting products into clear price tiers")

if 'review_count_cleaned' in df_clean.columns:
    no_reviews_pct = (df_clean['review_count_cleaned'] == 0).sum() / len(df_clean) * 100
    if no_reviews_pct > 30:
        recommendations.append("📝 Launch review incentive programs for products with low engagement")

recommendations.append("📊 Continuously monitor these metrics to track improvement")
recommendations.append("🎯 Use insights for targeted marketing and product development")

for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec}")

print("\n" + "="*70)
print("✅ Analysis complete!")
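The correlation insight applies two cut-offs (|r| > 0.5 strong, |r| < 0.2 weak) and silently skips everything in between, including a NaN coefficient. A minimal sketch of those rules as a standalone function; `describe_correlation` is a hypothetical helper, and the "moderate" and "undefined" labels are additions that the notebook itself does not emit. It uses `math.fabs` so a PySpark star import cannot shadow `abs()`:

```python
import math

import pandas as pd


def describe_correlation(r: float) -> str:
    """Label a price/rating correlation using the notebook's cut-offs.

    |r| > 0.5 -> strong, |r| < 0.2 -> weak; "moderate" covers the band in between.
    """
    # pandas returns NaN when a column is constant or there are too few valid pairs
    if r is None or math.isnan(r):
        return "undefined"
    # math.fabs is unaffected by `from pyspark.sql.functions import *`
    magnitude = math.fabs(r)
    if magnitude > 0.5:
        return "strong positive" if r > 0 else "strong negative"
    if magnitude < 0.2:
        return "weak"
    return "moderate"


# Toy data (hypothetical): ratings rise linearly with price
df_demo = pd.DataFrame({
    "price_cleaned": [100.0, 200.0, 300.0, 400.0],
    "rating_cleaned": [3.0, 3.5, 4.0, 4.5],
})
r = df_demo["price_cleaned"].corr(df_demo["rating_cleaned"])
print(f"r = {r:.2f} -> {describe_correlation(r)}")
```

A "strong negative" label means higher-priced products tend to have lower ratings; keeping the rules in one function makes that interpretation easy to test in isolation.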

## 💾 Step 9: Export Processed Data (Optional)

In [None]:
# Export cleaned dataset
output_filename = 'processed_dataset.csv'
df_clean.to_csv(output_filename, index=False)

print(f"✅ Processed dataset saved as '{output_filename}'")
print(f"📊 Total records: {len(df_clean):,}")
print(f"📋 Total columns: {len(df_clean.columns)}")

# Show the full path where the file was saved
import os
full_path = os.path.abspath(output_filename)
print(f"\n💾 File saved to: {full_path}")
print("\n📂 Relative paths resolve against the kernel's working directory (normally this notebook's folder)")
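Because `to_csv` resolves a bare filename against the kernel's working directory, the output location can change if the notebook is started from elsewhere. A minimal sketch of exporting to an explicit folder and reading the file back as a sanity check; the helper `export_csv`, its `out_dir` parameter, and the `output` subfolder are hypothetical, and a temporary directory stands in for a real project path:

```python
import os
import tempfile

import pandas as pd


def export_csv(df: pd.DataFrame, filename: str, out_dir: str = ".") -> str:
    """Write df to out_dir/filename (creating out_dir if needed); return the absolute path."""
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.abspath(os.path.join(out_dir, filename))
    df.to_csv(path, index=False)
    return path


# Toy data (hypothetical) standing in for df_clean
df_demo = pd.DataFrame({"price_cleaned": [499.0, 1299.0],
                        "sentiment": ["Positive", "Neutral"]})

with tempfile.TemporaryDirectory() as tmp:
    saved = export_csv(df_demo, "processed_dataset.csv", out_dir=os.path.join(tmp, "output"))
    # Read the file back to confirm the export round-trips cleanly
    round_trip = pd.read_csv(saved)
    print(saved)
    print(round_trip.equals(df_demo))
```

Returning the absolute path keeps the "File saved to" message accurate regardless of where Jupyter was launched.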

---

## 📚 Summary

This notebook provides:

✅ **Automated data loading** from CSV upload

✅ **Intelligent column detection** and cleaning

✅ **Comprehensive statistical analysis**

✅ **Interactive visualizations** with Plotly

✅ **AI-powered sentiment analysis** on reviews

✅ **Actionable insights** and recommendations

✅ **Processed data export** for further use

---

### 🚀 Next Steps:

1. Upload your own CSV dataset
2. Run all cells sequentially
3. Review the insights and visualizations
4. Download the processed data
5. Use insights for business decisions

---

**Created for:** Dataset Analysis Prototype

**Version:** 1.0

**Platform:** Google Colab