# Install dependencies

In [1]:
# Install into the kernel's own environment: %pip targets the interpreter that
# runs this notebook, whereas `!pip3` may resolve to a different Python.
# TODO: pin versions (e.g. pyspark==X.Y.Z) for reproducible runs.
%pip install pyspark pandas scikit-learn



# Set up PySpark

In [18]:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF


In [5]:
# Spark settings for the local session: memory limits, Arrow batch size and
# number of shuffle partitions.
# NOTE(review): with a local[*] master the executor lives inside the driver
# JVM, so spark.executor.memory most likely has no effect here.
spark_settings = [
    ("spark.executor.memory", "2g"),
    ("spark.driver.memory", "2g"),
    ("spark.driver.maxResultSize", "2g"),
    ("spark.sql.execution.arrow.maxRecordsPerBatch", "1000"),
    ("spark.sql.shuffle.partitions", "200"),
]
sparkConf = pyspark.SparkConf().setAll(spark_settings)

ss = pyspark.sql.SparkSession.builder \
    .config(conf=sparkConf) \
    .master("local[*]") \
    .appName("YourAppName") \
    .getOrCreate()


# load data

In [8]:
import os

# Show the kernel's working directory; the relative data paths used when
# loading the ORC files are resolved against it.
current_dir = os.getcwd()
print(current_dir)

/workspaces/big_data_hw2


In [9]:
# Raw inputs: train/test posts and channel metadata, all stored as ORC.
train_data, test_data, channel_data = (
    ss.read.orc('./hw_data/posts_train.orc'),
    ss.read.orc('./hw_data/posts_test.orc'),
    ss.read.orc('./hw_data/channels_orc'),
)

In [10]:
# Total row counts: train set on the first line, test set on the second.
print(train_data.count(), test_data.count(), sep="\n")

                                                                                

5460759




244386


                                                                                

In [11]:
# Preview the first 5 rows of every raw table without truncating long texts.
for raw_df in (train_data, test_data, channel_data):
    raw_df.show(n=5, truncate=False)

                                                                                

+----------+--------+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+---------+------------+----------------------+------------+
|channel_id|id      |tg_id|text                                                                                                                                                                                                                                   |views|has_image|is_forwarded|date                  |forwarded_id|
+----------+--------+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+---------+------------+----------------------+------------+
|221       |29741094|7182

# make features

In [12]:
# Fitted IDF model: assigned by make_features(..., is_train=True) and reused by
# the is_train=False call so test TF-IDF vectors use the train IDF weights.
idf_model = None

In [13]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml import Pipeline


In [19]:
# Look-back window, in seconds, for the `messages_last_day` feature.
SECONDS_PER_DAY = 24 * 60 * 60

# Text models fitted by the train call of make_features and reused by the test
# call. `globals().get` keeps already-fitted models if this cell is re-run.
idf_model = globals().get("idf_model")
cv_model = globals().get("cv_model")


