In [2]:
sc

In [3]:
import fiona
import fiona.crs
import shapely
import rtree

import pandas as pd
import geopandas as gpd
from functools import partial

In [4]:
TWEETS_FN = 'tweets-100m.txt'
DRUGS_FN = 'drug_*.txt'
CITIES_FN = '500cities_tracts.geojson'

In [5]:
tweets = sc.textFile(TWEETS_FN, use_unicode=True).cache()
tweets.take(5)

['450845003775234048|35.82840584|-78.58814956|MarkaelDupree|Tue Apr 01 03:59:59 +0000 2014|@RatedRnasty3_ haha hopefully !|haha hopefully',
 '450845003854536704|32.80947163|-116.91216669|darthraedar|Tue Apr 01 03:59:59 +0000 2014|@kenraldchua SAME|same',
 '450845003896479744|33.01608281|-97.30442766|hungrypoop|Tue Apr 01 03:59:59 +0000 2014|i wish i could play the moog synthesizer|could moog play synthesizer the wish',
 "450845003904868354|41.61400034|-87.91314315|Socalfan21|Tue Apr 01 03:59:59 +0000 2014|@ChiBulls_Chris we can rewrite a script and reenact the show if you'd like.|and can like reenact rewrite script show the you",
 '450845003925843968|30.1110886|-95.12464941|NikeZROking|Tue Apr 01 03:59:59 +0000 2014|@KKalbitz haha. Your dms are going to shut down twitter after tonight|after are dms down going haha shut tonight twitter your']

In [5]:
tweets.count()

26673

In [6]:
keywords = set(sc.textFile(DRUGS_FN, use_unicode=True).cache().collect())
bTerms = sc.broadcast(keywords)

In [7]:
def countWords(_, rows):
    for row in rows:
        words = row.split()
        yield len(words), 1

words = sc.textFile(DRUGS_FN, use_unicode=True).cache()
words.mapPartitionsWithIndex(countWords).reduceByKey(lambda x, y: x+y).collect()

[(6, 6), (3, 175), (1, 2611), (4, 37), (7, 1), (2, 763), (5, 16), (8, 1)]

In [7]:
def createList(_, rows):
    import re
    
    separator = re.compile('\W+')
    
    # 8 because we have 1 8-gram
    n = 8
    
    for row in rows:
        phrases = []
        fields = row.split('|')
        
        # still split per word
        words = separator.split(fields[5].lower())
        length = len(words)

        # stream the array of words
        for i in range(length):
            # create the n-grams from the first word
            for j in range(1, n+1):
                # stop if the remaining words < 8
                if i+j > length:
                    break
                # join words by spaces
                phrases.append(' '.join(words[i:i+j]))
        
        # return an array of phrases
        yield ((float(fields[2]), float(fields[1])), phrases)

phrases = tweets.mapPartitionsWithIndex(createList)
phrases.take(5)

