# EMR ETL Notebook
- This notebook has been tested on an AWS EMR cluster configured as described in docs/aws_create_cluster.txt and config/spark-config
- Contents
    - **ETL Part I: preprocess raw data into parquet files**
    - **ETL Part II: create dimensional model using the preprocessed data**
  
- The outputs are tested in etl_notebooks/emr-etl-test-notebook.ipynb
- The /apps folder contains .py files with the same ETL code; they can be run with spark-submit
- To run and test ETL Part III, use redshift-etl-notebook.ipynb




In [None]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import *
from sparknlp.annotator import *
import boto3

In [None]:
## Paths
TEST = False
scrape_year_month = '2021-01'

# Raw input keys, relative to the raw data folder in the bucket.
path_global_listings = 'airbnb-listings.csv'
path_city_listings = f'cities/*/{scrape_year_month}/listings.csv'
path_city_reviews = f'cities/*/{scrape_year_month}/reviews.csv'
path_city_temperature = "weather/ECA_blend_tg/*.txt"
path_city_rain = "weather/ECA_blend_rr/*.txt"

# Top-level folders in the bucket. Alternative names used for notebook runs:
# "input_parquets_notebook", "dim_model_notebook", "dim_model_notebook_temp".
raw_data_folder = "raw"
input_parquet_folder = "input_parquets_airflow"
dim_model_folder = "dim_model_airflow"
dim_model_folder_new = "dim_model_airflow_temp"
s3_path = "s3://{}/{}/{}"

bucket_name = "airbnbprj-us"

if TEST:
    # Test runs read/write sibling "<folder>_test" locations.
    input_parquet_folder = f"{input_parquet_folder}_test"
    dim_model_folder = f"{dim_model_folder}_test"
    dim_model_folder_new = f"{dim_model_folder_new}_test"


def _s3_uri(folder, key):
    """Return "s3://<bucket_name>/<folder>/<key>" built from the s3_path template."""
    return s3_path.format(bucket_name, folder, key)


# Raw data (csv / txt)
raw_global_listings_path = _s3_uri(raw_data_folder, path_global_listings)
raw_city_listings_path = _s3_uri(raw_data_folder, path_city_listings)
raw_city_reviews_path = _s3_uri(raw_data_folder, path_city_reviews)
raw_city_temperature_path = _s3_uri(raw_data_folder, path_city_temperature)
raw_city_rain_data_path = _s3_uri(raw_data_folder, path_city_rain)

# Part I outputs (parquet)
path_out_global_listings = _s3_uri(input_parquet_folder, 'global_listings.parquet')
path_out_city_listings_data = _s3_uri(input_parquet_folder, f'city_listings/{scrape_year_month}/city_listings.parquet')
path_out_city_reviews_data = _s3_uri(input_parquet_folder, f'city_reviews/{scrape_year_month}/city_reviews.parquet')
path_out_city_temperature_data = _s3_uri(input_parquet_folder, 'city_temperature.parquet')
path_out_city_rain_data = _s3_uri(input_parquet_folder, 'city_rain.parquet')
path_out_weather_stations = _s3_uri(input_parquet_folder, 'weather_stations.parquet')

# Current dimensional model (csv)
dim_model_listings, dim_model_hosts, dim_model_reviews, dim_model_reviewers, dim_model_weather = [
    _s3_uri(dim_model_folder, table_file)
    for table_file in ('listings.csv', 'hosts.csv', 'reviews.csv', 'reviewers.csv', 'weather.csv')
]

# Updated dimensional model written by this run (csv)
dim_model_listings_new, dim_model_hosts_new, dim_model_reviews_new, dim_model_reviewers_new, dim_model_weather_new = [
    _s3_uri(dim_model_folder_new, table_file)
    for table_file in ('listings.csv', 'hosts.csv', 'reviews.csv', 'reviewers.csv', 'weather.csv')
]

# Intermediate checkpoints of the reviews NLP steps
dim_model_reviews_step1 = _s3_uri(dim_model_folder_new, 'reviews_step1.csv')
dim_model_reviews_step2 = _s3_uri(dim_model_folder_new, 'reviews_step2.csv')

