<a href="https://colab.research.google.com/github/yutao-data/Apache_Hive-TPC_DS/blob/main/Part%202/BDM_P2_Formatted_Exploitation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!pip install pyspark
!pip install delta-spark
!pip install statsmodels
!pip install mlflow



In [43]:
import mlflow
import mlflow.spark
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, avg, count, when, datediff, current_date, to_date, regexp_replace, month, dayofmonth, sum as _sum, coalesce, lit, isnan, unix_timestamp, lag, from_json, lead
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, LongType, StringType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from delta import configure_spark_with_delta_pip
import os
import json
import time
import glob
import pandas as pd

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
# Initialize a Spark session
builder = SparkSession.builder \
    .appName("DataProcessingWithDeltaLake") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.2.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

delta_extensions = spark.conf.get("spark.sql.extensions", "")
print("Delta Lake loaded:", "io.delta.sql.DeltaSparkSessionExtension" in delta_extensions)

Delta Lake loaded: True


# Formatter

## Process Airport Data

Data Ingestion

In [8]:
file_path = "/content/drive/MyDrive/BDM/Data/Landing_Zone/2023_TransitAeri_FlightRadar_Ppal_Pais.csv"
airport_df = spark.read.options(inferSchema=True, header=True).csv(file_path)
airport_df.show()

+---------------+---------+--------------------+-----------+
|Data_Referencia|Codi_Pais|            Nom_Pais|Nombre_Vols|
+---------------+---------+--------------------+-----------+
|     2023-01-01|        1|             Espanya|        193|
|     2023-01-01|        2|              Itàlia|         63|
|     2023-01-01|        3|          Regne Unit|         57|
|     2023-01-01|        4|              França|         63|
|     2023-01-01|        5|            Alemanya|         50|
|     2023-01-01|        6|            Portugal|         26|
|     2023-01-01|        7|       Països Baixos|         28|
|     2023-01-01|        8|              Suïssa|         26|
|     2023-01-01|        9|             Bèlgica|         10|
|     2023-01-01|       10|          Marroc, el|         21|
|     2023-01-01|       11|   Estats Units, els|          9|
|     2023-01-01|       12|             Àustria|          5|
|     2023-01-01|       13|             Irlanda|         12|
|     2023-01-01|       

Data Cleaning

In [9]:
# Drop duplicates
airport_df_cleaned = airport_df.dropDuplicates()

# Handle missing values
airport_df_cleaned = airport_df_cleaned.dropna()

airport_df_cleaned.show()
airport_df_cleaned.printSchema()

+---------------+---------+--------------------+-----------+
|Data_Referencia|Codi_Pais|            Nom_Pais|Nombre_Vols|
+---------------+---------+--------------------+-----------+
|     2023-01-02|        6|            Portugal|         29|
|     2023-01-30|        7|       Països Baixos|         20|
|     2023-02-21|        3|          Regne Unit|         58|
|     2023-02-22|       13|             Irlanda|          8|
|     2023-03-03|       23|               Qatar|          4|
|     2023-03-09|        2|              Itàlia|         75|
|     2023-03-16|        2|              Itàlia|         74|
|     2023-03-16|       23|               Qatar|          4|
|     2023-03-17|        7|       Països Baixos|         28|
|     2023-03-28|        5|            Alemanya|         65|
|     2023-03-31|        6|            Portugal|         40|
|     2023-04-02|       20|Emirats Àrabs Uni...|          6|
|     2023-04-03|        9|             Bèlgica|         22|
|     2023-04-18|       

Data Transformation

In [10]:
# Calculate total tourists per country and find top 5 countries
country_tourists = airport_df_cleaned.groupBy("Nom_Pais").agg(_sum("Nombre_Vols").alias("total_tourists"))
top_countries = country_tourists.orderBy(col("total_tourists").desc()).limit(5)
top_country_list = [row['Nom_Pais'] for row in top_countries.collect()]

# Filter and pivot the DataFrame for the top 5 countries
pivot_df = airport_df_cleaned.filter(col("Nom_Pais").isin(top_country_list)).groupBy("Data_Referencia").pivot("Nom_Pais").agg(_sum("Nombre_Vols"))

