In [0]:
%spark.pyspark

from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, DateType

# Obtain the Spark session used by every cell below (an already running
# session is reused by getOrCreate).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Led_Zeppelin')
    .getOrCreate()
)

In [1]:
%spark.pyspark

# Load the raw recommendations table. No schema is supplied, so every CSV
# column arrives as a string and has to be cast explicitly.
data = spark.read.csv("hdfs://vm-dlake2-m-1.test.local/user/yakupov/data/recommendations.csv", sep=',', header=True)

# Per-column target types; any column not listed here is cast to int.
# `None` means "leave the column exactly as it was read".
# NOTE(review): assumes `hours` holds fractional values and `date` holds
# ISO-formatted dates (yyyy-MM-dd). A blanket int cast would truncate the
# former and turn the latter into nulls -- confirm against the CSV.
column_casts = {
    'is_recommended': None,
    'hours': 'float',
    'date': 'date',
}

# Build every cast as one projection: chaining withColumn in a loop adds a
# projection per call and inflates the query plan.
typed_columns = []
for col_name in data.columns:
    target_type = column_casts.get(col_name, 'int')
    column = F.col(col_name)
    if target_type is not None:
        column = column.cast(target_type).alias(col_name)
    typed_columns.append(column)

data = data.select(typed_columns)

data.printSchema()

In [2]:
%spark.pyspark

# Load the games catalogue (first row is the header) and turn `app_id`
# into an integer column.
games_path = "hdfs://vm-dlake2-m-1.test.local/user/pavlov/data/games.csv"
games = (
    spark.read
    .csv(games_path, sep=',', header=True)
    .withColumn('app_id', F.col('app_id').cast("int"))
)

games.printSchema()

In [3]:
%spark.pyspark


# Load the per-game metadata from JSON and show the resulting schema.
metadata_path = "hdfs://vm-dlake2-m-1.test.local/user/pavlov/data/games_metadata.json"
metadata = spark.read.json(metadata_path)

metadata.printSchema()

In [4]:
%spark.pyspark

# Summary statistics (as produced by describe) for the numeric review columns.
# The integer casts for `games` and `data` are performed where the tables are
# loaded, so nothing needs to be re-cast here.
numeric_cols = ['helpful', 'funny', 'hours']
data.select(numeric_cols).describe().show()


In [5]:
%spark.pyspark

# Trim outliers: drop rows whose value is greater than the 99.5% quantile
# of `helpful`, `funny` or `hours`.
# NOTE: this cell reassigns `data`, so running it more than once trims the
# already-trimmed data again.
OUTLIER_COLS = ['helpful', 'funny', 'hours']
OUTLIER_PROB = 0.995
# approxQuantile may return a value whose rank is off by up to
# relativeError * N. The tolerance therefore has to be well below the tail
# being trimmed (1 - 0.995 = 0.005); with 0.05 the cut-off could land
# anywhere from roughly the 94.5th percentile upwards.
QUANTILE_REL_ERROR = 0.001

quantiles = data.approxQuantile(OUTLIER_COLS, [OUTLIER_PROB], QUANTILE_REL_ERROR)

for col_name, col_quantiles in zip(OUTLIER_COLS, quantiles):
    print(f'{col_name} --> {col_quantiles}')
    if not col_quantiles:
        # No quantile could be computed (e.g. no non-null values): nothing to trim.
        continue
    # Keep values up to and including the threshold. A strict `<` would also
    # drop every row equal to the quantile, which empties the frame when the
    # threshold coincides with the dominant value (e.g. 0).
    data = data.filter(F.col(col_name) <= col_quantiles[0])

data.select(OUTLIER_COLS).describe().show()

In [6]:
%spark.pyspark

