In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, regexp_replace
from pyspark.sql.types import IntegerType


In [4]:
# Obtain the Spark session shared by every cell below (an existing one is reused).
spark = (
    SparkSession.builder
    .appName("DataTransformation")
    .getOrCreate()
)

23/09/05 16:01:59 WARN Utils: Your hostname, ubuntu-Lenovo-Legion-5-15ARH05 resolves to a loopback address: 127.0.1.1; using 172.16.5.112 instead (on interface wlp4s0)
23/09/05 16:01:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/05 16:02:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/05 16:02:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
# Load the raw exports. The files are tab-separated (not comma-separated),
# each with a header row; Spark infers the column types.
def _read_raw_tsv(path):
    """Read one tab-separated file with a header row, inferring the schema."""
    return spark.read.csv(path, header=True, inferSchema=True, sep="\t")


listing_df_raw = _read_raw_tsv('raw_data/listings.tsv')
reviews_df_raw = _read_raw_tsv('raw_data/reviews.tsv')
calendar_df_raw = _read_raw_tsv('raw_data/calendar.tsv')



                                                                                

In [6]:
calendar_df_raw.show(n=20, truncate=False)  # first 20 rows, full cell values

+----------+----------+---------+-----+
|listing_id|date      |available|price|
+----------+----------+---------+-----+
|12147973  |2017-09-05|f        |null |
|12147973  |2017-09-04|f        |null |
|12147973  |2017-09-03|f        |null |
|12147973  |2017-09-02|f        |null |
|12147973  |2017-09-01|f        |null |
|12147973  |2017-08-31|f        |null |
|12147973  |2017-08-30|f        |null |
|12147973  |2017-08-29|f        |null |
|12147973  |2017-08-28|f        |null |
|12147973  |2017-08-27|f        |null |
|12147973  |2017-08-26|f        |null |
|12147973  |2017-08-25|f        |null |
|12147973  |2017-08-24|f        |null |
|12147973  |2017-08-23|f        |null |
|12147973  |2017-08-22|f        |null |
|12147973  |2017-08-21|f        |null |
|12147973  |2017-08-20|f        |null |
|12147973  |2017-08-19|f        |null |
|12147973  |2017-08-18|f        |null |
|12147973  |2017-08-17|f        |null |
+----------+----------+---------+-----+
only showing top 20 rows



In [7]:
# Transform the data

# LISTINGS DATA

# Drop the columns that are not carried into the cleaned listings output.
listing_df_raw = listing_df_raw.drop(
    'summary', 'description', 'host_about', 'country', 'market'
)

# Convert 'host_is_superhost' to boolean: 't' -> True; anything else,
# including 'f' and null, -> False.
superhost_flag = when(col('host_is_superhost') == 't', True).otherwise(False)
listing_df_raw = listing_df_raw.withColumn('host_is_superhost', superhost_flag)

# Keep only listings that have a value in the 'space' column.
listing_df_raw = listing_df_raw.na.drop(subset=['space'])





In [10]:
# Remove "$" and thousands separators from 'price', then convert to integer.
# Only '$' and ',' are stripped. The old pattern "[^0-9]" also deleted the
# decimal point, so a price such as "$150.00" became 15000.
# Casting through double keeps values like "150.00" parseable; cents are truncated.
# NOTE(review): assumes raw prices look like "$1,250.00" — confirm against listings.tsv.
listing_df_raw = listing_df_raw.withColumn(
    "price",
    regexp_replace(col("price"), "[$,]", "").cast("double").cast(IntegerType()),
)

In [18]:
# Remove commas from every *string* column.
# Non-string columns (e.g. the integer 'price' and the boolean
# 'host_is_superhost' built in earlier cells) are passed through unchanged.
# Applying regexp_replace to them would silently cast them back to string.
# A single select replaces the per-column withColumn loop, which adds one
# projection to the query plan for every column.
listing_df_raw = listing_df_raw.select(
    *[
        regexp_replace(col(name), ',', '').alias(name) if dtype == 'string' else col(name)
        for name, dtype in listing_df_raw.dtypes
    ]
)


In [23]:
# CALENDAR DATA

# Null prices are intentionally left as null. The former
# `na.fill({'price': 'Booked'})` had no lasting effect, for two reasons:
#   - The later price-to-integer conversion cannot parse "Booked", so those
#     rows became null again (or the cast raises when ANSI mode is on).
#   - Once 'price' is numeric, Spark ignores a string fill value for it.
# NOTE(review): presumably the null-price days are the unavailable/booked ones
# (available == 'f' in the sample output) — confirm; 'available' carries that info.

# Convert 'available' to boolean: 't' -> True; anything else (incl. null) -> False.
calendar_df_raw = calendar_df_raw.withColumn('available', when(col('available') == 't', True).otherwise(False))



In [24]:
calendar_df_raw.select("price").show()  # inspect the calendar 'price' column

+-----+
|price|
+-----+
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
+-----+
only showing top 20 rows



In [20]:
# Remove "$" and thousands separators from 'price', then convert to integer.
# Only '$' and ',' are stripped. The old pattern "[^0-9]" also deleted the
# decimal point, so a price such as "$65.00" became 6500.
# Casting through double keeps values like "65.00" parseable; cents are truncated.
# Null prices stay null.
# NOTE(review): assumes raw prices look like "$1,250.00" — confirm against calendar.tsv.
calendar_df_raw = calendar_df_raw.withColumn(
    "price",
    regexp_replace(col("price"), "[$,]", "").cast("double").cast(IntegerType()),
)

In [21]:
# REVIEW DATA

# No transformation needed

# SAVE DATA

# Each DataFrame is collapsed to a single partition before writing.
# Each output directory therefore gets one headed CSV part file, and any
# earlier output in that directory is overwritten.
# NOTE(review): "clean0_calendar_csv" looks like a typo for "clean_calendar_csv".
# It is kept as-is in case downstream readers depend on it — confirm.
_save_targets = [
    (listing_df_raw, 'cleaned_data/clean_listing_csv'),
    (calendar_df_raw, 'cleaned_data/clean0_calendar_csv'),
    (reviews_df_raw, 'cleaned_data/clean_reviews_csv'),
]
for _frame, _out_path in _save_targets:
    _frame.coalesce(1).write.csv(_out_path, header=True, mode="overwrite")

                                                                                

In [22]:
# Read the cleaned listings back to sanity-check the written CSV.
read_df = spark.read.options(header=True, inferSchema=True).csv('cleaned_data/clean_listing_csv/')
read_df.select('name').show()

+--------------------+
|                name|
+--------------------+
|Sunny Bungalow in...|
|Charming room in ...|
|Mexican Folk Art ...|
|Spacious Sunny Be...|
| Come Home to Boston|
|Private Bedroom +...|
|New Lrg Studio ap...|
|"Tranquility" on ...|
|6 miles away from...|
|Perfect & Practic...|
|Quiet  Beauty in ...|
|Cozy Room & Fresh...|
|Convient Safe and...|
|Cozy room in a ch...|
|Arborside Guest C...|
|Skyline View to B...|
|Spacious 3 bedroo...|
|4BD/3.5BA Perfect...|
|Private room in h...|
|Surround yourself...|
+--------------------+
only showing top 20 rows

