In [None]:
# import necessary dependencies
# standard library
import io
import os
import random
import re
import sys

# third-party (relative order preserved from the original import block)
from pyspark.context import SparkContext
from awsglue.context import GlueContext
import boto3
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.window import Window
from awsglue.transforms import *
from awsglue.transforms import ApplyMapping
from awsglue.dynamicframe import DynamicFrame

In [None]:
# Initialise (or reuse) the Spark context and wrap it in a Glue context.
# getOrCreate() reuses an already-running context instead of failing with
# "Cannot run multiple SparkContexts at once" (e.g. when the cell is re-run).
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

In [None]:
# create dynamicframes 
def create_dynamic_frame_from_csv(file):
    """
    This function creates a dynamic frame from a CSV file
  
    Parameter(csv file): The CSV file to create a dynamic frame
    
    Returns: The dynamic frame created from the CSV file
    """

    dynamic_frame = glueContext.create_dynamic_frame.from_options(
        format_options={"withHeader": True, "separator": ","},
        connection_type="s3",
        format="csv",
        connection_options={"paths": [file], "recurse": True},
        transformation_ctx="airbnbETLpipeline"
  )
    return dynamic_frame

In [None]:
# Load one raw-listings DynamicFrame per city from S3.
# NOTE(review): Tokyo and Toronto are read from a different bucket/prefix
# ("airbnb_listings-bucket/raw_data") than the other cities. New S3 bucket
# names may not contain underscores, so confirm these two paths are correct.
dubaiData = create_dynamic_frame_from_csv(file="s3://airbnb-listings-data/raw-data/DubaiData_transalated.csv")
losAngelesData = create_dynamic_frame_from_csv(file='s3://airbnb-listings-data/raw-data/LAData.csv')
londonData = create_dynamic_frame_from_csv(file='s3://airbnb-listings-data/raw-data/LondonData.csv')
miamiData = create_dynamic_frame_from_csv(file='s3://airbnb-listings-data/raw-data/MiamiData.csv')
newYorkCityData = create_dynamic_frame_from_csv(file='s3://airbnb-listings-data/raw-data/NYCData.csv')
sanFranciscoData = create_dynamic_frame_from_csv(file='s3://airbnb-listings-data/raw-data/SanFransiscoData.csv')
sydneyData = create_dynamic_frame_from_csv(file='s3://airbnb-listings-data/raw-data/SydneyData.csv')
tokyoData = create_dynamic_frame_from_csv(file='s3://airbnb_listings-bucket/raw_data/TokyoData_with_zipcodes.csv')
torontoData = create_dynamic_frame_from_csv(file='s3://airbnb_listings-bucket/raw_data/TorontoData_with_zipcodes.csv')

In [None]:
# Print the raw Dubai DynamicFrame schema (no conversion happens here) to
# decide on the column mappings defined below.
dubaiData.printSchema()

