LIBRARIES LOADING



In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

LOADING THE DATASET

In [None]:
from pyspark.sql import Row
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/autos/imports-85.data

# definim numele coloanelor
columns = ["symboling", "normalized_losses", "make", "fuel_type", "aspiration", "num_of_doors", "body_style", "drive_wheels", "engine_location", "wheel_base", "length", "width", "height", "curb_weight", "engine_type", "num_of_cylinders", "engine_size", "fuel_system", "bore", "stroke", "compression_ratio", "horsepower", "peak_rpm", "city_mpg", "highway_mpg", "price"]

# incarcam setul de date
rdd = spark.sparkContext.textFile("imports-85.data")
data = rdd.map(lambda line: line.split(",")).map(lambda x: Row(x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14], x[15], x[16], x[17], x[18], x[19], x[20], x[21], x[22], x[23], x[24], x[25]))

#transformam RDD în DataFrame
df = spark.createDataFrame(data, columns)




--2023-06-20 14:40:57--  https://archive.ics.uci.edu/ml/machine-learning-databases/autos/imports-85.data
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘imports-85.data’

imports-85.data         [ <=>                ]  25.33K  --.-KB/s    in 0.06s   

2023-06-20 14:40:57 (403 KB/s) - ‘imports-85.data’ saved [25936]



CLEANING THE DATASET

In [None]:
from pyspark.sql.functions import when, col

df = df.replace('?', None)

# parcurgem setul pentru verificarea de valori nule
for col in df.columns:
    print(col, "\t", "with null values: ", df.filter(df[col].isNull()).count())


#eliminare randuri cu valori lipsa
df = df.dropna(how='any')


symboling 	 with null values:  0
normalized_losses 	 with null values:  41
make 	 with null values:  0
fuel_type 	 with null values:  0
aspiration 	 with null values:  0
num_of_doors 	 with null values:  2
body_style 	 with null values:  0
drive_wheels 	 with null values:  0
engine_location 	 with null values:  0
wheel_base 	 with null values:  0
length 	 with null values:  0
width 	 with null values:  0
height 	 with null values:  0
curb_weight 	 with null values:  0
engine_type 	 with null values:  0
num_of_cylinders 	 with null values:  0
engine_size 	 with null values:  0
fuel_system 	 with null values:  0
bore 	 with null values:  4
stroke 	 with null values:  4
compression_ratio 	 with null values:  0
horsepower 	 with null values:  2
peak_rpm 	 with null values:  2
city_mpg 	 with null values:  0
highway_mpg 	 with null values:  0
price 	 with null values:  4


TRANSFORMING THE DATASET


In [None]:
from pyspark.sql.functions import col

#adaugam noua coloana la dateset-ul existent
df = df.withColumn("power_to_weight_ratio", col("horsepower") / col("curb_weight"))

df.select("horsepower", "curb_weight", "power_to_weight_ratio").show(5)



+----------+-----------+---------------------+
|horsepower|curb_weight|power_to_weight_ratio|
+----------+-----------+---------------------+
|       102|       2337| 0.043645699614890884|
|       115|       2824|  0.04072237960339943|
|       110|       2844|  0.03867791842475387|
|       140|       3086|  0.04536616979909268|
|       101|       2395| 0.042171189979123176|
+----------+-----------+---------------------+
only showing top 5 rows



GROUPING THE DATASET

In [None]:
from pyspark.sql.functions import avg

# grupare dupa pretul mediu in functie de marca
df.groupBy("make").agg(avg("price").alias("avg_price")).show()

# grupare dupa pretul mediu in functie de caroserie
df.groupBy("body_style").agg(avg("price").alias("avg_price")).show()


+-------------+------------------+
|         make|         avg_price|
+-------------+------------------+
|       jaguar|           32250.0|
|   mitsubishi|            7813.0|
|         audi|          18246.25|
|          bmw|           18857.5|
|        dodge|          7790.125|
|        mazda|            9080.0|
|mercedes-benz|           29726.4|
|    chevrolet|            6007.0|
|        honda| 8184.692307692308|
|       nissan|10415.666666666666|
|       peugot| 15758.57142857143|
|       toyota| 9696.645161290322|
|         saab|15223.333333333334|
|     plymouth| 7163.333333333333|
|      porsche|           22018.0|
|   volkswagen|          8738.125|
|       subaru|           8541.25|
|        volvo| 18063.18181818182|
+-------------+------------------+

