### Spark homework

#### Installing required dependencies and imports

In [16]:
!pip install emoji



In [17]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StringType
from pyspark import broadcast
import emoji

In [18]:
# Local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Spark-HW')
    .getOrCreate()
)

#### Data loading

In [19]:
# Source tables: posts (JSON) and likes on those posts (Parquet).
posts = spark.read.json('./bigdata20/posts_api.json')
posts_likes = spark.read.parquet('./bigdata20/posts_likes.parquet')

# Expose both as temp views so later cells can query them through spark.sql.
# NOTE(review): the Python name `posts_likes` is reassigned in the task-1 "Likes"
# cell; the temp view registered here keeps pointing at this Parquet data.
posts.createOrReplaceTempView("posts")
posts_likes.createOrReplaceTempView("posts_likes")

In [20]:
# Follower tables: follower list, followers' posts, and likes on those posts.
followers = spark.read.parquet('./bigdata20/followers.parquet')
followers_posts = spark.read.json('./bigdata20/followers_posts_api_final.json')
followers_posts_likes = spark.read.parquet('./bigdata20/followers_posts_likes.parquet')

# Register each DataFrame under the view name used by the SQL queries below.
followers.createOrReplaceTempView("followers")
followers_posts.createOrReplaceTempView("followers_posts")
followers_posts_likes.createOrReplaceTempView("followers_posts_likes")

#### JSON saver

In [21]:
# Per-task counters used to build unique default output names.
# They are reset whenever this cell is re-run.
_json_write_seq = {}


def write_to_json(file, task, name=None, mode="overwrite"):
    """Write a Spark DataFrame as JSON to ``./<task>/<name>`` and return that path.

    Why not ``str(file)`` as the directory name: it is the schema repr
    (e.g. ``DataFrame[key: string, count: bigint]``). Tables with the same schema
    in one task therefore mapped to the same directory, and the second write
    failed because the path already existed.

    Args:
        file: DataFrame to write. Only ``.columns`` and ``.write`` are used.
        task: Output sub-directory, e.g. ``"task_1"``.
        name: Directory name inside ``task``. If omitted, it is built from a
            per-task sequence number and the sanitized column names,
            e.g. ``01_key_count``.
        mode: Spark save mode. The default ``"overwrite"`` lets the notebook be
            re-run without failing on already-existing output directories.

    Returns:
        The path that was written.
    """
    if name is None:
        seq = _json_write_seq.get(task, 0) + 1
        _json_write_seq[task] = seq
        joined = "_".join(file.columns)
        # Keep only path-safe characters; aggregate columns such as
        # `collect_set(user_id)` contain parentheses.
        tag = "".join(ch if ch.isalnum() or ch == "_" else "_" for ch in joined).strip("_") or "df"
        name = f"{seq:02d}_{tag}"
    path = f"./{task}/{name}"
    file.write.mode(mode).json(path)
    return path

## Task 1
##### Comments

In [22]:
# Top-20 rows by comments.count, first for `posts`, then for `followers_posts`.
posts_comments, follow_posts_comm = (
    spark.sql(query)
    for query in (
        'SELECT key, comments.count FROM posts ORDER BY count DESC LIMIT 20',
        'SELECT key, comments.count FROM followers_posts ORDER BY count DESC LIMIT 20',
    )
)

In [24]:
# Save both top-20-by-comments tables. The DataFrame built in the cell above is
# named `posts_comments`; `posts_comm` does not exist (NameError).
write_to_json(posts_comments, "task_1")
write_to_json(follow_posts_comm, "task_1")

##### Likes

In [25]:
# Top-20 rows by likes.count, first for `posts`, then for `followers_posts`.
# NOTE(review): this rebinds `posts_likes`, which the data-loading cell used for
# the Parquet likes table; the "posts_likes" temp view registered there is unaffected.
posts_likes, follow_posts_likes = (
    spark.sql(query)
    for query in (
        'SELECT key, likes.count FROM posts ORDER BY count DESC LIMIT 20',
        'SELECT key, likes.count FROM followers_posts ORDER BY count DESC LIMIT 20',
    )
)

In [27]:
# Save both top-20-by-likes tables under task_1.
write_to_json(file=posts_likes, task="task_1")
write_to_json(file=follow_posts_likes, task="task_1")

##### Reposts

In [28]:
# Top-20 rows by reposts.count, first for `posts`, then for `followers_posts`.
posts_rep, follow_posts_rep = (
    spark.sql(query)
    for query in (
        'SELECT key, reposts.count FROM posts ORDER BY count DESC LIMIT 20',
        'SELECT key, reposts.count FROM followers_posts ORDER BY count DESC LIMIT 20',
    )
)

In [30]:
# Save both top-20-by-reposts tables under task_1.
write_to_json(file=posts_rep, task="task_1")
write_to_json(file=follow_posts_rep, task="task_1")

## Task 2

In [35]:
# Task 2a: the 20 `likerId`s with the most rows (likes) in followers_posts_likes.
followers_likes = spark.sql(
    'SELECT likerId, COUNT(*) as number_of_likes FROM followers_posts_likes GROUP BY likerId'
).sort(desc("number_of_likes")).limit(20)