In [None]:
# Shared ApplyMapping spec used for every city:
# (source column, source type, target column, target type).
# Target names are camelCase; "numberOfReviews" and "countAvailableDaysLTM"
# were previously mis-cased and did not match the names used downstream.
# NOTE(review): several ratings/counts are cast to "int" (e.g. Bathrooms,
# Occupancy Rate LTM); confirm the raw values are never fractional.
mapped_data = [
    ("Listing Title", "string", "listingTitle", "string"),
    ("Property Type", "string", "propertyType", "string"),
    ("Listing Type", "string", "listingType", "string"),
    ("Created Date", "string", "createdDate", "string"),
    ("Last Scraped Date", "string", "lastScrapedDate", "string"),
    ("Country", "string", "country", "string"),
    ("State", "string", "state", "string"),
    ("City", "string", "city", "string"),
    ("Zipcode", "string", "zipcode", "string"),
    ("Longitude", "string", "longitude", "float"),
    ("guest_controls", "string", "guestControls", "string"),
    ("Airbnb Location Rating", "string", "airbnbLocationRating", "int"),
    ("Currency Native", "string", "currencyNative", "string"),
    ("Average Daily Rate (Native)", "float", "averageDailyRateNative", "float"),
    ("Average Daily Rate (USD)", "float", "averageDailyRateUSD", "float"),
    ("Count Blocked Days LTM", "string", "countBlockedDaysLTM", "int"),
    ("Bedrooms", "string", "bedrooms", "int"),
    ("Airbnb Property ID", "string", "airbnbPropertyID", "string"),
    ("Check-in Time", "string", "checkInTime", "string"),
    ("Bathrooms", "string", "bathrooms", "int"),
    ("Airbnb Communication Rating", "string", "airbnbCommunicationRating", "int"),
    ("Airbnb Host ID", "string", "airbnbHostID", "string"),
    ("Pets Allowed", "string", "petsAllowed", "string"),
    # NOTE(review): no space before "(Native)" here, unlike "Extra People Fee (USD)";
    # confirm this matches the raw CSV header.
    ("Extra People Fee(Native)", "string", "extraPeopleFeeNative", "float"),
    ("License", "string", "license", "string"),
    ("Instantbook Enabled", "string", "instantbookEnabled", "string"),
    ("Amenities", "string", "amenities", "string"),
    ("Overall Rating", "string", "overallRating", "float"),
    ("Airbnb Accuracy Rating", "string", "airbnbAccuracyRating", "int"),
    ("Cancellation Policy", "string", "cancellationPolicy", "string"),
    ("Cleaning Fee (USD)", "string", "cleaningFeeUSD", "float"),
    ("Listing URL", "string", "listingURL", "string"),
    ("instant_bookable", "string", "instantBookable", "string"),
    ("picture_url", "string", "pictureUrl", "string"),
    ("Latitude", "string", "latitude", "float"),
    ("Count Available Days LTM", "int", "countAvailableDaysLTM", "int"),
    ("Count Reservation Days LTM", "int", "countReservationDaysLTM", "int"),
    ("Number of Reviews", "string", "numberOfReviews", "int"),
    ("Checkout Time", "string", "checkoutTime", "string"),
    ("Airbnb Value Rating", "string", "airbnbValueRating", "int"),
    ("Extra People Fee (USD)", "string", "extraPeopleFeeUSD", "float"),
    ("Airbnb Checkin Rating", "string", "airbnbCheckinRating", "int"),
    ("Airbnb Superhost", "string", "airbnbSuperhost", "string"),
    ("Exact Location", "string", "exactLocation", "string"),
    ("Host Listing Count", "string", "hostListingCount", "int"),
    ("Minimum Stay", "string", "minimumStay", "int"),
    ("Occupancy Rate LTM", "string", "occupancyRateLTM", "int"),
    ("Max Guests", "string", "maxGuests", "int"),
    ("Number of Bookings LTM", "string", "numberOfBookingsLTM", "int"),
    ("Number of Bookings LTM - Number of observed month", "string", "numberOfBookingsLTMObservedMonth", "int"),
    ("Airbnb Cleanliness Rating", "string", "airbnbCleanlinessRating", "int"),
    ("Annual Revenue LTM (USD)", "string", "annualRevenueLTMUSD", "float"),
    ("Annual Revenue LTM (Native)", "string", "annualRevenueLTMNative", "float"),
]


In [None]:
def applymappings(df, mapped_data, transform_ctx_tag):
    """
    Rename and cast the columns of a DynamicFrame with Glue's ApplyMapping.

    Parameters:
    df: The DynamicFrame to transform
    mapped_data: List of (source name, source type, target name, target type) tuples
    transform_ctx_tag: transformation_ctx passed through to ApplyMapping

    Returns: The mapped DynamicFrame
    """
    return ApplyMapping.apply(frame=df, mappings=mapped_data, transformation_ctx=transform_ctx_tag)

In [None]:
# Apply the shared column mapping to the cities that use it unchanged
mapped_losAngelesData = applymappings(df=losAngelesData, mapped_data=mapped_data, transform_ctx_tag="mappedLosAngelesData")
mapped_londonData = applymappings(df=londonData, mapped_data=mapped_data, transform_ctx_tag="mappedLondonData")
mapped_newYorkCityData = applymappings(df=newYorkCityData, mapped_data=mapped_data, transform_ctx_tag="mappedNewYorkCityData")
mapped_sanFranciscoData = applymappings(df=sanFranciscoData, mapped_data=mapped_data, transform_ctx_tag="mappedSanFranciscoData")
mapped_sydneyData = applymappings(df=sydneyData, mapped_data=mapped_data, transform_ctx_tag="mappedSydneyData")
mapped_tokyoData = applymappings(df=tokyoData, mapped_data=mapped_data, transform_ctx_tag="mappedTokyoData")

In [None]:
# Apply the mapping to miamiData, extended with Miami's three extra columns
mapped_data_copy_miami = mapped_data.copy()