In [None]:
# Echo every configured location, one per line, for a quick visual check.
paths = [
    # raw inputs
    raw_global_listings_path,
    raw_city_listings_path,
    raw_city_reviews_path,
    raw_city_temperature_path,
    raw_city_rain_data_path,
    # Part I parquet outputs
    path_out_global_listings,
    path_out_city_listings_data,
    path_out_city_reviews_data,
    path_out_city_temperature_data,
    path_out_city_rain_data,
    path_out_weather_stations,
    # current dimensional model
    dim_model_listings,
    dim_model_hosts,
    dim_model_reviews,
    dim_model_reviewers,
    dim_model_weather,
    # updated dimensional model and checkpoints
    dim_model_listings_new,
    dim_model_hosts_new,
    dim_model_reviews_new,
    dim_model_reviewers_new,
    dim_model_weather_new,
    dim_model_reviews_step1,
    dim_model_reviews_step2,
]
print(*paths, sep="\n")

In [None]:
# boto3 handles for S3: the resource API and the low-level client (model_exists lists keys with the client).
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')

In [None]:
def model_exists(path, bucket=None, client=None):
    """Return True if at least one S3 object exists under the output *path*.

    Spark writes its csv/parquet outputs as "directories", i.e. as key
    prefixes. This check lists at most one key below ``<key>/``. The trailing
    slash stops a sibling key that merely shares the prefix (for example
    ``listings.csv_old``) from being mistaken for the output itself.

    Args:
        path: full ``s3://<bucket>/<key>`` URI, or a bare key.
        bucket: bucket name; defaults to the notebook-level ``bucket_name``.
        client: boto3 S3 client; defaults to the notebook-level ``s3_client``.

    Returns:
        True if any object key starts with ``<key>/``, otherwise False.
    """
    bucket = bucket_name if bucket is None else bucket
    client = s3_client if client is None else client

    key = path
    uri_prefix = f"s3://{bucket}/"
    if key.startswith(uri_prefix):
        # Strip only the leading URI part, never an occurrence further inside the key.
        key = key[len(uri_prefix):]
    prefix = key.rstrip("/") + "/"

    response = client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    # "Contents" is absent from the response when no key matches.
    return "Contents" in response

# Part I - preprocessing raw data

## Global listings
- read as csv
- drop columns that are not relevant
- rename columns
- save as parquet

In [None]:
if not model_exists(path_out_global_listings):
    # Global listings export: ';'-separated csv with quoted, possibly multi-line fields.
    global_raw = spark.read.csv(
        raw_global_listings_path,
        header="True",
        inferSchema="True",
        multiLine="True",
        escape='"',
        ignoreLeadingWhiteSpace="True",
        sep=";",
    )
    # Normalise headers to snake_case, e.g. "Host ID" -> "host_id".
    snake_case_names = [col_name.replace(" ", "_").lower() for col_name in global_raw.columns]
    # Columns not used by the model ('features\r' carries a trailing carriage return from the raw header).
    columns_to_drop = [
        'xl_picture_url', 'cancellation_policy', 'access', 'features\r', 'zipcode', 'country_code', 'smart_location',
        'country', 'security_deposit', 'medium_url', 'transit', 'cleaning_fee', 'street', 'experiences_offered',
        'thumbnail_url', 'extra_people', 'weekly_price', 'notes', 'house_rules', 'monthly_price',
        'summary', 'square_feet', 'interaction', 'state', 'jurisdiction_names', 'market', 'geolocation',
        'space', 'bed_type', 'guests_included',
    ]
    df_global_listings = (
        global_raw.toDF(*snake_case_names)
        .drop(*columns_to_drop)
        .withColumn('scrape_year', F.year(F.col('last_scraped')))
        .withColumn('scrape_month', F.month(F.col('last_scraped')))
    )

    # Test runs only keep Amsterdam.
    global_to_write = df_global_listings.filter("city = 'Amsterdam'") if TEST else df_global_listings
    global_to_write.write.partitionBy('scrape_year', 'scrape_month').parquet(path_out_global_listings)

## Local listings (Amsterdam, Berlin, London, Paris) scraped in the month set by `scrape_year_month`
- read as csv
- extract new column city from filename
- write as parquet

