In [1]:
import findspark
from pyspark.sql import SparkSession
import time

In [2]:
# Initialise findspark before any Spark session is created.
findspark.init()
# Bare last expression: the notebook displays the Spark home path findspark resolved.
findspark.find()

'C:\\Spark\\spark-3.3.3-bin-hadoop3'

### 1. Load data from CSV, show the schema and some samples

In [3]:
# Build (or reuse) the SparkSession shared by the rest of the notebook,
# running locally under the application name "pyspark_demo".
# NOTE(review): the pythonUDF Arrow setting below appears to have been
# introduced in a Spark release newer than the 3.3.3 install reported by
# findspark above -- confirm it has any effect on this installation.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("pyspark_demo")
    .config("spark.sql.execution.pythonUDF.arrow.enabled", "true")
    .getOrCreate()
)


In [4]:
# Read the sample CSV file into a DataFrame: the first row supplies the
# column names and Spark infers each column's type from the data.
# Further reader options (e.g. "delimiter") can be passed the same way.
df = spark.read.csv("../data/raw/sampledata.csv", header=True, inferSchema=True)
df.printSchema()
df.show(n=2)

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Method: string (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Postcode: integer (nullable = true)
 |-- Bedroom2: integer (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- Car: integer (nullable = true)
 |-- Landsize: integer (nullable = true)
 |-- BuildingArea: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- CouncilArea: string (nullable = true)
 |-- Lattitude: double (nullable = true)
 |-- Longtitude: double (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Propertycount: integer (nullable = true)

+----------+-------------+-----+----+-------+------+-------+---------+--------+--------+--------+--------+---+--------+------------+---------+---------

### 2. Selecting and filtering

In [5]:
# Preview the first two rows whose Landsize is greater than 200
df.where(df.Landsize > 200).show(n=2)

+----------+------------+-----+----+-------+------+-------+---------+--------+--------+--------+--------+---+--------+------------+---------+------------------+---------+----------+--------------------+-------------+
|    Suburb|     Address|Rooms|Type|  Price|Method|SellerG|     Date|Distance|Postcode|Bedroom2|Bathroom|Car|Landsize|BuildingArea|YearBuilt|       CouncilArea|Lattitude|Longtitude|          Regionname|Propertycount|
+----------+------------+-----+----+-------+------+-------+---------+--------+--------+--------+--------+---+--------+------------+---------+------------------+---------+----------+--------------------+-------------+
|Abbotsford|85 Turner St|    2|   h|1480000|     S| Biggin|3/12/2016|     2.5|    3067|       2|       1|  1|     202|        null|     null|Yarra City Council| -37.7996|  144.9984|Northern Metropol...|         4019|
|Abbotsford|16 Maugie St|    4|   h|   null|    SN| Nelson| 6/8/2016|     2.5|    3067|       3|       2|  2|     400|         220| 

### 3. Grouping and Aggregating

In [6]:
# Time the "average Price per Rooms value" aggregation.
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed intervals; time.time() can jump if the system clock is adjusted.
start = time.perf_counter()
df.groupBy("Rooms").avg("Price").show()
end = time.perf_counter()
print("Time taken in seconds: ", end - start)

+-----+------------------+
|Rooms|        avg(Price)|
+-----+------------------+
|    1| 496595.2380952381|
|    6|2891666.6666666665|
|    3| 1106181.865284974|
|    5|1627444.4444444445|
|    4|1572198.1132075472|
|    7|              null|
|    2| 830115.2542372881|
+-----+------------------+

Time taken in seconds:  0.986379861831665


In [9]:
# Create custom function
def FindUnitPriceK(price, landsize):
    """Return the price, in thousands, per unit of land size.

    Args:
        price: Sale price, or None when the price is missing.
        landsize: Land size, or None when it is missing.

    Returns:
        ``price / 1000 / landsize``, or None when ``price`` is None or
        ``landsize`` is None or zero, so that a missing or zero input gives
        a null result instead of raising inside the UDF.
    """
    # The sample data contains null prices; dividing None raises TypeError
    # and a zero land size raises ZeroDivisionError, so guard both.
    if price is None or not landsize:
        return None
    return price / 1000 / landsize

# Wrap FindUnitPriceK as a Spark UDF whose result column is a FloatType
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
UnitPriceUDF = udf(FindUnitPriceK, returnType=FloatType())

# Add a "UnitPriceK" column computed by the UDF from Price and Landsize.
# NOTE(review): the recorded output of this cell is
# "TypeError: 'JavaPackage' object is not callable"; presumably the imported
# pyspark package and the Spark installation it launches are different
# versions -- confirm that the two match.
df = df.withColumn(
    colName="UnitPriceK",
    col=UnitPriceUDF(df.Price, df.Landsize),
)

TypeError: 'JavaPackage' object is not callable