In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
# Export the version so the `!` shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java 11, download and unpack the chosen Spark build (Hadoop 3 variant),
# and install findspark. The archive is unpacked into the current working directory.
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
# NOTE(review): SPARK_HOME assumes the working directory is /content (where the
# `tar xf` above unpacks the archive) - confirm when running outside Colab.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Initialise findspark so the unpacked Spark installation can be imported as
# `pyspark`. No SparkSession is started here; that happens in the next cell.
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
            Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadconte                                                                               Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:5 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [497 kB]
Get:8 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy In

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Build the SparkSession used by the rest of the notebook
# (getOrCreate reuses a session if one is already running).
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .config("spark.sql.debug.maxToStringFields", 2000)
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

# Shuffle partition count for this session; 4 or 8 are the suggested values.
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [3]:
# Download the Inside Airbnb New York City snapshot (2023-09-05) over HTTP and
# load each gzipped CSV into a Spark DataFrame. Each file is first registered
# with addFile() and then read from the local path returned by SparkFiles.get().
# quote/escape are both '"' and multiLine=True - presumably because free-text
# fields contain embedded quotes and line breaks; confirm against the raw files.
from pyspark import SparkFiles
url_listings = "http://data.insideairbnb.com/united-states/ny/new-york-city/2023-09-05/data/listings.csv.gz"
spark.sparkContext.addFile(url_listings)
listings_df = spark.read.csv(SparkFiles.get("listings.csv.gz"), sep=",", header=True, quote ='"', multiLine=True, escape = '"')

# Load the calendar data (one row per listing_id and date) from the same snapshot.
url_calendar="http://data.insideairbnb.com/united-states/ny/new-york-city/2023-09-05/data/calendar.csv.gz"
spark.sparkContext.addFile(url_calendar)
calendar_df = spark.read.csv(SparkFiles.get("calendar.csv.gz"), sep=",", header=True, quote ='"', multiLine=True, escape = '"')

# Load the guest reviews from the same snapshot.
url_reviews ="http://data.insideairbnb.com/united-states/ny/new-york-city/2023-09-05/data/reviews.csv.gz"
spark.sparkContext.addFile(url_reviews)
reviews_df = spark.read.csv(SparkFiles.get("reviews.csv.gz"), sep=",", header=True, quote ='"', multiLine=True, escape = '"')


In [4]:
# Preview the raw listings table to sanity-check that the CSV parsed correctly.
listings_df.show()

+------------------+--------------------+--------------+------------+-----------+--------------------+--------------------+---------------------+--------------------+---------+--------------------+---------+----------+--------------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+------------------+------------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+--------------

In [5]:
# Preview the raw calendar table (per-listing, per-date availability and price).
calendar_df.show()

+----------+----------+---------+-------+--------------+--------------+--------------+
|listing_id|      date|available|  price|adjusted_price|minimum_nights|maximum_nights|
+----------+----------+---------+-------+--------------+--------------+--------------+
|      2595|2023-09-05|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-06|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-07|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-08|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-09|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-10|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-11|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-12|        t|$240.00|       $240.00|            30|          1125|
|      2595|2023-09-13|        t|$240.00|  

In [6]:
# Preview the raw reviews table (one row per guest review of a listing).
reviews_df.show()

+----------+--------+----------+-----------+-------------+--------------------+
|listing_id|      id|      date|reviewer_id|reviewer_name|            comments|
+----------+--------+----------+-----------+-------------+--------------------+
|      2595|   17857|2009-11-21|      50679|         Jean|Notre séjour de t...|
|      2595|   19176|2009-12-05|      53267|         Cate|   Great experience.|
|      2595|   19760|2009-12-10|      38960|        Anita|I've stayed with ...|
|      2595|   34320|2010-04-09|      71130|      Kai-Uwe|We've been stayin...|
|      2595|   46312|2010-05-25|     117113|       Alicia|We had a wonderfu...|
|      2595| 1238204|2012-05-07|    1783688|       Sergey|Hi to everyone!\r...|
|      2595| 1293632|2012-05-17|    1870771|         Loïc|Jennifer was very...|
|      2595| 2022498|2012-08-18|    2124102|      Melanie|This apartment is...|
|      2595| 4682989|2013-05-20|     496053|         Eric|Jennifer's place ...|
|      2595|13193832|2014-05-21|   13685

In [7]:
import pandas as pd

In [8]:
# Row count of the raw listings table, before any filtering.
listings_df.count()

39453

In [9]:
# Row count of the raw calendar table.
calendar_df.count()

