# Task 1: Big Data Analysis using PySpark
This notebook demonstrates processing of large data using PySpark, with transformations and simple insights.

In [1]:
# Step 1: Initialize Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CaliforniaHousing").getOrCreate()

### 📦 Reading Zipped CSV File

- Extracted housing.csv from ZIP using zipfile.
- Read the CSV into PySpark DataFrame with inferSchema=True and header=True.

In [4]:
import zipfile
import os

# Step 1: Extract the zip file
zip_path = "C:\\Users\\sarva\\Downloads\\housing.csv.zip"
extract_dir = "C:\\Users\\sarva\\Downloads\\unzipped_housing"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

# Step 2: Read the extracted CSV using PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HousingData").getOrCreate()

csv_file_path = os.path.join(extract_dir, "housing.csv")  # Adjust if filename differs
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

df.show()
df.printSchema()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

### 🧾 Add Constant Column using lit()

- Added a new column source with the constant value "Internship" to all rows.

In [5]:
from pyspark.sql.functions import lit

# Add a constant column 'source' with value 'Internship'
df = df.withColumn("source", lit("Internship"))

df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|    source|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|Internship|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|Internship|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|Internship|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|    

### 🎲 Add Random Column using rand()

- Added a new column score with random float values between 0 and 1.

In [6]:
from pyspark.sql.functions import rand

# Add a new column 'score' with random float values
df = df.withColumn("score", rand())

df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|    source|              score|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+-------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|Internship| 0.4560686343068585|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|Internship| 0.9860320676056068|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|Internship|0.31282

### 📊 GroupBy on ocean_proximity

- Calculated average median_house_value for each location category.
- Counted how many records are present in each ocean_proximity group.

In [7]:
# Group by 'ocean_proximity' and show average house value in each category
df.groupBy("ocean_proximity").avg("median_house_value").show()

# Count number of records per group
df.groupBy("ocean_proximity").count().show()

+---------------+-----------------------+
|ocean_proximity|avg(median_house_value)|
+---------------+-----------------------+
|         ISLAND|               380440.0|
|     NEAR OCEAN|     249433.97742663656|
|       NEAR BAY|     259212.31179039303|
|      <1H OCEAN|     240084.28546409807|
|         INLAND|     124805.39200122119|
+---------------+-----------------------+

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|         ISLAND|    5|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|      <1H OCEAN| 9136|
|         INLAND| 6551|
+---------------+-----+