# Add a new column for total tourists and fill NaNs with zeros
for country in top_country_list:
    pivot_df = pivot_df.withColumn(country, coalesce(col(country), lit(0)))

pivot_df = pivot_df.withColumn("total_tourists", sum(col(country) for country in top_country_list))

airport_df_final = pivot_df.orderBy("Data_Referencia")

column_renames = {
    "Data_Referencia": "Date",
    "Alemanya": "Germany_Tourists",
    "Espanya": "Spain_Tourists",
    "França": "France_Tourists",
    "Itàlia": "Italy_Tourists",
    "Regne Unit": "UK_Tourists",
    "total_tourists": "Total_Tourists"
}

for old_col, new_col in column_renames.items():
    airport_df_final = airport_df_final.withColumnRenamed(old_col, new_col)

airport_df_final.show(5)

+----------+----------------+--------------+---------------+--------------+-----------+--------------+
|      Date|Germany_Tourists|Spain_Tourists|France_Tourists|Italy_Tourists|UK_Tourists|Total_Tourists|
+----------+----------------+--------------+---------------+--------------+-----------+--------------+
|2023-01-01|              50|           193|             63|            63|         57|           426|
|2023-01-02|              56|           212|             67|            82|         73|           490|
|2023-01-03|              54|           227|             57|            80|         59|           477|
|2023-01-04|              57|           217|             54|            71|         62|           461|
|2023-01-05|              53|           195|             55|            79|         55|           437|
+----------+----------------+--------------+---------------+--------------+-----------+--------------+
only showing top 5 rows



## Process Listing Data

Data Ingestion

In [11]:
file_path = "/content/drive/MyDrive/BDM/Data/Landing_Zone/listings.csv"
listings_df = spark.read.csv(file_path, header=True, inferSchema=True)
listings_df.show(5)

+------+--------------------+-------+--------------+-------------------+--------------------+-----------------+-----------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-----------+
|    id|                name|host_id|     host_name|neighbourhood_group|       neighbourhood|         latitude|        longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|    license|
+------+--------------------+-------+--------------+-------------------+--------------------+-----------------+-----------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-----------+
| 17475|Rental unit in 08...|  65623|          Luca|           Eixample|la Dreta de l'Eix...|         41.3993

Data Cleaning

In [12]:
listings_df_cleaned = listings_df.dropDuplicates().dropna()

Data Transformation

In [13]:
listings_df_cleaned = listings_df_cleaned.fillna({'license': 'Unknown', 'reviews_per_month': 0})

# Convert price to integer
listings_df_cleaned = listings_df_cleaned.withColumn("price", col("price").cast("integer"))

# Calculate average price per neighbourhood
avg_price_per_neighbourhood = listings_df_cleaned.groupBy("neighbourhood").agg(avg("price").alias("avg_price"))

# Calculate number of listings per neighbourhood
count_listings_per_neighbourhood = listings_df_cleaned.groupBy("neighbourhood").agg(count("id").alias("listings_count"))

# Handle date columns
listings_df_cleaned = listings_df_cleaned.withColumn("last_review", col("last_review").cast("date"))
listings_df_cleaned = listings_df_cleaned.withColumn("days_since_last_review", datediff(current_date(), col("last_review")))

listings_df_final = listings_df_cleaned.withColumn(
    "room_type_encoded",
    when(col("room_type") == "Entire home/apt", 1)
    .when(col("room_type") == "Private room", 2)
    .when(col("room_type") == "Shared room", 3)
    .when(col("room_type") == "Hotel room", 4)
    .otherwise(0)
)

listings_df_final = listings_df_final.drop("price")
listings_df_final = listings_df_final.drop("minimum_nights")
listings_df_final = listings_df_final.drop("maximum_nights")

listings_df_final.show(5)

+------+--------------------+-------+----------+-------------------+--------------------+--------+---------+---------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------------+----------------------+-----------------+
|    id|                name|host_id| host_name|neighbourhood_group|       neighbourhood|latitude|longitude|      room_type|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|      license|days_since_last_review|room_type_encoded|
+------+--------------------+-------+----------+-------------------+--------------------+--------+---------+---------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------------+----------------------+-----------------+
|206167|Rental unit in Ba...|1014050|   Minerva|     Sants-Montjuïc|        el Poble Sec| 41.3724|   2.164

