In [12]:
from pyspark import SparkContext
from time import time
import sys
from datetime import datetime
from json import load, loads
import csv, io

rdd = sc.wholeTextFiles("hdfs://localhost:9000/raw_json")

facebook_data = rdd.filter(lambda x: "facebook" in x[0]).flatMap(lambda x: loads(x[1])).persist()
twitter_data = rdd.filter(lambda x: "twitter" in x[0]).flatMap(lambda x: loads(x[1])).persist()
youtube_data = rdd.filter(lambda x: "youtube" in x[0]).flatMap(lambda x: loads(x[1])).persist()
instagram_data = rdd.filter(lambda x: "instagram" in x[0]).flatMap(lambda x: loads(x[1])).persist()

print("done reading")

done reading


HDFS digunakan sebagai backgroundservice sumber data dari Spark. Tiap pembacaan memiliki info nama file dan isi file, yang saya gunakan untuk membedakan sosial media tiap entry data.

In [6]:
def post_mapper(post):
    try:
        return(post.get("created_time", "-").split("T")[0], (1, 0))
    except:
        pass
        
def comment_mapper(comment):
    try:
        return (comment.get("created_time", "-").split("T")[0], (0, 1))
    except:
        pass
        
post_mapped = facebook_data.map(lambda post: post_mapper(post)).persist()

comment_data = facebook_data.flatMap(lambda x: x.get("comments", {}).get("data", []))
comment_mapped = comment_data.map(lambda comment: comment_mapper(comment)).persist()

facebook_mapped = post_mapped.union(comment_mapped)
facebook_reduced = (facebook_mapped
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
    .map(lambda x: ("facebook", x[0], x[1][0], x[1][1]))
)
print(facebook_reduced.collect()[:10])

[('facebook', '2021-01-30', 1, 16), ('facebook', '2021-01-28', 1, 25), ('facebook', '2021-01-25', 1, 21), ('facebook', '2021-01-21', 1, 35), ('facebook', '2021-01-19', 4, 101), ('facebook', '2021-01-18', 4, 91), ('facebook', '2021-01-15', 3, 62), ('facebook', '2021-01-10', 1, 26), ('facebook', '2021-01-02', 1, 23), ('facebook', '2021-02-28', 1, 27)]


Pemrosesan data facebook untuk setiap entrynya berupa suatu post, juga merupakan sarang dari sejumlah comment. Comment-comment ini juga tidak musti pada hari yang sama, oleh karena itu perlu dimasukkan dalam proses mapreduce dengan logika lain. Saya memanfaatkan flatMap untuk memproses comment untuk akhirnya menghasilkan 3-tuple sama dengan hasil post, yakni (tanggal, nb_post, nb_comment)

In [7]:
def twitter_mapper(data):
    date = datetime.strptime(data.get('created_at'), "%a %b %d %H:%M:%S %z %Y")
    try:
        return (date.strftime('%Y-%m-%d'), (1, data.get("reply_count", 0)))
    except:
        pass

twitter_mapped = twitter_data.map(lambda post: twitter_mapper(post)).persist()
twitter_reduced = (twitter_mapped
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
    .map(lambda x: ("twitter", x[0], x[1][0], x[1][1]))
)
print(twitter_reduced.collect()[:10])

[('twitter', '2021-01-01', 11, 637), ('twitter', '2021-01-10', 5, 670), ('twitter', '2021-01-16', 1, 1), ('twitter', '2021-01-18', 7, 1816), ('twitter', '2021-01-19', 24, 1242), ('twitter', '2021-02-05', 185, 1685), ('twitter', '2021-01-29', 3, 2), ('twitter', '2021-01-31', 7, 482), ('twitter', '2021-01-28', 4, 6), ('twitter', '2021-02-06', 17, 14)]


Setiap entry data merupakan post twitter yang memiliki data jumlah reply (saya anggap reply sebagai comment). Tanggal saya ambil dari salah satu fungsi bawaan datetime untuk mengganti format agar sesuai sosial media lain. Sesuai umumnya pada tugas ini, mapper menghasilkan 3-tuple (tanggal, nb_post, nb_comment)

In [8]:
def youtube_mapper(data):
    date_str = data.get("snippet", {}).get("publishedAt", "-").split("T")[0]
    if(data.get("kind") == "youtube#video"):
        return (date_str, (1, 0))
    else:
        return (date_str, (0, 1))

    
youtube_mapped = youtube_data.map(lambda post: youtube_mapper(post)).persist()
youtube_reduced = (youtube_mapped
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
    .map(lambda x: ("youtube", x[0], x[1][0], x[1][1]))
)
print(youtube_reduced.collect()[:10])

[('youtube', '2021-07-26', 1, 10), ('youtube', '2021-07-22', 0, 8), ('youtube', '2021-07-15', 0, 8), ('youtube', '2021-06-27', 0, 6), ('youtube', '-', 0, 1738), ('youtube', '2021-06-23', 0, 10), ('youtube', '2021-06-17', 0, 12), ('youtube', '2021-06-14', 1, 6), ('youtube', '2021-06-12', 0, 172), ('youtube', '2021-06-11', 0, 4)]


Youtube post atau comment dibedakan dari kindnya, youtube#video sebagai post atau youtube#comment sebagai comment.

In [10]:
def instagram_mapper(data):
    date_str = "-"
    try:
        timestamp = int(data.get("created_time"))
        if(timestamp > 0):
            date_str = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d')
    except:
        pass
    if data.get("parent"): # This is a comment
        return (date_str, (0, 1))
    else: # This is a post
        return (date_str, (1, 0))

instagram_mapped = instagram_data.map(lambda post: instagram_mapper(post)).persist()
instagram_reduced = (instagram_mapped
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
    .map(lambda x: ("instagram", x[0], x[1][0], x[1][1]))
)
print(instagram_reduced.collect()[:10])

[('instagram', '2021-03-20', 6, 86), ('instagram', '2021-03-21', 0, 38), ('instagram', '2021-03-22', 14, 36), ('instagram', '2021-03-29', 0, 5), ('instagram', '2021-04-09', 19, 14), ('instagram', '2021-04-22', 4, 5), ('instagram', '2021-01-04', 8, 2), ('instagram', '2021-01-15', 13, 12), ('instagram', '2021-01-16', 2, 83), ('instagram', '2021-01-17', 0, 24)]


Saya membedakan apakah data instagram merupakan post atau comment dari apakah memiliki atribut "parent" yang berarti merupakan comment. Sebetulnya dalam post ada juga kumpulan comment, namun saya cek data yang sama dengan komen tersebut muncul lagi di entry berbeda, sehingga saya rasa pendekatan ini sudah sesuai

In [16]:
def list_to_csv_str(x):
    output = io.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip()

result = facebook_reduced.union(twitter_reduced).union(youtube_reduced).union(instagram_reduced)
result_as_csv = result.map(list_to_csv_str)
result_as_csv.saveAsTextFile("hdfs://localhost:9000/output2/milestone2_result")
result_as_csv.saveAsTextFile("milestone2_result")

Simpan ke dalam folder milestone2 setelah digabung. Sekali lagi dijelaskan terdapat 4 kolom, yakni berturut-turut jenis media sosial, tanggal, banyak post pada tanggal tersebut, dan banyak komen pada tanggal tersebut