In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import kagglehub
mining_path = kagglehub.dataset_download('samsono/mining')

In [2]:
! pip install pyspark



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [4]:
data = spark.read.csv(f'/kaggle/input/mining/single_variable_regression.csv', header=True, inferSchema=True)

In [5]:
len(data.columns)

2

In [6]:
data.count()

10

In [7]:
data.show()

+----------+----------+
|house_size|price_sold|
+----------+----------+
|      1490|    600000|
|      2500|    950000|
|      1200|    550000|
|       900|    450000|
|      1300|    560000|
|      1000|    500000|
|       850|    430000|
|       750|    400000|
|      2000|    800000|
|      1600|    700000|
+----------+----------+



In [8]:
from pyspark.ml.feature import VectorAssembler

In [9]:
assembler_object = VectorAssembler(inputCols=['house_size'], outputCol='house_size_vector')

In [10]:
feature_vector_df = assembler_object.transform(data)

In [11]:
feature_vector_df.show()

+----------+----------+-----------------+
|house_size|price_sold|house_size_vector|
+----------+----------+-----------------+
|      1490|    600000|         [1490.0]|
|      2500|    950000|         [2500.0]|
|      1200|    550000|         [1200.0]|
|       900|    450000|          [900.0]|
|      1300|    560000|         [1300.0]|
|      1000|    500000|         [1000.0]|
|       850|    430000|          [850.0]|
|       750|    400000|          [750.0]|
|      2000|    800000|         [2000.0]|
|      1600|    700000|         [1600.0]|
+----------+----------+-----------------+



In [12]:
feature_vector_df.printSchema()

root
 |-- house_size: integer (nullable = true)
 |-- price_sold: integer (nullable = true)
 |-- house_size_vector: vector (nullable = true)



In [13]:
formatted_data = feature_vector_df.select('house_size_vector', 'price_sold')

In [14]:
train_data, test_data = formatted_data.randomSplit([0.7, 0.3])

In [15]:
from pyspark.ml.regression import LinearRegression

linear_reg = LinearRegression(
    featuresCol='house_size_vector', 
    labelCol='price_sold'
)

In [16]:
linear_reg_model = linear_reg.fit(train_data)

In [17]:
test_results = linear_reg_model.evaluate(test_data)

In [18]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|  -5630.15312131925|
| -1466.431095406413|
|  16861.01295641926|
|-38334.511189634795|
|  135.4534746762365|
+-------------------+



In [19]:
linear_reg_model.predict(test_data)

Py4JJavaError: An error occurred while calling o96.predict.
: java.lang.ClassCastException: class org.apache.spark.sql.Dataset cannot be cast to class org.apache.spark.ml.linalg.Vector (org.apache.spark.sql.Dataset and org.apache.spark.ml.linalg.Vector are in unnamed module of loader 'app')
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:696)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
test_results.rootMeanSquaredError

In [None]:
test_results.r2

In [None]:
unlabelled_data = test_data.select('house_size_vector')

In [None]:
prediction = linear_reg_model.transform(unlabelled_data)

In [None]:
prediction.show()

In [None]:
w = linear_reg_model.coefficients[0]

In [None]:
w

In [None]:
b = linear_reg_model.intercept

In [None]:
b

In [None]:
# What is the price of the new house if it's size is 1511sqft
new_house_size = 1511
new_house_price = (w * 1511) + b

In [None]:
new_house_price

In [None]:
import matplotlib.pyplot as plt

data_plot = data.toPandas()

In [None]:
data_plot

In [None]:
data_plot.plot.scatter(x='house_size', y='price_sold', figsize=(5, 3))

x = data_plot.iloc[:, 0]
y = w*x + b

plt.plot(x, y, 'r')

In [None]:
data = spark.read.csv('/kaggle/input/mining/multi_variable_regression.csv', header=True, inferSchema=True)

In [None]:
data.show()

In [None]:
data.printSchema()

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
string_index_obj = StringIndexer(inputCol='area', outputCol='area_feature')

In [None]:
string_index_df_obj = string_index_obj.fit(data)
final_data = string_index_df_obj.transform(data)

In [None]:
final_data.show()

In [None]:
assembler_object = VectorAssembler(
    inputCols=['house_size', 'bedrooms', 'floors', 'house_age', 'area_feature'],
    outputCol='features'
)

In [None]:
feature_vector_df = assembler_object.transform(dataset=final_data)

In [None]:
feature_vector_df.show(truncate=False)

In [None]:
df = feature_vector_df.select('features', 'price_sold')

In [None]:
train_df, test_df = df.randomSplit(weights=[0.7, 0.3])

lr = LinearRegression(
    featuresCol='features',
    labelCol='price_sold'
)

model = lr.fit(train_df)

In [None]:
result = model.evaluate(test_df)
result.r2

In [None]:
# What will be the price for a house with the following features:
# size = 1742
# beds = 3
# floors = 1
# age = 10
# area = 2

In [None]:
model.coefficients

In [None]:
# Task - loading the tips dataset, create a model to predict the total bill. Use all features except the total bill to train your model
# What is the performance of your model (r^2)

import seaborn as sns

df_tips = sns.load_dataset('tips')
df = spark.createDataFrame(df_tips)

df.show()