### Window functions

In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext()
sc

21/12/20 21:45:54 WARN Utils: Your hostname, srimac.local resolves to a loopback address: 127.0.0.1; using 192.168.1.10 instead (on interface en0)
21/12/20 21:45:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/20 21:45:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/20 21:45:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("Window functions") \
        .getOrCreate()

### Load the dataset

Dataset has been taken from https://www.kaggle.com/arminehn/disasteraccident-sources. <br />
It contains a list of Twitter users who regularly report on natural and man-made disasters, violence or crime. The accounts may belong to journalists, news media, local fire or police departments, other local authorities, or disaster monitors.

In [4]:
accounts = spark.read\
                .format("csv")\
                .option("header", "true")\
                .load('../datasets/disaster_accident_crime_accounts.csv')

                                                                                

In [5]:
accounts.show(5)

+--------------+--------------------+--------------+--------------------+---------+---------+----------------+--------------------+--------------------+
|      category|    on twitter since|twitter handle|         profile url|followers|following|profile location|     profile lat/lon| profile description|
+--------------+--------------------+--------------+--------------------+---------+---------+----------------+--------------------+--------------------+
|          news|1:45 PM - 11 Sep ...|  abc13houston|http://www.abc13.com|   568709|     1113|     Houston, TX|29.75893,-95.3676899|ABC13 is Houston'...|
|police/traffic|3:41 AM - 19 Aug ...|   metpoliceuk|https://beta.met....|   670529|      367|          London| 51.50732,-0.1276399|London's Metropol...|
|            wx|7:00 AM - 17 Feb ...|         wsbtv|http://2wsb.tv/Ne...|   934824|     2134|     Atlanta, GA|33.74909,-84.3901799|Metro Atlanta and...|
|          news|9:22 PM - 19 Sep ...|          ABC7|     http://abc7.com|   904324

In [6]:
accounts = accounts.drop('on twitter since',
                         'profile url',
                         'following',
                         'profile location',
                         'profile lat/lon',
                         'profile description'
                        )

In [7]:
from pyspark.sql.types import IntegerType

In [8]:
accounts = accounts.withColumn("followers", 
                               accounts['followers'].cast(IntegerType()))

In [27]:
accounts.show(5)
accounts.count()
accounts.describe()


+--------------+--------------+---------+
|      category|twitter handle|followers|
+--------------+--------------+---------+
|          news|  abc13houston|   568709|
|police/traffic|   metpoliceuk|   670529|
|            wx|         wsbtv|   934824|
|          news|          ABC7|   904324|
|            wx|         FOX59|   204408|
+--------------+--------------+---------+
only showing top 5 rows



                                                                                

DataFrame[summary: string, category: string, twitter handle: string, followers: string]

#### Window rank function

In [9]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

In [10]:
windowSpec1 = Window \
              .partitionBy(accounts['category']) \
              .orderBy(accounts['followers'].desc())

In [11]:
followers_rank = (func.rank().over(windowSpec1))

followers_rank

Column<'RANK() OVER (PARTITION BY category ORDER BY followers DESC NULLS LAST unspecifiedframe$())'>

In [14]:
# name_count = data.groupBy(['Name', 'Gender'])\
#                  .count()
    
# name_count.show(5)

acccounts_ctegory_group = accounts.groupBy(func.col("category")).count()
acccounts_ctegory_group.show()

+-----------------+-----+
|         category|count|
+-----------------+-----+
|             news| 3909|
|   police/traffic|  557|
|         fire/emt|  702|
|               wx|  906|
|disaster/accident|  512|
|             govt|  204|
|       journalist| 3792|
|            quake|   25|
+-----------------+-----+



In [21]:
followers_rank = accounts.select(accounts['twitter handle'],
                                 accounts['category'],
                                 accounts['followers']
                                ).withColumn('rank', 
                                             func.rank().over(windowSpec1))\
                 .where(func.col("rank")<5)

followers_rank.show()

+---------------+-----------------+---------+----+
| twitter handle|         category|followers|rank|
+---------------+-----------------+---------+----+
|          adage|disaster/accident|   954734|   1|
| spectatorindex|disaster/accident|   714691|   2|
|    CrimesWorld|disaster/accident|   590042|   3|
| SkyFallCarroll|disaster/accident|   443010|   4|
|         nycgov|         fire/emt|   880052|   1|
|  KenyaRedCross|         fire/emt|   765617|   2|
|     NYCTSubway|         fire/emt|   672142|   3|
|  SheriffClarke|         fire/emt|   644748|   4|
|       NYCMayor|             govt|   965189|   1|
|      MIB_India|             govt|   884202|   2|
|       Heritage|             govt|   602540|   3|
| DeptVetAffairs|             govt|   557641|   4|
|     KlasraRauf|       journalist|   995009|   1|
|  gauravcsawant|       journalist|   974663|   2|
| KeithOlbermann|       journalist|   948584|   3|
|   malaysiakini|       journalist|   936741|   4|
|IndiaTodayFLASH|             n

