In [28]:
from pyspark.sql.functions import expr
from pyspark.sql.types import IntegerType
import os
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Root directory of the local Spark distribution.
# NOTE(review): machine-specific absolute path; edit it when running elsewhere.
spark_home="C:/Users/omar/Downloads/spark_unzipped/spark-3.5.1-bin-hadoop3"


def _add_env_path(var_name, entry, prepend=False):
    """Add `entry` to the os.pathsep-separated environment variable `var_name`.

    Does nothing when `entry` is already one of the variable's elements, so
    re-running this cell does not keep growing PATH / PYTHONPATH. When the
    variable is unset or empty it is set to `entry` alone, which avoids
    writing an empty path element.

    Args:
        var_name: Name of the environment variable to extend.
        entry: Path to add.
        prepend: Put `entry` in front of the existing value instead of after it.
    """
    current = os.environ.get(var_name, "")
    if entry in current.split(os.pathsep):
        return
    if not current:
        os.environ[var_name] = entry
    elif prepend:
        os.environ[var_name] = entry + os.pathsep + current
    else:
        os.environ[var_name] = current + os.pathsep + entry


#environment settings
os.environ["SPARK_HOME"] = spark_home

# Add Spark bin and sbin directories to PATH
_add_env_path("PATH", os.path.join(spark_home, "bin"))
_add_env_path("PATH", os.path.join(spark_home, "sbin"))

# Add Spark Python libraries to PYTHONPATH: python/ goes first, python/lib last
_add_env_path("PYTHONPATH", os.path.join(spark_home, "python"), prepend=True)
_add_env_path("PYTHONPATH", os.path.join(spark_home, "python", "lib"))

# NOTE(review): these archives are appended to PATH, exactly as before. PATH is
# the executable search path and pyspark is already imported at the top of this
# cell, so the entries probably have no effect on imports — confirm whether
# sys.path was intended and whether the py4j file name matches this Spark build.
_add_env_path("PATH", os.path.join(spark_home, "python", "lib", "pyspark.zip"))
_add_env_path("PATH", os.path.join(spark_home, "python", "lib", "py4j-0.10.9-src.zip"))

os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

# Create (or reuse) the SparkSession used by the rest of the notebook
spark = SparkSession.builder \
    .appName("PySpark-Script") \
    .getOrCreate()


In [25]:

# Obtain a SparkSession for this cell under the "AirbnbPriceCategories" app name
spark2 = (
    SparkSession.builder
    .appName("AirbnbPriceCategories")
    .getOrCreate()
)

# Location of the Airbnb CSV, relative to the working directory
file_path = 'Dataset/Airbnb_Data1.csv'

# Load the CSV (first line is the header) and, in the same chain, replace the
# price column with its integer form via the SQL int() function
df = (
    spark2.read
    .csv(file_path, header=True)
    .withColumn("price", expr("int(price)"))
)


# Define a function to create price categories
def create_price_category(price):
  """
  Map a price to the index of its 50-unit price bucket.

  Bucket 0 covers prices 0..50 inclusive; bucket i (1 <= i <= 39) covers
  50*i + 1 .. 50*i + 50, so 2000 is the highest price that gets a bucket.
  A price sitting on a shared boundary (50, 100, ...) belongs to the lower
  bucket.

  Args:
    price: Any value int() accepts, or None (a null price reaches a Python
      UDF as None).

  Returns:
    The bucket index in 0..39, or -1 when price is None or outside 0..2000.

  Raises:
    ValueError: If price is a string that int() cannot parse.
  """
  bucket_width = 50
  bucket_count = 40

  # Null prices cannot be bucketed; report them with the -1 sentinel instead
  # of failing the whole job on int(None).
  if price is None:
    return -1

  value = int(price)
  if not 0 <= value <= bucket_width * bucket_count:
    return -1

  # (value - 1) // width puts each upper boundary in the lower bucket;
  # max(..., 0) keeps price 0 in bucket 0.
  return max(value - 1, 0) // bucket_width

# Apply the map step: register the Python function as a Spark UDF that returns
# an integer. It is registered on spark2, the session that produced df, so this
# cell does not depend on any other session variable being alive.
udf_create_price_category = spark2.udf.register("create_price_category", create_price_category, IntegerType())
df = df.withColumn("price_category", udf_create_price_category(df["price"]))

# Reduce step: count the rows in each price category
price_category_counts = df.groupBy("price_category").count()

# Print the results
price_category_counts.show()

# The session is deliberately left running: getOrCreate() hands out the active
# session, and later cells still read data through `spark`. Stopping it here
# would make them fail on a top-to-bottom run.


+--------------+-----+
|price_category|count|
+--------------+-----+
|            34|   16|
|            28|    7|
|            26|   13|
|            27|   23|
|            12|  183|
|            22|   10|
|             1|26061|
|            13|  218|
|             6| 1471|
|            16|   74|
|             3| 9728|
|            20|   19|
|             5| 3008|
|            19|  273|
|            15|  223|
|             9|  812|
|            17|  115|
|            35|   11|
|             4| 4563|
|             8|  598|
+--------------+-----+
only showing top 20 rows

