In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np 
import re

from sklearn.metrics import precision_recall_curve, mutual_info_score, normalized_mutual_info_score

from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover, CountVectorizer, VectorAssembler
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, lit, col, when, lower

import matplotlib.pyplot as plt

from hashtag_dict import topic_dict

from Evaluation import *
from FeatureExtraction import *
from Filter import *
from TestFramework import *
from LPFormulations import *
from CoverageCalculator import *

# Review Previous Work

In [3]:
class API:
    def __init__(self, directory, frac=1, seed=123):
        if frac < 1:
            self.data = spark.read.parquet(directory).sample(False, frac, seed)
        else:
            self.data = spark.read.parquet(directory)
          
    def get_raw_data(self):
         return self.data

    def get_filtered_data(self, terms=[], hashtags=None, users=None, locs=None, mentions=None):   
        return filter_data(self.data, terms, hashtags, users, locs, mentions)
    
    
class DataPreprocessor():
    def __init__(self, data, topic, load_labled=False, filter_empty_hashtags=False):
        if filter_empty_hashtags == True:
            self.data = data.filter(col("hashtag") != "empty_hashtag")
        else:
            self.data = data

        if load_labled == True:
            labled_dir = "/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/labeled_data/" + topic
            pos_dir = labled_dir + "/pos"
            topical_tweet_ids = spark.read.parquet(pos_dir)
            topical_tweets = topical_tweet_ids.join(self.data, "tweet_id").withColumn("label", lit(1))
            
            neg_dir = labled_dir + "/neg"
            non_topical_tweet_ids = spark.read.parquet(neg_dir)#.sample(False, 0.1, 123)
            non_topical_tweets = non_topical_tweet_ids.join(self.data, "tweet_id").withColumn("label", lit(0))

            self.labled_data = topical_tweets.union(non_topical_tweets)
        else:
            self.labled_data = None

        tokenizer = Tokenizer(inputCol="hashtag", outputCol="each_hashtag")
        hashtags_df = tokenizer.transform(self.data)

        hashtag = hashtags_df.select("tweet_id","create_time","each_hashtag")
        self.hash_exploded = hashtag.withColumn('each_hashtag', explode('each_hashtag'))
    
    def get_labled_data(self, topic):
        if self.labled_data == None:
            topic_lables = topic_dict[topic]

            topical_ids = self.hash_exploded.select(self.hash_exploded.tweet_id)\
                                                    .where(self.hash_exploded.each_hashtag\
                                                            .isin(topic_lables))\
                                                    .distinct().cache()

            labled_topical = topical_ids.withColumn("topical", lit(1))
            self.labled_data = self.data.join(labled_topical, self.data.tweet_id == labled_topical.tweet_id, "left")\
                                                      .select(self.data.create_time,\
                                                             self.data.from_id,\
                                                             self.data.from_user,\
                                                             self.data.hashtag,\
                                                             self.data.location,\
                                                             self.data.mention,\
                                                             self.data.tweet_id,\
                                                             self.data.term,\
                                                              F.when(labled_topical.topical == 1, 1.0)\
                                                                .otherwise(0.0).alias("label"))
        return self.labled_data
    
    def get_num_of_positvies(self):
        if self.labled_data == None:
            return 0
        
        return self.labled_data.where(col("label") == 1.0).count()
    
    def get_data_size(self):
        if self.labled_data == None:
            return 0
        
        return self.labled_data.count()
        
    def temporal_split(self, topic, balance_data=True, seed=0, remove_topic_hashtags=False, ch=False):
        topic_lables = topic_dict[topic]
        labled_data = self.get_labled_data(topic)
        tweet_labels = labled_data.select("tweet_id", "label")
        
        df_birthday = self.hash_exploded.join(tweet_labels,\
                                 self.hash_exploded.tweet_id == tweet_labels.tweet_id,\
                                 "inner").select(self.hash_exploded.create_time,\
                                                 self.hash_exploded.each_hashtag,\
                                                 self.hash_exploded.tweet_id)
        
        ## Find out the "birthday", or the earliest appearing time of each hashtag. 
        ## (add an extra column of 1 to mark as topical, will be used in a join later)
        ordered_hashtag_set = df_birthday.\
                              groupby("each_hashtag").\
                              agg({"create_time": "min"}).\
                              orderBy('min(create_time)', ascending=True).\
                              withColumnRenamed("min(create_time)", "birthday").\
                              where(df_birthday.each_hashtag.isin(topic_lables)).cache()
                            
        time_span = ordered_hashtag_set.count()

        train_valid_split_time = np.floor(np.multiply(time_span, 0.75)).astype(int)
        valid_test_split_time = np.floor(np.multiply(time_span, 0.85)).astype(int)

        # Converting to Pandas for random row access.
        pd_ordered_hashtag_set = ordered_hashtag_set.toPandas()
        
        # locate the timestamp of the cutoff point. Will be used later to split Dataframe.
        train_valid_time = pd_ordered_hashtag_set.iloc[train_valid_split_time]['birthday']
        valid_test_time = pd_ordered_hashtag_set.iloc[valid_test_split_time]['birthday']
    
        training_set = labled_data.where(col("create_time") <= train_valid_time)
        validation_set = labled_data.where((col("create_time") > train_valid_time) \
                                           & (col("create_time") <= valid_test_time))
        test_set = labled_data.where(col("create_time") > valid_test_time)
        
        if remove_topic_hashtags == True:
            train_hashtags = pd_ordered_hashtag_set[:train_valid_split_time]['each_hashtag'].tolist()
            valid_hashtags = pd_ordered_hashtag_set[train_valid_split_time:valid_test_split_time]['each_hashtag']\
                                .tolist()
            test_hashtags = pd_ordered_hashtag_set[valid_test_split_time:]['each_hashtag'].tolist()
            
            valid_test_hashtags = valid_hashtags + test_hashtags
            hashtags_filter = [re.sub(r'(.*)', r'\\b\1\\b', hashtag) for hashtag in valid_test_hashtags]
            valid_test_hashtags_regex = '|'.join(hashtags_filter)
            if ch == True:
                invalid_train_ids = (training_set.filter(lower(training_set['hashtag']).rlike(valid_test_hashtags_regex)))\
                                        .select("tweet_id").distinct().rdd.flatMap(lambda x: x).collect()
            else:
                invalid_train_ids = (training_set.filter(training_set['hashtag'].rlike(valid_test_hashtags_regex)))\
                                        .select("tweet_id").distinct().rdd.flatMap(lambda x: x).collect()                
            training_set = training_set.where(~col("tweet_id").isin(invalid_train_ids)) 

            train_test_hashtags = train_hashtags + test_hashtags
            hashtags_filter = [re.sub(r'(.*)', r'\\b\1\\b', hashtag) for hashtag in train_test_hashtags]
            train_test_hashtags_regex = '|'.join(hashtags_filter)
            if ch == True:
                invalid_validation_ids = (validation_set.filter(lower(validation_set['hashtag']).rlike(train_test_hashtags_regex)))\
                                        .select("tweet_id").distinct().rdd.flatMap(lambda x: x).collect()
            else:
                invalid_validation_ids = (validation_set.filter(validation_set['hashtag'].rlike(train_test_hashtags_regex)))\
                                        .select("tweet_id").distinct().rdd.flatMap(lambda x: x).collect()

                
            print("Invalid Validation:" + str(len(invalid_validation_ids)))
            validation_set = validation_set.where(~col("tweet_id").isin(invalid_validation_ids))
            
            train_valid_hashtags = train_hashtags + valid_hashtags
            train_valid_hashtags_filter = [re.sub(r'(.*)', r'\\b\1\\b', hashtag) for hashtag in train_valid_hashtags]
            train_valid_hashtags_regex = '|'.join(train_valid_hashtags_filter)
            if ch == True:
                invalid_test_ids = (test_set.filter(lower(test_set['hashtag']).rlike(train_valid_hashtags_regex)))\
                                        .select("tweet_id").distinct().rdd.flatMap(lambda x: x).collect()
            else:
                invalid_test_ids = (test_set.filter(test_set['hashtag'].rlike(train_test_hashtags_regex)))\
                                        .select("tweet_id").distinct().rdd.flatMap(lambda x: x).collect()

                
            print("Invalid Test:" + str(len(invalid_test_ids)))
                
            test_set = test_set.where(~col("tweet_id").isin(invalid_test_ids))
        
        if balance_data == True:
            train_pos_count = training_set.where(col("label") == 1).count()
            train_neg_count = training_set.where(col("label") == 0).count()
            train_pos_neg_ratio = float(train_pos_count) / train_neg_count
            
            training_set_balanced = training_set.sampleBy("label", fractions={0: 2*train_pos_neg_ratio, 1: 1}, seed=seed)
            
            valid_pos_count = validation_set.where(col("label") == 1).count()
            valid_neg_count = validation_set.count() - valid_pos_count
            valid_pos_neg_ratio = float(valid_pos_count) / valid_neg_count
            
            validation_set_balanced = validation_set.sampleBy("label", fractions={0.0: valid_pos_neg_ratio, 1.0: 1}, seed=seed)
            
            return training_set, training_set_balanced, validation_set, validation_set_balanced, test_set
            
        return training_set, validation_set, test_set

