In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *

In [3]:
from pyspark.conf import SparkConf
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [4]:
conf = SparkConf().setAppName("sample_app")
sc = SparkContext(conf=conf)
sqlContext = SQLContext (sc)

Load the data 

In [5]:
segments = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("delimiter", "\u0001").load("hdfs:///user/pknees/RSC20/training.tsv")

In [5]:
segments.show(1)

+--------------------+----+--------------------+----+----+----+--------+--------------------+----------+--------------------+----+----+-----+----------+--------------------+----+----+-----+----------+-----+----+----+----+----+
|                 _c0| _c1|                 _c2| _c3| _c4| _c5|     _c6|                 _c7|       _c8|                 _c9|_c10|_c11| _c12|      _c13|                _c14|_c15|_c16| _c17|      _c18| _c19|_c20|_c21|_c22|_c23|
+--------------------+----+--------------------+----+----+----+--------+--------------------+----------+--------------------+----+----+-----+----------+--------------------+----+----+-----+----------+-----+----+----+----+----+
|101	1942	18628	15...|null|E7D6C5094767223F6...|null|null|null|TopLevel|22C448FF81263D4BA...|1581262691|D557B03872EF8986F...| 986|1201|false|1274269909|00000776B07587ECA...|  94| 648|false|1478011810|false|null|null|null|null|
+--------------------+----+--------------------+----+----+----+--------+--------------------

In [6]:
column_names = ["text_tokens", "hashtags", "tweet_id", "present_media", "present_links", "present_domains",
                "tweet_type", "language", "tweet_timestamp", "engaged_with_user_id", "engaged_with_user_follower_count",
               "engaged_with_user_following_count", "engaged_with_user_is_verified", "engaged_with_user_account_creation",
               "engaging_user_id", "engaging_user_follower_count", "engaging_user_following_count", "engaging_user_is_verified",
               "engaging_user_account_creation", "engaged_follows_engaging", "reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp"]

In [7]:
segments = segments.toDF(*column_names)

In [8]:
#reduced = segments
big,reduced = segments.randomSplit([0.999, 0.001], seed = 32)

In [38]:
reduced.select("reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp").show(5)

+---------------+-----------------+------------------------------+--------------+
|reply_timestamp|retweet_timestamp|retweet_with_comment_timestamp|like_timestamp|
+---------------+-----------------+------------------------------+--------------+
|           null|             null|                          null|    1581412148|
|           null|             null|                          null|    1581348656|
|           null|             null|                          null|          null|
|           null|             null|                          null|          null|
|           null|             null|                          null|    1581059560|
+---------------+-----------------+------------------------------+--------------+
only showing top 5 rows



In [9]:
import pyspark.sql.functions as f
from transformers import BertTokenizer
from transformers import BertModel
from pyspark.sql.functions import *

#f.col(reduced.select("text_tokens")).apply(lambda text_tokens: "".join(tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(text_tokens.split('\t')))))

I0604 15:58:00.392180 140458577397568 file_utils.py:41] PyTorch version 1.0.1.post2 available.


In [10]:
reduced.select("text_tokens")

DataFrame[text_tokens: string]

Encode list[] columns to boolean whether is it contain elements or not

In [10]:
def containsElement(inp):
    try:
        if len(inp) > 0:
            return True
    except:
        return False

encodeToBool = udf(lambda inp: containsElement(inp))

Count the list elements

In [11]:
def countElements(inp,boolCol):
    if boolCol == 'true':        
        return len(inp.split())    
    
    return 0

elementCount = udf(lambda inp,bl: countElements(inp,bl))

Count the media type

In [12]:
def countMedia(inp,boolCol, mediaType):
    if boolCol == 'true':
        return inp.split().count(str(mediaType))

    return 0

Create functions to define number of videos, photos, and GIFs

In [13]:
mediaPhoto = udf(lambda inp,bl: countMedia(inp, bl, "Photo"))
mediaGif = udf(lambda inp,bl: countMedia(inp, bl, "GIF"))
mediaVideo = udf(lambda inp,bl: countMedia(inp, bl, "Video"))