## Process Calendar Data

Data Ingestion

In [14]:
file_path = "/content/drive/MyDrive/BDM/Data/Landing_Zone/calendar.csv"
calendar_df = spark.read.csv(file_path, header=True, inferSchema=True)
calendar_df.show(5)

+----------+----------+---------+-------+--------------+--------------+--------------+
|listing_id|      date|available|  price|adjusted_price|minimum_nights|maximum_nights|
+----------+----------+---------+-------+--------------+--------------+--------------+
|    198958|2023-12-14|        t|$190.00|          NULL|             2|           365|
|    198958|2023-12-15|        t|$190.00|          NULL|             2|           365|
|    198958|2023-12-16|        t|$190.00|          NULL|             2|           365|
|    198958|2023-12-17|        t|$190.00|          NULL|             2|           365|
|    198958|2023-12-18|        t|$190.00|          NULL|             2|           365|
+----------+----------+---------+-------+--------------+--------------+--------------+
only showing top 5 rows



Data Cleaning

In [15]:
calendar_df_cleaned = calendar_df.dropDuplicates()
calendar_df_cleaned.cache()

# Remove currency symbols and commas from the price column, then convert to float
calendar_df_cleaned = calendar_df_cleaned.withColumn("price", regexp_replace(col("price"), "[$,]", "").cast("float"))
calendar_df_cleaned = calendar_df_cleaned.dropna(subset=["price"])

avg_price = calendar_df_cleaned.agg(avg("price")).first()[0]

# Handle missing values by filling missing prices with the average price
calendar_df_cleaned = calendar_df_cleaned.na.fill({'price': avg_price})

Data Transformation

In [16]:
calendar_df_cleaned = calendar_df_cleaned.withColumn("date", to_date(col("date"), 'yyyy-MM-dd'))

# Calculate availability percentage per listing_id
availability_df = calendar_df_cleaned.groupBy("listing_id").agg(
    (count(when(col("available") == "t", 1)) / count("available") * 100).alias("availability_percentage")
)

# Join availability_df back to the main dataframe
calendar_df_final = calendar_df_cleaned.join(availability_df, on="listing_id", how="left")

# Show the transformed DataFrame
calendar_df_final.show(5)

+----------+----------+---------+------+--------------+--------------+--------------+-----------------------+
|listing_id|      date|available| price|adjusted_price|minimum_nights|maximum_nights|availability_percentage|
+----------+----------+---------+------+--------------+--------------+--------------+-----------------------+
|     36763|2024-05-12|        t|  30.0|          NULL|            31|            65|     57.534246575342465|
|     36763|2024-02-07|        f|  30.0|          NULL|            31|            65|     57.534246575342465|
|     40983|2024-11-20|        t| 205.0|          NULL|             2|           364|      78.63013698630137|
|     46153|2024-03-03|        f|  70.0|          NULL|            31|          1125|      66.57534246575342|
|  20472746|2024-11-28|        t|1292.0|     $1,292.00|             4|             7|      88.76712328767124|
+----------+----------+---------+------+--------------+--------------+--------------+-----------------------+
only showi

## Data Integration

In [17]:
# Merge Listings and Calendar
merged_df = listings_df_final.join(calendar_df_final, listings_df_final.id == calendar_df_final.listing_id, "inner")
merged_df = merged_df.drop(calendar_df_final.listing_id)
merged_df.show(5)

+-----+--------------------+-------+---------+-------------------+--------------------+--------+---------+---------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-----------+----------------------+-----------------+----------+---------+-----+--------------+--------------+--------------+-----------------------+
|   id|                name|host_id|host_name|neighbourhood_group|       neighbourhood|latitude|longitude|      room_type|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|    license|days_since_last_review|room_type_encoded|      date|available|price|adjusted_price|minimum_nights|maximum_nights|availability_percentage|
+-----+--------------------+-------+---------+-------------------+--------------------+--------+---------+---------------+-----------------+-----------+-----------------+------------------------------+----------------+--

