# Project Summary
This project uses Spark with two datasets from Kaggle: Tokyo Airbnb Detailed Open Data and Tokyo weather data. The datasets are transformed into a set of tables that follow a star schema and are stored in a Postgres database.


## Steps
Here is the outline of the steps that I took.
* step 1. Scope the Project and Gather Data
* step 2. Explore and Assess the Data
* step 3. Define the Data Model
* step 4. Run ETL to Model the Data
* step 5. Complete Project Write Up

### Configuration

In [134]:
import os
import glob
import pandas as pd
from sql_queries import *
import datetime
import psycopg2
import configparser

In [135]:
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


In [136]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

### Database Construction

In [138]:
conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=taro password=''")
conn.set_session(autocommit=True)
cur = conn.cursor()

# create sparkify database with UTF8 encoding
cur.execute("DROP DATABASE IF EXISTS sparkifydb")
cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")

# close connection to default database
conn.close()

# connect to sparkify database
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=taro password=''")
cur = conn.cursor()

In [139]:
print(conn)
print(cur)
cur.close()
conn.close()

## Step 1. Scope the Project and Gather Data

### Scope
I used two Kaggle datasets: Tokyo Airbnb Detailed Open Data and Tokyo weather data. The two datasets are stitched together in a staging table and then distributed to the fact table and the dimension tables via an ETL pipeline.

### Data
* Tokyo Airbnb Detailed Open Data [source](https://www.kaggle.com/fuyutaro/tokyo-airbnb-detailed-open-data?select=reviews.csv)
    * calendar: reservation information
    * listing: entire property information
    * review: reviews made by the property users
* Tokyo weather data [source](https://www.kaggle.com/loovmj/tokyo-weather-data): daily high and low temperature, precipitation, and daylight hours data since 1984

### Tokyo Airbnb Detailed Open Data - calendar

In [140]:
calendar_filepath = './calendar.csv'

In [141]:
calendar_df = spark.read.option("header","true").csv(calendar_filepath)
calendar_df.limit(5).toPandas()

Unnamed: 0,listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights
0,1419,2019-08-08,f,$469.00,$469.00,4,730
1,1419,2019-08-09,f,$469.00,$469.00,4,730
2,1419,2019-08-10,f,$469.00,$469.00,4,730
3,1419,2019-08-11,f,$469.00,$469.00,4,730
4,1419,2019-08-12,f,$469.00,$469.00,4,730


### Tokyo Airbnb Detailed Open Data - listing

In [144]:
listing_filepath = './listings.csv'
listing_df = spark.read.option("header","true").csv(listing_filepath)
listing_df.limit(5).toPandas()

Unnamed: 0,id,listing_url,scrape_id,last_scraped,name,summary,space,description,experiences_offered,neighborhood_overview,...,instant_bookable,is_business_travel_ready,cancellation_policy,require_guest_profile_picture,require_guest_phone_verification,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,reviews_per_month
0,1419,https://www.airbnb.com/rooms/1419,20190808195523.0,2019-08-08,Beautiful home in amazing area!,"This large, family home is located in one of T...",Visit Toronto with this fabulous furnished hom...,"This large, family home is located in one of T...",none,The apartment is located in the Ossington stri...,...,2017-12-04,100,10.0,10.0,10.0,10.0,10.0,10.0,f,
1,8077,https://www.airbnb.com/rooms/8077,20190808195523.0,2019-08-09,Downtown Harbourfront Private Room,Guest room in a luxury condo with access to al...,My husband and I are both artists. I give pain...,Guest room in a luxury condo with access to al...,none,,...,2009-08-20,2013-08-27,97.0,10.0,10.0,10.0,10.0,10.0,10,f
2,12604,https://www.airbnb.com/rooms/12604,20190808195523.0,2019-08-08,Seaton Village Parlour Bedroom,Comfortable sofa bed in airy main floor parlour.,Family of 4 (2 kids) plus a friendly dog open...,Comfortable sofa bed in airy main floor parlou...,none,,...,,,,,,,,,,
3,I am able to travel alone and with my husband ...,but we also travel together as a family.,,,,,,,,,...,,,,,,,,,,
4,I treat my hosts homes as I would expect peopl...,"and leave everything exactly as I find it. """,,,,f,https://a0.muscache.com/im/users/48239/profile...,https://a0.muscache.com/im/users/48239/profile...,The Annex,1,...,,,,,,,,,,


### Tokyo Airbnb Detailed Open Data - review

In [145]:
review_filepath = './reviews.csv'
review_df = spark.read.option("header","true").csv(review_filepath)
review_df.limit(5).toPandas()

Unnamed: 0,listing_id,id,date,reviewer_id,reviewer_name,comments
0,1419,38924112.0,2015-07-19,11308465.0,Marcela,Having the opportunity of arriving to Alexandr...
1,"""",,,,,
2,1419,44791978.0,2015-08-29,9580285.0,Marco,We have no enough words to describe how beauty...
3,We loved this home as it if were our. Thank y...,,,,,
4,1419,45957133.0,2015-09-07,38394721.0,Andrea,The listing was exceptional and an even better...
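
Rows 1 and 3 above hold only a fragment of a comment, which suggests that reviews containing line breaks inside quoted fields were split into separate records. A minimal sketch with Python's standard `csv` module shows the effect on a made-up sample (the sample text is an assumption, not data taken from the files):

```python
import csv
import io

# Hypothetical sample: the first review has a line break inside a quoted field.
sample = (
    "listing_id,id,comments\n"
    '1419,38924112,"Great stay.\nWould come again."\n'
    '1419,44791978,"Lovely home."\n'
)

# Naive approach: treat every physical line as one record.
naive_rows = [line.split(",") for line in sample.strip().split("\n")[1:]]

# Quote-aware approach: csv.reader keeps the embedded newline inside the field.
parsed_rows = list(csv.reader(io.StringIO(sample)))[1:]

print(len(naive_rows))   # 3 records, one of them a fragment
print(len(parsed_rows))  # 2 records, as intended
```

Spark's CSV reader has a `multiLine` option for the same situation, for example `spark.read.option("header", "true").option("multiLine", "true").csv(path)`. Whether the `escape` option also needs adjusting depends on how the file quotes embedded quotation marks.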


### Tokyo weather data

In [146]:
weather_filepath = './weather.csv'
weather_df = spark.read.option("header","true").option("encoding","shift-JIS").csv(weather_filepath)
weather_df.limit(5).toPandas()


Unnamed: 0,y/m/d,high(℃),low(℃),max_precipitation_10min(mm),daylight_hours(hours)
0,1984/1/1,10.1,0.9,0.0,8.5
1,1984/1/2,8.9,0.3,0.0,7.0
2,1984/1/3,7.7,0.9,1.0,4.1
3,1984/1/4,10.9,1.5,0.5,6.5
4,1984/1/5,7.6,1.9,0.0,8.6


## Step 2. Explore and Assess the Data
Explore the data to identify data quality issues such as missing values and duplicate rows.

In [147]:
# len(df.columns) counts the columns; len(df.columns[0]) would only measure the first column name
print("calendar")
print("rows:", calendar_df.count())
print("columns: ", len(calendar_df.columns), " \n")

print("listing")
print("rows:", listing_df.count())
print("columns: ", len(listing_df.columns), " \n")

print("review")
print("rows:", review_df.count())
print("columns: ", len(review_df.columns), " \n")

print("weather")
print("rows:", weather_df.count())
print("columns: ", len(weather_df.columns), " \n")


calendar
rows: 7890295
columns:  7  

listing
rows: 36762
columns:  106  

review
rows: 750734
columns:  6  

weather
rows: 12805
columns:  5  



#### Find Null values

In [148]:
def count_null(dataFrame):
    """Count the number of null or NaN values in each column of a DataFrame.

    Parameters
    ----------
    dataFrame : pyspark.sql.DataFrame
        DataFrame to inspect.
    """
    dataFrame.select([F.count(F.when(F.isnan(c) | col(c).isNull(), c)).alias(c) for c in dataFrame.columns]).show()

##### Null values in weather

In [149]:
count_null(weather_df)

+-----+-------+------+---------------------------+---------------------+
|y/m/d|high(℃)|low(℃)|max_precipitation_10min(mm)|daylight_hours(hours)|
+-----+-------+------+---------------------------+---------------------+
|   16|     13|    13|                         15|                   13|
+-----+-------+------+---------------------------+---------------------+



Repeat the same steps for the rest of the datasets.

##### Null values in listing

In [150]:
print("Count the number of null values in listings dataset.")
count_null(listing_df)

Count the number of null values in listings dataset.
+---+-----------+---------+------------+----+-------+-----+-----------+-------------------+---------------------+-----+-------+------+-----------+-----------+-------------+----------+-----------+--------------+-------+--------+---------+----------+-------------+----------+------------------+------------------+--------------------+-----------------+------------------+----------------+------------------+-------------------+-------------------------+------------------+--------------------+----------------------+------+-------------+----------------------+----------------------------+-----+-----+-------+------+--------------+------------+-------+--------+---------+-----------------+-------------+---------+------------+---------+--------+-----+--------+---------+-----------+-----+------------+-------------+----------------+------------+---------------+------------+--------------+--------------+----------------------+----------------------

In [151]:
listing_df.withColumn('numNulls', sum(listing_df[col].isNull().cast('int') for col in listing_df.columns))\
.orderBy(F.desc("numNulls")).groupBy("numNulls").count().limit(10).toPandas()

Unnamed: 0,numNulls,count
0,105,4834
1,104,1771
2,103,1070
3,102,733
4,101,446
5,100,216
6,99,181
7,98,106
8,97,89
9,96,57
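
The `numNulls` table above groups rows by how many of their columns are null. The same idea in plain Python, on a hypothetical three-column sample rather than the real listing data:

```python
from collections import Counter

# Hypothetical rows; None stands for a null value.
rows = [
    {"id": "1419", "name": "Beautiful home", "host_id": "1565"},
    {"id": "Happy Trails!", "name": None, "host_id": None},
    {"id": "-", "name": None, "host_id": None},
    {"id": "8077", "name": None, "host_id": "22795"},
]

def null_histogram(records):
    """Map 'number of null columns in a row' to 'number of such rows'."""
    return Counter(sum(value is None for value in row.values()) for row in records)

histogram = null_histogram(rows)
for num_nulls, count in sorted(histogram.items(), reverse=True):
    print(num_nulls, count)
```

Rows in which only the first column is populated are likely fragments of multi-line text rather than real listings, so a histogram like this is a quick way to estimate how many records were split while reading.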


In [152]:
listing_df.withColumn('numNulls', sum(listing_df[col].isNull().cast('int') for col in listing_df.columns)).orderBy(F.desc("numNulls")).limit(10).toPandas()

Unnamed: 0,id,listing_url,scrape_id,last_scraped,name,summary,space,description,experiences_offered,neighborhood_overview,...,is_business_travel_ready,cancellation_policy,require_guest_profile_picture,require_guest_phone_verification,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,reviews_per_month,numNulls
0,We are available to our guests any time of day...,,,,,,,,,,...,,,,,,,,,,105
1,I can assure you of a very warm welcome by mys...,,,,,,,,,,...,,,,,,,,,,105
2,We look forward to hosting you!,,,,,,,,,,...,,,,,,,,,,105
3,-,,,,,,,,,,...,,,,,,,,,,105
4,I would love to help you earn headache-free in...,,,,,,,,,,...,,,,,,,,,,105
5,Happy Trails!,,,,,,,,,,...,,,,,,,,,,105
6,Feel free to reach out if you have any questio...,,,,,,,,,,...,,,,,,,,,,105
7,We also have a travel startup called Lokafy wh...,,,,,,,,,,...,,,,,,,,,,105
8,I've enjoyed Airbnb spots in SanFrancisco and ...,,,,,,,,,,...,,,,,,,,,,105
9,Happy Trails!,,,,,,,,,,...,,,,,,,,,,105


In [153]:
listing_df.withColumn('numNulls', sum(listing_df[col].isNull().cast('integer') for col in listing_df.columns))\
.filter(listing_df.host_id.cast("integer").isNotNull())\
.orderBy(F.desc("numNulls")).groupBy("numNulls").count().limit(10).toPandas()

Unnamed: 0,numNulls,count
0,93,26
1,92,22
2,91,228
3,90,244
4,89,253
5,88,319
6,87,325
7,86,634
8,85,953
9,84,1643


In [154]:
listing_df.filter(listing_df.id.cast("integer").isNotNull()).filter(listing_df.id.cast("integer").isNull()).count()

0

In [None]:
listing_df.limit(10).toPandas()

##### Null values in reviews

In [162]:
review_df.withColumn('numNulls', sum(review_df[col].isNull().cast('int') for col in review_df.columns))\
.orderBy(F.desc("numNulls")).groupBy("numNulls").count().limit(10).toPandas()


review_df.withColumn('numNulls', sum(review_df[col].isNull().cast('int') for col in review_df.columns)).orderBy(F.desc("numNulls")).limit(10).toPandas()

review_df.limit(10).toPandas()

Unnamed: 0,listing_id,id,date,reviewer_id,reviewer_name,comments
0,1419,38924112.0,2015-07-19,11308465.0,Marcela,Having the opportunity of arriving to Alexandr...
1,"""",,,,,
2,1419,44791978.0,2015-08-29,9580285.0,Marco,We have no enough words to describe how beauty...
3,We loved this home as it if were our. Thank y...,,,,,
4,1419,45957133.0,2015-09-07,38394721.0,Andrea,The listing was exceptional and an even better...
5,1419,67295154.0,2016-03-28,3515044.0,Shaun,Alexandra's home was amazing and in such a nea...
6,Anyone looking for a quiet neighbourhood that'...,,,,,
7,"Thanks Alexandra!""",,,,,
8,1419,177702208.0,2017-08-03,13987100.0,Kate,Beautiful home. Very comfortable and clean. Pe...
9,Alexandra gave us à very complete list of all ...,,,,,


##### Null values in calendar

In [163]:
calendar_df.withColumn('numNulls', sum(calendar_df[col].isNull().cast('int') for col in calendar_df.columns))\
.orderBy(F.desc("numNulls")).groupBy("numNulls").count().limit(10).toPandas()

calendar_df.withColumn('numNulls', sum(calendar_df[col].isNull().cast('int') for col in calendar_df.columns)).orderBy(F.desc("numNulls")).limit(10).toPandas()

calendar_df.limit(10).toPandas()

Unnamed: 0,listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights
0,1419,2019-08-08,f,$469.00,$469.00,4,730
1,1419,2019-08-09,f,$469.00,$469.00,4,730
2,1419,2019-08-10,f,$469.00,$469.00,4,730
3,1419,2019-08-11,f,$469.00,$469.00,4,730
4,1419,2019-08-12,f,$469.00,$469.00,4,730
5,1419,2019-08-13,f,$469.00,$469.00,4,730
6,1419,2019-08-14,f,$469.00,$469.00,4,730
7,1419,2019-08-15,f,$469.00,$469.00,4,730
8,1419,2019-08-16,f,$469.00,$469.00,4,730
9,1419,2019-08-17,f,$469.00,$469.00,4,730


### Find duplicates

Next, look for duplicate rows in each dataset in turn, starting with the calendar dataset.

In [164]:
# Calendar
print("rows:", calendar_df.count())
print("columns: ", len(calendar_df.columns), " \n")  # number of columns, not the length of the first column name

calendar_df \
    .groupby(calendar_df.columns) \
    .count() \
    .where('count > 1') \
    .select(F.sum('count'))\
    .show()


calendar
rows: 7890295
columns:  7  

+----------+
|sum(count)|
+----------+
|      null|
+----------+



In [165]:
# Listing
print("rows:", listing_df.count())
print("columns: ", len(listing_df.columns), " \n")  # number of columns, not the length of the first column name

listing_df \
    .groupby(listing_df.columns) \
    .count() \
    .where('count > 1') \
    .select(F.sum('count'))\
    .show()


listing
rows: 36762
columns:  106  

+----------+
|sum(count)|
+----------+
|      6501|
+----------+



In [166]:
# Review
print("rows:", review_df.count())
print("columns: ", len(review_df.columns), " \n")  # number of columns, not the length of the first column name

review_df \
    .groupby(review_df.columns) \
    .count() \
    .where('count > 1') \
    .select(F.sum('count'))\
    .show()


review
rows: 750734
columns:  6  

+----------+
|sum(count)|
+----------+
|     16912|
+----------+



In [167]:
# Weather
print("rows:", weather_df.count())
print("columns: ", len(weather_df.columns), " \n")  # number of columns, not the length of the first column name

weather_df \
    .groupby(weather_df.columns) \
    .count() \
    .where('count > 1') \
    .select(F.sum('count'))\
    .show()


weather
rows: 12805
columns:  5  

+----------+
|sum(count)|
+----------+
|      5134|
+----------+



In [168]:
weather_df.limit(5).toPandas()

Unnamed: 0,y/m/d,high(℃),low(℃),max_precipitation_10min(mm),daylight_hours(hours)
0,1984/1/1,10.1,0.9,0.0,8.5
1,1984/1/2,8.9,0.3,0.0,7.0
2,1984/1/3,7.7,0.9,1.0,4.1
3,1984/1/4,10.9,1.5,0.5,6.5
4,1984/1/5,7.6,1.9,0.0,8.6


As shown above, each row in the weather dataset carries its own date, so I keep all of its rows rather than dropping duplicates.

The calendar dataset has no duplicate rows: `sum(count)` is null because no group of identical rows has a count above 1. The listing and review datasets do report repeated rows (6501 and 16912 rows respectively).
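
The duplicate check used above groups by every column, keeps the groups that occur more than once, and sums their sizes. A plain-Python sketch of that logic on hypothetical rows (the function name `duplicate_row_total` is made up for this illustration) shows why the result is null when nothing repeats:

```python
from collections import Counter

def duplicate_row_total(rows):
    """Sum the sizes of all groups of identical rows that occur more than once.

    Returns None when there is no such group, mirroring SQL's SUM over zero rows.
    """
    counts = Counter(tuple(row) for row in rows)
    repeated = [n for n in counts.values() if n > 1]
    return sum(repeated) if repeated else None

# Hypothetical samples, not rows from the real datasets.
unique_rows = [("1419", "2019-08-08"), ("1419", "2019-08-09")]
repeated_rows = [("a", None), ("a", None), ("b", "x"), ("a", None)]

print(duplicate_row_total(unique_rows))    # None
print(duplicate_row_total(repeated_rows))  # 3
```

Note that the total counts every copy in a repeated group, including the first one, so it is larger than the number of rows that `dropDuplicates()` would actually remove.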

## Step 3. Define the Data Model

#### 3-1. Map out the conceptual data model and explain why you chose that model
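At the moment there are two source datasets (the Airbnb data, split across three files, and the weather data). I will eventually split them into 9 tables: the `listings` fact table and 8 dimension tables (`weather`, `calendar`, `host`, `location`, `property`, `requirements`, `reviews`, and `price`).
The fact and dimension tables are shown in the screenshot below.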
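
To make the star layout concrete, here is a minimal sketch that uses an in-memory SQLite database instead of Postgres. It keeps only the `listings` fact table and two heavily trimmed dimension tables; the column subsets and sample values are assumptions for illustration, not the definitions in `sql_queries`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Fact table: one row per listing, pointing at the host.
cur.execute("CREATE TABLE listings (id INTEGER PRIMARY KEY, host_id INTEGER)")
# Trimmed dimension tables keyed by the listing id (hypothetical column subsets).
cur.execute("CREATE TABLE host (listing_id INTEGER, host_id INTEGER, host_name TEXT)")
cur.execute("CREATE TABLE price (listing_id INTEGER, price TEXT)")

cur.execute("INSERT INTO listings VALUES (1, 10), (2, 20)")
cur.execute("INSERT INTO host VALUES (1, 10, 'Aiko'), (2, 20, 'Ken')")
cur.execute("INSERT INTO price VALUES (1, '$80.00'), (2, '$120.00')")

# A typical star-schema query: start from the fact table and join the dimensions.
result = cur.execute(
    """
    SELECT l.id, h.host_name, p.price
    FROM listings AS l
    JOIN host AS h ON h.listing_id = l.id
    JOIN price AS p ON p.listing_id = l.id
    ORDER BY l.id
    """
).fetchall()
conn.close()
print(result)
```

Keeping the fact table narrow and pushing descriptive attributes into dimension tables means each analytical query joins only the dimensions it needs.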

#### 3-2. List the steps necessary to pipeline the data into the chosen data model
Since the data for the final tables is scattered across the two datasets, I first build one staging table that combines them, and then populate each final table from it.

## Step 4. Run ETL to Model the Data

#### 4-1 Create the data pipelines and the data model

In [None]:
from sql_queries import *
import psycopg2
conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=taro password=''")
conn.set_session(autocommit=True)
cur = conn.cursor()

# create sparkify database with UTF8 encoding
cur.execute("DROP DATABASE IF EXISTS sparkifydb")
cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")

# close connection to default database
conn.close()

# connect to sparkify database
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=taro password=''")
cur = conn.cursor()

In [None]:
# Listing table creation
cur.execute(listings_table_create)
conn.commit()

In [None]:
# Weather table creation
cur.execute(weather_table_create)
conn.commit()

In [None]:
# Calendar table creation
cur.execute(calendar_table_create)
conn.commit()

In [None]:
# Host table creation
cur.execute(host_table_create)
conn.commit()

In [None]:
# Location table creation
cur.execute(location_table_create)
conn.commit()

In [None]:
# Property table creation
cur.execute(property_table_create)
conn.commit()

In [None]:
# Requirements table creation
cur.execute(requirements_table_create)
conn.commit()

In [None]:
# Reviews table creation
cur.execute(reviews_table_create)
conn.commit()

In [None]:
# Price table creation
cur.execute(price_table_create)
conn.commit()

In [None]:
# Pyspark Configuration
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.jars", "postgresql-42.2.18.jar") \
        .getOrCreate()

In [None]:
url = "jdbc:postgresql://localhost/sparkifydb"
properties = {
    "user": "taro",
    "password": "",
    "driver": "org.postgresql.Driver"
}
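
Hard-coding the user name and password in the notebook works locally, but the `configparser` module imported earlier can keep them in a separate file. A minimal sketch follows; the section name `POSTGRES`, its keys, and the inline sample text are hypothetical, and in practice the text would be loaded from a file with `config.read(...)`:

```python
import configparser

# Hypothetical config contents; normally stored in a file outside version control.
SAMPLE_CFG = """
[POSTGRES]
host = localhost
dbname = sparkifydb
user = taro
password =
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)
pg = config["POSTGRES"]

# Build the same JDBC settings the notebook uses, but from configuration values.
url = f"jdbc:postgresql://{pg['host']}/{pg['dbname']}"
properties = {
    "user": pg["user"],
    "password": pg["password"],
    "driver": "org.postgresql.Driver",
}
print(url)
```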

### Listing Fact Table

In [None]:
listing_fact_df = listing_df.select("id", "host_id")
listing_fact_df = listing_fact_df.withColumn("id", col("id").cast("integer"))
listing_fact_df = listing_fact_df.withColumn("host_id", col("host_id").cast("integer"))
listing_fact_df.printSchema()
listing_fact_df.limit(5).toPandas()

In [None]:
# Insert data into listings table
listing_fact_df.write.jdbc(url=url, table="listings", properties=properties,mode="append")

### Weather Dimension Table

In [None]:
weather_df.limit(5).toPandas()
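weather_dimension_df = weather_df.withColumnRenamed("y/m/d", "date")
# Dates look like "1984/1/1"; a plain cast to timestamp returns null for this format, so parse with an explicit pattern
weather_dimension_df = weather_dimension_df.withColumn("date", F.to_timestamp(col("date"), "yyyy/M/d"))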
weather_dimension_df = weather_dimension_df.withColumnRenamed("high(℃)", "high")
weather_dimension_df = weather_dimension_df.withColumn("high", F.col("high").cast("float"))
weather_dimension_df = weather_dimension_df.withColumnRenamed("low(℃)", "low")
weather_dimension_df = weather_dimension_df.withColumn("low", col("low").cast("float"))
weather_dimension_df = weather_dimension_df.withColumnRenamed("max_precipitation_10min(mm)", "max_precipitation_10min")
weather_dimension_df = weather_dimension_df.withColumn("max_precipitation_10min", col("max_precipitation_10min").cast("float"))
weather_dimension_df = weather_dimension_df.withColumnRenamed("daylight_hours(hours)", "daylight_hours")
weather_dimension_df = weather_dimension_df.withColumn("daylight_hours", col("daylight_hours").cast("float"))
weather_dimension_df.printSchema()
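
The `y/m/d` values use slashes and unpadded months and days (`1984/1/1`), so they need an explicit pattern rather than ISO-style parsing. A small standard-library illustration of the difference, using sample values in the same format as the rows shown earlier:

```python
from datetime import datetime

# Sample values in the same format as the y/m/d column (the last one is made up).
raw_dates = ["1984/1/1", "1984/1/2", "1984/12/25"]

# ISO parsing rejects the slash-separated form.
try:
    datetime.fromisoformat(raw_dates[0])
    iso_ok = True
except ValueError:
    iso_ok = False

# An explicit pattern handles it, including unpadded months and days.
parsed = [datetime.strptime(value, "%Y/%m/%d") for value in raw_dates]

print(iso_ok)
print(parsed[0].date(), parsed[-1].date())
```

In PySpark the corresponding call would be `F.to_timestamp(col("date"), "yyyy/M/d")`; the pattern is an assumption based on the sample rows and should be checked against the full file.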

In [None]:
# Insert data into weather table
weather_dimension_df.write.jdbc(url=url, table="weather", properties=properties,mode="append")

### Calendar Dimension Table

In [None]:
calendar_df.printSchema()

# Data type conversion
calendar_df = calendar_df.withColumn("available", col("available").cast("boolean"))
calendar_df = calendar_df.withColumn("minimum_nights", col("minimum_nights").cast("integer"))
calendar_df = calendar_df.withColumn("maximum_nights", col("maximum_nights").cast("integer"))

# Column renaming: these names also exist in listing_df, so rename them to avoid ambiguous references after the join
calendar_df = calendar_df.withColumnRenamed("price", "booking_price")
calendar_df = calendar_df.withColumnRenamed("minimum_nights", "booking_minimum_nights")
calendar_df = calendar_df.withColumnRenamed("maximum_nights", "booking_maximum_nights")

# Show the first 5 rows
calendar_df.limit(5).toPandas()

# Data type conversion
listing_df = listing_df.withColumn("has_availability", col("has_availability").cast("integer"))
listing_df = listing_df.withColumn("availability_30", col("availability_30").cast("integer"))
listing_df = listing_df.withColumn("availability_60", col("availability_60").cast("integer"))
listing_df = listing_df.withColumn("availability_90", col("availability_90").cast("integer"))
listing_df = listing_df.withColumn("availability_365", col("availability_365").cast("integer"))
listing_df = listing_df.withColumn("calendar_last_scraped", col("calendar_last_scraped").cast("double"))
listing_df = listing_df.withColumn("guests_included", col("guests_included").cast("integer"))

# Join calendar and listing datasets
calendar_dimension_df = calendar_df.join(listing_df, (calendar_df.listing_id == listing_df.id))\
                        .select("listing_id", "date", "available", "price", "adjusted_price", \
                                "minimum_nights", "maximum_nights", "calendar_updated", "has_availability", \
                                "availability_30", "availability_60", "availability_90", "availability_365", \
                                "guests_included", "extra_people")

# Show the first 5 rows
calendar_dimension_df.limit(5).toPandas()
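
The `price` and `adjusted_price` columns arrive as strings such as `$469.00`, so they cannot be summed or averaged until they are converted. A minimal sketch of the conversion in plain Python; the helper name `parse_price` is hypothetical, and thousands separators are assumed to be commas:

```python
from decimal import Decimal

def parse_price(text):
    """Convert a price string like '$1,250.00' to Decimal; return None for empty input."""
    if text is None or text.strip() == "":
        return None
    return Decimal(text.strip().replace("$", "").replace(",", ""))

print(parse_price("$469.00"))
print(parse_price("$1,250.00"))
print(parse_price(""))
```

In Spark, one way to express the same clean-up is `F.regexp_replace(col("price"), "[$,]", "").cast("decimal(10,2)")`; the precision and scale there are assumptions, and the target column type in Postgres would need to match.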


In [None]:
# Insert data into calendar table
calendar_dimension_df.write.jdbc(url=url, table="calendar", properties=properties,mode="append")

### Host Dimension Table

In [None]:
listing_df.printSchema()
# Data type conversion
listing_df = listing_df.withColumn("host_since", col("host_since").cast("timestamp"))
listing_df = listing_df.withColumn("host_is_superhost", col("host_is_superhost").cast("boolean"))
listing_df = listing_df.withColumn("host_listings_count", col("host_listings_count").cast("integer"))
listing_df = listing_df.withColumn("host_total_listings_count", col("host_total_listings_count").cast("integer"))
listing_df = listing_df.withColumn("host_has_profile_pic", col("host_has_profile_pic").cast("boolean"))
listing_df = listing_df.withColumn("host_identity_verified", col("host_identity_verified").cast("boolean"))

host_dimension_df = listing_df.select("id", "host_id", "host_url", "host_name", "host_since", "host_location", "host_about", "host_response_time", "host_response_rate", "host_acceptance_rate", "host_is_superhost", "host_thumbnail_url", "host_picture_url", "host_neighbourhood", "host_listings_count", "host_total_listings_count", "host_verifications", "host_has_profile_pic", "host_identity_verified")
host_dimension_df.limit(5).toPandas()

In [None]:
# Insert data into host table
host_dimension_df.write.jdbc(url=url, table="host", properties=properties,mode="append")

### Property Dimension Table

In [None]:
# Data type conversion
listing_df = listing_df.withColumn("accommodates", col("accommodates").cast("integer"))
listing_df = listing_df.withColumn("bathrooms", col("bathrooms").cast("float"))
listing_df = listing_df.withColumn("bedrooms", col("bedrooms").cast("integer"))
listing_df = listing_df.withColumn("beds", col("beds").cast("integer"))
listing_df = listing_df.withColumn("square_feet", col("square_feet").cast("integer"))


property_dimension_df = listing_df.select("listing_url", "name", "summary", "space", "description", "experiences_offered", "neighborhood_overview", "notes", "transit", "access", "interaction", "house_rules", "thumbnail_url", "medium_url", "picture_url", "xl_picture_url", "neighbourhood", "neighbourhood_cleansed", "neighbourhood_group_cleansed", "property_type", "room_type", "accommodates", "bathrooms", "bedrooms", "beds", "bed_type", "amenities", "square_feet")
property_dimension_df.limit(5).toPandas()


In [None]:
# Insert data into property table
property_dimension_df.write.jdbc(url=url, table="property", properties=properties,mode="append")

### Requirements Dimension Table

In [None]:
# Data type conversion
listing_df = listing_df.withColumn("requires_license", col("requires_license").cast("double"))
listing_df = listing_df.withColumn("instant_bookable", col("instant_bookable").cast("boolean"))
listing_df = listing_df.withColumn("is_business_travel_ready", col("is_business_travel_ready").cast("boolean"))
listing_df = listing_df.withColumn("require_guest_profile_picture", col("require_guest_profile_picture").cast("boolean"))
listing_df = listing_df.withColumn("require_guest_phone_verification", col("require_guest_phone_verification").cast("boolean"))
listing_df = listing_df.withColumn("calculated_host_listings_count", col("calculated_host_listings_count").cast("integer"))
listing_df = listing_df.withColumn("calculated_host_listings_count_entire_homes", col("calculated_host_listings_count_entire_homes").cast("integer"))
listing_df = listing_df.withColumn("calculated_host_listings_count_private_rooms", col("calculated_host_listings_count_private_rooms").cast("integer"))
listing_df = listing_df.withColumn("calculated_host_listings_count_shared_rooms", col("calculated_host_listings_count_shared_rooms").cast("integer"))


requirements_dimension_df = listing_df.select("id", "requires_license", "license", "jurisdiction_names", "instant_bookable", "is_business_travel_ready", "cancellation_policy", "require_guest_profile_picture", "require_guest_phone_verification", "calculated_host_listings_count", "calculated_host_listings_count_entire_homes", "calculated_host_listings_count_private_rooms", "calculated_host_listings_count_shared_rooms")
requirements_dimension_df.limit(5).toPandas()

In [None]:
# Insert data into requirements table
requirements_dimension_df.write.jdbc(url=url, table="requirements", properties=properties,mode="append")

### Reviews Dimension Table

In [None]:
# Data type conversion
review_df = review_df.withColumn("id", col("id").cast("double"))
review_df = review_df.withColumn("date", col("date").cast("timestamp"))
review_df = review_df.withColumn("reviewer_id", col("reviewer_id").cast("double"))
listing_df = listing_df.withColumn("number_of_reviews", col("number_of_reviews").cast("integer"))
listing_df = listing_df.withColumn("number_of_reviews_ltm", col("number_of_reviews_ltm").cast("integer"))
listing_df = listing_df.withColumn("first_review", col("first_review").cast("double"))
listing_df = listing_df.withColumn("last_review", col("last_review").cast("double"))
listing_df = listing_df.withColumn("review_scores_rating", col("review_scores_rating").cast("integer"))
listing_df = listing_df.withColumn("review_scores_accuracy", col("review_scores_accuracy").cast("integer"))
listing_df = listing_df.withColumn("review_scores_cleanliness", col("review_scores_cleanliness").cast("integer"))
listing_df = listing_df.withColumn("review_scores_checkin", col("review_scores_checkin").cast("integer"))
listing_df = listing_df.withColumn("review_scores_communication", col("review_scores_communication").cast("integer"))
listing_df = listing_df.withColumn("review_scores_location", col("review_scores_location").cast("integer"))
listing_df = listing_df.withColumn("review_scores_value", col("review_scores_value").cast("integer"))
listing_df = listing_df.withColumn("reviews_per_month", col("reviews_per_month").cast("float"))

# Join review and listing datasets
reviews_dimension_df = review_df.join(listing_df, (review_df.listing_id == listing_df.id))\
                        .select("listing_id", "date", "reviewer_id", "reviewer_name", "comments", "number_of_reviews", "number_of_reviews_ltm", "first_review", "last_review", "review_scores_rating", "review_scores_accuracy", "review_scores_cleanliness", "review_scores_checkin", "review_scores_communication", "review_scores_location", "review_scores_value", "reviews_per_month")
reviews_dimension_df.limit(5).toPandas()

In [None]:
# Insert data into reviews table
reviews_dimension_df.write.jdbc(url=url, table="reviews", properties=properties,mode="append")

### Price Dimension Table

In [None]:
price_dimension_df = listing_df.select("id", "price", "weekly_price", "monthly_price", "security_deposit", "cleaning_fee")
price_dimension_df.limit(5).toPandas()

In [None]:
# Insert data into price table
price_dimension_df.write.jdbc(url=url, table="price", properties=properties,mode="append")

## 4-2 Data Dictionary

### Fact Table
#### Listings
* id: primary key of the table
* host_id: host owner id of the property
listing_id

### Dimension Tables
#### weather
* weather_id: primary key of the table
* date: timestamp of the data collected
* high: high temperature in a day (Celsius)
* low: low temperature in a day (Celsius)
* max_precipitation_10min: maximum precipitation in 10 minutes (mm)
* daylight_hours: daylight hours in a day (hours)

#### calendar
* calendar_id: primary key of the table
* listing_id: property id
* date: calendar date
* price: price of the usage
* adjusted_price: price including tax
* minimum_nights: minimum nights for booking
* maximum_nights: maximum nights for booking
* calendar_updated: date of information updated
* has_availability: property availability
* availability_30: availability within the next 30 days
* availability_60: availability within the next 60 days
* availability_90: availability within the next 90 days
* availability_365: availability within the next 365 days
* calendar_last_scraped: date the calendar was last scraped
* guests_included: number of guests included
* extra_people: number of available extra people

#### host
* id: primary key of the table
* listing_id: property id
* host_id: host owner registration id
* host_url: host website url
* host_name: host owner name
* host_since: host activities start year
* host_location: host owner address
* host_about: host owner description
* host_response_time: host owner average response time
* host_response_rate: evaluation
* host_acceptance_rate: percentage of acceptance
* host_is_superhost: value about if the host is superhost
* host_thumbnail_url: image url
* host_picture_url: host owner picture url
* host_neighbourhood: host neighbourhood information
* host_listings_count: count of host listings
* host_total_listings_count: host total listing count
* host_verifications: host verification value
* host_has_profile_pic: the value if the host has registered her/his picture
* host_identity_verified: id verification

#### location
* location_id: primary key of the table
* listing_id: property id
* street: location street information
* city: location city information
* state: location state information
* zipcode: location zipcode information
* market: location nearby market information
* smart_location: location smart information
* country_code: country code
* country: country name
* latitude: latitude
* longitude: longitude
* is_location_exact: preciseness of the location

#### property
* property_id: primary key of the table
* listing_id: property listing id
* listing_url: property listing url
* name: property name
* summary: property summary
* space: size of space
* description: detailed description of the property
* experiences_offered: special experiences offered
* neighborhood_overview: overview description
* notes: additional notes
* transit: transit
* access: access
* interaction: interaction 
* house_rules: house rules
* thumbnail_url: url of thumbnails
* medium_url: medium url
* picture_url: picture url
* xl_picture_url: xl picture url
* neighbourhood: neighbourhood information
* neighbourhood_cleansed: neighbourhood cleansed confirmation
* neighbourhood_group_cleansed: neighbourhood group cleansed confirmation
* property_type: property type
* room_type: room type
* accommodates: accommodates
* bathrooms: bathrooms
* bedrooms: bedrooms
* beds: beds
* bed_type: bed types
* amenities: amenities
* square_feet: square feet

#### requirements
* requirements_id: primary key of the table
* listing_id: property id
* requires_license: requires verification information
* license: license required
* jurisdiction_names: jurisdiction names
* instant_bookable: the value if the instant booking is available
* is_business_travel_ready: if the property is business oriented
* cancellation_policy: cancellation policy
* require_guest_profile_picture: if the guest profile picture required
* require_guest_phone_verification: if the guest phone required to be verified
* calculated_host_listings_count: the number of host listings
* calculated_host_listings_count_entire_homes: the number of host listings that are entire homes


#### reviews
* review_id: primary key of the table
* listing_id: property id
* date: date of review given
* reviewer_id: reviewer id
* reviewer_name: reviewer name
* comments: additional comments
* number_of_reviews: number of reviews
* number_of_reviews_ltm: number of reviews in the last twelve months
* first_review: first review date
* last_review: last review date
* review_scores_rating: review score rating
* review_scores_accuracy: review score accuracy
* review_scores_cleanliness: review score cleanliness
* review_scores_checkin: review score checkins
* review_scores_communication: review score for communication
* review_scores_location: review score location given
* review_scores_value: review score for value
* reviews_per_month: the number of reviews given per month

## 4-3. Data Quality Checks
Check that each DataFrame contains more than one row.

In [None]:
# Print the row and column counts of the fact table and every dimension table.
dataframes = {
    "listing_fact_df": listing_fact_df,
    "weather_dimension_df": weather_dimension_df,
    "calendar_dimension_df": calendar_dimension_df,
    "host_dimension_df": host_dimension_df,
    "location_dimension_df": location_dimension_df,
    "property_dimension_df": property_dimension_df,
    "requirements_dimension_df": requirements_dimension_df,
    "reviews_dimension_df": reviews_dimension_df,
}

for name, df in dataframes.items():
    print(name)
    print("rows:", df.count())
    # len(df.columns) is the number of columns; len(df.columns[0]) would be
    # the length of the first column's name.
    print("columns:", len(df.columns), "\n")
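
The cell above only prints the counts. A check that fails loudly could be sketched as follows. This is a minimal sketch rather than part of the original pipeline: `check_min_rows` is a hypothetical helper, and it assumes only that each DataFrame exposes `count()` and `columns`, as Spark DataFrames do.

```python
def check_min_rows(name, df, min_rows):
    """Raise ValueError if `df` has no columns or fewer than `min_rows` rows.

    `df` is assumed to expose `count()` and `columns`, as a Spark DataFrame does.
    Returns (row_count, column_count) when the check passes.
    """
    n_rows = df.count()
    n_cols = len(df.columns)
    if n_cols == 0:
        raise ValueError(f"{name}: no columns")
    if n_rows < min_rows:
        raise ValueError(
            f"{name}: expected at least {min_rows} row(s), found {n_rows}"
        )
    return n_rows, n_cols
```

For example, `check_min_rows("listing_fact_df", listing_fact_df, min_rows=2)` would stop the notebook if the fact table did not contain more than one row.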


Database table verification
Every table in the database should contain more than one row.

In [None]:
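# spark.read.jdbc accepts a parenthesized subquery with an alias as the table argument
sql = "(SELECT COUNT(*) AS count FROM listings) AS listings_count"
listing_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']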
print(listing_db)

In [None]:
# Count the rows of the weather table
sql = "(SELECT COUNT(*) AS count FROM weather) AS weather_count"
weather_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']
print(weather_db)

In [None]:
# Count the rows of the calendar table
sql = "(SELECT COUNT(*) AS count FROM calendar) AS calendar_count"
calendar_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']
print(calendar_db)

In [None]:
# Count the rows of the host table
sql = "(SELECT COUNT(*) AS count FROM host) AS host_count"
host_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']
print(host_db)

In [None]:
# Count the rows of the property table
sql = "(SELECT COUNT(*) AS count FROM property) AS property_count"
property_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']
print(property_db)

In [None]:
# Count the rows of the requirements table
sql = "(SELECT COUNT(*) AS count FROM requirements) AS requirements_count"
requirements_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']
print(requirements_db)

In [None]:
# Count the rows of the reviews table
sql = "(SELECT COUNT(*) AS count FROM reviews) AS reviews_count"
reviews_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']
print(reviews_db)

In [None]:
# Count the rows of the price table
sql = "(SELECT COUNT(*) AS count FROM price) AS price_count"
price_db = spark.read.jdbc(url=url, table=sql, properties=properties).first()['count']
print(price_db)
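
The cells above repeat one pattern per table, so they could be folded into a single loop. The following is a minimal sketch under the assumption that each check is meant to read a row count. `count_subquery` and `verify_tables` are hypothetical helpers, and the lookup is passed in as a function so that the logic can be exercised without a database connection.

```python
def count_subquery(table):
    """Build a subquery string usable as the `table` argument of spark.read.jdbc."""
    # Guard against anything that is not a plain identifier, since the name
    # is interpolated directly into SQL.
    if not table.isidentifier():
        raise ValueError(f"unexpected table name: {table!r}")
    return f"(SELECT COUNT(*) AS count FROM {table}) AS {table}_count"


def verify_tables(tables, fetch_count, min_rows):
    """Return {table: row_count}; raise ValueError on the first table below min_rows.

    `fetch_count` is any callable that takes the subquery string and returns
    the row count as an integer.
    """
    counts = {}
    for table in tables:
        n_rows = fetch_count(count_subquery(table))
        if n_rows < min_rows:
            raise ValueError(
                f"{table}: expected at least {min_rows} row(s), found {n_rows}"
            )
        counts[table] = n_rows
    return counts
```

With the notebook's `spark`, `url`, and `properties` in scope, `fetch_count` could be `lambda sql: spark.read.jdbc(url=url, table=sql, properties=properties).first()["count"]`, and `tables` the list of table names checked above.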

## Step 5. Project Write Up

### Expected queries to run
One of the expected queries to run with these datasets is to find the median price of properties by season and weather. This information gives insight to owners who are about to start a service at a new property, or to new property owners who are not familiar with the rental market.
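
As a rough illustration of that query, the sketch below groups prices by season and weather and takes the median of each group. The rows, the weather labels, and the month-to-season mapping are made-up assumptions for illustration, not values from the datasets. In Postgres, the same aggregate could be expressed with `percentile_cont(0.5) WITHIN GROUP (ORDER BY price)`.

```python
from collections import defaultdict
from statistics import median

# Assumed mapping of calendar months to seasons; adjust as needed.
SEASONS = {
    12: "winter", 1: "winter", 2: "winter",
    3: "spring", 4: "spring", 5: "spring",
    6: "summer", 7: "summer", 8: "summer",
    9: "autumn", 10: "autumn", 11: "autumn",
}


def median_price_by_season_and_weather(rows):
    """Return {(season, weather): median price} for (month, weather, price) rows."""
    groups = defaultdict(list)
    for month, weather, price in rows:
        groups[(SEASONS[month], weather)].append(price)
    return {key: median(prices) for key, prices in groups.items()}


# Toy rows for illustration only: (month, weather, price)
sample_rows = [
    (1, "snow", 8000),
    (2, "snow", 10000),
    (7, "sunny", 12000),
    (8, "sunny", 15000),
    (8, "sunny", 9000),
]

for key, value in median_price_by_season_and_weather(sample_rows).items():
    print(key, value)
```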

### The reason of model selected
The primary reason for selecting this model is to explore property information in Japan for future opportunities in the real estate business. The Airbnb dataset covers most of the data needed for this purpose. These datasets are also a good opportunity to solidify what I have learned through the data engineering program before moving on to more ambitious projects.

### Steps of the process
1. Figure out and design the purpose of the project and goals
2. Research datasets to fulfill the purposes and goals
3. Design data models based on the selected datasets
4. Define the data types of data tables
5. Design the process of the execution
6. Figure out the defects of the execution process
7. Start building the database schema
8. Configure the environment and gather configuration requirements
9. Construct ETL process
10. Build test cases

### Technologies
* Apache Spark with Python: Distributed data processing platform for multiple data sources. Python has a variety of libraries to process and clean data effectively.
* Jupyter Notebook: Although Spark is powerful for data processing, rerunning a whole script for every change is time-consuming. Jupyter alleviates this by providing an interactive environment: once the Spark session is started, functions can be rerun as many times as needed, which saves time when verifying or debugging them.
* PostgreSQL DBMS: Relational database that stores data on local storage and offers fast, robust functionality.

### Frequency of the data update
* Weather data could be collected daily on a personal computer because each day's data is small. Once a month, the developer could upload it to cloud storage, which keeps storage costs low.
* Property information is also expected to be uploaded once a month, since new properties are unlikely to appear at a scale of hundreds or thousands within that period.

### Further scenarios considerations
* If the data was increased by 100x - At present, the dataset fits on a personal computer's local storage. If it grew by 100x, this approach would not be realistic for most personal computers. In that case, the developer may want to store the data in cloud storage such as an S3 bucket. The developer may also want a clear data archiving policy, for example archiving or deleting data older than 5 years, since most of the 100x data would go unused unless the developer wants to study the history of the property business. Such a policy would also save money.

* If the pipelines were run on a daily basis by 7am - At present, no scheduling has been implemented. A scheduling and automated monitoring system would therefore be required to avoid human error. For example, Airflow would be a good option that meets these requirements.

* If the database needed to be accessed by 100+ people - At present, concurrent usage has not been considered. A distributed database such as Redshift, which allows many users to access the data at the same time, would likely be required.