Check that is there other media just for sure

In [14]:
def countOther(inputCol, boolCol, video, gif, photo):
    if boolCol == "true":
        numInCol = len(inputCol.split())
        mediaNum = int(video) + int(gif) + int(photo)
        if mediaNum == numInCol:
            return 0        
        return numInCol-mediaNum  
    return 0    

mediaOther = udf(lambda inp,bl,vd,gf,ph: countOther(inp,bl,vd,gf,ph))

In [42]:
def classifyLabels(inputCol):
    if inputCol is not None:
        return 1
    
    return 0

labelClassify = udf(lambda inp: classifyLabels(inp))

Encode 'present_media' column

In [15]:
reduced = reduced.withColumn("has_media",encodeToBool(f.col("present_media")))

In [16]:
reduced.select("has_media", "present_media").show(7)

+---------+-------------+
|has_media|present_media|
+---------+-------------+
|     true|        Photo|
|     true|        Video|
|     true|        Video|
|     true|        Photo|
|     true|        Video|
|     true|        Photo|
|    false|         null|
+---------+-------------+
only showing top 7 rows



Count the media types for each attribute

In [17]:
reduced = reduced.withColumn("no_Photo",mediaPhoto(f.col("present_media"),f.col("has_media")))
reduced = reduced.withColumn("no_Gif",mediaGif(f.col("present_media"),f.col("has_media")))
reduced = reduced.withColumn("no_Video",mediaVideo(f.col("present_media"),f.col("has_media")))
reduced = reduced.withColumn("no_Other",mediaOther(f.col("present_media"),f.col("has_media"),f.col("no_Video"),f.col("no_Gif"),f.col("no_Photo")))

In [None]:
reduced.select("has_media", "present_media", "no_Media").show(7)

In [None]:
reduced.select("has_media", "present_media", "no_Video", "no_Gif","no_Photo","no_Other").show(50)

In [None]:
reduced.select("has_media", "present_media", "no_Video", "no_Gif","no_Photo","no_Other").filter(reduced["has_media"] == "true").show(30)

Encode links

In [18]:
reduced = reduced.withColumn("has_links",encodeToBool(f.col("present_links")))

In [19]:
reduced = reduced.withColumn("no_links",elementCount(f.col("present_links"),f.col("has_links")))

In [None]:
reduced.select("present_links", "has_links", "no_links").filter(reduced["has_links"] == "true").filter(reduced["no_links"] != 1).show(20)

Encode hashtags

In [27]:
reduced = reduced.withColumn("has_hashtags",encodeToBool(f.col("hashtags")))
reduced = reduced.withColumn("no_hashtags",elementCount(f.col("hashtags"),f.col("has_hashtags")))

In [28]:
reduced.select("hashtags", "has_hashtags", "no_hashtags").show(7)
#.filter(reduced["has_links"] == "true").filter(reduced["no_domains"] != 1).show(20)

+--------------------+------------+-----------+
|            hashtags|has_hashtags|no_hashtags|
+--------------------+------------+-----------+
|221D4221C80D250DB...|        true|          1|
|                null|       false|          0|
|E5B60323EDA9808E7...|        true|          2|
|D680643EDEC162480...|        true|          2|
|                null|       false|          0|
|                null|       false|          0|
|                null|       false|          0|
+--------------------+------------+-----------+
only showing top 7 rows



Encode domains

In [29]:
reduced = reduced.withColumn("has_domains",encodeToBool(f.col("present_domains")))
reduced = reduced.withColumn("no_domains",elementCount(f.col("present_domains"),f.col("has_domains")))

In [30]:
reduced.select("present_domains", "has_domains", "no_domains").show(7)

+--------------------+-----------+----------+
|     present_domains|has_domains|no_domains|
+--------------------+-----------+----------+
|                null|      false|         0|
|                null|      false|         0|
|                null|      false|         0|
|                null|      false|         0|
|                null|      false|         0|
|3183ACF54B4022B25...|       true|         1|
|                null|      false|         0|
+--------------------+-----------+----------+
only showing top 7 rows



