<a href="https://colab.research.google.com/github/yutao-data/Apache_Hive_TPC_DI/blob/master/Project%202/BDM_P2_Exploitation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark
!pip install delta-spark
!pip install statsmodels
!pip install mlflow

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c4fa3fa353d306e3534e9178a4ba99b84b95fd7a17a78eb8153aef1222336edb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting delta-spark
  Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.2.0
Collecting mlflow
  Downloading mlflow-2.14.0-py3-none-any.whl (25.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import mlflow
import mlflow.spark
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, avg, count, when, datediff, current_date, to_date, regexp_replace, month, dayofmonth, sum as _sum, coalesce, lit, isnan, unix_timestamp, lag, from_json, lead, rand, percentile_approx
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, LongType, StringType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import Row
from delta.tables import DeltaTable
from delta import configure_spark_with_delta_pip
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import os
import json
import time
import glob
import pandas as pd
import numpy as np
import tempfile
import shutil

In [3]:
# Mount Google Drive at /content/drive; every data path used below lives under it.
# `drive` is already imported in the imports cell, so it is not re-imported here.
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Initialize a Spark session with Delta Lake support.
# configure_spark_with_delta_pip() sets spark.jars.packages to the Delta artifact that
# matches the pip-installed delta-spark, so no Maven coordinate is hard-coded here
# (the previous "io.delta:delta-core_2.12:1.2.1" value was overwritten by that call
# and did not match the installed delta-spark release).
builder = (
    SparkSession.builder
    .appName("DataProcessingWithDeltaLake")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Sanity check: the Delta SQL extension should be registered on the live session.
delta_extensions = spark.conf.get("spark.sql.extensions", "")
print("Delta Lake loaded:", "io.delta.sql.DeltaSparkSessionExtension" in delta_extensions)

Delta Lake loaded: True


# Exploitation

## Descriptive Analysis

In [None]:
# Path of the Delta table in the Formatted Zone.
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

# Load the table as a DataFrame, then print its schema and a preview of its rows.
filtered_df = spark.read.load(delta_lake_path, format="delta")
filtered_df.printSchema()
filtered_df.show()

### KPIs

In [None]:
# KPI: mean price for each neighbourhood group
avg_price_per_neighbourhood_group = (
    filtered_df
    .groupBy("neighbourhood_group")
    .agg(F.avg("price").alias("average_price"))
)
avg_price_per_neighbourhood_group.show()

# KPI: mean price for each room type
avg_price_per_room_type = (
    filtered_df
    .groupBy("room_type")
    .agg(F.avg("price").alias("average_price"))
)
avg_price_per_room_type.show()

# KPI: mean availability percentage for each neighbourhood group
availability_rate_per_neighbourhood_group = (
    filtered_df
    .groupBy("neighbourhood_group")
    .agg(F.avg("availability_percentage").alias("availability_rate"))
)
availability_rate_per_neighbourhood_group.show()

# KPI: Total_Tourists summed per calendar month of the `date` column
monthly_tourists_trend = (
    filtered_df
    .groupBy(F.month("date").alias("month"))
    .agg(F.sum("Total_Tourists").alias("total_tourists"))
)
monthly_tourists_trend.show()

# Persist every KPI as a single-partition CSV with a header row, replacing earlier output
avg_price_per_neighbourhood_group.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/avg_price_per_neighbourhood_group.csv"
)
avg_price_per_room_type.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/avg_price_per_room_type.csv"
)
availability_rate_per_neighbourhood_group.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/availability_rate_per_neighbourhood_group.csv"
)
monthly_tourists_trend.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/KPIs/monthly_tourists_trend.csv"
)

## Predictive Analysis

In [None]:
# Path of the Delta table in the Formatted Zone (same table the descriptive section reads).
delta_lake_path = "/content/drive/MyDrive/BDM/Data/Formatted_Zone/filtered_df"

# Load the Delta table again as the input of the predictive analysis.
filtered_df = spark.read.load(delta_lake_path, format="delta")

### Data Split

In [None]:
# Disabled variant, kept for reference only (nothing in this cell executes).
# It pairs Italy_Tourists (label) with Germany_Tourists (single feature), standardizes
# the feature with numpy, and makes an 80/20 random split with seed 42.

# spark = SparkSession.builder.appName("TouristRegression").getOrCreate()
# sc = spark.sparkContext

