In [1]:
!pwd

/home/analytics/Memelysis/analytics


In [2]:
import findspark
findspark.init()

In [3]:
import subprocess

from pyspark import SparkContext, SparkConf

from pyspark.sql.types import StructType, IntegerType, StringType, FloatType
from pyspark.sql import SparkSession, SQLContext, functions

from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame

In [4]:
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [5]:
cmd = 'hdfs dfs -ls /jsons/'

files = subprocess.check_output(cmd, shell=True)
files = files.strip()
files = files.decode().split('\n')

out = []

for path in files:
    f = path.split(" ")[-1]
    if 'memes_' in f:
        out.append(f)
        print(f)

/jsons/memes_1588956783893.json
/jsons/memes_1588975800125.json
/jsons/memes_1588979400070.json
/jsons/memes_1588983000044.json
/jsons/memes_1588986600066.json
/jsons/memes_1588990200082.json
/jsons/memes_1588993800056.json
/jsons/memes_1588997400057.json
/jsons/memes_1589001000045.json
/jsons/memes_1589004600051.json
/jsons/memes_1589008200048.json
/jsons/memes_1589011800040.json
/jsons/memes_1589015400180.json
/jsons/memes_1589019000033.json
/jsons/memes_1589022600061.json
/jsons/memes_1589026200044.json
/jsons/memes_1589029800040.json
/jsons/memes_1589033400043.json
/jsons/memes_1589040600208.json
/jsons/memes_1589041500031.json
/jsons/memes_1589045100027.json
/jsons/memes_1589048700046.json


In [6]:
sqlContext = SQLContext(sc)

In [8]:
sample_json = out[-1]

In [9]:
df = spark.read.json(sample_json)

In [10]:
df.printSchema()

root
 |-- additional_data: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)



# TWITTER

In [55]:
twitter_df = df.filter(df.source == 'twitter')

twitter_schema = StructType().add(
    'created_at', StringType(), True).add(
    'text', StringType(), True).add(
    'favorite_count', IntegerType(), True).add(
    'retweet_count', IntegerType(), True).add(
    'hashtags', StringType(), True)

twitter_data =  twitter_df.select(
    functions.col('id'),
    functions.from_json(functions.col('additional_data'), schema=twitter_schema).alias("data")
).select('id', 'data.*')

twitter_data = twitter_data.withColumnRenamed('text', 'tweet_text')

twitter_df = twitter_df.alias('twitter_df')
twitter_data = twitter_data.alias('twitter_data')

twitter_join = twitter_df.join(twitter_data,
                             twitter_df.id == twitter_data.id).select(
    'twitter_df.id',
    'twitter_df.image_path',
    'twitter_df.extension',
    'twitter_df.source',
    'twitter_df.text',
    'twitter_df.url',
    'twitter_data.created_at',
    'twitter_data.tweet_text',
    'twitter_data.favorite_count',
    'twitter_data.retweet_count',
    'twitter_data.hashtags')
twitter_join.printSchema()
twitter_join.show()

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)

+-------------------+--------------------+---------+-------+--------------------+--------------------+-------------------+--------------------+--------------+-------------+--------------------+
|                 id|          image_path|extension| source|                text|                 url|         created_at|          tweet_text|favorite_count|retweet_count|            hashtags|
+-------------------+--------------------+---------+-------+--------------------+--------------------+-------------------+--------------------+--------------+-----

# MEMEDROID

In [58]:
memedroid_df = df.filter(df.source == 'memedroid')

memedroid_schema = StructType().add(
    'title', StringType(), True).add(
    'tags', StringType(), True).add(
    'date', StringType(), True).add(
    'popularity', StringType(), True)

memedroid_data =  memedroid_df.select(
    functions.col('id'),
    functions.from_json(functions.col('additional_data'), schema=memedroid_schema).alias("data")
).select('id', 'data.*')


memedroid_df = memedroid_df.alias('memedroid_df')
memedroid_data = memedroid_data.alias('memedroid_data')

memedroid_join = memedroid_df.join(memedroid_data,
                             memedroid_df.id == memedroid_data.id).select(
    'memedroid_df.id',
    'memedroid_df.image_path',
    'memedroid_df.extension',
    'memedroid_df.source',
    'memedroid_df.text',
    'memedroid_df.url',
    'memedroid_data.title',
    'memedroid_data.tags',
    'memedroid_data.date',
    'memedroid_data.popularity')