# Task 2b: the 20 owners whose posts appear most often as the source of reposts
# in followers_posts. Only rows with a non-empty `copy_history` are counted.
# Otherwise every non-repost falls into a single NULL group that takes a slot in
# the ranking. The first copy_history entry is used, as in task 3.
followers_own = spark.sql('''
    SELECT copy_history[0].owner_id AS owner_id, COUNT(*) AS own_count
    FROM followers_posts
    WHERE size(copy_history) > 0
    GROUP BY copy_history[0].owner_id
''').sort(desc("own_count")).limit(20)

In [36]:
# Save both task-2 rankings.
write_to_json(file=followers_likes, task="task_2")
write_to_json(file=followers_own, task="task_2")

## Task 3

In [37]:
# Task 3: for each post in `posts`, the set of followers_posts owners who reposted it.
# A follower post is a repost of a given post when its first `copy_history` entry
# matches BOTH that post's owner_id and its id. Matching on owner_id alone
# attaches every repost of any post by that owner to every one of their posts.
# NOTE(review): assumes the `copy_history` structs carry an `id` field next to
# `owner_id` (as VK wall post objects do) - confirm against the JSON schema.
source_posts = posts.select(
    col('owner_id').alias('source_owner_id'),
    col('id').alias('post_id'),
)
follower_reposts = followers_posts \
    .where(size('copy_history') > 0) \
    .select(
        col('owner_id').alias('user_id'),
        col('copy_history').getItem(0).getField('owner_id').alias('repost_owner_id'),
        col('copy_history').getItem(0).getField('id').alias('repost_post_id'),
    )
posts_followers_posts = source_posts \
    .join(
        follower_reposts,
        (source_posts['source_owner_id'] == follower_reposts['repost_owner_id'])
        & (source_posts['post_id'] == follower_reposts['repost_post_id']),
    ) \
    .groupBy('post_id') \
    .agg(collect_set('user_id').alias('user_ids'))

In [45]:
# Save the post -> reposting-users mapping.
write_to_json(file=posts_followers_posts, task="task_3")

## Task 4

In [39]:
# Task 4: list the emoji characters found in the text of each non-empty post.
sc = spark.sparkContext


def _emoji_symbols():
    """Return the set of emoji strings known to the installed `emoji` package.

    `emoji.UNICODE_EMOJI` is gone in emoji 2.x, which provides `EMOJI_DATA`.
    In 1.x releases it is keyed by language code ({'en': {...}, ...}), and in
    0.x it is a flat emoji -> name dict. Use whichever layout is present.
    """
    if hasattr(emoji, 'EMOJI_DATA'):
        return frozenset(emoji.EMOJI_DATA)
    table = emoji.UNICODE_EMOJI
    return frozenset(table.get('en', table))


emojis = sc.broadcast(_emoji_symbols())

# Each character (code point) of the text is tested on its own. Multi-code-point
# emoji (flags, ZWJ sequences, skin-tone modifiers) are therefore only reported
# through whichever of their individual code points are listed as emoji.
emoji_udf = udf(
    lambda text: [symb for symb in text if symb in emojis.value] if text else [],
    ArrayType(StringType()),
)

posts_text = posts \
    .filter(posts.text != '') \
    .select('id', 'text')
text_emoji = posts_text \
    .select("id", emoji_udf("text").alias("emojis"))

In [46]:
# Save the per-post emoji lists.
write_to_json(file=text_emoji, task="task_4")

## Task 5

In [41]:
# Task 5: "friends" of an owner are likers whose posts the owner also liked.
# Likes where the owner and the liker are the same user are ignored.
likes_without_self = followers_posts_likes \
    .filter(followers_posts_likes.ownerId != followers_posts_likes.likerId)

# For every liker: the set of post owners they have liked.
likes = likes_without_self \
    .groupBy('likerId') \
    .agg(collect_set('ownerId').alias('ownerIdSet')) \
    .select(col('likerId').alias('likerUserId'), 'ownerIdSet')

# Attach to every like (likerId -> ownerId) the set of owners that the owner has
# liked. LEFT join: an owner who never liked anyone has no row in `likes`. An
# inner join would drop their incoming likes and lose those likers from the
# task 6 "fans".
final_likes = likes_without_self \
    .join(likes, likes_without_self.ownerId == likes.likerUserId, 'left')

# IsFriends: the owner liked the liker back. coalesce(..., false) turns the
# "owner liked nobody" case (NULL set -> NULL result) into False.
aggregated_likes = final_likes.withColumn(
    "IsFriends", expr("coalesce(array_contains(ownerIdSet, likerId), false)"))

friends = aggregated_likes \
    .filter(aggregated_likes.IsFriends == True) \
    .groupBy(col('ownerId').alias('user')) \
    .agg(collect_set('likerId').alias('friends'))

In [47]:
# Save the user -> friends mapping.
write_to_json(file=friends, task="task_5")

## Task 6

In [48]:
# Task 6: "fans" of a user are likers whose like row has IsFriends = False,
# i.e. the owner did not like them back. Rows with a NULL flag are dropped,
# because negating NULL stays NULL.
fans = (
    aggregated_likes
    .where(~col('IsFriends'))
    .groupBy(col('ownerId').alias('user'))
    .agg(collect_set('likerId').alias('fans'))
)

In [50]:
# Save the user -> fans mapping.
write_to_json(file=fans, task="task_6")