<h1>Project - Airbnb Occupancy and Revenue Prediction</h1>

<h2>Project Overview</h2>
<h3><b>Title: Airbnb Occupancy and Revenue Analytics Using Spark - Los Angeles County</b></h3>

<p>This project analyzes Airbnb data alongside external datasets, such as crime statistics and public transportation availability, to uncover key factors influencing occupancy rates and rental prices. By leveraging Spark for efficient large-scale data processing, the goal is to generate actionable insights to help optimize profits for Airbnb hosts in Los Angeles County.</p>

<p>The analysis considers internal factors, such as property features and pricing strategies, alongside external elements like neighborhood safety and accessibility to transportation. The findings are intended to:</p>
<ul>
<li>(a) Refine pricing strategies to improve competitiveness.</li>
<li>(b) Enhance the appeal of listings by identifying targeted improvements.</li>
<li>(c) Guide decision-making for new property installations or renovations.</li>
</ul>

<p>The ultimate objective is to empower hosts to maximize occupancy rates and revenue potential through data-driven recommendations.</p>

# Combined Dataset Metadata

| Field                             | Type       | Calculated | Description                                                                 |
|-----------------------------------|------------|------------|-----------------------------------------------------------------------------|
| **listings_minimum_nights**       | Integer    | No         | The minimum number of nights required to book a listing.                    |
| **listings_maximum_nights**       | Integer    | No         | The maximum number of nights allowed for booking a listing.                 |
| **listings_minimum_minimum_nights** | Integer | Yes        | Smallest minimum nights value from the calendar (next 365 days).            |
| **listings_maximum_minimum_nights** | Integer | Yes        | Largest minimum nights value from the calendar (next 365 days).             |
| **listings_minimum_maximum_nights** | Integer | Yes        | Smallest maximum nights value from the calendar (next 365 days).            |
| **listings_maximum_maximum_nights** | Integer | Yes        | Largest maximum nights value from the calendar (next 365 days).             |
| **listings_minimum_nights_avg_ntm** | Numeric | Yes        | Average minimum nights from the calendar (next 365 days).                   |
| **listings_maximum_nights_avg_ntm** | Numeric | Yes        | Average maximum nights from the calendar (next 365 days).                   |
| **listings_calendar_updated**     | Date       | No         | Last updated date of the listing's calendar.                                |
| **listings_has_availability**     | Boolean    | No         | Indicates whether the listing has future availability.                      |
| **listings_availability_30**      | Integer    | Yes        | Number of days available for booking in the next 30 days.                   |
| **listings_availability_60**      | Integer    | Yes        | Number of days available for booking in the next 60 days.                   |
| **listings_availability_90**      | Integer    | Yes        | Number of days available for booking in the next 90 days.                   |
| **listings_availability_365**     | Integer    | Yes        | Number of days available for booking in the next 365 days.                  |
| **listings_calendar_last_scraped** | Date      | Yes        | Last date when the listing data was scraped.                                |
| **listings_number_of_reviews**    | Integer    | No         | Total number of reviews for the listing.                                    |
| **listings_number_of_reviews_ltm** | Integer   | Yes        | Number of reviews in the last 12 months.                                    |
| **listings_number_of_reviews_l30d** | Integer  | Yes        | Number of reviews in the last 30 days.                                      |
| **listings_first_review**         | Date       | Yes        | Date of the first review.                                                   |
| **listings_last_review**          | Date       | Yes        | Date of the most recent review.                                             |
| **listings_review_scores_rating** | Numeric    | No         | Average rating score for the listing based on guest reviews.                |
| **listings_review_scores_accuracy** | Numeric | No         | Rating for the accuracy of the listing description.                         |
| **listings_review_scores_cleanliness** | Numeric | No      | Rating for the cleanliness of the listing.                                  |
| **listings_review_scores_checkin** | Numeric  | No         | Rating for the check-in process.                                            |
| **listings_review_scores_communication** | Numeric | No    | Rating for the communication with the host.                                 |
| **listings_review_scores_location** | Numeric | No        | Rating for the location of the listing.                                     |
| **listings_review_scores_value**  | Numeric    | No         | Rating for the value offered by the listing.                                |
| **listings_license**              | Text       | No         | Licensing or registration number for the listing.                           |
| **listings_instant_bookable**     | Boolean    | No         | Indicates if a listing is bookable instantly without host approval.         |
| **listings_calculated_host_listings_count** | Integer | Yes | Total listings by the host in the current region.                           |
| **listings_calculated_host_listings_count_entire_homes** | Integer | Yes | Entire home/apt listings by the host.                                      |
| **listings_calculated_host_listings_count_private_rooms** | Integer | Yes | Private room listings by the host.                                         |
| **listings_calculated_host_listings_count_shared_rooms** | Integer | Yes | Shared room listings by the host.                                          |
| **listings_reviews_per_month**    | Numeric    | Yes        | Average reviews per month over the lifetime of the listing.                 |
| **listings_index_right**          | Integer    | Yes        | Internal index for geospatial joins.                                        |
| **listings_ZIPCODE**              | Text       | No         | ZIP code for the Airbnb listing.                                            |
| **bus_ZIPCODE**                   | Text       | No         | ZIP code area for bus stop counts.                                          |
| **bus_bus_stops_count**           | Integer    | Yes        | Number of bus stops in the respective ZIP code area.                        |
| **metro_ZIPCODE**                 | Text       | No         | ZIP code area for metro station counts.                                     |
| **metro_train_stations_count**    | Integer    | Yes        | Number of metro stations in the respective ZIP code area.                   |
| **crime_ZIPCODE**                 | Text       | No         | ZIP code area for crime data.                                               |
| **crime_Assault and Battery**     | Integer    | Yes        | Count of assault and battery crimes in the area.                            |
| **crime_Child-Related Crimes**    | Integer    | Yes        | Count of crimes involving children in the area.                             |
| **crime_Cyber Crimes**            | Integer    | Yes        | Count of cybercrimes in the area.                                           |
| **crime_Driving Offenses**        | Integer    | Yes        | Count of driving-related crimes in the area.                                |
| **crime_Drug-Related Crimes**     | Integer    | Yes        | Count of drug-related crimes in the area.                                   |
| **crime_Fraud and Forgery**       | Integer    | Yes        | Count of fraud and forgery incidents in the area.                           |
| **crime_Human Trafficking**       | Integer    | Yes        | Count of human trafficking crimes in the area.                              |
| **crime_Kidnapping and Abduction** | Integer  | Yes        | Count of kidnapping and abduction cases in the area.                        |
| **crime_Miscellaneous Crimes**    | Integer    | Yes        | Count of miscellaneous crimes in the area.                                  |
| **crime_Other Crimes**            | Integer    | Yes        | Count of other unspecified crimes in the area.                              |
| **crime_Robbery and Extortion**   | Integer    | Yes        | Count of robbery and extortion cases in the area.                           |
| **crime_Serious Crime - Homicide** | Integer   | Yes        | Count of homicide cases in the area.                                        |
| **crime_Sexual Crimes**           | Integer    | Yes        | Count of sexual crimes in the area.                                         |
| **crime_Theft and Burglary**      | Integer    | Yes        | Count of theft and burglary cases in the area.                              |
| **crime_Traffic Offenses**        | Integer    | Yes        | Count of traffic offenses in the area.                                      |
| **crime_Vandalism**               | Integer    | Yes        | Count of vandalism incidents in the area.                                   |
| **crime_Weapons Offenses**        | Integer    | Yes        | Count of crimes involving weapons in the area.                              |


<h1>1. Installing all the libraries</h1>

In [None]:
!pip install pyspark==3.3.0

Collecting pyspark==3.3.0
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5 (from pyspark==3.3.0)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764003 sha256=84d3c0f051fb5f0505975b6d9eddf1af73a7c39e4e5ee11eb977f692343b2ddd
  Stored in directory: /root/.cache/pip/wheels/81/9c/6c/d5200fcf351ffa39cbe09911e99703283624cd037df58070d9
Successfully built pyspark
Installing collected packages: py4j, pyspark
 

In [None]:
# Importing libraries for adding zipcodes
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from google.colab import drive
drive.mount('/content/drive')

ValueError: mount failed

<h2>This project uses PySpark, so the Spark session is initialized below.</h2>

In [None]:
# Initializing the pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession\
    .builder\
    .appName('Final Project')\
    .getOrCreate()

In [None]:
# Importing all required libraries
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import *

In [None]:
# Initializing all the paths
data_path = '/content/drive/MyDrive/Big Data/'
result_path = '/content/drive/MyDrive/Big Data/'
file_path = '/content/drive/MyDrive/Big Data/'

<h1>2. Adding zipcodes to all the data files</h1>

<h2>A. Listings</h2>

In [None]:
# Reading all the files
geojson_gdf = gpd.read_file('/content/drive/MyDrive/Big Data/data/LA_County_ZIP_Codes.geojson')
listings_pdf = pd.read_csv('/content/drive/MyDrive/Big Data/data/listings.csv.gz')
bus_pdf = pd.read_csv('/content/drive/MyDrive/Big Data/data/Bus_Stop_Benches.csv')
metro_pdf = pd.read_csv('/content/drive/MyDrive/Big Data/data/Metro_Stations.csv')
crime_pdf = pd.read_csv('/content/drive/MyDrive/Big Data/data/CrimeData.csv')

In [None]:
# Convert lat/long in CSV to a geometry column
listings_pdf['geometry'] = listings_pdf.apply(lambda row: Point(
    row['longitude'], row['latitude']), axis=1)
listings_gdf = gpd.GeoDataFrame(
    listings_pdf, geometry='geometry', crs=geojson_gdf.crs)

# Perform a spatial join to join the CSV points with GeoJSON geometries
listings_zip_df = gpd.sjoin(
    listings_gdf, geojson_gdf, how="inner", predicate='intersects')
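
To make the ZIP code assignment above more concrete, here is a minimal plain-Python sketch of what a point-in-polygon join does conceptually. It is not how geopandas implements `sjoin` (which relies on spatial indexes and exact geometry predicates); the rectangular areas, the ZIP labels, and the helper names `point_in_polygon` and `assign_zip` are all hypothetical and only serve as an illustration.

```python
def point_in_polygon(lon, lat, polygon):
    """Ray-casting test: True if (lon, lat) lies inside the polygon.

    `polygon` is a list of (lon, lat) vertices; the last vertex
    connects back to the first one.
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does a horizontal ray starting at the point cross this edge?
        if (y1 > lat) != (y2 > lat):
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


# Hypothetical rectangular areas; real ZIP boundaries are irregular polygons
zip_polygons = {
    "90001": [(-118.30, 33.90), (-118.20, 33.90), (-118.20, 34.00), (-118.30, 34.00)],
    "90002": [(-118.20, 33.90), (-118.10, 33.90), (-118.10, 34.00), (-118.20, 34.00)],
}


def assign_zip(lon, lat):
    """Return the label of the first area containing the point, else None."""
    for zipcode, polygon in zip_polygons.items():
        if point_in_polygon(lon, lat, polygon):
            return zipcode
    return None  # with how="inner", such points would be dropped


print(assign_zip(-118.25, 33.95))
print(assign_zip(-118.15, 33.95))
print(assign_zip(-117.00, 33.95))
```

Points that fall outside every area return `None` here, which corresponds to rows that an inner join leaves out of the result.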

In [None]:
# Dropping join helper columns and displaying the shape of listings
listings_zip_df = listings_zip_df.drop(columns=['OBJECTID', 'geometry',
    'Shape_Length', 'Shape_Area'])
listings_zip_df.shape

In [None]:
listings_zip_df.head()

In [None]:
listings_zip_df.to_parquet('listings_with_zipcodes.parquet', index=False)

<h2>B. Bus Stops</h2>

In [None]:
# Convert lat/long in CSV to a geometry column
bus_pdf['geometry'] = bus_pdf.apply(lambda row: Point(
    row['LONGITUDE'], row['LATITUDE']), axis=1)
bus_gdf = gpd.GeoDataFrame(bus_pdf, geometry='geometry',
    crs=geojson_gdf.crs)

# Perform a spatial join to join the CSV points with GeoJSON geometries
bus_zip_df = gpd.sjoin(
    bus_gdf, geojson_gdf, how="inner", predicate='intersects')

In [None]:
bus_column_list = ['OBJECTID_left', 'FID2', 'NUMBER', 'LATITUDE',
                   'LONGITUDE', 'SITEATS', 'CITY_SITE', 'ZIPCODE']
bus_zip_df = bus_zip_df[bus_column_list]
bus_zip_df.shape

In [None]:
bus_zip_df.head(5)

In [None]:
bus_zip_df.to_parquet('bus_with_zipcodes.parquet', index=False)

<h2>C. Metro Stations</h2>

In [None]:
# Convert lat/long in CSV to a geometry column
metro_pdf['geometry'] = metro_pdf.apply(lambda row: Point(
    row['longitude'], row['latitude']), axis=1)

metro_gdf = gpd.GeoDataFrame(metro_pdf, geometry='geometry',
    crs=geojson_gdf.crs)

# Perform a spatial join to join the CSV points with GeoJSON geometries
metro_zip_df = gpd.sjoin(metro_gdf, geojson_gdf,
    how="inner", predicate='intersects')

In [None]:
train_column_list = ['OBJECTID_left', 'source', 'cat1', 'cat2', 'Name',
    'description', 'latitude', 'longitude', 'ZIPCODE']

In [None]:
metro_zip_df = metro_zip_df[train_column_list]
metro_zip_df.shape

In [None]:
metro_zip_df.head(5)

In [None]:
metro_zip_df.to_parquet('metro_with_zipcodes.parquet', index=False)

<h2>D. Crime Data</h2>

In [None]:
# Convert lat/long in CSV to a geometry column
crime_pdf['geometry'] = crime_pdf.apply(lambda row: Point(
    row['LON'], row['LAT']), axis=1)
crime_gdf = gpd.GeoDataFrame(crime_pdf,
    geometry='geometry', crs=geojson_gdf.crs)

# Perform a spatial join to join the CSV points with GeoJSON geometries
crime_zip_df = gpd.sjoin(crime_gdf, geojson_gdf,
    how="inner", predicate='intersects')

In [None]:
crime_zip_df = crime_zip_df.drop(columns=[
    'DR_NO', 'Date Rptd', 'Rpt Dist No', 'Part 1-2', 'Mocodes',
    'Vict Sex', 'Vict Descent', 'Premis Cd', 'Premis Desc',
    'Weapon Used Cd', 'Weapon Desc', 'Status', 'Status Desc',
    'Crm Cd 1', 'Crm Cd 2', 'Crm Cd 3', 'Crm Cd 4', 'LOCATION',
    'Cross Street', 'index_right', 'OBJECTID', 'Shape_Length',
    'Shape_Area', 'Vict Age', 'geometry']
)
crime_zip_df.shape

In [None]:
crime_zip_df.head(5)

In [None]:
crime_zip_df.to_parquet('crime_with_zipcodes.parquet', index=False)

<h1>3. Reading all the files into PySpark from external sources</h1>

In [None]:
# Reading all parquet files
listings_df = spark.read.parquet('listings_with_zipcodes.parquet')
bus_df = spark.read.parquet('bus_with_zipcodes.parquet')
metro_df = spark.read.parquet('metro_with_zipcodes.parquet')
crime_df = spark.read.parquet('crime_with_zipcodes.parquet')

In [None]:
# Displaying one row of the listings data
listings_df.show(1)

<h1>4. Data Preparation by Transformation and Merging</h1>

<h2>A. Bus data</h2>

In [None]:
# Bus data transformation
bus_pivot_df = bus_df.groupBy(F.col('ZIPCODE')).count()
bus_pivot_df = bus_pivot_df.withColumnRenamed('count', 'bus_stops_count')
bus_pivot_df.count()

In [None]:
bus_pivot_df.show(5)

<h2>B. Train data</h2>

In [None]:
# Train data transformation
metro_pivot_df = metro_df.groupBy(F.col('ZIPCODE')).count()
metro_pivot_df = metro_pivot_df.withColumnRenamed('count',
    'train_stations_count')
metro_pivot_df.count()

In [None]:
metro_pivot_df.show(5)

<h2>C. Crime data</h2>

In [None]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [None]:
# Converting 'DATE OCC' to timestamp format
crime_filtered = crime_df.withColumn("DATE OCC", F.to_timestamp(
    F.col("DATE OCC"), 'MM/dd/yyyy HH:mm:ss'))

# Filter for crimes that occurred between December 3, 2023, and December 3, 2024
crime_filtered = crime_filtered.filter(
    (F.col("DATE OCC") >= '2023-12-03') &
    (F.col("DATE OCC") <= '2024-12-03'))
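
The date window can be checked on a few values without Spark. The sketch below is plain Python with hypothetical sample strings (not taken from the crime dataset). It uses the `strptime` layout that corresponds to the `'MM/dd/yyyy HH:mm:ss'` pattern. In Python a date-only bound is midnight at the start of that day, and Spark's comparison of a timestamp against a date-only string is assumed here to behave the same way.

```python
from datetime import datetime

# Python equivalent of the Spark pattern 'MM/dd/yyyy HH:mm:ss'
FMT = "%m/%d/%Y %H:%M:%S"

window_start = datetime(2023, 12, 3)  # midnight at the start of 2023-12-03
window_end = datetime(2024, 12, 3)    # midnight at the start of 2024-12-03

# Hypothetical sample values for illustration only
samples = [
    "12/02/2023 23:59:59",
    "12/03/2023 00:00:00",
    "06/15/2024 14:30:00",
    "12/03/2024 00:00:00",
    "12/03/2024 08:00:00",
]


def in_window(text):
    """True if the parsed timestamp lies inside the inclusive window."""
    ts = datetime.strptime(text, FMT)
    return window_start <= ts <= window_end


kept = [s for s in samples if in_window(s)]
print(kept)
```

Under that assumption, a record stamped later on the last day (such as the 08:00 sample) falls outside the window, so an upper bound of the following day with a strict `<` may be preferable if the whole final day should be included.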

In [None]:
# Adding the Date_of_Crime column
crime_renamed = crime_filtered.withColumn("Date_of_Crime",
    F.to_date(F.col("DATE OCC")))

# Showing the updated DataFrame
crime_renamed.show(5)

In [None]:
# Mapping dictionary for crime codes to categories
crime_category_mapping = {
    '626': 'Assault and Battery', '230': 'Assault and Battery',
    '235': 'Assault and Battery', '624': 'Assault and Battery',
    '622': 'Assault and Battery', '623': 'Assault and Battery',
    '627': 'Assault and Battery', '625': 'Assault and Battery',
    '850': 'Assault and Battery', '237': 'Child-Related Crimes',
    '922': 'Child-Related Crimes', '813': 'Child-Related Crimes',
    '814': 'Child-Related Crimes', '870': 'Child-Related Crimes',
    '473': 'Theft and Burglary', '474': 'Theft and Burglary',
    '420': 'Theft and Burglary', '331': 'Theft and Burglary',
    '341': 'Theft and Burglary', '350': 'Theft and Burglary',
    '343': 'Theft and Burglary', '442': 'Theft and Burglary',
    '351': 'Theft and Burglary', '450': 'Theft and Burglary',
    '352': 'Theft and Burglary', '453': 'Theft and Burglary',
    '210': 'Robbery and Extortion', '940': 'Robbery and Extortion',
    '220': 'Robbery and Extortion', '654': 'Fraud and Forgery',
    '653': 'Fraud and Forgery', '649': 'Fraud and Forgery',
    '652': 'Fraud and Forgery', '651': 'Fraud and Forgery',
    '670': 'Fraud and Forgery', '668': 'Fraud and Forgery',
    '660': 'Fraud and Forgery',
    '822': 'Human Trafficking', '921': 'Human Trafficking',
    '821': 'Sexual Crimes', '810': 'Sexual Crimes',
    '820': 'Sexual Crimes', '830': 'Sexual Crimes',
    '756': 'Weapons Offenses', '761': 'Weapons Offenses',
    '740': 'Vandalism', '745': 'Vandalism',
    '865': 'Drug-Related Crimes',
    '661': 'Cyber Crimes', '932': 'Cyber Crimes',
    '438': 'Driving Offenses', '433': 'Driving Offenses',
    '890': 'Traffic Offenses', '884': 'Traffic Offenses',
    '470': 'Traffic Offenses', '471': 'Traffic Offenses',
    '920': 'Kidnapping and Abduction',
    '901': 'Kidnapping and Abduction',
    '110': 'Serious Crime - Homicide',
    '902': 'Miscellaneous Crimes', '900': 'Miscellaneous Crimes',
    '903': 'Miscellaneous Crimes',
    '880': 'Miscellaneous Crimes', '886': 'Miscellaneous Crimes',
    '946': 'Miscellaneous Crimes',
    '951': 'Miscellaneous Crimes', '755': 'Miscellaneous Crimes',
    '926': 'Miscellaneous Crimes',
    '647': 'Miscellaneous Crimes', '487': 'Miscellaneous Crimes',
    '522': 'Miscellaneous Crimes',
    '520': 'Miscellaneous Crimes', '510': 'Miscellaneous Crimes',
    # '668' is mapped once, under 'Fraud and Forgery'; a repeated dict key
    # would silently override the earlier entry
}
# Defining the mapping condition
crime_category_conditions = [F.when(F.col('Crm Cd') == code, category)
    for code, category in crime_category_mapping.items()]

# Create a new column 'Crime Category' based on the mapping
crime_category_df = crime_renamed.withColumn('Crime Category',
    F.coalesce(*crime_category_conditions))

In [None]:
# Replace NULL values in the 'Crime Category' column with 'Other Crimes'
crime_final = crime_category_df.fillna({'Crime Category': 'Other Crimes'})
crime_final.show(5)

In [None]:
# Counting crimes per ZIP code, with one column per crime category
temp_df = crime_final.select("ZIPCODE",
    "Date_of_Crime", "Crime Category")
crime_pivot_df = temp_df.groupBy("ZIPCODE").pivot(
    "Crime Category").agg(F.count(F.lit(1)))
crime_pivot_df = crime_pivot_df.fillna(0)
crime_pivot_df.show(5)
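
The categorize, pivot, and zero-fill steps above can be sketched in plain Python to sanity-check the logic on a handful of rows. The mapping below is a three-entry excerpt of `crime_category_mapping`; the records, the ZIP labels, and the code `'999'` are hypothetical.

```python
from collections import Counter, defaultdict

# Excerpt of the code-to-category mapping used in the notebook
mapping = {
    "626": "Assault and Battery",
    "740": "Vandalism",
    "210": "Robbery and Extortion",
}

# Hypothetical (ZIPCODE, Crm Cd) records; '999' stands for an unmapped code
records = [
    ("90001", "626"),
    ("90001", "740"),
    ("90001", "626"),
    ("90002", "999"),
    ("90002", "210"),
]

# Group by ZIP code and count each category; unmapped codes fall back
# to 'Other Crimes', like the fillna step in the notebook
counts = defaultdict(Counter)
for zipcode, code in records:
    category = mapping.get(str(code), "Other Crimes")
    counts[zipcode][category] += 1

# Pivot: one key per category, with 0 where a ZIP code has no such crime
all_categories = sorted({cat for c in counts.values() for cat in c})
pivot = {
    zipcode: {cat: c.get(cat, 0) for cat in all_categories}
    for zipcode, c in counts.items()
}

for zipcode, row in pivot.items():
    print(zipcode, row)
```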

<h2>D. Merging all data together</h2>

In [None]:
# Adding aliases to listings dataframe column names
list_old_cols = listings_df.columns
list_new_cols = ['listings_' + col for col in list_old_cols]
listings_merge_df = listings_df.toDF(*list_new_cols)
listings_merge_df.show(1)

In [None]:
# Adding aliases to bus dataframe column names
bus_old_cols = bus_pivot_df.columns
bus_new_cols = ['bus_' + col for col in bus_old_cols]
bus_merge_df = bus_pivot_df.toDF(*bus_new_cols)
bus_merge_df.show(1)

In [None]:
# Adding aliases to train dataframe column names
metro_old_cols = metro_pivot_df.columns
metro_new_cols = ['metro_' + col for col in metro_old_cols]
metro_merge_df = metro_pivot_df.toDF(*metro_new_cols)
metro_merge_df.show(1)

In [None]:
# Adding aliases to crime dataframe column names
crime_old_cols = crime_pivot_df.columns
crime_new_cols = ['crime_' + col for col in crime_old_cols]
crime_merge_df = crime_pivot_df.toDF(*crime_new_cols)
crime_merge_df.show(1)

In [None]:
# Merging listings and bus
list_bus_df = listings_merge_df.join(bus_merge_df,
    listings_merge_df.listings_ZIPCODE == bus_merge_df.bus_ZIPCODE,
    how='left')
list_bus_df = list_bus_df.fillna(0, subset=['bus_bus_stops_count'])
list_bus_df.show(5)

In [None]:
# Merging train to list_bus
list_bus_train_df = list_bus_df.join(metro_merge_df,
    listings_merge_df.listings_ZIPCODE == metro_merge_df.metro_ZIPCODE,
    how='left')
list_bus_train_df = list_bus_train_df.fillna(0,
    subset=['metro_train_stations_count'])
list_bus_train_df.show(5)

In [None]:
# Merging crime to list_bus_train
crime_columns = [col for col in crime_merge_df.columns\
                 if col != 'crime_ZIPCODE']
final_df = list_bus_train_df.join(crime_merge_df,
    listings_merge_df.listings_ZIPCODE == crime_merge_df.crime_ZIPCODE,
    how='left')
final_df = final_df.fillna(0,
    subset=crime_columns)
final_df.show(5)
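
The merges above all follow the same pattern: a left join on the ZIP code, followed by filling the missing counts with 0. A minimal plain-Python sketch of that pattern, using hypothetical rows and counts (`listings_id` and the ZIP values are made up for illustration):

```python
# Hypothetical listings rows
listings = [
    {"listings_id": 1, "listings_ZIPCODE": "90001"},
    {"listings_id": 2, "listings_ZIPCODE": "90002"},
    {"listings_id": 3, "listings_ZIPCODE": "90003"},
]

# Hypothetical bus stop counts per ZIP code; "90003" has no entry
bus_counts = {"90001": 12, "90002": 5}

# Left join: every listing is kept, and a ZIP code without a match gets 0
merged = [
    {**row, "bus_bus_stops_count": bus_counts.get(row["listings_ZIPCODE"], 0)}
    for row in listings
]

for row in merged:
    print(row)
```

Keeping every listing (rather than using an inner join) matters here because a ZIP code with no recorded bus stops, stations, or crimes is still a valid observation with a count of zero.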

<h3><b>We have merged our Airbnb Los Angeles listings data with the Crime, Bus and Metro stations data based on the ZIP code.</b></h3>

In [None]:
final_df.columns

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.functions import col, regexp_replace


In [None]:
final_df = final_df.withColumn(
    "listings_price",
    regexp_replace(col("listings_price"), "[$,]", "").cast("float")
)
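
The price cleaning rule can be tried on individual strings with Python's `re` module. This is a sketch with hypothetical inputs; `parse_price` is an illustrative helper, and returning `None` for unparseable text mirrors the NULL that a failed Spark cast is assumed to produce under default (non-ANSI) settings.

```python
import re


def parse_price(text):
    """Strip '$' and ',' and convert to float; None if not parseable."""
    if text is None:
        return None
    cleaned = re.sub(r"[$,]", "", text)
    try:
        return float(cleaned)
    except ValueError:
        return None


for raw in ["$1,250.00", "$85.00", "", None]:
    print(repr(raw), "->", parse_price(raw))
```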


In [None]:
# Count null values in each column
# (F.sum is used so that Python's built-in sum is not shadowed)
null_counts = final_df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in final_df.columns
])

null_counts.show(truncate=False)

In [None]:
# Drop columns where more than 80% of the rows are null
total_rows = final_df.count()
threshold = 0.8 * total_rows
null_row = null_counts.first()
columns_to_drop = [
    name for name in null_counts.columns if null_row[name] > threshold
]
final_df = final_df.drop(*columns_to_drop)
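
The 80% rule can be illustrated on a tiny in-memory table. The rows and column names below are hypothetical; the point is that the threshold is derived from the actual row count and that the comparison is strict, so a column with exactly 80% nulls is kept.

```python
# Hypothetical rows; None plays the role of NULL
rows = [
    {"price": 100.0, "license": None, "host_about": None},
    {"price": None, "license": None, "host_about": None},
    {"price": 80.0, "license": None, "host_about": None},
    {"price": 120.0, "license": None, "host_about": None},
    {"price": 95.0, "license": None, "host_about": "Local host"},
]

total_rows = len(rows)
threshold = 0.8 * total_rows  # 80% of the rows

# Number of nulls per column
null_counts = {
    column: sum(row[column] is None for row in rows)
    for column in rows[0]
}

# Strictly more than 80% nulls -> drop
columns_to_drop = [c for c, n in null_counts.items() if n > threshold]

print(null_counts)
print(columns_to_drop)
```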

In [None]:
final_df.printSchema()

In [None]:
# List of columns to drop
columns_to_drop = [
    'listings_listing_url', 'listings_scrape_id', 'listings_last_scraped',
    'listings_source', 'listings_picture_url', 'listings_host_url',
    'listings_host_name', 'listings_host_thumbnail_url', 'listings_latitude',
    'listings_longitude', 'listings_neighbourhood_group_cleansed',
    'listings_host_picture_url', 'listings_neighbourhood',
    'calendars_listing_id', 'metro_latitude',
    'metro_longitude', 'bus_stop_NUMBER',
    'listings_Shape_Length', 'listings_Shape_Area', 'metro_ZIPCODE',
    'crime_ZIPCODE', 'bus_ZIPCODE'
]

# Drop columns from the PySpark DataFrame
final_df = final_df.drop(*columns_to_drop)

# Show the first few rows of the updated DataFrame
final_df.show(1)
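
PySpark's `DataFrame.drop` is a no-op for names that are not in the schema, so a misspelled name, or two names fused by a missing comma, passes without any error. A small check of the requested names against the actual columns can catch this. The sketch below is plain Python with a hypothetical column list; in the notebook, `final_df.columns` would take the place of `existing_columns`.

```python
# Hypothetical schema for illustration
existing_columns = ["listings_id", "listings_price", "bus_ZIPCODE", "metro_ZIPCODE"]

# A missing comma makes Python concatenate adjacent string literals,
# so the first two lines below form ONE name
requested = [
    "bus_ZIPCODE"
    "metro_ZIPCODE",
    "listings_latitude",
]

present = [name for name in requested if name in existing_columns]
missing = [name for name in requested if name not in existing_columns]

print("will be dropped:", present)
print("not in schema:  ", missing)
```

Printing the `missing` list before calling `drop` makes it obvious when a name does not match anything.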

In [None]:
final_df.printSchema()

In [None]:
# Dropping irrelevant or redundant columns
columns_to_drop = [
    'listings_neighborhood_overview', 'listings_host_about', 'listings_host_neighbourhood',
    'listings_license', 'listings_host_verifications'
]

final_df = final_df.drop(*columns_to_drop)

# Show one row after dropping
final_df.show(1)

In [None]:
# Dropping irrelevant or redundant columns
columns_to_drop = [
    'listings_name']

final_df = final_df.drop(*columns_to_drop)

# Show one row after dropping
final_df.show(1)

In [None]:


from pyspark.sql.functions import to_date, datediff, current_date

# Convert to datetime and derive 'host_active_years'
final_df = final_df.withColumn('listings_host_since', to_date(final_df['listings_host_since']))
final_df = final_df.withColumn('host_active_years', (datediff(current_date(), final_df['listings_host_since']) / 365).cast('double'))

# Show a few rows with the new feature
final_df.select('listings_host_since', 'host_active_years').show(1)

In [None]:
# Drop the original 'listings_host_since' column
final_df = final_df.drop('listings_host_since')

In [None]:
from pyspark.sql.functions import regexp_replace

# Remove '%' and convert to numeric
final_df = final_df.withColumn('listings_host_response_rate', regexp_replace('listings_host_response_rate', '%', '').cast('double'))
final_df = final_df.withColumn('listings_host_acceptance_rate', regexp_replace('listings_host_acceptance_rate', '%', '').cast('double'))

# Show updated columns
final_df.select('listings_host_response_rate', 'listings_host_acceptance_rate').show(5)

In [None]:
from pyspark.sql.functions import when

# Convert 't'/'f' to 1/0
binary_columns = ['listings_host_is_superhost', 'listings_host_has_profile_pic', 'listings_host_identity_verified', 'listings_instant_bookable']
for col_name in binary_columns:
    final_df = final_df.withColumn(col_name, when(final_df[col_name] == 't', 1).otherwise(0))

# Show binary column transformations
final_df.select(*binary_columns).show(5)

In [None]:
from pyspark.sql.functions import when, regexp_extract

# Extract the numeric part, including decimals such as '1.5';
# text without any digits yields NULL after the cast
final_df = final_df.withColumn(
    'bathrooms_count',
    regexp_extract('listings_bathrooms_text', r'(\d+(?:\.\d+)?)', 1).cast('float')
)

# Adjust the numeric value based on the type of bathroom
final_df = final_df.withColumn(
    'bathrooms_count',
    when(final_df['listings_bathrooms_text'].contains('shared'), final_df['bathrooms_count'] * 0.5)
    .when(final_df['listings_bathrooms_text'].contains('private'), final_df['bathrooms_count'] * 1.0)
    .otherwise(final_df['bathrooms_count'])
)

# Drop the original text column as it's now redundant
final_df = final_df.drop('listings_bathrooms_text')

# Show the result
final_df.select('bathrooms_count').show(5)
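
The bathroom rule is easy to test on individual strings in plain Python. The sketch below assumes that decimals such as "1.5" should be captured and that the "shared" check is case-sensitive, as `contains('shared')` is; the sample strings and the helper name `bathrooms_count` are illustrative.

```python
import re


def bathrooms_count(text):
    """Parse a bathrooms description into a number; shared baths count half."""
    if text is None:
        return None
    match = re.search(r"(\d+(?:\.\d+)?)", text)
    if match is None:
        return None  # no digits in the text
    value = float(match.group(1))
    if "shared" in text:  # case-sensitive substring check
        return value * 0.5
    return value


for sample in ["1 bath", "1.5 baths", "2 shared baths", "Half-bath", None]:
    print(repr(sample), "->", bathrooms_count(sample))
```

Text without digits stays unparsed here; whether such entries should instead map to a fixed value (for example 0.5 for a half bath) is a modeling decision that this sketch leaves open.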

In [None]:
# Drop the 'listings_host_location' column
final_df = final_df.drop('listings_host_location')

In [None]:
from pyspark.sql.functions import when

# Map response times into numerical categories
final_df = final_df.withColumn(
    'response_time_category',
    when(final_df['listings_host_response_time'] == 'within an hour', 1)
    .when(final_df['listings_host_response_time'] == 'within a few hours', 2)
    .when(final_df['listings_host_response_time'] == 'within a day', 3)
    .when(final_df['listings_host_response_time'] == 'a few days or more', 4)
    .otherwise(5)  # For NULL or undefined values
)

# Drop the original column
final_df = final_df.drop('listings_host_response_time')

# Show the transformed column
final_df.select('response_time_category').show(10)

In [None]:
# Count the number of distinct values in 'listings_neighbourhood_cleansed'
distinct_count = final_df.select('listings_neighbourhood_cleansed').distinct().count()

print(f"Number of distinct neighborhoods: {distinct_count}")


In [None]:
# Drop 'listings_neighbourhood_cleansed' since ZIPCODE is available
final_df = final_df.drop('listings_neighbourhood_cleansed')

In [None]:
# Count the number of distinct values in 'listings_property_type'
distinct_count = final_df.select('listings_property_type').distinct().count()

print(f"Number of distinct property types: {distinct_count}")


In [None]:
final_df = final_df.withColumn(
    'property_type_group',
    F.when(final_df['listings_property_type'].rlike('Private room'), 'Private Room')
    .when(final_df['listings_property_type'].rlike('Entire|Townhouse'), 'Entire Unit')
    .when(final_df['listings_property_type'].rlike('Shared room'), 'Shared Accommodation')
    .when(final_df['listings_property_type'].rlike('Yurt|Treehouse|Lighthouse|Cave|Castle|Dome|Tower'), 'Unique Stays')
    .when(final_df['listings_property_type'].rlike('Boutique|Hotel|Hostel|Serviced apartment'), 'Boutique & Hotels')
    .when(final_df['listings_property_type'].rlike('Campsite|Tipi|Shepherd’s hut|Tent|Hut'), 'Nature-Based Stays')
    .when(final_df['listings_property_type'].rlike('Farm stay|Ranch|Barn'), 'Farm & Ranch Stays')
    .when(final_df['listings_property_type'].rlike('Camper|RV|Bus|Train|Boat|Houseboat'), 'Vehicles as Accommodation')
    .when(final_df['listings_property_type'].rlike('Resort|Villa|Bungalow'), 'Luxury & Resort Stays')
    .when(final_df['listings_property_type'].rlike('Casa|Minsu|Earthen home|Cycladic home'), 'Cultural & Local Experiences')
    .otherwise('Other')  # Catch-all for unclassified property types
)
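
Because the `when` chain is evaluated top to bottom, the first matching pattern decides the group. The plain-Python sketch below reproduces that first-match-wins behavior with a subset of the rules; `re.search` is used because `rlike` matches anywhere in the string. The sample property types are hypothetical.

```python
import re

# Subset of the rules above, in the same order
RULES = [
    (r"Private room", "Private Room"),
    (r"Entire|Townhouse", "Entire Unit"),
    (r"Shared room", "Shared Accommodation"),
    (r"Resort|Villa|Bungalow", "Luxury & Resort Stays"),
]


def property_group(property_type):
    """Return the group of the first rule whose pattern matches."""
    for pattern, group in RULES:
        if re.search(pattern, property_type):
            return group
    return "Other"  # catch-all for unclassified property types


for sample in ["Private room in villa", "Entire villa", "Villa", "Tiny home"]:
    print(sample, "->", property_group(sample))
```

The matching is case-sensitive, so "Private room in villa" lands in "Private Room" both because that rule comes first and because the lowercase "villa" does not match the pattern `Villa`.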

In [None]:
# Drop the original 'listings_property_type' column
final_df = final_df.drop('listings_property_type')

In [None]:
# Count the occurrences of each property type group
final_df.groupBy('property_type_group').count().show()

In [None]:
from pyspark.sql.functions import when

# Get unique values from 'property_type_group'
property_types = final_df.select('property_type_group').distinct().rdd.flatMap(lambda x: x).collect()

# Create a binary column for each property type
for property_type in property_types:
    col_name = f'property_type_{property_type.replace(" ", "_")}'  # Replace spaces for valid column names
    final_df = final_df.withColumn(col_name, when(final_df['property_type_group'] == property_type, 1).otherwise(0))

# Drop the original 'property_type_group' column
final_df = final_df.drop('property_type_group')

# Show the new one-hot encoded columns
final_df.select([f'property_type_{property_type.replace(" ", "_")}' for property_type in property_types]).show(5)

In [None]:
# Check distinct values in 'listings_room_type'
final_df.select('listings_room_type').distinct().show()

In [None]:
from pyspark.sql.functions import when

room_types = ['Shared room', 'Hotel room', 'Entire home/apt', 'Private room']
for room_type in room_types:
    col_name = f'room_type_{room_type.replace(" ", "_")}'  # Replace spaces with underscores
    final_df = final_df.withColumn(col_name, when(final_df['listings_room_type'] == room_type, 1).otherwise(0))

# Drop the original 'listings_room_type' column
final_df = final_df.drop('listings_room_type')

# Show the one-hot encoded columns
final_df.select([f'room_type_{room_type.replace(" ", "_")}' for room_type in room_types]).show(5)


In [None]:
# Show a sample of 'listings_amenities'
final_df.select('listings_amenities').show(5, truncate=False)

In [None]:
final_df = final_df.withColumn('listings_amenities', F.lower(col('listings_amenities')))

In [None]:
# Define the 28 amenity categories and their associated keywords
categories = {
    "free_wifi_and_internet": ["wifi", "internet"],
    "outdoor_spaces": ["patio", "balcony", "backyard", "garden", "terrace", "views"],
    "swimming_facilities": ["pool", "hot tub", "jacuzzi"],
    "kitchen_essentials": ["kitchen", "microwave", "stove", "oven", "refrigerator"],
    "laundry_facilities": ["washer", "dryer", "iron", "ironing board"],
    "heating_and_cooling": ["air conditioning", "heating", "fan"],
    "safety_and_security": ["carbon monoxide alarm", "smoke detector", "first aid kit"],
    "entertainment": ["tv", "streaming", "cable"],
    "free_parking": ["free parking"],
    "parking_facilities": ["garage", "paid parking"],
    "pet_friendly": ["pets allowed", "dog-friendly"],
    "accessibility_features": ["elevator", "wheelchair access"],
    "fitness_and_gym_equipment": ["gym", "fitness", "workout equipment"],
    "child_friendly_amenities": ["crib", "high chair", "playpen"],
    "essentials": ["towels", "shampoo", "soap", "toiletries"],
    "workspace_features": ["desk", "workspace", "ergonomic chair"],
    "barbecue_and_grills": ["grill", "barbecue"],
    "luxury_features": ["private pool", "sauna", "jacuzzi"],
    "recreational_features": ["game room", "table tennis", "ping pong"],
    "transport_features": ["airport shuttle", "bike rental"],
    "scenic_views": ["mountain view", "sea view", "city view"],
    "shared_spaces": ["shared spaces", "communal area"],
    "technology_and_smart_home": ["smart lock", "smart thermostat"],
    "dining_area": ["dining table", "breakfast nook"],
    "event_friendly_spaces": ["event space", "meeting room"],
    "green_spaces": ["garden", "outdoor plants"],
    "luxury_entertainment": ["home theater", "cinema", "pool table"],
    "special_accommodations": ["allergy-friendly", "vegan-friendly"],
}

# Add a column per category counting how many of its keywords occur (as substrings) in 'listings_amenities'
for category, keywords in categories.items():
    condition = F.lit(0)  # Initialize condition as a numeric column with 0
    for keyword in keywords:
        condition = condition + F.when(F.lower(F.col("listings_amenities")).contains(keyword.lower()), 1).otherwise(0)
    final_df = final_df.withColumn(category, condition)

# Show the updated DataFrame
final_df.select("listings_amenities", *categories.keys()).show(truncate=False)
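
Because `contains` performs substring matching, a keyword is counted whenever it occurs inside a longer word or phrase: `"iron"` also matches `"ironing board"`, and `"washer"` matches `"dishwasher"`. The sketch below compares that behavior with whole-word matching in plain Python. `count_substring` and `count_whole_word` are hypothetical names, and the amenity string is made up for illustration.

```python
import re

def count_substring(text: str, keywords: list[str]) -> int:
    """Mirror the cell above: one point per keyword found as a substring."""
    text = text.lower()
    return sum(1 for kw in keywords if kw.lower() in text)

def count_whole_word(text: str, keywords: list[str]) -> int:
    """Alternative: one point per keyword found as a whole word or phrase."""
    text = text.lower()
    return sum(
        1 for kw in keywords
        # The keyword must not be glued to a letter or digit on either side
        if re.search(r"(?<![a-z0-9])" + re.escape(kw.lower()) + r"(?![a-z0-9])", text)
    )

laundry = ["washer", "dryer", "iron", "ironing board"]
amenities = '["ironing board", "dishwasher"]'  # made-up example value

print(count_substring(amenities, laundry))   # 3: "washer", "iron", "ironing board"
print(count_whole_word(amenities, laundry))  # 1: only "ironing board"
```

In Spark, a similar whole-word check could probably be expressed with `Column.rlike` and an equivalent pattern; whether the stricter count is preferable depends on how the amenity strings are written.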

In [None]:
from pyspark.sql.functions import size, split

# Count total amenities
final_df = final_df.withColumn('total_amenities', size(split('listings_amenities', ',')))

# Show the new column
final_df.select('listings_amenities', 'total_amenities').show(5, truncate=False)
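
Splitting on commas treats every comma as a separator, so an empty list string still yields one piece and an amenity name containing a comma is counted twice. The sketch below contrasts the two counts in plain Python, assuming `listings_amenities` holds a JSON-style array string; that format is an assumption, and the sample values are made up.

```python
import json

def count_by_split(raw: str) -> int:
    """Mirror size(split(col, ',')): the number of comma-separated pieces."""
    return len(raw.split(","))

def count_by_json(raw: str) -> int:
    """Parse the string as a JSON array and count its elements."""
    return len(json.loads(raw))

samples = ['["wifi", "kitchen"]', '[]', '["tv, cable", "heating"]']
for raw in samples:
    print(raw, "split:", count_by_split(raw), "json:", count_by_json(raw))
```

The two counts agree on the first sample and differ on the other two (1 versus 0, and 3 versus 2).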

In [None]:
# Drop the 'listings_amenities' column
final_df = final_df.drop('listings_amenities')

In [None]:
final_df = final_df.drop('listings_calendar_last_scraped')

In [None]:
from pyspark.sql.functions import when

final_df = final_df.withColumn(
    'has_availability',
    when(final_df['listings_has_availability'] == 't', 1).otherwise(0)
)

# Drop the original column
final_df = final_df.drop('listings_has_availability')

# Show the new column
final_df.select('has_availability').show(10)

In [None]:
final_df = final_df.drop('bus_ZIPCODE')

In [None]:
from pyspark.sql.functions import to_date

# Convert to date format
final_df = final_df.withColumn('listings_first_review', to_date(final_df['listings_first_review']))
final_df = final_df.withColumn('listings_last_review', to_date(final_df['listings_last_review']))

# Show the converted columns
final_df.select('listings_first_review', 'listings_last_review').show(10)

In [None]:
from pyspark.sql.functions import datediff, current_date

# Calculate days since last review
final_df = final_df.withColumn('days_since_last_review', datediff(current_date(), final_df['listings_last_review']))

# Calculate duration between first and last review
final_df = final_df.withColumn('review_activity_duration', datediff(final_df['listings_last_review'], final_df['listings_first_review']))

# Show the new columns
final_df.select('listings_first_review', 'listings_last_review', 'days_since_last_review', 'review_activity_duration').show(10)
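
Since `current_date()` is evaluated at run time, `days_since_last_review` changes every time the notebook is rerun. The sketch below shows the same date arithmetic against a fixed reference date in plain Python. `REFERENCE_DATE` is an assumed value chosen for illustration (a scrape date would be a natural choice), and `days_between` is a hypothetical helper.

```python
from datetime import date
from typing import Optional

REFERENCE_DATE = date(2024, 12, 1)  # assumed snapshot date, for illustration only

def days_between(later: Optional[date], earlier: Optional[date]) -> Optional[int]:
    """Days from `earlier` to `later`, like datediff(later, earlier); None if a date is missing."""
    if later is None or earlier is None:
        return None
    return (later - earlier).days

first_review, last_review = date(2023, 1, 15), date(2024, 11, 1)  # made-up dates
print(days_between(REFERENCE_DATE, last_review))  # 30
print(days_between(last_review, first_review))    # 656
print(days_between(REFERENCE_DATE, None))         # None (listing without reviews)
```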

In [None]:
# Drop the original columns
final_df = final_df.drop('listings_first_review', 'listings_last_review')

In [None]:
final_df.show(1)

In [None]:
from pyspark.sql.functions import col, sum

# Count null values for each column
null_counts = final_df.select([sum(col(c).isNull().cast('int')).alias(c) for c in final_df.columns])

# Show the results
null_counts.show()


In [None]:
# Drop columns with excessive nulls if needed
final_df = final_df.drop('listings_review_scores_rating', 'listings_review_scores_accuracy')  # Example

In [None]:
# Check the distinct values of 'bathrooms_count' before imputing
final_df.select('bathrooms_count').distinct().show(10)

In [None]:
# Impute values for columns based on specified strategies
columns_to_impute_mean = [
    'listings_host_response_rate', 'listings_host_acceptance_rate',
    'listings_review_scores_cleanliness', 'listings_review_scores_checkin',
    'listings_review_scores_communication', 'listings_review_scores_location',
    'listings_review_scores_value'
]

# Impute mean for specified columns
for col_name in columns_to_impute_mean:
    mean_value = final_df.select(col_name).groupBy().avg(col_name).first()[0]
    final_df = final_df.fillna({col_name: mean_value})

# Impute 0 for specified columns
columns_to_impute_zero = [
    'listings_host_listings_count', 'listings_host_total_listings_count',
    'listings_beds', 'listings_reviews_per_month', 'host_active_years',
    'bathrooms_count'
]

final_df = final_df.fillna({col_name: 0 for col_name in columns_to_impute_zero})

# Impute -1 for 'days_since_last_review'
final_df = final_df.fillna({'days_since_last_review': -1})

# Confirm imputation
final_df.select([
    sum(col(c).isNull().cast('int')).alias(c) for c in columns_to_impute_mean + columns_to_impute_zero + ['days_since_last_review']
]).show()
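
The value -1 above acts as a sentinel for listings without reviews. Tree models can split on it, but a linear model treats it as an ordinary number, so an explicit missing-value flag may be worth keeping alongside the filled column. The following is a minimal sketch in plain Python; both helpers are hypothetical and the values are made up.

```python
def mean_ignoring_missing(values):
    """Average of the non-missing entries (Spark's avg() also skips nulls)."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present)

def impute_with_flag(values, fill_value):
    """Return the filled values plus a 0/1 flag marking which entries were missing."""
    flags = [1 if v is None else 0 for v in values]
    filled = [fill_value if v is None else v for v in values]
    return filled, flags

review_scores = [4.0, None, 5.0, 4.5]     # made-up values
days_since_last_review = [12, None, 340]  # made-up values

scores_filled, _ = impute_with_flag(review_scores, mean_ignoring_missing(review_scores))
days_filled, days_missing = impute_with_flag(days_since_last_review, -1)
print(scores_filled)              # [4.0, 4.5, 5.0, 4.5]
print(days_filled, days_missing)  # [12, -1, 340] [0, 1, 0]
```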

In [None]:
from pyspark.sql.functions import col

# Cast the column to integer type
final_df = final_df.withColumn('listings_ZIPCODE', col('listings_ZIPCODE').cast('int'))

# Verify the data type
final_df.printSchema()

# Show the first few rows to confirm the change
final_df.select('listings_ZIPCODE').show(10)


In [None]:
final_df = final_df.fillna({'review_activity_duration': -1})

In [None]:
# Drop rows with null values in any column
final_df_cleaned = final_df.dropna()
final_df_cleaned_listings = final_df.dropna()

In [None]:
# Show the resulting DataFrame
final_df_cleaned.show(5)

# Count the rows before and after dropping nulls
print(f"Rows before dropping nulls: {final_df.count()}")
print(f"Rows after dropping nulls: {final_df_cleaned.count()}")

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Select numerical columns
numerical_cols = [col for col in final_df.columns if final_df.schema[col].dataType.typeName() in ['double', 'long', 'float', 'integer']]

# Function to calculate each feature's Pearson correlation with the target
def calculate_correlations(df, target_variable, numerical_cols):
    # Put the target LAST in the assembled vector so that the last row of the
    # correlation matrix holds the feature-to-target correlations
    feature_cols = [c for c in numerical_cols if c != target_variable]
    assembler = VectorAssembler(inputCols=feature_cols + [target_variable], outputCol="features")
    df_features = assembler.transform(df).select("features")

    # Calculate correlation matrix
    correlation_matrix = Correlation.corr(df_features, "features", "pearson").head()[0].toArray()

    # Extract the correlation of every feature with the target (last row)
    correlations = {feature_cols[i]: correlation_matrix[-1, i] for i in range(len(feature_cols))}

    # Sort by absolute correlation, strongest first
    sorted_correlations = sorted(correlations.items(), key=lambda x: abs(x[1]), reverse=True)
    return sorted_correlations

# Calculate correlations for `listings_availability_30`
target_variable_1 = "listings_availability_30"
correlations_availability_30 = calculate_correlations(final_df, target_variable_1, numerical_cols)

# Calculate correlations for `listings_price`
target_variable_2 = "listings_price"
correlations_price = calculate_correlations(final_df, target_variable_2, numerical_cols)

# Print results (Pearson correlations with the target, strongest first)
print("Correlation with listings_availability_30:")
for feature, importance in correlations_availability_30:
    print(f"{feature}: {importance}")

print("\nCorrelation with listings_price:")
for feature, importance in correlations_price:
    print(f"{feature}: {importance}")
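
A ranking of features by their correlation with a target can be sanity-checked on a toy dataset where the answer is known in advance. The sketch below does this in plain Python. `pearson` and `rank_by_target_correlation` are hypothetical helpers, and the column values are made up.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric lists."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

def rank_by_target_correlation(columns, target):
    """Correlate every non-target column with the target, strongest first."""
    scores = {
        name: pearson(values, columns[target])
        for name, values in columns.items() if name != target
    }
    return sorted(scores.items(), key=lambda kv: abs(kv[1]), reverse=True)

columns = {
    "beds": [1, 2, 3, 4],            # rises with price
    "minimum_nights": [4, 3, 2, 1],  # falls with price
    "noise": [1, 3, 2, 2],           # weakly related
    "price": [100, 200, 300, 400],
}
ranked = rank_by_target_correlation(columns, "price")
for name, r in ranked:
    print(f"{name}: {r:+.3f}")
```

Here `beds` and `minimum_nights` come out at +1 and -1, while `noise` ranks last with a coefficient near 0.316.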


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Function to plot sorted feature importance
def plot_sorted_feature_importance_fixed(correlations, title):
    # Keep only the stronger correlations: above 0.15 or at most -0.07
    positive_correlations = [item for item in correlations if item[1] > 0.15]
    negative_correlations = [item for item in correlations if item[1] <= -0.07]

    # Sort each group
    positive_correlations = sorted(positive_correlations, key=lambda x: x[1], reverse=True)
    negative_correlations = sorted(negative_correlations, key=lambda x: x[1], reverse = True)

    # Combine groups in the desired "S" shape order
    correlations_sorted = positive_correlations + negative_correlations

    # Unzip features and their importance values
    features, importance = zip(*correlations_sorted)

    # Create a horizontal bar chart
    plt.figure(figsize=(30, len(features) * 0.25))
    plt.barh(features, importance, color=np.where(np.array(importance) > 0, 'skyblue', 'salmon'))
    plt.axvline(0, color='black', linewidth=0.7, linestyle='--')  # Add a vertical line at 0
    plt.xlabel("Correlation Coefficients")
    plt.ylabel("Features")
    plt.title(title)
    plt.gca().invert_yaxis()  # Invert y-axis to place positive at the top
    plt.tight_layout()  # Adjust layout to fit all features
    plt.show()



<h2>A. Random Forest</h2>

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

final_df_cleaned_listings = final_df_cleaned_listings.drop('features_price')

# Step 1: Log Transformation for listings_price
final_df_cleaned_listings = final_df_cleaned_listings.withColumn("log_listings_price", F.log(F.col("listings_price")))

feature_cols_price = [
    col for col in final_df_cleaned_listings.columns
    if col not in ["listings_price", "log_listings_price", "listings_id", "listings_host_id", "listings_neighbourhood_group_cleansed", "listings_host_picture_url"]
]

# Step 2: Assemble features for price prediction
assembler_price = VectorAssembler(inputCols=feature_cols_price, outputCol="features_price")
final_df_cleaned_listings = assembler_price.transform(final_df_cleaned_listings)

# Step 3: Split the data into training and test sets
train_df, test_df = final_df_cleaned_listings.randomSplit([0.8, 0.2], seed=42)


# Step 4: Random Forest model for listings_price (trained on the log-transformed price)
rf_price = RandomForestRegressor(featuresCol="features_price", labelCol="log_listings_price", numTrees=100)
rf_price_model = rf_price.fit(train_df)
predictions_price = rf_price_model.transform(test_df)

# Step 5: Evaluate the price model (RMSE and R^2 are on the log scale)
evaluator_rmse_price = RegressionEvaluator(labelCol="log_listings_price", predictionCol="prediction", metricName="rmse")
evaluator_r2_price = RegressionEvaluator(labelCol="log_listings_price", predictionCol="prediction", metricName="r2")
rmse_price = evaluator_rmse_price.evaluate(predictions_price)
r2_price = evaluator_r2_price.evaluate(predictions_price)

print(f"Price Model - RMSE: {rmse_price}")
print(f"Price Model - R^2: {r2_price}")

# Inverse transformation for predictions
predictions_price = predictions_price.withColumn("price_original", F.exp(F.col("prediction")))

# Show original prices and predicted prices
print("Price Predictions:")
predictions_price.select("listings_price", "price_original").show(5)
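
The RMSE and R^2 printed above are computed on `log_listings_price`, so the RMSE is in log units rather than dollars. The sketch below shows one way to read such a value and how a dollar-scale RMSE could be obtained after exponentiating the predictions. It uses plain Python with made-up numbers that are not results from this notebook.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error of two equally long numeric lists."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

prices = [100.0, 250.0, 80.0]      # made-up nightly prices
log_predictions = [4.7, 5.4, 4.5]  # made-up model outputs on the log scale

log_rmse = rmse([math.log(p) for p in prices], log_predictions)
dollar_rmse = rmse(prices, [math.exp(lp) for lp in log_predictions])

print(f"RMSE (log units): {log_rmse:.3f}")
print(f"Typical multiplicative error: x{math.exp(log_rmse):.2f}")
print(f"RMSE (dollars): {dollar_rmse:.2f}")
```

In Spark, a dollar-scale figure could presumably be obtained by pointing a `RegressionEvaluator` at `labelCol="listings_price"` and `predictionCol="price_original"` once that column exists.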

In [None]:
final_df_cleaned = final_df_cleaned.drop('features_availability')

feature_cols_availability = [
    col for col in final_df_cleaned.columns
    if col not in ["listings_price", "log_listings_price", "listings_id", "listings_host_id", "listings_availability_30", "listings_neighbourhood_group_cleansed", "listings_host_picture_url"]
]

# Assemble features for availability prediction
assembler_availability = VectorAssembler(inputCols=feature_cols_availability, outputCol="features_availability")
final_df_cleaned = assembler_availability.transform(final_df_cleaned)

# Step 3: Split the data into training and test sets
train_df, test_df = final_df_cleaned.randomSplit([0.8, 0.2], seed=42)


# Step 5: Model for listings_availability_30 (without transformation)
rf_availability = RandomForestRegressor(featuresCol="features_availability", labelCol="listings_availability_30", numTrees=100)
rf_availability_model = rf_availability.fit(train_df)
predictions_availability = rf_availability_model.transform(test_df)

# Evaluate listings_availability_30 model
evaluator_rmse_avail = RegressionEvaluator(labelCol="listings_availability_30", predictionCol="prediction", metricName="rmse")
evaluator_r2_avail = RegressionEvaluator(labelCol="listings_availability_30", predictionCol="prediction", metricName="r2")
rmse_availability = evaluator_rmse_avail.evaluate(predictions_availability)
r2_availability = evaluator_r2_avail.evaluate(predictions_availability)

print(f"Availability Model - RMSE: {rmse_availability}")
print(f"Availability Model - R^2: {r2_availability}")

# Show availability predictions
print("Availability Predictions:")
predictions_availability.select("prediction", "listings_availability_30").show(5)

<h2>B. Linear Regression</h2>

In [None]:
from pyspark.ml.regression import LinearRegression

# Drop 'features_price' column if it exists
final_df_cleaned_listings = final_df_cleaned_listings.drop('features_price')

# Step 1: Log Transformation for listings_price
final_df_cleaned_listings = final_df_cleaned_listings.withColumn("log_listings_price", F.log(F.col("listings_price")))

# Step 2: Define feature columns, excluding specific ones
feature_cols_price = [
    col for col in final_df_cleaned_listings.columns
    if col not in ["listings_price", "log_listings_price", "listings_id", "listings_host_id", "listings_neighbourhood_group_cleansed", "listings_host_picture_url"]
]

# Assemble features for prediction
assembler_price = VectorAssembler(inputCols=feature_cols_price, outputCol="features_price")
final_df_cleaned_listings = assembler_price.transform(final_df_cleaned_listings)

# Step 3: Split the data into training and test sets
train_df, test_df = final_df_cleaned_listings.randomSplit([0.8, 0.2], seed=42)

# Step 4: Train Linear Regression Model
lr_price = LinearRegression(featuresCol="features_price", labelCol="log_listings_price", maxIter=100, regParam=0.1, elasticNetParam=0.8)
lr_price_model = lr_price.fit(train_df)

# Step 5: Make predictions
predictions_price = lr_price_model.transform(test_df)

# Step 6: Evaluate the model
evaluator_rmse_price = RegressionEvaluator(labelCol="log_listings_price", predictionCol="prediction", metricName="rmse")
evaluator_r2_price = RegressionEvaluator(labelCol="log_listings_price", predictionCol="prediction", metricName="r2")
rmse_price = evaluator_rmse_price.evaluate(predictions_price)
r2_price = evaluator_r2_price.evaluate(predictions_price)

print(f"Linear Regression Model - RMSE: {rmse_price}")
print(f"Linear Regression Model - R^2: {r2_price}")

# Step 7: Inverse transformation for predictions
predictions_price = predictions_price.withColumn("price_original", F.exp(F.col("prediction")))

# Step 8: Display original prices and predicted prices
print("Price Predictions:")
predictions_price.select("listings_price", "price_original").show(5, truncate=False)
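
The settings `regParam=0.1` and `elasticNetParam=0.8` define an elastic-net penalty that is mostly L1. According to the Spark MLlib documentation, the penalty has the form `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`. The sketch below evaluates that expression in plain Python for a made-up coefficient vector; `elastic_net_penalty` is a hypothetical helper.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Elastic-net penalty: a blend of the L1 norm and half the squared L2 norm."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared)

weights = [0.5, -1.0, 0.0]  # made-up coefficients
print(elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.8))  # the notebook's setting
print(elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.0))  # pure L2 (ridge)
print(elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=1.0))  # pure L1 (lasso)
```

With a log-scale target, a penalty this heavy on the L1 side may shrink many coefficients to exactly zero; inspecting `lr_price_model.coefficients` would show whether that happened.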

In [None]:
# Drop 'features_availability' column if it exists
final_df_cleaned = final_df_cleaned.drop('features_availability')

# Step 1: Define feature columns
feature_cols_availability = [
    col for col in final_df_cleaned.columns
    if col not in ["listings_price", "log_listings_price", "listings_id", "listings_host_id", "listings_availability_30", "listings_neighbourhood_group_cleansed", "listings_host_picture_url"]
]

# Assemble features for availability prediction
assembler_availability = VectorAssembler(inputCols=feature_cols_availability, outputCol="features_availability")
final_df_cleaned = assembler_availability.transform(final_df_cleaned)

# Step 2: Split the data into training and test sets
train_df, test_df = final_df_cleaned.randomSplit([0.8, 0.2], seed=42)

# Step 3: Train Linear Regression Model
lr_availability = LinearRegression(featuresCol="features_availability", labelCol="listings_availability_30", maxIter=100, regParam=0.1, elasticNetParam=0.8)
lr_availability_model = lr_availability.fit(train_df)

# Step 4: Make predictions
predictions_availability = lr_availability_model.transform(test_df)

# Step 5: Evaluate the model
evaluator_rmse_avail = RegressionEvaluator(labelCol="listings_availability_30", predictionCol="prediction", metricName="rmse")
evaluator_r2_avail = RegressionEvaluator(labelCol="listings_availability_30", predictionCol="prediction", metricName="r2")
rmse_availability = evaluator_rmse_avail.evaluate(predictions_availability)
r2_availability = evaluator_r2_avail.evaluate(predictions_availability)

print(f"Availability Model - RMSE: {rmse_availability}")
print(f"Availability Model - R^2: {r2_availability}")

# Step 6: Show availability predictions
print("Availability Predictions:")
predictions_availability.select("prediction", "listings_availability_30").show(5, truncate=False)


<h2>C. Gradient Boosted Tree</h2>

In [None]:
from pyspark.ml.regression import GBTRegressor

# Drop 'features_price' column if it exists
final_df_cleaned_listings = final_df_cleaned_listings.drop('features_price')

# Step 1: Log Transformation for listings_price
final_df_cleaned_listings = final_df_cleaned_listings.withColumn("log_listings_price", F.log(F.col("listings_price")))

# Step 2: Define feature columns
feature_cols_price = [
    col for col in final_df_cleaned_listings.columns
    if col not in ["listings_price", "log_listings_price", "listings_id", "listings_host_id", "listings_neighbourhood_group_cleansed", "listings_host_picture_url"]
]

# Assemble features for prediction
assembler_price = VectorAssembler(inputCols=feature_cols_price, outputCol="features_price")
final_df_cleaned_listings = assembler_price.transform(final_df_cleaned_listings)

# Step 3: Split the data into training and test sets
train_df, test_df = final_df_cleaned_listings.randomSplit([0.8, 0.2], seed=42)

# Step 4: Train Gradient-Boosted Tree Regressor
gbt_price = GBTRegressor(featuresCol="features_price", labelCol="log_listings_price", maxIter=100)
gbt_price_model = gbt_price.fit(train_df)

# Step 5: Make predictions
predictions_price = gbt_price_model.transform(test_df)

# Step 6: Evaluate the model
evaluator_rmse_price = RegressionEvaluator(labelCol="log_listings_price", predictionCol="prediction", metricName="rmse")
evaluator_r2_price = RegressionEvaluator(labelCol="log_listings_price", predictionCol="prediction", metricName="r2")
rmse_price = evaluator_rmse_price.evaluate(predictions_price)
r2_price = evaluator_r2_price.evaluate(predictions_price)

print(f"GBT Price Model - RMSE: {rmse_price}")
print(f"GBT Price Model - R^2: {r2_price}")

# Step 7: Inverse transformation for predictions
predictions_price = predictions_price.withColumn("price_original", F.exp(F.col("prediction")))

# Step 8: Display original prices and predicted prices
print("Price Predictions:")
predictions_price.select("listings_price", "price_original").show(5, truncate=False)


In [None]:
# Drop 'features_availability' column if it exists
final_df_cleaned = final_df_cleaned.drop('features_availability')

# Step 1: Define feature columns
feature_cols_availability = [
    col for col in final_df_cleaned.columns
    if col not in ["listings_price", "log_listings_price", "listings_id", "listings_host_id", "listings_availability_30", "listings_neighbourhood_group_cleansed", "listings_host_picture_url"]
]

# Assemble features for availability prediction
assembler_availability = VectorAssembler(inputCols=feature_cols_availability, outputCol="features_availability")
final_df_cleaned = assembler_availability.transform(final_df_cleaned)

# Step 2: Split the data into training and test sets
train_df, test_df = final_df_cleaned.randomSplit([0.8, 0.2], seed=42)

# Step 3: Train Gradient-Boosted Tree Regressor
gbt_availability = GBTRegressor(featuresCol="features_availability", labelCol="listings_availability_30", maxIter=100)
gbt_availability_model = gbt_availability.fit(train_df)

# Step 4: Make predictions
predictions_availability = gbt_availability_model.transform(test_df)

# Step 5: Evaluate the model
evaluator_rmse_avail = RegressionEvaluator(labelCol="listings_availability_30", predictionCol="prediction", metricName="rmse")
evaluator_r2_avail = RegressionEvaluator(labelCol="listings_availability_30", predictionCol="prediction", metricName="r2")
rmse_availability = evaluator_rmse_avail.evaluate(predictions_availability)
r2_availability = evaluator_r2_avail.evaluate(predictions_availability)

print(f"GBT Availability Model - RMSE: {rmse_availability}")
print(f"GBT Availability Model - R^2: {r2_availability}")

# Step 6: Display availability predictions
print("Availability Predictions:")
predictions_availability.select("prediction", "listings_availability_30").show(5, truncate=False)


<h2>Revenue Prediction</h2>

In [None]:
!pip install fastapi uvicorn pyngrok pandas openpyxl pyspark


In [None]:
from pyspark.sql import functions as F

# Group by 'listings_ZIPCODE' and average the predicted number of booked days.
# The model predicts listings_availability_30, so booked days = 30 - prediction.
avg_predictions_by_zip = predictions_availability.groupBy('listings_ZIPCODE').agg(
    F.avg(30 - F.col('prediction')).alias('avg_prediction')
)

# Sort by avg_prediction in descending order
avg_predictions_by_zip = avg_predictions_by_zip.orderBy(F.col('avg_prediction').desc())

# Show the results
avg_predictions_by_zip.show(10)
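
An availability value counts the days a listing is still open within its window, so the booked days are the window length minus that value, where the window must match the predicted target. A regression model can return values outside the window, so clipping first keeps the result meaningful. The sketch below uses hypothetical helpers and treats every unavailable day as booked, which is an assumption: hosts can also block dates.

```python
def booked_days(predicted_available: float, window_days: int) -> float:
    """Booked days implied by a predicted availability, clipped to [0, window]."""
    clipped = min(max(predicted_available, 0.0), float(window_days))
    return window_days - clipped

def occupancy_rate(predicted_available: float, window_days: int) -> float:
    """Share of the window that is booked, between 0 and 1."""
    return booked_days(predicted_available, window_days) / window_days

print(booked_days(12.5, 30))    # 17.5
print(booked_days(-3.0, 30))    # 30.0 (prediction below 0 is clipped)
print(booked_days(45.0, 30))    # 0.0 (prediction above the window is clipped)
print(occupancy_rate(7.5, 30))  # 0.75
```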

In [None]:
!pip install dash


In [None]:
import dash
from dash import dcc, html, Input, Output
import pandas as pd

# Prepare the data
columns_to_select = [
    "listings_ZIPCODE",
    "property_type_Entire_Unit",
    "property_type_Cultural_&_Local_Experiences",
    "property_type_Other",
    "property_type_Vehicles_as_Accommodation",
    "property_type_Unique_Stays",
    "property_type_Farm_&_Ranch_Stays",
    "property_type_Shared_Accommodation",
    "property_type_Nature-Based_Stays",
    "property_type_Private_Room",
    "room_type_Shared_room",
    "room_type_Hotel_room",
    "room_type_Entire_home/apt",
    "room_type_Private_room",
    "bathrooms_count",
    "listings_price",
    "listings_availability_90"
]

df = final_df.select(*columns_to_select).toPandas()

# Add a 'Property Type' column
df['Property Type'] = df[[
    "property_type_Entire_Unit",
    "property_type_Cultural_&_Local_Experiences",
    "property_type_Other",
    "property_type_Vehicles_as_Accommodation",
    "property_type_Unique_Stays",
    "property_type_Farm_&_Ranch_Stays",
    "property_type_Shared_Accommodation",
    "property_type_Nature-Based_Stays",
    "property_type_Private_Room"
]].idxmax(axis=1)

# Add a 'Room Type' column
df['Room Type'] = df[[
    "room_type_Shared_room",
    "room_type_Hotel_room",
    "room_type_Entire_home/apt",
    "room_type_Private_room"
]].idxmax(axis=1)

# Remove prefixes for cleaner names
df['Property Type'] = df['Property Type'].str.replace('property_type_', '', regex=False)
df['Room Type'] = df['Room Type'].str.replace('room_type_', '', regex=False)

# Calculate Revenue
df['Revenue'] = (90 - df['listings_availability_90']) * df['listings_price']

# Initialize Dash app
app = dash.Dash(__name__)

# App layout
app.layout = html.Div([
    html.H1("Dynamic Pivot Table by ZIP Code"),
    html.Label("Select a ZIP Code:"),
    dcc.Dropdown(
        id="zip-dropdown",
        options=[{"label": str(zip_code), "value": zip_code} for zip_code in df["listings_ZIPCODE"].unique()],
        value=df["listings_ZIPCODE"].unique()[0]
    ),
    html.Div(id="pivot-table")
])

# Callback to update pivot table based on selected ZIP Code
@app.callback(
    Output("pivot-table", "children"),
    [Input("zip-dropdown", "value")]
)
def update_pivot_table(selected_zip):
    filtered_df = df[df["listings_ZIPCODE"] == selected_zip]
    pivot_table = pd.pivot_table(
        filtered_df,
        values="Revenue",
        index=["Property Type", "Room Type", "bathrooms_count"],
        aggfunc="mean"
    ).reset_index()

    pivot_table.columns = ["Property Type", "Room Type", "Bathroom Count", "Average Revenue Generated"]

    # Limit table to top rows for compactness
    compact_table = pivot_table.head(10)  # Show only the first 10 rows

    return html.Div([
        html.H3(f"Pivot Table for ZIP Code: {selected_zip}"),
        dcc.Graph(
            figure={
                "data": [
                    {
                        "type": "table",
                        "header": {
                            "values": list(compact_table.columns),
                            "fill": {"color": "lightgrey"},
                            "align": "center",
                            "font": {"size": 10}  # Adjust font size
                        },
                        "cells": {
                            "values": [compact_table[col] for col in compact_table.columns],
                            "fill": {"color": "white"},
                            "align": "center",
                            "height": 20  # Adjust cell height
                        }
                    }
                ],
                "layout": {
                    "autosize": False,
                    "width": 700,  # Adjust table width
                    "height": 400  # Adjust table height
                }
            }
        )
    ])

# Run the app (app.run replaces app.run_server in recent Dash releases)
if __name__ == "__main__":
    app.run(debug=True, port=8051)
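
The `idxmax(axis=1)` calls above recover a label from the one-hot columns, but `idxmax` returns the first column when a row contains only zeros, so a listing with no flag set would silently receive the first label. The sketch below shows a stricter decoding in plain Python; `decode_one_hot` is a hypothetical helper and the rows are made up.

```python
def decode_one_hot(row, prefix):
    """Return the label of the single active column with this prefix, else None."""
    active = [name for name, value in row.items() if name.startswith(prefix) and value == 1]
    if len(active) != 1:
        return None  # no flag (or several flags) set: do not guess a label
    return active[0][len(prefix):]

rows = [
    {"room_type_Shared_room": 0, "room_type_Private_room": 1},
    {"room_type_Shared_room": 0, "room_type_Private_room": 0},  # e.g. a missing room type
]
print([decode_one_hot(r, "room_type_") for r in rows])  # ['Private_room', None]
```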


In [None]:
import dash
from dash import dcc, html, Input, Output
from dash.dash_table import DataTable
import pandas as pd

# Prepare the data (the pivot is precomputed once below instead of on every callback)
columns_to_select = [
    "listings_ZIPCODE",
    "property_type_Entire_Unit",
    "property_type_Cultural_&_Local_Experiences",
    "property_type_Other",
    "property_type_Vehicles_as_Accommodation",
    "property_type_Unique_Stays",
    "property_type_Farm_&_Ranch_Stays",
    "property_type_Shared_Accommodation",
    "property_type_Nature-Based_Stays",
    "property_type_Private_Room",
    "room_type_Shared_room",
    "room_type_Hotel_room",
    "room_type_Entire_home/apt",
    "room_type_Private_room",
    "bathrooms_count",
    "listings_price",
    "listings_availability_90"
]

# Convert the selected columns to a pandas DataFrame
df = final_df.select(*columns_to_select).toPandas()

# Add derived columns for Property Type, Room Type, and Revenue
df['Property Type'] = df[[
    "property_type_Entire_Unit",
    "property_type_Cultural_&_Local_Experiences",
    "property_type_Other",
    "property_type_Vehicles_as_Accommodation",
    "property_type_Unique_Stays",
    "property_type_Farm_&_Ranch_Stays",
    "property_type_Shared_Accommodation",
    "property_type_Nature-Based_Stays",
    "property_type_Private_Room"
]].idxmax(axis=1).str.replace('property_type_', '', regex=False)

df['Room Type'] = df[[
    "room_type_Shared_room",
    "room_type_Hotel_room",
    "room_type_Entire_home/apt",
    "room_type_Private_room"
]].idxmax(axis=1).str.replace('room_type_', '', regex=False)

df['Revenue'] = (90 - df['listings_availability_90']) * df['listings_price']

# Precompute pivot data
pivot_data = df.groupby(['listings_ZIPCODE', 'Property Type', 'Room Type', 'bathrooms_count'])['Revenue'].mean().reset_index()
pivot_data.rename(columns={'Revenue': 'Average Revenue Generated'}, inplace=True)

# Initialize Dash app
app = dash.Dash(__name__)

# Dropdown options
zip_options = [{"label": str(zip_code), "value": zip_code} for zip_code in df["listings_ZIPCODE"].unique()]

# App layout
app.layout = html.Div([
    html.H1("Dynamic Pivot Table by ZIP Code"),
    html.Label("Select a ZIP Code:"),
    dcc.Dropdown(
        id="zip-dropdown",
        options=zip_options,
        value=df["listings_ZIPCODE"].unique()[0]
    ),
    html.Div(id="pivot-table")
])

# Callback to update pivot table based on selected ZIP Code
@app.callback(
    Output("pivot-table", "children"),
    [Input("zip-dropdown", "value")]
)
def update_pivot_table(selected_zip):
    # Filter precomputed pivot data
    filtered_data = pivot_data[pivot_data["listings_ZIPCODE"] == selected_zip].head(10)

    # Render pivot table using DataTable
    return html.Div([
        html.H3(f"Pivot Table for ZIP Code: {selected_zip}"),
        DataTable(
            data=filtered_data.to_dict("records"),
            columns=[
                {"name": "Property Type", "id": "Property Type"},
                {"name": "Room Type", "id": "Room Type"},
                {"name": "Bathroom Count", "id": "bathrooms_count"},
                {"name": "Average Revenue Generated", "id": "Average Revenue Generated"}
            ],
            style_table={"height": "400px", "overflowY": "auto"},
            style_cell={"textAlign": "center", "fontSize": 10},
            style_header={"backgroundColor": "lightgrey", "fontWeight": "bold"}
        )
    ])

# Run the app (app.run replaces app.run_server in recent Dash releases)
if __name__ == "__main__":
    app.run(debug=True, port=8051)