mapped_data_copy_miami.extend([
    ("Neighbourhood", "string", "neighbourhood", "string"),
    # The target name previously ended in a stray apostrophe ("...area'")
    ("Metropolitan Statistical Area", "string", "metropolitan_statistical_area", "string"),
    ("Last Host Count Updated Date", "string", "last_host_count_updated_date", "string"),
])

mapped_miamiData = applymappings(miamiData, mapped_data_copy_miami, "mappedMiamiData")

In [None]:
# Map dubaiData with the shared spec minus the "State" entry
# (presumably the raw Dubai file has no State column; a constant state is set later)
mapped_data_copy_dubai = list(mapped_data)
mapped_data_copy_dubai.remove(("State", "string", "state", "string"))
mapped_dubaiData = applymappings(dubaiData, mapped_data_copy_dubai, 'mappedDubaiData')

In [None]:
# Convert the mapped DynamicFrames to Spark DataFrames
losAngelesData_df = mapped_losAngelesData.toDF()
londonData_df = mapped_londonData.toDF()
miamiData_df = mapped_miamiData.toDF()
newYorkCityData_df = mapped_newYorkCityData.toDF()
sanFranciscoData_df = mapped_sanFranciscoData.toDF()
sydneyData_df = mapped_sydneyData.toDF()
tokyoData_df = mapped_tokyoData.toDF()
dubaiData_df = mapped_dubaiData.toDF()
# Toronto was previously a copy of the Dubai frame, and its raw data was never
# mapped. Map it here. "state" is excluded, as for Dubai, because a constant
# state value is assigned to Toronto further down.
torontoData_df = applymappings(
    torontoData,
    [mapping for mapping in mapped_data if mapping[2] != "state"],
    "mappedTorontoData",
).toDF()

In [None]:
# Collect every city's DataFrame under a stable key (insertion order matters for later loops)
airbnb_listings_dict = dict(
    dubaiData=dubaiData_df,
    losAngelesData=losAngelesData_df,
    londonData=londonData_df,
    miamiData=miamiData_df,
    newYorkCityData=newYorkCityData_df,
    sanFranciscoData=sanFranciscoData_df,
    sydneyData=sydneyData_df,
    tokyoData=tokyoData_df,
    torontoData=torontoData_df,
)

In [None]:
# Preview the first 10 mapped Dubai rows
airbnb_listings_dict["dubaiData"].show(n=10)

In [None]:
# Intersect the column sets of every city's DataFrame to find the shared columns
unique_columns = set.intersection(*(set(city_df.columns) for city_df in airbnb_listings_dict.values()))
print(f'{unique_columns}\n Number of columns in common: {len(unique_columns)}')

In [None]:
# Report, for each city, the columns Miami has that the city lacks
# (Miami has the most columns after mapping). One loop replaces eight
# copy-pasted cells; the printed labels are unchanged.
miami_columns = set(airbnb_listings_dict['miamiData'].columns)
for label, city_key in [
    ("dubai", "dubaiData"),
    ("toronto", "torontoData"),
    ("la", "losAngelesData"),
    ("london", "londonData"),
    ("nyc", "newYorkCityData"),
    ("sanfransisco", "sanFranciscoData"),
    ("sydney", "sydneyData"),
    ("tokyo", "tokyoData"),
]:
    print(f"miami_{label}_column_differences: {miami_columns.difference(set(airbnb_listings_dict[city_key].columns))}")

In [None]:
# Collect (name, row count, column count) for every DataFrame in a dictionary
def get_shape(listings_dict):
    """
    Summarise the size of each DataFrame in a dictionary.

    Parameters:
    listings_dict: Mapping of name -> DataFrame

    Returns: A list of (name, number of rows, number of columns) tuples in the
    dictionary's iteration order. Calls count() once per DataFrame.
    """
    return [(name, frame.count(), len(frame.columns)) for name, frame in listings_dict.items()]
       

In [None]:
# Row/column counts of the raw (mapped) data; turned into a DataFrame in the next cell
summary_list = get_shape(listings_dict=airbnb_listings_dict)

In [None]:
# Wrap the summary tuples in a Spark DataFrame for display
raw_summary_df = spark.createDataFrame(summary_list, schema=["city", "num_rows", "num_columns"])

In [None]:
# Display the raw summary (Spark's defaults: 20 rows, truncated cells)
raw_summary_df.show(n=20, truncate=True)

