# Divide the tweets into state-month files

In [1]:
%%time
import pandas as pd
import os
from tqdm.notebook import tqdm,trange
import geopandas as gpd
from pandarallel import pandarallel
import us
# import tqdm
# Register DataFrame.parallel_apply (60 worker processes, with progress bars)
# and tqdm's DataFrame.progress_apply for the cells below.
pandarallel.initialize(nb_workers=60, progress_bar=True)
tqdm.pandas()

def get_data(row, base_path=None):
    """Read one tab-separated tweet file and parse its ``tweet_date`` column.

    Args:
        row: A DataFrame row (or any mapping) whose ``"in_file_name"`` entry is
            the file name inside the input directory.
        base_path: Directory holding the file. Defaults to the module-level
            ``input_path``, so ``DataFrame.parallel_apply(get_data, axis=1)``
            keeps working unchanged.

    Returns:
        The file as a DataFrame with ``fips`` and ``GPS`` read as strings and
        ``tweet_date`` converted to datetimes (unparseable values become NaT).
        Returns ``None`` if the file could not be read or parsed; the file name
        and the error are printed in that case.
    """
    file_name = row["in_file_name"]
    if base_path is None:
        base_path = input_path
    try:
        # fips/GPS are read as strings (presumably to keep leading zeros in
        # the codes -- confirm against the input files).
        data = pd.read_csv(
            os.path.join(base_path, file_name),
            delimiter="\t",
            dtype={"fips": str, "GPS": str},
        )
        data["tweet_date"] = pd.to_datetime(data["tweet_date"], errors="coerce")
    except Exception as e:
        # Best effort: one bad file must not abort the whole parallel read.
        # Name the file so the failure can be tracked down afterwards.
        print("failed to read %s: %s" % (file_name, e))
        return None
    return data
    
def split_ms(input_path, output_path, year):
    """Regroup the raw tweet files into one parquet file per (month, state).

    Every entry of *input_path* must have a name whose part before the first
    "." is a run of "_"-separated integers; the second integer is taken as the
    month. All files are read in parallel with ``get_data``. The frames of each
    month are then concatenated, and the month's rows are written, grouped by
    their ``state`` column, to ``output_path + "<year>_<month>_<state>.parquet"``.

    Files that ``get_data`` could not read (it returns None) are skipped. A
    month with no readable file is reported and skipped. Write failures are
    printed together with the target path and do not stop the other writes.

    Args:
        input_path: Directory containing the raw tweet files.
        output_path: Destination prefix. File names are appended by plain
            string concatenation, so it should end with a path separator.
        year: Year used in the output file names.
    """
    print("reading..")
    pd_inputFiles = pd.DataFrame(os.listdir(input_path), columns=["in_file_name"])
    pd_inputFiles["month"] = pd_inputFiles["in_file_name"].map(
        lambda name: [int(part) for part in name.split(".")[0].split("_")][1]
    )
    pd_inputFiles["data"] = pd_inputFiles.parallel_apply(get_data, axis=1)

    print("writing..")
    for month, group in pd_inputFiles.groupby("month"):
        # Keep only the frames that were actually read. pd.concat raises
        # ValueError when every object it is given is None.
        frames = [frame for frame in group["data"] if isinstance(frame, pd.DataFrame)]
        if not frames:
            print("no readable input files for month %d" % month)
            continue
        month_df = pd.concat(frames, ignore_index=True)
        for state, item in month_df.groupby("state"):
            fine_path = output_path + "%d_%d_%s.parquet" % (year, month, state)
            try:
                # Force user_location to str before writing (presumably the
                # column holds mixed types that parquet rejects -- confirm).
                item.astype({"user_location": str}).to_parquet(fine_path)
            except Exception as e:
                print("failed to write %s: %s" % (fine_path, e))

