# Data Preparation for Evaluating the New York Market for Short-Term Rental Investments

## Imports

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType

In [0]:
# PATH: S3 bucket/prefix holding the raw input files (appears to be a
#       placeholder — replace with the real location before running).
# DB:   database that receives the cleaned Delta tables.
PATH, DB = "sample-s3-location", "sample_db"

In [0]:
%sql
-- Create a text widget named "database" whose default is "hive_metastore.sample_db".
-- NOTE(review): the Python cells visible here hard-code DB = 'sample_db' rather than
-- reading this widget; confirm whether the widget is meant to drive the target database.
CREATE WIDGET TEXT database DEFAULT "hive_metastore.sample_db"

# Airbnb Data

In [0]:
# Load the raw Airbnb listings CSV. multiLine + escape='"' allow quoted fields
# to span several lines and to contain doubled quotes.
airbnb_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    # f-string so that PATH is actually interpolated into the S3 URI
    .load(f"s3://{PATH}/tmp/Airbnb_data_New_York.csv")
)

# Check schema
airbnb_df.printSchema()
# View sample data (call .display() on the limited DataFrame)
airbnb_df.limit(5).display()
# Check null values: one row holding the null count of every column
print("columns with null values")
airbnb_df.select([count(when(col(c).isNull(), c)).alias(c) for c in airbnb_df.columns]).display()
# Check duplicates: group on every column and keep groups seen more than once
print("duplicate records in columns" )
duplicates_all_columns = (
    airbnb_df.groupBy(airbnb_df.columns)
    .count()
    .filter("count > 1")
)
duplicates_all_columns.display()

# Fill nulls: reviews_per_month -> 0, last_review -> sentinel date 1900-01-01.
# NOTE(review): presumably these nulls belong to listings with no reviews at all — confirm.
airbnb_df = airbnb_df.fillna({'reviews_per_month': 0, 'last_review': '1900-01-01'})
# Drop identifier and free-text host/listing columns not used downstream
airbnb_df = airbnb_df.drop('id', 'name', 'host_id', 'host_name')
# Heuristic lifetime revenue: number_of_reviews * price * minimum_nights
# (roughly one minimum-length stay per review). All three inputs are int
# columns; widen to long first so the product cannot overflow 32 bits
# (which wraps silently, or errors under ANSI mode).
airbnb_df = airbnb_df.withColumn(
    'lifetime_revenue',
    F.col('number_of_reviews').cast('long') * F.col('price') * F.col('minimum_nights'),
)
# Write cleaned data to a Delta table. overwriteSchema lets the overwrite
# replace an existing table whose lifetime_revenue column was previously int.
(
    airbnb_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{DB}.ny_airbnb")
)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)

columns with null values


id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,16,0,21,0,0,0,0,0,0,0,0,10052,10052,0,0


duplicate records in columns


id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,count


# Property Sales Data

In [0]:
# Load the raw NYC property-sales CSV (same reader options as the Airbnb cell).
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    # f-string so that PATH is actually interpolated into the S3 URI
    .load(f"s3://{PATH}/tmp/Property_sales_data_New_York.csv")
)

# Map the numeric BOROUGH code (1-5) to BOROUGH_NAME; any other value -> "Unknown"
df = df.withColumn(
    "BOROUGH_NAME",
    F.when(F.col("BOROUGH") == 1, "Manhattan")
    .when(F.col("BOROUGH") == 2, "Bronx")
    .when(F.col("BOROUGH") == 3, "Brooklyn")
    .when(F.col("BOROUGH") == 4, "Queens")
    .when(F.col("BOROUGH") == 5, "Staten Island")
    .otherwise("Unknown")
)

# Check schema
df.printSchema()
# View sample data
df.limit(5).display()
# Keep only the required columns and rename them to names without spaces
df = df.select(
    F.col('BOROUGH_NAME'),
    F.col('NEIGHBORHOOD'),
    F.col('RESIDENTIAL UNITS').alias('RESIDENTIAL_UNITS'),
    F.col('SALE PRICE').alias('SALE_PRICE'),
    F.col('SALE DATE').alias('SALE_DATE'),
)

# Check null values: one row holding the null count of every column
print("columns with null values")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).display()

# Check duplicates: group on every column and keep groups seen more than once
print("duplicate records" )
duplicates_all_columns = df.groupBy(df.columns).count().filter("count > 1")
duplicates_all_columns.limit(5).display()

before_drop = df.count()
print(f"Number of rows before dropping duplicates: {before_drop}")

# Drop exact duplicate rows (all selected columns equal)
df = df.dropDuplicates()

after_drop = df.count()
print(f"Number of rows after dropping duplicates: {after_drop}")

# Convert SALE_PRICE to double: strip every character that is not a digit or
# a dot (e.g. thousands separators), then cast; strings left empty become null.
# NOTE(review): .cast('date') yields null if SALE_DATE is a string in a
# non-ISO format (e.g. MM/dd/yyyy) — confirm the inferred type.
df = df.withColumn(
    "SALE_PRICE",
    F.regexp_replace(F.col("SALE_PRICE"), "[^0-9.]", "").cast(DoubleType())
).withColumn("SALE_DATE", F.col("SALE_DATE").cast('date'))

# Unparseable / missing prices are recorded as 0.
# NOTE(review): this mixes them with genuine $0 transfers; filter on
# SALE_PRICE > 0 downstream if market prices are needed.
cleaned_df = df.fillna({'SALE_PRICE': 0})

cleaned_df.write.format("delta").mode("overwrite").saveAsTable(f"{DB}.ny_property_sales")



# Weather Data

In [0]:
# The first 9 rows of the weather workbook hold static metadata (location,
# coordinates, temperature unit) rather than tabular data. Row 10 contains the
# column headers (consumed via header=true) and the daily readings follow it,
# so reading starts at cell A10 of the '0 daily' sheet with an explicit schema
# for those columns. The static metadata is re-attached below as constant columns.

schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("temp_max", DoubleType(), True),
    StructField("temp_min", DoubleType(), True),
    StructField("temp_avg", DoubleType(), True)
])

df = (
    spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("dataAddress", "'0 daily'!A10")
    .schema(schema)
    # f-string so that PATH is actually interpolated into the S3 URI
    .load(f"s3://{PATH}/tmp/Weather Data.xlsx")
)

# Static location metadata copied from the sheet header rows.
# NOTE(review): altitude units are not stated here (presumably metres) — confirm.
full_df = (
    df.withColumn("location", F.lit('New York'))
    .withColumn("latitude", F.lit(40.666534))
    .withColumn("longitude", F.lit(-74.0625))
    .withColumn("altitude", F.lit(44.787575))
    .withColumn("temp_unit", F.lit('°C'))
)

full_df.write.format("delta").mode("overwrite").saveAsTable(f"{DB}.daily_temperature")