In [None]:
# Drop Miami's three extra columns so its schema matches the other cities.
# After ApplyMapping these columns carry their mapped names; the raw CSV headers
# ('Neighbourhood', ...) no longer exist, so dropping those was a silent no-op.
# Both spellings of the metropolitan column are listed (the mapping once produced
# a stray trailing apostrophe); DataFrame.drop ignores names that are absent.
airbnb_listings_dict['miamiData'] = airbnb_listings_dict['miamiData'].drop(
    'neighbourhood',
    'last_host_count_updated_date',
    'metropolitan_statistical_area',
    "metropolitan_statistical_area'",
)

In [None]:
# Drop the last-scraped-date column
def remove_last_scraped_data(df, df_name):
    """
    Drop the last-scraped-date column from a DataFrame.

    After ApplyMapping the column is named 'lastScrapedDate'. Dropping only the
    raw header 'Last Scraped Date' was a no-op. Both names are passed, and
    DataFrame.drop ignores names that are not present.

    Parameters:
    df: The DataFrame to clean
    df_name: Name used in the progress message

    Returns: The DataFrame without the column
    """
    df = df.drop('lastScrapedDate', 'Last Scraped Date')
    print(f'{df_name} Last Scraped Date column dropped')
    return df

In [None]:
# Remove the last-scraped-date column from every city's DataFrame
for df_name in list(airbnb_listings_dict):
    df = airbnb_listings_dict[df_name]
    airbnb_listings_dict[df_name] = remove_last_scraped_data(df, df_name)

In [None]:
# Remove fully duplicated rows
def drop_duplicates(df):
    """
    Return the DataFrame without rows that are exact duplicates.

    Parameters:
    df: The DataFrame to deduplicate

    Returns: The deduplicated DataFrame (a confirmation message is printed)
    """
    deduplicated = df.dropDuplicates()
    print("duplicates dropped")
    return deduplicated

In [None]:
# Deduplicate every city's DataFrame
for df_name in list(airbnb_listings_dict):
    df = airbnb_listings_dict[df_name]
    airbnb_listings_dict[df_name] = drop_duplicates(df)

In [None]:
# Unicode ranges treated as emoji / pictographic symbols; runs of them are matched.
# The Geometric Shapes entry previously had a leading space inside the character
# class, so ordinary spaces were matched too (collapsing runs of whitespace).
emoji_patterns = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags (iOS)
    "\U00002700-\U000027BF"  # Dingbats
    "\U00002600-\U000026FF"  # Miscellaneous Symbols
    "\U00002B00-\U00002BFF"  # Miscellaneous Symbols and Arrows
    "\U0001F100-\U0001F1FF"  # Enclosed Alphanumeric Supplement
    "\U0001F200-\U0001F2FF"  # Enclosed Ideographic Supplement
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U000025A0-\U000025FF"  # Geometric Shapes
    "]+"
)

In [None]:
def remove_emojis(text):
    """
    Replace each run of emoji characters in a string with a single space.

    Parameters:
    text: Value to clean; non-string values (e.g. None) pass through untouched

    Returns: The cleaned string, or the original value when it is not a string
    """
    if not isinstance(text, str):
        return text
    return emoji_patterns.sub(' ', text)

In [None]:
# Wrap remove_emojis as a Spark UDF returning strings (StringType is also F.udf's default)
remove_emoji_udf = F.udf(remove_emojis, StringType())

In [None]:
# Function to remove emojis from the text columns of a DataFrame
def remove_emojis_from(df):
    """
    Remove emojis from every string column of a DataFrame.

    Only string-typed columns are passed through remove_emoji_udf. The UDF
    returns strings, so applying it to int/float columns (as before) silently
    turned those columns into strings.

    Parameters:
    df: The DataFrame to clean

    Returns: The DataFrame with emojis removed from its string columns
    """
    string_columns = [name for name, dtype in df.dtypes if dtype == "string"]
    for col_name in string_columns:
        df = df.withColumn(col_name, remove_emoji_udf(F.col(col_name)))
    return df

In [None]:
# Remove emojis from every city's DataFrame. One loop replaces nine
# copy-pasted cells. The frames are independent, so the order does not matter.
for city_key in list(airbnb_listings_dict):
    airbnb_listings_dict[city_key] = remove_emojis_from(airbnb_listings_dict[city_key])