14399996

In [10]:
# Row count of the raw reviews table.
reviews_df.count()

1019573

In [11]:
# Count how many distinct values the 'neighbourhood_cleansed' column takes
# across all listings (unfiltered).
unique_neighbourhoods = listings_df.select('neighbourhood_cleansed').distinct()
unique_neighbourhoods_count = unique_neighbourhoods.count()
print("Count of unique neighbourhoods:", unique_neighbourhoods_count)

Count of unique neighbourhoods: 223


In [12]:
# Boroughs to keep. Despite the variable name, these values are matched against
# the 'neighbourhood_group_cleansed' column, not 'neighbourhood_cleansed'.
accepted_neighborhoods = ["Manhattan", "Queens", "Brooklyn"]

In [13]:
# Keep only the listings whose borough is one of the accepted values.
filtered_listings_df = listings_df.filter(
    listings_df['neighbourhood_group_cleansed'].isin(accepted_neighborhoods)
)


In [14]:
# Preview the borough-filtered listings.
filtered_listings_df.show()

+------------------+--------------------+--------------+------------+-----------+--------------------+--------------------+---------------------+--------------------+---------+--------------------+---------------+----------+--------------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+------------------+------------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+------

In [15]:
# Row count after the borough filter (compare with the unfiltered listings count).
filtered_listings_df.count()

37609

In [16]:
# Columns retained from the listings table, in output order.
listing_columns = [
    'id',
    'listing_url',
    'name',
    'host_id',
    'host_url',
    'host_name',
    'host_since',
    'host_is_superhost',
    'host_listings_count',
    'host_total_listings_count',
    'neighbourhood_cleansed',
    'neighbourhood_group_cleansed',
    'latitude',
    'longitude',
    'room_type',
    'accommodates',
    'bathrooms_text',
    'bedrooms',
    'beds',
    'amenities',
    'price',
    'minimum_nights',
    'maximum_nights',
    'has_availability',
    'number_of_reviews',
    'number_of_reviews_ltm',
    'number_of_reviews_l30d',
    'first_review',
    'last_review',
    'review_scores_rating',
    'review_scores_accuracy',
    'review_scores_cleanliness',
    'review_scores_checkin',
    'review_scores_communication',
    'review_scores_location',
    'review_scores_value',
    'calculated_host_listings_count',
    'reviews_per_month',
]

# Columns retained from the calendar table.
calendars_columns = [
    'listing_id',
    'date',
    'available',
    'price',
    'adjusted_price',
]

# Columns retained from the reviews table.
reviews_columns = [
    'listing_id',
    'id',
    'date',
    'reviewer_id',
    'reviewer_name',
    'comments',
]

In [17]:
# Project each table down to the columns chosen for the analysis.
column_listings_df = filtered_listings_df.select(*listing_columns)
column_calendars_df = calendar_df.select(*calendars_columns)
column_reviews_df = reviews_df.select(*reviews_columns)

In [18]:
# Rename the listings key from 'id' to 'listing_id' so it carries the same name
# as the key column selected from the calendar and reviews tables.
column_listings_df = column_listings_df.withColumnRenamed('id', 'listing_id')

In [19]:
# Preview the projected listings with the renamed 'listing_id' key.
column_listings_df.show()

+------------------+--------------------+--------------------+---------+--------------------+---------------+----------+-----------------+-------------------+-------------------------+----------------------+----------------------------+------------------+------------------+---------------+------------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------------------+-----------------+
|        listing_id|         listing_url|                name|  host_id|            host_url|      host_name|host_since|host_is_superhost|host_listings_count|host_total_listings_count|neighbourhood_cleansed|neighbourhood_group_cleansed|          latitude|         longitude|      room_type|a

In [20]:
from pyspark.sql import SparkSession


In [21]:
# The SparkSession created at the top of the notebook is still active, so it is
# reused here; calling getOrCreate() again with another app name would only hand
# back that same session.

# Drop rows whose 'first_review' is null (presumably listings that have never
# been reviewed - confirm against the data dictionary).
column_listings_df = column_listings_df.dropna(subset=['first_review'])

# Show the DataFrame after dropping null values
column_listings_df.show()

+------------------+--------------------+--------------------+---------+--------------------+---------------+----------+-----------------+-------------------+-------------------------+----------------------+----------------------------+------------------+------------------+---------------+------------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------------------+-----------------+
|        listing_id|         listing_url|                name|  host_id|            host_url|      host_name|host_since|host_is_superhost|host_listings_count|host_total_listings_count|neighbourhood_cleansed|neighbourhood_group_cleansed|          latitude|         longitude|      room_type|a

