## Put the tweet data into PostGIS — it'll be easier to deal with

In [92]:
import pymongo
import psycopg2
import time
import json
import zipfile
import string

In [2]:
# Run 'mongod' in shell to start the database server

local = pymongo.MongoClient()
print local.database_names()

[u'local', u'san_francisco_db', u'topic_analysis']


In [3]:
conn_string = "host='datagarden.local' dbname='smmaurer' user='smmaurer' password=''"
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
print conn.encoding

### 1. First load the California tweets from the class exercise

In [4]:
print local.san_francisco_db.collection_names()
print local.san_francisco_db.tweets.count()

[u'tweets', u'system.indexes']
1000000


In [5]:
m = local.san_francisco_db.tweets
print m.find_one()

{u'user_id': 224874450, u'text': u'@Tanner_Cortez hey checkout the website: http://t.co/LuktJ6hiws', u'cluster': -1.0, u'location': {u'type': u'Point', u'coordinates': [-121.88355687, 37.44609999]}, u'timeStamp': u'Wed Sep 11 04:38:08 +0000 2013', u'lat': 37.44609999, u'lng': -121.88355687, u'_id': ObjectId('5315595baa5e8a07ff04a78c'), u'id': 377652254096228352L}


In [34]:
try:
    cur.execute('DROP TABLE IF EXISTS tweets.ca_263;')
    cur.execute('''    
    CREATE TABLE tweets.ca_263 (
        id BIGINT NOT NULL,
        user_id BIGINT,
        text CHARACTER VARYING,
        coords public.geometry(Point, 4326),
        ts CHARACTER VARYING,
        CONSTRAINT ca_263_id_pkey PRIMARY KEY (id));
    ''')
    cur.execute('GRANT SELECT ON TABLE tweets.ca_263 TO smmaurer;')
    conn.commit()    
except:
    conn.commit()

In [35]:
t0 = time.time()
try:
    for t in m.find(): # can test with arg limit=N
        cur.execute('''
        INSERT INTO tweets.ca_263
        VALUES (%s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)
        ''', (
            t['id'],
            t['user_id'],
            t['text'],
            t['location']['coordinates'][0],
            t['location']['coordinates'][1],
            t['timeStamp']))
        
except Exception, e:
    print str(e)
    print t
    
conn.commit()
print int(time.time()-t0), 'sec.'

351 sec.


In [36]:
cur.execute('''
CREATE INDEX ca_263_user_id_index ON tweets.ca_263 USING btree (user_id);
''')
cur.execute('''
CREATE INDEX ca_263_coords_index ON tweets.ca_263 USING GIST (coords);
''')
conn.commit()

### 2. Now repeat with the partial SF extract from what Alexey was collecting

In [38]:
print local.topic_analysis.collection_names()
print local.topic_analysis.tweets_02.count()

[u'tweets_02', u'system.indexes']
1500000


In [39]:
m = local.topic_analysis.tweets_02
print m.find_one()

{u'text': u'finally got to see the new bridge lol \U0001f309', u'_id': ObjectId('528811ce992f710c0a61708a'), u'id': 401873845293817856L, u'coordinates': {u'type': u'Point', u'coordinates': [-122.35409981, 37.81748883]}, u'user': {u'id': 258187942}}


In [80]:
try:
    cur.execute('DROP TABLE IF EXISTS tweets.sf_alexey;')
    cur.execute('''    
    CREATE TABLE tweets.sf_alexey (
        id BIGINT NOT NULL,
        user_id BIGINT,
        text CHARACTER VARYING,
        coords public.geometry(Point, 4326),
        ts CHARACTER VARYING,
        CONSTRAINT sf_alexey_id_pkey PRIMARY KEY (id));
    ''')
    cur.execute('GRANT SELECT ON TABLE tweets.sf_alexey TO smmaurer;')
    
except Exception, e:
    print str(e)

conn.commit()

In [84]:
# There are some duplicate tweet id's (N=163), and we have to check for them explicitly

t0 = time.time()
try:
    for t in m.find():
        cur.execute('''
        SELECT id FROM tweets.sf_alexey WHERE id = %s;
        ''', (t['id'],))
        if len(cur.fetchall()) == 0:
            cur.execute('''
            INSERT INTO tweets.sf_alexey
            VALUES (%s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)
            ''', (
                t['id'],
                t['user']['id'],
                t['text'],
                t['coordinates']['coordinates'][0],
                t['coordinates']['coordinates'][1],
                ""))
            conn.commit()
        