Simplify dependent labels

In [43]:
reduced = reduced.withColumn("is_reply",labelClassify(f.col("reply_timestamp"))).withColumn("is_retweet",labelClassify(f.col("retweet_timestamp"))).withColumn("is_retweet_with_comment",labelClassify(f.col("retweet_with_comment_timestamp"))).withColumn("is_like",labelClassify(f.col("like_timestamp")))
        
    
#"reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp"

In [None]:
reduced.select("reply_timestamp", "is_reply", "retweet_timestamp", "is_retweet", "retweet_with_comment_timestamp", "is_retweet_with_comment", "like_timestamp", "is_like").show(30)

In [None]:
reduced.select("is_reply","is_retweet", "is_retweet_with_comment", "is_like","has_links", "no_links", "has_hashtags", "no_hashtags")

Decode tokens just for interpretation

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased', do_lower_case=False)
model = BertModel.from_pretrained('bert-base-multilingual-cased')
transf = udf(lambda text_tokens: "".join(tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(text_tokens.split('\t')))))

In [69]:
from datetime import datetime
'''
def convertTimesTamp(input):
    converted = 0
    try:
        converted = datetime.fromtimestamp(input).strftime('%Y-%m-%d %H:%M:%S')
    except:
        converted = null
    return str(converted)
'''
#timestampImport = udf(lambda ts:convertTimesTamp(ts))
#timestampImport = udf(lambda ts:datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'))
timestampImport = udf(lambda ts:datetime.fromtimestamp(ts).strftime('%d'))
    #lambda: datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'))

In [62]:
#funcWeekDay =  udf(lambda x: datetime.strptime(x, '%Y-%m-%d').strftime('%w').substr(1, 10))

In [36]:
reduced = reduced.withColumn("decoded_tweet_text",transf(f.col("text_tokens")))

In [53]:
reduced = reduced.withColumn("tweet_timestamp_decoded",timestampImport(f.col("tweet_timestamp")))

In [70]:
segments = segments.withColumn("tweet_timestamp_decoded",timestampImport(f.col("tweet_timestamp")))

counter = 0
if(("media_type").length > 0 ):
    return True # Contains media
else return False # Doesn't contain media 
    

In [None]:
reduced.select("text_tokens","decoded_tweet_text", "tweet_type", "tweet_id", "reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp").show(1,False )

In [None]:
reduced.select("tweet_timestamp_decoded").show(8)

In [None]:
segments.select("tweet_timestamp_decoded").show(8)

In [82]:
segments.select("tweet_timestamp_decoded").distinct().rdd.map(lambda r:  r).collect()

[Row(tweet_timestamp_decoded='07'),
 Row(tweet_timestamp_decoded='11'),
 Row(tweet_timestamp_decoded='09'),
 Row(tweet_timestamp_decoded='08'),
 Row(tweet_timestamp_decoded='06'),
 Row(tweet_timestamp_decoded='10'),
 Row(tweet_timestamp_decoded='12'),
 Row(tweet_timestamp_decoded='13')]

In [67]:
segments.select("tweet_timestamp_decoded").rdd.max()[0]

'6'

In [58]:
segments.select("tweet_timestamp_decoded").rdd.min()[0]

'2020-02-06 01:00:00'

Filter for the 6th of February, only use verified users

In [87]:
segmentsFiltered = segments.filter(segments["tweet_timestamp_decoded"] == "06" ).filter(segments['engaged_with_user_is_verified'] == True)

In [85]:
segmentsFiltered = segments.filter(segments["tweet_timestamp_decoded"] == "12" )

In [77]:
segmentsFiltered.select("tweet_timestamp_decoded").rdd.max()[0]

'06'

In [78]:
segmentsFiltered.select("tweet_timestamp_decoded").rdd.min()[0]

'06'

In [88]:
segmentsFiltered.count()

4422804

In [80]:
segments.count()

121386431

In [89]:
segmentsFiltered.count()/segments.count()

0.036435736379793554

In [None]:

reduced.filter($"tweet_timestamp" ===  )