# Natural Disaster Tweets in US

In [23]:
directory = "/mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Training_data/Staging_final/"
api = API(directory)
raw_data = api.get_raw_data()

dp = DataPreprocessor(raw_data, "Natr_Disaster", load_labled=True, filter_empty_hashtags=True)

In [24]:
labled_data = dp.get_labled_data("Natr_Disaster")

In [26]:
labled_data#show(10)

[('tweet_id', 'string'),
 ('create_time', 'double'),
 ('from_id', 'string'),
 ('from_user', 'string'),
 ('hashtag', 'string'),
 ('location', 'string'),
 ('mention', 'string'),
 ('term', 'string'),
 ('label', 'int')]

In [27]:
users = pd.read_csv('users.txt', sep=" ", header=None)
user_list = users.values.tolist()
users = [u[0] for u in user_list]

In [29]:
natr_disaster_tweets = labled_data.filter(col("label") == 1)

In [30]:
natr_disaster_tweets_pd = natr_disaster_tweets.toPandas()

In [31]:
len(natr_disaster_tweets_pd)

89440

In [34]:
natr_disaster_tweets_pd

tweet_id        object
create_time    float64
from_id         object
from_user       object
hashtag         object
location        object
mention         object
term            object
label            int64
dtype: object

In [35]:
natr_disaster_tweets_us = natr_disaster_tweets_pd.loc[natr_disaster_tweets_pd['from_user'].str.lower().isin(users)]

