<a href="https://colab.research.google.com/github/romjiik/Big_data_processing_algorithms/blob/main/HW_%E2%84%962_Spark_Python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# install stuff

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
! pip3 install pyspark pandas scikit-learn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# setup pyspark

In [3]:
import pyspark.sql
from pyspark.sql import functions as sf
import pyspark

In [4]:
sparkConf = pyspark.SparkConf() \
    .set("spark.executor.memory", "512m")\
    .set("spark.driver.memory", "512m")
ss = pyspark.sql.SparkSession.builder.config(conf=sparkConf).master('local[1]').getOrCreate()

# load data

In [5]:
train_data = ss.read.orc('/content/drive/MyDrive/Colab Notebooks/big_data/hw_data/posts_train.orc')
test_data = ss.read.orc('/content/drive/MyDrive/Colab Notebooks/big_data/hw_data/posts_test.orc')
channel_data = ss.read.orc('/content/drive/MyDrive/Colab Notebooks/big_data/hw_data/channels_orc')

Посмотрим на данные

In [6]:
train_data.show(5)

+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+
|channel_id|      id|tg_id|                text|views|has_image|is_forwarded|                date|forwarded_id|
+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+
|       221|29741094| 7182|МК, а это вот про...|15368|        f|           f|2018-11-03 13:05:...|        null|
|       221|46751120| 7388|Хочу пошутить, чт...|12448|        f|           f|2018-11-26 16:15:...|        null|
|       221|32631368| 5512|Уточню: Котляр бы...|69984|        f|           f|2018-05-16 16:03:...|        null|
|       221|46751758| 7173|Да не на два лаге...|10241|        f|           f|2018-11-02 13:45:...|        null|
|       221|33073441| 3185|А что касается Уд...|46847|        f|           f|2017-08-10 13:44:...|        null|
+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------

In [7]:
channel_data.show(5)

+--------------------+----------+--------------------+----------------+----------+----------+--------------------+--------------------+----------+----------+
|         description|is_private|         last_parsed|            name|post_count|     tg_id|               title|             updated|user_count|channel_id|
+--------------------+----------+--------------------+----------------+----------+----------+--------------------+--------------------+----------+----------+
|                    |     false|2019-01-26 16:53:...|   MoeinZchannel|     708.0|1002972402|             Moein Z|2019-01-26 16:53:...|   62411.0|      7910|
|Вокруг столько ме...|     false|2019-02-12 00:39:...|  merzotachannel|    1027.0|1336284461|            Мерзость|2019-02-12 00:39:...|   12982.0|     14121|
|🗣እኛስ የተሰቀለውን ክርስ...|     false|2019-02-10 06:57:...|christian_mezmur|    1168.0|1136987361|Christian Mezmur ...|2019-02-10 06:57:...|   21704.0|     17375|
|     Chiroyli_qomatt|     false|2018-12-08 00:54:...

# make features

Проверим на наличие дубликатов

In [8]:
print((train_data.count(), len(train_data.columns)))

(5460759, 9)


In [9]:
train_data = train_data.dropDuplicates()

In [10]:
print((train_data.count(), len(train_data.columns)))

(5460759, 9)


In [11]:
print(len(channel_data.collect()))

2000


In [12]:
print(len(channel_data.dropDuplicates(['name']).collect()))

2000


Дублей нет

Одним из хороших признаков к количеству просмотров может быть количество подписчиков канала из мета данных о канале, так как чем больше людей в канале, тем больше людей видят пост

In [13]:
# задаю алиасы, чтобы удобно джойнить
train_data = train_data.alias('train_data')
test_data = test_data.alias('test_data')
channel_data = channel_data.alias('channel_data')

# джойню таблицу с мета-данными к изначальным таблицам по channel_id
train_data = train_data.join(channel_data, how='inner', on='channel_id').select('train_data.*', 'channel_data.user_count')
test_data = test_data.join(channel_data, how='inner', on='channel_id').select('test_data.*', 'channel_data.user_count')

Неплохой фичей может быть время от нынешнего поста до предыдущего. Буду считать с помощью оконной функции

In [14]:
# привожу дату к нужному формату
train_data = train_data.withColumn('date', train_data['date'].cast('timestamp'))
test_data = test_data.withColumn('date', test_data['date'].cast('timestamp'))

In [15]:
from pyspark.sql import Window
# делаю окно, по которому буду идти - в рамках канала и по увеличению даты
w = Window.partitionBy('channel_id').orderBy('date')
# считаю разницу между датой данной строки и предыдущей по времени. Для предыдущей использую lag по окну. Время привожу к 
# формату unix и получаю разницу по времени в секундах
train_data = train_data.withColumn('previos_post_diff', sf.unix_timestamp(train_data.date) -  \
                                   sf.unix_timestamp(sf.lag('date', 1).over(w)))
# для самого первого по времени поста в рамках канала значения не будет, поэтому заполню нулем
train_data = train_data.na.fill(0)
# аналогично для теста
test_data = test_data.withColumn('previos_post_diff', sf.unix_timestamp(test_data.date) -  \
                                   sf.unix_timestamp(sf.lag('date', 1).over(w)))
test_data = test_data.na.fill(0)

Полезной фичей, которую можно выделить из текста будет количество хэштегов, так как они могут влиять на охват

In [16]:
# с помощью регулярных выражений нахожу слова с хэштегами, а дальше проверяю наличие символа # и перевожу bool в int
train_data = train_data.withColumn("hashtags_count", 
                                   sf.regexp_extract(train_data.text, r"#\w+", 0).rlike("#").cast("integer"))
