In [47]:
from os import environ
import findspark
import pandas
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, create_map, lit, sum, split, explode
from pyspark.sql.functions import to_timestamp, from_unixtime


environ['SPARK_HOME'] = '/usr/local/spark'
findspark.init()

sc = SparkContext("local", "first app")
sqlContext = SQLContext(sc)

In [135]:
import json
text_file = sc.textFile('AFINN-111.txt')
with open('dictionary.json', 'w') as file:
    for row in text_file.collect():
        xrow = row.split('\t')
        dct = {'word':xrow[0], 'value': xrow[1]}
        data = json.dump(dct, file)
        file.write('\n')
text_file.count()

2477

In [136]:
raw_tweet_df = sqlContext.read.json('three_minutes_tweets.json')
affin_dict_df = sqlContext.read.json('dictionary.json')

In [137]:
raw_data_df = raw_tweet_df.select(
    col('id').alias('id'),
    col('user.screen_name').alias('username'),
    'timestamp_ms',
    col('lang').alias('lang_code'),
    col('user.time_zone').alias('timezone'),
    col('place.full_name').alias('place_fullname'),
    col('user.location').alias('user_location'),
    col('entities.user_mentions.screen_name').alias('user_mention'),
    col('entities.urls.display_url').alias('display_url'),
    'text',
).filter(raw_tweet_df.text.isNotNull())
raw_data_df.limit(3).toPandas()

Unnamed: 0,id,username,timestamp_ms,lang_code,timezone,place_fullname,user_location,user_mention,display_url,text
0,633030779619012608,snoow3333,1439761273661,ar,Nairobi,,,[],[],ايه الأكل 😜
1,633030779610664960,balwinderstyles,1439761273659,und,,,,"[nitishuna, LasVegasChicas, I_luv_reds, Dimond...",[],RT @nitishuna: @LasVegasChicas @I_luv_reds @Di...
2,633030779631566848,eqtybas,1439761273664,ar,Pacific Time (US & Canada),,,[],[],إنّ العرب إذا تغلبوا على أوطان أسرع إليها الخر...


In [138]:
splitted_text_df = raw_data_df.select(
    'id',
    'username',
    'lang_code',
    'timezone',
    'place_fullname',
    'user_location',
    'user_mention',
    'display_url',
    'text',
    'timestamp_ms',
).withColumn('create_dt', from_unixtime((raw_data_df.timestamp_ms / 1000).cast('bigint'), 'dd.MM.yyyy HH:mm:ss'))\
.withColumn('splitted_text', explode(split(col('text'), ' ')))
splitted_text_df.limit(3).toPandas()

Unnamed: 0,id,username,lang_code,timezone,place_fullname,user_location,user_mention,display_url,text,timestamp_ms,create_dt,splitted_text
0,633030779619012608,snoow3333,ar,Nairobi,,,[],[],ايه الأكل 😜,1439761273661,17.08.2015 00:41:13,ايه
1,633030779619012608,snoow3333,ar,Nairobi,,,[],[],ايه الأكل 😜,1439761273661,17.08.2015 00:41:13,الأكل
2,633030779619012608,snoow3333,ar,Nairobi,,,[],[],ايه الأكل 😜,1439761273661,17.08.2015 00:41:13,😜


In [139]:
joined_df = splitted_text_df.alias('SPLT')\
.join(affin_dict_df.alias('DICT'), col('SPLT.splitted_text') == col('DICT.word'), 'left')\
.select('SPLT.id',
        'SPLT.username',
        'SPLT.timezone',
        'SPLT.lang_code',
        'SPLT.place_fullname',
        'SPLT.user_location',
        'SPLT.user_mention',
        'SPLT.display_url',
        'SPLT.text',
        'SPLT.splitted_text',
        'SPLT.create_dt',
        'DICT.word',
        'DICT.value')
# join_df.show()
joined_df.limit(3).toPandas()
# inner (without any lang) or left join(only 'EN')

Unnamed: 0,id,username,timezone,lang_code,place_fullname,user_location,user_mention,display_url,text,splitted_text,create_dt,word,value
0,633030779619012608,snoow3333,Nairobi,ar,,,[],[],ايه الأكل 😜,ايه,17.08.2015 00:41:13,,
1,633030779619012608,snoow3333,Nairobi,ar,,,[],[],ايه الأكل 😜,الأكل,17.08.2015 00:41:13,,
2,633030779619012608,snoow3333,Nairobi,ar,,,[],[],ايه الأكل 😜,😜,17.08.2015 00:41:13,,


