<a href="https://colab.research.google.com/github/woshimajintao/SDM-Joint-Project/blob/main/Project%202/BDM_P2_Exploitation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark
!pip install delta-spark
!pip install statsmodels
!pip install mlflow

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4d014162e79966e5c84ef0f8683a928ccae26adfaaca898da437757ad10c0150
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting delta-spark
  Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.2.0
Collecting mlflow
  Downloading mlflow-2.14.1-py3-none-any.whl (25.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━

In [19]:
import mlflow
import mlflow.spark
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, avg, count, when, datediff, current_date, to_date, regexp_replace, month, dayofmonth, sum as _sum, coalesce, lit, isnan, unix_timestamp, lag, from_json, lead, rand, percentile_approx
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, LongType, StringType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import Row
from delta.tables import DeltaTable
from delta import configure_spark_with_delta_pip
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import os
import json
import time
import glob
import pandas as pd
import numpy as np
import tempfile
import shutil

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Initialize a Spark session
builder = SparkSession.builder \
    .appName("DataProcessingWithDeltaLake") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.2.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

delta_extensions = spark.conf.get("spark.sql.extensions", "")
print("Delta Lake loaded:", "io.delta.sql.DeltaSparkSessionExtension" in delta_extensions)

Delta Lake loaded: True


# Exploitation

## Descriptive Analysis

In [5]:
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

# Read the Delta Lake table
filtered_df = spark.read.format("delta").load(delta_lake_path)
filtered_df.printSchema()
filtered_df.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- days_since_last_review: integer (nullable = true)
 |-- room_type_encoded: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: string (nullable = true)
 |-- price: float (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- maximum_nights:

### KPIs

In [6]:
# Average price per area
avg_price_per_neighbourhood_group = filtered_df.groupBy("neighbourhood_group").agg(F.avg("price").alias("average_price"))
avg_price_per_neighbourhood_group.show()

# Average price per room type
avg_price_per_room_type = filtered_df.groupBy("room_type").agg(F.avg("price").alias("average_price"))
avg_price_per_room_type.show()

# Vacancy rate per area
availability_rate_per_neighbourhood_group = filtered_df.groupBy("neighbourhood_group").agg(F.avg("availability_percentage").alias("availability_rate"))
availability_rate_per_neighbourhood_group.show()

# Trends in the total number of tourists per month
monthly_tourists_trend = filtered_df.groupBy(F.month("date").alias("month")).agg(F.sum("Total_Tourists").alias("total_tourists"))
monthly_tourists_trend.show()

# Save KPIs
avg_price_per_neighbourhood_group.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/avg_price_per_neighbourhood_group.csv", header=True, mode="overwrite")
avg_price_per_room_type.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/avg_price_per_room_type.csv", header=True, mode="overwrite")
availability_rate_per_neighbourhood_group.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/availability_rate_per_neighbourhood_group.csv", header=True, mode="overwrite")
monthly_tourists_trend.coalesce(1).write.csv("/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/monthly_tourists_trend.csv", header=True, mode="overwrite")

+-------------------+------------------+
|neighbourhood_group|     average_price|
+-------------------+------------------+
|             Gràcia|252.91087552277205|
|         Sant Martí| 309.8315567543374|
|     Horta-Guinardó|129.51123475283543|
|          Les Corts|172.57770826473492|
|     Sants-Montjuïc|201.66637722817734|
|         Nou Barris| 95.39879839786381|
|Sarrià-Sant Gervasi|206.81039398652152|
|           Eixample| 361.8364371214326|
|        Sant Andreu|179.96796735526652|
|       Ciutat Vella|233.49953475036816|
+-------------------+------------------+

+---------------+------------------+
|      room_type|     average_price|
+---------------+------------------+
|    Shared room| 218.7249429078914|
|     Hotel room|1315.9984906419802|
|Entire home/apt|313.38256805448657|
|   Private room|179.66480239230583|
+---------------+------------------+

+-------------------+------------------+
|neighbourhood_group| availability_rate|
+-------------------+------------------+
|    

## Predictive Analysis

In [7]:
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

# Read the Delta Lake table
filtered_df = spark.read.format("delta").load(delta_lake_path)

### Data Splict

In [8]:
# spark = SparkSession.builder.appName("TouristRegression").getOrCreate()
# sc = spark.sparkContext

# df = filtered_df.select("Germany_Tourists", "Italy_Tourists").limit(5000)
# df.show(5)

# rdd = df.rdd.map(lambda row: LabeledPoint(row.Italy_Tourists, [row.Germany_Tourists]))
# rdd = rdd.sample(False, 1.0, seed=42)

# features = np.array([point.features[0] for point in rdd.collect()])
# labels = np.array([point.label for point in rdd.collect()])

# mean = np.mean(features)
# std = np.std(features)
# normalized_features = (features - mean) / std

# normalized_data = [LabeledPoint(label, [feature]) for label, feature in zip(labels, normalized_features)]

# train_rdd, test_rdd = sc.parallelize(normalized_data).randomSplit([0.8, 0.2], seed=42)

In [9]:
spark = SparkSession.builder.appName("TouristRegression").getOrCreate()
sc = spark.sparkContext

df = filtered_df.select("Germany_Tourists").limit(5000)
df.show(5)

tourist_data = df.collect()
labeled_points = []
for i in range(len(tourist_data) - 1):
    current_day = tourist_data[i]['Germany_Tourists']
    next_day = tourist_data[i + 1]['Germany_Tourists']
    labeled_points.append(LabeledPoint(next_day, [current_day]))

rdd = sc.parallelize(labeled_points)

features = np.array([point.features[0] for point in rdd.collect()])
labels = np.array([point.label for point in rdd.collect()])

mean = np.mean(features)
std = np.std(features)
normalized_features = (features - mean) / std

normalized_data = [LabeledPoint(label, [feature]) for label, feature in zip(labels, normalized_features)]

train_rdd, test_rdd = sc.parallelize(normalized_data).randomSplit([0.8, 0.2], seed=42)

+----------------+
|Germany_Tourists|
+----------------+
|              72|
|              60|
|              69|
|              50|
|              71|
+----------------+
only showing top 5 rows



### Model Training

In [10]:
learning_rate = 0.01
iterations = 100

lrm = LinearRegressionWithSGD.train(train_rdd, iterations=iterations, step=learning_rate, initialWeights=np.array([1.0]))



### Model Evaluation

In [11]:
test_features = np.array([point.features[0] for point in test_rdd.collect()])
test_labels = np.array(test_rdd.map(lambda p: p.label).collect())
test_original = test_features * std + mean

predictions = []
for feature in test_original:
    prediction = lrm.predict([feature])
    predictions.append(prediction)

mse = mean_squared_error(test_labels, predictions)
mae = mean_absolute_error(test_labels, predictions)
r2 = r2_score(test_labels, predictions)

print(f"Mean Squared Error (MSE): {mse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared (R²): {r2}")

Mean Squared Error (MSE): 371.3144684453166
Mean Absolute Error (MAE): 15.88965944985962
R-squared (R²): -2.0633918289788498


### Model & Predictions Store

In [12]:
results = spark.createDataFrame([Row(label=float(l), prediction=float(p)) for l, p in zip(test_labels, predictions)])

In [13]:
output_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/predictions.csv"
results.write.mode("overwrite").csv(output_path, header=True)

In [14]:
model_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/lr_model"
if os.path.exists(model_path):
    shutil.rmtree(model_path)

lrm.save(sc, model_path)

# Stream analytics

## Load Saved

In [20]:
model_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/lr_model"
lr_model = LinearRegressionModel.load(sc, model_path)

## Data Flow Simulator Prepare

In [32]:
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

# Read the Delta Lake table
filtered_df = spark.read.format("delta").load(delta_lake_path)

In [28]:
df = filtered_df.select("Germany_Tourists").limit(20)

tourist_data = df.collect()
labeled_points = []
for i in range(len(tourist_data) - 1):
    current_day = tourist_data[i]['Germany_Tourists']
    next_day = tourist_data[i + 1]['Germany_Tourists']
    labeled_points.append(LabeledPoint(next_day, [current_day]))

rdd = sc.parallelize(labeled_points)
rdd.take(5)

[LabeledPoint(60.0, [72.0]),
 LabeledPoint(69.0, [60.0]),
 LabeledPoint(50.0, [69.0]),
 LabeledPoint(71.0, [50.0]),
 LabeledPoint(63.0, [71.0])]

## Streaming

In [29]:
def simulate_data_stream(rdd):
    for point in rdd.collect():
        yield point
        time.sleep(1)

for labeled_point in simulate_data_stream(rdd):
    current_feature = labeled_point.features[0]
    prediction = lr_model.predict([current_feature])
    print(f"Current: {current_feature}, Prediction: {prediction}")

Current: 72.0, Prediction: 56.397803354426024
Current: 60.0, Prediction: 46.998169462021686
Current: 69.0, Prediction: 54.04789488132494
Current: 50.0, Prediction: 39.16514121835141
Current: 71.0, Prediction: 55.614500530059
Current: 63.0, Prediction: 49.348077935122774
Current: 48.0, Prediction: 37.598535569617354
Current: 65.0, Prediction: 50.91468358385683
Current: 73.0, Prediction: 57.18110617879305
Current: 59.0, Prediction: 46.21486663765466
Current: 60.0, Prediction: 46.998169462021686
Current: 57.0, Prediction: 44.648260988920605
Current: 38.0, Prediction: 29.76550732594707
Current: 41.0, Prediction: 32.11541579904815
Current: 61.0, Prediction: 47.78147228638872
Current: 55.0, Prediction: 43.08165534018655
Current: 75.0, Prediction: 58.74771182752711
Current: 69.0, Prediction: 54.04789488132494
Current: 54.0, Prediction: 42.298352515819516


In [33]:
streaming_label = np.array(rdd.map(lambda p: p.label).collect())
results = spark.createDataFrame([Row(label=float(l), prediction=float(p)) for l, p in zip(streaming_label, predictions)])

In [34]:
output_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/Streaming_predictions.csv"
results.write.mode("overwrite").csv(output_path, header=True)