+-----------+------------------+
| body_style|         avg_price|
+-----------+------------------+
|      wagon|11351.411764705883|
|convertible|           26362.5|
|      sedan|12558.620253164558|
|  hatchback| 9220.160714285714

USING SPARKSQL

In [None]:
# punem dataset-ul intr-o vizualizare temporara
df.createOrReplaceTempView("autos")

# utilizam sparkSQL pentru a avea aceleasi rezultate ca mai sus
spark.sql("SELECT make, AVG(price) FROM autos GROUP BY make").show()
spark.sql("SELECT body_style, AVG(price) FROM autos GROUP BY body_style").show()


+-------------+------------------+
|         make|        avg(price)|
+-------------+------------------+
|       jaguar|           32250.0|
|   mitsubishi|            7813.0|
|         audi|          18246.25|
|          bmw|           18857.5|
|        dodge|          7790.125|
|        mazda|            9080.0|
|mercedes-benz|           29726.4|
|    chevrolet|            6007.0|
|        honda| 8184.692307692308|
|       nissan|10415.666666666666|
|       peugot| 15758.57142857143|
|       toyota| 9696.645161290322|
|         saab|15223.333333333334|
|     plymouth| 7163.333333333333|
|      porsche|           22018.0|
|   volkswagen|          8738.125|
|       subaru|           8541.25|
|        volvo| 18063.18181818182|
+-------------+------------------+

+-----------+------------------+
| body_style|        avg(price)|
+-----------+------------------+
|      wagon|11351.411764705883|
|convertible|           26362.5|
|      sedan|12558.620253164558|
|  hatchback| 9220.160714285714

Problema 1: Prezicerea prețului unei mașini

1.Enunțul problemei: Dorim să prezicem prețul unei mașini bazându-ne pe caracteristici cum ar fi puterea motorului, dimensiunea motorului, lungimea, lățimea, înălțimea și greutatea. Vrem să găsim o relație între aceste caracteristici și prețul mașinii.

2.Justificarea alegerii metodei: Pentru a prezice prețul unei mașini, vom folosi o tehnică numită "Regresia liniară". Aceasta presupune că există o legătură liniară între caracteristicile mașinii și prețul său. De exemplu, putem presupune că o mașină cu o putere mai mare a motorului va avea un preț mai mare.

3.Explicarea soluției:

Pregătirea datelor: Înainte de a putea prezice prețul, trebuie să pregătim datele. Asta înseamnă că trebuie să adunăm toate caracteristicile relevante într-o singură listă. Astfel, vom putea folosi acea listă pentru a antrena modelul nostru de regresie.

Împărțirea datelor: Am împărțit setul de date într-un set de antrenament (80% din date) și un set de testare (20% din date). Setul de antrenament este folosit pentru a antrena modelul, iar setul de testare este folosit pentru a evalua performanța modelului.

Antrenarea modelului: Am creat o instanță a modelului de Regresie liniară și am antrenat modelul folosind setul de antrenament. Antrenarea modelului implică găsirea celei mai bune linii (sau hiperplan, în cazul mai multor caracteristici) care minimizează suma pătratelor reziduurilor (adică diferența dintre valoarea reală și valoarea prezisă).

Evaluarea modelului: După ce modelul a fost antrenat, vom evalua cât de bine funcționează folosind setul de testare. Vom calcula o metrică numită "eroarea medie pătratică" (RMSE). Cu cât valoarea RMSE este mai mică, cu atât modelul prezice mai bine prețurile mașinilor.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# transformam datele pentru regresie
df = df.withColumn("horsepower", col("horsepower").cast("float"))
df = df.withColumn("engine_size", col("engine_size").cast("float"))
df = df.withColumn("wheel_base", col("wheel_base").cast("float"))
df = df.withColumn("width", col("width").cast("float"))
df = df.withColumn("height", col("height").cast("float"))
df = df.withColumn("curb_weight", col("curb_weight").cast("float"))
df = df.withColumn("price", col("price").cast("float"))

assembler = VectorAssembler(
    inputCols=["horsepower", "engine_size", "wheel_base", "width", "height", "curb_weight"],
    outputCol="features")