In [140]:
sqlContext.sql('show tables').show()

+--------+-----------------+-----------+
|database|        tableName|isTemporary|
+--------+-----------------+-----------+
|        |dt_most_happy_usr|       true|
|        |        dt_report|       true|
+--------+-----------------+-----------+



In [141]:
joined_df.createOrReplaceTempView('dt_report')

In [143]:
# timezone_report desc= 
sqlContext.sql('''
select 
  timezone
, sum(value) as happy_value 
    from dt_report
        where timezone is not null
    group by timezone 
    order by 2 desc''').limit(5).toPandas()

Unnamed: 0,timezone,happy_value
0,London,59.0
1,Amsterdam,51.0
2,Pacific Time (US & Canada),41.0
3,Eastern Time (US & Canada),37.0
4,Central Time (US & Canada),33.0


In [144]:
# timezone_report asc= 
sqlContext.sql('''
select
  timezone
, sum(value) as happy_value
    from dt_report 
        where value is not null 
    group by timezone 
    order by 2 asc''').limit(5).toPandas()

Unnamed: 0,timezone,happy_value
0,Arizona,-31.0
1,Berlin,-15.0
2,Mexico City,-14.0
3,Buenos Aires,-13.0
4,Santiago,-12.0


In [145]:
# location_report desc= 
sqlContext.sql('''
select 
  user_location
, sum(value) as happy_value 
    from dt_report 
        where user_location <> "" 
    group by user_location 
    order by 2 desc''').limit(5).toPandas()

Unnamed: 0,user_location,happy_value
0,USA,16.0
1,@ManUtd,14.0
2,cala boca e me beija,12.0
3,Ireland,10.0
4,"Wichita, Kansas",9.0


In [147]:
# location_report asc= 
sqlContext.sql('''
select 
  user_location
, sum(value) as happy_value
    from dt_report 
        where user_location <> "" 
        and value is not null 
    group by user_location 
    order by 2 asc''').limit(5).toPandas()

Unnamed: 0,user_location,happy_value
0,London,-15.0
1,houston → san marcos,-9.0
2,she/they/he,-8.0
3,Football,-8.0
4,301 to my city,-8.0


In [148]:
# user_report desc= 
sqlContext.sql('''
select 
  username
, sum(value) as happy_value
    from dt_report 
        where value is not null 
    group by username 
    order by 2 desc''').limit(5).toPandas()

Unnamed: 0,username,happy_value
0,awhmyfelix,16.0
1,kWame_NyAn,14.0
2,pailyxo,12.0
3,lisacortezza,10.0
4,esnuff,9.0


In [149]:
# user_report asc= 
sqlContext.sql('''
select 
  username
, sum(value) as happy_value
    from dt_report 
        where value is not null 
    group by username 
    order by 2 asc''').limit(5).toPandas()

Unnamed: 0,username,happy_value
0,SpacceeOutt,-10.0
1,DestiniTeyonna,-9.0
2,itsssCat,-9.0
3,BasedGodJose,-9.0
4,JGoldfarb,-9.0


In [150]:
splitted_usermention_df = joined_df.withColumn('mentions', explode(col('user_mention')))
splitted_usermention_df.limit(3).toPandas()
splitted_usermention_df.createOrReplaceTempView('dt_most_happy_usr')

In [156]:
# sqlContext.sql('select * from dt_most_happy_usr').limit(5).toPandas()
sqlContext.sql('''
select
  username
, sum(value) as happy_value
    from dt_report
        where username in (select mentions from dt_most_happy_usr)
    group by username
    order by 2 desc
''').limit(1).toPandas()

Unnamed: 0,username,happy_value
0,SPINSouthWest,7.0


In [160]:
sqlContext.sql('''
select
  id
, username
, timezone
, lang_code
, place_fullname
, user_location
, text
from dt_report where username = "SPINSouthWest"''').limit(1).toPandas()

Unnamed: 0,id,username,timezone,lang_code,place_fullname,user_location,text
0,633031442310766593,SPINSouthWest,Dublin,en,,Ireland,RT @aine_kearins: @SPINSouthWest @IMRoryHall n...
