In [1]:
# Display the `spark` session object. It is not created anywhere in this notebook,
# so it is presumably injected by the kernel/launcher — TODO confirm how it is started.
spark

In [2]:
# Load the MongoDB collection addressed by the URI through the Mongo Spark connector,
# then expose the resulting DataFrame to Spark SQL under the name `tweet`.
df = spark.read.load(
    format="com.mongodb.spark.sql.DefaultSource",
    uri="mongodb://localhost/tweet.sample",
)
df.createOrReplaceTempView('tweet')

In [4]:
# Count rows per `lang`, keeping only rows whose `delete` field is NULL, and sort by
# the count in descending order; print the three most frequent languages.
# NOTE(review): rows with a non-null `delete` are presumably deletion notices rather
# than tweets — confirm against the collection schema.
query = '''SELECT lang, count(*) count FROM tweet WHERE delete IS NULL GROUP BY 1 ORDER BY 2 DESC'''
spark.sql(query).show(3)
        

+----+-----+
|lang|count|
+----+-----+
|  ja|37927|
|  en|32403|
|  ko|12524|
+----+-----+
only showing top 3 rows



In [5]:
# Select the rows with lang = 'en', keeping the text and a formatted timestamp.
# `timestamp_ms` is divided by 1000 before being passed to from_unixtime, i.e. it is
# treated as milliseconds since the epoch and converted to seconds.
# NOTE(review): the rendered time presumably follows the Spark session time zone — confirm.
query = '''SELECT from_unixtime(timestamp_ms / 1000) time, text from tweet WHERE lang = 'en' '''
en_tweets = spark.sql(query)

In [6]:
# Preview the first three rows of the English subset (time, text).
en_tweets.show(3)

+-------------------+--------------------+
|               time|                text|
+-------------------+--------------------+
|2017-11-26 16:43:48|my 2 favs 😍😍😍 ...|
|2017-11-26 16:43:48|Andrew Bartlett '...|
|2017-11-26 16:43:48|RT @mocent0: ‘Hav...|
+-------------------+--------------------+
only showing top 3 rows



In [7]:
from pyspark.sql import Row

def text_split(row):
    """Explode one tweet row into one ``Row(time, word)`` per whitespace-separated token.

    Args:
        row: Object exposing ``time`` and ``text`` attributes (here, a row of
            ``en_tweets``).

    Yields:
        ``Row(time=row.time, word=token)`` for every token returned by
        ``str.split()`` on the text. Nothing is yielded when the text is
        missing (``None``) or contains no tokens.
    """
    text = row.text
    # A null text would raise AttributeError inside flatMap and fail the whole
    # Spark job, so treat it as a tweet with no words instead.
    if not text:
        return
    for word in text.split():
        yield Row(time=row.time, word=word)

In [8]:
# Inspect one raw Row from the underlying RDD to see the fields text_split will read.
en_tweets.rdd.take(1)

[Row(time=u'2017-11-26 17:38:08', text=u'RT @Svage2times: Pull up with the clit, lemme lick https://t.co/uRp3O1u6ix')]

In [9]:
# Sanity-check text_split: flatMap flattens the per-tweet generators into one RDD of
# (time, word) rows; look at the first two.
en_tweets.rdd.flatMap(text_split).take(2)

[Row(time=u'2017-11-26 17:38:08', word=u'RT'),
 Row(time=u'2017-11-26 17:38:08', word=u'@Svage2times:')]

In [10]:
# Same pipeline converted back to a DataFrame, to check that the Row objects
# convert cleanly via toDF().
en_tweets.rdd.flatMap(text_split).toDF().show(2)

+-------------------+-------------+
|               time|         word|
+-------------------+-------------+
|2017-11-26 17:38:08|           RT|
|2017-11-26 17:38:08|@Svage2times:|
+-------------------+-------------+
only showing top 2 rows



In [11]:
# One row per (time, word) token across all English tweets.
words = en_tweets.rdd.flatMap(text_split).toDF()

In [12]:
# Register the exploded words so they can be queried with Spark SQL as `words`.
words.createOrReplaceTempView('words')

In [13]:
# Word frequency over the `words` view, most frequent first; print the top three.
# Tokens are counted exactly as produced by text_split (no lower-casing or
# punctuation stripping is applied in this query).
query = '''SELECT word, count(*) count FROM words GROUP BY 1 ORDER BY 2 DESC'''
spark.sql(query).show(3)
        

+----+-----+
|word|count|
+----+-----+
|  RT|18582|
| the| 8680|
|  to| 7598|
+----+-----+
only showing top 3 rows



In [14]:
# Re-run whatever SQL is currently held in `query` and print up to 100 rows.
spark.sql(query).show(100)

