<a href="https://colab.research.google.com/github/solomontessema/Data-Analytics-and-AI-with-Python/blob/main/notebooks/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem; later cells read and write
# files under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark from a stable mirror
!wget -q https://mirrors.huaweicloud.com/apache/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Extract Spark
!tar -xvf spark-3.4.1-bin-hadoop3.tgz

# Install findspark
!pip install -q findspark

In [None]:
import os

# Point the process at the Java and Spark installs before Spark is initialised.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
})

# findspark.init() must run before pyspark is imported below.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create the session shared by every later cell (or reuse one if it already exists).
spark = (
    SparkSession.builder
    .appName("AirbnbReviews")
    .getOrCreate()
)


In [None]:
# Load the listings CSV from Drive: first row is the header, column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/drive/MyDrive/listings.csv")
)

# Inspect the inferred schema and preview the first five rows.
df.printSchema()
df.show(5)


In [None]:
from pyspark.sql.functions import col, to_date

# Keep only rows that have a price, then convert last_review to a date column.
has_price = col("price").isNotNull()
df_clean = (
    df.where(has_price)
      .withColumn("last_review", to_date(col("last_review")))
)

# Preview the columns of interest.
df_clean.select("id", "name", "price", "last_review").show()


In [None]:
# Average price per room type, highest average first.
avg_price_by_room_type = df_clean.groupBy("room_type").avg("price")
avg_price_by_room_type.sort("avg(price)", ascending=False).show()


In [None]:
# Number of listings in each neighbourhood, largest count first.
listings_per_neighbourhood = df_clean.groupBy("neighbourhood").count()
listings_per_neighbourhood.sort("count", ascending=False).show()

In [None]:
# Persist the cleaned listings to Drive as Parquet.
# mode("overwrite") replaces a previous export; the default mode raises an
# error when the path already exists, which breaks every re-run of this cell.
df_clean.write.mode("overwrite").parquet("/content/drive/MyDrive/listings_clean.parquet")



In [None]:
# Average reviews_per_month for each listing name, sorted alphabetically by name.
avg_reviews_by_name = df_clean.groupBy("name").avg("reviews_per_month")
avg_reviews_by_name.sort("name", ascending=True).show()

In [None]:
# How many rows share each listing name.
rows_per_name = df_clean.groupBy("name").count()
rows_per_name.show()

In [None]:
# Inner self-join of the listings on `name`.
# Each side gets its own alias so the join condition names the left and the
# right column explicitly, instead of comparing one column reference to itself
# and relying on Spark to guess which side is meant.
listings_left = df_clean.alias("l")
listings_right = df_clean.alias("r")
listings_left.join(listings_right, col("l.name") == col("r.name"), "inner").show()


In [None]:
from pyspark.sql.functions import col, isnan
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Train a linear regression that predicts `price` from room type, minimum
# nights and number of reviews, then report RMSE on a held-out 20% split.

# Numeric columns used by the model (two features plus the label).
numeric_cols = ["number_of_reviews", "minimum_nights", "price"]

# Re-read the raw CSV; note that `df` and `df_clean` are rebound by this cell.
df = spark.read.csv("/content/drive/MyDrive/listings.csv", header=True, inferSchema=True)

# Cast BEFORE dropping rows: with Spark's default (non-ANSI) cast, a value that
# cannot be parsed as a number becomes null, so the null check has to run
# after the cast to catch it.
df_clean = df
for numeric_col in numeric_cols:
    df_clean = df_clean.withColumn(numeric_col, col(numeric_col).cast("double"))

# Drop every row with a missing value in any column the model needs ...
df_clean = df_clean.dropna(subset=["room_type"] + numeric_cols)
# ... and explicitly reject NaN in all numeric columns, not only in the label.
for numeric_col in numeric_cols:
    df_clean = df_clean.filter(~isnan(col(numeric_col)))

# Encode room_type as a numeric index. The indexer is fit on exactly the rows
# that are used for training and evaluation below.
indexer = StringIndexer(inputCol="room_type", outputCol="room_type_index")
df_indexed = indexer.fit(df_clean).transform(df_clean)

# Assemble the feature vector. The order of inputCols defines the layout of the
# vector that the trained model expects at prediction time.
assembler = VectorAssembler(
    inputCols=["room_type_index", "minimum_nights", "number_of_reviews"],
    outputCol="features"
)
df_vector = assembler.transform(df_indexed)

# Fixed seed so the split is repeatable.
train_df, test_df = df_vector.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(train_df)

predictions = model.transform(test_df)

evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")


In [None]:
# Persist the trained model to Drive.
# write().overwrite() replaces a previously saved model; plain model.save()
# raises an error when the target path already exists, so the cell could not be re-run.
model.write().overwrite().save("/content/drive/MyDrive/linear_model")

In [None]:
from pyspark.ml.regression import LinearRegressionModel

# Restore the linear regression model that was saved to Drive.
loaded_model = LinearRegressionModel.read().load("/content/drive/MyDrive/linear_model")


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.regression import LinearRegressionModel

# One hand-built observation. The values follow the VectorAssembler inputCols
# used in the training cell: room_type_index, minimum_nights, number_of_reviews.
feature_values = [1.0, 3.0, 25.0]
sample_data = [Row(features=Vectors.dense(feature_values))]
sample_df = spark.createDataFrame(sample_data)

# Score the sample with the reloaded model and show the predicted price.
prediction = loaded_model.transform(sample_df)
prediction.select("features", "prediction").show()