In [3]:
import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [4]:
# Create (or reuse) the SparkContext and the SQLContext used by every later cell.
# 'local[4]' runs Spark locally with 4 worker threads (the author's laptop has 4 CPUs);
# 'local[*]' would use every available core instead.
SPARK_MASTER = 'local[4]'
try:
    sc = ps.SparkContext(SPARK_MASTER)
    print("Just created a SparkContext")
except ValueError:
    # Spark raises ValueError when a SparkContext is already running in this process
    # (e.g. this cell was re-run). Reuse the active context so that `sc` and
    # `sqlContext` are always defined for the cells below.
    warnings.warn("SparkContext already exists in this scope")
    sc = ps.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
    

Just created a SparkContext


In [6]:
# How many top users (ranked by the total word count of their tweets) to show and export.
TOP_N_USERS = 50
n = TOP_N_USERS

In [7]:
# Load exam1_users.csv as a distributed Spark DataFrame so later steps run in parallel.
# Every column is read as a (nullable) string so that empty fields and irregular values
# do not break parsing; rows Spark cannot parse at all are dropped (DROPMALFORMED).
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType

user_columns = ["Handle_User", "Name", "Bio", "Country",
                "Location", "Followers", "Following"]
schema_users = StructType([StructField(field_name, StringType()) for field_name in user_columns])

users_df = sqlContext.read.csv(
    "exam1_users.csv",
    schema=schema_users,
    header=True,
    mode="DROPMALFORMED",
)
users_df.take(5)