except Exception, e:
    print str(e)
    print t

conn.commit()
print int(time.time()-t0), 'sec.'

2003 sec.


In [85]:
cur.execute('''
CREATE INDEX sf_alexey_user_id_index ON tweets.sf_alexey USING btree (user_id);
''')
cur.execute('''
CREATE INDEX sf_alexey_coords_index ON tweets.sf_alexey USING GIST (coords);
''')
conn.commit()

### 3. And with the database of retrospective Bay Area posts

In [89]:
# Print a sample tweet

d = '/Volumes/smmaurer/Data/Twitter/Bay-area-tweet-retrospective/'
with zipfile.ZipFile(d + 'REST-1.json.zip') as z:
    with z.open('REST-1.json') as f:
        tweet = next(t for t in f)
    
print tweet

{"created_at":"Thu Nov 07 17:45:34 +0000 2013","id":398506527230672896,"id_str":"398506527230672896","text":"@AnissaJaimes That's why you get direct deposit haha","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":398503887759355905,"in_reply_to_status_id_str":"398503887759355905","in_reply_to_user_id":51533359,"in_reply_to_user_id_str":"51533359","in_reply_to_screen_name":"AnissaJaimes","user":{"id":334896071,"id_str":"334896071"},"geo":{"type":"Point","coordinates":[37.63612964,-122.46396577]},"coordinates":{"type":"Point","coordinates":[-122.46396577,37.63612964]},"place":{"id":"5358b6f78dd95ef6","url":"https:\/\/api.twitter.com\/1.1\/geo\/id\/5358b6f78dd95ef6.json","place_type":"city","name":"San Bruno","full_name":"San Bruno, CA","country_code":"US","country":"United States","contained_within":[],"bounding_box":{"type":"Polygon","coordinates":[[[-122.472817,37.604053],

In [100]:
try:
    cur.execute('DROP TABLE IF EXISTS tweets.ba_retro;')
    cur.execute('''    
    CREATE TABLE tweets.ba_retro (
        id BIGINT NOT NULL,
        user_id BIGINT,
        text CHARACTER VARYING,
        coords public.geometry(Point, 4326),
        ts CHARACTER VARYING,
        lang CHARACTER VARYING,
        source CHARACTER VARYING,
        source_full CHARACTER VARYING,
        CONSTRAINT ba_retro_id_pkey PRIMARY KEY (id));
    ''')
    cur.execute('GRANT SELECT ON TABLE tweets.ba_retro TO smmaurer;')
    conn.commit()    
except:
    conn.commit()

In [99]:
import string

def get_source(source_str):
    # Extract name of Twitter client from 'source' string
    if source_str == 'web':
        return source_str
    try:
        return string.split(string.split(source_str,'>')[1],'<')[0]
    except:
        return ''

In [101]:
t0 = time.time()
d = '/Volumes/smmaurer/Data/Twitter/Bay-area-tweet-retrospective/'
fnames = ['REST-1.json','REST-2.json','REST-3.json','REST-4.json']
errcount = 0

try:
    for name in fnames:
        with zipfile.ZipFile(d + name + '.zip') as z:
            with z.open(name) as f:
                for line in f:
                    try: 
                        # some lines are not full tweets, or missing coords
                        t = json.loads(line)
                        lnglat = t['coordinates']['coordinates'] # [lng,lat]
                    except:
                        if (t['coordinates'] is not None) & (errcount < 100):
                            print line
                            errcount += 1
                        continue
                        
                    cur.execute('''
                    INSERT INTO tweets.ba_retro
                    VALUES (%s,%s,%s,ST_SetSRID(ST_MakePoint(%s,%s),4326),%s,%s,%s,%s)
                    ''', (
                        t['id'],
                        t['user']['id'],
                        t['text'],
                        lnglat[0],
                        lnglat[1],
                        t['created_at'],
                        t['lang'],
                        get_source(t['source']),
                        t['source']))
                    conn.commit()
                    
except Exception, e:
    print str(e)
    print t
    
conn.commit()
print int(time.time()-t0), 'sec.'

2809 sec.


In [None]:
cur.execute('''
CREATE INDEX ba_retro_user_id_index ON tweets.ba_retro USING btree (user_id);
''')
cur.execute('''
CREATE INDEX ba_retro_coords_index ON tweets.ba_retro USING GIST (coords);
''')
conn.commit()

In [107]:
cur.execute('''
CREATE INDEX ba_retro_lang_index ON tweets.ba_retro USING btree (lang);
''')
cur.execute('''
CREATE INDEX ba_retro_source_index ON tweets.ba_retro USING btree (source);
''')
conn.commit()

### 4. And with the stream of tweets

In [102]:
# Print a sample tweet

d = '/Volumes/smmaurer/Data/Twitter/Bay-area-tweet-samples/'
with zipfile.ZipFile(d + 'raw-tweets-2.json.zip') as z:
    with z.open('raw-tweets-2.json') as f:
        tweet = next(t for t in f)
    
print tweet

{"created_at":"Fri Sep 27 19:35:10 +0000 2013","id":383676205725007874,"id_str":"383676205725007874","text":"\u201c@TheLakeShOwKinG: @makafaka you would still be my friend\u201d \ud83d\ude0a ALWAYS!!","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":383675967681486848,"in_reply_to_status_id_str":"383675967681486848","in_reply_to_user_id":368735686,"in_reply_to_user_id_str":"368735686","in_reply_to_screen_name":"TheLakeShOwKinG","user":{"id":59842816,"id_str":"59842816","name":"Flame.Boyant","screen_name":"makafaka","location":"Bay Area","url":null,"description":"____God first__Family over everything else___","protected":false,"followers_count":374,"friends_count":275,"listed_count":0,"created_at":"Fri Jul 24 17:45:33 +0000 2009","favourites_count":4350,"utc_offset":-25200,"time_zone":"Pacific Time (US & Canada)","geo_enabled":true,"verified":false,"statuses_count":14506,"

In [113]:
try:
    cur.execute('DROP TABLE IF EXISTS tweets.ba_stream;')
    cur.execute('''    
    CREATE TABLE tweets.ba_stream (
        id BIGINT NOT NULL,
        user_id BIGINT,
        text CHARACTER VARYING,
        coords public.geometry(Point, 4326),
        ts CHARACTER VARYING,
        lang CHARACTER VARYING,
        source CHARACTER VARYING,
        source_full CHARACTER VARYING,
        CONSTRAINT ba_stream_id_pkey PRIMARY KEY (id));
    ''')
    cur.execute('GRANT SELECT ON TABLE tweets.ba_stream TO smmaurer;')
    conn.commit()    
except:
    conn.commit()

In [114]:
# There are some duplicate tweet id's here as well

t0 = time.time()
d = '/Volumes/smmaurer/Data/Twitter/Bay-area-tweet-samples/'
fname = 'raw-tweets-2.json'
errcount = 0

try:
    with zipfile.ZipFile(d + fname + '.zip') as z:
        with z.open(fname) as f:
            for line in f:
                try: 
                    # some lines are not full tweets, or missing coords
                    t = json.loads(line)
                    lnglat = t['coordinates']['coordinates'] # [lng,lat]
                except:
                    if (line != '\r\n') & (t['coordinates'] is not None) & (errcount < 100):
                        print repr(line)
                        errcount += 1
                    continue

                cur.execute('''
                SELECT id FROM tweets.ba_stream WHERE id = %s;
                ''', (t['id'],))
                if len(cur.fetchall()) == 0:
                    cur.execute('''
                    INSERT INTO tweets.ba_stream
                    VALUES (%s,%s,%s,ST_SetSRID(ST_MakePoint(%s,%s),4326),%s,%s,%s,%s)
                    ''', (
                        t['id'],
                        t['user']['id'],
                        t['text'],
                        lnglat[0],
                        lnglat[1],
                        t['created_at'],
                        t['lang'],
                        get_source(t['source']),
                        t['source']))
                    conn.commit()
                    
except Exception, e:
    print str(e)
    print t
    
conn.commit()
print int(time.time()-t0), 'sec.'

677 sec.


In [115]:
cur.execute('''
CREATE INDEX ba_stream_user_id_index ON tweets.ba_stream USING btree (user_id);
''')
cur.execute('''
CREATE INDEX ba_stream_coords_index ON tweets.ba_stream USING GIST (coords);
''')
cur.execute('''
CREATE INDEX ba_stream_lang_index ON tweets.ba_stream USING btree (lang);
''')
cur.execute('''
CREATE INDEX ba_stream_source_index ON tweets.ba_stream USING btree (source);
''')
conn.commit()

In [106]:
conn.commit()