In [18]:
# Merge with Airport Data
merged_df = merged_df.withColumn("month_day", col("date").substr(6, 5))
airport_df_final = airport_df_final.withColumn("month_day", col("Date").substr(6, 5))
final_merged_df = merged_df.join(airport_df_final.drop("Date"), on="month_day", how="inner")
final_merged_df = final_merged_df.drop("month_day")
final_merged_df = final_merged_df.drop("adjusted_price")

columns_to_drop = [col for col in final_merged_df.columns if col.endswith("price") and col != "price"]
final_merged_df = final_merged_df.drop(*columns_to_drop)

final_merged_df.show(5)

+-----+--------------------+-------+---------+-------------------+--------------------+--------+---------+---------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-----------+----------------------+-----------------+----------+---------+-----+--------------+--------------+-----------------------+----------------+--------------+---------------+--------------+-----------+--------------+
|   id|                name|host_id|host_name|neighbourhood_group|       neighbourhood|latitude|longitude|      room_type|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|    license|days_since_last_review|room_type_encoded|      date|available|price|minimum_nights|maximum_nights|availability_percentage|Germany_Tourists|Spain_Tourists|France_Tourists|Italy_Tourists|UK_Tourists|Total_Tourists|
+-----+--------------------+-------+---------+-------------------+----

In [19]:
final_merged_df.columns

['id',
 'name',
 'host_id',
 'host_name',
 'neighbourhood_group',
 'neighbourhood',
 'latitude',
 'longitude',
 'room_type',
 'number_of_reviews',
 'last_review',
 'reviews_per_month',
 'calculated_host_listings_count',
 'availability_365',
 'number_of_reviews_ltm',
 'license',
 'days_since_last_review',
 'room_type_encoded',
 'date',
 'available',
 'price',
 'minimum_nights',
 'maximum_nights',
 'availability_percentage',
 'Germany_Tourists',
 'Spain_Tourists',
 'France_Tourists',
 'Italy_Tourists',
 'UK_Tourists',
 'Total_Tourists']

## Data Filtering

In [20]:
filtered_df = final_merged_df.filter(col("price") >= 0)
filtered_df = filtered_df.filter((col("date") >= "2020-01-01") & (col("date") <= "2024-06-15"))
filtered_df = filtered_df.dropna(subset=["price", "availability_percentage"])
filtered_df.show(5)

+-----+--------------------+-------+---------+-------------------+--------------------+--------+---------+---------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-----------+----------------------+-----------------+----------+---------+-----+--------------+--------------+-----------------------+----------------+--------------+---------------+--------------+-----------+--------------+
|   id|                name|host_id|host_name|neighbourhood_group|       neighbourhood|latitude|longitude|      room_type|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|    license|days_since_last_review|room_type_encoded|      date|available|price|minimum_nights|maximum_nights|availability_percentage|Germany_Tourists|Spain_Tourists|France_Tourists|Italy_Tourists|UK_Tourists|Total_Tourists|
+-----+--------------------+-------+---------+-------------------+----

In [21]:
row_count = filtered_df.count()
column_count = len(filtered_df.columns)

print(f"Number of rows after filtering: {row_count}")
print(f"Number of columns: {column_count}")

Number of rows after filtering: 1852749
Number of columns: 30


## Data Validation

In [22]:
# Check for duplicate rows
duplicate_count = filtered_df.count() - filtered_df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count}")

# Separate numeric and non-numeric columns
numeric_columns = [field.name for field in filtered_df.schema.fields if field.dataType in ['IntegerType', 'LongType', 'FloatType', 'DoubleType']]
non_numeric_columns = [field.name for field in filtered_df.schema.fields if field.dataType not in ['IntegerType', 'LongType', 'FloatType', 'DoubleType']]

# Check for null and missing values in numeric columns
null_counts_numeric = filtered_df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in numeric_columns])
null_counts_numeric.show()

# Check for null values in non-numeric columns
null_counts_non_numeric = filtered_df.select([count(when(col(c).isNull(), c)).alias(c) for c in non_numeric_columns])
null_counts_non_numeric.show()

# Verify data types
filtered_df.printSchema()

