# Filtering

Traverse the raw scraped tweet files and write out a filtered copy of each one, keeping only the rows that have a date, a longitude, and a latitude.

In [1]:
# Libraries

%run utilities.py
from multiprocessing import Pool  # faster
from time import time as time     # timing

#### Functions

In [2]:
# Functions
def parallelize_series(series, func, n_workers=6):
    """
    Apply `func` to every item of `series` in a pool of worker processes.

    Parameters
    ----------
    series : iterable
        Items to process; each one is passed to `func` as its only argument.
    func : callable
        Function to run in the workers. It is sent to the worker processes,
        so it must be picklable (e.g. a module-level function).
    n_workers : int or None, default 6
        Number of worker processes. None lets multiprocessing choose.

    Returns
    -------
    list
        The results of `func`, in the same order as `series`.

    Raises
    ------
    Whatever `func` raises in a worker is re-raised here, after the pool has
    been shut down.
    """
    pool = Pool(n_workers)
    try:
        results = pool.map(func, series)
        pool.close()
    except BaseException:
        # A failure (or Ctrl-C) would otherwise leave the workers running.
        pool.terminate()
        raise
    finally:
        # join() is only legal after close() or terminate(); both paths
        # above guarantee one of them has been called.
        pool.join()
    return results

def _month_day_year(timestamp):
    """
    Reduce a raw timestamp string to 'Mon-DD-YYYY' using its 2nd, 3rd and last
    whitespace-separated tokens. Returns None when there are fewer than three
    tokens (e.g. an empty field) so the row can be dropped instead of the
    whole file failing with an IndexError.
    """
    tokens = timestamp.split()
    if len(tokens) < 3:
        return None
    return '-'.join((tokens[1], tokens[2], tokens[-1]))


def filter_tweet_df(in_f, usecols=None):
    """
    Formats and returns a dataframe of tweets.

    Reads the CSV at `in_f`, drops every row that is missing a date, a
    longitude or a latitude, and parses the remaining dates.

    Parameters
    ----------
    in_f : str or file-like
        CSV to read; passed straight to `pd.read_csv`.
    usecols : list-like, optional
        Columns to load. Defaults to the notebook-level `cols_scrape`
        (not defined in this notebook -- presumably set by utilities.py).

    Returns
    -------
    pandas.DataFrame
        The filtered rows, with 'date' converted by `pd.to_datetime`
        using the "%b-%d-%Y" format and 'retweet' set to 1 where the raw
        value was 'RT' (any other raw value is kept unchanged).

    Raises
    ------
    Anything `pd.read_csv` raises, and ValueError from `pd.to_datetime` when
    a date has three or more tokens but does not match "%b-%d-%Y".
    """
    if usecols is None:
        usecols = cols_scrape

    conversions = {'retweet': lambda x: 1 if (x == 'RT') else x,
                   'date': _month_day_year}

    raw_df = pd.read_csv(in_f, usecols=usecols,
                         low_memory=False, engine='c',  # optimization
                         converters=conversions)

    # Drop rows that'll give us trouble (this now includes blank dates,
    # which the converter marks as None).
    located_df = raw_df.dropna(subset=['date', 'longitude', 'latitude'], how='any')

    # assign() returns a new frame, so nothing read above is mutated.
    return located_df.assign(
        date=pd.to_datetime(located_df['date'], format="%b-%d-%Y"))

def process_file(f):
    """
    Filter one raw tweet file and write the result into `outdir`.

    `outdir` is read from the notebook's global namespace, so it must be
    defined before this function is called.

    Returns a log record: {'f': <input file name>, 'succeed': <bool>,
    'new_f': <output path>}. Any exception raised while filtering or writing
    is printed and reported as succeed=False rather than re-raised.
    """
    source_name = f.rsplit('/', 1)[-1]
    filtered_name = format_new_file(f, 'filtered')
    target_path = name_file_path(filtered_name, outdir)  # outdir is a notebook global

    succeeded = True
    try:
        filtered = filter_tweet_df(f)
        filtered.to_csv(target_path, index=False)

        print(source_name + ': x')
    except Exception as e:
        print("Couldn't read: \t{}".format(f), e)
        succeeded = False

    return {'f': source_name, 'succeed': succeeded, 'new_f': target_path}