In [None]:
if not model_exists(path_out_city_listings_data):
    listings_raw = spark.read.csv(
        raw_city_listings_path,
        header="True",
        inferSchema="True",
        multiLine="True",
        escape='"',
        ignoreLeadingWhiteSpace="True",
    )
    # Files live at .../cities/<city>/<year-month>/listings.csv, so the city is the 3rd path segment from the end.
    df_city_listings = (
        listings_raw
        .withColumn("city", F.element_at(F.split(F.input_file_name(), "/"), -3))
        .withColumn('scrape_year', F.year(F.col('last_scraped')))
        .withColumn('scrape_month', F.month(F.col('last_scraped')))
    )

    # Test runs only keep Amsterdam.
    listings_to_write = df_city_listings.filter("city = 'Amsterdam'") if TEST else df_city_listings
    listings_to_write.write.partitionBy('scrape_year', 'scrape_month').parquet(path_out_city_listings_data)

## Local reviews (Amsterdam, Berlin, London, Paris) scraped in the month set by `scrape_year_month`
- read as csv
- extract new column city from filename
- write as parquet

In [None]:
if not model_exists(path_out_city_reviews_data):
    reviews_raw = spark.read.csv(
        raw_city_reviews_path,
        header="True",
        inferSchema="True",
        multiLine="True",
        escape='"',
        ignoreLeadingWhiteSpace="True",
    )
    # City = 3rd path segment from the end of .../cities/<city>/<year-month>/reviews.csv
    df_city_reviews = (
        reviews_raw
        .withColumn("city", F.element_at(F.split(F.input_file_name(), "/"), -3))
        .withColumn('year', F.year(F.col('date')))
        .withColumn('month', F.month(F.col('date')))
    )

    # Test runs only keep Amsterdam.
    reviews_to_write = df_city_reviews.filter("city = 'Amsterdam'") if TEST else df_city_reviews
    reviews_to_write.write.partitionBy('year', 'month', 'city').parquet(path_out_city_reviews_data)

## Weather data
- read as text
- skip multiline header and keep only rows with actual data
- write as parquet

In [None]:
if not model_exists(path_out_city_temperature_data):
    # Keep only the 5-field data rows: this drops the free-text file header and the
    # column-name line starting with "STAID".
    temperature_rows = (
        sc.textFile(raw_city_temperature_path)
        .map(lambda raw_line: raw_line.replace(" ", "").split(","))
        .filter(lambda fields: len(fields) == 5 and fields[0] != "STAID")
    )
    spark.createDataFrame(temperature_rows) \
        .toDF("STAID", "SOUID", "DATE", "TG", "Q_TG") \
        .write.parquet(path_out_city_temperature_data)

In [None]:
if not model_exists(path_out_city_rain_data):
    # Same layout as the temperature files: keep the 5-field data rows and skip the "STAID" header line.
    rain_rows = (
        sc.textFile(raw_city_rain_data_path)
        .map(lambda raw_line: raw_line.replace(" ", "").split(","))
        .filter(lambda fields: len(fields) == 5 and fields[0] != "STAID")
    )
    # NOTE(review): the quality column of the rain data is stored as "Q_TG" (the source
    # presumably calls it Q_RR); the name is kept so existing parquet/consumers stay compatible.
    spark.createDataFrame(rain_rows) \
        .toDF("STAID", "SOUID", "DATE", "RR", "Q_TG") \
        .write.parquet(path_out_city_rain_data)

In [None]:
# Hand-made look-up table mapping weather-station ids (STAID) to city names.
if not model_exists(path_out_weather_stations):
    station_rows = [
        (593, 'Amsterdam'),
        (41, 'Berlin'),
        (1860, 'London'),
        (11249, 'Paris'),
    ]
    df_stations = spark.createDataFrame(data=station_rows, schema=["STAID", "city"])
    df_stations.write.parquet(path_out_weather_stations)

# Part II - Dimensional model
The dimensional model comprises 5 tables:
- Fact table
    - reviews
- Dimensional tables
    - reviewers
    - listings
    - hosts
    - weather

## Listings table
- uniquely identified by listing_id
- source data are extracted from raw listings data