In [22]:
# Row count after dropping listings with a null 'first_review'.
column_listings_df.count()

27708

In [23]:
# Drop reviews whose 'comments' text is null, then preview the result.
# In the recorded run the reviews count is 1019573 both before and after this
# step, i.e. no rows were actually removed.
column_reviews_df = column_reviews_df.dropna(subset=['comments'])
column_reviews_df.show()

+----------+--------+----------+-----------+-------------+--------------------+
|listing_id|      id|      date|reviewer_id|reviewer_name|            comments|
+----------+--------+----------+-----------+-------------+--------------------+
|      2595|   17857|2009-11-21|      50679|         Jean|Notre séjour de t...|
|      2595|   19176|2009-12-05|      53267|         Cate|   Great experience.|
|      2595|   19760|2009-12-10|      38960|        Anita|I've stayed with ...|
|      2595|   34320|2010-04-09|      71130|      Kai-Uwe|We've been stayin...|
|      2595|   46312|2010-05-25|     117113|       Alicia|We had a wonderfu...|
|      2595| 1238204|2012-05-07|    1783688|       Sergey|Hi to everyone!\r...|
|      2595| 1293632|2012-05-17|    1870771|         Loïc|Jennifer was very...|
|      2595| 2022498|2012-08-18|    2124102|      Melanie|This apartment is...|
|      2595| 4682989|2013-05-20|     496053|         Eric|Jennifer's place ...|
|      2595|13193832|2014-05-21|   13685

In [24]:
# Row count after dropping reviews with null 'comments'.
column_reviews_df.count()

1019573

# Clean lists of amenities

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import json

# The SparkSession created at the top of the notebook is reused; no new session
# is built for the amenity clean-up.

# Start the amenity clean-up from the filtered listings. select('*') returns a
# new DataFrame with the same columns, so column_listings_df keeps its own name.
amenities_replaced_df = column_listings_df.select('*')

# Canonical amenity names. Any amenity entry that contains one of these names
# (case-insensitively) is replaced by the canonical name itself.
amenities_to_replace = [
    "Wifi", "TV", "Oven", "Stove", "Soap", "Shampoo", "Conditioner",
    "Sound system", "Refrigerator", "Backyard", "Patio", "BBQ grill",
    "Free parking", "Paid parking", "Free street parking", "Paid street parking",
]

In [26]:
# Define a UDF to perform the amenities replacement
def replace_amenities(amenities, canonical_names=None):
    """Collapse verbose amenity descriptions onto canonical amenity names.

    Args:
        amenities: JSON-encoded list of amenity strings, or None for a null cell.
        canonical_names: Names to normalise to. When omitted, the notebook-level
            ``amenities_to_replace`` list is used (looked up at call time).

    Returns:
        A JSON-encoded list in which every entry that contains a canonical name
        (case-insensitive substring match) is replaced by that name; other
        entries are kept unchanged. Returns None when ``amenities`` is None.

    If an entry contains several canonical names, the one that appears last in
    ``canonical_names`` wins.
    """
    if amenities is None:
        # Null cells arrive as None; keep them null instead of letting
        # json.loads raise TypeError and fail the whole job.
        return None
    if canonical_names is None:
        canonical_names = amenities_to_replace

    # Lower-case the targets once rather than once per amenity entry.
    targets = [(name.lower(), name) for name in canonical_names]

    amenities_list = json.loads(amenities)
    for i, amenity in enumerate(amenities_list):
        lowered = amenity.lower()
        for needle, name in targets:
            if needle in lowered:
                amenities_list[i] = name
    return json.dumps(amenities_list)

# Wrap replace_amenities as a Spark UDF that returns a string column
replace_amenities_udf = udf(replace_amenities, StringType())

# Overwrite the 'amenities' column with its normalised JSON string
amenities_replaced_df = amenities_replaced_df.withColumn('amenities', replace_amenities_udf('amenities'))

# Preview the modified DataFrame
amenities_replaced_df.show()

+------------------+--------------------+--------------------+---------+--------------------+---------------+----------+-----------------+-------------------+-------------------------+----------------------+----------------------------+------------------+------------------+---------------+------------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------------------+-----------------+
|        listing_id|         listing_url|                name|  host_id|            host_url|      host_name|host_since|host_is_superhost|host_listings_count|host_total_listings_count|neighbourhood_cleansed|neighbourhood_group_cleansed|          latitude|         longitude|      room_type|a