#### Variables

In [3]:
# Grab all of the files: this is the list of inputs handed to the filtering run.
# NOTE(review): ls_files_list and external_scrape_dir are not defined in this
# notebook -- presumably they come from utilities.py via %run; confirm.
raw_tweets = ls_files_list(external_scrape_dir)

# New output directory for the filtered files. process_file reads `outdir` as a
# global, so this cell has to run before the filtering cell.
# NOTE(review): the helper's name suggests a date-stamped directory -- confirm
# in utilities.py.
outdir = make_new_dir_date(processed_finals_dir)

Directory already exists, but you can still have the file name


#### Run the filter

In [None]:
# set up
start = time()  # timer
num_files = len(raw_tweets)

# execution: one log record (dict) per input file
log_list = parallelize_series(raw_tweets, process_file)
log_df = pd.DataFrame(log_list)

# clean up
end = time()
# Count from the records rather than from log_df: with no input files the
# frame has no 'succeed' column and indexing it would raise KeyError.
num_converted = sum(1 for record in log_list if record['succeed'])

# reporting: successes out of the total, and the real number of files timed
print('{} / {} files were filtered.'.format(num_converted, num_files))
print('{} files took: {} seconds.'.format(num_files, end - start))

Couldn't read: 	../../data/external/scrape/tweets_immigrant_34322.csv
Couldn't read: 	../../data/external/scrape/tweets_immigrant_34324.csv
Couldn't read: 	../../data/external/scrape/tweets_immigrant_34345.csv


#### Log File

In [None]:
# Save the per-file results as log.csv in the output directory so a later
# session can reload them without re-running the filter.
# Note: this puts log.csv in the same directory as the filtered files, so a
# listing of that directory will include it.
log_f = name_file_path('log.csv', outdir)
log_df.to_csv(log_f, index=False)

# take a peek
log_df.head()

#### Combine all files

In [4]:
# Show the data directory layout, two levels deep (needs the `tree` command).
!tree -L 2 ../../data/

../../data/
├── canonical
│   ├── test.db
│   └── tweets.db
├── external
│   ├── classifier
│   ├── hatebase
│   ├── maps
│   └── scrape -> /data/backed_up/twitter/data/Immigrant/
└── processed
    ├── class
    ├── finals
    └── scrape

10 directories, 2 files


In [14]:
# Re-open the output directory and reload the log written by the filtering run.
# NOTE(review): if make_new_dir_date derives the name from today's date, this
# only finds the log when run on the same day as the filter -- confirm.
indir = make_new_dir_date(processed_finals_dir)

# Build the path with the same helper that wrote the log, instead of
# `indir + 'log.csv'`, which silently depends on indir ending in '/'.
log_df = pd.read_csv(name_file_path('log.csv', indir))

Directory already exists, but you can still have the file name


In [28]:
combined_f = name_file_path('combined.csv', indir)

# Grab the filtered files from the output directory. The listing also contains
# log.csv and, after a previous run, combined.csv itself; neither is tweet
# data, so both are excluded here.
# Alternative: take only the successful files from the log with
#   log_df[log_df.succeed == True]['new_f'].tolist()
bookkeeping_names = {'log.csv', combined_f.split('/')[-1]}
good_fs = [path for path in ls_files_list(indir)
           if path.split('/')[-1] not in bookkeeping_names]

In [31]:
## write it out
# Append every filtered file to combined_f. The first write creates (or
# overwrites) the file and emits the header row; later writes append rows
# only, so the header appears once and no file is written twice.
skip_names = {'log.csv', combined_f.split('/')[-1]}
wrote_header = False

for f_ in good_fs:
    # skip the log and any combined file left over from an earlier run
    if f_.split('/')[-1] in skip_names:
        continue

    pd.read_csv(f_).to_csv(combined_f,
                           mode='a' if wrote_header else 'w',
                           header=not wrote_header,
                           index=False)
    wrote_header = True

../../data/processed/finals/5-6/log.csv


In [26]:
# Display the path of the combined CSV.
combined_f

'../../data/processed/finals/5-6/combined.csv'