In [None]:
def standardised_columns(df, country, city=None, state=None, currency_native=None):
    """
    Overwrite location/currency columns with constant, standardised values.

    The currency is written to 'currencyNative', the name produced by the
    column mapping. It was previously written to a new 'currency_native'
    column, so the standardised value was never used.

    Parameters:
    df: The DataFrame to standardise
    country: Value for the 'country' column (always set)
    city: Value for the 'city' column, or None to leave it unchanged
    state: Value for the 'state' column, or None to leave it unchanged
    currency_native: Value for the 'currencyNative' column, or None to leave it unchanged

    Returns: A new DataFrame with the standardised columns
    """
    df = df.withColumn('country', F.lit(country))
    optional_values = (('city', city), ('state', state), ('currencyNative', currency_native))
    for column_name, value in optional_values:
        if value is not None:
            df = df.withColumn(column_name, F.lit(value))
    return df

In [None]:
# Standardise country/state/city/currency values for every city.
# standardised_columns returns a new DataFrame, so the result must be stored
# back into the dictionary (previously every result was discarded).
# Also corrected: Miami's currency (was 'GBP'), LA's state (was 'Los Angeles'),
# and London's state (was 'UK', which is the country).
airbnb_listings_dict['dubaiData'] = standardised_columns(df=airbnb_listings_dict['dubaiData'], country='AE', city='Dubai', state='Dubai', currency_native='AED')
airbnb_listings_dict['losAngelesData'] = standardised_columns(df=airbnb_listings_dict['losAngelesData'], country='US', state='California', currency_native='USD')
airbnb_listings_dict['londonData'] = standardised_columns(df=airbnb_listings_dict['londonData'], country='GB', state='England', currency_native='GBP')
airbnb_listings_dict['miamiData'] = standardised_columns(df=airbnb_listings_dict['miamiData'], country='US', state='Florida', currency_native='USD')
airbnb_listings_dict['newYorkCityData'] = standardised_columns(df=airbnb_listings_dict['newYorkCityData'], country='US', currency_native='USD')
airbnb_listings_dict['sanFranciscoData'] = standardised_columns(df=airbnb_listings_dict['sanFranciscoData'], country='US', state='California', city='San Francisco', currency_native='USD')
airbnb_listings_dict['sydneyData'] = standardised_columns(df=airbnb_listings_dict['sydneyData'], country='AU', state='New South Wales', currency_native='AUD')
airbnb_listings_dict['tokyoData'] = standardised_columns(df=airbnb_listings_dict['tokyoData'], country='JP', state='Tokyo', currency_native='JPY')
airbnb_listings_dict['torontoData'] = standardised_columns(df=airbnb_listings_dict['torontoData'], country='CA', state='Ontario', currency_native='CAD')

In [None]:
# Re-check row/column counts after the column updates
summary_list = get_shape(listings_dict=airbnb_listings_dict)
summary_df = spark.createDataFrame(data=summary_list, schema=["city", "num_rows", "num_columns"])

In [None]:
# Display the post-update summary (Spark's defaults: 20 rows, truncated cells)
summary_df.show(n=20, truncate=True)

In [None]:
# create new ID columns for dimension tables
def create_id_columns(df_name, df):
    """
    Add surrogate ID columns: listingId, amenitiesId, ratingId and bookingId.

    Each ID is the first three letters of df_name upper-cased, a table code
    (LIS/AMT/RTE/BKD) and a 1-based row number, e.g. 'DUBLIS1'.

    Parameters:
    df_name: Name of the DataFrame; its first three letters form the ID prefix
    df: The DataFrame to add the ID columns to

    Returns: The DataFrame with the four ID columns. The temporary ordering and
    row-number columns are dropped.
    """
    prefix = df_name[:3].upper()
    order_col = "monotonically_increasing_id"
    row_col = "_row_number"

    # row_number() needs an ordering. This window has no partitioning, so Spark
    # moves all rows into a single partition. That is acceptable only for
    # modest data volumes.
    # NOTE(review): IDs are recomputed on every action; cache the result if
    # separately written tables must agree on them.
    window_spec = Window.orderBy(order_col)
    df = (df
          .withColumn(order_col, F.monotonically_increasing_id() + 1)
          .withColumn(row_col, F.row_number().over(window_spec).cast(StringType())))

    # All four IDs share the same row number, so compute it once and reuse it
    for id_column, table_code in (("listingId", "LIS"), ("amenitiesId", "AMT"),
                                  ("ratingId", "RTE"), ("bookingId", "BKD")):
        df = df.withColumn(id_column, F.concat_ws('', F.lit(prefix), F.lit(table_code), F.col(row_col)))

    # The old drop name contained a stray '"', so the ordering column was never removed
    return df.drop(order_col, row_col)