memedroid_join.printSchema()
memedroid_join.show()

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- date: string (nullable = true)
 |-- popularity: string (nullable = true)

+--------------------+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+-------------------+----------+
|                  id|          image_path|extension|   source|                text|                 url|               title|                tags|               date|popularity|
+--------------------+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+-------------------+----------+
|UPLOADED5375eb4ed...|images/memedroid_...|     jpeg|memedroid|Japanese economy 

# IMGUR

In [57]:
imgur_df = df.filter(df.source == 'imgur')

imgur_schema = StructType().add(
    'title', StringType(), True).add(
    'datetime', IntegerType(), True).add(
    'views', IntegerType(), True).add(
    'ups', IntegerType(), True).add(
    'downs', IntegerType(), True).add(
    'points', IntegerType(), True).add(
    'nsfw', StringType(), True)
imgur_data =  imgur_df.select(
    functions.col('id'),
    functions.from_json(functions.col('additional_data'), schema=imgur_schema).alias("data")
).select('id', 'data.*')

imgur_df = imgur_df.alias('imgur_df')
imgur_data = imgur_data.alias('imgur_data')

imgur_join = imgur_df.join(imgur_data,
                             imgur_df.id == imgur_data.id).select(
    'imgur_df.id',
    'imgur_df.image_path',
    'imgur_df.extension',
    'imgur_df.source',
    'imgur_df.text',
    'imgur_df.url',
    'imgur_data.title',
    'imgur_data.datetime',
    'imgur_data.views',
    'imgur_data.ups',
    'imgur_data.downs',
    'imgur_data.points',
    'imgur_data.nsfw'
    )

imgur_join.printSchema()
imgur_join.show()

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)
 |-- title: string (nullable = true)
 |-- datetime: integer (nullable = true)
 |-- views: integer (nullable = true)
 |-- ups: integer (nullable = true)
 |-- downs: integer (nullable = true)
 |-- points: integer (nullable = true)
 |-- nsfw: string (nullable = true)

+-------+--------------------+---------+------+--------------------+--------------------+--------------------+----------+-----+---+-----+------+-----+
|     id|          image_path|extension|source|                text|                 url|               title|  datetime|views|ups|downs|points| nsfw|
+-------+--------------------+---------+------+--------------------+--------------------+--------------------+----------+-----+---+-----+------+-----+
|ZT8XYdp|images/imgur_2020...|      png| imgur|7:4

# REDDIT

In [59]:
#[Row(additional_data='{"date"
#     :1.589014785E9,"title":"Gonna get rekt now, I reposted this.","upvotes":11,"upvote_ratio":0.63}')]

reddit_df = df.filter(df.source == 'reddit')

reddit_schema = StructType().add(
    'title', StringType(), True).add(
    'date', FloatType(), True).add(
    'upvotes', IntegerType(), True).add(
    'upvote_ratio', FloatType(), True)
     
reddit_data =  reddit_df.select(
    functions.col('id'),
    functions.from_json(functions.col('additional_data'), schema=reddit_schema).alias("data")
).select('id', 'data.*')

reddit_df = reddit_df.alias('reddit_df')
reddit_data = reddit_data.alias('reddit_data')

reddit_join = reddit_df.join(reddit_data,
                             reddit_df.id == reddit_data.id).select(
    'reddit_df.id',
    'reddit_df.image_path',
    'reddit_df.extension',
    'reddit_df.source',
    'reddit_df.text',
    'reddit_df.url',
    'reddit_data.title',
    'reddit_data.date',
    'reddit_data.upvotes',
    'reddit_data.upvote_ratio'
    )

reddit_join.printSchema()
reddit_join.show()

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)
 |-- title: string (nullable = true)
 |-- date: float (nullable = true)
 |-- upvotes: integer (nullable = true)
 |-- upvote_ratio: float (nullable = true)

+----------------+--------------------+---------+------+--------------------+--------------------+--------------------+------------+-------+------------+
|              id|          image_path|extension|source|                text|                 url|               title|        date|upvotes|upvote_ratio|
+----------------+--------------------+---------+------+--------------------+--------------------+--------------------+------------+-------+------------+
|2020050912000573|images/reddit_202...|      png|reddit|When you make a m...|https://i.redd.it...|Gonna get rekt no...|1.58901478E9|     11|        0.6