In [None]:
# If the listings table does not exist yet, start from the global listings data:
# add the extra columns needed to merge with the city listings data later (as typed
# NULLs) and sort all columns alphabetically so both sources line up.
if not model_exists(dim_model_listings):
    df_listings_global = spark.read.parquet(path_out_global_listings)
    df_listings_global.createOrReplaceTempView("global")
    # (column name, SQL type) of each NULL column appended to the global data
    added_null_columns = [
        ("bathrooms_text", "string"),
        ("calculated_host_listings_count_entire_homes", "integer"),
        ("calculated_host_listings_count_private_rooms", "integer"),
        ("calculated_host_listings_count_shared_rooms", "integer"),
        ("host_has_profile_pic", "string"),
        ("host_identity_verified", "string"),
        ("host_is_superhost", "string"),
        ("instant_bookable", "string"),
        ("maximum_maximum_nights", "integer"),
        ("maximum_minimum_nights", "integer"),
        ("maximum_nights_avg_ntm", "double"),
        ("minimum_maximum_nights", "integer"),
        ("minimum_minimum_nights", "integer"),
        ("minimum_nights_avg_ntm", "double"),
        ("number_of_reviews_l30d", "integer"),
        ("number_of_reviews_ltm", "integer"),
    ]
    null_columns_sql = ", ".join(
        f"cast(null as {sql_type}) as {col_name}" for col_name, sql_type in added_null_columns
    )
    query = f"SELECT *, {null_columns_sql} FROM global"
    df_listings_hosts = spark.sql(query)
    df_listings_hosts = (
        df_listings_hosts
        .select(sorted(df_listings_hosts.columns))
        .withColumnRenamed("id", "listing_id")
    )

In [None]:
if not model_exists(dim_model_listings):
    # Host attributes move to the hosts table; listings keep only host_id.
    host_only_columns = [
        "host_name", "host_url", "host_since", "host_location", "host_about",
        "host_response_time", "host_response_rate", "host_acceptance_rate",
        "host_is_superhost", "host_thumbnail_url", "host_picture_url", "host_neighbourhood",
        "host_listings_count", "host_total_listings_count", "host_verifications",
        "host_has_profile_pic", "host_identity_verified",
    ]
    df_listings = df_listings_hosts.drop(*host_only_columns)

In [None]:
# When a listings table already exists, load it; it is merged with this month's listings below.
if model_exists(dim_model_listings):
    df_listings = (
        spark.read
        .options(header="True", inferSchema="True", multiLine="True", escape='"', ignoreLeadingWhiteSpace="True")
        .csv(dim_model_listings)
    )

In [None]:
# This month's city listings, with columns in alphabetical order so they line up with the listings table.
monthly_listings_raw = spark.read.parquet(path_out_city_listings_data)
df_listings_hosts_monthly = monthly_listings_raw.select(*sorted(monthly_listings_raw.columns))

In [None]:
# Host attributes go to the hosts table; listings keep only host_id. "id" becomes "listing_id".
host_only_columns = [
    "host_name", "host_url", "host_since", "host_location", "host_about",
    "host_response_time", "host_response_rate", "host_acceptance_rate",
    "host_is_superhost", "host_thumbnail_url", "host_picture_url", "host_neighbourhood",
    "host_listings_count", "host_total_listings_count", "host_verifications",
    "host_has_profile_pic", "host_identity_verified",
]
df_listings_monthly = (
    df_listings_hosts_monthly
    .drop(*host_only_columns)
    .withColumnRenamed("id", "listing_id")
)

In [None]:
# Merge the existing listings with this month's city listings and keep, per
# listing_id, only a row from its most recent scrape.
# unionByName matches columns by name instead of by position: if one side ever
# has a missing, extra or differently ordered column, Spark raises an error
# instead of silently shifting values into the wrong columns.
df_listings_updated = df_listings.unionByName(df_listings_monthly)
windowSpec = Window.partitionBy("listing_id").orderBy("last_scraped").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# With a frame spanning the whole partition, F.last over ascending last_scraped
# yields the partition's latest last_scraped value.
df_listings_updated = (
    df_listings_updated
    .withColumn("latest", F.last("last_scraped").over(windowSpec))
    .filter("last_scraped == latest")
    # several rows may share the latest scrape date; keep one of them
    .dropDuplicates(["listing_id"])
    .drop('latest')
)