# там, где хэштегов не нашлось - заполняю нулями
train_data = train_data.na.fill(0)
# аналогично для теста
test_data = test_data.withColumn("hashtags_count", 
                                   sf.regexp_extract(test_data.text, r"#\w+", 0).rlike("#").cast("integer"))

test_data = test_data.na.fill(0)

In [17]:
def make_features(df):
    cols = ['channel_id', 'id', 'tg_id', 'has_image', 'is_forwarded', 'user_count', 'previos_post_diff', 'hashtags_count']
    if 'views' in df.columns:
        cols.append('views')
    return df.select(*cols)

In [18]:
train_features = make_features(train_data).cache()
test_features = make_features(test_data).cache()

In [19]:
train_features.write.csv('train_csv', mode='overwrite', header=True)
test_features.write.csv('test_csv', mode='overwrite', header=True)

# load features to pandas
you also can use .toPandas()

In [20]:
import subprocess
import glob
import os
import shutil
import pandas as pd

def load_and_merge_csv(path, **kwargs):
    dfs = []
    for g in glob.glob(os.path.join(path, '*.csv')):
        dfs.append(pd.read_csv(g, **kwargs))
    res = pd.concat(dfs)
    res = res.set_index('id')
    return res

In [21]:
trainXY = load_and_merge_csv('train_csv')
testX = load_and_merge_csv('test_csv')

In [22]:
import numpy as np

Ycol = 'views'
to_drop = ['channel_id']
trainX, trainY = trainXY.drop(Ycol, axis=1).drop(to_drop, axis=1), trainXY[Ycol]
trainY = np.log(trainY + 100)

testX = testX.drop(to_drop, axis=1)

# train your model and predict test

In [23]:
!pip3 install catboost

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [24]:
# в качестве модели буду использовать катбуст. Пул нужен для того, чтобы закинуть в него данные и передать потом в модель
from catboost import CatBoostRegressor, Pool

data_train = Pool(
            data=trainX,
            label=trainY,
            cat_features=['has_image', 'is_forwarded']
            )

In [26]:
# инициализирую модель и обучаю ее
model = CatBoostRegressor(iterations=200)
model.fit(data_train)

Learning rate set to 0.5
0:	learn: 1.3318141	total: 779ms	remaining: 2m 35s
1:	learn: 1.2803169	total: 1.38s	remaining: 2m 16s
2:	learn: 1.2608424	total: 1.99s	remaining: 2m 10s
3:	learn: 1.2494896	total: 2.56s	remaining: 2m 5s
4:	learn: 1.2449299	total: 3.12s	remaining: 2m 1s
5:	learn: 1.2424979	total: 3.68s	remaining: 1m 58s
6:	learn: 1.2390658	total: 4.25s	remaining: 1m 57s
7:	learn: 1.2356352	total: 4.84s	remaining: 1m 56s
8:	learn: 1.2320357	total: 5.41s	remaining: 1m 54s
9:	learn: 1.2295300	total: 6.01s	remaining: 1m 54s
10:	learn: 1.2273941	total: 6.58s	remaining: 1m 53s
11:	learn: 1.2256632	total: 7.26s	remaining: 1m 53s
12:	learn: 1.2219306	total: 8.3s	remaining: 1m 59s
13:	learn: 1.2198230	total: 9.32s	remaining: 2m 3s
14:	learn: 1.2166462	total: 10.4s	remaining: 2m 7s
15:	learn: 1.2135471	total: 11.4s	remaining: 2m 10s
16:	learn: 1.2115980	total: 11.9s	remaining: 2m 8s
17:	learn: 1.2101229	total: 12.5s	remaining: 2m 6s
18:	learn: 1.2077624	total: 13.1s	remaining: 2m 4s
19:	l

<catboost.core.CatBoostRegressor at 0x7f84f5b215a0>

In [27]:
prediction = model.predict(testX)

In [28]:
prediction

array([7.34366479, 7.69458015, 7.75328089, ..., 8.7623654 , 9.42898875,
       9.37390279])

In [29]:
prediction.shape

(244386,)

In [30]:
assert prediction.shape == (244386,)

# submit

In [31]:
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2023/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1907  100  1907    0     0   5253      0 --:--:-- --:--:-- --:--:--  5253


In [32]:
import client

In [35]:
client.make_eval(pd.DataFrame({'views': prediction}, index=testX.index), final=True)

Enter username:
user12
Enter password:
··········


{'data': {'mape': 11.004088394069493,
  'mean_absolute_error': 0.8332399555539057,
  'mean_squared_error': 1.6428055673416508,
  'rmse': 1.2817197694276432,
  'rmspe': 21.011626084464258},
 'ok': True}

In [36]:
client.check_results()

{'2022-06-22': [{'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 12.341396873546397,
    'mean_absolute_error': 0.9609591200240208,
    'mean_squared_error': 1.7034760140196612,
    'rmse': 1.3051727908670412,
    'rmspe': 18.607012460419195}},
  {'baseline_beaten': True,
   'is_final': True,
   'metrics': {'mape': 12.341396873546397,
    'mean_absolute_error': 0.9609591200240208,
    'mean_squared_error': 1.7034760140196612,
    'rmse': 1.3051727908670412,
    'rmspe': 18.607012460419195}},
  {'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 11.108795658652609,
    'mean_absolute_error': 0.8803678193558462,
    'mean_squared_error': 1.4708072986136502,
    'rmse': 1.2127684439387636,
    'rmspe': 16.84264300082646}},
  {'baseline_beaten': True,
   'is_final': True,
   'metrics': {'mape': 11.108795658652609,
    'mean_absolute_error': 0.8803678193558462,
    'mean_squared_error': 1.4708072986136502,
    'rmse': 1.2127684439387636,
    'rmspe':