<a href="https://colab.research.google.com/github/itsyashkhurana/Big-Data-Analystics/blob/main/exp_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Clean up any existing Spark installations
# (the glob matches every /content/spark-* entry: unpacked directories and downloaded archives)
!rm -rf /content/spark-*

# Install required packages with Spark 3.5.0:
# a headless Java 8 JDK, then the Spark 3.5.0 / Hadoop 3 binary distribution.
# NOTE(review): wget and tar operate on the current working directory, while
# SPARK_HOME is later set to /content/spark-3.5.0-bin-hadoop3 — this assumes
# the notebook runs from /content.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
# Reinstall PySpark pinned to the same 3.5.0 release as the tarball, plus findspark
!pip uninstall pyspark -y -q  # Remove any existing PySpark
!pip install -q findspark pyspark==3.5.0

# Tell PySpark where the JDK and the unpacked Spark distribution live
import os
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.0-bin-hadoop3",
})

# Let findspark locate the Spark installation before pyspark is imported
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Build (or reuse) the session, using the in-memory catalog implementation
spark = (
    SparkSession.builder
    .appName("SparkHDFSExample")
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

# Build a small employee DataFrame, persist it as parquet, run a few SQL
# queries against a temp view, then tear the Spark session down.
try:
    # Sample rows in schema order: (id, name, department, salary)
    sample_data = [
        (1, "John", "Sales", 50000.0),
        (2, "Alice", "Engineering", 75000.0),
        (3, "Bob", "Marketing", 45000.0),
        (4, "Carol", "Sales", 55000.0)
    ]

    # Explicit schema; every column is declared non-nullable.
    # NOTE(review): FloatType is single precision; DoubleType would be the
    # safer choice if real monetary values are ever loaded here.
    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("department", StringType(), False),
        StructField("salary", FloatType(), False)
    ])

    # Create DataFrame
    df = spark.createDataFrame(sample_data, schema)

    # Save as parquet (simulating HDFS storage)
    df.write.mode("overwrite").parquet("/content/employee_data")

    # Register the DataFrame so the SQL below can refer to it by name
    df.createOrReplaceTempView("employee_table")

    # Perform sample queries
    print("Query 1: Show all records")
    spark.sql("SELECT * FROM employee_table").show()

    print("Query 2: Average salary by department")
    spark.sql("""
        SELECT department, AVG(salary) as avg_salary
        FROM employee_table
        GROUP BY department
    """).show()

    print("Query 3: Employees with above-average salary")
    spark.sql("""
        SELECT name, salary
        FROM employee_table
        WHERE salary > (SELECT AVG(salary) FROM employee_table)
        ORDER BY salary DESC
    """).show()

    # Basic data analysis: each aggregate query yields exactly one row
    total_employees = spark.sql("SELECT COUNT(*) as total FROM employee_table").first()["total"]
    max_salary = spark.sql("SELECT MAX(salary) as max_salary FROM employee_table").first()["max_salary"]

    print(f"Total number of employees: {total_employees}")
    print(f"Maximum salary: {max_salary}")

except Exception as e:
    # Top-level notebook boundary: report the failure, then fall through to cleanup
    print(f"An error occurred: {e}")

finally:
    # Run every cleanup step independently so that one failing step cannot
    # skip the others (in particular, the session must always be stopped).
    # The steps are wrapped in lambdas so that even resolving `spark.catalog`
    # happens inside the guarded call. stop() goes last because the catalog
    # calls need a live session.
    for step_name, cleanup_step in (
        ("drop temp view", lambda: spark.catalog.dropTempView("employee_table")),
        ("clear cache", lambda: spark.catalog.clearCache()),
        ("stop session", lambda: spark.stop()),
    ):
        try:
            cleanup_step()
        except Exception as cleanup_error:
            # Cleanup stays best-effort, but failures are no longer silent
            print(f"Cleanup step '{step_name}' failed: {cleanup_error}")

# Remove temporary files: delete the parquet output directory written by the
# df.write step so repeated runs start from a clean state
!rm -rf /content/employee_data

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 4.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
  Building wheel for pyspark (setup.py) ... done
Query 1: Show all records
+---+-----+-----------+-------+
| id| name| department| salary|
+---+-----+-----------+-------+
|  1| John|      Sales|50000.0|
|  2|Alice|Engineering|75000.0|
|  3|  Bob|  Marketing|45000.0|
|  4|Carol|      Sales|55000.0|
+---+-----+-----------+-------+

Query 2: Average salary by department
+-----------+----------+
| department|avg_salary|
+-----------+----------+
|      Sales|   52500.0|
|Engineering|   75000.0|
|  Marketing|   45000.0|
+-----------+----------+

Query 3: Employees with above-average salary
+-----+-------+
| name| salary|
+-----+-------+
|Alice|75000.0|
+-----+-------+

Total number of employees: 4
Maximum salary: 75000.0
