<h3>Define dataset path</h3>

In [2]:
import os

big_data_dir = 'D://Datasets/bigdata20/bigdata20'
followers = os.path.join(big_data_dir, 'followers.parquet')
followers_posts = os.path.join(big_data_dir, 'followers_posts_api_final.json')
followers_posts_likes = os.path.join(big_data_dir, 'followers_posts_likes.parquet')
posts = os.path.join(big_data_dir, 'posts_api.json')
posts_likes = os.path.join(big_data_dir, 'posts_likes.parquet')

<h3>Spark initialization</h3>

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAppName('appName').setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

In [3]:
from pyspark.sql.functions import col, isnan, when, trim, sort_array, explode

<h3>Load dataset</h3>

In [4]:
posts_df = sqlContext.read.json(posts)
posts_df.describe()

DataFrame[summary: string, date: string, from_id: string, id: string, key: string, marked_as_ads: string, owner_id: string, post_type: string, signer_id: string, text: string, unavailable: string]

In [5]:
posts_df.printSchema()

root
 |-- attachments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- album: struct (nullable = true)
 |    |    |    |-- created: long (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |-- size: long (nullable = true)
 |    |    |    |-- thumb: struct (nullable = true)
 |    |    |    |    |-- access_key: string (nullable = true)
 |    |    |    |    |-- album_id: long (nullable = true)
 |    |    |    |    |-- date: long (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |    |-- sizes: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- type: string (nullable = true)
 |   

In [8]:
followers_posts_df = sqlContext.read.json(followers_posts)
followers_posts_df.describe()

DataFrame[summary: string, date: string, final_post: string, from_id: string, id: string, is_pinned: string, key: string, owner_id: string, post_type: string, signer_id: string, text: string, unavailable: string]

<h3>Get top posts</h3>

In [6]:
def get_top_posts(posts_df, prefix):
    print('before drop nans: {}'.format(posts_df.count()))
    posts_df = posts_df.na.drop(subset=["date"])
    print('after drop nans: {}'.format(posts_df.count()))

    for column, file_postfix in [('like', 'liked'), ('repost', 'reposted'), ('comment', 'commented')]:
        answer = posts_df.orderBy(posts_df[column + 's'].desc(), asc=False).limit(20).toPandas()
        answer_json = answer.to_json()
        with open('{}_top_{}.json'.format(prefix, file_postfix), 'w') as f:
            f.write(answer_json)

In [7]:
get_top_posts(posts_df, 'task1_posts')

before drop nans: 18430
after drop nans: 12988


In [9]:
get_top_posts(followers_posts_df, 'task1_followers_posts')

before drop nans: 816961
after drop nans: 807103