In [37]:
natr_disaster_tweets_us

Unnamed: 0,tweet_id,create_time,from_id,from_user,hashtag,location,mention,term,label
4,343103567302914048,1.370652e+09,31172792,cfdshark,tropicalstorm humor charlestonsc hurricaneseason,loc_charleston_sc,empty_mention,hahahaha,1
9,344105385210294272,1.370890e+09,935451420,SandyUpdatesNY,NJ Sandy,loc_new_york,OccupySandyNJ,rt two state hotlines established for elevatio...,1
10,317440764449808384,1.364533e+09,157597434,SHOUtime21,TheU Hurricanes,loc_fremont__la_ca,empty_mention,clank clank clank clank clank clank clank,1
12,317448620343431168,1.364535e+09,494254456,salcamaro,hurricane tropicaldepression tmltalk goleafsgo,loc_markham,MapleLeafs JLupul,thats right folks turn that into a what a goal by,1
16,410543672288350209,1.386734e+09,297428826,FirstNightArt,Giants Abstract Painting Storm Fire,loc_london,ZeanaRomanovna,rt between two and,1
26,366029121278582784,1.376118e+09,1170711727,KirstinNoel722,PrayForWaynesville Flood Volunteer,loc_springfield_mo,empty_mention,spending one of my days off going home to help...,1
27,372078318461276161,1.377560e+09,110753128,86Cane,Miami Hurricanes,loc_florida,Manny_Navarro,rt video golden donofrio coley kehoe green and...,1
29,505607979308032000,1.409396e+09,19850176,sandrasantander,earthquake CA,loc_los_angeles_california,quakesLA,rt 243 magnitude 13 mi from tehachapi united s...,1
36,485317857509904385,1.404558e+09,1394917244,NWSOPC,MODIS Arthur,loc_college_park_md,empty_mention,rgb airmass product shows losing tropical char...,1
42,405101239891136512,1.385437e+09,112895255,tina5925,Haiyan ani,loc_manhattan_nyc,wspa,rt our disaster teamsdr ian dacreis featured i...,1


In [11]:
natr_disaster_tweets_us.to_csv('natdis_us.csv', encoding='UTF-8', index=False)

UnicodeEncodeError: 'ascii' codec can't encode characters in position 4-9: ordinal not in range(128)

In [60]:
n = pd.read_csv("natdis_us_neg.csv", dtype={'tweet_id': str})