Price category: 34, Count: 16
Price category: 28, Count: 7
Price category: 26, Count: 13
Price category: 27, Count: 23
Price category: 12, Count: 183
Price category: 22, Count: 10
Price category: 1, Count: 26061
Price category: 13, Count: 218
Price category: 6, Count: 1471
Price category: 16, Count: 74
Price category: 3, Count: 9728
Price category: 20, Count: 19
Price category: 5, Count: 3008
Price category: 19, Count:

### Alternative 2: explicit map function with groupBy/count

In [29]:
# Location of the Airbnb CSV, relative to the working directory
file_path = 'Dataset/Airbnb_Data1.csv'

# Read the CSV (first line is the header) and convert the price column to
# integers with the SQL int() function, all in one chained expression
dataset = (
    spark.read
    .csv(file_path, header=True)
    .withColumn("price", expr("int(price)"))
)


# Define the map function
def map_function(price):
    """
    Return the index of the 50-unit price range that contains `price`.

    Range 0 covers 0..50 inclusive; range i (1 <= i <= 39) covers
    50*i + 1 .. 50*i + 50. A price on a shared boundary (50, 100, ...) is
    assigned to the lower range, which is what scanning the ranges in order
    and returning the first match produces.

    Args:
        price: Any value int() accepts, or None (a null price reaches a
            Python UDF as None).

    Returns:
        The range index in 0..39, or -1 when price is None or does not fall
        within 0..2000.

    Raises:
        ValueError: If price is a string that int() cannot parse.
    """
    range_width = 50
    range_count = 40

    # A null price has no range; use the same -1 sentinel as an out-of-range
    # price rather than raising on int(None).
    if price is None:
        return -1

    value = int(price)
    if value < 0 or value > range_width * range_count:
        return -1

    # Closed-form equivalent of the first-match scan: upper boundaries fall
    # into the lower range, and max(..., 0) keeps price 0 in range 0.
    return max(value - 1, 0) // range_width

# Wrap map_function as a Spark UDF whose declared result type is integer
map_udf = udf(map_function, returnType=IntegerType())

# Build a new DataFrame that carries the computed price_category column
price_category_column = map_udf("price")
mapped_dataset = dataset.withColumn("price_category", price_category_column)

# Reduce function
def reduce_function(rows, num_categories=40):
    """
    Tally how many rows fall into each price category.

    Args:
        rows: Iterable of row-like objects that can be indexed with
            'price_category'.
        num_categories: Number of categories to tally; defaults to 40.

    Returns:
        A list of length num_categories whose element i is the number of
        rows with price_category == i. Rows whose category is None or
        outside 0..num_categories-1 (including the -1 sentinel) are ignored.
    """
    counts = [0] * num_categories
    for row in rows:
        category = row['price_category']
        # Skip nulls and anything outside the valid index range. Besides the
        # -1 sentinel this also rejects other negatives, which would
        # otherwise wrap around and be counted at the end of the list.
        if category is None or not 0 <= category < num_categories:
            continue
        counts[category] += 1
    return counts

# Count rows per price_category with Spark's groupBy/count, sort by category,
# and bring the result to the driver as a list of Row objects
reduced_counts = (
    mapped_dataset
    .groupBy("price_category")
    .count()
    .orderBy("price_category")
    .collect()
)

print(reduced_counts)


[Row(price_category=0, count=8429), Row(price_category=1, count=26061), Row(price_category=2, count=15767), Row(price_category=3, count=9728), Row(price_category=4, count=4563), Row(price_category=5, count=3008), Row(price_category=6, count=1471), Row(price_category=7, count=1222), Row(price_category=8, count=598), Row(price_category=9, count=812), Row(price_category=10, count=249), Row(price_category=11, count=425), Row(price_category=12, count=183), Row(price_category=13, count=218), Row(price_category=14, count=146), Row(price_category=15, count=223), Row(price_category=16, count=74), Row(price_category=17, count=115), Row(price_category=18, count=60), Row(price_category=19, count=273), Row(price_category=20, count=19), Row(price_category=21, count=47), Row(price_category=22, count=10), Row(price_category=23, count=90), Row(price_category=24, count=32), Row(price_category=25, count=33), Row(price_category=26, count=13), Row(price_category=27, count=23), Row(price_category=28, count=

In [30]:
# Keep only the 'count' field of each collected row, preserving row order
counts = list(map(lambda category_row: category_row['count'], reduced_counts))

print(counts)



[8429, 26061, 15767, 9728, 4563, 3008, 1471, 1222, 598, 812, 249, 425, 183, 218, 146, 223, 74, 115, 60, 273, 19, 47, 10, 90, 32, 33, 13, 23, 7, 117, 2, 14, 7, 11, 16, 11, 4, 10, 8, 14]
