# Домашнее задание по курсу «Инфраструктура больших данных»

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StringType
from pyspark import broadcast


import emoji

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Local mode: the tasks run inside the driver JVM. Parallelism therefore comes
# from the `local[N]` master URL, and the heap size from `spark.driver.memory`.
# Plain `local` would use a single thread. `spark.cores.max` and
# `spark.executor.memory` are ignored in local mode.
# NOTE: spark.driver.memory only takes effect if this call starts the JVM
# (i.e. no SparkContext exists yet in this kernel).
spark = (
    SparkSession.builder
    .master('local[8]')
    .appName('BigData-HW')
    .config('spark.driver.memory', '16g')
    .getOrCreate()
)

# Directory prefix under which every task's result is written as JSON.
RESULTS = 'json_results/'

In [2]:
def init_json_to_parquet():
    """Convert the raw JSON dumps to Parquet, once.

    Each JSON file is read with Spark and written as a Parquet dataset.
    A target that already exists is skipped. This makes the function safe to
    re-run; Spark's default save mode would otherwise raise because the
    output path exists.
    """
    import os  # only needed for the existence check below

    conversions = (
        ('./followers_posts_api_final.json', './followers_posts_api_final.parquet'),
        ('./posts_api.json', './posts_api.parquet'),
    )
    for json_path, parquet_path in conversions:
        if os.path.exists(parquet_path):
            continue
        spark.read.json(json_path).write.parquet(parquet_path)


def get_init_files():
    """Load the five input Parquet datasets.

    Returns a list of DataFrames in this order: followers, posts_likes,
    followers_posts_likes, followers_posts_api_final, posts_api.
    """
    files_path = ['./followers.parquet', './posts_likes.parquet',
                  './followers_posts_likes.parquet', './followers_posts_api_final.parquet',
                  './posts_api.parquet']
    # A list (not a lazy one-shot `map`) so the result can be iterated more than once.
    return [spark.read.parquet(file_path) for file_path in files_path]


def save_results(dataframe, path, mode=None):
    """Write *dataframe* as JSON to ``RESULTS + path``.

    Spark writes a directory of part files at that path. The ``.json``
    suffix used by callers becomes part of the directory name.

    Args:
        dataframe: the DataFrame to write.
        path: output location relative to ``RESULTS``.
        mode: forwarded to ``DataFrameWriter.json``. ``None`` keeps Spark's
            default, which raises if the path already exists. ``'overwrite'``
            or ``'ignore'`` make re-runs possible.
    """
    dataframe.write.json(RESULTS + path, mode=mode)


# The function must be *called* so the Parquet inputs exist before loading.
init_json_to_parquet()

followers, posts_likes, followers_posts_likes, followers_posts, posts = get_init_files()

<function __main__.init_json_to_parquet()>

In [3]:
# Task 1: the 20 most-commented posts, computed separately for `posts` (keys
# prefixed `-94_`) and for `followers_posts`. The nested `comments.count`
# field surfaces as a top-level column named `count`.
top20_comments_1 = (
    posts
    .select('key', 'comments.count')
    .orderBy(col('count').desc())
    .limit(20)
)
top20_comments_2 = (
    followers_posts
    .select('key', 'comments.count')
    .orderBy(col('count').desc())
    .limit(20)
)

save_results(top20_comments_1, 'task1_top20_comments1.json')
save_results(top20_comments_2, 'task1_top20_comments2.json')

In [4]:
# Task 1: the 20 most-liked posts for `posts` and for `followers_posts`;
# both tables are printed before being written out.
top20_likes_1 = (
    posts
    .select('key', 'likes.count')
    .orderBy(col('count').desc())
    .limit(20)
)
top20_likes_2 = (
    followers_posts
    .select('key', 'likes.count')
    .orderBy(col('count').desc())
    .limit(20)
)

top20_likes_1.show()
top20_likes_2.show()