In [None]:
# Persist the updated listings table as csv (with header) in the new model folder.
df_listings_updated.write.options(escape='"', header="true").csv(dim_model_listings_new)

## Hosts table
- uniquely identified by host_id
- source data are extracted from raw listings data

In [None]:
# First run (no listings table yet): build the hosts table from the global listings data.
if not model_exists(dim_model_listings):
    df_listings_hosts.createOrReplaceTempView("listings")
    host_fields = [
        "host_id", "host_name", "host_url", "host_since", "host_location", "host_about",
        "host_response_time", "host_response_rate", "host_acceptance_rate",
        "host_is_superhost", "host_thumbnail_url", "host_picture_url", "host_neighbourhood",
        "host_listings_count", "host_total_listings_count", "host_verifications",
        "host_has_profile_pic", "host_identity_verified", "last_scraped",
    ]
    query = f"SELECT {', '.join(host_fields)} FROM listings"
    df_hosts = spark.sql(query)

In [None]:
if not model_exists(dim_model_listings):
    # One row per host: a row from that host's latest scrape.
    per_host = Window.partitionBy("host_id").orderBy("last_scraped").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    latest_scrape = F.last("last_scraped").over(per_host)
    df_hosts = (
        df_hosts
        .withColumn("latest", latest_scrape)
        .filter("last_scraped == latest")
        .dropDuplicates(["host_id"])
        .drop('latest')
    )

In [None]:
# When the model already exists (checked via the listings table), load the current hosts table;
# it is merged with the hosts found in this month's listings below.
if model_exists(dim_model_listings):
    df_hosts = (
        spark.read
        .options(header="True", inferSchema="True", multiLine="True", escape='"', ignoreLeadingWhiteSpace="True")
        .csv(dim_model_hosts)
    )

In [None]:
# Extract host attributes from this month's city listings.
df_listings_hosts_monthly.createOrReplaceTempView("listings")
host_fields = [
    "host_id", "host_name", "host_url", "host_since", "host_location", "host_about",
    "host_response_time", "host_response_rate", "host_acceptance_rate",
    "host_is_superhost", "host_thumbnail_url", "host_picture_url", "host_neighbourhood",
    "host_listings_count", "host_total_listings_count", "host_verifications",
    "host_has_profile_pic", "host_identity_verified", "last_scraped",
]
query = f"SELECT {', '.join(host_fields)} FROM listings"
df_hosts_monthly = spark.sql(query)

In [None]:
# Merge the hosts from this month's listings into the existing hosts table,
# keeping one row per host from its latest scrape.
per_host = Window.partitionBy("host_id").orderBy("last_scraped").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df_hosts_updated = (
    df_hosts.union(df_hosts_monthly)
    .withColumn("latest", F.last("last_scraped").over(per_host))
    .filter("last_scraped == latest")
    .dropDuplicates(["host_id"])
    .drop('latest')
)

In [None]:
# Persist the updated hosts table as csv (with header) in the new model folder.
df_hosts_updated.write.options(escape='"', header="true").csv(dim_model_hosts_new)

## Reviews table
- uniquely identified by review_id
- contains "foreign keys" to other tables host_id, listing_id, weather_id, reviewer_id
- source data are extracted from raw reviews data
- includes new data obtained by NLP processing: comment_language, (comment) sentiment

Data:
- reviews table from existing dimensional model (if exists)
- monthly reviews update containing some but not all reviews from previous months
- listings table from dimensional model

Steps:
1. Find all rows in monthly update where review_id is not in dimensional model already (select all rows if there is no dimensional model)
2. using 1: join with listings table from dimensional model to get host_id
3. using 2: run language detection
4. using 3: run sentiment analysis on english comments
5. Combine rows (union) of existing dimensional model and result from step 4

### Step 1&2: Find new reviews and update their host_id

In [None]:
# This month's preprocessed city reviews (partitioned by year/month/city).
df_reviews_monthly = spark.read.format("parquet").load(path_out_city_reviews_data)

In [None]:
# Updated listings table written above; used to look up each review's host_id.
df_listings = (
    spark.read
    .options(header="True", inferSchema="True", multiLine="True", escape='"', ignoreLeadingWhiteSpace="True")
    .csv(dim_model_listings_new)
)