# Statistics per (game, recommendation flag): mean and approximate median of
# hours played, plus the number of distinct users. Nulls are then replaced
# with 0 via fillna.
per_group_aggregates = [
    F.mean('hours').alias('hours_mean'),
    F.expr('percentile_approx(hours, 0.5)').alias('hours_median'),
    F.countDistinct('user_id').alias('users'),
]
df_grp = (
    data
    .groupBy('app_id', 'is_recommended')
    .agg(*per_group_aggregates)
    .fillna(0)
)

# Play-time overview for the whole data set, split by recommendation flag.
hours_by_recommendation = data.groupBy('is_recommended').agg(
    F.expr('percentile_approx(hours, 0.5)').alias('median_hours'),
    F.mean('hours').alias('mean_hours'),
    F.max('hours').alias('max_hours'),
)
hours_by_recommendation.show()

In [7]:
%spark.pyspark

# Approximate lower quartile of the per-group distinct-user counts.
df_grp.approxQuantile(col='users', probabilities=[0.25], relativeError=0.05)

In [8]:
%spark.pyspark

# Look at the most popular game genres; first drop the groups that have too
# few users.
# NOTE(review): the threshold is hard-coded. The original comment describes it
# as the lower quartile of `users` -- confirm it still matches the data.
min_users = 5
df = df_grp.where(F.col('users') > min_users)
# Currently disabled: attaching tags and titles to the filtered groups.
# df = df.join(metadata.select('app_id', 'tags'), on='app_id')
# df = df.join(games.select('app_id', 'title'), on='app_id')

# df.select(['hours_mean', 'hours_median', 'users']).describe().show()

In [9]:
%spark.pyspark

# TOP-10 games played by the largest number of users
# df.orderBy(F.col('users').desc()).show(10)

In [10]:
%spark.pyspark

# TOP-10 rows with the largest number of distinct users.
# NOTE(review): `df` is aggregated per (app_id, is_recommended), so the same
# game may show up twice in this list -- confirm that is intended.
top = df.orderBy(F.col("users").desc()).limit(10)
top.show(10)

In [11]:
%spark.pyspark

# Bring the ids of the current top rows to the driver, then look up the
# matching titles in the games catalogue.
top_id_rows = top.select('app_id').collect()
top_games = [row['app_id'] for row in top_id_rows]
app_games = games.where(F.col("app_id").isin(top_games)).select('app_id', 'title')
app_games.show()

In [12]:
%spark.pyspark

# Attach titles to the top-by-users rows. A join gives no guarantee about row
# order, so sort by `users` again; otherwise the displayed table and
# `title_users` would lose the ranking.
top_apps = top.join(app_games, on='app_id').orderBy(F.desc('users'))
# Titles in ranking order, kept for the comparison with the hours-based top.
title_users = [row.title for row in top_apps.select('title').collect()]
top_apps.show()

In [13]:
%spark.pyspark


# TOP-10 rows with the highest average number of hours played.
# NOTE(review): `top` is reassigned here, replacing the users-based ranking.
top = df.orderBy(F.col("hours_mean").desc()).limit(10)
top.show(10)


In [14]:
%spark.pyspark

# Bring the ids of the current top rows to the driver, then look up the
# matching titles in the games catalogue.
top_id_rows = top.select('app_id').collect()
top_games = [row['app_id'] for row in top_id_rows]
app_games = games.where(F.col("app_id").isin(top_games)).select('app_id', 'title')
app_games.show()

In [15]:
%spark.pyspark

# Attach titles to the top-by-average-hours rows. A join gives no guarantee
# about row order, so sort by `hours_mean` again; otherwise the displayed
# table and `title_hours` would lose the ranking.
top_apps = top.join(app_games, on='app_id').orderBy(F.desc('hours_mean'))
# Titles in ranking order, kept for the comparison with the users-based top.
title_hours = [row.title for row in top_apps.select('title').collect()]
top_apps.show()

In [16]:
%spark.pyspark

# Titles that appear in both rankings (by number of users and by average hours).
# Author's observation: only one game makes it into both TOP-10 lists.
set(title_users).intersection(title_hours)

In [17]:
%spark.pyspark