# Price should be non-negative
invalid_prices = filtered_df.filter(col("price") < 0).count()
print(f"Number of invalid price values: {invalid_prices}")

# Availability percentage should be between 0 and 100
invalid_availability = filtered_df.filter((col("availability_percentage") < 0) | (col("availability_percentage") > 100)).count()
print(f"Number of invalid availability_percentage values: {invalid_availability}")

# Uniqueness check for IDs
unique_id_count = filtered_df.select("id").distinct().count()
total_id_count = filtered_df.count()
print(f"Number of unique ids: {unique_id_count}, Total ids: {total_id_count}")

# Latitude should be between -90 and 90
invalid_latitude = filtered_df.filter((col("latitude") < -90) | (col("latitude") > 90)).count()
print(f"Number of invalid latitude values: {invalid_latitude}")

# Longitude should be between -180 and 180
invalid_longitude = filtered_df.filter((col("longitude") < -180) | (col("longitude") > 180)).count()
print(f"Number of invalid longitude values: {invalid_longitude}")

# Summary statistics
filtered_df.describe(["price", "minimum_nights", "number_of_reviews", "reviews_per_month", "availability_365", "availability_percentage"]).show()

# Distribution checks for specific columns
filtered_df.groupBy("room_type").count().show()
filtered_df.groupBy("neighbourhood_group").count().show()

# Consistency checks : Last review should be before the current date
invalid_last_review = filtered_df.filter(col("last_review") > current_date()).count()
print(f"Number of invalid last_review values: {invalid_last_review}")

Number of duplicate rows: 0
++
||
++
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
++
only showing top 20 rows

+---+----+-------+---------+-------------------+-------------+--------+---------+---------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+----------------------+-----------------+----+---------+-----+--------------+--------------+-----------------------+----------------+--------------+---------------+--------------+-----------+--------------+
| id|name|host_id|host_name|neighbourhood_group|neighbourhood|latitude|longitude|room_type|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|license|days_since_last_review|room_type_encoded|date|available|price|minimum_nights|maximum_nights|availability_percentage|Germany_Tourists|Spain_Tourists|France_Tourists|Italy_Tourists|UK_Tourists|Total_Tourists|
+---+----+-------+--------

## Store to Formatted Zone

In [23]:
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

filtered_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").save(delta_lake_path)

# Exploitation

## Descriptive Analysis

In [24]:
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

