## Import necessary libraries

In [1]:
# we don't necessary need to import SparkSession within the emr jupyter notebook, 
#only if we want to to turn the code in a python script and use spark-submit
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1610958643234_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
import matplotlib.pyplot as plt
import seaborn
import numpy
import pandas

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import udf, col, regexp_replace, when, countDistinct, count, asc, desc
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp
from pyspark.sql import types

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Input : calendar ==> Output : time_table & calendar fact table

In [4]:
# read the calendar.csv from s3
calendar=spark.read.format('csv').options(header='true').load('s3://omar-archive-files/amsterdam-data/calendar.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# transformation applied to calendar columns 
"""
Input dataframe : raw dataframe with string-type columns and messed up with symbols like $ sign
Output dataframe: refined dataframe with the appropriate column types """
calendar= calendar.withColumn('new_date', calendar['date'].cast('date'))\
                  .withColumn('new_listing_id', calendar['listing_id'].cast(dataType=types.IntegerType()))\
                  .withColumn('new_price', regexp_replace(col('price'), '[$,]', '').cast('float'))\
                  .withColumn('new_available', (when(col('available') == 'f', 'False').when(col('available') == 't', 'True')).cast(dataType=types.BooleanType()))

calendar=calendar.select(col('new_listing_id').alias('listing_id'),\
                         col('new_date').alias('date') ,\
                         col('new_available').alias('available'),\
                         col('new_price').alias('price'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
calendar.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- listing_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: boolean (nullable = true)
 |-- price: float (nullable = true)

In [7]:
calendar.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+---------+-----+
|listing_id|      date|available|price|
+----------+----------+---------+-----+
|      2818|2019-12-05|    false| null|
|     73208|2019-08-30|    false| null|
|     73208|2019-08-29|    false| null|
|     73208|2019-08-28|    false| null|
|     73208|2019-08-27|    false| null|
+----------+----------+---------+-----+
only showing top 5 rows

The price column may include null values, however I will not delete it since the null value represents an informative value which is the listing is unvailable and is already taken that night

In [8]:
calendar.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

7310950

In [9]:
7310950/365

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

20030.0

The calendar dataframe registers 7310950 records referring to 20030 listings * 365 days. 
So I need a unique key to uniquely identify a row in the calendar dataframe based on the (listing_id,date).
The count of this key should be 7310950

In [10]:
calendar.createOrReplaceTempView('calendar')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
calendar=spark.sql("""
            select row_number() over \
            (order by (listing_id,date)) as calendar_id,* from calendar""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
calendar.createOrReplaceTempView('calendar')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
spark.sql("select count(distinct (calendar_id)) from calendar").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------+
|count(DISTINCT calendar_id)|
+---------------------------+
|                    7310950|
+---------------------------+

In [14]:
calendar.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+----------+---------+-----+
|calendar_id|listing_id|      date|available|price|
+-----------+----------+----------+---------+-----+
|          1|      2818|2018-12-06|     true| 59.0|
|          2|      2818|2018-12-07|    false| null|
|          3|      2818|2018-12-08|    false| null|
|          4|      2818|2018-12-09|    false| null|
|          5|      2818|2018-12-10|    false| null|
+-----------+----------+----------+---------+-----+
only showing top 5 rows

In [15]:
calendar.write.parquet("s3://omar-archive-files/amsterdam-data/"+"calendar.parquet", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# extract date_time table from dates in calendar which expand from 06/12/18 to 06/12/19
# the date_time table will be complete later as the calendar does not include all needed dates 
time_table=calendar.select('date').\
dropDuplicates().orderBy('date', ascending=True)\
.withColumn('day_of_month', dayofmonth('date'))\
.withColumn('week'        , weekofyear('date'))\
.withColumn('month'       , month('date'))\
.withColumn('year'        , year('date'))\
.withColumn('day'         , date_format('date', 'EEEE'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
time_table.createOrReplaceTempView('time_table')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
spark.sql('select min( date) from time_table').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
| min(date)|
+----------+
|2018-12-06|
+----------+

In [19]:
spark.sql('select max( date) from time_table').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
| max(date)|
+----------+
|2019-12-06|
+----------+

## Input : listings ==> Output : location , listings, host
#### Merge listings and listings details and apply data clearning and transformation to form listings table, host and location table

In [20]:
listings=spark.read.format('csv').options(header='true').load('s3://omar-archive-files/amsterdam-data/listings.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
listings_details=spark.read.format('csv').option('multiLine', True).options(header='true').option('escape','"').load('s3://omar-archive-files/amsterdam-data/listings_details.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# join the two tables as they complement each other and extract maximum info
new_listings = listings.alias('a').join(listings_details.alias('b'), on=['id'], how='left_outer')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
print ("the number of the listings is :", new_listings.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

the number of the listings is : 20048

In [24]:
# extract only wanted columns and apply cleaning and modification to its values such casting data types.
new_listings=new_listings.withColumn('new_price'        , regexp_replace(col('a.price'),'[$,]', '').cast('float'))\
            .withColumn('new_weekly_price' , regexp_replace(col('weekly_price'),'[$,]', '').cast('float'))\
            .withColumn('new_monthly_price', regexp_replace(col('monthly_price'), '[$,]', '').cast('float'))\
            .withColumn('new_security_deposit' , regexp_replace(col('security_deposit'),  '[$,]', '').cast('float'))\
            .withColumn('new_cleaning_fee' , regexp_replace(col('cleaning_fee'),'[$,]', '').cast('float'))\
            .withColumn('new_guests_included'   , regexp_replace(col('guests_included'),'[$,]', '').cast('float'))\
            .withColumn('new_extra_people'     , regexp_replace(col('extra_people'),'[$,]', '').cast('float'))\
            .withColumn('new_require_guest_profile_picture',\
                        (when(col('require_guest_profile_picture') == 'f', 'False').\
                         when(col('require_guest_profile_picture') == 't', 'True')).cast(dataType=types.BooleanType()))\
            .withColumn('new_require_guest_phone_verification',\
                        (when(col('require_guest_phone_verification') == 'f', 'False').\
                         when(col('require_guest_phone_verification') == 't', 'True')).cast(dataType=types.BooleanType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create location_table

In [25]:
# for every listing extract its location information and put it into location_table
location_table=new_listings.select(col('b.latitude').cast('float'),\
                   col('b.longitude').cast('float'),\
                   col('street'),\
                   col('zipcode'),\
                   col('city'),\
                   col('country') ).where(col('latitude').isNotNull() & col('longitude').isNotNull()).\
                    dropDuplicates(subset=['latitude', 'longitude'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# like the calendar table, location_table needs a unique key to identify certain values for (latitude,longitude)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
location_table.createOrReplaceTempView("location")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
location_table=spark.sql("""
            select row_number() over \
            (order by (latitude,longitude)) as location_id,* from location""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
location_table.createOrReplaceTempView("location_with_id")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
spark.sql("select max (location_id )from location_with_id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+
|max(location_id)|
+----------------+
|           20029|
+----------------+

In [31]:
location_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------+---------+--------------------+-------+-------------------+-----------+
|location_id| latitude|longitude|              street|zipcode|               city|    country|
+-----------+---------+---------+--------------------+-------+-------------------+-----------+
|          1|      2.0|      2.0|",,,https://a0.mu...|    N/A|     within an hour|    De Pijp|
|          2| 52.28838|4.9756355|Amsterdam, Noord-...|1107 VC|          Amsterdam|Netherlands|
|          3|52.289276|4.9737587|Amsterdam Zuid-Oo...|1107 VE|Amsterdam Zuid-Oost|Netherlands|
|          4|52.290306|4.9784713|Amsterdam-Zuidoos...| 1107WN| Amsterdam-Zuidoost|Netherlands|
|          5|52.290688| 4.983784|Amsterdam Zuid-Oo...|   1052|Amsterdam Zuid-Oost|Netherlands|
+-----------+---------+---------+--------------------+-------+-------------------+-----------+
only showing top 5 rows

In [32]:
location_table.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- location_id: integer (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- street: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

In [33]:
location_table.write.parquet("s3://omar-archive-files/amsterdam-data/"+"location_table.parquet", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
# I am a big fan of pandas, here is a basic demo on how to use it with a spark dataframes
# the transformation to pandas dataframe may be useful when a matplotlib/seaborn graph is needed

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
df=location_table.toPandas()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   location_id   latitude  longitude  ...  zipcode                 city      country
0            1   2.000000   2.000000  ...      N/A       within an hour      De Pijp
1            2  52.288380   4.975636  ...  1107 VC            Amsterdam  Netherlands
2            3  52.289276   4.973759  ...  1107 VE  Amsterdam Zuid-Oost  Netherlands
3            4  52.290306   4.978471  ...   1107WN   Amsterdam-Zuidoost  Netherlands
4            5  52.290688   4.983784  ...     1052  Amsterdam Zuid-Oost  Netherlands

[5 rows x 7 columns]

In [37]:
df['street'].value_counts()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Amsterdam, Noord-Holland, Netherlands             15360
Amsterdam, North Holland, Netherlands              2899
Amsterdam, NH, Netherlands                         1252
Amsterdam-Zuidoost, Noord-Holland, Netherlands      156
Amsterdam, Netherlands                              120
                                                  ...  
Amsterdam, Overtoom, Netherlands                      1
Amsterdam, boord Holland, Netherlands                 1
Amsterdam, CE, Netherlands                            1
1015 SN Amsterdam, Noord-Holland, Netherlands         1
Amsterdam, LN, Netherlands                            1
Name: street, Length: 104, dtype: int64

In [38]:
#here I finish editing my listings_table ( ordering & renaming columns, type casting)
listings_table=new_listings.select(col('id').cast('integer'),\
                   col('a.host_id').cast('integer'),\
                   col('a.latitude').cast('float'),\
                   col('a.longitude').cast('float'),\
                    
                   col('new_price').alias('price'),\
                   col('new_weekly_price').alias('weekly_price'),\
                   col('new_monthly_price').alias('monthly_price'),\
                   col('new_security_deposit').alias('security_deposit'),\
                   col('new_cleaning_fee').alias('cleaning_fee'),\
                   col('new_guests_included').alias('guests_included'),\
                   col('new_extra_people').alias('extra_people'),\

                   col('listing_url'),\

                   col('property_type'),\
                   col('accommodates').cast('integer'),\
                   col('a.room_type'),\
                   col('bathrooms').cast('float'),\
                   col('bedrooms').cast('float'),\
                   col('beds').cast('float'),\
                   col('bed_type'),\
                   col('amenities'),\

                   col('review_scores_rating').cast('float'),\
                   col('review_scores_accuracy').cast('float'),\
                   col('review_scores_cleanliness').cast('float'),\
                   col('review_scores_location').cast('float'),\
                   col('review_scores_value').cast('float'),\
                   col('cancellation_policy'),\
                   col('new_require_guest_profile_picture').alias('require_guest_profile_picture'),\
                   col('new_require_guest_phone_verification').alias('require_guest_phone_verification'))           

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### So far my listings_table contains pretty much columns, and the idea of setting the location info apart is good. So I'll create a location_id in the listings_table to references the the location_id within the location_table. This link is based on the values of (latitude, longitude).. so no need to explicitly put them within listings_table

In [39]:
listings_table.createOrReplaceTempView('listings')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
listings_table=spark.sql(""" select * , loc.location_id from listings as lis \
                                         INNER JOIN location_with_id as loc\
                                         ON (lis.latitude = loc.latitude AND lis.longitude = loc.longitude)""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
listings_table=listings_table.select("id", "host_id", "location_id", "price", \
                      "weekly_price", "monthly_price", "security_deposit", \
                     "cleaning_fee", "guests_included", "extra_people",\
                     "listing_url", "property_type", "accommodates", \
                     "room_type", "bathrooms", "bedrooms", \
                      "beds", "bed_type", "amenities", \
                      "review_scores_rating", "review_scores_accuracy", "review_scores_cleanliness",\
                      "review_scores_location","review_scores_value", "cancellation_policy",\
                      "require_guest_profile_picture","require_guest_phone_verification").where(col("id").isNotNull())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
listings_table.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: integer (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- price: float (nullable = true)
 |-- weekly_price: float (nullable = true)
 |-- monthly_price: float (nullable = true)
 |-- security_deposit: float (nullable = true)
 |-- cleaning_fee: float (nullable = true)
 |-- guests_included: float (nullable = true)
 |-- extra_people: float (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- bathrooms: float (nullable = true)
 |-- bedrooms: float (nullable = true)
 |-- beds: float (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- review_scores_rating: float (nullable = true)
 |-- review_scores_accuracy: float (nullable = true)
 |-- review_scores_cleanliness: float (nullable = true)
 |-- review_scores_location: float (nullable 

In [43]:
listings_table.write.parquet("s3://omar-archive-files/amsterdam-data/"+"listings.parquet", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## host table

##### now return back to the new_listings dataframe to extract the host dimension table

In [44]:
new_listings=new_listings.withColumn('new_host_response_rate',regexp_replace(col('host_response_rate'),'[$,%]', '' ).cast('float'))\
            .withColumn('new_host_is_superhost',\
                        (when(col('host_is_superhost') == 'f', 'False').\
                         when(col('host_is_superhost') == 't', 'True')).cast(dataType=types.BooleanType()))\
            .withColumn('new_host_identity_verified',\
                        (when(col('host_identity_verified') == 'f', 'False').\
                         when(col('host_identity_verified') == 't', 'True')).cast(dataType=types.BooleanType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
host_table=new_listings.select(col('b.host_id').cast('integer'),\
                              col('b.host_name'),\
                              col('host_about'),\
                              col('host_response_time'),\
                              col('new_host_response_rate').alias('host_response_rate'),\
                              col('new_host_is_superhost').alias('host_is_superhost'),\
                              col('host_location'),\
                              col('new_host_identity_verified').alias('host_identity_verified'))\
                              .where(col("host_id").isNotNull()).dropDuplicates(subset=['host_id'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
host_table.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

17263

In [47]:
host_table.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: float (nullable = true)
 |-- host_is_superhost: boolean (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_identity_verified: boolean (nullable = true)

In [48]:
host_table.write.parquet("s3://omar-archive-files/amsterdam-data/"+"hosts.parquet", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## reviews

In [49]:
# read in a json format file of reviews_details 
reviews_details=spark.read.format('json').option("multiLine","true").load('s3://omar-archive-files/amsterdam-data/reviews_details.json')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
reviews_details.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+----+----------+-----------+-------------+
|            comments|      date|  id|listing_id|reviewer_id|reviewer_name|
+--------------------+----------+----+----------+-----------+-------------+
|Daniel is really ...|2009-03-30|1191|      2818|      10952|          Lam|
|Daniel is the mos...|2009-04-24|1771|      2818|      12798|        Alice|
|We had such a gre...|2009-05-03|1989|      2818|      11869|      Natalja|
|Very professional...|2009-05-18|2797|      2818|      14064|      Enrique|
|Daniel is highly ...|2009-05-25|3151|      2818|      17977|      Sherwin|
+--------------------+----------+----+----------+-----------+-------------+
only showing top 5 rows

##### Do remember that our time_table previously defined starts up from 06/12/2018, however reviews have been made since 2009. Here below I join dates outside 2018-2019 to have a date_time table which covers all the AirBnB events

In [51]:
reviews_details=reviews_details.withColumn('new_date', reviews_details['date'].cast('date'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
time_table2=reviews_details.selectExpr('new_date as date').dropDuplicates().where(col('date').isNotNull()).orderBy('date', ascending=True)\
.withColumn('day_of_month', dayofmonth('date'))\
.withColumn('week'        , weekofyear('date'))\
.withColumn('month'       , month('date'))\
.withColumn('year'        , year('date'))\
.withColumn('day'         , date_format('date', 'EEEE'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
date_time= time_table2.union(time_table)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
date_time.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date: date (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: string (nullable = true)

In [55]:
date_time.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------+----+-----+----+------+
|      date|day_of_month|week|month|year|   day|
+----------+------------+----+-----+----+------+
|2009-03-30|          30|  14|    3|2009|Monday|
|2009-04-24|          24|  17|    4|2009|Friday|
|2009-05-03|           3|  18|    5|2009|Sunday|
|2009-05-18|          18|  21|    5|2009|Monday|
|2009-05-25|          25|  22|    5|2009|Monday|
+----------+------------+----+-----+----+------+
only showing top 5 rows

In [56]:
print ( " date_time starts from 2009 now and actually has {} records".format(date_time.count()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 date_time starts from 2009 now and actually has 3240 records

In [57]:
date_time.write.parquet("s3://omar-archive-files/amsterdam-data/"+"date_time.parquet", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
## reviewers_table

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### in addition to calendar fact table, my hybrid star schema or snowflake schema would involve another fact table which is reviews ( completely indep on the first fact table)

In [59]:
fact_reviews=reviews_details.select(col('id').cast('integer'),\
                                    col('listing_id').cast('integer'),\
                                    col('new_date').alias('date'),\
                                    col('reviewer_id').cast('integer'),\
                                    col('comments')).where(col('id').isNotNull() & col('date').isNotNull() & col('reviewer_id').isNotNull())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
fact_reviews.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: integer (nullable = true)
 |-- listing_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- comments: string (nullable = true)

In [61]:
fact_reviews.write.parquet("s3://omar-archive-files/amsterdam-data/"+"fact_reviews.parquet", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Finaly I add reviewer dimension table and to detach the name of the reviewer from the fact table

In [62]:
reviewer_table=reviews_details.select(col('reviewer_id').cast('integer'), \
                                     col('reviewer_name')).where(col("reviewer_id").isNotNull()).dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [63]:
reviewer_table.write.parquet("s3://omar-archive-files/amsterdam-data/"+"reviewer_table.parquet", mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [64]:
reviewer_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-------------+
|reviewer_id|reviewer_name|
+-----------+-------------+
|    1475799|       Dhaval|
|   17837628|       Sophie|
|   60449351|      Etienne|
|  101794231|        Katie|
|   26289366|       Martin|
+-----------+-------------+
only showing top 5 rows

In [65]:
print("There is {} active reviewers in the data set".format(reviewer_table.dropDuplicates().count()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

There is 411813 active reviewers in the data set