def make_features(df, channel_data, is_train=True):
    """Build the per-post feature table for the views model.

    With ``is_train=True`` the CountVectorizer vocabulary and the IDF weights
    are fitted on ``df`` and stored in the globals ``cv_model`` / ``idf_model``.
    With ``is_train=False`` those fitted models are reused, so the test
    ``tfidf`` vectors share the train vocabulary (fitting a separate vocabulary
    on the test set would map terms to different vector indices than the ones
    the IDF weights were fitted on).

    Args:
        df: posts DataFrame; uses ``id``, ``channel_id``, ``text``, ``date``
            and, when ``is_train``, ``views``.
        channel_data: channels DataFrame; uses ``channel_id``, ``user_count``
            and ``is_private``.
        is_train: fit the text models and keep the ``views`` target column.

    Returns:
        Spark DataFrame with the feature columns listed in ``cols`` below,
        plus ``views`` when ``is_train``.

    Raises:
        ValueError: if called with ``is_train=False`` before a training call.
    """
    global idf_model, cv_model

    # Replace null texts with an empty string so such posts are kept.
    df = df.withColumn("text", F.coalesce(F.col("text"), F.lit("")))

    # Text features: length, word count, presence of a link.
    df = df.withColumn("text_len", F.length("text"))
    # Splitting "" (or text with leading/trailing whitespace) yields empty
    # tokens; drop them so an empty text counts 0 words instead of 1.
    df = df.withColumn(
        "word_count",
        F.size(F.array_remove(F.split(F.col("text"), r"\s+"), "")),
    )
    df = df.withColumn("has_link", F.col("text").contains("http").cast("int"))

    # Date/time features.
    df = df.withColumn("hour", F.hour("date"))
    df = df.withColumn("weekday", F.dayofweek("date"))  # 1 = Sunday

    # Window feature: mean views over the channel's 5 previous posts (the
    # current post is excluded, so its own target does not leak in). `id`
    # breaks ties between equal dates so the frame is deterministic.
    if is_train:
        w = (
            Window.partitionBy("channel_id")
            .orderBy(F.col("date").asc(), F.col("id").asc())
            .rowsBetween(-5, -1)
        )
        df = df.withColumn("avg_views_last_5", F.avg("views").over(w))
    else:
        # NOTE(review): test posts carry no views, so this feature is all-NULL
        # at inference time while it is populated for training (train/test
        # skew). Deriving it from the channel's training history would fix it.
        df = df.withColumn("avg_views_last_5", F.lit(None).cast("double"))

    # TF-IDF: the vocabulary and IDF weights are fitted on train only.
    # NOTE: `tfidf` is dropped before the CSV export later in the notebook, so
    # these fits cost full passes over the data that the final model never uses.
    df = Tokenizer(inputCol="text", outputCol="tokens").transform(df)
    if is_train:
        cv_model = CountVectorizer(inputCol="tokens", outputCol="tf").fit(df)
        df = cv_model.transform(df)
        idf_model = IDF(inputCol="tf", outputCol="tfidf").fit(df)
    else:
        if idf_model is None or cv_model is None:
            raise ValueError("idf_model must be fitted before using make_features with is_train=False")
        df = cv_model.transform(df)
    df = idf_model.transform(df)

    # Channel activity: number of posts by the same channel in the 24 hours
    # before this one (range frame over epoch seconds, current second
    # excluded). The previous `rowsBetween(-1, -1)` frame covered a single
    # neighbouring row, so that "count" could only ever be 0 or 1.
    post_ts = F.col("date").cast("timestamp").cast("long")
    w_last_day = (
        Window.partitionBy("channel_id")
        .orderBy(post_ts)
        .rangeBetween(-SECONDS_PER_DAY, -1)
    )
    df = df.withColumn("messages_last_day", F.count(F.lit(1)).over(w_last_day))

    # Channel metadata features.
    channel_feats = (
        channel_data.select("channel_id", "user_count", "is_private")
        .withColumn("is_private", F.col("is_private").cast("int"))
    )
    df = df.join(channel_feats, on="channel_id", how="left")

    # NOTE(review): interpreted from the column name as recent posts per
    # channel user; NULL when user_count is missing or not positive.
    df = df.withColumn(
        "avg_messages_per_user",
        F.when(F.col("user_count") > 0, F.col("messages_last_day") / F.col("user_count")),
    )

    # Output columns (the target `views` only exists for training data).
    cols = ['id', 'channel_id', 'text_len', 'word_count', 'has_link',
            'hour', 'weekday', 'user_count', 'is_private', 'avg_views_last_5',
            'tfidf', 'messages_last_day', 'avg_messages_per_user']
    if is_train:
        cols.append("views")

    return df.select(*cols)


In [20]:
# Fit the text models on train first, then reuse them for the test features.
train_features, test_features = (
    make_features(train_data, channel_data, is_train=True),
    make_features(test_data, channel_data, is_train=False),
)

                                                                                

In [21]:
# Row counts of the feature tables (train, then test); they should match the raw counts.
print(train_features.count(), test_features.count(), sep="\n")


                                                                                

5460759
244386


In [16]:
# Preview the first 5 rows of each feature table.
for features_df in (train_features, test_features):
    features_df.show(n=5)


                                                                                