# df = filtered_df.select("Germany_Tourists", "Italy_Tourists").limit(5000)
# df.show(5)

# rdd = df.rdd.map(lambda row: LabeledPoint(row.Italy_Tourists, [row.Germany_Tourists]))
# rdd = rdd.sample(False, 1.0, seed=42)

# features = np.array([point.features[0] for point in rdd.collect()])
# labels = np.array([point.label for point in rdd.collect()])

# mean = np.mean(features)
# std = np.std(features)
# normalized_features = (features - mean) / std

# normalized_data = [LabeledPoint(label, [feature]) for label, feature in zip(labels, normalized_features)]

# train_rdd, test_rdd = sc.parallelize(normalized_data).randomSplit([0.8, 0.2], seed=42)

In [None]:
# getOrCreate() hands back the session that is already running; `sc` is its SparkContext.
spark = SparkSession.builder.appName("TouristRegression").getOrCreate()
sc = spark.sparkContext

df = filtered_df.select("Germany_Tourists").limit(5000)
df.show(5)

# Bring the (at most 5000) values to the driver once and work on them with numpy.
tourist_data = df.collect()
tourist_counts = np.array([row["Germany_Tourists"] for row in tourist_data], dtype=float)
if tourist_counts.size < 2:
    raise ValueError("Need at least two Germany_Tourists rows to build (current, next) pairs")
if np.isnan(tourist_counts).any():
    raise ValueError("Germany_Tourists contains missing values; clean them before training")

# Each sample uses row i as the feature and row i + 1 as the label.
# NOTE(review): rows come back in whatever order Spark returns them (no orderBy), so
# "next" is only the next row, not necessarily the next calendar day - confirm intent.
features = tourist_counts[:-1]
labels = tourist_counts[1:]

# Standardize the feature; the statistics are reused later to map values back to raw units.
mean = np.mean(features)
std = np.std(features)
if std == 0:
    # A constant feature would otherwise divide by zero and fill the data with NaN.
    std = 1.0
normalized_features = (features - mean) / std

normalized_data = [LabeledPoint(label, [feature]) for label, feature in zip(labels, normalized_features)]

# Seeded 80/20 split so the train/test partition is reproducible.
train_rdd, test_rdd = sc.parallelize(normalized_data).randomSplit([0.8, 0.2], seed=42)

### Model Training

In [None]:
# SGD hyper-parameters.
# NOTE(review): 100 iterations at step 0.01 may be too few to converge - verify on the metrics.
learning_rate = 0.01
iterations = 100

# The training features are standardized (zero mean) while the labels stay in raw units,
# so the model needs an intercept term to represent the label offset.
lrm = LinearRegressionWithSGD.train(
    train_rdd,
    iterations=iterations,
    step=learning_rate,
    initialWeights=np.array([1.0]),
    intercept=True,
)

### Model Evaluation

In [None]:
# Collect the held-out points once; their features are on the standardized scale.
test_points = test_rdd.collect()
test_features = np.array([point.features[0] for point in test_points])
test_labels = np.array([point.label for point in test_points])
# The same features mapped back to raw units (for inspection only, not fed to the model).
test_original = test_features * std + mean

# The model was fitted on standardized features (train_rdd), so it has to be scored on
# that same scale; feeding it raw-unit values would produce meaningless predictions.
predictions = [float(lrm.predict([feature])) for feature in test_features]

mse = mean_squared_error(test_labels, predictions)
mae = mean_absolute_error(test_labels, predictions)
r2 = r2_score(test_labels, predictions)

print(f"Mean Squared Error (MSE): {mse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared (R²): {r2}")

### Model & Predictions Storage

In [None]:
# Pair each held-out label with its prediction (both as plain floats) in a Spark DataFrame.
results = spark.createDataFrame(
    [
        Row(label=float(actual), prediction=float(predicted))
        for actual, predicted in zip(test_labels, predictions)
    ]
)

In [None]:
# Destination of the label/prediction pairs in the Exploitation Zone.
output_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/predictions.csv"

# Write as CSV with a header row, replacing any previous output at that path.
results.write.csv(output_path, header=True, mode="overwrite")

In [None]:
# Where the trained regression model is stored in the Exploitation Zone.
model_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/lr_model"

