# ETL to transform Airbnb rawdata to database schema

This ETL is written in PySpark. Although the current data volume could be handled with pandas or similar libraries, PySpark was chosen so that the pipeline scales to larger datasets.

The ETL has two purposes. The main one is to read the Airbnb source files and transform them into a set of structured, table-like files that follow the schema of a normalised relational database. The second is to push the resulting PySpark dataframes to the dedicated PostgreSQL database, and to save backups both locally and in an AWS S3 bucket set up for this purpose.

The files were explored in a previous notebook to understand how the data is related. Of the 6 CSV files, 4 are used in this ETL, producing a total of 8 tables. The following diagram shows how data flows through the ETL.

![ETL data flow](images/ETL_Airbnb.png)
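The "4 files, 8 tables" count can be checked by writing the file-to-table mapping down explicitly. In this plain-Python sketch the table names follow the parquet files written in this notebook, while `calendar.csv` and the `calendar` table name are assumptions, since the calendar section is not shown here.

```python
# Source CSV files and the tables derived from each of them in this ETL.
# 'calendar.csv' and 'calendar' are assumed names; the rest follow the
# parquet files written later in the notebook.
SOURCE_TO_TABLES = {
    'neighbourhoods.csv': ['neighbourhoods'],
    'listings.csv': ['listings', 'hosts', 'host_verification',
                     'listing_amenities', 'listing_complements'],
    'reviews.csv': ['reviews'],
    'calendar.csv': ['calendar'],
}

n_files = len(SOURCE_TO_TABLES)
n_tables = sum(len(tables) for tables in SOURCE_TO_TABLES.values())
print(f'{n_files} source files -> {n_tables} tables')  # 4 source files -> 8 tables
```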

## Index
- [Initialisation](#initialisation)
- [Functions](#functions)
- [Neighbourhoods](#neighbourhoods)
- [Listings](#listings)
    - [Listings main](#listings_main)
    - [Hosts main](#hosts)
    - [Host verification](#verification)
    - [Listing amenities](#amenities)
    - [Listing complements](#complements)
- [Reviews](#reviews)
- [Calendar](#calendar)
- [Push to S3](#s3)
- [Push to database](#database)

## Initialisation <a id=initialisation></a>

In [56]:
### Import modules

import os
import json
import boto3
import warnings
warnings.filterwarnings('ignore')

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession, Window, SQLContext
from pyspark.sql import functions as psF
from pyspark.sql import types as psDT

spark = SparkSession.builder.appName('SparkWithPostgres')\
        .config("spark.driver.extraClassPath", "./postgresql-42.3.2.jar")\
        .getOrCreate()
print('Spark session:')
print(spark)

Spark session:
<pyspark.sql.session.SparkSession object at 0x7f7288a5dbb0>


In [57]:
### Initialise Spark context

sc = pyspark.SparkContext.getOrCreate()

print('Initialisation of Spark context')
print(sc)
print('')

# SQLContext is deprecated since Spark 3.0; the SparkSession above offers the same functionality
sqlContext = SQLContext(sc)
print('Initialisation of SparkSQL')
print(sqlContext)

spark.sparkContext.setLogLevel("ERROR")
sc.setLogLevel("ERROR")

Initialisation of Spark context
<SparkContext master=local[*] appName=SparkWithPostgres>

Initialisation of SparkSQL
<pyspark.sql.context.SQLContext object at 0x7f728837a670>


In [58]:
### Constants

CSV_PATH = './OriginalData_csv/'
PARQUET_PATH = './SchemaReadyData_parquet/'

In [59]:
### Database credentials

DB_HOST = os.environ.get('DB_HOST')
DB_PORT = os.environ.get('DB_PORT')
DB_NAME = os.environ.get('DB_NAME')
DB_USERNAME = os.environ.get('DB_USERNAME')
DB_PASSWORD = os.environ.get('DB_PASSWORD')

POSTGRES_URL = 'jdbc:postgresql://' + DB_HOST + ':' + DB_PORT + '/' + DB_NAME
SCHEMA = 'listings.'
PROPERTIES = {'user': DB_USERNAME,
             'password': DB_PASSWORD,
             'driver': 'org.postgresql.Driver'}
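Because `POSTGRES_URL` is built by string concatenation, an unset variable makes `os.environ.get` return `None` and the cell fails with a `TypeError`. A small validation step, sketched below in plain Python, reports which variables are missing instead. `build_postgres_url` is a hypothetical helper and the values in the usage line are placeholders.

```python
import os


def build_postgres_url(env=os.environ):
    '''Build a PostgreSQL JDBC URL, failing early with a clear message
    if any of the required variables is unset or empty.'''
    required = ['DB_HOST', 'DB_PORT', 'DB_NAME']
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise EnvironmentError('Missing environment variables: ' + ', '.join(missing))
    return f"jdbc:postgresql://{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"


# Usage with placeholder values instead of the real environment
print(build_postgres_url({'DB_HOST': 'localhost', 'DB_PORT': '5432', 'DB_NAME': 'airbnb'}))
```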

## Functions <a id=functions></a>

Several steps recur throughout this ETL, so they are wrapped in helper functions.

In [60]:
### Definition of functions

# Function to unfold arrays saved as strings into an array of strings
# Credit: https://silpara.medium.com/pyspark-string-to-array-of-string-in-dataframe-b9572233ccea
def parse_array_from_string(x):
    '''
    Receives a string holding a JSON array and parses it into a list,
    so that it can be stored as an array column.
    
        params:
            x (string): a JSON array serialised as a string, e.g. '["Wifi", "Kitchen"]'
            
        returns:
            res (list): a list (array) containing the strings, or None if x is null
    '''
    # Spark passes NULL values to the UDF as None; json.loads(None) would raise a TypeError
    if x is None:
        return None
    res = json.loads(x)
    return res

retrieve_array_func = psF.udf(parse_array_from_string, psDT.ArrayType(psDT.StringType()))

# Read CSV file into a spark dataframe
def read_csv(filename, path = CSV_PATH):
    '''
    This function reads a csv file from a given path. It takes into account multiline text fields
    inside the csv (containing quotation marks and characters like \n, \r)
    
        params:
            filename (string): filename with extension of the csv file
            path (string) OPTIONAL: path where the csv is located
            
        returns:
            df (pyspark dataframe): data loaded from the file
    '''
    df = spark.read.options(delimiter = ',',
                            header = True,
                            escape = '"',
                            multiline = True).csv(path + filename)
    
    # Show schema
    print('Schema of dataframe in ' + filename)
    df.printSchema()
    
    # Return dataframe
    return df

# Drop column(s) of dataframe
def drop_columns(df, columns):
    '''
    Drops a column (or columns) from a given dataframe
    
        params:
            df (pyspark dataframe): data with column(s) to be dropped
            columns (string OR list): column(s) to be removed
        
        returns:
            return_df (pyspark dataframe): dataframe with column(s) removed
    '''
    if isinstance(columns, list):
        df_return = df.drop(*columns)
    else:
        df_return = df.drop(columns)
        
    return df_return

# Trim whitespacing before and after string
def trim_whitespace(df, column):
    '''
    Trims leading and trailing whitespaces in string fields
    
        params:
            df (pyspark dataframe): dataframe containing strings
            column (string): string representing column to be trimmed
        
        returns:
            return_df (pyspark dataframe): dataframe with trimmed strings
    '''
    df_return = df.withColumn(column, psF.trim(column))
    return df_return

# Add index to a given dataframe
def add_index(df, index_name = 'id'):
    '''
    Adds a new index column with values starting at 1
    
        params:
            df (pyspark dataframe): dataframe to add index to
            index_name (string) OPTIONAL: name of new column
        returns:
            return_df (pyspark dataframe): dataframe with added index column
    '''
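    # row_number() over a window without partitionBy moves all rows to a single
    # partition: fine at the current data size, a bottleneck for much larger tables
    df_return = df.withColumn(index_name,
                              psF.row_number().over(Window.orderBy(psF.monotonically_increasing_id())))
    df_return = df_return.withColumn(index_name,
                                     df_return[index_name].cast(psDT.LongType()))
    return df_return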

# Rename column(s)
def rename(df, col_in, col_out):
    '''
    Renames a column from the dataframe
    
        params:
            df (pyspark dataframe): dataframe containing column to be renamed
            col_in (string): name of original column
            col_out (string): new name of column
        
        returns:
            return_df (pyspark dataframe): dataframe with renamed column
    '''
    df_return = df.withColumnRenamed(col_in, col_out)
    return df_return

# Remove substring
def remove_substr(df, column, substr):
    '''
    Function to eliminate a substring from a column. Useful to format strings
    
        params:
            df (pyspark dataframe): dataframe containing column to be formatted
            column (string): name of column where the substring is to be removed
            substr (string): regular expression matching the text to be removed
                (regex special characters such as $ must be escaped)
        
        returns:
            return_df (pyspark dataframe): dataframe with formatted column
    '''
    df_return = df.withColumn(column, psF.regexp_replace(column, substr, ''))
    return df_return

# Parse nulls
def parse_nulls(df, column):
    '''
    Function to parse strings equal to null (None OR N/A) and convert them to NULL
    
        params:
            df (pyspark dataframe): dataframe to be parsed
            column (string): column containing string nulls
        
        returns:
            return_df (pyspark dataframe): dataframe with formatted column containing
                NULLs instead of None or N/A
    '''
    df_return = df.withColumn(column,
                              psF.when((psF.col(column) == 'None') | (psF.col(column) == 'N/A'), None) \
                              .otherwise(psF.col(column)))
    return df_return

# Cast column into data type
def cast_col(df, column, dtype):
    '''
    Function to change the data type of a column
    
        params:
            df (pyspark dataframe): dataframe containing column of interest
            column (string): column subjected to new type
            dtype (pyspark data type): data type from pySpark to be implemented
        
        returns:
            return_df (pyspark dataframe): dataframe with changed data type in given column
        
        note:
            with ANSI mode disabled (the Spark 3.x default), values that cannot be
            cast become NULL instead of raising an error
    '''
    df_return = df.withColumn(column, df[column].cast(dtype))
    return df_return

# Save dataframe to parquet
def write_parquet(df, filename, path = PARQUET_PATH, OVERWRITE = True):
    '''
    Saves dataframe to parquet format in given path
    
        params:
            df (pyspark dataframe): dataframe to be saved into parquet format
            filename (string): name of target file, including extension (.parquet)
            path (string) OPTIONAL: path to write the file
            OVERWRITE (bool) OPTIONAL: if True, overwrites existing files. If False, Spark's default save
                mode is used and the write fails if a file with the same name already exists.
    '''
    if(OVERWRITE):
        df.write.mode('overwrite').parquet(path + filename)
    else:
        df.write.parquet(path + filename)
        
# Insert dataframe into PostgreSQL table
def insert_to_db(df,
                 table_name,
                 schema = SCHEMA,
                 url = POSTGRES_URL,
                 properties = PROPERTIES,
                 overwrite = False,
                 sample = False):
    '''
    Inserts contents of dataframe to PostgreSQL table
    
        params:
            df (pyspark dataframe): dataframe to be inserted into database
            table_name (string): name of target table. Expected to be created in the db schema beforehand
            schema (string) OPTIONAL: name of schema (with trailing dot), default initiated with notebook
            url (string) OPTIONAL: database host, port and database name in JDBC format
            properties (dict) OPTIONAL: contains additional parameters to connect to db (username, password, driver)
            overwrite (bool) OPTIONAL: if True, overwrites the existing table (by default Spark drops and
                recreates it, losing constraints defined in the db schema). Default False (append)
            sample (bool) OPTIONAL: If True, inserts only 20 rows to database. Default False
    '''
    # Calculate sample if indicated
    if(sample):
        df = df.limit(20)
        
    # Select write mode
    if(overwrite):
        mode = 'overwrite'
    else:
        mode = 'append'
        
    # Execute insertion into the database
    df.write.jdbc(url,
                  table = schema + table_name,
                  mode = mode,
                  properties = properties)
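`add_index` relies on `monotonically_increasing_id()`, which is unique and increasing but not consecutive: Spark stores the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. Ranking those IDs with `row_number()` turns them into a gap-free sequence starting at 1. The plain-Python sketch below imitates both steps; the helper names and the two-partition layout are made up for illustration.

```python
def monotonically_increasing_ids(partition_sizes):
    '''Imitate Spark's id layout: partition id in the upper 31 bits,
    record position within the partition in the lower 33 bits.'''
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        for position in range(size):
            ids.append((partition_id << 33) + position)
    return ids


def row_number(ids):
    '''Imitate row_number() over Window.orderBy(id): ranks starting at 1.'''
    return {raw_id: rank for rank, raw_id in enumerate(sorted(ids), start=1)}


raw = monotonically_increasing_ids([2, 3])    # two hypothetical partitions
print(raw)                                    # [0, 1, 8589934592, 8589934593, 8589934594]
print(list(row_number(raw).values()))         # [1, 2, 3, 4, 5]
```

The gaps in `raw` are why the raw IDs are not used directly as table keys.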

## Neighbourhoods <a id=neighbourhoods></a>

In [61]:
### Load neighbourhoods to Spark dataframe

df_neighbourhoods = read_csv('neighbourhoods.csv')

Schema of dataframe in neighbourhoods.csv
root
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)



In [62]:
### Drop empty column and add index column

df_neighbourhoods_process = drop_columns(df_neighbourhoods, 'neighbourhood_group')

# Trim neighbourhood column
df_neighbourhoods_process = trim_whitespace(df_neighbourhoods_process, 'neighbourhood')

# Add index column
df_neighbourhoods_process = add_index(df_neighbourhoods_process)

# Print schema
print('Schema of table neighbourhood')
df_neighbourhoods_process.printSchema()

Schema of table neighbourhood
root
 |-- neighbourhood: string (nullable = true)
 |-- id: long (nullable = false)



In [8]:
### Export neighbourhoods to parquet

write_parquet(df_neighbourhoods_process, 'neighbourhoods.parquet')

## Listings <a id=listings></a>

In [63]:
### Load listings to Spark dataframe

df_listings = read_csv('listings.csv')

Schema of dataframe in listings.csv
root
 |-- id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count

In [64]:
### Drop empty columns (for more insight, consult EDA notebook)

COLUMNS_TO_DROP = ['neighbourhood_group_cleansed',
                  'bathrooms',
                  'calendar_updated',
                  'license']

df_listings_drop = drop_columns(df_listings, COLUMNS_TO_DROP)
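These columns were flagged as empty in the EDA notebook. A check of that kind can be sketched in plain Python as follows; `fully_empty_columns` and the sample rows are hypothetical, and the null tokens mirror those handled by `parse_nulls` plus the empty string. In PySpark the same idea amounts to counting non-null values per column.

```python
def fully_empty_columns(rows, null_tokens=('', 'None', 'N/A')):
    '''Return the columns whose value is missing in every row.'''
    if not rows:
        return []
    columns = rows[0].keys()
    return [col for col in columns
            if all(row.get(col) is None or row.get(col) in null_tokens for row in rows)]


# Toy rows, not taken from the real listings file
sample = [
    {'id': '1', 'bathrooms': None, 'license': ''},
    {'id': '2', 'bathrooms': None, 'license': 'N/A'},
]
print(fully_empty_columns(sample))  # ['bathrooms', 'license']
```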

In [65]:
### Create 4 tables from listings df

print('Columns on original file:', len(df_listings_drop.columns))

COLS_LISTINGS = ['id',
                 'host_id',
                 'listing_url',
                 'scrape_id',
                 'last_scraped',
                 'name',
                 'description',
                 'neighborhood_overview',
                 'picture_url',
                 'neighbourhood',
                 'neighbourhood_cleansed',
                 'latitude',
                 'longitude',
                 'property_type',
                 'room_type',
                 'accommodates',
                 'bathrooms_text',
                 'bedrooms',
                 'beds',
                 'price',
                 'minimum_nights',
                 'maximum_nights']

COLS_LISTINGS_AMENITIES = ['id',
                           'amenities']

COLS_HOSTS = ['host_id',
              'host_url',
              'host_name',
              'host_since',
              'host_location',
              'host_about',
              'host_response_time',
              'host_response_rate',
              'host_acceptance_rate',
              'host_is_superhost',
              'host_thumbnail_url',
              'host_picture_url',
              'host_neighbourhood',
              'host_listings_count',
              'host_total_listings_count',
              'host_verifications',
              'host_has_profile_pic',
              'host_identity_verified']

COLS_COMPLENET = ['id',
                  'minimum_minimum_nights',
                  'maximum_minimum_nights',
                  'minimum_maximum_nights',
                  'maximum_maximum_nights',
                  'minimum_nights_avg_ntm',
                  'maximum_nights_avg_ntm',
                  'has_availability',
                  'availability_30',
                  'availability_60',
                  'availability_90',
                  'availability_365',
                  'calendar_last_scraped',
                  'number_of_reviews',
                  'number_of_reviews_ltm',
                  'number_of_reviews_l30d',
                  'first_review',
                  'last_review',
                  'review_scores_rating',
                  'review_scores_accuracy',
                  'review_scores_cleanliness',
                  'review_scores_checkin',
                  'review_scores_communication',
                  'review_scores_location',
                  'review_scores_value',
                  'instant_bookable',
                  'calculated_host_listings_count',
                  'calculated_host_listings_count_entire_homes',
                  'calculated_host_listings_count_private_rooms',
                  'calculated_host_listings_count_shared_rooms',
                  'reviews_per_month']

print('Columns after the split:',
      len(COLS_LISTINGS) + len(COLS_HOSTS) + len(COLS_COMPLENET) + len(COLS_LISTINGS_AMENITIES) - 3) # Correct for repeated key columns ('id' x3, 'host_id' x2)

# Create dataframes
df_listings_main = df_listings_drop.select(COLS_LISTINGS)
df_listings_amenities = df_listings_drop.select(COLS_LISTINGS_AMENITIES)
df_hosts = df_listings_drop.select(COLS_HOSTS)
df_listings_complements = df_listings_drop.select(COLS_COMPLENET)

Columns on original file: 70
Columns after the split: 70
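The `- 3` correction accounts for the key columns that appear in more than one group: `id` is in three groups (two extra copies) and `host_id` in two (one extra copy). A slightly more robust check, sketched below with a hypothetical `check_split` helper and toy column lists, confirms that every original column is covered and lists the shared keys explicitly.

```python
from collections import Counter


def check_split(original_columns, groups):
    '''Return (missing, extra, shared): original columns not covered by any
    group, columns not in the original, and columns used by several groups.'''
    counts = Counter(col for group in groups for col in group)
    missing = set(original_columns) - set(counts)
    extra = set(counts) - set(original_columns)
    shared = {col: n for col, n in counts.items() if n > 1}
    return missing, extra, shared


# Toy column lists standing in for COLS_LISTINGS, COLS_HOSTS and COLS_LISTINGS_AMENITIES
original = ['id', 'host_id', 'name', 'host_name', 'amenities', 'price']
groups = [['id', 'host_id', 'name', 'price'],   # listings
          ['host_id', 'host_name'],             # hosts
          ['id', 'amenities']]                  # amenities
missing, extra, shared = check_split(original, groups)
print(missing, extra, shared)  # set() set() {'id': 2, 'host_id': 2}
```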


### Process listings main <a id=listings_main></a>

In [66]:
### Merge with neighbourhoods to obtain neighbourhood_id

df_neighbourhoods_merge = rename(df_neighbourhoods_process, 'id', 'neighbourhood_id')
df_neighbourhoods_merge = rename(df_neighbourhoods_merge, 'neighbourhood', 'neighbourhood_cleansed')

df_listings_merged = df_listings_main.join(df_neighbourhoods_merge,
                                         how = 'left',
                                         on = ['neighbourhood_cleansed'])

df_listings_merged = drop_columns(df_listings_merged, 'neighbourhood_cleansed')

In [67]:
### Give structure to listings table

print('Schema of main listings')
df_listings_merged.printSchema()

# Define dict with datatypes
LISTINGS_MAIN_DTYPES = {'id': psDT.LongType(),
                        'host_id': psDT.LongType(),
                        'scrape_id': psDT.LongType(),
                        'last_scraped': psDT.DateType(),
                        'latitude': psDT.DoubleType(),
                        'longitude': psDT.DoubleType(),
                        'accommodates': psDT.IntegerType(),
                        'bedrooms': psDT.IntegerType(),
                        'beds': psDT.IntegerType(),
                        'price': psDT.DoubleType(),
                        'minimum_nights': psDT.IntegerType(),
                        'maximum_nights': psDT.IntegerType(),
                        'neighbourhood_id': psDT.IntegerType()}

# Define dict with new names
LISTINGS_MAIN_NAMES = {'last_scraped': 'date_last_scraped',
                      'neighbourhood': 'neighbourhood_typed'}

# Copy dataframe
df_listings_main_process = df_listings_merged.select('*')

# Drop $ sign and thousands separators from price (e.g. '$1,200.00' -> '1200.00')
df_listings_main_process = remove_substr(df_listings_main_process, 'price', r'[$,]')

# Use stripping for strings
for COLUMN in df_listings_main_process.columns:
    
    # Strip whitespaces
    df_listings_main_process = trim_whitespace(df_listings_main_process, COLUMN)
    
    # Replace None and N/A with null
    df_listings_main_process = parse_nulls(df_listings_main_process, COLUMN)
    
    # Change data type
    if(COLUMN in LISTINGS_MAIN_DTYPES.keys()):
        df_listings_main_process = cast_col(df_listings_main_process, COLUMN, LISTINGS_MAIN_DTYPES[COLUMN])
    
    # Rename columns
    if(COLUMN in LISTINGS_MAIN_NAMES.keys()):
        df_listings_main_process = rename(df_listings_main_process, COLUMN, LISTINGS_MAIN_NAMES[COLUMN])
        
# Print schema
print('Schema of table listings')
df_listings_main_process.printSchema()

Schema of main listings
root
 |-- id: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: string (nullable = true)
 |-- bathrooms_text: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- beds: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- maximum_nights: string (nullable = true)
 |-- neighbourhood_id: long (nullable = true)

Schema of table listings
root
 |-- id: lo
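Stripping only the `$` sign is not enough if prices of 1,000 or more are written with a thousands separator, as in `"$1,200.00"`: the remaining `"1,200.00"` cannot be cast to a double, and Spark (with ANSI mode off) silently returns null. The plain-Python sketch below, using a hypothetical `parse_price` helper, removes both characters before converting.

```python
import re


def parse_price(raw):
    '''Turn a price string such as "$1,200.00" into a float, or None if it
    cannot be parsed (mirroring the null produced by a failed Spark cast).'''
    if raw is None:
        return None
    cleaned = re.sub(r'[$,]', '', raw.strip())
    try:
        return float(cleaned)
    except ValueError:
        return None


print(parse_price('$85.00'))     # 85.0
print(parse_price('$1,200.00'))  # 1200.0
```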

In [51]:
### Export listings to parquet

write_parquet(df_listings_main_process, 'listings.parquet')

### Process hosts - main data <a id=hosts></a>

In [15]:
### Drop duplicates

df_hosts_unique = df_hosts.dropDuplicates()

print('Rows on previous dataframe:', df_hosts.count())
print('Rows on unique dataframe:', df_hosts_unique.count())

Rows on previous dataframe: 66641
Rows on unique dataframe: 44695
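The duplicates come from the listings file holding one row per listing, so host attributes are repeated for every listing of the same host. `dropDuplicates()` only removes rows that are identical in every column; if a host's attributes differed between rows, the same `host_id` would survive twice and break a primary key on it. In PySpark this can be checked with `df_hosts_unique.groupBy('host_id').count().filter('count > 1')`. The plain-Python sketch below (toy rows, hypothetical helpers) shows the same two steps.

```python
def drop_duplicates(rows):
    '''Keep the first occurrence of each identical row, preserving order.'''
    return list(dict.fromkeys(rows))


def duplicated_keys(rows, key_index=0):
    '''Return key values that still appear more than once.'''
    seen, dupes = set(), set()
    for row in rows:
        key = row[key_index]
        if key in seen:
            dupes.add(key)
        else:
            seen.add(key)
    return dupes


host_rows = [
    ('101', 'Host A'),
    ('101', 'Host A'),   # same host, second listing
    ('202', 'Host B'),
]
unique_rows = drop_duplicates(host_rows)
print(len(host_rows), len(unique_rows))   # 3 2
print(duplicated_keys(unique_rows))       # set()
```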


In [16]:
### Separate host verifications

df_host_verifications = df_hosts_unique.select(['host_id', 'host_verifications'])
df_hosts_sliced = drop_columns(df_hosts_unique, 'host_verifications')

# Replace 'None' with null and drop those records ('None' means the host has no verifications at all)
df_host_verifications = parse_nulls(df_host_verifications, 'host_verifications')
df_host_verifications = df_host_verifications.na.drop(subset = ['host_verifications'])

In [17]:
### Give structure to hosts sliced table

print('Schema of hosts sliced')
df_hosts_sliced.printSchema()

# Define dict with datatypes
HOST_DTYPES = {'host_id': psDT.LongType(),
               'host_since': psDT.DateType(),
               'host_response_rate': psDT.DoubleType(),
               'host_acceptance_rate': psDT.DoubleType(),
               'host_is_superhost': psDT.BooleanType(),
               'host_listings_count': psDT.IntegerType(),
               'host_total_listings_count': psDT.IntegerType(),
               'host_has_profile_pic': psDT.BooleanType(),
               'host_identity_verified': psDT.BooleanType()}

# Define features to be transformed to decimal from percentage
PERCENTAGE_FEAT = ['host_response_rate', 'host_acceptance_rate']

# Copy dataframe
df_hosts_process = df_hosts_sliced.select('*')

# Drop % sign from percentage feats ('%' is not a regex special character, so no escaping is needed)
for COLUMN in PERCENTAGE_FEAT:
    df_hosts_process = remove_substr(df_hosts_process, COLUMN, '%')

# Use stripping for strings
for COLUMN in df_hosts_process.columns:
    
    # Strip whitespaces
    df_hosts_process = trim_whitespace(df_hosts_process, COLUMN)
    
    # Replace None and N/A with null
    df_hosts_process = parse_nulls(df_hosts_process, COLUMN)
    
    # Change data type
    if(COLUMN in HOST_DTYPES.keys()):
        df_hosts_process = cast_col(df_hosts_process, COLUMN, HOST_DTYPES[COLUMN])
        
    # Make percentage columns decimal
    if(COLUMN in PERCENTAGE_FEAT):
        df_hosts_process = df_hosts_process.withColumn(COLUMN,
                                                   psF.col(COLUMN) / 100)
    
    # Rename columns by removing host_
    df_hosts_process = rename(df_hosts_process, COLUMN, COLUMN[5:])
        
# Print schema
print('Schema of table hosts')
df_hosts_process.printSchema()

Schema of hosts sliced
root
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: string (nullable = true)
 |-- host_total_listings_count: string (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)

Schema of table hosts
root
 |-- id: long (nullable = true)
 |-- url: string (nullable = true)
 |-- name: string (nullable = true)
 |-- since: date (nullable = 

In [18]:
### Export hosts to parquet

write_parquet(df_hosts_process, 'hosts.parquet')
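The host transformations combine three small rules: flags such as `t`/`f` become booleans, percentage strings become fractions, and the `host_` prefix is dropped from column names. The plain-Python sketch below spells those rules out. The boolean tokens are assumed to match Spark's string-to-boolean cast, and `to_bool`, `to_rate` and `strip_prefix` are hypothetical helpers. Unlike `COLUMN[5:]`, which always cuts the first five characters, `strip_prefix` leaves a column untouched if it does not start with `host_`.

```python
# Tokens assumed to be accepted by Spark's string-to-boolean cast (case-insensitive)
TRUE_TOKENS = {'t', 'true', 'y', 'yes', '1'}
FALSE_TOKENS = {'f', 'false', 'n', 'no', '0'}


def to_bool(raw):
    '''Approximate a string-to-boolean cast; unknown values become None.'''
    if raw is None:
        return None
    value = raw.strip().lower()
    if value in TRUE_TOKENS:
        return True
    if value in FALSE_TOKENS:
        return False
    return None


def to_rate(raw):
    '''Turn a percentage string such as "93%" into a fraction (0.93).'''
    if raw is None or raw.strip() in ('', 'N/A'):
        return None
    return float(raw.strip().rstrip('%')) / 100


def strip_prefix(column, prefix='host_'):
    '''Remove the prefix only when the column name actually starts with it.'''
    return column[len(prefix):] if column.startswith(prefix) else column


print(to_bool('t'), to_bool('f'))                       # True False
print(to_rate('93%'))                                    # 0.93
print(strip_prefix('host_since'), strip_prefix('id'))    # since id
```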

### Process hosts - verifications <a id=verification></a>

In [19]:
### Explode verifications into different rows for final table

print('Schema of hosts verifications')
df_host_verifications.printSchema()

# Copy dataframe
df_host_verifications_process = df_host_verifications.select('*')

# Convert single quotes to double quotes so that the string can be parsed as JSON
df_host_verifications_process = df_host_verifications_process.withColumn('host_verifications',
                                                   psF.regexp_replace('host_verifications', '\'', '\"'))

# Change data types
df_host_verifications_process = cast_col(df_host_verifications_process, 'host_id', psDT.LongType())
df_host_verifications_process = df_host_verifications_process.withColumn('host_verifications',
                                                       retrieve_array_func(psF.col('host_verifications')))

# Explode list into rows
df_host_verifications_process = df_host_verifications_process.select('host_id',
                                                                    psF.explode(df_host_verifications_process.host_verifications) \
                                                                    .alias('verification'))

# Add index column
df_host_verifications_process = add_index(df_host_verifications_process)

# Trim values of string
df_host_verifications_process = trim_whitespace(df_host_verifications_process, 'verification')

# Print schema
print('Schema of table host verification')
df_host_verifications_process.printSchema()

Schema of hosts verifications
root
 |-- host_id: string (nullable = true)
 |-- host_verifications: string (nullable = true)

Schema of table host verification
root
 |-- host_id: long (nullable = true)
 |-- verification: string (nullable = true)
 |-- id: long (nullable = false)



In [20]:
### Export host verification to parquet

write_parquet(df_host_verifications_process, 'host_verification.parquet')
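Judging from the quote swap in the code above, `host_verifications` holds a Python-style list with single quotes, which is not valid JSON. The sketch below compares that approach with `ast.literal_eval` from the standard library on illustrative values. The quote swap breaks on any value containing an apostrophe, which Python writes with double quotes; `ast.literal_eval` could be wrapped in `psF.udf` instead, although this has not been tried on the real data.

```python
import ast
import json

# Illustrative value in the Python-list style of host_verifications
raw = "['email', 'phone', 'reviews']"

# Approach used in the notebook: swap single for double quotes, then parse as JSON
via_json = json.loads(raw.replace("'", '"'))

# Alternative: parse the Python literal directly, no quote swapping needed
via_literal = ast.literal_eval(raw)

# A value containing an apostrophe breaks the quote swap but not ast.literal_eval
tricky = "['email', \"host's phone\"]"
tricky_parsed = ast.literal_eval(tricky)

# Explode into (host_id, verification) rows, as psF.explode does
host_id = 101
rows = [(host_id, v.strip()) for v in via_literal]
print(rows)  # [(101, 'email'), (101, 'phone'), (101, 'reviews')]
```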

### Process listings amenities <a id=amenities></a>

In [21]:
### Explode amenities into different rows for final table

print('Schema of amenities')
df_listings_amenities.printSchema()

# Copy dataframe
df_amenities_process = df_listings_amenities.select('*')

# Rename id columns to listing_id
df_amenities_process = rename(df_amenities_process, 'id', 'listing_id')

# Change data types
df_amenities_process = cast_col(df_amenities_process, 'listing_id', psDT.LongType())
df_amenities_process = df_amenities_process.withColumn('amenities',
                                                       retrieve_array_func(psF.col('amenities')))

# Explode list into rows
df_amenities_process = df_amenities_process.select('listing_id',
                                                   psF.explode(df_amenities_process.amenities) \
                                                   .alias('amenity'))

# Add index column
df_amenities_process = add_index(df_amenities_process)

# Trim values of string
df_amenities_process = trim_whitespace(df_amenities_process, 'amenity')

# Print schema
print('Schema of table listing amenities')
df_amenities_process.printSchema()

Schema of amenities
root
 |-- id: string (nullable = true)
 |-- amenities: string (nullable = true)

Schema of table listing amenities
root
 |-- listing_id: long (nullable = true)
 |-- amenity: string (nullable = true)
 |-- id: long (nullable = false)



In [22]:
### Export listing amenities to parquet

write_parquet(df_amenities_process, 'listing_amenities.parquet')
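`json.loads(None)` raises a `TypeError`, so a listing with a null `amenities` value would make the UDF fail the Spark job. A null-safe parser avoids that, and `psF.explode` then simply drops rows whose array is null or empty (`psF.explode_outer` would keep them). The sketch below reproduces the parse, explode, trim and index steps in plain Python on toy records; `parse_array_safe` and `explode_with_index` are hypothetical names.

```python
import json


def parse_array_safe(x):
    '''Parse a JSON array string, returning None instead of raising on null input.'''
    if x is None:
        return None
    return json.loads(x)


def explode_with_index(records):
    '''Turn (listing_id, json_array) pairs into (listing_id, amenity, id) rows,
    skipping missing arrays, trimming values and numbering rows from 1.'''
    rows = []
    for listing_id, raw in records:
        for amenity in parse_array_safe(raw) or []:
            rows.append((listing_id, amenity.strip()))
    return [(listing_id, amenity, i) for i, (listing_id, amenity) in enumerate(rows, start=1)]


records = [(1, '["Wifi", "Kitchen"]'), (2, None), (3, '[" Heating "]')]
print(explode_with_index(records))
# [(1, 'Wifi', 1), (1, 'Kitchen', 2), (3, 'Heating', 3)]
```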

### Process listings complements <a id=complements></a>

In [23]:
### Give structure to listing complements table

print('Schema of listings complements')
df_listings_complements.printSchema()

# Define dict with datatypes
COMPLEMENTS_MAIN_DTPYES = {'listing_id': psDT.LongType(),
                          'minimum_minimum_nights': psDT.IntegerType(),
                          'maximum_minimum_nights': psDT.IntegerType(),
                          'minimum_maximum_nights': psDT.IntegerType(),
                          'maximum_maximum_nights': psDT.IntegerType(),
                          'minimum_nights_avg_ntm': psDT.DoubleType(),
                          'maximum_nights_avg_ntm': psDT.DoubleType(),
                          'has_availability': psDT.BooleanType(),
                          'availability_30': psDT.IntegerType(),
                          'availability_60': psDT.IntegerType(),
                          'availability_90': psDT.IntegerType(),
                          'availability_365': psDT.IntegerType(),
                          'calendar_last_scraped': psDT.DateType(),
                          'number_of_reviews': psDT.IntegerType(),
                          'number_of_reviews_ltm': psDT.IntegerType(),
                          'number_of_reviews_l30d': psDT.IntegerType(),
                          'first_review': psDT.DateType(),
                          'last_review': psDT.DateType(),
                          'review_scores_rating': psDT.DoubleType(),
                          'review_scores_accuracy': psDT.DoubleType(),
                          'review_scores_cleanliness': psDT.DoubleType(),
                          'review_scores_checkin': psDT.DoubleType(),
                          'review_scores_communication': psDT.DoubleType(),
                          'review_scores_location': psDT.DoubleType(),
                          'review_scores_value': psDT.DoubleType(),
                          'instant_bookable': psDT.BooleanType(),
                          'calculated_host_listings_count': psDT.IntegerType(),
                          'calculated_host_listings_count_entire_homes': psDT.IntegerType(),
                          'calculated_host_listings_count_private_rooms': psDT.IntegerType(),
                          'calculated_host_listings_count_shared_rooms': psDT.IntegerType(),
                          'reviews_per_month': psDT.DoubleType()}

# Copy dataframe
df_listings_complements_process = df_listings_complements.select('*')

# Rename id columns to listing_id
df_listings_complements_process = rename(df_listings_complements_process, 'id', 'listing_id')

# Transformations per column
for COLUMN in df_listings_complements_process.columns:
    
    # Strip whitespaces
    df_listings_complements_process = trim_whitespace(df_listings_complements_process, COLUMN)
    
    # Replace None and N/A with null
    df_listings_complements_process = parse_nulls(df_listings_complements_process, COLUMN)
    
    # Change data type
    if(COLUMN in COMPLEMENTS_MAIN_DTPYES.keys()):
        df_listings_complements_process = cast_col(df_listings_complements_process,
                                                   COLUMN,
                                                   COMPLEMENTS_MAIN_DTPYES[COLUMN])

# Add index column
df_listings_complements_process = add_index(df_listings_complements_process)
        
# Print schema
print('Schema of table listings complements')
df_listings_complements_process.printSchema()

Schema of listings complements
root
 |-- id: string (nullable = true)
 |-- minimum_minimum_nights: string (nullable = true)
 |-- maximum_minimum_nights: string (nullable = true)
 |-- minimum_maximum_nights: string (nullable = true)
 |-- maximum_maximum_nights: string (nullable = true)
 |-- minimum_nights_avg_ntm: string (nullable = true)
 |-- maximum_nights_avg_ntm: string (nullable = true)
 |-- has_availability: string (nullable = true)
 |-- availability_30: string (nullable = true)
 |-- availability_60: string (nullable = true)
 |-- availability_90: string (nullable = true)
 |-- availability_365: string (nullable = true)
 |-- calendar_last_scraped: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- number_of_reviews_ltm: string (nullable = true)
 |-- number_of_reviews_l30d: string (nullable = true)
 |-- first_review: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- review_scores_rating: string (nullable = true)
 |-- review_scores_ac

In [24]:
### Export listing complements to parquet

write_parquet(df_listings_complements_process, 'listing_complements.parquet')
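Date columns such as `first_review` and `last_review` are converted with `cast_col`. With ANSI mode disabled (the default in Spark 3.x), a string that cannot be parsed becomes null rather than raising, so malformed values disappear silently. The sketch below is a simplified plain-Python version (it only accepts `YYYY-MM-DD`, whereas Spark's cast is more lenient), together with a hypothetical helper that counts values lost in the cast, a cheap check to run around each cast.

```python
from datetime import date


def to_date(raw):
    '''Parse an ISO "YYYY-MM-DD" string into a date, or None on failure.'''
    if raw is None:
        return None
    try:
        return date.fromisoformat(raw.strip())
    except ValueError:
        return None


def count_failed_casts(raw_values, parse):
    '''Count values that were present before the cast but null after it.'''
    return sum(1 for raw in raw_values if raw is not None and parse(raw) is None)


print(to_date('2021-12-24'))                                             # 2021-12-24
print(count_failed_casts(['2021-12-24', None, '2021-13-01'], to_date))   # 1
```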

## Reviews <a id=reviews></a>

In [25]:
### Load reviews to Spark dataframe

df_reviews = read_csv('reviews.csv')

Schema of dataframe in reviews.csv
root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



In [26]:
### Give structure to reviews table

print('Schema of reviews')
df_reviews.printSchema()

# Define dict with datatypes
REVIEWS_MAIN_DTPYES = {'listing_id': psDT.LongType(),
                      'id': psDT.LongType(),
                      'date': psDT.DateType(),
                      'reviewer_id': psDT.LongType()}

# Copy dataframe
df_reviews_process = df_reviews.select('*')

# Transformations per column
for COLUMN in df_reviews_process.columns:
    
    # Strip whitespaces
    df_reviews_process = trim_whitespace(df_reviews_process, COLUMN)
    
    # Replace None and N/A with null
    df_reviews_process = parse_nulls(df_reviews_process, COLUMN)
    
    # Change data type
    if(COLUMN in REVIEWS_MAIN_DTPYES.keys()):
        df_reviews_process = cast_col(df_reviews_process, COLUMN, REVIEWS_MAIN_DTPYES[COLUMN])
    
# Print schema
print('Schema of table reviews')
df_reviews_process.printSchema()

Schema of reviews
root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)

Schema of table reviews
root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: long (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)
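Casting `date` to `DateType` relies on the raw strings being ISO dates (`yyyy-MM-dd`). With ANSI mode disabled (`spark.sql.ansi.enabled`, off by default in Spark 3.x), a string that cannot be parsed becomes null instead of raising an error, so malformed review dates would disappear silently. The sketch below is a rough plain-Python analogue (not an exact reproduction of Spark's parser), with the hypothetical names `to_date_or_null` and `count_lost_dates`, to show how such losses could be counted.

```python
from datetime import date
from typing import Iterable, Optional


def to_date_or_null(raw: Optional[str]) -> Optional[date]:
    """Parse an ISO 'YYYY-MM-DD' string; return None instead of raising."""
    if raw is None:
        return None
    try:
        return date.fromisoformat(raw.strip())
    except ValueError:
        return None


def count_lost_dates(values: Iterable[Optional[str]]) -> int:
    """Count non-null inputs that turn into null after the cast."""
    return sum(1 for v in values if v is not None and to_date_or_null(v) is None)


sample = ["2021-12-24", "24/12/2021", None]
print([to_date_or_null(v) for v in sample])
print(count_lost_dates(sample))
```

On the Spark side, a comparable check on the raw dataframe could be `df_reviews.filter(psF.col('date').isNotNull() & psF.to_date('date').isNull()).count()`, since `to_date` without a format follows the cast rules.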



In [27]:
### Export reviews to parquet

write_parquet(df_reviews_process, 'reviews.parquet')

## Calendar <a id=calendar></a>

In [28]:
### Load calendar to Spark dataframe

df_calendar = read_csv('calendar.csv')

Schema of dataframe in calendar.csv
root
 |-- listing_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- available: string (nullable = true)
 |-- price: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- maximum_nights: string (nullable = true)



In [29]:
### Give structure to calendar

print('Schema of calendar')
df_calendar.printSchema()

# Define dict with datatypes
CALENDAR_MAIN_DTPYES = {'listing_id': psDT.LongType(),
                       'date': psDT.DateType(),
                       'available': psDT.BooleanType(),
                       'price': psDT.DoubleType(),
                       'adjusted_price': psDT.DoubleType(),
                       'minimum_nights': psDT.IntegerType(),
                       'maximum_nights': psDT.IntegerType()}

# Copy dataframe
df_calendar_process = df_calendar.select('*')

# Drop $ sign from price and adjusted_price
df_calendar_process = remove_substr(df_calendar_process, 'price', r'\$')
df_calendar_process = remove_substr(df_calendar_process, 'adjusted_price', r'\$')

# Transformations per column
for COLUMN in df_calendar_process.columns:
    
    # Strip whitespaces
    df_calendar_process = trim_whitespace(df_calendar_process, COLUMN)
    
    # Replace None and N/A with null
    df_calendar_process = parse_nulls(df_calendar_process, COLUMN)
    
    # Change data type
    if(COLUMN in CALENDAR_MAIN_DTPYES.keys()):
        df_calendar_process = cast_col(df_calendar_process, COLUMN, CALENDAR_MAIN_DTPYES[COLUMN])

# Add index column
df_calendar_process = add_index(df_calendar_process)
        
# Print schema
print('Schema of table calendar')
df_calendar_process.printSchema()

Schema of calendar
root
 |-- listing_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- available: string (nullable = true)
 |-- price: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- maximum_nights: string (nullable = true)

Schema of table calendar
root
 |-- listing_id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- available: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- adjusted_price: double (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- maximum_nights: integer (nullable = true)
 |-- id: long (nullable = false)
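Two details of the calendar casts are worth checking against the raw data. First, only `$` is stripped from `price` and `adjusted_price`. If any value carries a thousands separator (e.g. `$1,250.00`), the remaining `1,250.00` cannot be cast to double and becomes null. Second, assuming `available` holds `t`/`f` flags, Spark's string-to-boolean cast should accept them. To our understanding, it also accepts `true`/`false`, `y`/`n`, `yes`/`no` and `1`/`0`, case-insensitively. The sketch below reproduces both conversions in plain Python; `parse_price` and `parse_flag` are hypothetical names. In PySpark, the price clean-up could be written as `psF.regexp_replace(psF.col('price'), r'[$,]', '')` before the cast.

```python
import re
from typing import Optional

# Token sets modelled on Spark's string-to-boolean cast (assumption)
TRUE_TOKENS = {"t", "true", "y", "yes", "1"}
FALSE_TOKENS = {"f", "false", "n", "no", "0"}


def parse_price(raw: Optional[str]) -> Optional[float]:
    """Strip the currency symbol and thousands separators, then cast; None if unparsable."""
    if raw is None:
        return None
    cleaned = re.sub(r"[$,]", "", raw.strip())
    try:
        return float(cleaned)
    except ValueError:
        return None


def parse_flag(raw: Optional[str]) -> Optional[bool]:
    """Map t/f style flags to booleans; unknown tokens become None."""
    if raw is None:
        return None
    token = raw.strip().lower()
    if token in TRUE_TOKENS:
        return True
    if token in FALSE_TOKENS:
        return False
    return None


print(parse_price("$1,250.00"), parse_flag("t"))
```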



In [30]:
### Export calendar to parquet

write_parquet(df_calendar_process, 'calendar.parquet')

## Push data to S3 bucket <a id=s3></a>

In [31]:
### Initialise AWS params

s3_client = boto3.client("s3")
s3_parquet_bucket_name = os.environ.get('AWS_BUCKET_PARQUET')
s3_csv_bucket_name = os.environ.get('AWS_BUCKET_CSV')
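`os.environ.get` returns `None` when a variable is unset, and the problem then only surfaces later, inside the boto3 calls, as a less direct error. A small guard that fails fast with a clear message could look like this (the helper name `require_env` is hypothetical):

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Usage with the variable names used in this notebook:
# s3_parquet_bucket_name = require_env('AWS_BUCKET_PARQUET')
# s3_csv_bucket_name = require_env('AWS_BUCKET_CSV')
```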

In [32]:
### Delete previous parquet files in bucket

try:
    s3 = boto3.resource('s3')
    s3_parquet_bucket = s3.Bucket(s3_parquet_bucket_name)
    # Fetch at most one key to check whether the bucket holds anything
    if any(True for _ in s3_parquet_bucket.objects.limit(1)):
        s3_parquet_bucket.objects.all().delete()
        print('Successfully deleted previous versions')
    else:
        print('Bucket was empty')
except Exception as err:
    print(f'Something went wrong with this operation: {err}')

Successfully deleted previous versions


In [33]:
### Write CSV files into S3 storage

files = [f for f in os.listdir(CSV_PATH) if os.path.isfile(os.path.join(CSV_PATH, f))]
for f in files:
    s3_client.upload_file(os.path.join(CSV_PATH, f), s3_csv_bucket_name, f)

In [34]:
### Write parquet files into S3 storage

dir_parquet = [d for d in os.listdir(PARQUET_PATH) if os.path.isdir(os.path.join(PARQUET_PATH, d))]

try:
    for dirs in dir_parquet:
        INNER_PATH = os.path.join(PARQUET_PATH, dirs)
        files = [f for f in os.listdir(INNER_PATH) if os.path.isfile(os.path.join(INNER_PATH, f))]

        # S3 keys keep the table directory as prefix, always separated by '/'
        for f in files:
            s3_client.upload_file(os.path.join(INNER_PATH, f),
                                  s3_parquet_bucket_name,
                                  dirs + '/' + f)
    
    print('Successfully saved copies in S3 bucket')

except Exception as err:
    print(f'Oh oh, something went wrong: {err}')

Successfully saved copies in S3 bucket
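The upload cell walks exactly one directory level below `PARQUET_PATH`. Spark writes each table as a directory of part files, which that covers. If a table were ever written with `partitionBy`, however, its files would sit in nested subdirectories and be skipped. A sketch of the same upload plan built with `os.walk`, which keeps any nested structure in the S3 keys (the function name `parquet_upload_plan` is hypothetical):

```python
import os
from typing import List, Tuple


def parquet_upload_plan(parquet_root: str) -> List[Tuple[str, str]]:
    """List (local_path, s3_key) pairs for every file inside the parquet directories."""
    plan = []
    for dirpath, _dirnames, filenames in os.walk(parquet_root):
        if os.path.relpath(dirpath, parquet_root) == ".":
            continue  # like the original cell, skip loose files directly in the root
        for name in filenames:
            local_path = os.path.join(dirpath, name)
            # S3 keys always use '/', whatever the local path separator is
            key = os.path.relpath(local_path, parquet_root).replace(os.sep, "/")
            plan.append((local_path, key))
    return sorted(plan, key=lambda pair: pair[1])
```

Each pair can then be passed straight to the existing client: `s3_client.upload_file(local_path, s3_parquet_bucket_name, key)`.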


In [35]:
### Print total size of buckets

object_size_csv = [object_.size for object_ in boto3.resource('s3').Bucket(s3_csv_bucket_name).objects.all()]
object_size_parquet = [object_.size for object_ in boto3.resource('s3').Bucket(s3_parquet_bucket_name).objects.all()]

print('Total size of csv backup: {:.1f} MB'.format(sum(object_size_csv) / (1024 ** 2)))
print('Total size of parquet backup: {:.1f} MB'.format(sum(object_size_parquet) / (1024 ** 2)))

print('Compression ratio: {:.2f}'.format(sum(object_size_csv) / sum(object_size_parquet)))

Total size of csv backup: 1488.2 MB
Total size of parquet backup: 338.8 MB
Compression ratio: 4.39
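The sizes above are byte totals divided by 1024², i.e. mebibytes, and the ratio is simply the quotient of the two totals: 1488.2 / 338.8 ≈ 4.39. The cell divides by the parquet total without a guard, so it would raise `ZeroDivisionError` on an empty parquet bucket. A guarded version of the same arithmetic, with the hypothetical name `backup_summary`:

```python
from typing import Iterable, Optional, Tuple

MIB = 1024 ** 2  # bytes per mebibyte, the unit the cell above labels "MB"


def backup_summary(csv_sizes: Iterable[int],
                   parquet_sizes: Iterable[int]) -> Tuple[float, float, Optional[float]]:
    """Return (csv_mib, parquet_mib, compression_ratio); the ratio is None if parquet is empty."""
    csv_total = sum(csv_sizes)
    parquet_total = sum(parquet_sizes)
    ratio = csv_total / parquet_total if parquet_total else None
    return csv_total / MIB, parquet_total / MIB, ratio


print(backup_summary([3 * MIB], [MIB]))
```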


## Push data to PostgreSQL database <a id=database></a>

In [68]:
### Insert records into the database

dataframes = {
    'calendar': df_calendar_process,
    'host_verification': df_host_verifications_process,
    'hosts': df_hosts_process,
    'listing_amenities': df_amenities_process,
    'listing_complements': df_listings_complements_process,
    'listings': df_listings_main_process,
    'neighbourhoods': df_neighbourhoods_process,
    'reviews': df_reviews_process
}

for TABLE_NAME, df_table in dataframes.items():
    insert_to_db(df_table, TABLE_NAME)
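The dictionary is iterated in its literal order, so `calendar` is inserted before `listings` and `hosts`. That only matters if the PostgreSQL schema enforces foreign keys, which this notebook does not show. If it does, parent tables must be loaded before their children. The sketch below uses the standard-library `graphlib` (Python 3.9+). The dependency map `TABLE_DEPENDENCIES` is a hypothetical guess from the table names, not the actual schema.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical parent tables for each table; the real foreign keys are not shown here
TABLE_DEPENDENCIES: Dict[str, Set[str]] = {
    "neighbourhoods": set(),
    "hosts": set(),
    "host_verification": {"hosts"},
    "listings": {"hosts", "neighbourhoods"},
    "listing_amenities": {"listings"},
    "listing_complements": {"listings"},
    "calendar": {"listings"},
    "reviews": {"listings"},
}


def insertion_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return table names ordered so every parent comes before its children."""
    return list(TopologicalSorter(dependencies).static_order())


# Usage:
# for TABLE_NAME in insertion_order(TABLE_DEPENDENCIES):
#     insert_to_db(dataframes[TABLE_NAME], TABLE_NAME)
print(insertion_order(TABLE_DEPENDENCIES))
```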