+--------+----------+--------+----------+--------+----+-------+----------+----------+------------------+--------------------+-----+
|      id|channel_id|text_len|word_count|has_link|hour|weekday|user_count|is_private|  avg_views_last_5|               tfidf|views|
+--------+----------+--------+----------+--------+----+-------+----------+----------+------------------+--------------------+-----+
|39957252|       496|       6|         1|       0|  13|      2|    4868.0|         0|              NULL|(1000,[579],[2.95...| 1924|
|39957182|       496|      31|         1|       1|  17|      2|    4868.0|         0|            1924.0|(1000,[886],[2.01...| 3191|
|39957164|       496|     156|        21|       0|  11|      1|    4868.0|         0|            2557.5|(1000,[172,182,23...| 2569|
|39957145|       496|     162|        24|       1|   9|      6|    4868.0|         0|2561.3333333333335|(1000,[51,168,189...| 2845|
|39957135|       496|      27|         4|       0|  20|      1|    4868.0|  

In [24]:
from pyspark.sql.functions import col, avg

# Impute NULL `avg_views_last_5` with the train-set mean. NULLs occur for a
# channel's first train post and for every test row, so the same train mean is
# applied to both tables; otherwise test NULLs would later be filled with 0.
avg_views = train_features.select(avg("avg_views_last_5")).first()[0]
if avg_views is not None:  # mean is NULL only if the column is entirely NULL
    fill_values = {"avg_views_last_5": avg_views}
    train_features = train_features.fillna(fill_values)
    test_features = test_features.fillna(fill_values)


                                                                                

In [22]:
# Drop the TF-IDF vector column from both tables before the CSV export
# (presumably because Spark's CSV writer cannot store ML vector columns).
train_features, test_features = (
    features_df.drop("tfidf") for features_df in (train_features, test_features)
)


In [23]:

# Save both feature tables as CSV directories (overwriting, with a header row).
train_features.write.mode('overwrite').option('header', True).csv('train_csv')
test_features.write.mode('overwrite').option('header', True).csv('test_csv')


                                                                                

In [26]:
# Row counts after the export step (train, then test).
print(train_features.count(), test_features.count(), sep="\n")

5460759
244386


# Load features into pandas
You can also use `.toPandas()` instead of reading the exported CSV files.

In [2]:
import pandas as pd
import glob, os

def load_and_merge_csv(path, **kwargs):
    """Read every ``*.csv`` file in directory ``path`` and stack them.

    Meant for the directories produced by Spark's CSV writer, which contain
    one or more CSV part files. Files are read in sorted filename order so the
    resulting row order is identical on every run (raw ``glob`` order depends
    on the filesystem).

    Args:
        path: directory containing the CSV files.
        **kwargs: forwarded to ``pandas.read_csv``.

    Returns:
        A single DataFrame indexed by the ``id`` column.

    Raises:
        ValueError: if ``path`` contains no ``*.csv`` files.
    """
    files = sorted(glob.glob(os.path.join(path, '*.csv')))
    if not files:
        raise ValueError(f"no *.csv files found in {path!r}")
    merged = pd.concat(pd.read_csv(f, **kwargs) for f in files)
    return merged.set_index('id')

In [3]:
# Each feature table was exported by Spark as a directory of CSV files.
trainXY, testX = (load_and_merge_csv(csv_dir) for csv_dir in ('train_csv', 'test_csv'))

In [4]:
import numpy as np

# Target column and columns excluded from the model inputs.
Ycol = 'views'
to_drop = ['channel_id']  # could be kept as a feature if it turns out useful
trainX = trainXY.drop(Ycol, axis=1).drop(columns=to_drop, errors='ignore')
# Log-transformed target; the +100 offset keeps the argument positive for
# zero views and is undone by `np.exp(prediction) - 100` after prediction.
trainY = np.log(trainXY[Ycol] + 100)

# Select the test columns by the train column list so both matrices have the
# same feature names in the same order (KeyError if a feature is missing).
testX = testX.drop(columns=to_drop, errors='ignore')[trainX.columns]

In [5]:
from sklearn.ensemble import GradientBoostingRegressor

# Gradient-boosted trees on the log target; missing feature values are
# replaced with 0 in both the training and the test matrix.
booster_params = dict(n_estimators=100, max_depth=5, random_state=42)
model = GradientBoostingRegressor(**booster_params).fit(trainX.fillna(0), trainY)
prediction = model.predict(testX.fillna(0))


In [6]:
# Invert the log(views + 100) target transform. Clip at 0: the regressor can
# predict values below log(100), which would otherwise give negative views.
final_prediction = np.clip(np.exp(prediction) - 100, 0, None)


# submit

In [7]:
# Download the course submission client (it is imported, i.e. executed, below).
# NOTE(review): this runs remote code from the `2024` ref of the repository;
# pin a commit hash if reproducibility or supply-chain safety matters.
# -f: fail on HTTP errors instead of saving an error page as client.py;
# -sS: hide the progress meter but still report errors.
!curl -fsS -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2024/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1899  100  1899    0     0   8221      0 --:--:-- --:--:-- --:--:--  8220


In [8]:
import client

In [9]:
# Submit the predictions (indexed by post id) as the final evaluation, then
# print the results reported by the submission server.
submission = pd.DataFrame({'views': final_prediction}, index=testX.index)
res = client.make_eval(submission, final=True)
print(client.check_results())


{'2025-06-22': [{'baseline_beaten': False, 'is_final': False, 'metrics': {'mape': 36.76901358293411, 'mean_absolute_error': 3.299754453995559, 'mean_squared_error': 12.70939697112019, 'rmse': 3.565024119290105, 'rmspe': 38.329523419415224}}, {'baseline_beaten': False, 'is_final': False, 'metrics': {'mape': 36.795616151998246, 'mean_absolute_error': 3.3024307752615667, 'mean_squared_error': 12.726980112777197, 'rmse': 3.567489329034804, 'rmspe': 38.34703074492646}}, {'baseline_beaten': False, 'is_final': False, 'metrics': {'mape': 34.52427470297403, 'mean_absolute_error': 3.085839609072402, 'mean_squared_error': 11.223542190527715, 'rmse': 3.3501555472138476, 'rmspe': 36.12075848381354}}, {'baseline_beaten': False, 'is_final': False, 'metrics': {'mape': 15836.326562603273, 'mean_absolute_error': 1354.1669432398057, 'mean_squared_error': 2477911.6130875777, 'rmse': 1574.1383716457642, 'rmspe': 17760.11578216986}}, {'baseline_beaten': False, 'is_final': False, 'metrics': {'mape': 15806.01