[((-78.58814956, 35.82840584),
  ['',
   ' ratedrnasty3_',
   ' ratedrnasty3_ haha',
   ' ratedrnasty3_ haha hopefully',
   ' ratedrnasty3_ haha hopefully ',
   'ratedrnasty3_',
   'ratedrnasty3_ haha',
   'ratedrnasty3_ haha hopefully',
   'ratedrnasty3_ haha hopefully ',
   'haha',
   'haha hopefully',
   'haha hopefully ',
   'hopefully',
   'hopefully ',
   '']),
 ((-116.91216669, 32.80947163),
  ['',
   ' kenraldchua',
   ' kenraldchua same',
   'kenraldchua',
   'kenraldchua same',
   'same']),
 ((-97.30442766, 33.01608281),
  ['i',
   'i wish',
   'i wish i',
   'i wish i could',
   'i wish i could play',
   'i wish i could play the',
   'i wish i could play the moog',
   'i wish i could play the moog synthesizer',
   'wish',
   'wish i',
   'wish i could',
   'wish i could play',
   'wish i could play the',
   'wish i could play the moog',
   'wish i could play the moog synthesizer',
   'i',
   'i could',
   'i could play',
   'i could play the',
   'i could play the moog',
   

In [8]:
def filterTweets(bTerms, _, rows):
    terms = bTerms.value
    
    for row in rows:
        coor, phrase_list = row
        if len(terms.intersection(phrase_list)) > 0:
            yield coor, terms.intersection(phrase_list)

drugTweets = phrases.mapPartitionsWithIndex(partial(filterTweets, bTerms))
drugTweets.take(5)

[((-117.88537093, 33.59660902), {'boat'}),
 ((-100.29277778, 25.63861111), {'c', 'h'}),
 ((-74.02517944, 40.21185001), {'weed'}),
 ((-78.80715424, 43.00647335), {'sherm'}),
 ((-80.1286475, 25.96150019), {'friend'})]

In [9]:
drugTweets.count()

1164

In [11]:
def filterOld(bTerms, _, rows):
    import re
    
    separator = re.compile('\W+')
    terms = bTerms.value
    for row in rows:
        fields = row.split('|')
        words = separator.split(fields[6].lower())
        if len(terms.intersection(words)) > 0:
            yield ((float(fields[2]), float(fields[1])), words)

drugSingle = tweets.mapPartitionsWithIndex(partial(filterOld, bTerms))
drugSingle.count()

998

In [28]:
zones = gpd.read_file(CITIES_FN).to_crs(fiona.crs.from_epsg(2263))
# index = rtree.Rtree()
# for idx,geometry in enumerate(zones.geometry):
#     index.insert(idx, geometry.bounds)

In [7]:
index.bounds

[-24802085.76790066, -5246167.90464653, 2020022.4877256171, 13753978.829248838]

In [9]:
zones.geometry[1].is_valid

True

In [11]:
def createIndex(shapefile):
    import rtree
    import fiona.crs
    import geopandas as gpd
    zones = gpd.read_file(shapefile).to_crs(fiona.crs.from_epsg(2263))
    index = rtree.Rtree()
    for idx,geometry in enumerate(zones.geometry):
        index.insert(idx, geometry.bounds)
    return (index, zones)

def findZone(p, index, zones):
    match = index.intersection((p.x, p.y, p.x, p.y))
    for idx in match:
        if zones.geometry[idx].contains(p):
            return (zones.plctract10[idx], zones.plctrpop10[idx])
    return None

def processTweets(shapefile, pid, records):
    import pyproj
    import shapely.geometry as geom
    
    proj = pyproj.Proj(init="epsg:2263", preserve_units=True)    
    index, zones = createIndex(shapefile.value)
    counts = {}
#     population = {}
#     norm_counts = {}
    
    for row in records:
        lnglat, tweet = row
        p = geom.Point(proj(lnglat[0], lnglat[1]))
        zone = findZone(p, index, zones)
        if zone:
#             yield zone, pop
            counts[zone[0]] = counts.get(zone[0], 0) + 1
#             population[zone[0]] = zone[1]
        
#     for key, value in counts.items():
#         norm_counts[key] = round(value / population[key], 6)
    
#     return norm_counts.items()
    return counts.items()

shapefile = sc.broadcast(CITIES_FN)
locationTweets = drugTweets.mapPartitionsWithIndex(partial(processTweets, shapefile)) \
                    .reduceByKey(lambda x,y: x+y)\
                    .sortByKey()
locationTweets.count()

314

In [13]:
oldLocTweets = drugSingle.mapPartitionsWithIndex(partial(processTweets, shapefile)) \
                    .sortByKey()
oldLocTweets.count()

262

In [11]:
temp = sc.parallelize([(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 3),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000100', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 2),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 3),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 7),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 2),(u'0107000-01073000300', 4),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000300', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 4),(u'0107000-01073000400', 2),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 4),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 3),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 3),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 2),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 3),(u'0107000-01073000400', 1),(u'0107000-01073000400', 1),(u'0107000-01073000400', 4),(u'0107000-01073000400', 1),(u'0107000-01073000400', 4),(u'0107000-01073000400', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 2),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000500', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 2),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 2),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 2),(u'0107000-01073000700', 1),(u'0107000-01073000700', 4),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 2),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 2),(u'0107000-01073000700', 2),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 1),(u'0107000-01073000700', 2),(u'0107000-01073000700', 2),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 2),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 2),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 2),(u'0107000-01073000800', 1),(u'0107000-01073000800', 4),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 2),(u'0107000-01073000800', 1),(u'0107000-01073000800', 2),(u'0107000-01073000800', 2),(u'0107000-01073000800', 4),(u'0107000-01073000800', 1),(u'0107000-01073000800', 4),(u'0107000-01073000800', 2),(u'0107000-01073000800', 3),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 2),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073000800', 2),(u'0107000-01073000800', 1),(u'0107000-01073000800', 5),(u'0107000-01073000800', 1),(u'0107000-01073000800', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 7),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 3),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 2),(u'0107000-01073001100', 3),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 3),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 4),(u'0107000-01073001100', 2),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 5),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 2),(u'0107000-01073001100', 2),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001100', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 3),(u'0107000-01073001200', 2),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 5),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 2),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 2),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 5),(u'0107000-01073001200', 3),(u'0107000-01073001200', 3),(u'0107000-01073001200', 2),(u'0107000-01073001200', 2),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 7),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 2),(u'0107000-01073001200', 2),(u'0107000-01073001200', 1),(u'0107000-01073001200', 2),(u'0107000-01073001200', 2),(u'0107000-01073001200', 1),(u'0107000-01073001200', 2),(u'0107000-01073001200', 5),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001200', 2),(u'0107000-01073001200', 1),(u'0107000-01073001200', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001400', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 2),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001500', 1),(u'0107000-01073001600', 1),(u'0107000-01073001600', 1),(u'0107000-01073001600', 1),(u'0107000-01073001600', 4),(u'0107000-01073001600', 1),(u'0107000-01073001600', 1),(u'0107000-01073001600', 1),(u'0107000-01073001600', 4),(u'0107000-01073001600', 2),(u'0107000-01073001600', 1),(u'0107000-01073001600', 3),(u'0107000-01073001600', 1),(u'0107000-01073001600', 1),(u'0107000-01073001600', 1),(u'0107000-01073001600', 2),(u'0107000-01073001902', 2),(u'0107000-01073001902', 4),(u'0107000-01073001902', 1),(u'0107000-01073001902', 3),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 3),(u'0107000-01073001902', 3),(u'0107000-01073001902', 2),(u'0107000-01073001902', 2),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 3),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 3),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 4),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 1),(u'0107000-01073001902', 6),(u'0107000-01073001902', 1),(u'0107000-01073001902', 2),(u'0107000-01073001902', 4),(u'0107000-01073001902', 2),(u'0107000-01073001902', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 4),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 2),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 3),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002000', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 2),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1),(u'0107000-01073002100', 1)])

