# Requirements

In [21]:
!pip install pyspark



# Import libraries

In [22]:
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import count, when, isnull,col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Spark session

In [23]:
spark = (
    SparkSession
    .builder
    .master('local')
    .appName('model_train')
    .getOrCreate()
)
spark

# Read data

In [24]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [25]:
df = spark.read.csv("/content/drive/MyDrive/FinalDataset.csv", header = True)

In [26]:
print(f'Number of records: {df.count()}')

Number of records: 128144


In [27]:
df.show()

+----+-------+--------------------+--------------------+---------------------+--------------------+-----------+-----------------+-------+-----+-------+---------+
| age|user_id|         region_name|           city_name|cpe_manufacturer_name|      cpe_model_name|cpe_type_cd|cpe_model_os_type|  price|count|Sum_req|PartofDay|
+----+-------+--------------------+--------------------+---------------------+--------------------+-----------+-----------------+-------+-----+-------+---------+
|31.0| 350459|Удмуртская Респуб...|              Ижевск|               Xiaomi|        Redmi Note 7| smartphone|          Android|14948.0|  178|    240|      day|
|35.0| 188276|Ленинградская обл...|               Тосно|              Samsung| Galaxy A8 2018 Dual| smartphone|          Android|11434.0|  111|    157|  evening|
|41.0|  99002|  Пензенская область|            Заречный|               Huawei|            Honor 8X| smartphone|          Android|18288.0|  639|    856|      day|
|33.0| 155506|  Краснодарски

# Preprocessing

In [28]:
COLUMNS_NAME = ['age', 'user_id', 'region_name', 'city_name', 'cpe_manufacturer_name',
                'cpe_model_name', 'cpe_type_cd', 'cpe_model_os_type', 'price', 'count',
                'Sum_req', 'PartofDay']

In [29]:

def full_nan_value_pyspark(dataset):
    return dataset.na.fill(0)

def convert_value_to_int_pyspark(dataset):
    columns = ['age', 'user_id', 'price', 'count','Sum_req']
    for column in columns:
        dataset = dataset.withColumn(column, dataset[column].cast(IntegerType()))
    return dataset

def removing_unnecessary_columns_pyspark(dataset):
    columns_to_drop = ['city_name', 'cpe_model_os_type', 'cpe_model_name', 'region_name', 'cpe_manufacturer_name', 'cpe_type_cd', 'PartofDay']
    dataset = dataset.drop(*columns_to_drop)
    return dataset

def concert_categorical_data_pyspark(dataset):
    categorical_features = ['region_name', 'cpe_manufacturer_name', 'cpe_type_cd', 'PartofDay']

    indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in categorical_features]

    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=column + "_encoded") for indexer, column in zip(indexers, categorical_features)]

    pipeline = Pipeline(stages=indexers + encoders)

    model = pipeline.fit(dataset)
    dataset = model.transform(dataset)

    for column in categorical_features:
        dataset = dataset.drop(column)
        dataset = dataset.drop(column + "_index")

    return dataset


In [30]:
df = full_nan_value_pyspark(df)
df = concert_categorical_data_pyspark(df)
df = removing_unnecessary_columns_pyspark(df)
df = convert_value_to_int_pyspark(df)
df = df.filter(col("price").isNotNull())

In [31]:
null_counts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).toPandas().transpose()
print(null_counts)

                               0
age                            0
user_id                        0
price                          0
count                          0
Sum_req                        0
region_name_encoded            0
cpe_manufacturer_name_encoded  0
cpe_type_cd_encoded            0
PartofDay_encoded              0


In [32]:
print(f'Number of records after data preprocessing: {df.count()}')

Number of records after data preprocessing: 124858


In [33]:
df.show()

+---+-------+-----+-----+-------+-------------------+-----------------------------+-------------------+-----------------+
|age|user_id|price|count|Sum_req|region_name_encoded|cpe_manufacturer_name_encoded|cpe_type_cd_encoded|PartofDay_encoded|
+---+-------+-----+-----+-------+-------------------+-----------------------------+-------------------+-----------------+
| 31| 350459|14948|  178|    240|    (79,[17],[1.0])|               (36,[3],[1.0])|      (3,[0],[1.0])|    (3,[0],[1.0])|
| 35| 188276|11434|  111|    157|    (79,[23],[1.0])|               (36,[1],[1.0])|      (3,[0],[1.0])|    (3,[1],[1.0])|
| 41|  99002|18288|  639|    856|    (79,[54],[1.0])|               (36,[2],[1.0])|      (3,[0],[1.0])|    (3,[0],[1.0])|
| 33| 155506|37090|   22|     38|     (79,[1],[1.0])|               (36,[0],[1.0])|      (3,[0],[1.0])|    (3,[2],[1.0])|
| 54| 213873| 8987|    4|      4|     (79,[1],[1.0])|               (36,[2],[1.0])|      (3,[0],[1.0])|    (3,[2],[1.0])|
| 63| 212300|41990| 2628

# Model train

In [34]:
feature_columns = ['user_id',  'price', 'count', 'Sum_req', 'region_name_encoded', 'cpe_manufacturer_name_encoded', 'cpe_type_cd_encoded', 'PartofDay_encoded']  # список ваших признаков
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = assembler.transform(df)
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

In [35]:
rf = RandomForestRegressor(featuresCol="features", labelCol="age")
model = rf.fit(train_data)

In [36]:
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="age", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

Root Mean Squared Error (RMSE) on test data: 11.054473107138017


# Save model

In [39]:
model_path = "random_forest_model"
model.save(model_path)

# Stop spark application

In [40]:
spark.stop()