In [27]:
from IPython import display
from IPython.core.display import HTML
import pandas

import nltk
from nltk.metrics import *
from collections import Counter

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql import types
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [1]:
# connect to the spark Context (already created somewhere)
sqlContext = SQLContext(sc) 

# load GSR from a specified time period 
# 1 April 2015 seems to capture the beginning of GSR
gsrDf = sqlContext.read.load(format = "au.com.d2dcrc.carbon.spark.gsr", 
                             startTime = "2015-04-01T00:00:00Z", endTime = "2016-01-31T00:00:00Z")
                             #startTime = "2015-11-08T00:00:00Z", endTime = "2015-11-10T00:00:00Z")

gsrDf.printSchema()
print(gsrDf.first())
print(gsrDf.schema)


root
 |-- key:id: long (nullable = true)
 |-- authorId: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampRevision: timestamp (nullable = true)
 |-- widespreadEventId: long (nullable = true)
 |-- eventType: string (nullable = true)
 |-- populationGroup: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- key:eventDate: date (nullable = true)
 |-- earliestReportedDate: date (nullable = true)
 |-- crowdSize: string (nullable = true)
 |-- isViolent: boolean (nullable = true)
 |-- newsSourceName: string (nullable = true)
 |-- headline: string (nullable = true)
 |-- englishHeadline: string (nullable = true)
 |-- eventDescription: string (nullable = true)
 |-- firstReportedLink: string (nullable = true)
 |-- otherLinks: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- ingested: boolean (nullable = true)
 |-- currentValidationState: string (nullable = tr

In [29]:
#
# EXPLORE THE GSR
#

gsrDf.groupBy("country").count().show()

indoDf = gsrDf.filter(gsrDf.country == 'Indonesia');
indoStateDf = indoDf.groupBy("state").count().sort('state')
display.display(indoStateDf.toPandas())

fijiDf = gsrDf.filter(gsrDf.country == 'Fiji');
fijiStateDf = fijiDf.groupBy('state').count().sort('state')
display.display(fijiStateDf.toPandas())


malaysiaDf = gsrDf.filter(gsrDf.country == 'Malaysia')
malayStateDf = malaysiaDf.groupBy('state').count().sort('state')
display.display(malayStateDf.toPandas())


pngDf = gsrDf.filter(gsrDf.country == 'Papua New Guinea')
pngStateDf = pngDf.groupBy('state').count().sort('state')
display.display(pngStateDf.toPandas())


pngDf = gsrDf.filter(gsrDf.country == 'PNG')
pngStateDf = pngDf.groupBy('state').count().sort('state')
display.display(pngStateDf.toPandas())


thaiDf = gsrDf.filter(gsrDf.country == 'Thailand')
thaiStateDf = thaiDf.groupBy('state').count().sort('state')
display.display(thaiStateDf.toPandas())

# limit the GSR to only those records that relate to Australia 
# as to-date we have not been collecting twitter data for other countries 
# and therefore only have warnings for Australia
ausGsrDf = gsrDf.filter(gsrDf.country == u'Australia');

# have a look at frequent states & cities.... highlights some cleansing issues
print("Frequent States...")
print(ausGsrDf.freqItems(["state"]).collect())
#display.display(ausGsrDf.freqItems(["state"]).toPandas())

print("Frequent Cities...")
#display.display(ausGsrDf.freqItems(["city"]).toPandas())
print(ausGsrDf.freqItems(["city"]).collect())


# CITY and STATE
# how many distinct cities are there, how frequent is each?
# how many NULL cities are there?
# how many distinct states are there, how frequent is each?
# how many NULL states are there?

cityCountDf = ausGsrDf.groupBy("city").count().sort("city")
print ("number of unique cities in GSR for Australia is:", cityCountDf.count())
print ("number of null cities is: ", cityCountDf.filter(cityCountDf.city.isNull()).collect())
cityCountDf.show()

stateCountDf = ausGsrDf.groupBy("state").count().sort("state")
print ("number of unique states is", stateCountDf.count())
print ("number of null states is", stateCountDf.filter(stateCountDf.state.isNull()).collect())
stateCountDf.show()
print ("some examples of GSR with null states....")
nullStateDf = ausGsrDf.filter(ausGsrDf.state.isNull()).select("key:eventDate", "city", "state", 
                                                              "headline", "eventDescription", "firstReportedLink", 
                                                              "eventType", "populationGroup", "country", 
                                                              "authorId", "widespreadEventId")
display.display(nullStateDf.toPandas())

# how many NULL city records are NOT for widespread events?
# show some NULL cities which are NOT widespread....
print("number of widespreadEvents is: ", ausGsrDf.filter("widespreadEventId > 0").count())
print("number of widespreadEvents with null city is: ", 
      ausGsrDf.filter("widespreadEventId > 0").filter(ausGsrDf.city.isNull()).count())
                                                                                           

# are there any NULL state and NULL city?
noCityNoStateDf = ausGsrDf.filter(ausGsrDf.state.isNull()).filter(ausGsrDf.city.isNull())
print("number of null states and null cities is: ", noCityNoStateDf.count())

                                                  

+----------------+-----+
|         country|count|
+----------------+-----+
|        Thailand|  305|
|       Malaysian|    1|
|       Indonesia| 1675|
|Papua New Guinea|   26|
|             PNG|   17|
|       Australia|  948|
|            Fiji|   69|
|        Malaysia|  222|
+----------------+-----+



Unnamed: 0,state,count
0,,1
1,Aceh,81
2,Bali,41
3,Bangka Belitung,27
4,Bangka–Belitung Islands,2
5,Banten,35
6,Bengkulu,5
7,Central,1
8,Central Sulawesi,1
9,Central Java,101


Unnamed: 0,state,count
0,Central,27
1,Centre,1
2,Fiji,10
3,Nabukavesi,1
4,Northern,11
5,Suva,2
6,Western,17


Unnamed: 0,state,count
0,Ipoh,1
1,Johor,10
2,Johor Bahru,6
3,Johor Baru,1
4,Kedah,6
5,Kelantan,2
6,Kuala \nLumpur,1
7,Kuala Lumpur,53
8,Kuala Selangor,1
9,Kuala lumpur,3


Unnamed: 0,state,count
0,Central Province,2
1,East New Britain,4
2,East Sepik,1
3,Eastern Highlands,3
4,Jiwaka,2
5,Madang Province,3
6,Milne Bay,1
7,Morobe,1
8,National Capital,2
9,Port Moresby,2


Unnamed: 0,state,count
0,,11
1,Boroko,1
2,Konebada,1
3,National Capital,1
4,PNG,3


Unnamed: 0,state,count
0,,291
1,-,3
2,Bangkok,3
3,Chonburi,2
4,Chum-Porn province,2
5,Nonthaburi,2
6,Sa Kaeo,1
7,`,1


Frequent States...
[Row(state_freqItems=['NSW', 'Victoria', 'ACT', 'Northern Territory', 'Western Australia', 'Queensland', 'Hobart', 'Queesland', 'NT', 'WA', 'North Sumatera', 'New South Wales', 'Jakarta Raya', 'South Australia', 'Tasmania', 'VIC', 'SA', 'West Australia', 'Australian Capital Territory', 'TAS', 'QLD', 'Vicoria', 'NCT', None])]
Frequent Cities...
[Row(city_freqItems=['MacKay', 'Northam', 'Hervey Bay', 'Ipswich', 'Gold Coast', 'Richmond', 'Cairns', 'Port Lincoln', 'Wodonga', 'Mackay', 'Coffs Harbour', 'Sunshine Coast', 'Bulga', 'Goldcoast', 'Lithgow', 'Brisbane', 'Geraldton', 'Canberra', 'Tatura', 'Newcastle', 'Heirisson Island', 'Hobart', 'Maitland', 'Melbourne', 'Ballina', 'Launceston', 'Kilburn', 'Sydeny', 'Rockhampton', 'Adelaide', 'Wollongong', 'Darwin', 'Newcastke', 'Lapoinya', 'Sydney', 'Nowra', 'Orchard Hills', 'Borroloola', 'Bendigo', 'Alphington', 'Canbera', 'Fremantle', 'Devonport', 'Nauru', 'Perth', 'Grafton', 'Tiwi Island', 'melbourne', 'Shoalwater Bay', 'Ge

Unnamed: 0,key:eventDate,city,state,headline,eventDescription,firstReportedLink,eventType,populationGroup,country,authorId,widespreadEventId
0,2015-07-27 09:30:00,Songkhla,,Protesters shun power plant hearing,Protesters gathered at the third and final pub...,http://www.bangkokpost.com/news/general/636188...,"Land, Energy, and Resources",General Population,Australia,6,-1
1,2015-11-18 10:30:00,ACT,,Indigenous protesters peacefully demonstrate a...,Activists peacefully demonstrated against the ...,http://www.theage.com.au/act-news/indigenous-p...,Other Government and Political Issues,Ethnic,Australia,6,-1
2,2015-06-16 09:30:00,Nauru,,,Protests have erupted outside Nauru's parliame...,http://www.abc.net.au/news/2015-06-16/nauru-op...,Other Government and Political Issues,Refugees/Displaced,Australia,3,-1
3,2015-06-16 09:30:00,Nauru,,,An opposition MP has been detained after a pro...,http://www.smh.com.au/world/nauru-opposition-m...,Other Government and Political Issues,Refugees/Displaced,Australia,3,-1


number of widespreadEvents is:  318
number of widespreadEvents with null city is:  43
number of null states and null cities is:  0


In [30]:
# Check the crowdSize values
gsrDf.groupBy("crowdSize").count().show()

# Check the populationGroup values
gsrDf.groupBy("populationGroup").count().show()

# Check the eventType values
gsrDf.groupBy("eventType").count().show()

# Check the isViolent values
gsrDf.groupBy("isViolent").count().show()


+---------+-----+
|crowdSize|count|
+---------+-----+
|  Unknown|  336|
|   Unkown|   12|
|  unknown|    1|
|    Small|  323|
|    Large| 2417|
|    small|  132|
|    large|   40|
|     null|    2|
+---------+-----+

+--------------------+-----+
|     populationGroup|count|
+--------------------+-----+
|  Refugees/Displaced|   10|
|         agriculture|    2|
|           Bussiness|    4|
|            Business|  185|
|        Agricultural|   68|
|             General|  102|
|               labor|   24|
|         Agriculture|   10|
|               media|    4|
|           Religious|  133|
|             medical|    5|
|            students|    3|
|              Labour|  134|
|Other Government ...|    1|
|               Legal|    6|
|  General population|   15|
|           religious|    7|
|               Ethic|    1|
|              labour|  148|
|Energy, and Resou...|    1|
+--------------------+-----+
only showing top 20 rows

+--------------------+-----+
|           eventType|count|
+--

In [31]:
def cleanCountryNames(country):
    """ Clean the country field - use a standard set of identifiers"""
    if hasattr(country, 'lower'):
        country_lc = country.lower()
        max_error = 2
        if edit_distance(country_lc, "australia") <= max_error:
            cleanCountry = "Australia"
        elif edit_distance(country_lc, "indonesia") <= max_error:
            cleanCountry = "Indonesia"
        elif edit_distance(country_lc, "papua new guinea") <= max_error:
            cleanCountry = "Papua New Guinea"
        elif edit_distance(country_lc, "png") <= max_error:
            cleanCountry = "Papua New Guinea"   
        elif edit_distance(country_lc, "fiji") <= max_error:
            cleanCountry = "Fiji"  
        elif edit_distance(country_lc, "thailand") <= max_error:
            cleanCountry = "Thailand"  
        elif edit_distance(country_lc, "malaysia") <= max_error:
            cleanCountry = "Malaysia"  
        else:
            cleanCountry = country
        return cleanCountry
    else:
        return 
    
    
# test cases
print("Cleaning country names...")
dirty = "Thailand"
print(dirty, "\t", cleanCountryNames(dirty))
dirty = "Malaysian"
print(dirty, "\t", cleanCountryNames(dirty))
dirty = "PNG"
print(dirty, "\t", cleanCountryNames(dirty))
dirty = "Papaua NewGuinea"
print(dirty, "\t", cleanCountryNames(dirty))
dirty = "Imndonesia"
print(dirty, "\t", cleanCountryNames(dirty))
dirty = "Fiji"
print(dirty, "\t", cleanCountryNames(dirty))
dirty = "Australian"
print(dirty, "\t", cleanCountryNames(dirty))
dirty = None
print(dirty, "\t", cleanCountryNames(dirty))

Cleaning country names...
Thailand 	 Thailand
Malaysian 	 Malaysia
PNG 	 Papua New Guinea
Papaua NewGuinea 	 Papua New Guinea
Imndonesia 	 Indonesia
Fiji 	 Fiji
Australian 	 Australia
None 	 None


In [32]:
# Clean the state field - Use standardised set of state identifiers

def cleanAustStateNames(state):
    """ Clean the state field - use a standard set of identifiers"""
    if state == None :
        return
    else : 
        state_lc = state.lower()
        max_error = 2
        if edit_distance(state_lc, "new south wales") <= max_error:
            cleanState = "NSW"
        elif edit_distance(state_lc, "south australia") <= max_error:
            cleanState = "SA"
        elif edit_distance(state_lc, "victoria") <= max_error:
            cleanState = "VIC"
        elif edit_distance(state_lc, "queensland") <= max_error:
            cleanState = "QLD"
        elif edit_distance(state_lc, "northern territory") <= max_error:
            cleanState = "NT"
        elif edit_distance(state_lc, "western australia") <= max_error:
            cleanState = "WA"
        elif edit_distance(state_lc, "west australia") <= max_error:
            cleanState = "WA"
        elif edit_distance(state_lc, "tasmania") <= max_error:
            cleanState = "TAS"
        elif edit_distance(state_lc, "australian capital territory") <= max_error:
            cleanState = "ACT"
        else:
            cleanState = state
        return cleanState

# test cases
print("Cleaning state names...")
dirty = "Victori"
print(dirty, "\t", cleanAustStateNames(dirty))
dirty = "new south wale"
print(dirty, "\t", cleanAustStateNames(dirty))
dirty = "west australi"
print(dirty, "\t", cleanAustStateNames(dirty))
dirty = "WA"
print(dirty, "\t", cleanAustStateNames(dirty))
dirty = "south austral"
print(dirty, "\t", cleanAustStateNames(dirty))
dirty = "tasmana"
print(dirty, "\t", cleanAustStateNames(dirty))
dirty = "australia capitol territory"
print(dirty, "\t", cleanAustStateNames(dirty))
dirty = "ACT"
print(dirty, "\t", cleanAustStateNames(dirty))


Cleaning state names...
Victori 	 VIC
new south wale 	 NSW
west australi 	 WA
WA 	 WA
south austral 	 SA
tasmana 	 TAS
australia capitol territory 	 ACT
ACT 	 ACT


In [33]:
# Clean the state field - Use standardised set of state identifiers

def cleanIndoStateNames(state):
    """ Clean the state field - use a standard set of identifiers"""
    if state == None :
        return
    else : 
        state_lc = state.lower()
        max_error = 2
        min_error = 1
        if (edit_distance(state_lc, "aceh") <= max_error) or \
           (edit_distance(state_lc, "nanggroe aceh darussalam") <= max_error):
            cleanState = "Aceh"
        elif (edit_distance(state_lc, "north sumatra") <= max_error) or \
             (edit_distance(state_lc, "sumatera utara") <= max_error):
            cleanState = "North Sumatra"
        elif (edit_distance(state_lc, "west sumatra") <= max_error) or \
             (edit_distance(state_lc, "sumatera barat") <= max_error):
            cleanState = "West Sumatra"
        elif (edit_distance(state_lc, "riau") <= max_error) :
            cleanState = "Riau"
        elif (edit_distance(state_lc, "riau islands") <= max_error) or \
             (edit_distance(state_lc, "kepulauan riau") <= max_error):
            cleanState = "Riau Islands"
        elif (edit_distance(state_lc, "jambi") <= max_error) :
            cleanState = "Jambi"
        elif (edit_distance(state_lc, "south sumatra") <= max_error) or \
             (edit_distance(state_lc, "sumatera selatan") <= max_error):
            cleanState = "South Sumatra"
        elif (edit_distance(state_lc, "bangka-belitung islands") <= max_error) or \
             (edit_distance(state_lc, "kepulauan bangka-belitung") <= max_error):
            cleanState = "Bangka-Belitung Islands"
        elif (edit_distance(state_lc, "bengkulu")  <= max_error) :
            cleanState = "Bengkulu"
        elif (edit_distance(state_lc, "lampung") <= max_error) :
            cleanState = "Lampung"            
        elif (edit_distance(state_lc, "special capital region of jakarta") <= max_error) or \
             (edit_distance(state_lc, "daerah khusus ibu kota jakarta") <= max_error) or \
             (edit_distance(state_lc, "jakarta") <= max_error) :
            cleanState = "Special Capital Region of Jakarta"
        elif (edit_distance(state_lc, "banten") <= max_error) :
            cleanState = "Banten"                     
        elif (edit_distance(state_lc, "west java") <= max_error) or \
             (edit_distance(state_lc, "jawa barat") <= max_error):
            cleanState = "West Java"
        elif (edit_distance(state_lc, "central java") <= max_error) or \
             (edit_distance(state_lc, "jawa tengah") <= max_error):
            cleanState = "Central Java"
        elif (edit_distance(state_lc, "yogyakarta special region") <= max_error) or \
             (edit_distance(state_lc, "daerah istimewa yogyakarta") <= max_error) or \
             (edit_distance(state_lc, "yogyakarta") <= max_error) :
            cleanState = "Yogyakarta Special Region"
        elif (edit_distance(state_lc, "east java") <= max_error) or \
             (edit_distance(state_lc, "jawa timur") <= max_error):
            cleanState = "East Java"
        elif (edit_distance(state_lc, "bali") <= max_error) :
            cleanState = "Bali"                     
        elif (edit_distance(state_lc, "west nusa tenggara") <= max_error) or \
             (edit_distance(state_lc, "nusa tenggara barat") <= max_error):
            cleanState = "West Nusa Tenggara"
        elif (edit_distance(state_lc, "east nusa tenggara") <= max_error) or \
             (edit_distance(state_lc, "nusa tenggara timur") <= max_error):
            cleanState = "East Nusa Tenggara"
        elif (edit_distance(state_lc, "west kalimantan") <= max_error) or \
             (edit_distance(state_lc, "kalimantan barat") <= max_error):
            cleanState = "West Kalimantan"
        elif (edit_distance(state_lc, "central kalimantan") <= max_error) or \
             (edit_distance(state_lc, "kalimantan tengah") <= max_error):
            cleanState = "Central Kalimantan"
        elif (edit_distance(state_lc, "south kalimantan") <= max_error) or \
             (edit_distance(state_lc, "kalimantan selatan") <= max_error):
            cleanState = "South Kalimantan"
        elif (edit_distance(state_lc, "east kalimantan") <= max_error) or \
             (edit_distance(state_lc, "kalimantan timur") <= max_error):
            cleanState = "East Kalimantan"
        elif (edit_distance(state_lc, "north kalimantan") <= max_error) or \
             (edit_distance(state_lc, "kalimantan utara") <= max_error):
            cleanState = "North Kalimantan"
        elif (edit_distance(state_lc, "north sulawesi") <= max_error) or \
             (edit_distance(state_lc, "sulawesi utara") <= max_error):
            cleanState = "North Sulawesi"
        elif (edit_distance(state_lc, "gorontalo") <= max_error) :
            cleanState = "Gorontalo"                     
        elif (edit_distance(state_lc, "central sulawesi") <= max_error) or \
             (edit_distance(state_lc, "sulawesi tengah") <= min_error):
            cleanState = "Central Sulawesi"
        elif (edit_distance(state_lc, "west sulawesi") <= max_error) or \
             (edit_distance(state_lc, "sulawesi barat") <= max_error):
            cleanState = "West Sulawesi"
        elif (edit_distance(state_lc, "south sulawesi") <= max_error) or \
             (edit_distance(state_lc, "sulawesi selatan") <= max_error):
            cleanState = "South Sulawesi"
        elif (edit_distance(state_lc, "southeast sulawesi") <= max_error) or \
             (edit_distance(state_lc, "sulawesi tenggara") <= min_error):
            cleanState = "Southeast Sulawesi"
        elif (edit_distance(state_lc, "ambon") <= max_error) :
            cleanState = "Ambon"                     
        elif (edit_distance(state_lc, "north maluku") <= max_error)  or \
             (edit_distance(state_lc, "maluku utara") <= min_error):
            cleanState = "North Maluku"                     
        elif (edit_distance(state_lc, "west papua") <= max_error)  or \
             (edit_distance(state_lc, "papua barat") <= min_error):
            cleanState = "West Papua"                     
        elif (edit_distance(state_lc, "papua") <= max_error) :
            cleanState = "Papua"                     
        else:
            cleanState = state
        return cleanState

# test cases
print("Cleaning state names...")
dirty = "ace"
print(dirty, "\t", cleanIndoStateNames(dirty))
dirty = "Nangroe aceh darussalam"
print(dirty, "\t", cleanIndoStateNames(dirty))
dirty = "Nangroe Aceh Darussalam"
print(dirty, "\t", cleanIndoStateNames(dirty))
dirty = "north sumatr"
print(dirty, "\t", cleanIndoStateNames(dirty))
dirty = "bali"
print(dirty, "\t", cleanIndoStateNames(dirty))
dirty = "papu"
print(dirty, "\t", cleanIndoStateNames(dirty))
dirty = "sulawesi tenggar"
print(dirty, "\t", cleanIndoStateNames(dirty))
dirty = "sulawesi tengar"
print(dirty, "\t", cleanIndoStateNames(dirty))

Cleaning state names...
ace 	 Aceh
Nangroe aceh darussalam 	 Aceh
Nangroe Aceh Darussalam 	 Aceh
north sumatr 	 North Sumatra
bali 	 Bali
papu 	 Papua
sulawesi tenggar 	 Southeast Sulawesi
sulawesi tengar 	 Central Sulawesi


In [34]:
# Clean the state field - Use standardised set of state identifiers

def cleanPngStateNames(state):
    """ Clean the state field - use a standard set of identifiers"""
    if state == None :
        return
    else : 
        state_lc = state.lower()
        max_error = 2
        min_error = 1
        if (edit_distance(state_lc, "central") <= max_error) or \
           (edit_distance(state_lc, "central province") <= max_error):
            cleanState = "Central"
        elif (edit_distance(state_lc, "chimbu") <= max_error) or \
             (edit_distance(state_lc, "simbu") <= max_error) :
            cleanState = "Chimbu"
        elif (edit_distance(state_lc, "eastern highlands") <= max_error) or \
             (edit_distance(state_lc, "east highlands") <= min_error) :
            cleanState = "Eastern Highlands"
        elif (edit_distance(state_lc, "east new britain") <= max_error):
            cleanState = "East New Britain"
        elif (edit_distance(state_lc, "east sepik") <= min_error):
            cleanState = "East Sepik"
        elif (edit_distance(state_lc, "enga") <= max_error):
            cleanState = "Enga"
        elif (edit_distance(state_lc, "madang") <= max_error) or \
             (edit_distance(state_lc, "madang province") <= max_error):
            cleanState = "Madang"            
        elif (edit_distance(state_lc, "manus") <= max_error):
            cleanState = "Manus"
        elif (edit_distance(state_lc, "milne bay") <= max_error):
            cleanState = "Milne Bay"  
        elif (edit_distance(state_lc, "morobe") <= max_error):
            cleanState = "Morobe"
        elif (edit_distance(state_lc, "new ireland") <= max_error):
            cleanState = "New Ireland"
        elif (edit_distance(state_lc, "northern") <= max_error) or \
             (edit_distance(state_lc, "oro province") <= max_error) :
            cleanState = "Northern"
        elif (edit_distance(state_lc, "bougainville") <= max_error):
            cleanState = "Bougainville"
        elif (edit_distance(state_lc, "southern highlands") <= max_error):
            cleanState = "Southern Highlands"
        elif (edit_distance(state_lc, "western province") <= max_error) or \
             (edit_distance(state_lc, "fly") <= min_error):
            cleanState = "Western Province"            
        elif (edit_distance(state_lc, "western highlands") <= max_error):
            cleanState = "Western Highlands"
        elif (edit_distance(state_lc, "west new britain") <= max_error):
            cleanState = "West New Britain"
        elif (edit_distance(state_lc, "west sepik") <= min_error) or \
             (edit_distance(state_lc, "sanduan") <= max_error)  :
            cleanState = "West Sepik"
        elif (edit_distance(state_lc, "national cpaital district") <= max_error) or \
             (edit_distance(state_lc, "national cpaital") <= max_error) or \
             (edit_distance(state_lc, "port moresby") <= max_error) :
            cleanState = "National Capital District"            
        elif (edit_distance(state_lc, "hela") <= max_error):
            cleanState = "Hela"
        elif (edit_distance(state_lc, "jiwaka") <= max_error):
            cleanState = "Jiwaka"            
        else:
            cleanState = state
        return cleanState

# test cases
print("Cleaning state names...")
dirty = "jiwak"
print(dirty, "\t", cleanPngStateNames(dirty))
dirty = "hel"
print(dirty, "\t", cleanPngStateNames(dirty))
dirty = "port moresb"
print(dirty, "\t", cleanPngStateNames(dirty))
dirty = "fl"
print(dirty, "\t", cleanPngStateNames(dirty))
dirty = "nortern"
print(dirty, "\t", cleanPngStateNames(dirty))
dirty = "manu"
print(dirty, "\t", cleanPngStateNames(dirty))
dirty = "eas new britain"
print(dirty, "\t", cleanPngStateNames(dirty))
dirty = "wes new britain"
print(dirty, "\t", cleanPngStateNames(dirty))

Cleaning state names...
jiwak 	 Jiwaka
hel 	 Hela
port moresb 	 National Capital District
fl 	 Western Province
nortern 	 Northern
manu 	 Manus
eas new britain 	 East New Britain
wes new britain 	 West New Britain


In [35]:
# Clean the state field - Use standardised set of state identifiers

def cleanMalaysiaStateNames(state):
    """ Clean the state field - use a standard set of identifiers"""
    if state == None :
        return
    else : 
        state_lc = state.lower()
        max_error = 2
        min_error = 1
        if (edit_distance(state_lc, "perak") <= max_error) :
            cleanState = "Perak"
        elif (edit_distance(state_lc, "kuala lumpur") <= max_error)  :
            cleanState = "Kuala Lumpur"
        elif (edit_distance(state_lc, "johor") <= max_error) or \
             (edit_distance(state_lc, "johor bahru") <= max_error) :
            cleanState = "Johor"
        elif (edit_distance(state_lc, "pahang") <= max_error)  :
            cleanState = "Pahang"
        elif (edit_distance(state_lc, "kelantan") <= max_error)  :
            cleanState = "Kelantan"
        elif (edit_distance(state_lc, "terengganu") <= max_error)  :
            cleanState = "Terengganu"
        elif (edit_distance(state_lc, "kedah") <= max_error)  :
            cleanState = "Kedah"
        elif (edit_distance(state_lc, "perlis") <= max_error)  :
            cleanState = "Perlis"
        elif (edit_distance(state_lc, "penang") <= max_error)  :
            cleanState = "Penang"
        elif (edit_distance(state_lc, "selangor") <= max_error)  :
            cleanState = "Selangor"
        elif (edit_distance(state_lc, "malacca") <= max_error) or \
             (edit_distance(state_lc, "malacca city") <= max_error) :
            cleanState = "Malacca"
        elif (edit_distance(state_lc, "negeri sembilan") <= max_error)  :
            cleanState = "Negeri Sembilan"
        elif (edit_distance(state_lc, "putrajaya") <= max_error)  :
            cleanState = "Putrajaya"
        elif (edit_distance(state_lc, "sarawak") <= max_error)  :
            cleanState = "Sarawak"            
        elif (edit_distance(state_lc, "sabah") <= max_error)  :
            cleanState = "Sabah"            
        elif (edit_distance(state_lc, "labuan") <= max_error)  :
            cleanState = "Labuan"                        
        else:
            cleanState = state
        return cleanState

# test cases
print("Cleaning state names...")
dirty = "sarawa"
print(dirty, "\t", cleanMalaysiaStateNames(dirty))


Cleaning state names...
sarawa 	 Sarawak


In [36]:
# Clean the crowdSize field - Use standardised set of state identifiers

def cleanCrowdSize(crowdSize):
    if crowdSize == None :
        return
    else:
        crowd_lc = crowdSize.lower()
        max_error = 2
        if edit_distance(crowd_lc, "unknown") <= max_error:
            cleanCrowd = "unknown"
        elif edit_distance(crowd_lc, "large") <= max_error:
            cleanCrowd = "large"
        elif edit_distance(crowd_lc, "small") <= max_error:
            cleanCrowd = "small"
        else:
            cleanCrowd = crowd_lc
        return cleanCrowd

# test cases
print("Cleaning crowdSize ...")
dirty = "Unknow"
print(dirty, "\t", cleanCrowdSize(dirty))
dirty = "smal"
print(dirty, "\t", cleanCrowdSize(dirty))
dirty = "large"
print(dirty, "\t", cleanCrowdSize(dirty))
dirty = None
print(dirty, "\t", cleanCrowdSize(dirty))

Cleaning crowdSize ...
Unknow 	 unknown
smal 	 small
large 	 large
None 	 None


In [37]:
# Clean the eventType field - Use standardised set of identifiers

def cleanEventType(eventType):
    if eventType == None :
        return
    else:
        event_lc = eventType.lower()
        max_error = 5
        if edit_distance(event_lc, "employment and wages") <= max_error:
            cleanEvent = "Employment and Wages"
        elif edit_distance(event_lc, "housing and shelter") <= max_error:
            cleanEvent = "Housing and Shelter"
        elif edit_distance(event_lc, "land, energy, and resources") <= max_error:
            cleanEvent = "Land, Energy and Resources"
        elif edit_distance(event_lc, "other economic issues") <= max_error:
            cleanEvent = "Other Economic Issues"
        elif edit_distance(event_lc, "other government and political issues") <= max_error:
            cleanEvent = "Other Government and Political Issues"
        elif edit_distance(event_lc, "other social disruption") <= max_error:
            cleanEvent = "Other Civil Unrest"
        else:
            cleanEvent = event_lc
        return cleanEvent

# test cases
print("Cleaning eventType ...")
dirty = "employment and wges"
print(dirty, "\t", cleanEventType(dirty))
dirty = "Other social diruption"
print(dirty, "\t", cleanEventType(dirty))
dirty = "land, Energy, and resource"
print(dirty, "\t", cleanEventType(dirty))
dirty = "Houses and Shelte"
print(dirty, "\t", cleanEventType(dirty))
dirty = "Other economi issue"
print(dirty, "\t", cleanEventType(dirty))
dirty = None
print(dirty, "\t", cleanEventType(dirty))


Cleaning eventType ...
employment and wges 	 Employment and Wages
Other social diruption 	 Other Civil Unrest
land, Energy, and resource 	 Land, Energy and Resources
Houses and Shelte 	 Housing and Shelter
Other economi issue 	 Other Economic Issues
None 	 None


In [38]:
# Clean the populationGroup field - Use standardised set of identifiers

def cleanPopulationGroup(popGroup):
    if popGroup == None :
        return
    else:
        group_lc = popGroup.lower()
        max_error = 2
        min_error = 1
        if edit_distance(group_lc, "agriculture") <= max_error:
            cleanGroup = "Agriculture"
        elif edit_distance(group_lc, "business") <= max_error:
            cleanGroup = "Business"
        elif edit_distance(group_lc, "general") <= max_error:
            cleanGroup = "General"
        elif edit_distance(group_lc, "general population") <= max_error:
            cleanGroup = "General"
        elif edit_distance(group_lc, "religious") <= max_error:
            cleanGroup = "Religious"
        elif edit_distance(group_lc, "labour") <= max_error:
            cleanGroup = "Labour"
        elif edit_distance(group_lc, "legal") <= max_error:
            cleanGroup = "Legal"
        elif edit_distance(group_lc, "ethnic") <= max_error:
            cleanGroup = "Ethnic"
        elif edit_distance(group_lc, "education") <= max_error:
            cleanGroup = "Education"
        elif edit_distance(group_lc, "refugees/displaced") <= max_error:
            cleanGroup = "Refugees/Displaced"
        elif edit_distance(group_lc, "medical") <= min_error:
            cleanGroup = "Medical"
        elif edit_distance(group_lc, "media") <= min_error:
            cleanGroup = "Media"
        else:
            cleanGroup = group_lc
        return cleanGroup

# test cases
print("Cleaning populationGroup ...")
dirty = "agriculture"
print(dirty, "\t", cleanPopulationGroup(dirty))
dirty = "Bussiness"
print(dirty, "\t", cleanPopulationGroup(dirty))
dirty = "labor"
print(dirty, "\t", cleanPopulationGroup(dirty))
dirty = "media"
print(dirty, "\t", cleanPopulationGroup(dirty))
dirty = "students"
print(dirty, "\t", cleanPopulationGroup(dirty))
dirty = "General population"
print(dirty, "\t", cleanPopulationGroup(dirty))
dirty = "Refugees/Displaced"
print(dirty, "\t", cleanPopulationGroup(dirty))
dirty = None
print(dirty, "\t", cleanPopulationGroup(dirty))


Cleaning populationGroup ...
agriculture 	 Agriculture
Bussiness 	 Business
labor 	 Labour
media 	 Media
students 	 students
General population 	 General
Refugees/Displaced 	 Refugees/Displaced
None 	 None


In [40]:
# clean the state names

#ARRGHHHHH


def cleanState(row):
    if (country == None) or (state == None) :
        return
    else :
        cleanState = state.lower()
        if (country == "Australia"):
            cleanState = cleanAustStateNames(state)
        elif (country == "Indonesia"):
            cleanState = cleanIndoStateNames(state)
        elif (country == "Papua New Guinea"):
            cleanState = cleanPngStateNames(state)
        elif (country == "Malaysia"):
            cleanState = cleanMalaysiaStateNames(state)
        else :
            cleanState = state
        return state
    


In [None]:
# 
# CLEANSING of the GSR.... should do this before de-duplication
#
#    States:
#       Use standardised set of state identifiers
#    Country:
#       Use standard set of country identifiers 
#    City:
#       Correct mis-spellings of cities (use a geonames gazetter??)
#       Standardise on capitalisation (Adelaide, adelaide or ADELAIDE)
#    populationGroup:
#       Use standardised set of state identifiers
#    eventType:
#       Use standardised set of state identifiers
#    crowdSize:
#       Use standardised set of state identifiers
# 

cleanGsrDf = gsrDf

print("Cleaning country names....")
cleanCountry = udf(cleanCountryNames, StringType())
cleanCountryGsrDf = cleanGsrDf.withColumn('country', cleanCountry(cleanGsrDf.country))
cleanCountryGsrDf.groupBy("country").count().show()
cleanGsrDf = cleanCountryGsrDf

print("Cleaning state names....")
cleanStateUdf = udf(cleanState, StringType())
cleanStateGsrDf = cleanGsrDf.withColumn('state', cleanStateUdf(cleanGsrDf.state, cleanGsrDf.country))


                                 
print ("Australian state names")
austStateDf = cleanGsrDf.filter(cleanGsrDf.country == "Australia").groupBy('state').count().sort('state')
display.display(austStateDf.toPandas())
print ("Indonesian state names")
indoStateDf = cleanGsrDf.filter(cleanGsrDf.country == "Indonesia").groupBy('state').count().sort('state')
display.display(indoStateDf.toPandas())
print ("PNG state names")
pngStateDf = cleanGsrDf.filter(cleanGsrDf.country == "Papua New Guinea").groupBy('state').count().sort('state')
display.display(pngStateDf.toPandas())
print ("Malaysian state names")
malayStateDf = cleanGsrDf.filter(cleanGsrDf.country == "Malaysia").groupBy('state').count().sort('state')
display.display(malayStateDf.toPandas())


print("Cleaning population groups....")
cleanPopGroups = udf(cleanPopulationGroup, StringType())
cleanPopGroupGsrDf = cleanGsrDf.withColumn('populationGroup', cleanPopGroups(cleanGsrDf.populationGroup))
cleanPopGroupGsrDf.groupBy("populationGroup").count().show()
cleanGsrDf = cleanPopGroupGsrDf

print("Cleaning event types....")
cleanEvents = udf(cleanEventType, StringType())
cleanEventGsrDf = cleanGsrDf.withColumn('eventType', cleanEvents(cleanGsrDf.eventType))
cleanEventGsrDf.groupBy("eventType").count().show()
cleanGsrDf = cleanEventGsrDf

print("Cleaning crowd size....")
cleanCrowd = udf(cleanCrowdSize, StringType())
cleanCrowdGsrDf = cleanGsrDf.withColumn('crowdSize', cleanCrowd(cleanGsrDf.crowdSize))
cleanCrowdGsrDf.groupBy("crowdSize").count().show()
cleanGsrDf = cleanCrowdGsrDf


# save for later
cleanAllGsrDf = cleanGsrDf

print("")
print("Filtering for only Australian GSR...")
# limit the GSR to only those records that relate to Australia 
# as to-date we have not been collecting twitter data for other countries 
# and therefore only have warnings for Australia
ausGsrDf = cleanGsrDf.filter(cleanGsrDf.country == u'Australia');

#print("Cleaning Australian States...")
#cleanState = udf(lambda x: cleanAustStateNames(x), StringType())
#cleanAusGsrDf = ausGsrDf.withColumn('state', cleanState(ausGsrDf.state))

#stateCountDf = cleanAusGsrDf.groupBy("state").count().sort("state")
#stateCountDf.show()
#print ("Number of unique states is", stateCountDf.count())


In [66]:
# 
# VALIDATION of GSR
# 
# How can we identify events that should be manually reviewed to determine if there are analyst-coding errors?
# e.g. find GSR entries that have.....
#   NULL or invalid country?
#   invalid states for a given country
#   valid city BUT not in the right country?
#   NULL cities that are NOT widespread events?
#

###################### 
# find invalid countries 
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find invalid country names...")
validCountries = ['Australia', 'Indonesia', 'Papua New Guinea', 'Fiji', 'Malaysia', 'Thailand']
def invalidCountry(countryStr):
    if countryStr not in validCountries:
        return True
    else:
        return False

sqlContext.registerFunction("invalidCountry", invalidCountry, BooleanType())
cleanGsrDf.registerTempTable('cleanGsrT')

invalidDf = sqlContext.sql("SELECT * FROM cleanGsrT WHERE invalidCountry(country)")
num = invalidDf.count()
if num == 0 :
    print("No invalid country names found")
else :
    print(num, " invalid countries....")
    print(df.collect())
    

------------------
Find invalid country names...
No invalid country names found


In [67]:
# 
# VALIDATION of GSR
# 
#######################   
# find invalid states 
###################### 

cleanGsrDf = cleanAusGsrDf

print('------------------')
print("Find invalid Australian state names...")
validStates = ['ACT', 'SA', 'VIC', 'NSW', 'QLD', 'NT', 'WA', 'TAS', None]
def invalidState(state):
    if state not in validStates:
        return True
    else:
        return False

sqlContext.registerFunction("invalidState", invalidState, BooleanType())
cleanGsrDf.registerTempTable('cleanAusGsrT')
invalidDf = sqlContext.sql("SELECT * FROM cleanAusGsrT WHERE invalidState(state)")
num = invalidDf.count()
if num == 0 :
    print("No invalid state names found")
else:
    print(num, " invalid states....")
#    print(invalidDf.select(invalidDf.state).show())
    displayDf = invalidDf.select('timestamp', 'city', 'state', 'country', 'headline', 
                                 'widespreadEventId', 'firstReportedLink', 
                                 'eventType', 'populationGroup').orderBy('timestamp').limit(20)

    pandas.set_option('display.max_colwidth', 50)
    display.display(displayDf.toPandas())
    

------------------
Find invalid Australian state names...
5  invalid states....


Unnamed: 0,timestamp,city,state,country,headline,widespreadEventId,firstReportedLink,eventType,populationGroup
0,2015-06-22 09:30:00,Tasmania,Hobart,Australia,Tasmanian public servants take half-day strike...,627835217,http://www.abc.net.au/news/2015-06-22/tasmania...,Employment and Wages,Labour
1,2015-10-14 10:30:00,South Jakarta,Jakarta Raya,Australia,Begini Kemeriahan Pawai Sambut Tahun Baru Isla...,13775231,http://news.detik.com/berita/3043704/begini-ke...,Other Civil Unrest,General
2,2015-10-21 10:30:00,Central Jakarta,Jakarta Raya,Australia,Mahasiswa Kalimantan Demo Satu Tahun Pemerinta...,21775251,http://www.tribunnews.com/images/editorial/vie...,"Land, Energy and Resources",Education
3,2015-10-23 00:00:00,,NCT,Australia,Prime Minister's department inquires into Tenn...,-1,http://www.abc.net.au/news/2015-10-19/pm-depar...,Housing and Shelter,General
4,2016-01-08 10:30:00,Sydney,North Sumatera,Australia,Sydney light rail protesters chain themselves ...,-1,http://www.abc.net.au/news/2016-01-08/proteste...,"Land, Energy and Resources",General


In [68]:
# 
# VALIDATION of GSR
# 
#######################   
# find invalid eventTypes 
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find invalid eventTypes...")
validEvent = ['Employment and Wages', 
              'Housing and Shelter', 
              'Land, Energy and Resources', 
              'Other Economic Issues', 
              'Other Government and Political Issues', 
              'Other Civil Unrest', None]
def invalidEvent(eventType):
    if eventType not in validEvent:
        return True
    else:
        return False

sqlContext.registerFunction("invalidEvent", invalidEvent, BooleanType())
cleanGsrDf.registerTempTable('cleanGsrT')
invalidDf = sqlContext.sql("SELECT * FROM cleanGsrT WHERE invalidEvent(eventType)")
num = invalidDf.count()
if num == 0 :
    print("No invalid eventTypes found")
else:
    print(num, " invalid eventTypes....")
#    print(invalidDf.select(invalidDf.state).show())
    displayDf = invalidDf.select('key:eventDate', 'city', 'state', 'country', 'headline', 
                                 'eventType', 'widespreadEventId', 'firstReportedLink', 
                                 'populationGroup').orderBy('key:eventDate').limit(20)

    pandas.set_option('display.max_colwidth', 50)
    display.display(displayDf.toPandas())

------------------
Find invalid eventTypes...
1  invalid eventTypes....


Unnamed: 0,key:eventDate,city,state,country,headline,eventType,widespreadEventId,firstReportedLink,populationGroup
0,2015-11-24 10:30:00,"Parliament Building, Bandung",West java,Indonesia,Hari Ini Ada Tiga Aksi Unjuk Rasa di Kota Bandung,,-1,http://www.tribunnews.com/regional/2015/11/24/...,General


In [69]:
# 
# VALIDATION of GSR
# 
#######################   
# find invalid populationGroup
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find invalid populationGroup...")
validGroup = ['Agriculture', 
              'Business', 
              'General', 
              'Religious', 
              'Labour', 
              'Legal', 
              'Ethnic',
              'Education',
              'Refugees/Displaced',
              'Medical',
              'Media', None]

def invalidPopGroup(populationGroup):
    if populationGroup not in validGroup:
        return True
    else:
        return False

sqlContext.registerFunction("invalidPopGroup", invalidPopGroup, BooleanType())
cleanGsrDf.registerTempTable('cleanGsrT')
invalidDf = sqlContext.sql("SELECT * FROM cleanGsrT WHERE invalidPopGroup(populationGroup)")
num = invalidDf.count()
if num == 0 :
    print("No invalid populationGroups found")
else:
    print(num, " invalid populationGroups....")
#    print(invalidDf.select(invalidDf.state).show())
    displayDf = invalidDf.select('key:eventDate', 'city', 'state', 'country', 'headline', 
                                 'populationGroup', 'firstReportedLink', 'eventType', 
                                 'widespreadEventId').orderBy('key:eventDate').limit(20)

    pandas.set_option('display.max_colwidth', 50)
    display.display(displayDf.toPandas())

------------------
Find invalid populationGroup...
7  invalid populationGroups....


Unnamed: 0,key:eventDate,city,state,country,headline,populationGroup,firstReportedLink,eventType,widespreadEventId
0,2015-06-25 09:30:00,Pinggir,Kepulauan Riau,Indonesia,Mahasiswa dan Pecinta Alam se Riau demo Kasus ...,students,http://pekanbaru.tribunnews.com/2015/06/25/mah...,"Land, Energy and Resources",-1
1,2015-06-26 09:30:00,Sydney,NSW,Australia,Sydney cyclists take to traffic lanes to prote...,other government and political issues,http://www.abc.net.au/news/2015-06-26/cyclists...,Other Civil Unrest,-1
2,2015-07-08 09:30:00,Pekan baru,Kepulauan Riau,Indonesia,HMI Pekanbaru Desak Walikota Copot Kadisdik Zu...,students,http://pekanbaru.tribunnews.com/2015/07/08/hmi...,Other Government and Political Issues,-1
3,2015-07-10 09:30:00,jakarta,Jakarta Raya,Indonesia,Mahasiswa STMT Trisakti Gelar Aksi di Polda Me...,students,http://www.tribunnews.com/metropolitan/2015/07...,Other Civil Unrest,-1
4,2015-07-13 09:30:00,Ubon Ratchathani,,Thailand,ชาว​บ้าน​ประท้วง อบ​จ. ตัด​ถนน​ผ่า​กลาง​หมู่​บ้าน,"energy, and resourcesal population",http://www.thairath.co.th/content/511471,Other Civil Unrest,-1
5,2015-07-25 09:30:00,Adelaide,SA,Australia,Protesters voice concerns over shipbuilding jo...,large,http://www.abc.net.au/news/2015-07-24/proteste...,Employment and Wages,-1
6,2015-07-25 09:30:00,Adelaide,SA,Australia,Protesters voice concerns over shipbuilding jo...,large,http://www.abc.net.au/news/2015-07-24/proteste...,Employment and Wages,-1


In [70]:
# 
# VALIDATION of GSR
# 
#######################   
# find invalid crowdSize
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find invalid crowdSize...")
validCrowd = ['small', 
              'large', 
              'unknown', None]

def invalidCrowd(crowdSize):
    if crowdSize not in validCrowd:
        return True
    else:
        return False

sqlContext.registerFunction("invalidCrowd", invalidCrowd, BooleanType())
cleanGsrDf.registerTempTable('cleanGsrT')
invalidDf = sqlContext.sql("SELECT * FROM cleanGsrT WHERE invalidCrowd(crowdSize)")
num = invalidDf.count()
if num == 0 :
    print("No invalid crowdSize found")
else:
    print(num, " invalid crowdSize....")
#    print(invalidDf.select(invalidDf.state).show())
    displayDf = invalidDf.select('key:eventDate', 'city', 'state', 'country', 'headline', 
                                 'crowdSize', 'populationGroup', 'firstReportedLink', 'eventType', 
                                 'widespreadEventId').orderBy('key:eventDate').limit(20)

    pandas.set_option('display.max_colwidth', 50)
    display.display(displayDf.toPandas())

------------------
Find invalid crowdSize...
No invalid crowdSize found


In [71]:
# 
# VALIDATION of GSR
# 
###################### 
# are there any GSR with NULL cities ?
###################### 

cleanGsrDf = cleanAllGsrDf 

print('------------------')
print("Find GSR with null cities ...")
noCityNotWidespreadDf = cleanGsrDf.filter(cleanGsrDf.city.isNull())
num = noCityNotWidespreadDf.count()
if num == 0:
    print("No GSR found with null city")
else:
    print(num, " GSR with null city that ....")
    displayDf = noCityNotWidespreadDf.select('timestamp', 'city', 'state', 'country', 'headline', 
                                             'widespreadEventId', 'firstReportedLink', 
                                             'eventType', 'populationGroup').orderBy('timestamp').limit(20)

    pandas.set_option('display.max_colwidth', 100)
    display.display(displayDf.toPandas())


------------------
Find GSR with null cities ...
203  GSR with null city that ....


Unnamed: 0,timestamp,city,state,country,headline,widespreadEventId,firstReportedLink,eventType,populationGroup
0,2015-02-10 10:30:00,,WA,Australia,Governor Stirling High School bans student's gay art of American footballer,-1,http://www.theage.com.au/wa-news/governor-stirling-high-school-bans-students-gay-art-20150930-gj...,Other Civil Unrest,Education
1,2015-02-11 10:30:00,,NSW,Australia,,-1,http://www.theage.com.au/act-news/nsw-department-of-planning-rejects-wind-farms-application-for-...,Other Government and Political Issues,Agriculture
2,2015-02-11 10:30:00,,NSW,Australia,,-1,http://www.theage.com.au/act-news/nsw-department-of-planning-rejects-wind-farms-application-for-...,Other Government and Political Issues,General
3,2015-06-07 09:30:00,,TAS,Australia,Protesters march through Devonport in support of Alexander Spirit oil tanker workers,-1,http://www.abc.net.au/news/2015-07-06/protesters-march-through-devonport-support-oil-tanker-work...,Employment and Wages,Labour
4,2015-06-10 09:30:00,,WA,Australia,Human chain across Stirling Bridge in Fremantle to protest live animal export,-1,http://www.theage.com.au/wa-news/human-chain-across-stirling-bridge-in-fremantle-to-protest-live...,Other Civil Unrest,General
5,2015-06-11 09:30:00,,NSW,Australia,Council offers assurances about Bathurst district quarry proposal,-1,http://www.abc.net.au/news/2015-11-05/council-offers-assurances-about-bathurst-district-quarry-p...,Other Government and Political Issues,General
6,2015-06-11 09:30:00,,NSW,Australia,Workers and carers brave rain to protest privatisation of disability services,-1,http://www.abc.net.au/news/2015-11-04/workers-and-carers-brave-rain-to-protest-privatisation-of-...,Other Government and Political Issues,Labour
7,2015-06-13 09:30:00,,VIC,Australia,Shorten says Abbott can own a marriage bill,-1,http://www.theaustralian.com.au/national-affairs/politics-news/shorten-says-abbott-can-own-a-mar...,Other Government and Political Issues,General
8,2015-06-17 09:30:00,,Jakarta,Indonesia,,-1,http://thejakartaglobe.beritasatu.com/news/ahmadis-tebet-call-reconciliation-fpi/,Other Civil Unrest,Religious
9,2015-06-17 09:30:00,,Central,Fiji,Protest damages Parliament,-1,http://www.fijitimes.com/story.aspx?id=310126,Other Government and Political Issues,General


In [72]:
# 
# VALIDATION of GSR
# 
######################     
# are there any GSR with NULL state and NULL city?
###################### 

cleanGsrDf = cleanAusGsrDf

print('------------------')
print("Find GSR with null city and null state...")
noCityNoStateDf = cleanGsrDf.filter(cleanGsrDf.state.isNull()).filter(cleanGsrDf.city.isNull())
num = noCityNoStateDf.count()
if num == 0:
    print("No GSR found with null city and null state")
else:
    print(num, " GSR with null city and null state....")
    print(noCityNoStateDf.select('timestamp', 'city', 'state', 'headline', 'firstReportedLink').show())


------------------
Find GSR with null city and null state...
No GSR found with null city and null state


In [73]:
# 
# VALIDATION of GSR
# 
######################     
# are there any GSR with NULL key:eventDate ?
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with null key:eventDate...")
noTimestampDf = cleanGsrDf.filter(cleanGsrDf["key:eventDate"].isNull())
num = noTimestampDf.count()
if num == 0:
    print("No GSR records found with null key:eventDate")
else:
    print(num, " GSR with null key:eventDate....")
    displayDf = noTimestampDf.select('key:eventDate', 'city', 'state', 'country', 'headline', 
                                     'firstReportedLink', 'eventType', 
                                     'populationGroup','widespreadEventId').orderBy('timestamp').limit(20)

    pandas.set_option('display.max_colwidth', 100)
    display.display(displayDf.toPandas())
 

------------------
Find GSR records with null key:eventDate...
No GSR records found with null key:eventDate


In [74]:
# 
# VALIDATION of GSR
# 
######################     
# are there any GSR with NULL firstReportedLink?
###################### 

cleanGsrDf = cleanAllGsrDf 

print('------------------')
print("Find GSR records with null firstReportedLink...")
noLinkDf = cleanGsrDf.filter(cleanGsrDf.firstReportedLink.isNull())
num = noLinkDf.count()
if num == 0:
    print("No GSR records found with null firstReportedLink")
else:
    print(num, "GSR with null firstReportedLink....")
    displayDf = noLinkDf.select('timestamp', 'city', 'state', 'country', 'headline', 
                                     'firstReportedLink', 'eventType', 
                                     'populationGroup','widespreadEventId').orderBy('timestamp').limit(20)

    pandas.set_option('display.max_colwidth', 100)
    display.display(displayDf.toPandas())
 

------------------
Find GSR records with null firstReportedLink...
12 GSR with null firstReportedLink....


Unnamed: 0,timestamp,city,state,country,headline,firstReportedLink,eventType,populationGroup,widespreadEventId
0,2015-06-17 09:30:00,Sukabumi,West Java,Indonesia,Ribuan Warga Sukabumi Ikuti Pawai Tarhib Ramadhan,,Other Civil Unrest,Religious,620835211
1,2015-06-22 09:30:00,,Sa Kaeo,Thailand,พ่อค้า-แม่ค้าเขมรกว่า 2 พันคน ประท้วงเทศบาลคลองหาด หลังถูกบีบให้ขายหมูซีพีเจ้าเดียว,,Other Economic Issues,Business,-1
2,2015-06-29 09:30:00,Melbourne,VIC,Australia,Protesters take over Melbourne bridge as Friday night commuters hit again,,Other Civil Unrest,General,-1
3,2015-07-03 09:30:00,Devonport,TAS,Australia,The Australian crew of an oil tanker is refusing to leave Devonport in Tasmania after being told...,,Employment and Wages,Labour,-1
4,2015-07-14 09:30:00,Bangkok,,Thailand,Vocational students rally to support Prayut,,Other Civil Unrest,Education,-1
5,2015-09-08 09:30:00,Magelang,Central Java,Indonesia,"Kenakan Kostum Tradisional, Pemuda dari 5 Negara Kampanye Pelestarian Candi Borobudur",,Other Civil Unrest,General,-1
6,2015-09-11 09:30:00,Ambon,Maluku,Indonesia,"Pendemo Tuntut ""Caretaker"" Bupati SBT dari Putra Daerah",,Other Government and Political Issues,Education,-1
7,2015-10-13 10:30:00,Purwakarta,West Java,Indonesia,Warga Purwakarta Protes Tarif Listrik 'Bengkak',,"Land, Energy and Resources",General,-1
8,2015-11-27 10:30:00,Mount Gambier,SA,Australia,,,"Land, Energy and Resources",General,-1
9,2015-12-05 10:30:00,Shah Alam,Selangor,Malaysia,,,Other Civil Unrest,Religious,-1


In [75]:
# 
# VALIDATION of GSR
# 
###################### 
# are there any GSR with NULL cities that are NOT widespread events?
###################### 

cleanGsrDf = cleanAllGsrDf 

print('------------------')
print("Find GSR with null cities that are NOT widespread...")
noCityNotWidespreadDf = cleanGsrDf.filter(cleanGsrDf.city.isNull()).filter("widespreadEventId <= 0")
num = noCityNotWidespreadDf.count()
if num == 0:
    print("No GSR found with null city that are NOT widepsread events")
else:
    print(num, " GSR with null city that are NOT widespread events....")
    displayDf = noCityNotWidespreadDf.select('timestamp', 'city', 'state', 'country', 'headline', 
                                             'widespreadEventId', 'firstReportedLink', 
                                             'eventType', 'populationGroup').orderBy('timestamp').limit(20)

    pandas.set_option('display.max_colwidth', 100)
    display.display(displayDf.toPandas())



------------------
Find GSR with null cities that are NOT widespread...
160  GSR with null city that are NOT widespread events....


Unnamed: 0,timestamp,city,state,country,headline,widespreadEventId,firstReportedLink,eventType,populationGroup
0,2015-02-10 10:30:00,,WA,Australia,Governor Stirling High School bans student's gay art of American footballer,-1,http://www.theage.com.au/wa-news/governor-stirling-high-school-bans-students-gay-art-20150930-gj...,Other Civil Unrest,Education
1,2015-02-11 10:30:00,,NSW,Australia,,-1,http://www.theage.com.au/act-news/nsw-department-of-planning-rejects-wind-farms-application-for-...,Other Government and Political Issues,Agriculture
2,2015-02-11 10:30:00,,NSW,Australia,,-1,http://www.theage.com.au/act-news/nsw-department-of-planning-rejects-wind-farms-application-for-...,Other Government and Political Issues,General
3,2015-06-07 09:30:00,,TAS,Australia,Protesters march through Devonport in support of Alexander Spirit oil tanker workers,-1,http://www.abc.net.au/news/2015-07-06/protesters-march-through-devonport-support-oil-tanker-work...,Employment and Wages,Labour
4,2015-06-10 09:30:00,,WA,Australia,Human chain across Stirling Bridge in Fremantle to protest live animal export,-1,http://www.theage.com.au/wa-news/human-chain-across-stirling-bridge-in-fremantle-to-protest-live...,Other Civil Unrest,General
5,2015-06-11 09:30:00,,NSW,Australia,Council offers assurances about Bathurst district quarry proposal,-1,http://www.abc.net.au/news/2015-11-05/council-offers-assurances-about-bathurst-district-quarry-p...,Other Government and Political Issues,General
6,2015-06-11 09:30:00,,NSW,Australia,Workers and carers brave rain to protest privatisation of disability services,-1,http://www.abc.net.au/news/2015-11-04/workers-and-carers-brave-rain-to-protest-privatisation-of-...,Other Government and Political Issues,Labour
7,2015-06-13 09:30:00,,VIC,Australia,Shorten says Abbott can own a marriage bill,-1,http://www.theaustralian.com.au/national-affairs/politics-news/shorten-says-abbott-can-own-a-mar...,Other Government and Political Issues,General
8,2015-06-17 09:30:00,,Johor Baru,Malaysia,NGO's rally for Johor crown prince,-1,http://www.nst.com.my/node/88506,Other Government and Political Issues,Labour
9,2015-06-17 09:30:00,,Central,Fiji,Protest damages Parliament,-1,http://www.fijitimes.com/story.aspx?id=310126,Other Government and Political Issues,General


In [76]:
# 
# VALIDATION of GSR
# 
###################### 
# Find multiple clean GSR records with same firstReportedLink but different key:eventDate 
######################

cleanGsrDf = cleanAllGsrDf 

print('------------------')
print("Find GSR records with same firstReportedLink but different key:eventDate...")

# drop any GSR with None in firstReportedLink and timestamp
gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink','key:eventDate')).rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (firstReportedLink, row)
linkRowPairRdd = gsrRdd.map(lambda row: (row.asDict()['firstReportedLink'], row))

# create a pair RDD where each element is a tuple of (firstReportedlink, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
linkRowIterPairRdd = linkRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink) = ", linkRowIterPairRdd.count())

def getEventsWithManyEventDate (rowIter):
    # find the unique set of timestamps
    uniqueTimestamps = set()
    for row in rowIter:
        uniqueTimestamps.add(row.asDict()['key:eventDate'])
        
    # if more than one timestamp return all rows     
    if len(uniqueTimestamps) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (firstReportedLink, iterable(row))
# the elements we are not interested in have None for the iterable(row)
invLinkRowIterPairRdd = linkRowIterPairRdd.mapValues(getEventsWithManyEventDate)

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (firstReportedLink, iterable(row))
filterPairRdd = invLinkRowIterPairRdd.filter(lambda data: data[1] != None).sortByKey()
print("Number of EVENTS (unique firstReportedLink) with multiple key:eventDate = ", 
      filterPairRdd.count())

# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())
 
if (invalidRdd.count() > 0)  :    
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('key:eventDate', 
                                 'city', 'state', 'country', 'headline', 
                                 'firstReportedLink', 'authorId', 'widespreadEventId', 
                                 'eventType', 'populationGroup').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())




------------------
Find GSR records with same firstReportedLink but different key:eventDate...
Number of GSR records:  3251
Number of EVENTS (unique firstReportedLink) =  2601
Number of EVENTS (unique firstReportedLink) with multiple key:eventDate =  72
Number of contributing GSR records to these EVENTS =  242


Unnamed: 0,key:eventDate,city,state,country,headline,firstReportedLink,authorId,widespreadEventId,eventType,populationGroup
0,2015-06-15 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR",http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,7,-1,Other Government and Political Issues,Education
1,2015-07-14 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR",http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,2,-1,Other Government and Political Issues,Education
2,2015-06-15 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR",http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,4,-1,Other Government and Political Issues,Education
3,2015-07-05 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Penahanan Bupati Empat Lawang Ricuh, Kamera Fotografer Rusak",http://nasional.kompas.com/read/2015/07/06/20393601/Penahanan.Bupati.Empat.L...,2,-1,Other Government and Political Issues,General
4,2015-07-06 09:30:00,Kuningan,Jakarta Raya,Indonesia,"Penahanan Bupati Empat Lawang Ricuh, Kamera Fotografer Rusak",http://nasional.kompas.com/read/2015/07/06/20393601/Penahanan.Bupati.Empat.L...,4,-1,Other Government and Political Issues,General
5,2015-06-19 09:30:00,Central Jakarta,Jakarta Raya,Indonesia,Penyewa Stan PRJ Senayan Demo Tuntut Ganti Rugi Biaya Sewa,http://news.detik.com/berita/2947750/penyewa-stan-prj-senayan-demo-tuntut-ga...,2,-1,Other Government and Political Issues,Business
6,2015-06-20 09:30:00,Jakarta,Jakarta Raya,Indonesia,Penyewa Stan PRJ Senayan Demo Tuntut Ganti Rugi Biaya Sewa,http://news.detik.com/berita/2947750/penyewa-stan-prj-senayan-demo-tuntut-ga...,7,-1,Employment and Wages,Business
7,2015-06-25 09:30:00,Yogyakarta,Daerah Istimewa Yogyakarta,Indonesia,"Tuntut Bupati Diadili, Mahasiswa Muba di Yogya Pecahkan Kaca dan Coret Asrama",http://news.detik.com/berita/2952012/tuntut-bupati-diadili-mahasiswa-muba-di...,7,-1,Other Government and Political Issues,Education
8,2015-06-24 09:30:00,Jogjakarta,Jogjakarta,Indonesia,"Tuntut Bupati Diadili, Mahasiswa Muba di Yogya Pecahkan Kaca dan Coret Asrama",http://news.detik.com/berita/2952012/tuntut-bupati-diadili-mahasiswa-muba-di...,2,-1,Other Government and Political Issues,Education
9,2015-08-13 09:30:00,Yogyakarta,Daerah Istimewa Yogyakarta,Indonesia,Aksi Pria Bersepeda Hadang Konvoi Harley-Davidson di Sleman,http://regional.kompas.com/read/2015/08/15/18292971/Aksi.Pria.Bersepeda.Hada...,7,-1,Other Civil Unrest,General


In [77]:
# 
# VALIDATION of GSR
# 
###################### 
# Find multiple clean GSR records with same firstReportedLink but different earliestReportedDate 
######################

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with same firstReportedLink but different earliestReportedDate...")

# drop any GSR with None in firstReportedLink and timestamp
gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink','earliestReportedDate')).rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (firstReportedLink, row)
linkRowPairRdd = gsrRdd.map(lambda row: (row.asDict()['firstReportedLink'], row))

# create a pair RDD where each element is a tuple of (firstReportedlink, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
linkRowIterPairRdd = linkRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink) = ", linkRowIterPairRdd.count())

def getEventsWithManyReportedDate (rowIter):
    # find the unique set of timestamps
    uniqueTimestamps = set()
    for row in rowIter:
        uniqueTimestamps.add(row.asDict()['earliestReportedDate'])
        
    # if more than one timestamp return all rows     
    if len(uniqueTimestamps) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (firstReportedLink, iterable(row))
# the elements we are not interested in have None for the iterable(row)
invLinkRowIterPairRdd = linkRowIterPairRdd.mapValues(getEventsWithManyReportedDate)

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (firstReportedLink, iterable(row))
filterPairRdd = invLinkRowIterPairRdd.filter(lambda data: data[1] != None).sortByKey()
print("Number of EVENTS (unique firstReportedLink) with multiple earliestReportedDate = ", 
      filterPairRdd.count())

# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())

if (invalidRdd.count() > 0)  :
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('earliestReportedDate', 
                                 'city', 'state', 'country', 'headline', 
                                 'firstReportedLink', 'authorId', 'widespreadEventId', 
                                 'eventType', 'populationGroup').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())



------------------
Find GSR records with same firstReportedLink but different earliestReportedDate...
Number of GSR records:  3251
Number of EVENTS (unique firstReportedLink) =  2601
Number of EVENTS (unique firstReportedLink) with multiple earliestReportedDate =  105
Number of contributing GSR records to these EVENTS =  309


Unnamed: 0,earliestReportedDate,city,state,country,headline,firstReportedLink,authorId,widespreadEventId,eventType,populationGroup
0,2015-06-19 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR",http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,7,-1,Other Government and Political Issues,Education
1,2015-06-15 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR",http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,2,-1,Other Government and Political Issues,Education
2,2015-06-23 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR",http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,4,-1,Other Government and Political Issues,Education
3,2015-07-06 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Penahanan Bupati Empat Lawang Ricuh, Kamera Fotografer Rusak",http://nasional.kompas.com/read/2015/07/06/20393601/Penahanan.Bupati.Empat.L...,2,-1,Other Government and Political Issues,General
4,2015-07-07 09:30:00,Kuningan,Jakarta Raya,Indonesia,"Penahanan Bupati Empat Lawang Ricuh, Kamera Fotografer Rusak",http://nasional.kompas.com/read/2015/07/06/20393601/Penahanan.Bupati.Empat.L...,4,-1,Other Government and Political Issues,General
5,2015-06-19 09:30:00,Medan,North Sumatra,Indonesia,"Ratusan Pedagang Demo Tutup Jalan di Medan, Lalin Macet Parah",http://news.detik.com/berita/2940753/ratusan-pedagang-demo-tutup-jalan-di-me...,7,-1,Other Government and Political Issues,Business
6,2015-06-12 09:30:00,Medan,North Sumatera,Indonesia,"Ratusan Pedagang Demo Tutup Jalan di Medan, Lalin Macet Parah",http://news.detik.com/berita/2940753/ratusan-pedagang-demo-tutup-jalan-di-me...,0,-1,"Land, Energy and Resources",Business
7,2015-06-14 09:30:00,Yogyakarta,Yogyakarta,Indonesia,Semarak Ruwat Bumi Warga Miliran Yogyakarta Sambut Hari Lingkungan Hidup,http://news.detik.com/berita/2942106/semarak-ruwat-bumi-warga-miliran-yogyak...,0,-1,Other Civil Unrest,General
8,2015-06-19 09:30:00,Yogyakarta,Daerah Istimewa Yogyakarta,Indonesia,Semarak Ruwat Bumi Warga Miliran Yogyakarta Sambut Hari Lingkungan Hidup,http://news.detik.com/berita/2942106/semarak-ruwat-bumi-warga-miliran-yogyak...,7,-1,Other Civil Unrest,General
9,2015-06-20 09:30:00,Central Jakarta,Jakarta Raya,Indonesia,Penyewa Stan PRJ Senayan Demo Tuntut Ganti Rugi Biaya Sewa,http://news.detik.com/berita/2947750/penyewa-stan-prj-senayan-demo-tuntut-ga...,2,-1,Other Government and Political Issues,Business


In [78]:
# 
# VALIDATION of GSR
# 
###################### 
# 1. Find multiple clean GSR records with same firstReportedLink but different location = (city, state, country)
######################

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with same firstReportedLink but different location (city, state, country)...")

# drop any GSR with None in firstReportedLink and timestamp
#gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink')).rdd
gsrRdd = cleanGsrDf.rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (firstReportedLink, row)
linkRowPairRdd = gsrRdd.map(lambda row: (row.asDict()['firstReportedLink'], row))

# create a pair RDD where each element is a tuple of (firstReportedlink, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
linkRowIterPairRdd = linkRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink) = ",       linkRowIterPairRdd.count())

def getEventsWithManyLocations (rowIter):
    # find the unique set of timestamps
    uniqueLocations = set()
    for row in rowIter:
        city = row.asDict()['city']
        state = row.asDict()['state']
        country = row.asDict()['country']
        uniqueLocations.add((city, state, country))
        
    # if more than one timestamp return all rows     
    if len(uniqueLocations) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (firstReportedLink, iterable(row))
# the elements we are not interested in have None for the iterable(row)
invLinkRowIterPairRdd = linkRowIterPairRdd.mapValues(getEventsWithManyLocations)

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (firstReportedLink, iterable(row))
filterPairRdd = invLinkRowIterPairRdd.filter(lambda data: data[1] != None)

print("Number of EVENTS (unique firstReportedLink) with multiple locations (city,state,country) = ", 
      filterPairRdd.count())


# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())
 
if (invalidRdd.count() > 0)  :    
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('earliestReportedDate', 
                                 'city', 'state', 'country', 'headline', 
                                 'firstReportedLink', 'authorId', 'widespreadEventId', 
                                 'eventType', 'populationGroup').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())


------------------
Find GSR records with same firstReportedLink but different location (city, state, country)...
Number of GSR records:  3263
Number of EVENTS (unique firstReportedLink) =  2602
Number of EVENTS (unique firstReportedLink) with multiple locations (city,state,country) =  229
Number of contributing GSR records to these EVENTS =  678


Unnamed: 0,earliestReportedDate,city,state,country,headline,firstReportedLink,authorId,widespreadEventId,eventType,populationGroup
0,2015-07-03 09:30:00,Devonport,TAS,Australia,The Australian crew of an oil tanker is refusing to leave Devonport in Tasma...,,3,-1,Employment and Wages,Labour
1,2015-07-13 09:30:00,Bangkok,,Thailand,Vocational students rally to support Prayut,,6,-1,Other Civil Unrest,Education
2,2015-11-23 10:30:00,Mount Gambier,SA,Australia,,,3,-1,"Land, Energy and Resources",General
3,2015-12-04 10:30:00,Shah Alam,Selangor,Malaysia,,,4,-1,Other Civil Unrest,Religious
4,2015-06-17 09:30:00,Sukabumi,West Java,Indonesia,Ribuan Warga Sukabumi Ikuti Pawai Tarhib Ramadhan,,7,620835211,Other Civil Unrest,Religious
5,2015-06-29 09:30:00,Melbourne,VIC,Australia,Protesters take over Melbourne bridge as Friday night commuters hit again,,6,-1,Other Civil Unrest,General
6,2015-12-29 10:30:00,Bangka,Bangka Belitung,Indonesia,"Saat Kontra Demo, Warga Pro KIP Doa di Pantai Matras",,2,-1,"Land, Energy and Resources",General
7,2015-09-05 09:30:00,Magelang,Central Java,Indonesia,"Kenakan Kostum Tradisional, Pemuda dari 5 Negara Kampanye Pelestarian Candi ...",,4,-1,Other Civil Unrest,General
8,2015-12-29 10:30:00,Pangkal Pinang,Bangka Belitung,Indonesia,"Saat Kontra Demo, Warga Pro KIP Doa di Pantai Matras",,2,-1,"Land, Energy and Resources",General
9,2015-09-11 09:30:00,Ambon,Maluku,Indonesia,"Pendemo Tuntut ""Caretaker"" Bupati SBT dari Putra Daerah",,4,-1,Other Government and Political Issues,Education


In [79]:
# 
# VALIDATION of GSR
# 
###################### 
#    Find multiple clean GSR records with same firstReportedLink, eventDate, & location = (city, state, country) 
#    but different eventType
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with same firstReportedLink, eventDate, & location (city, state, country) ",
      "BUT different eventType...")

eventDf = cleanGsrDf.groupBy('key:eventDate','city','state','location','firstReportedLink')
#print(type(eventDf.first()))


# drop any GSR with None in firstReportedLink and timestamp
gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink')).rdd
gsrRdd = cleanGsrDf.rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (key, row) 
# where key is a tuple (firstReportedLink, eventdate, city, state, country)
keyRowPairRdd = gsrRdd.map(lambda row: ((row.asDict()['firstReportedLink'], 
                                          row.asDict()['key:eventDate'],
                                          row.asDict()['city'],
                                          row.asDict()['state'],
                                          row.asDict()['country']), row))

# create a pair RDD where each element is a tuple of (key, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
keyRowIterPairRdd = keyRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) = ", keyRowIterPairRdd.count())

def getEventsWithManyEventTypes (rowIter):
    # find the unique set of timestamps
    uniqueEventTypes = set()
    for row in rowIter:
        eventType = row.asDict()['eventType']
        uniqueEventTypes.add(eventType)
        
    # if more than one timestamp return all rows     
    if len(uniqueEventTypes) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (key iterable(row))
# the elements we are not interested in have None for the iterable(row)
invalidKeyRowIterPairRdd = keyRowIterPairRdd.mapValues(getEventsWithManyEventTypes)

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (key, iterable(row))
filterPairRdd = invalidKeyRowIterPairRdd.filter(lambda data: data[1] != None)

print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple eventTypes = ", 
      filterPairRdd.count())


# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())

if (invalidRdd.count() > 0)  :
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('firstReportedLink','key:eventDate', 
                                 'city', 'state', 'country', 'headline', 
                                  'eventType', 'authorId', 'widespreadEventId', 
                                  'populationGroup').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())


------------------
Find GSR records with same firstReportedLink, eventDate, & location (city, state, country)  BUT different eventType...
Number of GSR records:  3263
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) =  3019
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple eventTypes =  39
Number of contributing GSR records to these EVENTS =  93


Unnamed: 0,firstReportedLink,key:eventDate,city,state,country,headline,eventType,authorId,widespreadEventId,populationGroup
0,http://jabar.pojoksatu.id/bogor/2015/06/26/buruh-bentrok-dengan-warga-polisi...,2015-06-25 09:30:00,Bogor,West Java,Indonesia,"Buruh Bentrok dengan Warga, Polisi Bidik Provokatornya",Other Civil Unrest,7,-1,General
1,http://jabar.pojoksatu.id/bogor/2015/06/26/buruh-bentrok-dengan-warga-polisi...,2015-06-25 09:30:00,Bogor,West Java,Indonesia,"Buruh Bentrok dengan Warga, Polisi Bidik Provokatornya",Employment and Wages,7,-1,Labour
2,http://news.detik.com/jawabarat/2988758/persoalkan-jht-ratusan-buruh-geruduk...,2015-08-11 09:30:00,Bandung,West Java,Indonesia,"Persoalkan JHT, Ratusan Buruh Geruduk Kantor BPJS Bandung",Other Government and Political Issues,2,-1,Labour
3,http://news.detik.com/jawabarat/2988758/persoalkan-jht-ratusan-buruh-geruduk...,2015-08-11 09:30:00,Bandung,West Java,Indonesia,"Persoalkan JHT, Ratusan Buruh Geruduk Kantor BPJS Bandung",Employment and Wages,7,-1,Labour
4,http://news.detik.com/jawatimur/2944904/dilarang-beroperasi-1000-sopir-truk-...,2015-06-17 09:30:00,Banyuwangi,East Java,Indonesia,"Dilarang Beroperasi, 1.000 Sopir Truk Material di Banyuwangi Demo",Employment and Wages,7,-1,Business
5,http://news.detik.com/jawatimur/2944904/dilarang-beroperasi-1000-sopir-truk-...,2015-06-17 09:30:00,Banyuwangi,East Java,Indonesia,"Dilarang Beroperasi, 1.000 Sopir Truk Material di Banyuwangi Demo",Other Government and Political Issues,0,-1,Labour
6,http://regional.kompas.com/read/2015/06/25/18163291/Sopir.Taksi.di.Bandung.T...,2015-06-25 09:30:00,Bandung,West Java,Indonesia,Sopir Taksi di Bandung Tentang Uber,Employment and Wages,4,-1,Business
7,http://regional.kompas.com/read/2015/06/25/18163291/Sopir.Taksi.di.Bandung.T...,2015-06-25 09:30:00,Bandung,West Java,Indonesia,Sopir Taksi di Bandung Tentang Uber,Other Economic Issues,7,-1,Business
8,http://regional.kompas.com/read/2015/08/24/11045231/Tolak.Go-jek.dan.Uber.Ra...,2015-08-24 09:30:00,Bandung,West Java,Indonesia,"Tolak Go-Jek dan Uber, Ratusan Taksi ""Kepung"" Kampus ITB",Other Civil Unrest,2,-1,General
9,http://regional.kompas.com/read/2015/08/24/11045231/Tolak.Go-jek.dan.Uber.Ra...,2015-08-24 09:30:00,Bandung,West Java,Indonesia,"Tolak Go-Jek dan Uber, Ratusan Taksi ""Kepung"" Kampus ITB",Employment and Wages,7,-1,Business


In [80]:
# 
# VALIDATION of GSR
# 
###################### 
#    Find multiple clean GSR records with same firstReportedLink, eventDate, & location = (city, state, country) 
#    but different populationGroup
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with same firstReportedLink, eventDate, & location (city, state, country) ",
      "BUT different populationGroup...")

#eventDf = cleanGsrDf.groupBy('key:eventDate','city','state','location','firstReportedLink')
#print(type(eventDf.first()))


# drop any GSR with None in firstReportedLink 
gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink')).rdd
gsrRdd = cleanGsrDf.rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (key, row) 
# where key is a tuple (firstReportedLink, eventdate, city, state, country)
keyRowPairRdd = gsrRdd.map(lambda row: ((row.asDict()['firstReportedLink'], 
                                          row.asDict()['key:eventDate'],
                                          row.asDict()['city'],
                                          row.asDict()['state'],
                                          row.asDict()['country']), row))

# create a pair RDD where each element is a tuple of (key, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
keyRowIterPairRdd = keyRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) = ", keyRowIterPairRdd.count())

def getEventsWithManyPopulationGroups (rowIter):
    # find the unique set of timestamps
    uniquePopGroups = set()
    for row in rowIter:
        popGroup = row.asDict()['populationGroup']
        uniquePopGroups.add(popGroup)
        
    # if more than one timestamp return all rows     
    if len(uniquePopGroups) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (key iterable(row))
# the elements we are not interested in have None for the iterable(row)
invalidKeyRowIterPairRdd = keyRowIterPairRdd.mapValues(getEventsWithManyPopulationGroups)

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (key, iterable(row))
filterPairRdd = invalidKeyRowIterPairRdd.filter(lambda data: data[1] != None)

print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple populationGroups = ", 
      filterPairRdd.count())


# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())

if (invalidRdd.count() > 0)  :
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('firstReportedLink','key:eventDate', 
                                 'city', 'state', 'country', 'headline', 
                                  'populationGroup', 'eventType', 'authorId',  
                                  'widespreadEventId').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())


------------------
Find GSR records with same firstReportedLink, eventDate, & location (city, state, country)  BUT different populationGroup...
Number of GSR records:  3263
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) =  3019
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple populationGroups =  76
Number of contributing GSR records to these EVENTS =  177


Unnamed: 0,firstReportedLink,key:eventDate,city,state,country,headline,populationGroup,eventType,authorId,widespreadEventId
0,http://jabar.pojoksatu.id/bogor/2015/06/26/buruh-bentrok-dengan-warga-polisi...,2015-06-25 09:30:00,Bogor,West Java,Indonesia,"Buruh Bentrok dengan Warga, Polisi Bidik Provokatornya",General,Other Civil Unrest,7,-1
1,http://jabar.pojoksatu.id/bogor/2015/06/26/buruh-bentrok-dengan-warga-polisi...,2015-06-25 09:30:00,Bogor,West Java,Indonesia,"Buruh Bentrok dengan Warga, Polisi Bidik Provokatornya",Labour,Employment and Wages,7,-1
2,http://jabar.tribunnews.com/2015/08/31/pendemo-tunggu-kedatangan-anggota-dpr...,2015-08-31 09:30:00,Garut,West Java,Indonesia,Pendemo Tunggu Kedatangan Anggota DPRD Garut Sambil Main Kartu,General,Other Government and Political Issues,2,-1
3,http://jabar.tribunnews.com/2015/08/31/pendemo-tunggu-kedatangan-anggota-dpr...,2015-08-31 09:30:00,Garut,West Java,Indonesia,Pendemo Tunggu Kedatangan Anggota DPRD Garut Sambil Main Kartu,Business,Other Government and Political Issues,2,-1
4,http://megapolitan.kompas.com/read/2015/08/28/17280101/Ada.Unjuk.Rasa.di.Ist...,2015-08-28 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Ada Unjuk Rasa di Istana, Hindari Jalan Medan Merdeka Utara",Business,Other Economic Issues,7,-1
5,http://megapolitan.kompas.com/read/2015/08/28/17280101/Ada.Unjuk.Rasa.di.Ist...,2015-08-28 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Ada Unjuk Rasa di Istana, Hindari Jalan Medan Merdeka Utara",Education,Other Economic Issues,7,-1
6,http://nasional.kompas.com/read/2015/09/07/11433451/Ijazah.Ditahan.Kemenrist...,2015-09-07 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Ijazah Ditahan Kemenristek Dikti, Dokter Muda Demo di Depan Istana",Education,Other Government and Political Issues,7,-1
7,http://nasional.kompas.com/read/2015/09/07/11433451/Ijazah.Ditahan.Kemenrist...,2015-09-07 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Ijazah Ditahan Kemenristek Dikti, Dokter Muda Demo di Depan Istana",Medical,Other Government and Political Issues,4,-1
8,http://news.detik.com/berita/2987602/buruh-demo-di-depan-gedung-kpk-lalin-ra...,2015-08-10 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Buruh Demo di Depan Gedung KPK, Lalin Rasuna Said Arah Mampang Macet Parah",Labour,Employment and Wages,7,-1
9,http://news.detik.com/berita/2987602/buruh-demo-di-depan-gedung-kpk-lalin-ra...,2015-08-10 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Buruh Demo di Depan Gedung KPK, Lalin Rasuna Said Arah Mampang Macet Parah",General,Employment and Wages,7,-1


In [81]:
# 
# VALIDATION of GSR
# 
###################### 
#    Find multiple clean GSR records with same firstReportedLink, eventDate, & location = (city, state, country) 
#    but different isViolent
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with same firstReportedLink, eventDate, & location (city, state, country) ",
      "BUT different isViolent...")

#eventDf = cleanGsrDf.groupBy('key:eventDate','city','state','location','firstReportedLink')
#print(type(eventDf.first()))


# drop any GSR with None in firstReportedLink 
gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink')).rdd
gsrRdd = cleanGsrDf.rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (key, row) 
# where key is a tuple (firstReportedLink, eventdate, city, state, country)
keyRowPairRdd = gsrRdd.map(lambda row: ((row.asDict()['firstReportedLink'], 
                                          row.asDict()['key:eventDate'],
                                          row.asDict()['city'],
                                          row.asDict()['state'],
                                          row.asDict()['country']), row))

# create a pair RDD where each element is a tuple of (key, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
keyRowIterPairRdd = keyRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) = ", keyRowIterPairRdd.count())

def getEventsWithManyIsViolent (rowIter):
    # find the unique set of timestamps
    uniqueViolent = set()
    for row in rowIter:
        isViolent = row.asDict()['isViolent']
        uniqueViolent.add(isViolent)
        
    # if more than one timestamp return all rows     
    if len(uniqueViolent) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (key iterable(row))
# the elements we are not interested in have None for the iterable(row)
invalidKeyRowIterPairRdd = keyRowIterPairRdd.mapValues(getEventsWithManyIsViolent)

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (key, iterable(row))
filterPairRdd = invalidKeyRowIterPairRdd.filter(lambda data: data[1] != None)

print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple isViolent = ", 
      filterPairRdd.count())


# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())
 
if (invalidRdd.count() > 0)  :  
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('firstReportedLink','key:eventDate', 
                                 'city', 'state', 'country', 'headline', 'isViolent', 
                                  'populationGroup', 'eventType', 'authorId',  
                                  'widespreadEventId').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())


------------------
Find GSR records with same firstReportedLink, eventDate, & location (city, state, country)  BUT different isViolent...
Number of GSR records:  3263
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) =  3019
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple isViolent =  4
Number of contributing GSR records to these EVENTS =  10


Unnamed: 0,firstReportedLink,key:eventDate,city,state,country,headline,isViolent,populationGroup,eventType,authorId,widespreadEventId
0,http://megapolitan.kompas.com/read/2016/01/19/15121451/Tak.Digaji.Dua.Bulan....,2016-01-19 10:30:00,Bogor,West Java,Indonesia,"Tak Digaji Dua Bulan, Ratusan Sopir Truk Sampah Demo",True,Labour,Employment and Wages,4,-1
1,http://megapolitan.kompas.com/read/2016/01/19/15121451/Tak.Digaji.Dua.Bulan....,2016-01-19 10:30:00,Bogor,West Java,Indonesia,"Tak Digaji Dua Bulan, Ratusan Sopir Truk Sampah Demo",False,Labour,Employment and Wages,2,-1
2,http://regional.kompas.com/read/2015/07/27/12432301/.Pak.Kajati.Calon.Pimpin...,2015-07-27 09:30:00,Makassar,South Sulawesi,Indonesia,"""Pak Kajati Calon Pimpinan KPK, tapi Selama Menjabat Belum Ada Kasus yang Tu...",True,Education,Other Government and Political Issues,7,-1
3,http://regional.kompas.com/read/2015/07/27/12432301/.Pak.Kajati.Calon.Pimpin...,2015-07-27 09:30:00,Makassar,South Sulawesi,Indonesia,"""Pak Kajati Calon Pimpinan KPK, tapi Selama Menjabat Belum Ada Kasus yang Tu...",False,Education,Other Government and Political Issues,4,-1
4,http://regional.kompas.com/read/2015/10/20/20552121/Tiga.Kampus.di.Makassar....,2015-10-20 10:30:00,Makassar,South Sulawesi,Indonesia,Tiga Kampus di Makassar Berunjuk Rasa Kritik Pemerintahan Jokowi-JK,True,Education,Other Government and Political Issues,4,24825226
5,http://regional.kompas.com/read/2015/10/20/20552121/Tiga.Kampus.di.Makassar....,2015-10-20 10:30:00,Makassar,South Sulawesi,Indonesia,Tiga Kampus di Makassar Berunjuk Rasa Kritik Pemerintahan Jokowi-JK,True,Education,Other Government and Political Issues,4,24825226
6,http://regional.kompas.com/read/2015/10/20/20552121/Tiga.Kampus.di.Makassar....,2015-10-20 10:30:00,Makassar,South Sulawesi,Indonesia,Tiga Kampus di Makassar Berunjuk Rasa Kritik Pemerintahan Jokowi-JK,False,Education,Other Government and Political Issues,4,24825226
7,http://thejakartaglobe.beritasatu.com/news/mosque-attacked-idul-fitri-prayer...,2015-07-17 09:30:00,Jayapura,Papua,Indonesia,"Mosque Attacked, Idul Fitri Prayer Disrupted in Papua",True,General,Other Civil Unrest,7,718835246
8,http://thejakartaglobe.beritasatu.com/news/mosque-attacked-idul-fitri-prayer...,2015-07-17 09:30:00,Jayapura,Papua,Indonesia,"Mosque Attacked, Idul Fitri Prayer Disrupted in Papua",False,Religious,Other Civil Unrest,7,718835246
9,http://thejakartaglobe.beritasatu.com/news/mosque-attacked-idul-fitri-prayer...,2015-07-17 09:30:00,Jayapura,Papua,Indonesia,,True,General,Other Civil Unrest,4,-1


In [82]:
# 
# VALIDATION of GSR
# 
###################### 
#    Find multiple clean GSR records with same firstReportedLink, eventDate, & location = (city, state, country) 
#    but different crowdSize
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with same firstReportedLink, eventDate, & location (city, state, country) ",
      "BUT different crowdSize...")

#eventDf = cleanGsrDf.groupBy('key:eventDate','city','state','location','firstReportedLink')
#print(type(eventDf.first()))


# drop any GSR with None in firstReportedLink 
gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink')).rdd
gsrRdd = cleanGsrDf.rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (key, row) 
# where key is a tuple (firstReportedLink, eventdate, city, state, country)
keyRowPairRdd = gsrRdd.map(lambda row: ((row.asDict()['firstReportedLink'], 
                                          row.asDict()['key:eventDate'],
                                          row.asDict()['city'],
                                          row.asDict()['state'],
                                          row.asDict()['country']), row))

# create a pair RDD where each element is a tuple of (key, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
keyRowIterPairRdd = keyRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) = ", keyRowIterPairRdd.count())

def getEventsWithManyCrowdSize (rowIter):
    # find the unique set of timestamps
    uniqueCrowdSize = set()
    for row in rowIter:
        crowdSize = row.asDict()['crowdSize']
        uniqueCrowdSize.add(crowdSize)
        
    # if more than one timestamp return all rows     
    if len(uniqueCrowdSize) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (key iterable(row))
# the elements we are not interested in have None for the iterable(row)
invalidKeyRowIterPairRdd = keyRowIterPairRdd.mapValues(getEventsWithManyCrowdSize)

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (key, iterable(row))
filterPairRdd = invalidKeyRowIterPairRdd.filter(lambda data: data[1] != None)

print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple crowdSize = ", 
      filterPairRdd.count())


# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())

if (invalidRdd.count() > 0)  :
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('firstReportedLink','key:eventDate', 
                                 'city', 'state', 'country', 'headline', 'crowdSize', 
                                  'populationGroup', 'eventType', 'authorId',  
                                  'widespreadEventId').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())


------------------
Find GSR records with same firstReportedLink, eventDate, & location (city, state, country)  BUT different crowdSize...
Number of GSR records:  3263
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) =  3019
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple crowdSize =  34
Number of contributing GSR records to these EVENTS =  77


Unnamed: 0,firstReportedLink,key:eventDate,city,state,country,headline,crowdSize,populationGroup,eventType,authorId,widespreadEventId
0,http://bisniskeuangan.kompas.com/read/2015/08/09/152410426/Protes.Mahalnya.H...,2015-08-09 09:30:00,Bogor,West Java,Indonesia,"Protes Mahalnya Harga Daging, Pedagang Mogok Berjualan",large,Business,Other Economic Issues,7,-1
1,http://bisniskeuangan.kompas.com/read/2015/08/09/152410426/Protes.Mahalnya.H...,2015-08-09 09:30:00,Bogor,West Java,Indonesia,"Protes Mahalnya Harga Daging, Pedagang Mogok Berjualan",small,Business,Other Economic Issues,2,810775211
2,http://jabar.pojoksatu.id/bogor/2015/06/26/buruh-bentrok-dengan-warga-polisi...,2015-06-25 09:30:00,Bogor,West Java,Indonesia,"Buruh Bentrok dengan Warga, Polisi Bidik Provokatornya",large,Labour,Employment and Wages,7,-1
3,http://jabar.pojoksatu.id/bogor/2015/06/26/buruh-bentrok-dengan-warga-polisi...,2015-06-25 09:30:00,Bogor,West Java,Indonesia,"Buruh Bentrok dengan Warga, Polisi Bidik Provokatornya",unknown,General,Other Civil Unrest,7,-1
4,http://megapolitan.kompas.com/read/2015/08/28/17280101/Ada.Unjuk.Rasa.di.Ist...,2015-08-28 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Ada Unjuk Rasa di Istana, Hindari Jalan Medan Merdeka Utara",unknown,Business,Other Economic Issues,7,-1
5,http://megapolitan.kompas.com/read/2015/08/28/17280101/Ada.Unjuk.Rasa.di.Ist...,2015-08-28 09:30:00,Jakarta,Jakarta Raya,Indonesia,"Ada Unjuk Rasa di Istana, Hindari Jalan Medan Merdeka Utara",large,Education,Other Economic Issues,7,-1
6,http://news.pngfacts.com/2015/07/preachers-burn-holiy-bible-in-city-of.html,2015-07-17 09:30:00,Mt Hagen,Western Highlands,Papua New Guinea,‘Preachers’ burn Bible,large,General,Other Civil Unrest,7,-1
7,http://news.pngfacts.com/2015/07/preachers-burn-holiy-bible-in-city-of.html,2015-07-17 09:30:00,Mt Hagen,Western Highlands,Papua New Guinea,‘Preachers’ burn Bible,small,Religious,Other Civil Unrest,7,-1
8,http://regional.kompas.com/read/2015/06/23/10294171/Barang.Bukti.Solar.38.40...,2015-06-23 09:30:00,Denpasar,Bali,Indonesia,"Barang Bukti Solar 38.400 Liter Raib, Massa Tuntut Kajari Denpasar Dicopot",small,General,Other Government and Political Issues,7,-1
9,http://regional.kompas.com/read/2015/06/23/10294171/Barang.Bukti.Solar.38.40...,2015-06-23 09:30:00,Denpasar,Bali,Indonesia,"Barang Bukti Solar 38.400 Liter Raib, Massa Tuntut Kajari Denpasar Dicopot",large,General,Other Government and Political Issues,2,-1


In [83]:
# 
# VALIDATION of GSR
# 
###################### 
#    Find multiple clean GSR records with same firstReportedLink, eventDate, & location = (city, state, country) 
#    but different widespreadEventId
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR records with same firstReportedLink, eventDate, & location (city, state, country) ",
      "BUT different widespreadEventId...")

#eventDf = cleanGsrDf.groupBy('key:eventDate','city','state','location','firstReportedLink')
#print(type(eventDf.first()))


# drop any GSR with None in firstReportedLink 
gsrRdd = cleanGsrDf.dropna('any',2,('firstReportedLink')).rdd
gsrRdd = cleanGsrDf.rdd
print("Number of GSR records: ", gsrRdd.count())

# create a pair RDD where each element is a tuple of (key, row) 
# where key is a tuple (firstReportedLink, eventdate, city, state, country)
keyRowPairRdd = gsrRdd.map(lambda row: ((row.asDict()['firstReportedLink'], 
                                          row.asDict()['key:eventDate'],
                                          row.asDict()['city'],
                                          row.asDict()['state'],
                                          row.asDict()['country']), row))

# create a pair RDD where each element is a tuple of (key, iterable(row))
# there will be one element per firstReportedLink, with the iterable contains all 
# rows that contain the firstReportedLink
keyRowIterPairRdd = keyRowPairRdd.groupByKey()
print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) = ", keyRowIterPairRdd.count())

def getEventsWithManyWidespreadEventId (rowIter):
    # find the unique set of timestamps
    uniqueWidespreadEventId = set()
    for row in rowIter:
        widespreadEventId  = row.asDict()['widespreadEventId']
        uniqueWidespreadEventId.add(widespreadEventId )
        
    # if more than one timestamp return all rows     
    if len(uniqueWidespreadEventId ) > 1:
        return rowIter
    else:
        return None
    
# find elements that have many (more than one) different key:eventDate
# returns a pairRDD wiht tuples (key iterable(row))
# the elements we are not interested in have None for the iterable(row)
invalidKeyRowIterPairRdd = keyRowIterPairRdd.mapValues(getEventsWithManyWidespreadEventId )

# remove any elements with None for the iter(row)
# returns a pairRDD with tuples (key, iterable(row))
filterPairRdd = invalidKeyRowIterPairRdd.filter(lambda data: data[1] != None)

print("Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple WidespreadEventId  = ", 
      filterPairRdd.count())


# flatten it out to get an RDD of rows
invalidRdd = filterPairRdd.values().flatMap(lambda x: x)
#print(invalidRdd.first())
print("Number of contributing GSR records to these EVENTS = ", invalidRdd.count())

if (invalidRdd.count() > 0)  :
    invalidDf = sqlContext.createDataFrame(invalidRdd)
    displayDf = invalidDf.select('firstReportedLink','key:eventDate', 
                                 'city', 'state', 'country', 'headline', 'crowdSize', 
                                  'populationGroup', 'eventType', 'authorId',  
                                  'widespreadEventId').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())


------------------
Find GSR records with same firstReportedLink, eventDate, & location (city, state, country)  BUT different widespreadEventId...
Number of GSR records:  3263
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) =  3019
Number of EVENTS (unique firstReportedLink, eventDate, city, state, country) with multiple WidespreadEventId  =  55
Number of contributing GSR records to these EVENTS =  120


Unnamed: 0,firstReportedLink,key:eventDate,city,state,country,headline,crowdSize,populationGroup,eventType,authorId,widespreadEventId
0,http://batam.tribunnews.com/2015/09/01/ribuan-buruh-unjuk-rasa-di-kantor-wak...,2015-09-01 09:30:00,Batam,Kepulauan Riau,Indonesia,"Ribuan Buruh Unjuk Rasa di Kantor Wako Batam, Ini yang Mereka Tuntut",large,Labour,Employment and Wages,2,831775235
1,http://batam.tribunnews.com/2015/09/01/ribuan-buruh-unjuk-rasa-di-kantor-wak...,2015-09-01 09:30:00,Batam,Kepulauan Riau,Indonesia,"Ribuan Buruh Unjuk Rasa di Kantor Wako Batam, Ini yang Mereka Tuntut",large,Labour,Employment and Wages,2,908775229
2,http://bisniskeuangan.kompas.com/read/2015/08/09/152410426/Protes.Mahalnya.H...,2015-08-09 09:30:00,Bogor,West Java,Indonesia,"Protes Mahalnya Harga Daging, Pedagang Mogok Berjualan",large,Business,Other Economic Issues,7,-1
3,http://bisniskeuangan.kompas.com/read/2015/08/09/152410426/Protes.Mahalnya.H...,2015-08-09 09:30:00,Bogor,West Java,Indonesia,"Protes Mahalnya Harga Daging, Pedagang Mogok Berjualan",small,Business,Other Economic Issues,2,810775211
4,http://megapolitan.kompas.com/read/2015/08/17/14134301/Giliran.Pedagang.Ayam...,2015-08-17 09:30:00,Bogor,West Java,Indonesia,Giliran Pedagang Ayam di Bogor Mogok Berjualan,large,Business,Other Economic Issues,2,822775239
5,http://megapolitan.kompas.com/read/2015/08/17/14134301/Giliran.Pedagang.Ayam...,2015-08-17 09:30:00,Bogor,West Java,Indonesia,Giliran Pedagang Ayam di Bogor Mogok Berjualan,large,Business,Other Economic Issues,4,-1
6,http://news.detik.com/berita/2974529/warga-muslim-solo-demo-kecam-insiden-to...,2015-07-24 09:30:00,Solo,Central Java,Indonesia,Warga Muslim Solo Demo Kecam Insiden Tolikara,large,Religious,Other Civil Unrest,2,725775224
7,http://news.detik.com/berita/2974529/warga-muslim-solo-demo-kecam-insiden-to...,2015-07-24 09:30:00,Solo,Central Java,Indonesia,Warga Muslim Solo Demo Kecam Insiden Tolikara,large,Religious,Other Civil Unrest,2,720775224
8,http://news.detik.com/berita/3056621/buruh-konvoi-tuntut-naik-gaji-jalan-ray...,2015-10-29 10:30:00,Bogor,West Java,Indonesia,"Buruh Konvoi Tuntut Naik Gaji, Jalan Raya Bogor Macet",large,Labour,Employment and Wages,2,27775220
9,http://news.detik.com/berita/3056621/buruh-konvoi-tuntut-naik-gaji-jalan-ray...,2015-10-29 10:30:00,Bogor,West Java,Indonesia,"Buruh Konvoi Tuntut Naik Gaji, Jalan Raya Bogor Macet",large,Labour,Employment and Wages,2,-1


In [84]:
# 
# VALIDATION of GSR
# 
######################     
# are there any GSR with earliestReportedDate > timestamp
###################### 

cleanGsrDf = cleanAllGsrDf

print('------------------')
print("Find GSR with earliestReportedDate > timestamp...")

def badDates(earliestReportedDate, timestamp):
    if earliestReportedDate.date() > timestamp.date() :
        return True
    else:
        return False

sqlContext.registerFunction("badDates", badDates, BooleanType())
cleanGsrDf.registerTempTable('cleanGsrT')
invalidDf = sqlContext.sql("SELECT * FROM cleanGsrT WHERE badDates(earliestReportedDate, timestamp)")
num = invalidDf.count()


if num == 0:
    print("No GSR found with earliestReportedDate > timestamp...")
else:
    print(num, " GSR with earliestReportedDate > timestamp.......")
   

    displayDf = invalidDf.select('timestamp','key:eventDate', 'earliestReportedDate', 
                                'firstReportedLink', 'city', 'state', 'country', 'headline').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())


------------------
Find GSR with earliestReportedDate > timestamp...
22  GSR with earliestReportedDate > timestamp.......


Unnamed: 0,timestamp,key:eventDate,earliestReportedDate,firstReportedLink,city,state,country,headline
0,2015-10-07 10:30:00,2015-10-08 10:30:00,2015-10-08 10:30:00,http://news.detik.com/jawatimur/3039214/ribuan-buruh-tuntut-kenaikan-upah-da...,Jombang,East Java,Indonesia,Ribuan Buruh Tuntut Kenaikan Upah dan Tolak PHK
1,2015-10-07 10:30:00,2015-10-09 10:30:00,2015-10-09 10:30:00,http://news.detik.com/jawatimur/3040391/ratusan-warga-tuntut-perbaikan-overp...,Jombang,East Java,Indonesia,Ratusan Warga Tuntut Perbaikan Overpass di Tol Kertosono-Mojokerto
2,2016-01-04 10:30:00,2016-01-05 10:30:00,2016-01-05 10:30:00,http://regional.kompas.com/read/2016/01/05/14244481/Cemari.Lingkungan.Pabrik...,Gowa,South Sulawesi,Indonesia,"Cemari Lingkungan, Pabrik Aspal Didemo Puluhan Petani"
3,2015-07-09 09:30:00,2015-08-09 09:30:00,2015-08-09 09:30:00,http://www.abc.net.au/news/2015-09-08/fish-protest/6758486,,QLD,Australia,Fish wholesalers take local fish off the menu in protest of net free zones
4,2015-09-18 00:00:00,2015-12-09 10:30:00,2015-12-09 10:30:00,http://www.abc.net.au/news/2015-09-13/save-the-repat-protest-at-parliament-e...,,SA,Australia,Vietnam veterans end Save the Repat protest on steps of South Australian Par...
5,2015-02-10 10:30:00,2015-09-30 09:30:00,2015-09-30 09:30:00,http://www.bangkokpost.com/news/general/713260/mild-penalty-over-fatal-bridg...,Ayutthaya,,Thailand,Mild' penalty over fatal bridge collapse draws protest
6,0215-10-02 23:44:20,2015-10-03 09:30:00,2015-10-03 09:30:00,http://www.fijitimes.com/story.aspx?id=324392,Suva,Central,Fiji,
7,2015-02-10 10:30:00,2015-09-28 09:30:00,2015-09-28 09:30:00,http://www.matichon.co.th/news_detail.php?newsid=1443441720&grpid=03&catid=19,Nakhon Nayok,,Thailand,ชาวบ้านกว่า200คน บุกศาลากลางนครนายก ประท้วงสร้างสะพานพังไม่เสร็จสักที
8,2015-02-10 10:30:00,2015-09-29 09:30:00,2015-09-26 09:30:00,http://www.matichon.co.th/news_detail.php?newsid=1443505174&grpid=03&catid=19,Prathum Thani,,Thailand,ชาวธัญบุรี ประท้วงรื้อสะพานลอยทำสะพานกลับรถ วอนทบทวนก่อน
9,2015-10-16 00:00:00,2015-11-10 10:30:00,2015-11-10 10:30:00,http://www.matichon.co.th/news_detail.php?newsid=1444549903&grpid=03&catid=19,Phichit,,Thailand,ชาวบ้านใน จ.พิจิตร กว่า300 รวมตัวขับไล่เจ้าอาวาสวัด หวิดปะทะกลุ่มลูกศิษย์


In [85]:
# 
# VALIDATION of GSR
# 
######################     
# are there any GSR with key:eventDate > timestamp
###################### 

cleanGsrDf = cleanAllGsrDf.withColumnRenamed('key:eventDate', 'eventDate')

print('------------------')
print("Find GSR with key:eventDate > timestamp...")

def badDates(eventDate, timestamp):
    if eventDate.date() > timestamp.date() :
        return True
    else:
        return False

sqlContext.registerFunction("badDates", badDates, BooleanType())
cleanGsrDf.registerTempTable('cleanGsrT')
invalidDf = sqlContext.sql("SELECT * FROM cleanGsrT WHERE badDates(eventDate, timestamp)")
num = invalidDf.count()


if num == 0:
    print("No GSR found with key:eventDate > timestamp...")
else:
    print(num, " GSR with key:eventDate > timestamp.......")
   

    displayDf = invalidDf.select('timestamp','eventDate', 'earliestReportedDate', 
                                'firstReportedLink', 'city', 'state', 'country', 'headline').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())

------------------
Find GSR with key:eventDate > timestamp...
28  GSR with key:eventDate > timestamp.......


Unnamed: 0,timestamp,eventDate,earliestReportedDate,firstReportedLink,city,state,country,headline
0,2015-06-17 09:30:00,2015-07-14 09:30:00,2015-06-15 09:30:00,http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR"
1,2015-10-07 10:30:00,2015-10-08 10:30:00,2015-10-08 10:30:00,http://news.detik.com/jawatimur/3039214/ribuan-buruh-tuntut-kenaikan-upah-da...,Jombang,East Java,Indonesia,Ribuan Buruh Tuntut Kenaikan Upah dan Tolak PHK
2,2015-10-07 10:30:00,2015-10-09 10:30:00,2015-10-09 10:30:00,http://news.detik.com/jawatimur/3040391/ratusan-warga-tuntut-perbaikan-overp...,Jombang,East Java,Indonesia,Ratusan Warga Tuntut Perbaikan Overpass di Tol Kertosono-Mojokerto
3,2016-01-04 10:30:00,2016-01-05 10:30:00,2016-01-05 10:30:00,http://regional.kompas.com/read/2016/01/05/14244481/Cemari.Lingkungan.Pabrik...,Gowa,South Sulawesi,Indonesia,"Cemari Lingkungan, Pabrik Aspal Didemo Puluhan Petani"
4,2015-07-09 09:30:00,2015-08-09 09:30:00,2015-08-09 09:30:00,http://www.abc.net.au/news/2015-09-08/fish-protest/6758486,,QLD,Australia,Fish wholesalers take local fish off the menu in protest of net free zones
5,2015-09-18 00:00:00,2015-12-09 10:30:00,2015-12-09 10:30:00,http://www.abc.net.au/news/2015-09-13/save-the-repat-protest-at-parliament-e...,,SA,Australia,Vietnam veterans end Save the Repat protest on steps of South Australian Par...
6,2015-02-10 10:30:00,2015-09-30 09:30:00,2015-01-10 10:30:00,http://www.bangkokpost.com/business/telecom/713716/tot-staff-demand-900-mhz-...,Bangkok,,Thailand,TOT staff demand 900-MHz spectrum
7,2015-02-10 10:30:00,2015-09-30 09:30:00,2015-09-30 09:30:00,http://www.bangkokpost.com/news/general/713260/mild-penalty-over-fatal-bridg...,Ayutthaya,,Thailand,Mild' penalty over fatal bridge collapse draws protest
8,2015-06-18 09:30:00,2015-08-17 09:30:00,2015-06-18 09:30:00,http://www.emtv.com.pg/article.aspx?slug=Female-Student-Abducted-Raped-And-M...,,Central Province,Papua New Guinea,
9,2015-06-18 09:30:00,2015-08-17 09:30:00,2015-06-18 09:30:00,http://www.emtv.com.pg/article.aspx?slug=Female-Student-Abducted-Raped-And-M...,,Central Province,Papua New Guinea,


In [86]:
# 
# VALIDATION of GSR
# 
######################     
# are there any GSR with key:eventDate > earliestReportedDate
###################### 


cleanGsrDf = cleanAllGsrDf.withColumnRenamed('key:eventDate', 'eventDate')

print('------------------')
print("Find GSR with key:eventDate > earliestReportedDate...")

def badDates(eventDate, timestamp):
    if eventDate.date() > timestamp.date() :
        return True
    else:
        return False

sqlContext.registerFunction("badDates", badDates, BooleanType())
cleanGsrDf.registerTempTable('cleanGsrT')
invalidDf = sqlContext.sql("SELECT * FROM cleanGsrT WHERE badDates(eventDate, earliestReportedDate)")
num = invalidDf.count()


if num == 0:
    print("No GSR found with key:eventDate > earliestReportedDate...")
else:
    print(num, " GSR with key:eventDate > earliestReportedDate.......")
   

    displayDf = invalidDf.select('timestamp','eventDate', 'earliestReportedDate', 
                                'firstReportedLink', 'city', 'state', 'country', 'headline').orderBy('firstReportedLink')

    pandas.set_option('display.max_colwidth', 80)
    display.display(displayDf.toPandas())

------------------
Find GSR with key:eventDate > earliestReportedDate...
37  GSR with key:eventDate > earliestReportedDate.......


Unnamed: 0,timestamp,eventDate,earliestReportedDate,firstReportedLink,city,state,country,headline
0,2016-01-08 10:30:00,2016-01-07 10:30:00,2016-01-05 10:30:00,http://banjarmasin.tribunnews.com/2016/01/05/pedagang-serbu-balaikota-protes...,Banjarmasin,East Kalimantan,Indonesia,"Pedagang Serbu Balaikota, Protes Karena Kios Dibongkar Pengelola"
1,2015-07-14 09:30:00,2015-07-13 09:30:00,2015-07-11 09:30:00,http://english.astroawani.com/malaysia-news/three-injured-low-yat-plaza-riot...,Bukit Bintang,Kuala lumpur,Malaysia,19 arrested in Low Yat brawl
2,2015-12-13 10:30:00,2015-12-11 10:30:00,2015-12-07 10:30:00,http://fijisun.com.fj/2015/12/11/silent-march-says-it-all/,Labasa,Fiji,Fiji,Silent march says it all
3,2015-06-17 09:30:00,2015-07-14 09:30:00,2015-06-15 09:30:00,http://megapolitan.kompas.com/read/2015/06/15/16392731/Demo.Tolak.Sutiyoso.E...,Jakarta,Jakarta Raya,Indonesia,"Demo Tolak Sutiyoso, Enam Orang Diamankan Pamdal DPR"
4,2015-08-09 09:30:00,2015-08-08 09:30:00,2015-07-25 09:30:00,http://mynewshub.cc/2015/08/08/perhimpunan-808-di-sekitar-pasar-seni-berlang...,Kuala Lumpur,Kuala Lumpur,Malaysia,Perhimpunan 808 di Sekitar pasar seni berlangsung aman
5,2015-07-17 09:30:00,2015-07-16 09:30:00,2015-07-06 09:30:00,http://news.detik.com/berita/2970649/takbiran-keliling-di-jakarta-meriah-mul...,Jakarta,Jakarta Raya,Indonesia,"Takbiran Keliling di Jakarta Meriah, Mulai dari Kopaja Sampai Pemotor"
6,2015-09-23 09:30:00,2015-09-22 09:30:00,2015-09-21 09:30:00,http://news.detik.com/berita/3026241/warga-yogyakarta-ini-khusyuk-salat-idul...,Jogjakarta,Jogjakarta,Indonesia,Warga Yogyakarta Ini Khusyuk Salat Idul Adha di Atas Becak
7,2015-07-17 09:30:00,2015-07-14 09:30:00,2014-07-15 09:30:00,http://regional.kompas.com/read/2015/07/15/20000001/Warga.Batam.Minta.THR.ke...,Batam,Riau,Indonesia,Warga Batam Minta THR kepada Wali Kota
8,2015-11-03 10:30:00,2015-11-01 10:30:00,2015-10-31 10:30:00,http://regional.kompas.com/read/2015/10/26/12233211/Tolak.Go-Jek.Ratusan.Tuk...,Sukabumi,West Java,Indonesia,"Tolak Go-Jek, Ratusan Tukang Ojek Geruduk Kantor Wali Kota Bandung"
9,2015-11-17 10:30:00,2015-11-16 10:30:00,2015-11-06 10:30:00,http://regional.kompas.com/read/2015/11/16/21585041/.Long.March.Massa.Penola...,Semarang,Central Java,Indonesia,"""Long March"" Massa Penolak Pabrik Semen Tiba di Kabupaten Demak"


In [87]:
# take an element (event_key, iterable(row)) and merge all the rows into a 
# de-duplicated event. Each column will need to be considered separately.
# 
# The return value is a Row object
#
# Fields that are unique for this event
#     key:eventDate,                                
#     country,                                       
#     state,                                         
#     city,                                          
#     firstReportedLink,                             
#     eventType,                                     
#     populationGroup                                
#
# Fields where we collect all values from the underlying GSR records and return a tuple
#     tuple(all_key:id)
#     tuple(all_authorId)                         
#     tuple(all_timestamp)                                    
#     tuple(all_timestampRevision)                              
#     tuple(all_earliestReportedDate)                            
#     tuple(all_otherLinks)                         
#     tuple(all_comment)                           
#     tuple(all_widespreadEventId)                  
#
# Fields where we collect all values from the contributing GSR records, 
# determine an authoritative value and return the authoritative value and all values
#     newsSourceName, tuple(all_newsSourceName)      : take the longest string from all_newsSourceName
#     headline, tuple(all_headlines)                 : take the longest string from all_headlines
#     englishHeadline, tuple(all_englishHeadline)    : take the longest string from all_englishHeadlines
#     eventDescription, tuple(all_eventDescription)  : take the longest string from all_eventDescription
#     isViolent, tuple(all_isViolent)                : take the most common value
#     crowdSize, tuple(all_corwdSize)                : take the most common value
 
from collections import Counter

def makeGsrRecord(keyRowTuple):
    eventKey = keyRowTuple[0]
    rowIter = keyRowTuple[1]
    
    # get the event key fields from the first row
    # TBD - can we make the eventKey a dict?
    firstReportedLink = eventKey[0]
    keyEventDate = eventKey[1]
    city = eventKey[2]  
    state = eventKey[3] 
    country = eventKey[4]
    eventType = eventKey[5]     
    populationGroup = eventKey[6] 
    
    # lists to store elements from the multiple GSR records 
    all_keyId = []
    all_headline = []
    all_englishHeadline = []
    all_newsSourceName = []
    all_eventDescription = []
    all_authorId = []
    all_otherLinks = []
    all_comment = []
    all_isViolent = []
    all_earliestReportedDate = []
    all_timestamp = []
    all_crowdSize = []
    all_widespreadEventId = []
    all_timestampRevision = []
    
    for row in rowIter :
        print(row)
        all_keyId.append(row.asDict()['key:id'])
        all_headline.append(row.asDict()['headline'])
        all_englishHeadline.append(row.asDict()['englishHeadline'])
        all_newsSourceName.append(row.asDict()['newsSourceName'])
        all_eventDescription.append(row.asDict()['eventDescription'])
        all_authorId.append(row.asDict()['authorId'])
        all_otherLinks.append(row.asDict()['otherLinks'])
        all_comment.append(row.asDict()['comment'])
        all_isViolent.append(row.asDict()['isViolent'])
        all_earliestReportedDate.append(row.asDict()['earliestReportedDate'])
        all_timestamp.append(row.asDict()['timestamp'])
        all_crowdSize.append(row.asDict()['crowdSize'])
        all_widespreadEventId.append(row.asDict()['widespreadEventId'])
            
    # take the longest string for these elements 
    # TODO ########################
    # must deal with elements of NoneType
    best_news = []
    best_headline= []
    best_englishHeadline = []
    best_eventDescription = []
    
    # need to deal with None  values
    #best_news = sorted(all_newsSourceName, key=len, reverse=True)[0]
    #best_headline = sorted(all_headline, key=len, reverse=True)[0]
    #best_englishHeadline = sorted(all_englishHeadline, key=len, reverse=True)[0]
    #best_eventDescription = sorted(all_eventDescription, key=len, reverse=True)[0]
    
    # take the most common elements
    best_isViolent = Counter(all_isViolent).most_common(1).pop()[0]
    best_crowdSize = Counter(all_crowdSize).most_common(1).pop()[0]
    
    # create the de-duplicated Row
    
    GsrRecord = Row("keyEventDate", "keyId", "authorId", 
                    "timestamp", "timestampRevision", "widespreadEventId",
                    "eventType", "populationGroup", 
                    "country", "state", "city",
                    "earliestReportedDate", "crowdSize", "isViolent",
                    "newsSourceName", "headline", "englishHeadline",
                    "eventDescription", "firstReportedLink", "otherLinks", "comment")
    return GsrRecord(keyEventDate = keyEventDate,
               keyid = all_keyId,
               authorId = all_authorId,
               timestamp = all_timestamp,
               timestampRevision = all_timestampRevision,
               widespreadEventId = all_widespreadEventId,
               eventType = eventType,
               populationGroup = populationGroup,
               country = country,
               state = state,
               city = city,
               earliestReportedDate = all_earliestReportedDate,
               crowdSize = ([best_crowdSize, all_crowdSize]),
               isViolent = ([best_isViolent, all_isViolent]),
               newsSourceName =([best_news, all_newsSourceName]),
               headline = ([best_headline, all_headline]),
               englishHeadline = ([best_englishHeadline, all_englishHeadline]),
               eventDescription = ([best_eventDescription, all_eventDescription]),
               firstReportedLink = firstReportedLink,
               otherLinks = all_otherLinks,
               comment = all_comment)

In [88]:
# 
# DEDUPLICATION OF GSR 
# 
# Consider only the following columns when comparing rows
#     key:eventDate, 
#     country, state, city,
#     firstReportedLink,
#     eventType, populationGroup 
#
# Create a DF with deduplicated events. For columns that are not part of the event "key" add two columns each - one 
# containing the "best guess" for the value, and the other an iterable over all the GSR records. Best guess values are  
#     
#     authorId: use a value of zero to represent deduplication
#     timestampRevision: replace with time of deduplication ?
#     earliestReportedDate: take the earliest value of all duplicate records
#     newsSourceName: take the longest string
#     headline: take the longest string  
#     englishHeadline: take the longest string 
#     eventDescription: take the longest string
#     otherLinks: parse and combine all values, ensuring no duplicates
#     comment: combine all values 
#     isViolent: XOR of all values 
#     crowdSize: TBD
#     key:id:    TBD
#

cleanGsrDf = cleanAusGsrDf
print("Number of cleaned GSR records: ", cleanGsrDf.count())
groupedDf = cleanGsrDf.groupBy('key:eventDate', 'city', 'state', 'country', 
                               'firstReportedLink', 'eventType', 'populationGroup')
cleanGsrRdd = cleanGsrDf.rdd

# make a rdd of tuple (event_key, row)
# create a pair RDD where each element is a tuple of (event_key, row) 
# where event_key is a tuple (firstReportedLink, eventdate, city, state, country, eventType, populationGroup)


######## TBD - can we make the eventKey a dict?
print (cleanGsrRdd.first())

keyRowPairRdd = cleanGsrRdd.map(lambda row: ((row.asDict()['firstReportedLink'], 
                                              row.asDict()['key:eventDate'],
                                              row.asDict()['city'],
                                              row.asDict()['state'],
                                              row.asDict()['country'],
                                              row.asDict()['eventType'],
                                              row.asDict()['populationGroup']), row))

# create a pair RDD where each element is a tuple of (event_key, iterable(row))
# there will be one element per event_key, with the iterable contains all rows that match the event_key 
keyRowIterPairRdd = keyRowPairRdd.groupByKey()
print("Number of EVENTS = ", keyRowIterPairRdd.count())

# make a deduplicated GSR record for each event
dedupGsrRdd = keyRowIterPairRdd.map(lambda x: makeGsrRecord(x))
print("Number of dedup records = ", keyRowIterPairRdd.count())
print(dedupGsrRdd.first())

from pyspark.sql.types import *

fields = [StructField("key:eventDate", TimestampType(), True), 
          StructField("key:id", ArrayType(LongType()), True), 
          StructField("authorId", ArrayType(LongType()), True), 
          StructField("timestamp", ArrayType(TimestampType()), True), 
          StructField("timestampRevision", ArrayType(TimestampType()), True), 
          StructField("widespreadEventId", ArrayType(LongType()), True), 
          StructField("eventType", StringType(), True), 
          StructField("populationGroup", StringType(), True), 
          StructField("country", StringType(), True), 
          StructField("state", StringType(), True), 
          StructField("city", StringType(), True), 
          StructField("earliestReportedDate", ArrayType(TimestampType()), True),
          StructField("crowdSize", ArrayType(StringType()), True), 
          StructField("isViolent", ArrayType(BooleanType()), True), 
          StructField("newsSourceName", ArrayType(StringType()), True), 
          StructField("headline", ArrayType(StringType()), True), 
          StructField("englishHeadline", ArrayType(StringType()), True), 
          StructField("eventDescription", ArrayType(StringType()), True), 
          StructField("firstReportedLink", StringType(), True), 
          StructField("otherLinks", ArrayType(StringType()), True), 
          StructField("comment", ArrayType(StringType()), True)] 

schema = StructType(fields)

dedupGsrDf = sqlContext.createDataFrame(dedupGsrRdd, schema)


dedupGsrDf.printSchema()
print(dedupGsrDf.first())




Number of cleaned GSR records:  948
Row(key:eventDate=datetime.datetime(2015, 4, 7, 9, 30), key:id=806711755214, authorId=6, timestamp=datetime.datetime(2015, 6, 7, 9, 30), timestampRevision=datetime.datetime(2015, 6, 7, 9, 30), widespreadEventId=711755213, eventType='Other Government and Political Issues', populationGroup='General', country='Australia', state='NSW', city='Sydney', earliestReportedDate=datetime.datetime(2015, 6, 7, 9, 30), crowdSize='unknown', isViolent=False, newsSourceName='theage', headline='Motorcylists rally in Roma Street against anti-association laws', englishHeadline=None, eventDescription='Motorcyclists  protest against anti-association laws.', firstReportedLink='http://www.theage.com.au/queensland/motorcylists-rally-in-roma-street-against-antiassociation-laws-20150704-gi55o8.html', otherLinks=None, comment=None)
Number of EVENTS =  862
Number of dedup records =  862


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 736.0 failed 4 times, most recent failure: Lost task 0.3 in stage 736.0 (TID 16702, c6-ninetales.d2dcrc.net): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/local/hdfs-volume/tmp/hadoop/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1456277346727_0056/container_1456277346727_0056_01_000003/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/local/hdfs-volume/tmp/hadoop/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1456277346727_0056/container_1456277346727_0056_01_000003/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/local/hdfs-volume/tmp/hadoop/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1456277346727_0056/container_1456277346727_0056_01_000003/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/spark/default/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
  File "<ipython-input-88-d5aa482d4d0f>", line 55, in <lambda>
  File "<ipython-input-87-b6f1866d9dfb>", line 129, in makeGsrRecord
TypeError: __call__() got an unexpected keyword argument 'keyEventDate'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/local/hdfs-volume/tmp/hadoop/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1456277346727_0056/container_1456277346727_0056_01_000003/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/local/hdfs-volume/tmp/hadoop/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1456277346727_0056/container_1456277346727_0056_01_000003/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/local/hdfs-volume/tmp/hadoop/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1456277346727_0056/container_1456277346727_0056_01_000003/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/spark/default/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
  File "<ipython-input-88-d5aa482d4d0f>", line 55, in <lambda>
  File "<ipython-input-87-b6f1866d9dfb>", line 129, in makeGsrRecord
TypeError: __call__() got an unexpected keyword argument 'keyEventDate'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
# 
# DEDUPLICATION OF GSR 
# 
# Consider only the following columns when comparing rows
#     timestamp, 
#     country, state, city,
#     eventType, populationGroup, widespreadEvent
#     firstReportedLink
#


# TODO:   Duplicate events are those that match these fields. When creating the deduplicated event
# must consider what to do with other fields.......
#     
#     timestampRevision: replace with time of deduplication ?
#     earliestReportedDate: take the earliest value of all duplicate records
#     newsSourceName: take the longest string
#     headline: take the longest string  - Note that these will be the same or similar
#               (as should be taken from firstReportedLink - but could be data entry differences)
#     englishHeadline: take the longest string - Note that these will be the same or similar
#                     (as should be taken from firstReportedLink - but could be data entry / translation differences)
#     eventDescription: take the longest string
#     otherLinks: parse and combine all values, ensuring no duplicates
#     comment: combine all values 
#
# TBD - what to do with these??
#     key:id - choose one or take a new one???
#     authorId: combine all ???
#     crowdSize: Use it to define the event and review inconsistency between authors during validation 
#                OR just pick largest value (NOTE these are strings) ???
#     isViolent: ?? use it to define the event and review inconsistency between authors during validation 
#                OR take XOR of all values (i.e. if anyone coded as violent use violent)
#

tempDf = cleanAusGsrDf.withColumnRenamed('key:eventDate', 'eventDate')
cleanAusGsrDf = tempDf

sqlContext.registerDataFrameAsTable(cleanAusGsrDf, "cleanGsr_t")
sqlContext.tables().show()

#### NOTE we have not yet cleaned the city names....
print("Number of cleaned GSR records: ", cleanAusGsrDf.count())

print("de-duplicate events: Based on key:eventdate, city, state, country, firstReportedLink, eventType, populationGroup")
dedupGsrDf = sqlContext.sql("SELECT DISTINCT eventDate, city, state, country, "
                            "eventType, populationGroup, firstReportedLink " 
                            "FROM cleanGsr_t "
                            "ORDER BY eventDate ASC")
print("Number of de-duplicated events: ", dedupGsrDf.count())
print(dedupGsrDf.show(5))

print("de-duplicate events: Based on city, state, country, firstReportedLink, eventType, populationGroup")
notime_dedupGsrDf = sqlContext.sql("SELECT DISTINCT  city, state, country, "
                            "eventType, populationGroup, firstReportedLink " 
                            "FROM cleanGsr_t "
                            )
print("Number of de-duplicated events: ", notime_dedupGsrDf.count())
print(notime_dedupGsrDf.show(5))


In [None]:
# 
# GSR REPORTING
#
# Provide dashboard to view the following
#      Number of GSR events per country/state/city per month
#      Types of GSR events (eventType, populationGroup, isVisolent, widespread) per country/state/city per month
#      GSR data quality
#          data cleansing issues per field
#          number of duplicate entries per country per month
#          GSR manual validation requests per country per month
#


# how many records in a location on a given day
# are there any duplicate entries ??