save_results(top20_likes_1, 'task1_top20_likes1.json')
save_results(top20_likes_2, 'task1_top20_likes2.json')

+---------+-----+
|      key|count|
+---------+-----+
|-94_32022| 1637|
|-94_35068| 1629|
|-94_17492| 1516|
|-94_18526| 1026|
|-94_19552|  955|
|-94_41468|  952|
|-94_19419|  868|
|-94_29046|  824|
|-94_32546|  786|
|-94_24085|  765|
|-94_40180|  759|
|-94_33658|  708|
|-94_13532|  633|
|-94_40842|  631|
|-94_35117|  588|
|-94_17014|  581|
|-94_19583|  553|
|-94_19809|  552|
|-94_27455|  550|
|-94_11999|  549|
+---------+-----+

+----------------+-----+
|             key|count|
+----------------+-----+
|119920644_425873| 3271|
|   368925151_330| 2745|
|  101059709_3309| 2505|
| 119920644_46298| 2457|
|   368925151_329| 2194|
|119920644_311504| 2160|
|119920644_340354| 2148|
| 119920644_90055| 2145|
| 187877260_31472| 2099|
|119920644_388261| 2092|
|119920644_399151| 2000|
| 119920644_46295| 1968|
|119920644_428224| 1914|
| 119920644_46264| 1776|
|119920644_387736| 1739|
|119920644_322736| 1665|
|     640248_4004| 1593|
| 187877260_32164| 1574|
| 119920644_46287| 1517|
| 124015630_16762

In [5]:
# Task 1: the 20 most-reposted posts for `posts` and for `followers_posts`.
top20_reposts_1 = (
    posts
    .select('key', 'reposts.count')
    .orderBy(col('count').desc())
    .limit(20)
)
top20_reposts_2 = (
    followers_posts
    .select('key', 'reposts.count')
    .orderBy(col('count').desc())
    .limit(20)
)

save_results(top20_reposts_1, 'task1_top20_reposts1.json')
save_results(top20_reposts_2, 'task1_top20_reposts2.json')

In [6]:
# Task 2: the 20 likers with the most rows (likes) in `followers_posts_likes`.
top20_followers_likes = (
    followers_posts_likes
    .groupBy('likerId')
    .count()
    .orderBy(col('count').desc())
    .limit(20)
)

save_results(top20_followers_likes, 'task2_top20_followers_likes.json')

In [7]:
# Task 2: the 20 authors (`owner_id`) of followers' posts that most often
# repost owner -94. A post counts when the first entry of its `copy_history`
# belongs to owner -94.
is_repost_of_group = followers_posts.copy_history.owner_id.getItem(0) == -94
top20_followers_reposts = (
    followers_posts
    .where(is_repost_of_group)
    .select('key', 'copy_history.id', 'owner_id')
    .groupBy('owner_id')
    .count()
    .orderBy(col('count').desc())
    .limit(20)
)

save_results(top20_followers_reposts, 'task2_top20_followers_reposts.json')

In [8]:
# Task 3: for every reposted -94 post ("<owner_id>_<id>"), the set of
# followers' post keys that repost it. Only the first element of each
# post's `copy_history` is inspected.
group_reposts = (
    followers_posts
    .where(followers_posts.copy_history.owner_id.getItem(0) == -94)
    .select(
        element_at(col('copy_history.owner_id'), 1).cast('string').alias('owner_id'),
        element_at(col('copy_history.id'), 1).cast('string').alias('id'),
        col('key'),
    )
)

reposts = (
    group_reposts
    .select(
        concat(col('owner_id'), lit('_'), col('id')).alias('itmo_posts'),
        col('key').alias('users_posts'),
    )
    .groupBy('itmo_posts')
    .agg(collect_set('users_posts').alias('users_posts'))
)

reposts.show(5)

# A single partition, so the result lands in one JSON part file.
save_results(reposts.coalesce(1), 'task3_reposts.json')