In [15]:
temp.reduceByKey(lambda x, y: x+y).collect()

[('0107000-01073000300', 32),
 ('0107000-01073001400', 9),
 ('0107000-01073001500', 15),
 ('0107000-01073001902', 85),
 ('0107000-01073002000', 31),
 ('0107000-01073000400', 56),
 ('0107000-01073000700', 40),
 ('0107000-01073000800', 76),
 ('0107000-01073002100', 21),
 ('0107000-01073000100', 18),
 ('0107000-01073001100', 93),
 ('0107000-01073001600', 25),
 ('0107000-01073000500', 25),
 ('0107000-01073001200', 88)]

In [20]:
zones[zones['plctract10'] == '0137000-01089011100']

Unnamed: 0,plctract10,plctrpop10,geometry
4946,0137000-01089011100,4,(POLYGON ((-2809821.347481699 -1743808.1056464...


In [18]:
18/3042

0.005917159763313609

In [29]:
zones.columns

Index(['plctract10', 'plctrpop10', 'geometry'], dtype='object')

In [93]:
tracts = sqlContext.read.load(CITIES_FN, format="json")
tracts_df = tracts.select(explode(tracts.features))
tracts_df = tracts_df.withColumn('plctract10', tracts_df['col.properties.plctract10'])\
                .withColumn('plctrpop10', tracts_df['col.properties.plctrpop10'])\
                .select(['plctract10', 'plctrpop10'])
tracts_df = tracts_df.filter(tracts_df.plctrpop10 > 0)

In [94]:
tracts_df.count()

20465

In [95]:
21346-20465

881

In [66]:
from pyspark.sql.functions import explode, split
features = temp_df.select(explode(temp_df.features))
features.printSchema()

root
 |-- col: struct (nullable = true)
 |    |-- geometry: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- properties: struct (nullable = true)
 |    |    |-- plctract10: string (nullable = true)
 |    |    |-- plctrpop10: long (nullable = true)
 |    |-- type: string (nullable = true)



In [79]:
props = features.withColumn('plctract10', features['col.properties.plctract10'])\
        .withColumn('plctrpop10', features['col.properties.plctrpop10'])\
        .select(['plctract10', 'plctrpop10'])

+-------------------+----------+
|         plctract10|plctrpop10|
+-------------------+----------+
|0107000-01073005910|      4612|
|0107000-01073010801|       168|
|0107000-01073012701|        44|
|0107000-01073012703|       498|
|0107000-01073012704|       113|
|0135896-01073012704|        27|
|0135896-01073012907|      2696|
|0135896-01073014409|      3266|
|0135896-01117030217|      4878|
|0135896-01117030303|      2728|
|0135896-01117030304|      3288|
|0137000-01089010612|      2654|
|0137000-01089010622|      8725|
|0137000-01089010800|       860|
|0137000-01089010901|     11386|
|0150000-01097001502|      1608|
|0473000-04013421201|      1342|
|0477000-04019001100|      2900|
|0137000-01089010902|        49|
|0477000-04019003600|      4552|
+-------------------+----------+
only showing top 20 rows



In [82]:
props.filter(props.plctrpop10 >= 0).count()

881

In [80]:
props_rdd = props.rdd.map(tuple)

In [81]:
props_rdd.take(3)

[('0107000-01073005910', 4612),
 ('0107000-01073010801', 168),
 ('0107000-01073012701', 44)]

In [48]:
df = zones[['plctract10', 'plctrpop10']]
spark_df = spark.createDataFrame(df).cache()

AttributeError: 'SparkContext' object has no attribute 'createDataFrame'

In [42]:
spark_df = spark_df.filter(spark_df.plctrpop10 > 0)

In [43]:
test = spark_df.rdd.map(tuple)
test.take(10)

[('0107000-01073005910', 4612),
 ('0107000-01073010801', 168),
 ('0107000-01073012701', 44),
 ('0107000-01073012703', 498),
 ('0107000-01073012704', 113),
 ('0135896-01073012704', 27),
 ('0135896-01073012907', 2696),
 ('0135896-01073014409', 3266),
 ('0135896-01117030217', 4878),
 ('0135896-01117030303', 2728)]

In [44]:
trial = locationTweets.join(test)

In [45]:
trial.take(10)

[('0150000-01097002900', (1, 3667)),
 ('0412000-04013810500', (1, 5168)),
 ('0412000-04013812100', (1, 6092)),
 ('0427400-04013422606', (2, 9135)),
 ('0427820-04013092716', (2, 3908)),
 ('0455000-04013618500', (1, 3647)),
 ('0477000-04019001100', (1, 2900)),
 ('0606000-06001422600', (1, 1215)),
 ('0613392-06073013313', (1, 9464)),
 ('0616350-06065040821', (1, 5691))]

In [97]:
trial.mapValues(lambda x: round(x[0]/x[1], 2)).sortByKey().collect()

[('0137000-01089000202', 0.0),
 ('0137000-01089001500', 0.0),
 ('0150000-01097002900', 0.0),
 ('0150000-01097003605', 0.0),
 ('0151000-01101002400', 0.0),
 ('0151000-01101002800', 0.0),
 ('0177256-01125012405', 0.0),
 ('0404720-04013082027', 0.0),
 ('0412000-04013810500', 0.0),
 ('0412000-04013812100', 0.0),
 ('0427400-04013422606', 0.0),
 ('0427820-04013092716', 0.0),
 ('0427820-04013104214', 0.0),
 ('0446000-04013420303', 0.0),
 ('0446000-04013422104', 0.0),
 ('0454050-04013610900', 0.0),
 ('0455000-04013103611', 0.0),
 ('0455000-04013114100', 0.0),
 ('0455000-04013116100', 0.0),
 ('0455000-04013116604', 0.0),
 ('0455000-04013618500', 0.0),
 ('0473000-04013318800', 0.0),
 ('0473000-04013319000', 0.0),
 ('0473000-04013319104', 0.0),
 ('0473000-04013320100', 0.0),
 ('0477000-04019000700', 0.0),
 ('0477000-04019001100', 0.0),
 ('0477000-04019004029', 0.0),
 ('0485540-04027000301', 0.0),
 ('0541000-05119001900', 0.0),
 ('0602252-06013307205', 0.0),
 ('0603526-06029002400', 0.0),
 ('06060

In [31]:
spark_df.show()

+-------------------+----------+
|         plctract10|plctrpop10|
+-------------------+----------+
|0107000-01073005910|      4612|
|0107000-01073010801|       168|
|0107000-01073012701|        44|
|0107000-01073012703|       498|
|0107000-01073012704|       113|
|0135896-01073012704|        27|
|0135896-01073012907|      2696|
|0135896-01073014409|      3266|
|0135896-01117030217|      4878|
|0135896-01117030303|      2728|
|0135896-01117030304|      3288|
|0137000-01089010612|      2654|
|0137000-01089010622|      8725|
|0137000-01089010800|       860|
|0137000-01089010901|     11386|
|0150000-01097001502|      1608|
|0473000-04013421201|      1342|
|0477000-04019001100|      2900|
|0137000-01089010902|        49|
|0477000-04019003600|      4552|
+-------------------+----------+
only showing top 20 rows