In [None]:
# Add the surrogate ID columns to every city's DataFrame
for df_name in list(airbnb_listings_dict):
    df = airbnb_listings_dict[df_name]
    airbnb_listings_dict[df_name] = create_id_columns(df_name, df)

In [None]:
# Spot-check the generated Toronto listing IDs (show() displays 20 rows by default)
airbnb_listings_dict['torontoData'].select('listingId').show(n=20)

In [None]:
def fill_missing_values(df):
    """
    Replace nulls column by column with a placeholder chosen by the column's type.

    string and date columns get "NA", float columns 0.0, int columns 0; other
    types are left untouched. Per the PySpark fillna docs, a string value is
    ignored for non-string columns in `subset`, so date columns end up unchanged.

    Parameters:
    df: The DataFrame to fill

    Returns: The DataFrame with missing values filled
    """
    placeholder_by_dtype = {"string": "NA", "float": 0.0, "int": 0, "date": "NA"}
    for col_name, dtype in df.dtypes:
        if dtype not in placeholder_by_dtype:
            continue
        df = df.fillna(placeholder_by_dtype[dtype], subset=[col_name])
    return df

In [None]:
# Fill missing values in every city's DataFrame
for df_name in list(airbnb_listings_dict):
    df = airbnb_listings_dict[df_name]
    airbnb_listings_dict[df_name] = fill_missing_values(df)

In [None]:
# Create the listing dimension table by stacking the same columns from every city
listing_dim_columns = ['listingID', 'listingTitle', 'propertyType', 'listingType', 'listingURL', 'createdDate', 'currencyNative']
listing_dim = (
    airbnb_listings_dict["dubaiData"].select(*listing_dim_columns)
    .union(airbnb_listings_dict["losAngelesData"].select(*listing_dim_columns))
    .union(airbnb_listings_dict["londonData"].select(*listing_dim_columns))
    .union(airbnb_listings_dict["miamiData"].select(*listing_dim_columns))
    .union(airbnb_listings_dict["newYorkCityData"].select(*listing_dim_columns))
    .union(airbnb_listings_dict["sanFranciscoData"].select(*listing_dim_columns))
    .union(airbnb_listings_dict["sydneyData"].select(*listing_dim_columns))
    .union(airbnb_listings_dict["tokyoData"].select(*listing_dim_columns))
    .union(airbnb_listings_dict["torontoData"].select(*listing_dim_columns))
)


In [None]:
listing_dim.show(n=20, truncate=True)  # Spark's defaults, spelled out

In [None]:
# Create the host dimension table: one row per host across all cities.
# A host with several listings would otherwise appear once per listing,
# giving duplicate keys in the dimension. Deduplicate on the key. If the
# hostListingCount differs between such rows, an arbitrary one is kept.
host_dim_columns = ['airbnbHostID', 'hostListingCount']
host_dim = (
    airbnb_listings_dict["dubaiData"].select(*host_dim_columns)
    .union(airbnb_listings_dict["losAngelesData"].select(*host_dim_columns))
    .union(airbnb_listings_dict["londonData"].select(*host_dim_columns))
    .union(airbnb_listings_dict["miamiData"].select(*host_dim_columns))
    .union(airbnb_listings_dict["newYorkCityData"].select(*host_dim_columns))
    .union(airbnb_listings_dict["sanFranciscoData"].select(*host_dim_columns))
    .union(airbnb_listings_dict["sydneyData"].select(*host_dim_columns))
    .union(airbnb_listings_dict["tokyoData"].select(*host_dim_columns))
    .union(airbnb_listings_dict["torontoData"].select(*host_dim_columns))
    .dropDuplicates(['airbnbHostID'])
)

In [None]:
host_dim.show(n=20, truncate=True)  # Spark's defaults, spelled out

In [None]:
# Give Dubai and Toronto a constant state value (their mappings exclude State).
# The values match the states passed to standardised_columns above. Previously
# Toronto got 'toronto', which is a city, not its state.
airbnb_listings_dict['dubaiData'] = airbnb_listings_dict['dubaiData'].withColumn('state', F.lit('Dubai'))
airbnb_listings_dict['torontoData'] = airbnb_listings_dict['torontoData'].withColumn('state', F.lit('Ontario'))