[Row(Handle_User=u'_____0__o______', Name=u'Mamba Out w/ 60 pts', Bio=u'My favorite NBA player scored 60 points in his final NBA game what did yours do?Lakers.Cubs.Panthers.Syracuse.Sabres.Barca.NYCFC. Periscope/Snapchat/IG: Jaroc93', Country=u'US', Location=u'Raleigh, NC', Followers=u'9614', Following=u'2554'),
 Row(Handle_User=u'_____fumi', Name=u'Fumi Omori', Bio=u'designer', Country=u'US', Location=u'Manhattan, NY', Followers=u'25', Following=u'106'),
 Row(Handle_User=u'___AnaAlicia', Name=u'Ana Alicia Gomez', Bio=u'#FFGF \ufffd make poi feel some type of way', Country=u'US', Location=u'Lansing, MI', Followers=u'236', Following=u'245'),
 Row(Handle_User=u'___antony', Name=u'Tonik Salazar', Bio=u'Dj Overange with Billionaire Boyz Club', Country=u'MX', Location=u'Iztacalco, Distrito Federal', Followers=u'143', Following=u'143'),
 Row(Handle_User=u'___emelyyy', Name=u'E M E L \ufffd', Bio=u'21| God First| California |Chris ??|Catracha', Country=u'US', Location=u'Los Angeles, CA', Foll

In [9]:
# Load exam1_tweets.csv as a distributed Spark DataFrame so later steps run in parallel.
# As with the users file, every column (including Favs/RTs/Latitude/Longitude) is read
# as a nullable string, and rows Spark cannot parse are dropped (DROPMALFORMED).
tweet_columns = ["Handle", "Tweets", "Favs", "RTs", "Latitude", "Longitude"]
schema_tweets = StructType([StructField(field_name, StringType()) for field_name in tweet_columns])

tweets_df = sqlContext.read.csv(
    "exam1_tweets.csv",
    schema=schema_tweets,
    header=True,
    mode="DROPMALFORMED",
)
tweets_df.take(5)

[Row(Handle=u'BillSchulhoff', Tweets=u'Wind 3.2 mph NNE. Barometer 30.20 in, Rising slowly. Temperature 49.3 \ufffdF. Rain today 0.00 in. Humidity 32%', Favs=None, RTs=None, Latitude=u'40.76027778', Longitude=u'-72.95472221999999'),
 Row(Handle=u'danipolis', Tweets=u'Pausa pro caf\ufffd antes de embarcar no pr\ufffdximo v\ufffdo. #trippolisontheroad #danipolisviaja Pause for\ufffd https://t.co/PhcJ4oYktP', Favs=None, RTs=None, Latitude=u'32.89834949', Longitude=u'-97.03919589'),
 Row(Handle=u'KJacobs27', Tweets=u'Good. Morning. #morning #Saturday #diner #VT #breakfast #nucorpsofcadetsring #ring #college\ufffd https://t.co/dBZ7dbwX6f', Favs=None, RTs=None, Latitude=u'44.199476', Longitude=u'-72.50417299999999'),
 Row(Handle=u'stncurtis', Tweets=u'@gratefuldead recordstoredayus ?????? @ TOMS MUSIC TRADE https://t.co/CURRmn6iJo', Favs=None, RTs=None, Latitude=u'39.901474', Longitude=u'-76.60681700000001'),
 Row(Handle=u'wi_borzo', Tweets=u'Egg in a muffin!!! (@ Rocket Baby Bakery - @rocke

In [10]:
# Add a per-tweet word count column, "tweets_len_count".
# Note: this counts whitespace-separated tokens, not characters.
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf


def _count_words(text):
    """Return the number of whitespace-separated tokens in `text` (0 for a null tweet).

    The Tweets column is a nullable string, so `text` can be None. Calling .split()
    on None would raise AttributeError inside the Spark task and fail the whole job.
    """
    if text is None:
        return 0
    return len(text.split())


slen = udf(_count_words, IntegerType())
tweets_df = tweets_df.withColumn("tweets_len_count", slen(tweets_df.Tweets))
tweets_df.take(5)

[Row(Handle=u'BillSchulhoff', Tweets=u'Wind 3.2 mph NNE. Barometer 30.20 in, Rising slowly. Temperature 49.3 \ufffdF. Rain today 0.00 in. Humidity 32%', Favs=None, RTs=None, Latitude=u'40.76027778', Longitude=u'-72.95472221999999', tweets_len_count=18),
 Row(Handle=u'danipolis', Tweets=u'Pausa pro caf\ufffd antes de embarcar no pr\ufffdximo v\ufffdo. #trippolisontheroad #danipolisviaja Pause for\ufffd https://t.co/PhcJ4oYktP', Favs=None, RTs=None, Latitude=u'32.89834949', Longitude=u'-97.03919589', tweets_len_count=14),
 Row(Handle=u'KJacobs27', Tweets=u'Good. Morning. #morning #Saturday #diner #VT #breakfast #nucorpsofcadetsring #ring #college\ufffd https://t.co/dBZ7dbwX6f', Favs=None, RTs=None, Latitude=u'44.199476', Longitude=u'-72.50417299999999', tweets_len_count=11),
 Row(Handle=u'stncurtis', Tweets=u'@gratefuldead recordstoredayus ?????? @ TOMS MUSIC TRADE https://t.co/CURRmn6iJo', Favs=None, RTs=None, Latitude=u'39.901474', Longitude=u'-76.60681700000001', tweets_len_count=8),


In [11]:
# Sum the per-tweet word counts for each user handle, largest total first.
# Spark's `sum` is imported under an alias so it does not shadow Python's built-in
# sum() for the rest of the notebook.
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import desc
tweets_df_shortlist = (tweets_df
                       .groupBy('Handle')
                       .agg(spark_sum('tweets_len_count').alias('total_len'))
                       .sort(desc('total_len')))
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print(tweets_df_shortlist.take(40))

[Row(Handle=u'511NY', total_len=23528), Row(Handle=u'CVSHealthJobs', total_len=10265), Row(Handle=u'WDCareers', total_len=9844), Row(Handle=u'SONICjobs', total_len=7178), Row(Handle=u'WorkWithSHC', total_len=7012), Row(Handle=u'CompassJobBoard', total_len=6947), Row(Handle=u'FavoriteJobs', total_len=6771), Row(Handle=u'VSIcareers', total_len=6680), Row(Handle=u'SpeedwayJobs', total_len=5441), Row(Handle=u'VirtualJukebox', total_len=5144), Row(Handle=u'Sunrise_Careers', total_len=5120), Row(Handle=u'Kindred_Jobs', total_len=4955), Row(Handle=u'AccountableHS', total_len=4710), Row(Handle=u'FTGiGSJobs', total_len=4598), Row(Handle=u'MercyJobs', total_len=4511), Row(Handle=u'dine_here', total_len=4419), Row(Handle=u'tmj_ca_hrta', total_len=4411), Row(Handle=u'JobsatVA', total_len=4372), Row(Handle=u'NevadaCountyWX', total_len=4335), Row(Handle=u'HMSHostCareers', total_len=4034), Row(Handle=u'workatavalonbay', total_len=3696), Row(Handle=u'TotalTrafficDFW', total_len=3657), Row(Handle=u'Joi

In [14]:
# Attach each user's profile to their aggregated tweet word count (inner join on the
# handle) and order the result by that total, largest first.
handle_matches = users_df.Handle_User == tweets_df_shortlist.Handle
df_final = (
    users_df
    .join(tweets_df_shortlist, handle_matches)
    .orderBy(desc('total_len'))
)
df_final.take(5)

[Row(Handle_User=u'511NY', Name=u'511 New York', Bio=u'Traffic & transit updates for all of New York State provided by New York State 511. Visit the website for more feeds.', Country=u'US', Location=u'Trenton, NJ', Followers=u'1285', Following=u'206', Handle=u'511NY', total_len=23528),
 Row(Handle_User=u'CVSHealthJobs', Name=u'CVS Health Jobs', Bio=u"We're #hiring! Explore a variety of open positions that offer a challenging & rewarding #career. Follow us to uncover #job opportunities at @CVSHealth.", Country=u'US', Location=u'Texas, USA', Followers=u'628', Following=u'3', Handle=u'CVSHealthJobs', total_len=10265),
 Row(Handle_User=u'WDCareers', Name=u'Winn-Dixie Careers', Bio=u"Join our WINN-ing team and help make the lives of our customers and fellow associates FUN! Winn-Dixie is one of the nation's largest food retailers.", Country=u'US', Location=u'Edgewood, FL', Followers=u'527', Following=u'252', Handle=u'WDCareers', total_len=9844),
 Row(Handle_User=u'SONICjobs', Name=u'SONIC Jo

In [19]:
# Keep the report columns and the top-n users by total tweet word count, then show and
# export them. The result is computed once and cached so that the displayed rows and the
# written rows are the same, even when several users tie on total_len.
top_users = (df_final
             .select('Handle', 'Name', 'Bio', 'Followers', 'Following', 'total_len')
             .sort(desc('total_len'))
             .limit(n)
             .cache())
top_users.show(n)

# coalesce(1) merges the rows into a single output file without a shuffle, which keeps
# the sorted order; repartition(1) shuffles and does not guarantee that order.
# mode='overwrite' lets the cell be re-run without failing on an existing output path.
# The relative path resolves against Spark's default filesystem (local disk unless
# fs.defaultFS is configured to point at HDFS).
top_users.coalesce(1).write.csv("final.hdfs", sep=',', mode='overwrite')

+---------------+--------------------+--------------------+---------+---------+---------+
|         Handle|                Name|                 Bio|Followers|Following|total_len|
+---------------+--------------------+--------------------+---------+---------+---------+
|          511NY|        511 New York|Traffic & transit...|     1285|      206|    23528|
|  CVSHealthJobs|     CVS Health Jobs|We're #hiring! Ex...|      628|        3|    10265|
|      WDCareers|  Winn-Dixie Careers|Join our WINN-ing...|      527|      252|     9844|
|      SONICjobs|          SONIC Jobs|Check out our ope...|      268|       39|     7178|
|    WorkWithSHC|         SHC Careers|Work for the Best...|      926|        1|     7012|
|CompassJobBoard|  CompassUSAJobBoard|Welcome to Compas...|      881|      124|     6947|
|   FavoriteJobs|       Favorite Jobs|We're your Advoca...|      191|       99|     6771|
|     VSIcareers| Vitamin Shoppe Jobs|At #VitaminShoppe...|      601|       96|     6680|
|   Speedw