+-------------+--------------------+
|   itmo_posts|         users_posts|
+-------------+--------------------+
|    -94_41684|    [43225330_12294]|
|    -94_42316|[192932783_2713, ...|
|-94_456242467|      [3290903_4624]|
|    -94_40134|     [268247082_538]|
|    -94_38995|     [147054031_307]|
+-------------+--------------------+
only showing top 5 rows



In [9]:
sc = spark.sparkContext

# Emoji lookup table keyed by emoji string. The attribute name depends on the
# `emoji` package release:
#   - newer releases expose EMOJI_DATA;
#   - 1.x exposes UNICODE_EMOJI_ENGLISH, while its UNICODE_EMOJI is nested by
#     language code, so a membership test against it never matches an emoji;
#   - older releases only have a flat UNICODE_EMOJI.
_emoji_table = (getattr(emoji, 'EMOJI_DATA', None)
                or getattr(emoji, 'UNICODE_EMOJI_ENGLISH', None)
                or emoji.UNICODE_EMOJI)
emojis = sc.broadcast(_emoji_table)


def _extract_emojis(text):
    """Return the characters of *text* found in the emoji table, in order, with repeats.

    Matching is per code point. Multi-code-point emoji (ZWJ sequences, flags,
    skin-tone modifiers) are therefore only found through whichever of their
    individual code points the table lists.
    """
    if not text:
        return []
    table = emojis.value  # read the broadcast value once per call, not per character
    return [ch for ch in text if ch in table]


extract_emoji_udf = udf(_extract_emojis, ArrayType(StringType()))

# `text != ""` also drops null texts (the comparison yields null for them).
posts_text = posts.filter(posts.text != "").select('key', 'text')
text_emoji = posts_text.select("key", extract_emoji_udf("text").alias("emojis"))
text_emoji.sort(desc("emojis")).show(5)

save_results(text_emoji.coalesce(1), 'task4_text_emoji.json')

+---------+--------+
|      key|  emojis|
+---------+--------+
|-94_42548|    [🧪]|
|-94_41588|    [🧪]|
|-94_41140|    [🧪]|
|-94_41997|    [🧪]|
|-94_42356|[🧡, 🚲]|
+---------+--------+
only showing top 5 rows



In [10]:
# Task 5: "friends" of a post owner are likers whose posts the owner has also
# liked (mutual likes). Self-likes are dropped first.
likes_new = followers_posts_likes.filter(followers_posts_likes.ownerId != followers_posts_likes.likerId)

# For every liker: the set of post owners they liked. The column is renamed so
# it can be joined back on the owner side.
df1 = likes_new.groupBy('likerId').agg(collect_set('ownerId').alias('ownerId_all'))
df1 = df1.selectExpr("likerId as liker_owner", "ownerId_all as ownerId_all")

# Left join (the variable name is kept). An owner who never liked anyone has no
# row in df1, but the likes on their posts must still reach `friends`, as
# non-mutual likes, for the fans computation.
inner_join = likes_new.join(df1, likes_new.ownerId == df1.liker_owner, 'left')

# A like is mutual when the owner also liked the liker. Without a df1 match,
# ownerId_all is null and array_contains returns null, so coalesce to false.
friends = inner_join.withColumn(
    "IsFriends",
    expr("coalesce(array_contains(ownerId_all, likerId), false)"))

friends_true = friends.filter(col('IsFriends'))

friends_true_groups = friends_true.groupBy('ownerId').agg(collect_set('likerId').alias('friends_all'))

save_results(friends_true_groups, 'task5_friends.json')

In [11]:
# Task 6: "fans" of a post owner are likers whose likes the owner did not
# return (rows of `friends` whose IsFriends flag is false).
friends_false = friends.filter(~col('IsFriends'))
fans = (
    friends_false
    .groupBy('ownerId')
    .agg(collect_set('likerId').alias('fans_all'))
)

save_results(fans, 'task6_fans.json')