In [None]:
# Create the location dimension table by stacking the same columns from every city
location_dim_columns = ['airbnbPropertyID', 'country', 'state', 'city', 'zipcode', 'latitude', 'longitude', 'exactLocation']
location_dim = (
    airbnb_listings_dict["dubaiData"].select(*location_dim_columns)
    .union(airbnb_listings_dict["losAngelesData"].select(*location_dim_columns))
    .union(airbnb_listings_dict["londonData"].select(*location_dim_columns))
    .union(airbnb_listings_dict["miamiData"].select(*location_dim_columns))
    .union(airbnb_listings_dict["newYorkCityData"].select(*location_dim_columns))
    .union(airbnb_listings_dict["sanFranciscoData"].select(*location_dim_columns))
    .union(airbnb_listings_dict["sydneyData"].select(*location_dim_columns))
    .union(airbnb_listings_dict["tokyoData"].select(*location_dim_columns))
    .union(airbnb_listings_dict["torontoData"].select(*location_dim_columns))
)

In [None]:
location_dim.show(n=20, truncate=True)  # Spark's defaults, spelled out

In [None]:
# Create the amenities dimension table by stacking the same columns from every city
amenities_dim_columns = ['amenitiesID', 'amenities']
amenities_dim = (
    airbnb_listings_dict["dubaiData"].select(*amenities_dim_columns)
    .union(airbnb_listings_dict["losAngelesData"].select(*amenities_dim_columns))
    .union(airbnb_listings_dict["londonData"].select(*amenities_dim_columns))
    .union(airbnb_listings_dict["miamiData"].select(*amenities_dim_columns))
    .union(airbnb_listings_dict["newYorkCityData"].select(*amenities_dim_columns))
    .union(airbnb_listings_dict["sanFranciscoData"].select(*amenities_dim_columns))
    .union(airbnb_listings_dict["sydneyData"].select(*amenities_dim_columns))
    .union(airbnb_listings_dict["tokyoData"].select(*amenities_dim_columns))
    .union(airbnb_listings_dict["torontoData"].select(*amenities_dim_columns))
)

In [None]:
amenities_dim.show(n=20, truncate=True)  # Spark's defaults, spelled out

In [None]:
# Create the ratings dimension table by stacking the same columns from every city
rating_dim_columns = ['ratingID', 'overallRating', 'airbnbCommunicationRating',
                      'airbnbAccuracyRating', 'airbnbCleanlinessRating', 'airbnbCheckinRating',
                      'airbnbLocationRating', 'airbnbValueRating']
rating_dim = (
    airbnb_listings_dict["dubaiData"].select(*rating_dim_columns)
    .union(airbnb_listings_dict["losAngelesData"].select(*rating_dim_columns))
    .union(airbnb_listings_dict["londonData"].select(*rating_dim_columns))
    .union(airbnb_listings_dict["miamiData"].select(*rating_dim_columns))
    .union(airbnb_listings_dict["newYorkCityData"].select(*rating_dim_columns))
    .union(airbnb_listings_dict["sanFranciscoData"].select(*rating_dim_columns))
    .union(airbnb_listings_dict["sydneyData"].select(*rating_dim_columns))
    .union(airbnb_listings_dict["tokyoData"].select(*rating_dim_columns))
    .union(airbnb_listings_dict["torontoData"].select(*rating_dim_columns))
)


In [None]:
rating_dim.show(n=20, truncate=True)  # Spark's defaults, spelled out

In [None]:
# Create the booking dimension table by stacking the same columns from every city
booking_dim_columns = ['bookingID', 'checkInTime', 'checkoutTime', 'minimumStay']
booking_dim = (
    airbnb_listings_dict["dubaiData"].select(*booking_dim_columns)
    .union(airbnb_listings_dict["losAngelesData"].select(*booking_dim_columns))
    .union(airbnb_listings_dict["londonData"].select(*booking_dim_columns))
    .union(airbnb_listings_dict["miamiData"].select(*booking_dim_columns))
    .union(airbnb_listings_dict["newYorkCityData"].select(*booking_dim_columns))
    .union(airbnb_listings_dict["sanFranciscoData"].select(*booking_dim_columns))
    .union(airbnb_listings_dict["sydneyData"].select(*booking_dim_columns))
    .union(airbnb_listings_dict["tokyoData"].select(*booking_dim_columns))
    .union(airbnb_listings_dict["torontoData"].select(*booking_dim_columns))
)