In [5]:
p.tweet_id

0        343103567302914048
1        344105385210294272
2        317440764449808384
3        317448620343431168
4        410543672288350209
5        366029121278582784
6        372078318461276161
7        505607979308032000
8        485317857509904385
9        405101239891136512
10       491790335509422081
11       491791421800599554
12       428301499769569280
13       335841029280366592
14       521862354339123200
15       521870520628019200
16       399633192862228480
17       406640952116129792
18       527715916604530689
19       508553035010670592
20       401133096965128192
21       401136313975332864
22       497155760493559809
23       336773800534347777
24       336779290886668289
25       364479896367546368
26       380976672801308672
27       400556325618589696
28       400561908257787904
29       400563233628516352
                ...        
15511    402149427135385600
15512    402149817196883968
15513    402149884335120384
15514    402150232449765376
15515    40215057216

In [20]:
negative_tweets = labled_data.filter(col("label") == 0).sample(False, 0.05, 123)

In [21]:
negative_tweets_pd = negative_tweets.toPandas()

In [25]:
negative_tweets_pd.shape

(6897456, 9)

In [4]:
us_labled_negative = negative_tweets_pd.loc[negative_tweets_pd['from_user'].str.lower().isin(users)]

NameError: name 'negative_tweets_pd' is not defined

In [17]:
len(tweet_ids)

1123617

In [27]:
us_labled_negative.to_csv('natdis_us_neg2.csv', encoding='UTF-8', index=False)

In [4]:
p = pd.read_csv("natdis_us.csv", dtype={'tweet_id': str}).dropna()
n = pd.read_csv("natdis_us_neg2.csv", dtype={'tweet_id': str}).dropna()

In [5]:
res2 = pd.concat([p, n])

In [6]:
id_subset = res2.tweet_id.iloc[30000:70000].tolist()

In [70]:
res.to_csv('natdis_us_tweets2.csv', encoding='utf-8', index=False)

In [5]:
res2 = pd.read_csv('natdis_us_tweets2.csv', dtype={'tweet_id': str})

In [251]:
res2

Unnamed: 0,tweet_id,create_time,from_id,from_user,hashtag,location,mention,term,label
0,343103567302914048,1.370652e+09,31172792,cfdshark,tropicalstorm humor charlestonsc hurricaneseason,loc_charleston_sc,empty_mention,hahahaha,1.0
1,344105385210294272,1.370890e+09,935451420,SandyUpdatesNY,NJ Sandy,loc_new_york,OccupySandyNJ,rt two state hotlines established for elevatio...,1.0
2,317440764449808384,1.364533e+09,157597434,SHOUtime21,TheU Hurricanes,loc_fremont__la_ca,empty_mention,clank clank clank clank clank clank clank,1.0
3,317448620343431168,1.364535e+09,494254456,salcamaro,hurricane tropicaldepression tmltalk goleafsgo,loc_markham,MapleLeafs JLupul,thats right folks turn that into a what a goal by,1.0
4,410543672288350209,1.386734e+09,297428826,FirstNightArt,Giants Abstract Painting Storm Fire,loc_london,ZeanaRomanovna,rt between two and,1.0
5,366029121278582784,1.376118e+09,1170711727,KirstinNoel722,PrayForWaynesville Flood Volunteer,loc_springfield_mo,empty_mention,spending one of my days off going home to help...,1.0
6,372078318461276161,1.377560e+09,110753128,86Cane,Miami Hurricanes,loc_florida,Manny_Navarro,rt video golden donofrio coley kehoe green and...,1.0
7,505607979308032000,1.409396e+09,19850176,sandrasantander,earthquake CA,loc_los_angeles_california,quakesLA,rt 243 magnitude 13 mi from tehachapi united s...,1.0
8,485317857509904385,1.404558e+09,1394917244,NWSOPC,MODIS Arthur,loc_college_park_md,empty_mention,rgb airmass product shows losing tropical char...,1.0
9,405101239891136512,1.385437e+09,112895255,tina5925,Haiyan ani,loc_manhattan_nyc,wspa,rt our disaster teamsdr ian dacreis featured i...,1.0


In [9]:
id_subset_int