# Read the Delta Lake table
filtered_df = spark.read.format("delta").load(delta_lake_path)
filtered_df.printSchema()
filtered_df.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- days_since_last_review: integer (nullable = true)
 |-- room_type_encoded: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: string (nullable = true)
 |-- price: float (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- maximum_nights:

### KPIs

In [25]:
# Average price per area
avg_price_per_neighbourhood_group = filtered_df.groupBy("neighbourhood_group").agg(F.avg("price").alias("average_price"))
avg_price_per_neighbourhood_group.show()

# Average price per room type
avg_price_per_room_type = filtered_df.groupBy("room_type").agg(F.avg("price").alias("average_price"))
avg_price_per_room_type.show()

# Vacancy rate per area
availability_rate_per_neighbourhood_group = filtered_df.groupBy("neighbourhood_group").agg(F.avg("availability_percentage").alias("availability_rate"))
availability_rate_per_neighbourhood_group.show()

# Trends in the total number of tourists per month
monthly_tourists_trend = filtered_df.groupBy(F.month("date").alias("month")).agg(F.sum("Total_Tourists").alias("total_tourists"))
monthly_tourists_trend.show()

# Save KPIs
avg_price_per_neighbourhood_group.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/avg_price_per_neighbourhood_group.csv", header=True, mode="overwrite")
avg_price_per_room_type.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/avg_price_per_room_type.csv", header=True, mode="overwrite")
availability_rate_per_neighbourhood_group.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/availability_rate_per_neighbourhood_group.csv", header=True, mode="overwrite")
monthly_tourists_trend.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/monthly_tourists_trend.csv", header=True, mode="overwrite")

+-------------------+------------------+
|neighbourhood_group|     average_price|
+-------------------+------------------+
|             Gràcia|252.91087552277205|
|         Sant Martí| 309.8315567543374|
|     Horta-Guinardó|129.51123475283543|
|          Les Corts|172.57770826473492|
|     Sants-Montjuïc|201.66637722817734|
|         Nou Barris| 95.39879839786381|
|Sarrià-Sant Gervasi|206.81039398652152|
|           Eixample| 361.8364371214326|
|        Sant Andreu|179.96796735526652|
|       Ciutat Vella|233.49953475036816|
+-------------------+------------------+

+---------------+------------------+
|      room_type|     average_price|
+---------------+------------------+
|    Shared room| 218.7249429078914|
|     Hotel room|1315.9984906419802|
|Entire home/apt|313.38256805448657|
|   Private room|179.66480239230583|
+---------------+------------------+

+-------------------+------------------+
|neighbourhood_group| availability_rate|
+-------------------+------------------+
|    

## Predictive Analysis

In [45]:
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

# Read the Delta Lake table
filtered_df = spark.read.format("delta").load(delta_lake_path)
filtered_df.printSchema()
filtered_df.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- days_since_last_review: integer (nullable = true)
 |-- room_type_encoded: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: string (nullable = true)
 |-- price: float (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- maximum_nights:

### Data Splict

In [29]:
filtered_df = filtered_df.withColumn("date", col("date").cast("date"))
filtered_df.printSchema()
filtered_df.show(5)

window_spec = Window.partitionBy("id").orderBy("date")

filtered_df = filtered_df.withColumn("next_day_tourists", lead("Total_Tourists").over(window_spec))

filtered_df = filtered_df.filter(col("next_day_tourists").isNotNull())

filtered_df.show(5)

feature_columns = [
    "host_id", "latitude", "longitude", "room_type_encoded", "number_of_reviews",
    "reviews_per_month", "calculated_host_listings_count", "availability_365",
    "days_since_last_review", "price", "minimum_nights", "maximum_nights",
    "availability_percentage", "Germany_Tourists", "Spain_Tourists", "France_Tourists",
    "Italy_Tourists", "UK_Tourists", "Total_Tourists"
]
target_column = "next_day_tourists"

train_ratio = 0.8
train_df, test_df = filtered_df.randomSplit([train_ratio, 1 - train_ratio], seed=42)

train_df.show(5)
test_df.show(5)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- days_since_last_review: integer (nullable = true)
 |-- room_type_encoded: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: string (nullable = true)
 |-- price: float (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- maximum_nights:

### Model Training

In [30]:
# Define the features and label columns for the VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform the training and test sets
train_df = assembler.transform(train_df).select("features", col(target_column).alias("label"))
test_df = assembler.transform(test_df).select("features", col(target_column).alias("label"))

lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_df)

predictions = lr_model.transform(test_df)

predictions.select("features", "label", "prediction").show(5)

+--------------------+-----+-----------------+
|            features|label|       prediction|
+--------------------+-----+-----------------+
|[177617.0,41.3963...|  544|527.7573673722007|
|[177617.0,41.3963...|  579|533.7668767812655|
|[177617.0,41.3963...|  539|565.6336362187195|
|[177617.0,41.3963...|  562|523.4028226708232|
|[177617.0,41.3963...|  477| 502.604398591066|
+--------------------+-----+-----------------+
only showing top 5 rows



### Model Validation

In [31]:
# Evaluate the model using RMSE, MAE, and R2
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)

evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions)

evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")
print(f"Mean Absolute Error (MAE) on test data = {mae}")
print(f"R-squared (R2) on test data = {r2}")

Root Mean Squared Error (RMSE) on test data = 48.87104892571636
Mean Absolute Error (MAE) on test data = 40.433859916921726
R-squared (R2) on test data = 0.5263993754115708


### Model Store

In [34]:
# Select relevant columns for visualization
predictions_to_save = predictions.select("prediction", "label")
test_df_to_save = test_df.select("label")

# Save the predictions and test set as CSV
predictions_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/predictions.csv"
test_set_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/test_set.csv"

predictions_to_save.write.mode("overwrite").csv(predictions_path, header=True)
test_df_to_save.write.mode("overwrite").csv(test_set_path, header=True)

print(f"Predictions saved to {predictions_path}")
print(f"Test set saved to {test_set_path}")

Predictions saved to /content/drive/MyDrive/BDM/Data/Exploitation_Zone/predictions.csv
Test set saved to /content/drive/MyDrive/BDM/Data/Exploitation_Zone/test_set.csv


In [37]:
# Save the model
model_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/lr_model"
lr_model.write().overwrite().save(model_path)
print(f"Model saved to {model_path}")

Model saved to /content/drive/MyDrive/BDM/Data/Exploitation_Zone/lr_model


# Stream analytics

## Data Flow Simulator Prepare

In [38]:
sample_df = filtered_df.limit(10).toPandas()

sample_df['date'] = sample_df['date'].astype(str)
sample_df['last_review'] = sample_df['last_review'].astype(str)

data_dir = "/content/data_stream"
os.makedirs(data_dir, exist_ok=True)

## Spark Streaming

In [44]:
for index, row in sample_df.iterrows():
    message = row.to_dict()
    file_path = os.path.join(data_dir, f"data_{index}.json")
    with open(file_path, "w") as file:
        json.dump(message, file)
    time.sleep(1)

schema = StructType([
    StructField("id", LongType(), True),
    StructField("host_id", IntegerType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("number_of_reviews", IntegerType(), True),
    StructField("last_review", StringType(), True),
    StructField("reviews_per_month", DoubleType(), True),
    StructField("calculated_host_listings_count", IntegerType(), True),
    StructField("availability_365", IntegerType(), True),
    StructField("number_of_reviews_ltm", IntegerType(), True),
    StructField("days_since_last_review", IntegerType(), True),
    StructField("room_type_encoded", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("minimum_nights", IntegerType(), True),
    StructField("maximum_nights", IntegerType(), True),
    StructField("availability_percentage", DoubleType(), True),
    StructField("Germany_Tourists", LongType(), True),
    StructField("Spain_Tourists", LongType(), True),
    StructField("France_Tourists", LongType(), True),
    StructField("Italy_Tourists", LongType(), True),
    StructField("UK_Tourists", LongType(), True),
    StructField("Total_Tourists", LongType(), True),
])

model_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/lr_model"
lr_model = LinearRegressionModel.load(model_path)

df_stream = spark \
    .readStream \
    .schema(schema) \
    .json("/content/data_stream")

feature_columns = [
    "host_id", "latitude", "longitude", "room_type_encoded", "number_of_reviews",
    "reviews_per_month", "calculated_host_listings_count", "availability_365",
    "days_since_last_review", "price", "minimum_nights", "maximum_nights",
    "availability_percentage", "Germany_Tourists", "Spain_Tourists", "France_Tourists",
    "Italy_Tourists", "UK_Tourists", "Total_Tourists"
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_features = assembler.transform(df_stream)

predictions = lr_model.transform(df_features)

output_path = "/content/predictions_output"

query = predictions.select("features", "prediction", "Total_Tourists", "date").writeStream \
    .outputMode("append") \
    .format("json") \
    .option("path", output_path) \
    .option("checkpointLocation", "/content/checkpoint") \
    .start()

time.sleep(20)
query.stop()

result_files = glob.glob(f"{output_path}/*.json")
results = []

for file in result_files:
    with open(file, "r") as f:
        for line in f:
            results.append(json.loads(line))

results_df = pd.DataFrame(results)
print(results_df.head())

                                            features  prediction  \
0  {'type': 1, 'values': [177617.0, 41.39631, 2.1...  527.757367   
1  {'type': 1, 'values': [177617.0, 41.39631, 2.1...  559.923428   
2  {'type': 1, 'values': [177617.0, 41.39631, 2.1...  531.275637   
3  {'type': 1, 'values': [177617.0, 41.39631, 2.1...  533.766877   
4  {'type': 1, 'values': [177617.0, 41.39631, 2.1...  538.804282   

   Total_Tourists        date  
0             489  2023-12-16  
1             579  2023-12-18  
2             544  2023-12-19  
3             571  2023-12-20  
4             539  2023-12-23  