In [None]:
booking_dim.show(n=20, truncate=True)  # Spark's defaults, spelled out

In [None]:
# Select the booking-fact columns from each city (one slice per city, in dictionary order)
booking_fact_columns = ['bookingID', 'listingID', 'airbnbHostID', 'airbnbPropertyID', 'amenitiesID',
                        'ratingID', 'numberOfReviews', 'bedrooms', 'bathrooms', 'maxGuests', 'airbnbSuperhost',
                        'cancellationPolicy', 'cleaningFeeUSD', 'extraPeopleFeeUSD',
                        'extraPeopleFeeNative', 'instantBookable', 'petsAllowed', 'occupancyRateLTM',
                        'numberOfBookingsLTM', 'numberOfBookingsLTMObservedMonth', 'averageDailyRateUSD']
booking_columns = [city_df.select(*booking_fact_columns) for city_df in airbnb_listings_dict.values()]

In [None]:
# Stack the per-city slices into a single booking fact table
booking_fact = booking_columns[0]
for position in range(1, len(booking_columns)):
    df = booking_columns[position]
    booking_fact = booking_fact.union(df)

In [None]:
booking_fact.show(n=20, truncate=True)  # Spark's defaults, spelled out

In [None]:
# Collect (DataFrame, table name) pairs for the fact and dimension tables.
# This is a list: the previous set literal had no stable order, contrary to
# its "list" comment.
airbnb_listings_fact_dim_tables = [
    (listing_dim, "listing_dim"),
    (location_dim, "location_dim"),
    (host_dim, "host_dim"),
    (amenities_dim, "amenities_dim"),
    (rating_dim, "rating_dim"),
    (booking_dim, "booking_dim"),
    (booking_fact, "booking_fact"),
]


In [None]:
# Convert each Spark DataFrame back to a DynamicFrame for the Glue sink.
# The Glue API is DynamicFrame.fromDF (capital "DF"). fromDf does not exist
# and raised AttributeError. Each frame also gets its own name now.
listing_dim_dyf = DynamicFrame.fromDF(listing_dim, glueContext, 'listing_dim')
location_dim_dyf = DynamicFrame.fromDF(location_dim, glueContext, 'location_dim')
host_dim_dyf = DynamicFrame.fromDF(host_dim, glueContext, 'host_dim')
amenities_dim_dyf = DynamicFrame.fromDF(amenities_dim, glueContext, 'amenities_dim')
rating_dim_dyf = DynamicFrame.fromDF(rating_dim, glueContext, 'rating_dim')
booking_dim_dyf = DynamicFrame.fromDF(booking_dim, glueContext, 'booking_dim')
booking_fact_dyf = DynamicFrame.fromDF(booking_fact, glueContext, 'booking_fact')

In [None]:
# (DynamicFrame, output table name) pairs, in the order they are written to S3
dynamic_frames = list(zip(
    [listing_dim_dyf, location_dim_dyf, host_dim_dyf, amenities_dim_dyf,
     rating_dim_dyf, booking_dim_dyf, booking_fact_dyf],
    ["listing_dim", "location_dim", "host_dim", "amenities_dim",
     "rating_dim", "booking_dim", "booking_fact"],
))

In [None]:
# Write each table to S3 as gzip-compressed CSV and create/update its Glue Data
# Catalog table ("<name>_csv" in database airbnb_catalog_db).
# NOTE(review): the path ends in ".csv", but Spark/Glue typically writes part
# files under this prefix. The bucket name also differs from the raw-data
# bucket and contains an underscore. Confirm both.
for dyf, dyf_name in dynamic_frames:
    sink = glueContext.getSink(
        connection_type="s3",
        path=f"s3://airbnb_listings-bucket/transformed_data/{dyf_name}.csv",
        compression="gzip",
        partitionKeys=[],
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
        transformation_ctx=f"{dyf_name}_csv_sink",
    )
    sink.setCatalogInfo(catalogDatabase="airbnb_catalog_db", catalogTableName=f"{dyf_name}_csv")
    sink.setFormat("csv")
    sink.writeFrame(dyf)

In [None]:
# Shut down the SparkContext created at the top of the notebook; no further
# Spark/Glue calls can run after this.
sc.stop()