# Remove a previously saved copy first.
# NOTE(review): presumably needed because save() will not overwrite an existing path - confirm.
previous_model_present = os.path.exists(model_path)
if previous_model_present:
    shutil.rmtree(model_path)

lrm.save(sc, model_path)

# Stream analytics

## Data Flow Simulator Preparation

In [None]:
# Take ten rows as a pandas frame and turn the `date` / `last_review` columns into strings.
sample_df = (
    filtered_df
    .limit(10)
    .toPandas()
    .astype({'date': str, 'last_review': str})
)

# Directory for the simulated stream; created if it does not exist yet.
data_dir = "/content/data_stream"
os.makedirs(data_dir, exist_ok=True)

## Spark Streaming

In [None]:
# Disabled streaming prototype, kept for reference only (nothing in this cell executes).
# Outline: dump each sample row as a JSON file into the stream directory, read that
# directory as a structured stream with an explicit schema, assemble a feature vector,
# score it with the saved model, write predictions as JSON, then read them back into pandas.
#
# NOTE(review) before re-enabling:
#   - VectorAssembler is not imported in the imports cell.
#   - The saved model comes from pyspark.mllib; loading it presumably needs
#     LinearRegressionModel.load(sc, path), and it exposes predict() rather than
#     transform() - confirm against the MLlib API.
#   - The model is trained on a single feature, while feature_columns lists 19 columns.

# for index, row in sample_df.iterrows():
#     message = row.to_dict()
#     file_path = os.path.join(data_dir, f"data_{index}.json")
#     with open(file_path, "w") as file:
#         json.dump(message, file)
#     time.sleep(1)

# schema = StructType([
#     StructField("id", LongType(), True),
#     StructField("host_id", IntegerType(), True),
#     StructField("latitude", DoubleType(), True),
#     StructField("longitude", DoubleType(), True),
#     StructField("number_of_reviews", IntegerType(), True),
#     StructField("last_review", StringType(), True),
#     StructField("reviews_per_month", DoubleType(), True),
#     StructField("calculated_host_listings_count", IntegerType(), True),
#     StructField("availability_365", IntegerType(), True),
#     StructField("number_of_reviews_ltm", IntegerType(), True),
#     StructField("days_since_last_review", IntegerType(), True),
#     StructField("room_type_encoded", IntegerType(), True),
#     StructField("date", StringType(), True),
#     StructField("price", DoubleType(), True),
#     StructField("minimum_nights", IntegerType(), True),
#     StructField("maximum_nights", IntegerType(), True),
#     StructField("availability_percentage", DoubleType(), True),
#     StructField("Germany_Tourists", LongType(), True),
#     StructField("Spain_Tourists", LongType(), True),
#     StructField("France_Tourists", LongType(), True),
#     StructField("Italy_Tourists", LongType(), True),
#     StructField("UK_Tourists", LongType(), True),
#     StructField("Total_Tourists", LongType(), True),
# ])

# model_path = "/content/drive/MyDrive/BDM/Data/Exploitation_Zone/lr_model"
# lr_model = LinearRegressionWithSGD.load(model_path)

# df_stream = spark \
#     .readStream \
#     .schema(schema) \
#     .json("/content/data_stream")

# feature_columns = [
#     "host_id", "latitude", "longitude", "room_type_encoded", "number_of_reviews",
#     "reviews_per_month", "calculated_host_listings_count", "availability_365",
#     "days_since_last_review", "price", "minimum_nights", "maximum_nights",
#     "availability_percentage", "Germany_Tourists", "Spain_Tourists", "France_Tourists",
#     "Italy_Tourists", "UK_Tourists", "Total_Tourists"
# ]

# assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# df_features = assembler.transform(df_stream)

# predictions = lr_model.transform(df_features)

# output_path = "/content/predictions_output"

# query = predictions.select("features", "prediction", "Total_Tourists", "date").writeStream \
#     .outputMode("append") \
#     .format("json") \
#     .option("path", output_path) \
#     .option("checkpointLocation", "/content/checkpoint") \
#     .start()

# time.sleep(20)
# query.stop()

# result_files = glob.glob(f"{output_path}/*.json")
# results = []

# for file in result_files:
#     with open(file, "r") as f:
#         for line in f:
#             results.append(json.loads(line))

# results_df = pd.DataFrame(results)
# print(results_df.head())