# INTRODUCTION TO SPARK

**Prepared by:**

Raahul R <br>
22011103043 <br>
BTech CSE (Cyber Security), 3rd year

# New Section

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, avg, col
from pyspark.ml.feature import StringIndexer
import pyspark.sql.functions as F

Create Spark session

In [5]:
# Start a Spark session for this notebook (or reuse one that is already
# running), registered under the application name "RealEstate".
spark = (
    SparkSession.builder
    .appName("RealEstate")
    .getOrCreate()
)


Load the dataset into a DataFrame

In [8]:
# Read the listings CSV: the first line supplies the column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    "/real_estate.csv",
    inferSchema=True,
    header=True,
)

# Quick sanity check: inferred schema plus the first five rows.
df.printSchema()
df.show(5)

root
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Bedroom: integer (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- Landsize: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Type: string (nullable = true)

+----------------+-----+-------+-------+--------+--------+---------+--------------------+----------+----+
|         Address|Rooms|  Price|Bedroom|Bathroom|Landsize|YearBuilt|          Regionname|    Suburb|Type|
+----------------+-----+-------+-------+--------+--------+---------+--------------------+----------+----+
|    85 Turner St|    2|1480000|      2|       1|     202|     NULL|Northern Metropol...|Abbotsford|   h|
| 25 Bloomburg St|    2|1035000|      2|       1|     156|     1900|Northern Metropol...|Abbotsford|   h|
|    5 Charles St|    3|1465000|      3|       2|     134|     1900|N

1. Encode Address Variable

In [9]:
# Encode every distinct Address string as a numeric index stored in the new
# "Address_Index" column.
address_indexer = StringIndexer(inputCol="Address", outputCol="Address_Index")

# StringIndexer refuses to run when its output column already exists, which
# made this cell fail when executed a second time (df had already been
# overwritten with the indexed frame). Dropping a stale "Address_Index" first
# keeps the cell re-runnable; on the first run the drop is a no-op.
df = df.drop("Address_Index")
df = address_indexer.fit(df).transform(df)

2. Map the year built (4 digits) to a single digit

1800 -> 8 <br>
[1801 - 1850] -> 7 <br>
[1851 - 1900] -> 6 <br>
.... <br>
[2001 - 2023] -> 1 <br>
otherwise -> 0

In [10]:
# Make sure YearBuilt is an integer before comparing it against year bounds.
df = df.withColumn("YearBuilt", col("YearBuilt").cast("int"))

# Bucket the construction year into a single-digit category:
#   up to 1800 -> 8, 1801-1850 -> 7, 1851-1900 -> 6, 1901-1920 -> 5,
#   1921-1950 -> 4, 1951-1980 -> 3, 1981-2000 -> 2, 2001-2023 -> 1,
#   anything else (including a missing year) -> 0.
# The task statement maps 1800 itself to 8 and starts category 7 at 1801, so
# the first boundary is inclusive (<= 1800). The task only names 1800 for
# category 8; earlier years are grouped with it here.
year_built = col("YearBuilt")

df = df.withColumn(
    "YearBuilt_Category",
    when(year_built <= 1800, 8)
    .when((year_built >= 1801) & (year_built <= 1850), 7)
    .when((year_built >= 1851) & (year_built <= 1900), 6)
    .when((year_built >= 1901) & (year_built <= 1920), 5)
    .when((year_built >= 1921) & (year_built <= 1950), 4)
    .when((year_built >= 1951) & (year_built <= 1980), 3)
    .when((year_built >= 1981) & (year_built <= 2000), 2)
    .when((year_built >= 2001) & (year_built <= 2023), 1)
    # A NULL year satisfies none of the comparisons above and ends up here.
    .otherwise(0),
)

3. Find the most dominant house type (mode) in the Western Metropolitan region, in Spark DataFrame style and SQL style.

In [11]:
# DataFrame-API version of the mode query: keep only the Western Metropolitan
# listings, count the rows per house type, and read the type from the row
# with the largest count.
region_df = df.filter(col("Regionname") == "Western Metropolitan")
type_counts = region_df.groupBy("Type").count()
top_type_row = type_counts.orderBy(col("count").desc()).first()
dominant_house_type = top_type_row[0]
print(f"Most Dominant House Type: {dominant_house_type}")

Most Dominant House Type: h


In [12]:
# Mean listing price per house type, computed over region_df (the Western
# Metropolitan rows) and reported in a column named "Average_Cost".
mean_price = avg("Price").alias("Average_Cost")
average_cost = region_df.groupBy("Type").agg(mean_price)
average_cost.show()

+----+-----------------+
|Type|     Average_Cost|
+----+-----------------+
|   h|950766.2659388647|
|   u|488414.4248210024|
|   t|720951.0460251046|
+----+-----------------+



4. Write SparkSQL query for the same

In [13]:
# Register the DataFrame as a temporary view named "real_estate" so that
# Spark SQL queries can refer to it by that name.
df.createOrReplaceTempView("real_estate")

In [14]:
# Spark SQL version of the mode query: count listings per house type in the
# Western Metropolitan region and keep only the type with the highest count.
dominant_house_type_query = """
SELECT Type, COUNT(*) AS count
FROM real_estate
WHERE Regionname = 'Western Metropolitan'
GROUP BY Type
ORDER BY count DESC
LIMIT 1
"""

# The query yields a single (Type, count) row; the house type is its first field.
top_type_row = spark.sql(dominant_house_type_query).first()
dominant_house_type = top_type_row[0]
print(f"Most Dominant House Type: {dominant_house_type}")

Most Dominant House Type: h


In [15]:
# Spark SQL version of the per-type average: mean Price for each house type
# among the Western Metropolitan listings, read from the "real_estate" view.
average_cost_query = """
SELECT Type, AVG(Price) AS Average_Cost
FROM real_estate
WHERE Regionname = 'Western Metropolitan'
GROUP BY Type
"""

# Run the query and display the resulting (Type, Average_Cost) table.
average_cost = spark.sql(average_cost_query)
average_cost.show()

+----+-----------------+
|Type|     Average_Cost|
+----+-----------------+
|   h|950766.2659388647|
|   u|488414.4248210024|
|   t|720951.0460251046|
+----+-----------------+



5. Calculate the average cost of houses based on the number of bedrooms available, using the map-reduce paradigm.

In [16]:
# Map-reduce computation of the mean price for each bedroom count.
rdd = df.select("Bedroom", "Price").rdd

# Price is a nullable column. A None price would make the tuple addition in
# the reduce step raise TypeError, so rows without a price are skipped before
# mapping (SQL's AVG likewise ignores NULLs).
priced_rows = rdd.filter(lambda row: row["Price"] is not None)

# Map: emit (bedrooms, (price, 1)) so that sums and counts travel together.
bedroom_price_pairs = priced_rows.map(
    lambda row: (row["Bedroom"], (row["Price"], 1))
)

# Reduce: add up the prices and the counts separately for each bedroom key.
bedroom_totals = bedroom_price_pairs.reduceByKey(
    lambda acc, val: (acc[0] + val[0], acc[1] + val[1])
)

# Final step: total price divided by number of listings gives the mean.
average_price_by_bedroom = bedroom_totals.mapValues(lambda x: x[0] / x[1])
for bedroom, avg_price in average_price_by_bedroom.collect():
    print(f"Bedroom: {bedroom}, Average Price: {avg_price}")

Bedroom: 2, Average Price: 787672.8067968959
Bedroom: 3, Average Price: 1082497.8044436907
Bedroom: 4, Average Price: 1452231.913110342
Bedroom: 1, Average Price: 447282.4182344428
Bedroom: 6, Average Price: 1831992.0634920634
Bedroom: 5, Average Price: 1854027.1726618705
Bedroom: 0, Average Price: 1030218.75
Bedroom: 8, Average Price: 1423200.0
Bedroom: 9, Average Price: 1487000.0
Bedroom: 7, Average Price: 1865700.0
Bedroom: 20, Average Price: 1650000.0
Bedroom: 10, Average Price: 900000.0


In [17]:
# Count the listings whose construction year is strictly earlier than 2000.
# Rows with a missing YearBuilt do not satisfy the comparison and are left out.
built_before_2000 = col("YearBuilt") < 2000
houses_before_2000 = df.filter(built_before_2000).count()
print(f"Number of houses built before 2000: {houses_before_2000}")

Number of houses built before 2000: 6169


In [18]:
# First compute the mean price over the whole dataset (a one-row result),
# then count the listings priced strictly above that mean.
overall_mean_row = df.select(avg("Price")).first()
average_price = overall_mean_row[0]

is_above_average = col("Price") > average_price
houses_above_average = df.filter(is_above_average).count()
print(f"Number of houses with price greater than average: {houses_above_average}")

Number of houses with price greater than average: 5186


In [19]:
# Mean price of the listings located in the Albert Park suburb of the
# Southern Metropolitan region.
in_albert_park = (col("Regionname") == "Southern Metropolitan") & (
    col("Suburb") == "Albert Park"
)
albert_park_df = df.filter(in_albert_park)

# The aggregate produces a single row whose only field is the mean price.
average_price_specific = albert_park_df.select(avg("Price")).first()[0]
print(f"Average price in Southern Metropolitan, Albert Park: {average_price_specific}")

Average price in Southern Metropolitan, Albert Park: 1941355.072463768


In [20]:
# Shut down the Spark session now that the analysis is complete.
spark.stop()