+-------------+-----+
|         word|count|
+-------------+-----+
|           RT|18582|
|          the| 8680|
|           to| 7598|
|            a| 5404|
|          and| 5049|
|            I| 4631|
|           of| 4631|
|           is| 4032|
|          you| 3987|
|           in| 3870|
|          for| 3500|
|           on| 2365|
|         this| 2242|
|           my| 2204|
|         that| 1869|
|         with| 1766|
|           me| 1676|
|           be| 1628|
|            -| 1571|
|           it| 1520|
|           at| 1491|
|            i| 1477|
|          are| 1462|
|          The| 1420|
|         your| 1412|
|           so| 1387|
|         have| 1246|
|          was| 1182|
|          but| 1110|
|           by| 1082|
|          not| 1082|
|         just| 1072|
|        &amp;| 1071|
|         like| 1062|
|          all|  991|
|         from|  977|
|         will|  955|
|         when|  922|
|          who|  899|
|           we|  895|
|          one|  821|
|           if|  803|
|         

In [15]:
# Persist the exploded words as the table `twitter_sample_words`.
# mode('overwrite') replaces a table left behind by an earlier run; with the default
# save mode this cell raises when the table already exists, so it could not be re-run.
words.write.mode('overwrite').saveAsTable('twitter_sample_words')

In [16]:
# Shell out to list the spark-warehouse directory recursively and inspect its files.
!ls -R spark-warehouse

twitter_sample_words

spark-warehouse/twitter_sample_words:
_SUCCESS
part-00000-e3862246-7b55-4419-b6a3-4aa9a959eeb1-c000.snappy.parquet
part-00001-e3862246-7b55-4419-b6a3-4aa9a959eeb1-c000.snappy.parquet


In [17]:
# Read the table back by name and count its rows.
spark.table('twitter_sample_words').count()

433713

In [18]:
# Count words per time bucket: substr(time, 1, 13) keeps the first 13 characters of
# the time string as the grouping key (date plus hour, given the 'YYYY-MM-DD HH:MM:SS'
# values shown earlier). The count() action returns the number of (bucket, word) groups.
query = '''SELECT substr(time, 1, 13) time, word, count(*) count FROM twitter_sample_words GROUP BY 1,2 '''
spark.sql(query).count()

116064

In [24]:
# Per-word totals plus a `category` label: words seen more than 1000 times keep their
# own text as the label, all others are labelled 'COUNT=<n>' with their count.
# The count() action returns the number of distinct words.
query = '''SELECT word, count,IF(count > 1000, word, concat('COUNT=',count)) category FROM ( SELECT word, count(*) count FROM twitter_sample_words GROUP BY 1)t'''
spark.sql(query).count()

99565

In [26]:
# Preview five rows of whatever SQL is currently held in `query`.
spark.sql(query).show(5)

+--------------+-----+---------+
|          word|count| category|
+--------------+-----+---------+
|@BuzzFeedNews:|    4|  COUNT=4|
|      Scrapped|    2|  COUNT=2|
|      everyday|   26| COUNT=26|
|@OviyaasFamily|    1|  COUNT=1|
|         still|  392|COUNT=392|
+--------------+-----+---------+
only showing top 5 rows



In [28]:
# Run the SQL held in `query` and collect the full result into a pandas DataFrame
# (toPandas brings every row to the driver).
result = spark.sql(query).toPandas()

In [32]:
# Show the first ten rows of the pandas frame.
result.head(10)

Unnamed: 0,word,count,category
0,@BuzzFeedNews:,4,COUNT=4
1,Scrapped,2,COUNT=2
2,everyday,26,COUNT=26
3,@OviyaasFamily,1,COUNT=1
4,still,392,COUNT=392
5,Amazing,40,COUNT=40
6,those,149,COUNT=149
7,some,388,COUNT=388
8,persist,2,COUNT=2
9,hope,111,COUNT=111


In [41]:
import pandas as pd

# Word counts per time bucket; the bucket is the first 13 characters of `time`.
query = '''SELECT substr(time, 1, 13) time, word, count(*) count FROM twitter_sample_words GROUP BY 1,2 '''

# Collect the aggregate to pandas and parse the bucket strings into datetimes
# in a single chained expression.
result = (
    spark.sql(query)
    .toPandas()
    .assign(time=lambda frame: pd.to_datetime(frame['time']))
)
result.head(10)


Unnamed: 0,time,word,count
0,2017-11-26 17:00:00,We,153
1,2017-11-26 17:00:00,download,5
2,2017-11-26 17:00:00,Hyojin,2
3,2017-11-26 17:00:00,press,34
4,2017-11-26 17:00:00,@milliebbrown:,7
5,2017-11-26 17:00:00,fuck,71
6,2017-11-26 17:00:00,janet,3
7,2017-11-26 17:00:00,Muhammadu,1
8,2017-11-26 17:00:00,@DrWhoOnline:,1
9,2017-11-26 17:00:00,@ArmaanMalik22,1


In [43]:
# Write the current `result` frame to word_summary.csv as UTF-8, without the
# pandas index column.
result.to_csv('word_summary.csv', index=False, encoding='utf-8')