In [None]:
# Step 1: select the monthly reviews that are not yet in the reviews table.
if not model_exists(dim_model_reviews):
    # First run: every review in the monthly data is new.
    df_reviews_delta = df_reviews_monthly

else:
    df_reviews = spark.read.csv(dim_model_reviews,header="True", inferSchema="True",multiLine="True",escape='"',ignoreLeadingWhiteSpace="True")
    df_reviews.createOrReplaceTempView("reviews")
    df_reviews_monthly.createOrReplaceTempView("reviews_monthly")

    # Keep monthly rows whose id has no match in reviews.review_id. The previous
    # filter (date >= max(reviews.date)) dropped reviews with older dates that
    # show up in a scrape for the first time, e.g. reviews of listings that are
    # new in this month's data.
    query = """
    SELECT m.*
    FROM reviews_monthly m
    LEFT ANTI JOIN reviews r
    ON m.id = r.review_id
    """
    df_reviews_delta = spark.sql(query)

In [None]:
df_reviews_delta.createOrReplaceTempView("reviews_delta")
df_listings.createOrReplaceTempView("listings")

# Step 2: attach host_id from the listings table and build the weather_id key
# "<city>_<yyyy-MM-dd>". The date is normalised with to_date/date_format. CSV
# schema inference may type `date` as a timestamp, and concat_ws would then
# render "<city>_<yyyy-MM-dd> 00:00:00". That never matches the weather ids,
# which are built from a DateType column.
query="""
SELECT r.id as review_id, r.reviewer_id, r.listing_id, listings.host_id as host_id,
       concat_ws("_", r.city, date_format(to_date(r.date), "yyyy-MM-dd")) as weather_id,
       r.date, r.reviewer_name, r.comments
FROM reviews_delta r
LEFT JOIN listings
ON r.listing_id == listings.listing_id
"""
df_reviews_delta = spark.sql(query)

In [None]:
# Checkpoint after steps 1 & 2.
df_reviews_delta.write.options(escape='"', header="true").csv(dim_model_reviews_step1)

### Step 3: Detect comment language

In [None]:
# In test mode, cap the number of reviews passed to the NLP steps below.
df_reviews_delta = df_reviews_delta.limit(10000) if TEST else df_reviews_delta

In [None]:
# Step 3: detect the language of each comment with a pretrained Spark NLP pipeline.
language_detector = PretrainedPipeline('detect_language_220', lang='xx')
df_result = language_detector.annotate(df_reviews_delta, column="comments")
# The annotated frame carries the comments as "text": flatten language.result into a
# comma-separated string, drop the annotation columns and restore the "comments" name.
df_reviews_delta2 = (
    df_result
    .withColumn("comment_language", F.concat_ws(",", F.col("language.result")))
    .drop("document", "sentence", "language")
    .withColumnRenamed('text', 'comments')
)

In [None]:
# Checkpoint after language detection.
df_reviews_delta2.write.options(escape='"', header="true").csv(dim_model_reviews_step2)

### Step 4: Detect sentiment of english comments

In [None]:
# Reload the step-2 checkpoint so sentiment analysis starts from persisted data.
df_reviews_delta2 = (
    spark.read
    .options(header="True", inferSchema="True", multiLine="True", escape='"', ignoreLeadingWhiteSpace="True")
    .csv(dim_model_reviews_step2)
)

In [None]:
# Step 4: sentiment of English comments only, via a pretrained Spark NLP pipeline.
sentiment_analyzer = PretrainedPipeline('analyze_sentimentdl_use_imdb', lang='en')
english_reviews = df_reviews_delta2.filter(F.col("comment_language") == 'en')
df_result_sentiment = (
    sentiment_analyzer.annotate(english_reviews, column="comments")
    .withColumn("sentiment", F.concat_ws(",", F.col("sentiment.result")))
    .drop("document", "sentence_embeddings")
    .withColumnRenamed('text', 'comments')
)