output = assembler.transform(df)
final_data = output.select("features", "price")
final_data.show()

#impartirea datelor in set de antrenament si de testare
train_data, test_data = final_data.randomSplit([0.8, 0.2])

#antrenarea modelului
lr = LinearRegression(labelCol='price')
lr_model = lr.fit(train_data)

#evaluarea modelului
test_results = lr_model.evaluate(test_data)
test_results.rootMeanSquaredError



+--------------------+-------+
|            features|  price|
+--------------------+-------+
|[102.0,109.0,99.8...|13950.0|
|[115.0,136.0,99.4...|17450.0|
|[110.0,136.0,105....|17710.0|
|[140.0,131.0,105....|23875.0|
|[101.0,108.0,101....|16430.0|
|[101.0,108.0,101....|16925.0|
|[121.0,164.0,101....|20970.0|
|[121.0,164.0,101....|21105.0|
|[48.0,61.0,88.400...| 5151.0|
|[70.0,90.0,94.5,6...| 6295.0|
|[70.0,90.0,94.5,6...| 6575.0|
|[68.0,90.0,93.699...| 5572.0|
|[68.0,90.0,93.699...| 6377.0|
|[102.0,98.0,93.69...| 7957.0|
|[68.0,90.0,93.699...| 6229.0|
|[68.0,90.0,93.699...| 6692.0|
|[68.0,90.0,93.699...| 7609.0|
|[88.0,122.0,103.3...| 8921.0|
|[145.0,156.0,95.9...|12964.0|
|[58.0,92.0,86.599...| 6479.0|
+--------------------+-------+
only showing top 20 rows



2201.1410455009122

Dacă valoarea RMSE este relativ mică comparativ cu scala prețurilor din setul de date, putem considera că modelul prezice prețurile mașinilor în mod acceptabil. Cu alte cuvinte, prezicerile modelului se apropie în general de prețurile reale ale mașinilor. Atfel comparam RMSE cu preturile masinilor noastre.

In [None]:
df.select("price").describe().show()


+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|               159|
|   mean|11445.729559748428|
| stddev| 5877.856195222509|
|    min|            5118.0|
|    max|           35056.0|
+-------+------------------+



Problema 2: Clasificarea - Prezicerea stilului de caroserie al unei mașini

Enunțul problemei: Vrem să prezicem stilul de caroserie al unei mașini bazat pe caracteristicile acesteia, cum ar fi 'horsepower', 'engine_size', 'wheel_base', 'width', 'height' și 'curb_weight'.

Justificarea alegerii metodei: Pentru a aborda această problemă de clasificare, vom utiliza algoritmul de clasificare logistică. Acesta este un algoritm robust și eficient pentru problemele de clasificare. Ne permite să prezicem stilul de caroserie al mașinii pe baza caracteristicilor sale, atribuindu-i o etichetă corespunzătoare.

Explicarea soluției:

Pregătirea datelor: Înainte de a antrena modelul, vom asigura că datele sunt pregătite într-un format adecvat. Acest lucru poate implica eliminarea sau imputarea valorilor lipsă, codificarea caracteristicilor categorice și scalarea caracteristicilor numerice, dacă este necesar.

Antrenarea modelului: Utilizând Spark MLlib, vom crea un obiect de clasificare logistică și vom antrena modelul nostru folosind setul de date pregătit. Vom specifica caracteristicile de intrare și etichetele de clasă și vom ajusta modelul pe datele de antrenament.

Evaluarea modelului: După ce modelul a fost antrenat, vom evalua performanța acestuia. Folosind un evaluator de clasificare multiplu ('MulticlassClassificationEvaluator'), vom măsura acuratețea, precizia, recall-ul și alte metrici relevante. Aceste măsurători ne vor ajuta să înțelegem cât de bine se potrivește modelul nostru datelor de testare și cât de bine prezice stilul de caroserie al mașinii.

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#codificam eticheta de clasificare
indexer = StringIndexer(inputCol="body_style", outputCol="body_style_index")
df = indexer.fit(df).transform(df)
df.show(7)

#transformam datele sa le adaptam pe modelul nostru cu ajutorul unui assembler
assembler = VectorAssembler(
    inputCols=["horsepower", "engine_size", "wheel_base", "width", "height", "curb_weight"],
    outputCol="features")
output = assembler.transform(df)
final_data = output.select("features", "body_style_index")

#impartirea datelor in set de antrenament si de testare
train_data, test_data = final_data.randomSplit([0.8, 0.2])

#antrenarea modelului
lr = LogisticRegression(labelCol='body_style_index')
lr_model = lr.fit(train_data)


#evaluarea modelului
predictions = lr_model.transform(test_data)
predictions.show()

evaluator = MulticlassClassificationEvaluator(
    labelCol="body_style_index", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
accuracy


+---------+-----------------+----+---------+----------+------------+----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-------+---------------------+----------------+
|symboling|normalized_losses|make|fuel_type|aspiration|num_of_doors|body_style|drive_wheels|engine_location|wheel_base|length|width|height|curb_weight|engine_type|num_of_cylinders|engine_size|fuel_system|bore|stroke|compression_ratio|horsepower|peak_rpm|city_mpg|highway_mpg|  price|power_to_weight_ratio|body_style_index|
+---------+-----------------+----+---------+----------+------------+----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-------+---------------------+----------------+
|        2|              164|au

0.5294117647058824



4.Utilizarea unui datapipeline

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
# transformam coloana body_style in tip numeric
indexer = StringIndexer(inputCol="body_style", outputCol="body_style_index_pipeline")

#transformam datele sa le adaptam pe modelul nostru cu ajutorul unui assembler
assembler = VectorAssembler(
    inputCols=["horsepower", "engine_size", "wheel_base", "width", "height", "curb_weight"],
    outputCol="features")

#crearea modelului de regresie liniara pentru invatarea automata
lr = LinearRegression(labelCol='price')

# crearea pipeline-ului
pipeline = Pipeline(stages=[indexer, assembler, lr])

#impartirea datelor in set de antrenament si de testare
train_data, test_data = df.randomSplit([0.8, 0.2])

# antrenarea modelului
model = pipeline.fit(train_data)

# evaluarea modelului cu ajutorul RMSE
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE = %g" % rmse)



RMSE = 2345.94




5.Functie UDF

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

#definirea functiei UDF
def power_to_engine_size(horsepower, engine_size):
        return float(horsepower) / float(engine_size)

power_to_engine_size_udf = udf(power_to_engine_size, FloatType())

#aplicarea functiei pentru a crea o noua coloana pe setul nostru de date
df = df.withColumn("power_to_engine_size_ratio", power_to_engine_size_udf(df['horsepower'], df['engine_size']))

df.show()

+---------+-----------------+---------+---------+----------+------------+----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-------+---------------------+----------------+--------------------------+
|symboling|normalized_losses|     make|fuel_type|aspiration|num_of_doors|body_style|drive_wheels|engine_location|wheel_base|length|width|height|curb_weight|engine_type|num_of_cylinders|engine_size|fuel_system|bore|stroke|compression_ratio|horsepower|peak_rpm|city_mpg|highway_mpg|  price|power_to_weight_ratio|body_style_index|power_to_engine_size_ratio|
+---------+-----------------+---------+---------+----------+------------+----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-------+--

Optimizarea hiperparametrilor pe exemplul de la PIPELINE





In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#model regresie liniara
lr = LinearRegression(labelCol='price')

# setarea grilei de hiperparametri
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]) \
    .addGrid(lr.regParam, [0.1, 0.05,0.01]) \
    .build()

# setare evaluator
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

#crossvalidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # folosim k=3 fold-uri pentru validare

#transformam datele sa le adaptam pe modelul nostru cu ajutorul unui assembler
assembler = VectorAssembler(
    inputCols=["horsepower", "engine_size", "wheel_base", "width", "height", "curb_weight"],
    outputCol="features")

output = assembler.transform(df)
final_data = output.select("features", "price")

#impartirea datelor in set de antrenament si de testare
train_data, test_data = final_data.randomSplit([0.8, 0.2])

first_five = train_data.head(50)

first_five_df = spark.createDataFrame(first_five, train_data.schema)

#antrenam crossvalidator pentru a alege cea mai buna varianta
cvModel = crossval.fit(first_five_df)

#aplicam modelul cel mai bun pe datele de testare
predictions = cvModel.transform(test_data)

# calculam RMSE
rmse = evaluator.evaluate(predictions)
print("RMSE = %g" % rmse)



RMSE = 2843.97
