In [16]:
from pyspark import SparkContext
from pyspark.sql import *
import string
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, regexp_replace
from urllib.parse import *
from pyspark.sql.types import TimestampType
import re

In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)

In [3]:
spark = SparkSession \
 .builder \
 .getOrCreate()

In [9]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('tweets.csv')

In [10]:
df_usr = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('users.csv')

In [11]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_key: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- created_str: string (nullable = true)
 |-- retweet_count: string (nullable = true)
 |-- retweeted: string (nullable = true)
 |-- favorite_count: string (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- source: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- expanded_urls: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- retweeted_status_id: string (nullable = true)
 |-- in_reply_to_status_id: string (nullable = true)



Iegūt no datu kopas šādu informāciju:

Tvītu un Twitter lietotāju tabulu ierakstu skaitu

In [6]:
df.dtypes

[('user_id', 'string'),
 ('user_key', 'string'),
 ('created_at', 'string'),
 ('created_str', 'string'),
 ('retweet_count', 'string'),
 ('retweeted', 'string'),
 ('favorite_count', 'string'),
 ('text', 'string'),
 ('tweet_id', 'string'),
 ('source', 'string'),
 ('hashtags', 'string'),
 ('expanded_urls', 'string'),
 ('posted', 'string'),
 ('mentions', 'string'),
 ('retweeted_status_id', 'string'),
 ('in_reply_to_status_id', 'string')]

In [12]:
#Twītu skaits
df.count()

207249

In [13]:
# Twitter lietotāju skaits
df_usr.count()

454

Agrāko un vēlāko (pēc datuma/laika) datos esošo Twitter ziņu.

In [23]:
# Maina datu tipu created_str kolonnai us timestamp
df = df.withColumn("created_str", df["created_str"].cast(TimestampType()))

# Attīra Hashtag kolonnu no nevajadzīgiem simboliem
df = df.withColumn('hashtags_cl', regexp_replace('hashtags', '\W', ''))

# Izveido temp view
df.createOrReplaceTempView("tweets")

In [24]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_key: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- created_str: timestamp (nullable = true)
 |-- retweet_count: string (nullable = true)
 |-- retweeted: string (nullable = true)
 |-- favorite_count: string (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- source: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- expanded_urls: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- retweeted_status_id: string (nullable = true)
 |-- in_reply_to_status_id: string (nullable = true)
 |-- hashtags_cl: string (nullable = true)



In [25]:
#agrakais
spark.sql("""select tweet_id,text,created_str
             from tweets 
             order by created_str""").na.drop().first()

Row(tweet_id='488745973032964096', text='I am in Love with LOVE!', created_str=datetime.datetime(2014, 7, 14, 18, 4, 55))

In [27]:
#vēlākais
spark.sql("""select tweet_id,text,created_str
             from tweets 
             order by created_str desc""").first()

Row(tweet_id='912604038692261888', text='Pal�stinenser erschie�t Israelis � Attent�ter tot https://t.co/F4aO6rgWWV', created_str=datetime.datetime(2017, 9, 26, 9, 5, 32))

20 aktīvākos tvītu autorus un to tvītu skaitu, sakārtotus dilstošā tvītu skaita secībā

In [28]:
(spark.sql("""select user_id,user_key,count(*) as tweet_count 
             from tweets group by user_id,user_key""")
    .orderBy("tweet_count", ascending=False)
    .limit(20)
    .show())

+----------+---------------+-----------+
|   user_id|       user_key|tweet_count|
+----------+---------------+-----------+
|1679279490|  ameliebaldwin|       9269|
|1671234620|        hyddrox|       6813|
|2882013788|    giselleevns|       6652|
|2671070290|   patriotblake|       4140|
|4508630900| thefoundingson|       3663|
|1727482238| melvinsroberts|       3346|
|1768259989|   mrclydepratt|       3263|
|1868496344|  brianaregland|       3261|
|2572058134|  leroylovesusa|       3229|
|1658420976|      baobaeham|       3215|
|1655194147|  melanymelanin|       3212|
|1658202894|    laurabaeley|       3201|
|1623180199| jeffreykahunas|       3197|
|1684524144|   datwisenigga|       3197|
|4224729994|        ten_gop|       3194|
|1676481360|    emileewaren|       3192|
|1660771422|garrettsimpson_|       3188|
|1694026190|jacquelinisbest|       3169|
|1649967228|     _nickluna_|       3159|
|1690487623|   michellearry|       3156|
+----------+---------------+-----------+



20 populārākos tvītos pieminētos hashtagus, sakārtotus dilstošā secībā pēc tvītu skaita, kur tie pieminēti

In [31]:
# Hashtag kolonna izskatās formatēta list formātā, tāpēc vēlējos noskaidrot,
# kā tiek atdalītas vērtības, kad tekstā ir vairāk par vienu hashtagu.
# Tomēr aplūkojot rezultātus var secināt, ka kollonā sastopams tikai pirmais
# hashtags.
df.select('text','hashtags')\
    .rdd.map(lambda x:(x['hashtags'],x['text'],x['text'].count('#')))\
    .filter(lambda x:x[2]>1)\
    .take(10)


[('"[""Blacklivesmatter""]"',
  'One of the ways to remind that #BlackLivesMatter #BlackPressDay',
  2),
 ('[]',
  'Bewaffnete attackieren Bus mit koptischen Christen #Islamisten #ISIS \nhttps://t.co/YFtCatLk0m',
  2),
 ('"[""STOPIslam""]"',
  '@sendavidperdue How are they gonna protect us if they just let a bunch of terrorist walk the cities of our city? #StopIslam #IslamKills',
  2),
 ('"[""whenthestarsgoblue""]"',
  'RT @hldb73: Bryan or Ryan Adams  #whenthestarsgoblue #RejectedDebateTopics @WorldOfHashtags @TheRyanAdams @bryanadams https://t.co/wFBdne8K…',
  2),
 ('"[""mutual""]"',
  'RT @WorldTruthTV: #mutual #respect https://t.co/auIjJ2RdBU',
  2),
 ('"[""OneLetterOffSports""]"',
  'Kareem Abdul Jabber #OneLetterOffSports @midnight #HashtagWars',
  2),
 ('"[""God""]"',
  '#God can be realized through all paths. All #religions...',
  2),
 ('"[""2017trendsreport""]"',
  'RT @NBLmusicSalerno: #2017trendsreport Follow Mio Paisan @SleepSkee #Excellent @Twitter @LoveQuotes #RT #UrgentA

In [33]:
# Tāpēc hashtagus pamēģinām izgūt no text kolonnas.
df.select('text').na.drop()\
    .rdd.flatMap(lambda x: x['text'].split())\
    .filter(lambda x:x[0]=='#')\
    .map(lambda hst: (hst, 1)) \
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda x:-x[1])\
    .take(20)
    

[('#politics', 3605),
 ('#tcot', 2742),
 ('#MAGA', 2168),
 ('#PJNET', 2085),
 ('#news', 2040),
 ('#Trump', 1536),
 ('#Merkelmussbleiben', 1081),
 ('#TrumpForPresident', 1065),
 ('#WakeUpAmerica', 1021),
 ('#TCOT', 901),
 ('#IslamKills', 896),
 ('#NeverHillary', 889),
 ('#ccot', 861),
 ('#2A', 848),
 ('#Trump2016', 840),
 ('#TrumpPence16', 680),
 ('#RejectedDebateTopics', 671),
 ('#BlackLivesMatter', 641),
 ('#CCOT', 641),
 ('#ThingsYouCantIgnore', 630)]

20 populārākos URL, kas pieminēti tvītos - te ir jāizmanto izvērstie URL, nevis t.co saīsinātie URL

In [15]:
df.select("expanded_urls").na.drop()\
    .rdd.map(lambda x: x['expanded_urls'].replace('"','').replace('[','').replace(']',''))\
    .filter(lambda x: "http" in x)\
    .map(lambda link: (link, 1)) \
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda x:-x[1])\
    .take(20)

[('https://twibble.io', 326),
 ('<a href=http://twitter.com rel=nofollow>Twitter Web Client</a>', 154),
 ('<a href=https://about.twitter.com/products/tweetdeck rel=nofollow>TweetDeck</a>',
  52),
 ('http://USFREEDOMARMY.COM', 18),
 ('http://dailycaller.com/2016/09/16/just-5-7-percent-of-clinton-foundation-budget-actually-went-to-charity/',
  16),
 ('https://twitter.com/gloed_up/status/765196453677527040', 15),
 ('https://twitter.com/reddroostermann/status/771026812566642688', 14),
 ('http://www.vox.com/2016/7/30/12332922/donald-trump-khan-muslim?utm_campaign=vox&utm_content=chorus&utm_medium=social&utm_source=twitter',
  13),
 ('https://twitter.com/hillaryclinton/status/776783575375028224', 12),
 ('http://wh.gov/iFE5w', 12),
 ('http://www.breitbart.com/big-government/2016/09/16/msnbc-politico-bloomberg-cnn-mcclatchy-confirm-hillary-clintons-2008-campaign-spread-birtherism/',
  12),
 ('http://proudemocrat.com/watch-amal-clooney-just-owned-donald-trump-video/#',
  12),
 ('https://twitter

tvītu skaitu pa mēnešiem

In [34]:
spark.sql("""select month(created_str) as cr_month
                    ,count(*) as tweet_count
             from tweets
             group by month(created_str)
""").na.drop().show()

+--------+-----------+
|cr_month|tweet_count|
+--------+-----------+
|      12|      24356|
|       1|      25007|
|       6|       5352|
|       3|      14303|
|       5|       4480|
|       9|      26011|
|       4|       7284|
|       8|      14092|
|       7|      12925|
|      10|      29053|
|      11|      23164|
|       2|      17434|
+--------+-----------+



informāciju par katra mēneša 5 populārākajiem hashtagiem

In [35]:
df_month_ht = spark.sql("""select month(created_str) as cr_month,hashtags_cl, count(*) as ht_count
                           from tweets
                           where hashtags_cl != ''
                           group by month(created_str),hashtags_cl
""").na.drop()

In [36]:
window = Window.partitionBy(df_month_ht['cr_month']).orderBy(df_month_ht['ht_count'].desc())

df_month_ht.select('*', rank().over(window).alias('rank'))\
  .filter(col('rank') <= 5) \
  .show() 

+--------+--------------------+--------+----+
|cr_month|         hashtags_cl|ht_count|rank|
+--------+--------------------+--------+----+
|      12| ThingsYouCantIgnore|     523|   1|
|      12|  ChristmasAftermath|     492|   2|
|      12| IdRunForPresidentIf|     491|   3|
|      12|GiftIdeasForPolit...|     453|   4|
|      12|        2016In4Words|     413|   5|
|       1|          ItsRiskyTo|     434|   1|
|       1|    2017SurvivalTips|     280|   2|
|       1|    ReasonsToProtest|     270|   3|
|       1|     IHaveADreamThat|     263|   4|
|       1|      PotusLastTweet|     254|   5|
|       6|        SummerAMovie|     135|   1|
|       6|                news|     111|   2|
|       6|        2016election|      65|   3|
|       6|                tcot|      59|   4|
|       6|               local|      52|   5|
|       3|          IslamKills|     474|   1|
|       3|   NoCyberCensorship|     242|   2|
|       3|           STOPIslam|     200|   3|
|       3|            Brussels|   

20 populārāko saišu domēni

In [37]:
def returnDomain (urlstr):
    url = re.search("(?P<url>https?://[^\s]+)", urlstr).group("url")
    parsed_uri = urlparse(url)
    domain = '{uri.scheme}://{uri.netloc}/'.format(uri=parsed_uri)
    if (domain != ':///'):
        return domain
    else: return urlstr

In [38]:
df.select("expanded_urls").na.drop()\
    .rdd.map(lambda x: x['expanded_urls'].replace('"','').replace('[','').replace(']',''))\
    .filter(lambda x: "http" in x)\
    .map(lambda link: (returnDomain(link), 1)) \
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda x:-x[1])\
    .take(20)

[('https://twitter.com/', 5834),
 ('http://bit.ly/', 3998),
 ('http://wapo.st/', 449),
 ('http://ln.is/', 395),
 ('https://twibble.io/', 326),
 ('http://sh.st/', 297),
 ('http://dlvr.it/', 296),
 ('http://www.breitbart.com/', 288),
 ('http://fb.me/', 238),
 ('https://youtu.be/', 228),
 ('http://twitter.com/', 182),
 ('http://hill.cm/', 182),
 ('https://www.youtube.com/', 172),
 ('http://dailycaller.com/', 154),
 ('http://ow.ly/', 150),
 ('http://dailym.ai/', 134),
 ('http://www.huffingtonpost.com/', 124),
 ('https://goo.gl/', 121),
 ('http://politi.co/', 120),
 ('http://on.rt.com/', 119)]

Twītu skaits, kuros pieminēta H. Klintone vai Tramps

In [21]:
df.select('text').na.drop()\
    .rdd.flatMap(lambda x: x['text'].upper().split())\
    .filter(lambda x:x in ('TRUMP','CLINTON'))\
    .map(lambda hst: (hst, 1)) \
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda x:-x[1])\
    .take(20)

[('TRUMP', 19858), ('CLINTON', 9224)]