In [None]:
# Reviews without a sentiment score (non-English, or no detected language) get 'n/a'.
# Note: "comment_language != 'en'" does not match NULLs, hence the separate null branch.
not_scored = F.lit('n/a')
df_reviews_null = df_reviews_delta2.filter("comment_language is null").withColumn("sentiment", not_scored)
non_english_reviews = df_reviews_delta2.filter("comment_language != 'en'").withColumn("sentiment", not_scored)
df_reviews_delta3 = non_english_reviews.union(df_result_sentiment).union(df_reviews_null)

### Step 5: Combine to create new reviews table

In [None]:
if model_exists(dim_model_reviews):
    # Existing model: append the newly processed reviews to the current table.
    df_reviews = (
        spark.read
        .options(header="True", inferSchema="True", multiLine="True", escape='"', ignoreLeadingWhiteSpace="True")
        .csv(dim_model_reviews)
    )
    # A review may be present in both inputs, so keep one row per review_id.
    df_reviews_updated = df_reviews.union(df_reviews_delta3).dropDuplicates(["review_id"])
    df_reviews_updated.write.options(escape='"', header="true").csv(dim_model_reviews_new)
else:
    # First run: the processed reviews are the whole table.
    df_reviews_delta3.write.options(escape='"', header="true").csv(dim_model_reviews_new)

## Reviewers table
- uniquely identified by reviewer_id
- it is extracted from the reviews table
- includes new data: languages_spoken - a list of languages used in reviews for each unique reviewer

In [None]:
# The updated reviews table is the source of the reviewers table.
df_reviews = (
    spark.read
    .options(header="True", inferSchema="True", multiLine="True", escape='"', ignoreLeadingWhiteSpace="True")
    .csv(dim_model_reviews_new)
)

In [None]:
# One row per reviewer: the set of languages used across all their reviews and the
# name/date from a review with their latest date.
per_reviewer = Window.partitionBy("reviewer_id").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df_reviewers = (
    df_reviews
    .withColumn("languages_spoken", F.collect_set('comment_language').over(per_reviewer))
    .withColumn("latest", F.last("date").over(per_reviewer))
    .filter("date == latest")
    .dropDuplicates(["reviewer_id"])
    .select("reviewer_id", "reviewer_name", "languages_spoken", F.col("date").alias("last_updated"))
    .withColumn("languages_spoken", F.array_join("languages_spoken", ","))
)

In [None]:
# Persist the reviewers table as csv (with header) in the new model folder.
df_reviewers.write.options(escape='"', header="true").csv(dim_model_reviewers_new)

## Weather table
- uniquely identified by weather_id
- shows temperature and rain per date and city
- source data: weather data

In [None]:
# Preprocessed temperature and rain series plus the station -> city look-up table.
df_temp, df_rain, df_stations = [
    spark.read.parquet(parquet_path)
    for parquet_path in (path_out_city_temperature_data, path_out_city_rain_data, path_out_weather_stations)
]

In [None]:
df_temp.createOrReplaceTempView("temp")
df_rain.createOrReplaceTempView("rain")
df_stations.createOrReplaceTempView("stations")

# Join daily temperature (TG) and rain (RR) per station and date, attach the city
# name and divide the raw values by 10. ECA&D series mark missing measurements with
# -9999; those become NULL instead of a bogus -999.9.
query="""
SELECT null as weather_id,
       to_date(temp.DATE, "yyyyMMdd") as date,
       CASE WHEN CAST(temp.TG AS INT) = -9999 THEN NULL ELSE temp.TG/10 END as temperature,
       CASE WHEN CAST(rain.RR AS INT) = -9999 THEN NULL ELSE rain.RR/10 END as rain,
       stations.city
FROM temp
JOIN rain
ON temp.DATE == rain.DATE
AND temp.STAID == rain.STAID
JOIN stations
ON temp.STAID == stations.STAID
WHERE to_date(temp.DATE, "yyyyMMdd") > to_date('20090101',"yyyyMMdd")
ORDER BY date
"""
df_weather = spark.sql(query)
# weather_id = "<city>_<date>", the key that reviews use to reference this table.
df_weather = df_weather.withColumn("weather_id",F.concat_ws("_","city", "date"))

In [None]:
# Persist the weather table as csv (with header) in the new model folder.
df_weather.write.options(escape='"', header="true").csv(dim_model_weather_new)