### Data processing using PySpark

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, FloatType, DateType

In [2]:
# Obtain the Spark session for this notebook: reuses an existing session if
# one is active, otherwise creates one registered as "csv_to_parquet".
spark = (
    SparkSession.builder
    .appName("csv_to_parquet")
    .getOrCreate()
)

### Read the Airbnb CSV file from the GCS bucket

In [24]:
# Load the raw listings CSV from GCS, using the first row as the header and
# letting Spark infer column types.
# nullValue="" states explicitly that empty fields are read as null.
# (nanValue only sets the token that is parsed as NaN; it is not the option
# that maps a token to null, so it is not used here.)
# NOTE(review): the inferred schema reports numeric-looking columns (e.g.
# host_id, latitude) as string — check whether quoted fields in the file need
# escape='"' and/or multiLine=True before relying on the later casts.
df = (
    spark.read
    .options(header=True, nullValue="", inferSchema=True)
    .csv("gs://airbnb_listing_analysis/raw_csv/listings.csv")
)

                                                                                

In [6]:
# Print the schema Spark inferred for the raw CSV (column names, types, nullability).
df.printSchema()

root
 |-- listing_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_total_listings_count: string (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- price: s

### Drop the columns that are not required

In [25]:
# Keep only the columns needed downstream by dropping the rest in one call.
new_df = df.drop(
    "host_location",
    "host_has_profile_pic",
    "minimum_nights",
    "maximum_nights",
    "latitude",
    "longitude",
    "amenities",
    "instant_bookable",
    # Per-category review sub-scores; the overall review_scores_rating is kept.
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value",
)

In [11]:
# Preview the trimmed frame: first 20 rows, long cell values truncated
# (these are the defaults of show(), spelled out here).
new_df.show(n=20, truncate=True)

+----------+--------------------+--------+----------+------------------+------------------+--------------------+-----------------+-------------------------+----------------------+-------------------+--------+-----+----------------+------------+------------+--------+---------------+--------------------+
|listing_id|                name| host_id|host_since|host_response_time|host_response_rate|host_acceptance_rate|host_is_superhost|host_total_listings_count|host_identity_verified|      neighbourhood|district| city|   property_type|   room_type|accommodates|bedrooms|          price|review_scores_rating|
+----------+--------------------+--------+----------+------------------+------------------+--------------------+-----------------+-------------------------+----------------------+-------------------+--------+-----+----------------+------------+------------+--------+---------------+--------------------+
|    281420|Beautiful Flat in...| 1466919|2011-12-03|              null|              nu

In [12]:
# Confirm which columns remain after the drop and what types they currently have.
new_df.printSchema()

root
 |-- listing_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_total_listings_count: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- price: string (nullable = true)
 |-- review_scores_rating: string (nullable = true)



### Convert the host_since column from string to date type

In [26]:
# Replace the string host_since column with a date column of the same name.
# No format is passed, so to_date applies Spark's default string-to-date rules.
listings = new_df.withColumn(
    "host_since",
    to_date("host_since"),
)

### Cast other columns into appropriate types

In [30]:
# The numeric fields arrive as strings; cast each one in place, keeping its
# name and position in the frame. Counts, sizes, price and rating become
# integers; the two host rates become floats.
# NOTE(review): presumably unparseable values become null under Spark's
# default (non-ANSI) cast behaviour — confirm for this cluster's settings.
listings_final = (
    listings
    .withColumn("host_total_listings_count", col("host_total_listings_count").cast(IntegerType()))
    .withColumn("host_response_rate", col("host_response_rate").cast(FloatType()))
    .withColumn("host_acceptance_rate", col("host_acceptance_rate").cast(FloatType()))
    .withColumn("accommodates", col("accommodates").cast(IntegerType()))
    .withColumn("bedrooms", col("bedrooms").cast(IntegerType()))
    .withColumn("price", col("price").cast(IntegerType()))
    .withColumn("review_scores_rating", col("review_scores_rating").cast(IntegerType()))
)

In [32]:
# Verify the result of the conversions: host_since should now be a date and
# the cast columns should be integer/float instead of string.
listings_final.printSchema()

root
 |-- listing_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: float (nullable = true)
 |-- host_acceptance_rate: float (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- review_scores_rating: integer (nullable = true)



### Repartition the DataFrame into 20 partitions and write the result to GCS as Parquet files

In [34]:
# Redistribute the rows across 20 partitions; the write step then emits one
# Parquet part file per partition.
final_df = listings_final.repartition(numPartitions=20)

In [35]:
# Persist the cleaned listings as Parquet on GCS, replacing any data already
# present at the target path.
final_df.write.parquet(
    "gs://airbnb_listing_analysis/raw_parquet",
    mode="overwrite",
)

                                                                                