In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 
import pandas as pd
import os

In [2]:
sc = SparkSession.builder.appName("PysparkLab2") \
    .config ("spark.sql.shuffle.partitions", "50") \
    .config("spark.driver.maxResultSize","5g") \
    .config ("spark.sql.execution.arrow.enabled", "true") \
    .getOrCreate()

In [3]:
data_dir = r"G:\Data\spark_labs\bigdata20\bigdata20\followers_posts_likes.parquet"
result_dir = r"G:\Data\spark_labs\bigdata20\bigdata20\results\task2"

parquet_files = [_ for _ in os.listdir(data_dir) if  _.endswith('.parquet')]

for file_idx, file_path in enumerate(parquet_files):
    if file_idx == 0:
        users_df = sc.read.parquet(os.path.join(data_dir, file_path))
    else: 
        users_df = users_df.union(sc.read.parquet(os.path.join(data_dir, file_path)))
# users_df = sc.read.parquet(users_path)

In [4]:
users_df.count()

3190103

In [5]:
### Likes
users_df.createOrReplaceTempView("likes_view")
top_likers = sc.sql("select count(itemId) as likes_count, likerId from likes_view WHERE itemType == 'post' GROUP BY likerId ORDER BY likes_count desc LIMIT 20")

In [6]:
top_likers.show()
top_likers.toPandas().to_json(os.path.join(result_dir, 'top_liked.json'))

+-----------+---------+
|likes_count|  likerId|
+-----------+---------+
|       8104|194073434|
|       5332|150371150|
|       5261|271081114|
|       4946|  6524088|
|       3711|189597336|
|       3394|142999083|
|       3217|215686327|
|       2747|514404131|
|       2350|  2818498|
|       2212|419925361|
|       2162|493380857|
|       2122|424434709|
|       1985| 95783577|
|       1975| 94697255|
|       1777|  4448812|
|       1763|330771656|
|       1720|228571738|
|       1661|325927416|
|       1645|347711731|
|       1641|501177379|
+-----------+---------+



  PyArrow >= 0.15.1 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


In [89]:
posts_data_dir = r"G:\Data\spark_labs\bigdata20\bigdata20\followers_posts_api_final.json"
posts1_path = os.path.join(posts_data_dir, '0_39773a62_followers_posts_api_final.json')
posts2_path = os.path.join(posts_data_dir, '1_7a8a2098_followers_posts_api_final.json')
posts1_df = sc.read.json(posts1_path)
posts2_df = sc.read.json(posts2_path)

In [90]:
# can't union because error - 'signer_id'
# from pyspark.sql.types import StructType
# union_udf = udf(lambda row: [item.pop('signer_id', None) for item in row], StructType())
# posts1_df = posts1_df.withColumn('copy_history', lit(union_udf('copy_history')))

# posts_df = posts1_df.union(posts2_df)

In [91]:
posts1_df.show()

+--------------------+-------------+--------------------+---------+----------+----------+-------+----+-----+---------+-----------+-------------+--------+------------------+---------+-------+---------+--------------------+-----------+-----+
|         attachments|     comments|        copy_history|copyright|      date|final_post|from_id| geo|   id|is_pinned|        key|        likes|owner_id|       post_source|post_type|reposts|signer_id|                text|unavailable|views|
+--------------------+-------------+--------------------+---------+----------+----------+-------+----+-----+---------+-----------+-------------+--------+------------------+---------+-------+---------+--------------------+-----------+-----+
|[[,,,,,,,,,, vide...| [0, 0, true]|                null|     null|1550165023|      null|  87449|null| 3316|     null| 87449_3316| [1, 1, 6, 0]|   87449| [,, iphone, api,]|     post| [0, 0]|     null|Я люблю Вас. Я вч...|       null|[428]|
|[[,,,,,, [fe02668...| [0, 0, true]|    

In [92]:
posts1_df.columns

['attachments',
 'comments',
 'copy_history',
 'copyright',
 'date',
 'final_post',
 'from_id',
 'geo',
 'id',
 'is_pinned',
 'key',
 'likes',
 'owner_id',
 'post_source',
 'post_type',
 'reposts',
 'signer_id',
 'text',
 'unavailable',
 'views']

In [95]:
### From the second dataset
posts1_df.createOrReplaceTempView("df1_view")
posts2_df.createOrReplaceTempView("df2_view")
top_reposters = sc.sql("""SELECT owner_id, count(copy_history) as reposts_count 
                            FROM df1_view 
                            WHERE copy_history IS NOT NULL GROUP 
                            BY owner_id 
                            UNION ALL 
                            SELECT owner_id, count(copy_history) as reposts_count 
                            FROM df2_view 
                            WHERE copy_history IS NOT NULL 
                            GROUP BY owner_id 
                            ORDER BY reposts_count desc
                            LIMIT 20
                       """)
# top_reposters = sc.sql("select id, comments, copy_history, text, views, reposts from df1_view WHERE copy_history IS NOT NULL ORDER BY comments.count desc LIMIT 20")

In [96]:
top_reposters.show()

+---------+-------------+
| owner_id|reposts_count|
+---------+-------------+
|  2547211|        19187|
|  2547211|        18555|
|357231922|        12162|
|357231922|        11187|
|168543860|         9232|
|168543860|         9197|
| 25646344|         5748|
| 25646344|         5374|
|176861294|         4626|
|176861294|         4396|
|    29840|         3920|
|524656784|         3752|
|143207077|         3674|
|459339006|         3542|
|524656784|         3490|
|143207077|         3487|
|141687240|         3460|
|514384760|         3386|
|141687240|         3344|
|    29840|         3244|
+---------+-------------+
only showing top 20 rows