In [27]:
# Define the string to find and the string to replace it with
# NOTE(review): the needle keeps a trailing space, so an entry that ends in
# "AC" (nothing after it) is not matched, while any entry containing "ac "
# anywhere is - confirm this is the intended matching rule.
string_to_find = "AC "
replacement_string = "Air conditioning"

# Define a UDF to perform the string replacement
def replace_string(amenities):
    """Replace every amenity entry that contains ``string_to_find``.

    Args:
        amenities: JSON-encoded list of amenity strings, or None for a null cell.

    Returns:
        A JSON-encoded list in which each entry containing ``string_to_find``
        (case-insensitive substring match) is replaced by ``replacement_string``;
        other entries are kept unchanged. Returns None when ``amenities`` is None.
    """
    if amenities is None:
        # Null cells arrive as None; keep them null instead of letting
        # json.loads raise TypeError and fail the whole job.
        return None

    needle = string_to_find.lower()
    return json.dumps([
        replacement_string if needle in amenity.lower() else amenity
        for amenity in json.loads(amenities)
    ])

# Wrap replace_string as a Spark UDF that returns a string column
replace_string_udf = udf(replace_string, StringType())

# Overwrite the 'amenities' column with the result of the string replacement
amenities_replaced_df = amenities_replaced_df.withColumn('amenities', replace_string_udf('amenities'))

# Preview the modified DataFrame
amenities_replaced_df.show()

+------------------+--------------------+--------------------+---------+--------------------+---------------+----------+-----------------+-------------------+-------------------------+----------------------+----------------------------+------------------+------------------+---------------+------------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------------------+-----------------+
|        listing_id|         listing_url|                name|  host_id|            host_url|      host_name|host_since|host_is_superhost|host_listings_count|host_total_listings_count|neighbourhood_cleansed|neighbourhood_group_cleansed|          latitude|         longitude|      room_type|a

In [28]:
# Start the final clean-up step from the replaced amenities; select('*') gives a
# new DataFrame with the same columns under its own name.
amenities_cleaned_df = amenities_replaced_df.select('*')

# Define a UDF to perform the amenities cleaning
def clean_amenities(amenities):
    """Strip the detail that follows a colon from each amenity entry.

    Args:
        amenities: JSON-encoded list of amenity strings, or None for a null cell.

    Returns:
        A JSON-encoded list in which each entry is cut at its first colon and
        stripped of surrounding whitespace; entries without a colon are only
        stripped. Returns None when ``amenities`` is None.
    """
    if amenities is None:
        # Null cells arrive as None; keep them null instead of letting
        # json.loads raise TypeError and fail the whole job.
        return None

    # partition(':')[0] is everything before the first colon, or the whole
    # string when there is no colon.
    return json.dumps([
        amenity.partition(':')[0].strip() for amenity in json.loads(amenities)
    ])

# Wrap clean_amenities as a Spark UDF that returns a string column
clean_amenities_udf = udf(clean_amenities, StringType())

# Overwrite the 'amenities' column with the colon-trimmed JSON string
amenities_cleaned_df = amenities_cleaned_df.withColumn('amenities', clean_amenities_udf('amenities'))

# Preview the modified DataFrame
amenities_cleaned_df.show()

+------------------+--------------------+--------------------+---------+--------------------+---------------+----------+-----------------+-------------------+-------------------------+----------------------+----------------------------+------------------+------------------+---------------+------------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------------------+-----------------+
|        listing_id|         listing_url|                name|  host_id|            host_url|      host_name|host_since|host_is_superhost|host_listings_count|host_total_listings_count|neighbourhood_cleansed|neighbourhood_group_cleansed|          latitude|         longitude|      room_type|a

In [31]:
# Rebind the working listings name to the amenity-cleaned DataFrame and preview it.
column_listings_df = amenities_cleaned_df
column_listings_df.show()

+------------------+--------------------+--------------------+---------+--------------------+---------------+----------+-----------------+-------------------+-------------------------+----------------------+----------------------------+------------------+------------------+---------------+------------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------------------+-----------------+
|        listing_id|         listing_url|                name|  host_id|            host_url|      host_name|host_since|host_is_superhost|host_listings_count|host_total_listings_count|neighbourhood_cleansed|neighbourhood_group_cleansed|          latitude|         longitude|      room_type|a

In [32]:
# Shut down the Spark session now that the notebook is finished.
# DataFrames defined above cannot be evaluated after this call.
spark.stop()