# df.apply(func)
def save_tweets(x, alread_set):
    """Join one state's monthly tweets onto that state's census polygons.

    For each month 1-12 this function:

    - reads ``output_path + "<year>_<month>_<ABBR>.parquet"``, skipping months
      that have no such file;
    - turns the ``longitude``/``latitude`` columns into EPSG:4326 points;
    - inner-joins those points with the census polygons via
      ``GeoDataFrame.sjoin``;
    - writes the selected columns to ``outpath + "<year>_<month>_<ABBR>.csv"``.

    It relies on the module-level globals ``path``, ``year``, ``output_path``
    and ``outpath``. The census file is read only when at least one month
    actually has tweets to join.

    Args:
        x: Row with a ``file_name`` attribute naming a census file inside
            ``path`` (e.g. ``"tl_2021_48_tabblock20.zip"``). Its third
            "_"-separated field is resolved with ``us.states.lookup``.
        alread_set: Output CSV file names to skip (months already done).

    Returns:
        1 after the state has been processed, or 0 if no state could be
        resolved from the file name.
    """
    file_name = x.file_name
    parts = file_name.split("_")
    state = us.states.lookup(parts[2]) if len(parts) > 2 else None
    if state is None:
        print("skipping %s: no state code found in file name" % file_name)
        return 0

    keep_columns = [
        'message_id', 'tweet_date', 'tweet_text', 'tweet_lang', 'place',
        'user_id', 'user_name', 'user_location', 'followers', 'friends',
        'latitude', 'longitude', 'GPS', 'spatialerror', 'fips', 'county',
        'state', 'GEOID20',
    ]
    census_shp = None  # loaded lazily, on the first month that needs it

    for month in range(1, 13):
        out_filename = "%d_%d_%s.csv" % (year, month, state.abbr)
        if out_filename in alread_set:
            continue

        parquet_file = output_path + "%d_%d_%s.parquet" % (year, month, state.abbr)
        try:
            df = pd.read_parquet(parquet_file)
        except FileNotFoundError:
            continue  # no parquet file for this state-month
        except Exception as e:
            print("failed to read %s: %s" % (parquet_file, e))
            continue

        if census_shp is None:
            census_shp = gpd.read_file(path + file_name).to_crs("EPSG:4326")

        points = gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(df.longitude, df.latitude),
            crs="EPSG:4326",
        )
        joined = points.sjoin(census_shp, how="inner")[keep_columns]
        joined.to_csv(outpath + out_filename, encoding="utf-8-sig")

    return 1

            
# Run parameters: census shape files, raw tweets, the intermediate
# per state-month parquet files, and the joined CSV results.
year = 2015
print(year)
path = "data/census_data/"
input_path = "RyanProject/tweets/%d_new/" % year
output_path = "RyanProject/tweet_states/%d_new/parquet/" % year
outpath = "RyanProject/results/census/%d_new/" % year

for target_dir in (output_path, outpath):
    os.makedirs(target_dir, exist_ok=True)

# Step 1: raw tweet files -> one parquet file per (month, state).
split_ms(input_path, output_path, year)
print("done!")

# Step 2: spatial join with the census polygons, one census file per row.
# To resume an interrupted run, start from the CSVs already written:
#     alread_set = set(os.listdir(outpath))
# To process a single state, filter the list first, e.g.
#     states_list = states_list[states_list["file_name"] == "tl_2021_48_tabblock20.zip"]
# progress_apply can be swapped for parallel_apply to use the pandarallel workers.
alread_set = set()
states_list = pd.Series(os.listdir(path), name="file_name").to_frame()
temp = states_list.progress_apply(lambda row: save_tweets(row, alread_set), axis=1)


INFO: Pandarallel will run on 60 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.
2015
reading..


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=13), Label(value='0 / 13'))), HBox…

writing..
done!


  0%|          | 0/51 [00:00<?, ?it/s]

CPU times: user 1h 58min 44s, sys: 2min 22s, total: 2h 1min 7s
Wall time: 2h 2min 30s


Check whether the results include all the states

In [8]:
import os

# Collect the state abbreviation from every result file named
# "<year>_<month>_<STATE>.csv" in outpath, so the set can be compared with the
# list of expected states.
all_states = set()
for file_name in os.listdir(outpath):
    stem, ext = os.path.splitext(file_name)
    if ext != ".csv":
        continue  # ignore stray entries such as checkpoint folders
    # splitext removes exactly the ".csv" suffix. str.strip(".csv") would
    # instead strip any of the characters '.', 'c', 's', 'v' from both ends.
    all_states.add(stem.split("_")[-1])
# all_states