<a href="https://colab.research.google.com/github/raniaabdullah/Capstone-dataengineering/blob/master/Capstone_Project_Template.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL Pipeline for Airbnb Listings in Madrid
### Data Engineering Capstone Project

#### Project Summary
The aim of this project is to create an ETL pipeline that uses the listings, calendar and review data for Airbnb's Madrid location to build a database that lets data analysts query information about hosts and listings in the city of Madrid, e.g. which host has the most listings in 2019?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [0]:
# Do all imports and installs here
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
import os
import configparser
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, the Madrid listings data forms the first dimension table, the calendar data forms the second and the user review data forms the third. The datasets are joined on the listing ID to form the fact table. The final database is optimized so that a data analyst can query the information needed to explore listing behavior on Airbnb in Madrid. Spark is used to process the data.

#### Describe and Gather Data 
The data about Airbnb listings in Madrid, Spain, comes from Kaggle and is provided in CSV format as three files: listings.csv, calendar.csv and reviews.csv.

# Make sure that your AWS credentials are loaded as env vars

In [0]:
config = configparser.ConfigParser()

# Normally these credentials would live in ~/.aws/credentials
with open('dl.cfg') as f:
    config.read_file(f)

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [0]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate() 

In [0]:
#Build SQL context object
sqlContext = SQLContext(spark)

# Read data from csv files

In [0]:
# Read in the data here
df_listings = spark.read.format("csv").option("header", "true").load("listings.csv")

In [0]:
df_listings.printSchema()
df_listings.limit(3).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



|   | id | name | host_id | host_name | neighbourhood_group | neighbourhood | latitude | longitude | room_type | price | minimum_nights | number_of_reviews | last_review | reviews_per_month | calculated_host_listings_count | availability_365 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6369 | Rooftop terrace room with ensuite bathroom, Airc. | 13660 | Simon | Chamartín | Hispanoamérica | 40.45628 | -3.67763 | Private room | 70 | 1 | 65 | 2019-09-04 | 0.56 | 1 | 53 |
| 1 | 21853 | Bright and airy room | 83531 | Abdel | Latina | Cármenes | 40.40341 | -3.74084 | Private room | 17 | 4 | 33 | 2018-07-15 | 0.55 | 2 | 48 |
| 2 | 24805 | Gran Via Studio Madrid | 101471 | Iraido | Centro | Universidad | 40.42202 | -3.70395 | Entire home/apt | 80 | 5 | 2 | 2017-07-03 | 0.03 | 1 | 354 |


In [0]:
df_calendar=spark.read.format("csv").option("header", "true").load("calendar.csv")

In [0]:
df_calendar.printSchema()
df_calendar.limit(3).toPandas()

root
 |-- listing_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- available: string (nullable = true)
 |-- price: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- maximum_nights: string (nullable = true)



|   | listing_id | date | available | price | adjusted_price | minimum_nights | maximum_nights |
|---|---|---|---|---|---|---|---|
| 0 | 336834 | 2019-09-19 | f | \$63.00 | \$63.00 | 5 | 250 |
| 1 | 6369 | 2019-09-19 | f | \$70.00 | \$70.00 | 1 | 365 |
| 2 | 6369 | 2019-09-20 | f | \$75.00 | \$75.00 | 1 | 365 |


In [0]:
df_reviews=spark.read.format("csv").option("header", "true").load("reviews.csv")

In [0]:
df_reviews.printSchema()
df_reviews.limit(3).toPandas()

root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



|   | listing_id | id | date | reviewer_id | reviewer_name | comments |
|---|---|---|---|---|---|---|
| 0 | 6369 | 29428 | 2010-03-14 | 84790 | Nancy | Simon and Arturo have the ultimate location in... |
| 1 | 6369 | 31018 | 2010-03-23 | 84338 | David | Myself and Kristy originally planned on stayin... |
| 2 | 6369 | 34694 | 2010-04-10 | 98655 | Marion | We had a great time at Arturo and Simon's ! A ... |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
1. Drop calendar rows that have a null date.
2. Remove all duplicate rows from the listings, calendar and reviews tables.
3. Rename `id` in the listings table to `listing_id`, `id` in the reviews table to `host_id`, and `date` in the calendar table to `listing_date`.
4. Remove rows from the reviews table whose `listing_id` is not a numeric value.

In [0]:
# Performing cleaning tasks here
# Removing entirely duplicate rows from listings, calendar and reviews tables
df_listings=df_listings.distinct()
df_calendar=df_calendar.distinct()
df_reviews=df_reviews.distinct()

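# Drop rows with a null or "NaN" date from the calendar table
df_calendar = df_calendar.filter(col("date").isNotNull() & (col("date") != "NaN"))

# Rename columns
df_listings = df_listings.withColumnRenamed("id", "listing_id")
# Note: `id` in reviews.csv identifies the review, not the host (listing 6369
# belongs to host 13660, yet its reviews have ids 29428, 31018, ...), so
# `review_id` would be a more accurate name
df_reviews = df_reviews.withColumnRenamed("id", "host_id")
df_calendar = df_calendar.withColumnRenamed("date", "listing_date")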

In [0]:
df_listings.printSchema()
df_listings.limit(3).toPandas()

root
 |-- listing_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



|   | listing_id | name | host_id | host_name | neighbourhood_group | neighbourhood | latitude | longitude | room_type | price | minimum_nights | number_of_reviews | last_review | reviews_per_month | calculated_host_listings_count | availability_365 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 594298 | Great Design, Views and Location | 153391 | Fernando | Centro | Sol | 40.41871 | -3.70607 | Entire home/apt | 105 | 1 | 378 | 2019-08-20 | 4.46 | 6 | 3 |
| 1 | 709208 | Apartment in the heart of Madrid | 3648882 | Luz | Centro | Sol | 40.41674 | -3.70743 | Entire home/apt | 69 | 2 | 512 | 2019-09-15 | 6.06 | 1 | 316 |
| 2 | 852654 | GRAN VÍA-SOL estudio-duplex con terraza “VINTAGE” | 3726732 | Javier | Centro | Sol | 40.41882 | -3.70107 | Entire home/apt | 67 | 3 | 138 | 2019-08-01 | 2.14 | 8 | 318 |


In [0]:
df_reviews.printSchema()
df_reviews.limit(3).toPandas()

root
 |-- listing_id: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



|   | listing_id | host_id | date | reviewer_id | reviewer_name | comments |
|---|---|---|---|---|---|---|
| 0 | El piso y la habitacion son muy bonitos y hay ... | | | | | |
| 1 | x" | | | | | |
| 2 | 82481 | 53081880.0 | 2015-11-04 | 24664309.0 | Dominic | Mercedes hat uns freundlich empfangen. Unsere ... |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model


## **Star Schema**

1. **Dimension Tables**
    - listings_table: listing_id, host_id, host_name, room_type, price, minimum_nights, number_of_reviews, calculated_host_listings_count, availability_365
    - calendar_table: listing_id, listing_date, available, price, adjusted_price, minimum_nights, maximum_nights
    - review_table: listing_id, host_id, date, reviewer_id, reviewer_name, comments
2. **Fact Table**
    - listings_fact_table: listing_id, host_id, host_name, calculated_host_listings_count, listing_date, reviewer_id, reviewer_name
        

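The star schema was chosen because it is a simple model that suits analytical queries: each question can be answered by joining the fact table to a small number of dimension tables.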
#### 3.2 Mapping Out Data Pipelines
1. Dimension tables are created from the cleansed data and registered as temporary SQL views.
2. The fact table is created with a Spark SQL query that joins the dimension tables; `spark.sql` returns the result directly as a Spark DataFrame.
3. The fact table is written out as the final Parquet file.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [0]:
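# Create the dimension tables by registering the cleansed DataFrames as SQL views
df_listings.createOrReplaceTempView("listings")
df_calendar.createOrReplaceTempView("calendar")
df_reviews.createOrReplaceTempView("reviews")

# Disable automatic broadcast joins (-1 is the value that disables them), so the
# large listings/calendar join uses a shuffle-based join instead of a broadcast
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")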

In [0]:
# Create the fact table by joining the dimension tables,
# to find which host has the most listings in 2019
listings_fact_table = spark.sql("""
SELECT listings.listing_id AS listing_id,
       listings.host_id AS host_id,
       listings.host_name AS host_name,
       listings.calculated_host_listings_count AS numbert_of_listings,
       calendar.listing_date AS listing_date
FROM listings
JOIN calendar ON (listings.listing_id = calendar.listing_id)
WHERE calendar.listing_date BETWEEN '2019-01-01' AND '2019-10-30'
""")

In [0]:
# Write the fact table to Parquet, overwriting the output of any previous run
listings_fact_table.write.mode("overwrite").parquet("listings_fact_table")

In [0]:
listings_fact_table.show(5)

+----------+-------+---------+-------------------+------------+
|listing_id|host_id|host_name|numbert_of_listings|listing_date|
+----------+-------+---------+-------------------+------------+
|    167183| 796746| Consuelo|                  2|  2019-09-20|
|    167183| 796746| Consuelo|                  2|  2019-09-23|
|    167183| 796746| Consuelo|                  2|  2019-09-25|
|    167183| 796746| Consuelo|                  2|  2019-10-17|
|    167183| 796746| Consuelo|                  2|  2019-10-04|
+----------+-------+---------+-------------------+------------+
only showing top 5 rows



#### 4.2 Data Quality Checks
To ensure the pipeline ran as expected, the data is checked to confirm that listing_id is never null and that there are no duplicate rows. Checks of this kind could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [0]:
# Check for NULL values in listing_id, which should never be null.
# If only `false` is returned, no row in the dataset has a null listing_id.
listings_fact_table.select(isnull('listing_id').alias('listing_id')).dropDuplicates().show()

+----------+
|listing_id|
+----------+
|     false|
+----------+



In [0]:
# Apply the queries that answer the analyst's question
# 1. Query the maximum number of listings per host
listings_fact_table.select(max('numbert_of_listings').alias('calculated_host_listings_count')).show()

+------------------------------+
|calculated_host_listings_count|
+------------------------------+
|                             9|
+------------------------------+



In [0]:
# 2. Query the listing_id and host_name of the hosts with 9 listings, the maximum found above
listings_fact_table.select(["listing_id", "host_name", "numbert_of_listings"]) \
    .where(listings_fact_table.numbert_of_listings == 9).sort("host_name").collect()

[Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='119389', host_name='Alvaro', numbert_of_listing

In [0]:
# The query above returns one row per listing per date and shows that more than one host
# shares the maximum number of listings; drop duplicates to see each listing and host once
listings_fact_table.select(["listing_id", "host_name", "numbert_of_listings"]) \
    .where(listings_fact_table.numbert_of_listings == 9).dropDuplicates().sort("host_name").collect()

[Row(listing_id='119389', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='762701', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='72955', host_name='Alvaro', numbert_of_listings='9'),
 Row(listing_id='256004', host_name='Anouk', numbert_of_listings='9'),
 Row(listing_id='217674', host_name='Anouk', numbert_of_listings='9'),
 Row(listing_id='688795', host_name='Anouk', numbert_of_listings='9'),
 Row(listing_id='967421', host_name='Daniel', numbert_of_listings='9'),
 Row(listing_id='150123', host_name='Eloisa', numbert_of_listings='9'),
 Row(listing_id='599977', host_name='IVAN And PIETRO', numbert_of_listings='9'),
 Row(listing_id='337209', host_name='Ignacio', numbert_of_listings='9'),
 Row(listing_id='374465', host_name='Ignacio', numbert_of_listings='9'),
 Row(listing_id='521162', host_name='Raúl', numbert_of_listings='9')]

> #### Hosts with the most listings
>
> - Alvaro
> - Anouk
> - Daniel
> - Eloisa
> - IVAN And PIETRO
> - Ignacio
> - Raúl
>
> According to this analysis, *7* hosts share the maximum number of listings.

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

<span style="color:red">See the dictionary.txt file.</span>




### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

<span style="color:green">In this project, Spark is used to model the data and run the ETL pipeline.</span>

<span style="color:green">The data can be updated with Spark by updating and inserting rows and by adding new columns.</span>

<span style="color:green">If the data increased 100x and a single machine were no longer enough, Spark could run on a cluster backed by a data lake, partitioning the data across multiple machines.</span>

<span style="color:green">If the dashboard must be updated by 7 a.m. every day, Apache Airflow could be used to schedule and run the ETL process automatically.</span>

<span style="color:green">If the database needed to be accessed by 100+ people, it could be hosted on AWS, for example in a managed data warehouse such as Amazon Redshift, to handle many concurrent users.</span>
