## Milestone 2 All US Cities Data Combined via Spark

The purpose of this notebook is to combine our 3 regional cleaned datasets (east, central, and west) into a single large dataset by stacking their rows with Spark (`unionByName`).

Issues encountered when combining:
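* Spark has a problem reading in some of our percentage columns: `host_response_rate` and `host_acceptance_rate` contain `%` signs, so they are read as strings and then cleaned (the `%` is stripped and the values are cast to float).
* The central and west datasets contain an extra `bathrooms_text` column, which is dropped so that all three datasets have the same columns before they are unioned by name.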

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, isnan, count, when, isnull, size, split
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, FloatType, DateType
from pyspark.sql.functions import col, regexp_replace

In [2]:
# Start (or reuse an already running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('final_project')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/26 04:05:15 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/03/26 04:05:15 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/03/26 04:05:15 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/03/26 04:05:15 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [14]:
# Explicit schema kept for reference in case inferSchema ever needs to be replaced.
# With header=True and the default enforceSchema=True, Spark applies a user-supplied
# schema by column *position* (header names are ignored), so the fields must follow
# the file's column order exactly -- hence `neighborhood` sits between
# host_identity_verified and latitude, matching the inferred schema printed below.
# host_response_rate and host_acceptance_rate are declared as strings because their
# '%'-suffixed values (and nulls) are not parsed correctly as numbers; they are
# cleaned after loading.
# latitude/longitude are fractional coordinates, so they need DoubleType (LongType
# cannot hold them). Add DoubleType to the pyspark.sql.types import before enabling.
# NOTE(review): the field order after `price` comes from the original draft and has
# not been checked against the file header -- confirm before using this schema.
# east_schema = StructType([
#     StructField("id", LongType(), True),
#     StructField("host_id", LongType(), True),
#     StructField("host_name", StringType(), True),
#     StructField("host_since", DateType(), True),
#     StructField("host_location", StringType(), True),
#     StructField("host_response_time", StringType(), True),
#     StructField("host_response_rate", StringType(), True),
#     StructField("host_acceptance_rate", StringType(), True),
#     StructField("host_is_superhost", StringType(), True),
#     StructField("host_listings_count", IntegerType(), True),
#     StructField("host_total_listings_count", IntegerType(), True),
#     StructField("host_has_profile_pic", StringType(), True),
#     StructField("host_identity_verified", StringType(), True),
#     StructField("neighborhood", StringType(), True),
#     StructField("latitude", DoubleType(), True),
#     StructField("longitude", DoubleType(), True),
#     StructField("room_type", StringType(), True),
#     StructField("accommodates", IntegerType(), True),
#     StructField("num_bath", FloatType(), True),
#     StructField("bedrooms", IntegerType(), True),
#     StructField("beds", IntegerType(), True),
#     StructField("price", FloatType(), True),
#     StructField("number_of_reviews", IntegerType(), True),
#     StructField("review_scores_value", FloatType(), True),
#     StructField("calculated_host_listings_count", IntegerType(), True),
#     StructField("city", StringType(), True),
#     StructField("amenities_count", IntegerType(), True)
# ])


# Provide the file path
east_file = "gs://ds5460-tlee-spring2024/notebooks/jupyter/data/usa/clean/east_coast_cities_cleaned-spark.csv"


# To use the explicit schema above, pass schema=east_schema instead of inferSchema=True.
east_df = spark.read.csv(east_file, header=True, inferSchema=True)

# The two rate columns contain a '%' suffix: strip it and cast the remaining number to float.
for rate_col in ("host_response_rate", "host_acceptance_rate"):
    east_df = east_df.withColumn(rate_col, regexp_replace(rate_col, "%", "").cast(FloatType()))

# NOTE(review): inferSchema reads `id` as double. A double represents integers exactly
# only up to 2**53, so very large listing IDs may lose precision -- consider reading
# `id` as a string or long. TODO confirm against the raw file.
east_df.printSchema()


root
 |-- id: double (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: float (nullable = true)
 |-- host_acceptance_rate: float (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- num_bath: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- beds: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- number

In [17]:
central_file = "gs://ds5460-tlee-spring2024/notebooks/jupyter/data/usa/clean/central-region-cities-cleaned-spark.csv"
central_df = spark.read.csv(central_file, header=True, inferSchema=True)

# Strip the '%' sign from both rate columns and cast what remains to float.
for pct_column in ("host_response_rate", "host_acceptance_rate"):
    central_df = central_df.withColumn(
        pct_column,
        regexp_replace(pct_column, "%", "").cast(FloatType()),
    )

central_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: float (nullable = true)
 |-- host_acceptance_rate: float (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bathrooms_text: string (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- beds: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- 

In [19]:
west_file = "gs://ds5460-tlee-spring2024/notebooks/jupyter/data/usa/clean/listings_detailed_clean_west-spark.csv"
west_df = spark.read.csv(west_file, header=True, inferSchema=True)

# Strip the '%' sign from both rate columns and cast what remains to float.
for pct_column in ("host_response_rate", "host_acceptance_rate"):
    west_df = west_df.withColumn(
        pct_column,
        regexp_replace(pct_column, "%", "").cast(FloatType()),
    )

west_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: float (nullable = true)
 |-- host_acceptance_rate: float (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bathrooms_text: string (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- beds: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- 

In [24]:
# Drop bathrooms_text from central and west so that all three regional frames end up
# with the same 27 columns (unionByName below requires matching column names).
central_df, west_df = [
    regional_df.drop('bathrooms_text') for regional_df in (central_df, west_df)
]

In [None]:
def count_nulls(df):
    """Return a dict mapping every column name of `df` to its number of null values.

    Only isNull() is tested (no isnan check), so the same expression works for
    columns of any data type without a type mismatch.
    """
    null_count_exprs = [
        count(when(col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in df.columns
    ]
    first_row = df.select(null_count_exprs).collect()[0]
    return first_row.asDict()

# Print the per-column null counts for east, central and west, separated by dashes.
for position, regional_df in enumerate((east_df, central_df, west_df)):
    if position > 0:
        print("-----")
    null_counts = count_nulls(regional_df)
    print(null_counts)

In [16]:
def count_missings(spark_df, sort=True):
    """
    Count missing values in each column that is not a string, date or timestamp.

    A value counts as missing when it is null or, for float/double columns, NaN.

    Parameters
    ----------
    spark_df : pyspark.sql.DataFrame
        Frame to inspect.
    sort : bool, default True
        If True, return a single "count" column indexed by column name and sorted
        in descending order. If False, return the raw one-row pandas frame with
        one column per checked Spark column.

    Returns
    -------
    pandas.DataFrame or None
        None (after printing a message) when no checked column has any missing
        value, or when there are no columns to check.
    """
    skipped_types = ('timestamp', 'string', 'date')
    checked = [(c, c_type) for (c, c_type) in spark_df.dtypes if c_type not in skipped_types]
    if not checked:
        print("There are no columns to check for missing values!")
        return None

    def _missing_count_expr(c, c_type):
        # NaN only exists for floating-point types. On integer columns isnan is
        # always false, and on other types (e.g. boolean) it may fail analysis,
        # so only combine it with the null check for float/double columns.
        is_missing = F.isnull(c)
        if c_type in ('float', 'double'):
            is_missing = is_missing | F.isnan(c)
        return F.count(F.when(is_missing, c)).alias(c)

    df = spark_df.select([_missing_count_expr(c, c_type) for (c, c_type) in checked]).toPandas()

    # The aggregation always yields exactly one row, so "no missing values" has to
    # be detected from the counts themselves (checking len(df) == 0 never fires).
    if (df.iloc[0] == 0).all():
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count", ascending=False)

    return df

count_missings(spark_df=east_df, sort=True)


                                                                                

Unnamed: 0,count
host_response_rate,17339
review_scores_value,15633
host_acceptance_rate,14827
id,0
bedrooms,0
calculated_host_listings_count,0
number_of_reviews,0
price,0
beds,0
accommodates,0


In [21]:
count_missings(spark_df=central_df, sort=True)

                                                                                

Unnamed: 0,count
review_scores_value,7652
host_response_rate,7421
host_acceptance_rate,5974
id,0
beds,0
num_bath,0
calculated_host_listings_count,0
number_of_reviews,0
price,0
accommodates,0


In [22]:
count_missings(spark_df=west_df, sort=True)

                                                                                

Unnamed: 0,count
review_scores_value,18638
host_response_rate,12470
host_acceptance_rate,10445
id,0
beds,0
amenities_count,0
calculated_host_listings_count,0
number_of_reviews,0
price,0
accommodates,0


In [25]:
# Report (rows, columns) for each regional frame before combining them.
regional_frames = (("east", east_df), ("central", central_df), ("west", west_df))
for region_name, region_df in regional_frames:
    num_rows = region_df.count()
    num_cols = len(region_df.columns)
    print(f"The shape of the {region_name} DataFrame is: ({num_rows}, {num_cols})")


The shape of the east DataFrame is: (68592, 27)
The shape of the central DataFrame is: (43067, 27)
The shape of the west DataFrame is: (74259, 27)


In [26]:
# Stack the three regional frames row-wise, matching columns by name rather than position.
combined_df = (
    east_df
    .unionByName(central_df)
    .unionByName(west_df)
)

In [27]:
# The combined row count should equal the sum of the three regional row counts.
num_rows, num_cols = combined_df.count(), len(combined_df.columns)
print(f"The shape of the combined DataFrame is: ({num_rows}, {num_cols})")

The shape of the combined DataFrame is: (185918, 27)


In [28]:
output_path_csv = "gs://ds5460-tlee-spring2024/notebooks/jupyter/data/usa/combined_datasets/combined_data_all_cities"
final_csv_name = "all_cities-final-spark-dataset.csv"

# Coalesce the DataFrame into 1 partition so Spark writes a single part-*.csv file.
# mode("overwrite") keeps the cell re-runnable: the default save mode
# ("errorifexists") fails whenever the output directory already exists.
# Overwrite replaces the whole directory, including a previously renamed file.
(
    combined_df.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv(output_path_csv)
)

# Spark names the output part-00000-<uuid>-c000.csv with a new UUID on every run,
# so rename it here (instead of by hand) to the stable name the read-back cell uses.
# This goes through the Hadoop FileSystem API exposed via Spark's JVM gateway.
jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
output_dir = jvm.org.apache.hadoop.fs.Path(output_path_csv)
fs = output_dir.getFileSystem(hadoop_conf)
part_files = [
    status.getPath()
    for status in fs.listStatus(output_dir)
    if status.getPath().getName().startswith("part-")
]
assert len(part_files) == 1, f"expected exactly one part file, found {len(part_files)}"
renamed = fs.rename(part_files[0], jvm.org.apache.hadoop.fs.Path(output_dir, final_csv_name))
assert renamed, f"could not rename {part_files[0]} to {final_csv_name}"

                                                                                

In [29]:
output_path_csv = "gs://ds5460-tlee-spring2024/notebooks/jupyter/data/usa/combined_datasets/combined_data_all_cities"


# Spark writes part-*.csv files; this cell expects the single part file to have been
# renamed to this stable name inside the output directory.
combined_listings_gcs_path = f"{output_path_csv}/all_cities-final-spark-dataset.csv"
combined_listings_df = spark.read.option("header", "true").csv(combined_listings_gcs_path)

num_rows = combined_listings_df.count()
num_cols = len(combined_listings_df.columns)

print(f"The shape of the combined DataFrame is: ({num_rows}, {num_cols})")

# Confirm that the information read back has the same shape as the DataFrame that was
# written, instead of comparing the printed numbers by eye.
expected_shape = (combined_df.count(), len(combined_df.columns))
assert (num_rows, num_cols) == expected_shape, (
    f"read-back shape {(num_rows, num_cols)} != written shape {expected_shape}"
)

The shape of the combined DataFrame is: (185918, 27)


In [30]:
# Shut down the Spark session now that the combined dataset has been written and read back.
spark.stop()