[460390026715484161,
 460402018235195392,
 460565327668326400,
 460601268667703296,
 460645212390322176,
 460752871789563904,
 460784907862945792,
 460842944451575808,
 460887597007769601,
 460890658866479105,
 460969771807154176,
 460972170974220288,
 460976642102677504,
 461026566869237760,
 461174676165644288,
 461183173817151488,
 461312211571331072,
 461340133040484352,
 461358726415654912,
 461462476710633473,
 461493535511158785,
 461515710808596481,
 461525919740342272,
 461532014068260864,
 461545297416425473,
 461608849540407296,
 461697299002163200,
 461698091725619200,
 461705427567513600,
 461728605291020288,
 461770707722985472,
 461807944753881088,
 461814856958873600,
 461859501126475776,
 461907525924036608,
 461940644135833600,
 462003806180352000,
 462028175061811200,
 462037268329295872,
 462048722952519680,
 462174791169245184,
 462221792527282176,
 462267124556529664,
 462281360028090368,
 462289681514631171,
 462306869798002688,
 462307817697718272,
 462315191292

In [8]:
id_subset_int =list(map(int, id_subset))

In [10]:
len(id_subset_int)

40000

In [11]:
import pyspark.sql.functions as func
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions import col, explode  
from pyspark.ml.feature import Tokenizer

In [256]:
raw_data = sc.textFile("/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_8/2013-08/2013-08-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_9/2013-09/2013-09-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_2/2013-02/2013-02-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_3/2013-03/2013-03-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_6/2013-06/2013-06-*,\
/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_10/2013-10/2013-10-*,\
/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_11/2013-11/2013-11-*,\
/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_12/2013-12/2013-12-*,\
/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2013_4/2013-04/2013-04-*,\
/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2013_5/2013-05/2013-05-*")

In [12]:
raw_data2 = sc.textFile("/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2014_1/2014-01/2014-01-*,\
/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_2/2014-02/2014-02-*,\
/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_3/2014-03/2014-03-*,\
/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_4/2014-04/2014-04-*,\
/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_5/2014-05/2014-05-*,\
/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_6/2014-06/2014-06-*,\
/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_7/2014-07/2014-07-*,\
/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_8/2014-08/2014-08-*,\
/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_9/2014-09/2014-09-*,\
/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_10/2014-10/2014-10-*,\
/mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Backup_tw_2014_11/2014-11/2014-11-*,\
/mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Backup_tw_2014_12/2014-12/2014-12-*")

In [None]:
                        
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_8/2013-08/2013-08-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_9/2013-09/2013-09-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_2/2013-02/2013-02-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_3/2013-03/2013-03-*,\
/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_6/2013-06/2013-06-*,\
/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_10/2013-10/2013-10-*,\
/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_11/2013-11/2013-11-*,\
/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_12/2013-12/2013-12-*,\
/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2013_4/2013-04/2013-04-*,\
/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2013_5/2013-05/2013-05-*,\
/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2014_1/2014-01/2014-01-*,\
/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_2/2014-02/2014-02-*,\
/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_3/2014-03/2014-03-*,\
/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_4/2014-04/2014-04-*,\
/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_5/2014-05/2014-05-*,\
/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_6/2014-06/2014-06-*,\
/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_7/2014-07/2014-07-*,\
/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_8/2014-08/2014-08-*,\
/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_9/2014-09/2014-09-*,\
/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_10/2014-10/2014-10-*,\
/mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Backup_tw_2014_11/2014-11/2014-11-*,\
/mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Backup_tw_2014_12/2014-12/2014-12-*")

In [28]:
positive_ids = p.tweet_id.tolist()

In [103]:
len(positive_ids)

NameError: name 'positive_ids' is not defined

In [13]:
import json
    
# Remove invalid tweet which has length less than 1000.
def valid_json(d):
    return len(d) > 1000

# load json object, if a line is invalid, substitute as an empty dict (which has len() == 0 )
def load_json(d):
    try:
        js = json.loads(d)
        
    except ValueError as e:
        js = {}
        
    except Exception:
        js = {}
        
    return js

def raw_parser(d):
    tweet_id = int(d['id'])
    raw_text = d['text']

    if tweet_id in id_subset_int:
        return {"tweet_id": tweet_id, 
                "raw_text": raw_text}
    else:
        return {"tweet_id": 0, 
               "raw_text": None}
    
#    return processed

def us_ids(d):
    return d['tweet_id'] != 0

In [257]:
data_rdd = raw_data.filter(valid_json).map(load_json).filter(lambda x: len(x) > 1).map(raw_parser).filter(us_ids)

In [14]:
data_rdd2 = raw_data2.filter(valid_json).map(load_json).filter(lambda x: len(x) > 1).map(raw_parser).filter(us_ids)

In [92]:
#3.588733114946724e+17
a = data_rdd.take(1)[0]

In [100]:
data_rdd.take(10)

[{'raw_text': u'RT @frost_ella: \u201c@AboutAquarius: An #Aquarius will act crazy whenever and wherever they please.\u201d @silvaangold',
  'tweet_id': 343594401555169280},
 {'raw_text': u"Only 6 hours and 40 minutes till I work and I can't fall asleep. #gonnabealongday",
  'tweet_id': 343598285489045504},
 {'raw_text': u'RT @Southern_Voices: "I don\'t do lonely well" - #JasonAldean',
  'tweet_id': 343605776503414784},
 {'raw_text': u'Only 20 more minutes. Lets hold tough on D, attack the net up front, and get some god damn goals!!! #LETSGOHAWKS #LASucks',
  'tweet_id': 343550348763463680},
 {'raw_text': u'I am so loving Tyler Toffoli.  #LAKings',
  'tweet_id': 343576877727879170},
 {'raw_text': u'my feet are killing me. #serverprobz',
  'tweet_id': 343577485901967360},
 {'raw_text': u'I wish more of my friends were on twitter. #lonely #digitalage #getwithit',
  'tweet_id': 343654539490115586},
 {'raw_text': u'100ft slip and slide on deck at 130 east Jeff #bringbitches',
  'tweet_id': 

In [97]:
id_subset_int[21545]

343594401555169280

In [15]:
schema = StructType([StructField('tweet_id', StringType(), False),
                     StructField('raw_text', StringType(), True),
                    ])

In [258]:
data_df = sqlContext.createDataFrame(data_rdd, schema)

In [16]:
data_df2 = sqlContext.createDataFrame(data_rdd2, schema)

In [259]:
data_df.write.save("/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/us_natdis/2013_Raw_txt.parquet", format="parquet")

In [17]:
data_df2.write.save("/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/us_natdis/2014_Raw_txt1.parquet", format="parquet")

In [18]:
s = spark.read.parquet("/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/us_natdis/2014_Raw_txt1.parquet")

In [19]:
s_pd = s.toPandas()

In [20]:
s_pd.shape

(22916, 2)

In [21]:
texts = pd.DataFrame()

In [22]:
s_pd = s_pd.drop_duplicates()

In [23]:
texts = texts.append(s_pd)

In [24]:
texts.shape

(22876, 2)

In [213]:
# 4236
# 13351
# 22051

s_pd.shape

(7645, 2)

In [25]:
res2 = pd.read_csv('natdis_us_tweets2.csv', dtype={'tweet_id': str})

In [26]:
res_with_rawtext = res2.join(s_pd.set_index("tweet_id"), how="right", on="tweet_id", lsuffix="c_", rsuffix="o_")

In [27]:
res_with_rawtext = res_with_rawtext.drop_duplicates(subset='tweet_id')

In [28]:
res_with_rawtext = res_with_rawtext.drop("term", axis=1)

In [34]:
res_with_rawtext

Unnamed: 0,tweet_id,create_time,from_id,from_user,hashtag,location,mention,label,raw_text
30000,460390026715484161,1.398615e+09,2254947145,DDaystripes,DDay,loc_york_united_kingdom,empty_mention,0.0,We've updated our website and added a page for...
30001,460402018235195392,1.398618e+09,21497938,whatsananna,TEDxGrandviewAve,loc_iowa,baron_batch,0.0,Thinking about @baron_batch and #TEDxGrandview...
30002,460565327668326400,1.398657e+09,612509501,LambentLucky,SpiderMan,loc_southeast,SpiderManMovie,0.0,RT @SpiderManMovie: Find out what fate has in ...
30003,460601268667703296,1.398665e+09,523388470,MaraZupko,StopWhitePeople2014,loc_never_russia,TweetLikeAGirI,0.0,RT @TweetLikeAGirI: #StopWhitePeople2014 http:...
30004,460645212390322176,1.398676e+09,199128850,SeegarsLu,HOUvsPOR,loc_houston_tx,empty_mention,0.0,"That was worse than the ending of ""Jeepers Cre..."
30005,460752871789563904,1.398701e+09,2393736830,jacobpaul710,IThappened,loc_lake_charles_louisiana,Chase widowtx,0.0,@Chase @widowtx #IThappened
30006,460784907862945792,1.398709e+09,549574913,_loudiseoh,MCM,loc_navarre,J4CKMULL,0.0,"RT @J4CKMULL: Oh your boyfriend is you ""#MCM"" ..."
30007,460842944451575808,1.398723e+09,1275774684,BeBe_Powers,etsymnt mothersdaygiftidea,loc_south_bend_wa,Bessiescreation,0.0,RT @Bessiescreation: Mamas day is just around ...
30008,460887597007769601,1.398734e+09,193193669,BABEL_ROCK,nosmoking vape goodmorning coffee paulstanley,loc_san_francisco_ca,empty_mention,0.0,One week #nosmoking and back on the #vape #goo...
30009,460890658866479105,1.398734e+09,193198150,LaLaTheBeauty,LaLaDickinson,loc_â_atlanta_miami_cali__uk_â,LaLaTheBeauty SimplyLaLaTV,0.0,"""Undeniably The Girl Next Door"" #LaLaDickinson..."


In [35]:
res_with_rawtext.to_csv("us_tweets_rawtext_p2.csv", encoding='utf-8', index=False)

In [17]:
t = data_df.filter(data_df.tweet_id == tweet_ids[0])

In [21]:
id = tweet_ids[0]

In [38]:
test = pd.read_csv("us_tweets_rawtext_p2.csv", lineterminator='\n')

In [40]:
test

Unnamed: 0,tweet_id,create_time,from_id,from_user,hashtag,location,mention,label,raw_text
0,460390026715484161,1.398615e+09,2254947145,DDaystripes,DDay,loc_york_united_kingdom,empty_mention,0.0,We've updated our website and added a page for...
1,460402018235195392,1.398618e+09,21497938,whatsananna,TEDxGrandviewAve,loc_iowa,baron_batch,0.0,Thinking about @baron_batch and #TEDxGrandview...
2,460565327668326400,1.398657e+09,612509501,LambentLucky,SpiderMan,loc_southeast,SpiderManMovie,0.0,RT @SpiderManMovie: Find out what fate has in ...
3,460601268667703296,1.398665e+09,523388470,MaraZupko,StopWhitePeople2014,loc_never_russia,TweetLikeAGirI,0.0,RT @TweetLikeAGirI: #StopWhitePeople2014 http:...
4,460645212390322176,1.398676e+09,199128850,SeegarsLu,HOUvsPOR,loc_houston_tx,empty_mention,0.0,"That was worse than the ending of ""Jeepers Cre..."
5,460752871789563904,1.398701e+09,2393736830,jacobpaul710,IThappened,loc_lake_charles_louisiana,Chase widowtx,0.0,@Chase @widowtx #IThappened
6,460784907862945792,1.398709e+09,549574913,_loudiseoh,MCM,loc_navarre,J4CKMULL,0.0,"RT @J4CKMULL: Oh your boyfriend is you ""#MCM"" ..."
7,460842944451575808,1.398723e+09,1275774684,BeBe_Powers,etsymnt mothersdaygiftidea,loc_south_bend_wa,Bessiescreation,0.0,RT @Bessiescreation: Mamas day is just around ...
8,460887597007769601,1.398734e+09,193193669,BABEL_ROCK,nosmoking vape goodmorning coffee paulstanley,loc_san_francisco_ca,empty_mention,0.0,One week #nosmoking and back on the #vape #goo...
9,460890658866479105,1.398734e+09,193198150,LaLaTheBeauty,LaLaDickinson,loc_â_atlanta_miami_cali__uk_â,LaLaTheBeauty SimplyLaLaTV,0.0,"""Undeniably The Girl Next Door"" #LaLaDickinson..."


In [276]:
n = test.append(res_with_rawtext)

In [280]:
n.to_csv("us_tweets_rawtext_p1.csv", encoding='utf-8', index=False)

In [286]:
te = pd.read_csv("/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/us_natdis/us_tweets_rawtext_p1.csv")

In [288]:
te.shape

(44571, 9)

\

In [3]:
temporal_split_directory = '/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/splitted_data/'

topic = "Natr_Disaster"

train_pd = pd.read_csv(temporal_split_directory + topic + "/training.csv").dropna()
validation_pd = pd.read_csv(temporal_split_directory + topic + "/validation.csv").dropna()
test_pd = pd.read_csv(temporal_split_directory + topic + "/test.csv").dropna().reset_index()

test = test_pd.drop_duplicates(subset='term').reset_index()
valid = validation_pd.drop_duplicates(subset='term').reset_index()
train = train_pd.drop_duplicates(subset='term').reset_index()

tf = TestFramework(train, valid, test)

  interactivity=interactivity, compiler=compiler, result=result)


Initializing Classifier....
Complete!


In [4]:
test_features = test.drop('label', axis=1)
test_target = test['label']

predictions = tf.get_pipeline().predict_proba(test_features)[:, 1]

In [6]:
test['prob'] = predictions

In [2]:
users = pd.read_csv('users.txt', sep=" ", header=None)
user_list = users.values.tolist()
users = [u[0] for u in user_list]

In [3]:
users

['meganasty_',
 'investorjunkie',
 'susanschlegel',
 '4beem',
 'gabewist97',
 'gabewist97',
 'hopscans',
 'erica_boone',
 'erica_boone',
 'ajcfoodandmore',
 'jkravitz',
 'mothergothel23',
 'iangiles15',
 'donandrewbailey',
 'ibrandonleeb',
 'tbrad_',
 'relentlessfmwrx',
 'jm_cali',
 'hpconvergedsys',
 'hpconvergedsys',
 'jessjquinones',
 'vessel_sf',
 'hank_hammond',
 'attitudechick9',
 'carlitaax33',
 'rainface92_',
 'sellabella_babi',
 'tyleredavis99',
 'sassyhotmesss',
 'traycehood',
 'skinnyminnilia',
 'skinnyminnilia',
 'elliotlikescats',
 'keyser_roll',
 'alina_horne',
 'itsbaybiegurl',
 'wumezycubure',
 'olibattles',
 'olibattles',
 'imcallinudaddy_',
 'aknight128',
 'kalilee',
 'kat_white',
 'cliffordg27',
 '_runninjonesy_',
 '_runninjonesy_',
 'bdostrem',
 'bdostrem',
 'ravenguenette',
 'texasrangersluv',
 'connorduane',
 'arielceleste_',
 'arielceleste_',
 'jensenjones_',
 'jensenjones_',
 'elienhart',
 'gschultz07',
 'ayeitsalexys',
 'linktobob',
 'youppldntcare',
 'cduren25

# Top Row

In [29]:
fil_train = train.loc[train['from_user'].str.lower().isin(users)]
fil_train['from_user'] = fil_train['from_user'].str.lower()
fil_train['create_time'] = pd.to_datetime(fil_train['create_time'],unit='s')

fil_train['location'] = fil_train.location.str.replace('loc_','')
fil_train = fil_train[~fil_train.term.str.contains("rt")]

fil_train = fil_train[['from_user', 'create_time', 'location', 'term', 'label']]
fil_train.columns = ['User Id', 'Time', 'Location', 'Text', 'Y']

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  from ipykernel import kernelapp as app
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  app.launch_new_instance()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


In [39]:
fil_train.to_csv('filtered_train.tsv', sep='\t', index=False)

In [77]:
fil_test = test.loc[test['from_user'].str.lower().isin(users)]

In [78]:
fil_test['from_user'] = fil_test['from_user'].str.lower()
fil_test['create_time'] = pd.to_datetime(fil_test['create_time'],unit='s')

fil_test['location'] = fil_test.location.str.replace('loc_','')
fil_test = fil_test[~fil_test.term.str.contains("rt")]

fil_test = fil_test[['from_user', 'create_time', 'location', 'term', 'label', 'prob']]
fil_test.columns = ['User Id', 'Time', 'Location', 'Text', 'Y', 'Probability']

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  if __name__ == '__main__':
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  from ipykernel import kernelapp as app
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


In [79]:
high_score_fil_test = fil_test[fil_test.Probability > 0.75]

In [84]:
high_score_fil_test.to_csv('high_prob_test.tsv', sep='\t', index=False)

In [82]:
low_score_fil_test = fil_test[fil_test.Probability < 0.25]

In [85]:
low_score_fil_test.to_csv('low_prob_test.tsv', sep='\t', index=False)

In [89]:
low_score_fil_test.Probability.min()

5.785827141432337e-10

In [182]:
import pandas
result_ms=pandas.to_datetime('1390968009.0',unit='s')
str(result_ms)

'2014-01-29 04:00:09'