# JOIN

In [60]:
twitter_join.printSchema()
memedroid_join.printSchema()
imgur_join.printSchema()
reddit_join.printSchema()

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- date: string (nullable = true)
 |-- popularity: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- tex

In [61]:
full_outer_join = ta.join(tb, ta.name == tb.name,how='full') # Could also use 'full_outer'
full_outer_join.show()

NameError: name 'ta' is not defined

In [62]:
twitter_join = twitter_join.alias('twitter_join')
memedroid_join = memedroid_join.alias('memedroid_join')
imgur_join = imgur_join.alias('imgur_join')
reddit_join = reddit_join.alias('reddit_join')

NameError: name 'TableA' is not defined

In [66]:
common_cols = ['id', 'image_path', 'extension', 'source', 'text', 'url']

In [67]:
twitter_join.select(['id', 'image_path', 'extension', 'source', 'text', 'url']).count()

43

In [69]:
twitter_join.select(common_cols).union(imgur_join.select(common_cols))

53

In [72]:
sc.union([twitter_join.select(common_cols),
         memedroid_join.select(common_cols),
         imgur_join.select(common_cols),
         reddit_join.select(common_cols)])

AttributeError: 'DataFrame' object has no attribute '_jrdd_deserializer'

In [73]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

In [78]:
unionAll(*[twitter_join.select(common_cols),
         memedroid_join.select(common_cols),
         imgur_join.select(common_cols),
         reddit_join.select(common_cols)]).show()

+-------------------+--------------------+---------+-------+--------------------+--------------------+
|                 id|          image_path|extension| source|                text|                 url|
+-------------------+--------------------+---------+-------+--------------------+--------------------+
|1259043022077034497|images/twitter_20...|      jpg|twitter|Every other membe...|http://pbs.twimg....|
|1259036283676098561|images/twitter_20...|      jpg|twitter|"OmG HAVE yOu SeE...|http://pbs.twimg....|
|1259040856100741121|images/twitter_20...|      jpg|twitter|@shammatheking gu...|http://pbs.twimg....|
|1259034554842648577|images/twitter_20...|      jpg|twitter|Can I copy your h...|http://pbs.twimg....|
|1259033833216843777|images/twitter_20...|      jpg|twitter|d TikTok @gunungg...|http://pbs.twimg....|
|1259030996315496448|images/twitter_20...|      png|twitter|Someone: bites th...|http://pbs.twimg....|
|1259037496899514369|images/twitter_20...|      png|twitter|How about you

In [7]:

def parse_json(df):

    # Schemes for different sources
    twitter_schema = StructType().add(
    'created_at', StringType(), True).add(
    'text', StringType(), True).add(
    'favorite_count', IntegerType(), True).add(
    'retweet_count', IntegerType(), True).add(
    'hashtags', StringType(), True)

    memedroid_schema = StructType().add(
    'title', StringType(), True).add(
    'tags', StringType(), True).add(
    'date', StringType(), True).add(
    'popularity', StringType(), True)

    imgur_schema = StructType().add(
    'title', StringType(), True).add(
    'datetime', IntegerType(), True).add(
    'views', IntegerType(), True).add(
    'ups', IntegerType(), True).add(
    'downs', IntegerType(), True).add(
    'points', IntegerType(), True).add(
    'nsfw', StringType(), True)

    reddit_schema = StructType().add(
    'title', StringType(), True).add(
    'date', FloatType(), True).add(
    'upvotes', IntegerType(), True).add(
    'upvote_ratio', FloatType(), True)
    
    #final dataframe scheme
    final_schema = StructType().add(
        'id', StringType(), True).add(
        'image_path', StringType(), True).add(
        'extension', StringType(), True).add(
        'source', StringType(), True).add(
        'text', StringType(), True).add(
        'url', StringType(), True).add(
        # Twitter
        'twitter_created_at', StringType(), True).add(
        'twitter_tweet_text', StringType(), True).add(
        'twitter_favorite_count', IntegerType(), True).add(
        'twitter_retweet_count', IntegerType(), True).add(
        'twitter_hashtags', StringType(), True).add(
        # Memedroid
        'memedroid_title', StringType(), True).add(
        'memedroid_tags', StringType(), True).add(
        'memedroid_date', StringType(), True).add(
        'memedroid_popularity', StringType(), True).add(
        # imgur
        'imgur_title', StringType(), True).add(
        'imgur_datetime', StringType(), True).add(
        'imgur_views', IntegerType(), True).add(
        'imgur_ups', IntegerType(), True).add(
        'imgur_downs', IntegerType(), True).add(
        'imgur_points', IntegerType(), True).add(
        'imgur_nsfw', StringType(), True).add(
        # reddit
        'reddit_title', StringType(), True).add(
        'reddit_date', FloatType(), True).add(
        'reddit_upvotes', IntegerType(), True).add(
        'reddit_upvote_ratio', FloatType(), True)
    output = sqlContext.createDataFrame(sc.emptyRDD(), final_schema)

    schemes = {"twitter": twitter_schema,
               "memedroid": memedroid_schema,
               "imgur": imgur_schema,
               "reddit": reddit_schema}

    # all sources contain such columns 
    common_cols = ['id', 'image_path', 'extension', 'source', 'text', 'url']

    specific_cols = {"twitter": ['created_at', 'text', 'favorite_count', 'retweet_count', 'hashtags'],
    "memedroid": ['title', 'tags', 'date', 'popularity'],
    "imgur": ['title', 'datetime', 'views', 'ups', 'downs', 'points', 'nsfw'],
    "reddit": ['title', 'date', 'upvotes', 'upvote_ratio']}

    parsed_jsons = []

    for source in ["twitter", "memedroid", "imgur", "reddit"]:
        
        current_df = df.filter(df.source == source) 

        # check if specific data source was correctly collected 
        if current_df.count() > 0:

            current_data = current_df.select(
                functions.col('id'),
                functions.from_json(functions.col('additional_data'), schema=schemes[source]).alias('data')
                ).select('id', 'data.*')

            # twitter have 2 column with same name
            if source == "twitter": current_data = current_data.withColumnRenamed('text', 'tweet_text')

            df_alias = source + "_df"
            data_alias = source + "_data"

            current_df = current_df.alias(df_alias)
            current_data = current_data.alias(data_alias)
                
            # twitter col workaround
            current_join = current_df.join(current_data, current_df.id == current_data.id).select(
                *([(df_alias + "." + colname) for colname in common_cols] +
                 [(data_alias + "." + colname) if colname != 'text' else (data_alias + ".tweet_" + colname) for colname in specific_cols[source]])
                 )

            # renaming cols
            for col in current_join.schema.names:
                if col not in common_cols:
                    current_join = current_join.withColumnRenamed(col, source + '_' + col)

        for field in output.schema.fields:
            if field.name not in current_join.schema.names:
                current_join = current_join.withColumn(field.name, lit(None).cast(field.dataType))
                
        current_join = current_join.select(output.schema.names)
        parsed_jsons.append(current_join)
    

    def unionAll(*dfs):
        return reduce(DataFrame.unionAll, dfs)

    return unionAll(*[df.select('*') for df in parsed_jsons])

In [None]:
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

x = None
for path in out[1:]:
    
    print(path)
    df = spark.read.json(path)
    df = parse_json(df)
    if x is None:
        x = df
    else:
        x = x.union(df)
    del df

/jsons/memes_1588975800125.json


In [17]:
df = spark.read.json(out[0])

In [18]:
x.printSchema()

root
 |-- id: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)
 |-- twitter_created_at: string (nullable = true)
 |-- twitter_tweet_text: string (nullable = true)
 |-- twitter_favorite_count: integer (nullable = true)
 |-- twitter_retweet_count: integer (nullable = true)
 |-- twitter_hashtags: string (nullable = true)
 |-- memedroid_title: string (nullable = true)
 |-- memedroid_tags: string (nullable = true)
 |-- memedroid_date: string (nullable = true)
 |-- memedroid_popularity: string (nullable = true)
 |-- imgur_title: string (nullable = true)
 |-- imgur_datetime: string (nullable = true)
 |-- imgur_views: integer (nullable = true)
 |-- imgur_ups: integer (nullable = true)
 |-- imgur_downs: integer (nullable = true)
 |-- imgur_points: integer (nullable = true)
 |-- imgur_nsfw: string (nullable = true)
 |-- reddit_tit

In [22]:
type(x)

pyspark.sql.dataframe.DataFrame