In [14]:
traffic = followers_rank.filter(followers_rank['category'] == 'police/traffic')

traffic.show()

+---------------+--------------+---------+----+
| twitter handle|      category|followers|rank|
+---------------+--------------+---------+----+
|          CPBlr|police/traffic|   822032|   1|
|       Ma3Route|police/traffic|   734626|   2|
|    metpoliceuk|police/traffic|   670529|   3|
|MassStatePolice|police/traffic|   416173|   4|
|  wsdot_traffic|police/traffic|   362617|   5|
|  trafficbutter|police/traffic|   262205|   6|
|BaltimorePolice|police/traffic|   255021|   7|
|           NJSP|police/traffic|   254180|   8|
|      TrafficSA|police/traffic|   253441|   9|
|  KenyanTraffic|police/traffic|   241647|  10|
|     EWNTraffic|police/traffic|   229280|  11|
|      jeffcanoy|police/traffic|   229002|  12|
|    FastCoIdeas|police/traffic|   208623|  13|
|trafficscotland|police/traffic|   190064|  14|
|        2xTessy|police/traffic|   188600|  15|
|   DenverPolice|police/traffic|   184100|  16|
|      Venice311|police/traffic|   165389|  17|
|      Kent_999s|police/traffic|   16484

#### Window max function between rows

In [22]:
windowSpec2 = Window \
              .partitionBy(accounts['category']) \
              .orderBy(accounts['followers'].desc()) \
              .rowsBetween(-1, 0)

In [23]:
followers_max = (func.max(accounts['followers']).over(windowSpec2))

In [24]:
accounts.select(accounts['twitter handle'],
                accounts['category'],
                accounts['followers'],
                followers_max.alias("followers_max")).show()

+---------------+-----------------+---------+-------------+
| twitter handle|         category|followers|followers_max|
+---------------+-----------------+---------+-------------+
|          adage|disaster/accident|   954734|       954734|
| spectatorindex|disaster/accident|   714691|       954734|
|    CrimesWorld|disaster/accident|   590042|       714691|
| SkyFallCarroll|disaster/accident|   443010|       590042|
|   RealTimeWWII|disaster/accident|   363529|       443010|
|USNationalGuard|disaster/accident|   355179|       363529|
|BillionDollarID|disaster/accident|   348445|       355179|
| ConspiracyCase|disaster/accident|   288366|       348445|
|  RepAdamSchiff|disaster/accident|   283715|       288366|
|  MilitaryEarth|disaster/accident|   275498|       283715|
|WeirdCrimeFacts|disaster/accident|   237594|       275498|
|    Breaking911|disaster/accident|   204830|       237594|
|         ICEgov|disaster/accident|   197774|       204830|
|   NDRRMC_OpCen|disaster/accident|   18

#### Window followers difference function between ranges

In [18]:
windowSpec3 = Window \
              .partitionBy(accounts['category']) \
              .orderBy(accounts['followers'].desc()) 

In [19]:
followers_difference = \
  (func.max(accounts['followers']).over(windowSpec3) - accounts['followers'])

In [20]:
accounts.select(
    accounts['twitter handle'],
    accounts['category'],
    accounts['followers'],
    followers_difference.alias("followers_difference")).show()

+---------------+--------+---------+--------------------+
| twitter handle|category|followers|followers_difference|
+---------------+--------+---------+--------------------+
|IndiaTodayFLASH|    news|   978677|                   0|
|          Salon|    news|   976952|                1725|
| CapitalFMKenya|    news|   967429|               11248|
|CapitalOfficial|    news|   954127|               24550|
|      DunyaNews|    news|   950673|               28004|
|       NewsHour|    news|   948347|               30330|
| chicagotribune|    news|   926799|               51878|
|          STcom|    news|   907667|               71010|
|           ABC7|    news|   904324|               74353|
|GuardianNigeria|    news|   902789|               75888|
|      60Minutes|    news|   898224|               80453|
|    DailyMirror|    news|   898158|               80519|
|  foxandfriends|    news|   896748|               81929|
|   BuzzFeedNews|    news|   891313|               87364|
|            N