In [1]:
import category_encoders as ce
import pandas as pd
import re
import gensim.downloader as api

def hash_encode_username(df_t):
    """Hash-encode the ``Username`` column into 512 hashed feature columns.

    Args:
        df_t (pd.DataFrame): Input dataframe; must contain a ``Username`` column.

    Returns:
        pd.DataFrame: The frame produced by ``HashingEncoder.fit_transform``,
        with ``Username`` replaced by its hashed representation.
    """
    # max_process=1 keeps the encoder single-process.
    # NOTE(review): the module-level ``columnlist`` names only 256 username
    # hash columns while 512 components are requested here - confirm.
    username_hasher = ce.HashingEncoder(
        cols='Username',
        n_components=512,
        max_process=1,
    )
    return username_hasher.fit_transform(df_t)
def clean_timestamp(df_t):
    """Parse the ``Timestamp`` column into pandas datetimes.

    The column is overwritten on the frame that was passed in, and that same
    frame object is returned.

    Args:
        df_t (pd.DataFrame): Input dataframe with a ``Timestamp`` column.

    Returns:
        pd.DataFrame: ``df_t`` itself, with ``Timestamp`` converted.
    """
    parsed = pd.to_datetime(df_t['Timestamp'])
    df_t['Timestamp'] = parsed
    return df_t
def _parse_entities(tokens):
    """Split one row's ``;``-separated entity tokens into score and name lists.

    Each usable token is split on ``:``; its last field is kept as the
    (still textual) confidence score and its second field as the entity name.
    ``'null'`` and empty tokens are skipped.
    """
    scores, names = [], []
    # A missing cell comes back from ``.str.split`` as None/NaN, not a list.
    if not isinstance(tokens, list):
        return scores, names
    for token in tokens:
        if token in ('null', ''):
            continue
        fields = token.split(":")
        scores.append(fields[-1])
        names.append(fields[1])
    return scores, names


def _lookup_embedding(corpus, names, preferred):
    """Return the vector of ``names[preferred]``, else of the first name that has one, else None."""
    for name in [names[preferred]] + names:
        key = name.lower()
        if corpus.has_index_for(key):
            return corpus[key]
    return None


def create_entitylist(df_t, corpus=None, embedding_dim=200):
    """Replace ``Entities`` with confidence statistics and one word embedding.

    For every row the ``Entities`` string is split on ``;`` and each token on
    ``:``. The row-wise mean, median and max of the confidence scores are
    appended as three columns, followed by ``embedding_dim`` columns holding
    the embedding of the highest-scoring entity. If that entity has no vector,
    the first entity of the row that has one is used. Rows without any usable
    embedding get ``None`` in every embedding column.

    The appended columns keep the integer labels the original code produced
    (``0, 0, 0`` for the statistics, then ``0..embedding_dim-1``); callers in
    this file rename columns by position afterwards.

    Args:
        df_t (pd.DataFrame): Input dataframe with an ``Entities`` column.
        corpus: Object offering ``has_index_for(key)`` and ``corpus[key]``
            (the interface used on the GloVe vectors). When omitted, the
            model is fetched with ``ray.get(stored_model)`` as before.
        embedding_dim (int): Width used to pad rows without an embedding.

    Returns:
        pd.DataFrame: New frame without ``Entities`` and with the statistic
        and embedding columns appended.
    """
    if corpus is None:
        # Default: the shared model the notebook placed in Ray's object store.
        corpus = ray.get(stored_model)

    # Only the first 41 scores of a row take part in the statistics
    # (the original padded to 42 entries and then dropped the last one).
    max_scores = 41
    score_rows = []
    name_rows = []
    for tokens in df_t.Entities.str.split(';').tolist():
        scores, names = _parse_entities(tokens)
        scores = scores[:max_scores]
        score_rows.append(scores + [None] * (max_scores - len(scores)))
        name_rows.append(names)

    # Reuse df_t's index so the axis=1 concat below stays row-aligned.
    scores_df = pd.DataFrame(
        score_rows, index=df_t.index, columns=range(max_scores)
    ).apply(pd.to_numeric)
    summaries = [
        scores_df.mean(axis=1).to_frame(0),
        scores_df.median(axis=1).to_frame(0),
        scores_df.max(axis=1).to_frame(0),
    ]

    # Position of the best score per row. Missing scores are replaced by -inf
    # so rows without entities do not hit the all-NaN idxmax case; their
    # position is never used because their name list is empty.
    best = scores_df.fillna(float('-inf')).to_numpy(dtype=float).argmax(axis=1)

    embedding_rows = []
    for position, names in enumerate(name_rows):
        vector = None
        if names:
            vector = _lookup_embedding(corpus, names, int(best[position]))
        if vector is None:
            vector = [None] * embedding_dim
        # One embedding row per input row (previously only the last row's
        # embedding was appended, after the loop had finished).
        embedding_rows.append(vector)
    embeddings = pd.DataFrame(embedding_rows, index=df_t.index)

    return pd.concat(
        [df_t.drop('Entities', axis=1)] + summaries + [embeddings], axis=1
    )
            
def split_sentiment(df_t):
    """Split the ``Sentiment`` column on ``-`` into separate numeric columns.

    Because the text is split on the minus sign itself, the sign is discarded
    and only the magnitudes remain. The new columns carry the integer labels
    assigned by ``pd.DataFrame`` (``0``, ``1``, ...).

    Args:
        df_t (pd.DataFrame): Input dataframe with a ``Sentiment`` column.

    Returns:
        pd.DataFrame: New frame without ``Sentiment`` and with the numeric
        parts appended on the right.
    """
    pieces = df_t['Sentiment'].str.split('-').tolist()
    magnitudes = pd.DataFrame(pieces).apply(pd.to_numeric)
    remainder = df_t.drop(columns='Sentiment')
    return pd.concat([remainder, magnitudes], axis=1)
            
def hash_encode_column(df):
    """Hash-encode the given data into 64 hashed feature columns.

    Args:
        df (pd.Series): Values to encode; wrapped in a ``pd.DataFrame``
            before being handed to the encoder.

    Returns:
        pd.DataFrame: Result of ``HashingEncoder.fit_transform`` with
        ``n_components=64``.
    """
    frame = pd.DataFrame(df)
    # max_process=1 keeps the encoder single-process.
    hasher = ce.HashingEncoder(n_components=64, max_process=1)
    return hasher.fit_transform(frame)
def hash_encode_column2(df):
    """Hash-encode the given data into 256 hashed feature columns.

    Args:
        df (pd.Series): Values to encode; wrapped in a ``pd.DataFrame``
            before being handed to the encoder.

    Returns:
        pd.DataFrame: Result of ``HashingEncoder.fit_transform`` with
        ``n_components=256``.
    """
    frame = pd.DataFrame(df)
    # max_process=1 keeps the encoder single-process.
    hasher = ce.HashingEncoder(n_components=256, max_process=1)
    return hasher.fit_transform(frame)
def clean_general(df_t, column):
    """Hash-encode the first space-separated item of ``column``.

    Every cell of ``column`` is split on a single space and only the first
    item is kept. Missing cells, ``"null;"`` and empty strings are encoded as
    ``None``. The kept items are passed through ``hash_encode_column`` and the
    resulting columns are appended to the right of ``df_t``. ``column``
    itself is left untouched.

    Args:
        df_t (pd.DataFrame): Input dataframe.
        column (str): Name of the text column to encode.

    Returns:
        pd.DataFrame: ``df_t`` with the hashed columns appended.
    """
    first_items = []
    for tokens in df_t[column].str.split(' ').tolist():
        # ``.str.split`` yields None or NaN (not a list) for missing cells;
        # both are treated as "no value" instead of crashing on indexing.
        if (
            not isinstance(tokens, list)
            or not tokens
            or tokens[0] in (None, "null;", "")
        ):
            first_items.append([None])
        else:
            first_items.append([tokens[0]])
    hashed = hash_encode_column(pd.DataFrame(first_items))
    return pd.concat([df_t, hashed], axis=1)
def count_general(df_t, column, max_items=270):
    """Replace ``column`` with its item count and drop over-long rows.

    The column is split on a single space and overwritten with the number of
    resulting items (this assignment modifies ``df_t`` in place). Rows whose
    count exceeds ``max_items`` are then removed; rows with a missing count
    are kept.

    The returned frame gets a fresh ``0..n-1`` index. Without that, removed
    rows leave gaps in the index and the later ``pd.concat(..., axis=1)``
    calls against freshly built frames in this file would no longer line up
    row by row.

    Args:
        df_t (pd.DataFrame): Input dataframe.
        column (str): Name of the text column to count.
        max_items (int): Rows with more items than this are dropped.

    Returns:
        pd.DataFrame: Filtered frame with ``column`` holding the counts and a
        default integer index.
    """
    df_t[column] = df_t[column].str.split(' ').str.len()
    # NaN > max_items is False, so rows with missing counts survive.
    within_limit = ~(df_t[column] > max_items)
    return df_t[within_limit].reset_index(drop=True)
    
def clean_url(df_t):
    """Hash-encode the host of the first URL in the ``URLs`` column.

    Every cell is split on ``':-:'`` and only the first entry is inspected.
    The host is the text between ``://`` and the next ``/`` (or the end of
    the string when there is no further slash). Missing cells, ``"null;"``,
    empty strings and entries without ``://`` are encoded as ``None``. The
    hosts are passed through ``hash_encode_column2`` and the resulting
    columns are appended to the right of ``df_t``. ``URLs`` itself is left
    untouched.

    Args:
        df_t (pd.DataFrame): Input dataframe with a ``URLs`` column.

    Returns:
        pd.DataFrame: ``df_t`` with the hashed host columns appended.
    """
    # Single capture group: previously the "no trailing slash" case matched a
    # second group that was never read, so such hosts came out as ''.
    host_pattern = re.compile(r"://(.*?)(?:/|$)")
    hosts = []
    for urls in df_t['URLs'].str.split(':-:').tolist():
        host = None
        # ``.str.split`` yields None or NaN (not a list) for missing cells.
        if isinstance(urls, list) and urls and urls[0] not in (None, "null;", ""):
            found = host_pattern.search(urls[0])
            if found is not None:
                host = found.group(1)
        hosts.append([host])
    hashed = hash_encode_column2(pd.DataFrame(hosts))
    return pd.concat([df_t, hashed], axis=1)


    
def count_url(df_t, sep=' '):
    """Replace the ``URLs`` column with the number of items in each cell.

    The column is split on ``sep`` and overwritten, on the frame that was
    passed in, with the length of the resulting list.

    NOTE(review): ``clean_url`` in this file splits the same column on
    ``':-:'``, whereas the default here is a single space. If ``':-:'`` is
    the real separator, the default yields 1 for every non-missing cell -
    confirm and pass ``sep=':-:'`` if so. pandas may interpret a separator
    longer than one character as a regular expression.

    Args:
        df_t (pd.DataFrame): Input dataframe with a ``URLs`` column.
        sep (str): Separator used to split each cell. Defaults to ``' '``,
            the previous hard-coded value.

    Returns:
        pd.DataFrame: ``df_t`` itself, with ``URLs`` holding the counts.
    """
    df_t['URLs'] = df_t['URLs'].str.split(sep).str.len()
    return df_t



def split_timestamp_to_cols(df):
    """Replace ``Timestamp`` with cyclical time features and the year.

    Adds sine and cosine encodings of the second of the day and of the month
    of the year, so that periodic (seasonal) patterns are visible to a model,
    plus the calendar year. The columns are added to ``df`` in place and the
    ``Timestamp`` column is dropped from it.

    Args:
        df (pd.DataFrame): Input dataframe with a ``Timestamp`` column.

    Returns:
        pd.DataFrame: ``df`` itself with ``sin_second``, ``cos_second``,
        ``sin_month``, ``cos_month`` and ``year`` added and ``Timestamp``
        removed.
    """
    total_seconds_in_day = 60 * 60 * 24
    total_months_in_year = 12

    # Parse once and reuse for every derived feature.
    stamps = pd.to_datetime(df.Timestamp)

    second_of_day = (
        stamps.dt.second
        + stamps.dt.minute * 60
        + stamps.dt.hour * 60 * 60
    )
    day_angle = 2 * np.pi * second_of_day / total_seconds_in_day
    df["sin_second"] = np.sin(day_angle)
    df["cos_second"] = np.cos(day_angle)

    month_angle = 2 * np.pi * stamps.dt.month / total_months_in_year
    df["sin_month"] = np.sin(month_angle)
    df["cos_month"] = np.cos(month_angle)

    df["year"] = stamps.dt.year

    df.drop(labels="Timestamp", axis="columns", inplace=True)

    return df


# Final column names, in positional order, assigned to the processed frame.
# NOTE(review): this list names 256 username hash columns and still contains
# 'Timestamp', while hash_encode_username requests 512 components and
# split_timestamp_to_cols drops 'Timestamp' and adds five columns - confirm
# that the list matches the frame it is assigned to.
columnlist = (
    ['usernamehash_col' + str(k) for k in range(256)]
    + [
        'Timestamp',
        '#Followers',
        '#Friends',
        'Retweets',
        "#Favourites",
        'Mentions_count',
        "Hashtag_counts",
        'URL_counts',
        'confidence_mean',
        'confidence_median',
        'confidence_max',
    ]
    + ["Embeddings_" + str(k) for k in range(200)]
    + ['Sentiments' + str(k) for k in range(2)]
    + ['Mentionshash_col' + str(k) for k in range(64)]
    + ['Hashtagshash_col' + str(k) for k in range(64)]
    + ['URLshash_col' + str(k) for k in range(256)]
)


    
        



In [2]:
import random

Use Ray to process the data in parallel.

In [None]:
%%time
import numpy as np
import psutil
import ray
import scipy.signal

# Ask psutil for the physical core count only (logical=False).
num_cpus = psutil.cpu_count(logical=False)

# Start Ray with that many CPUs available to schedule tasks on.
ray.init(num_cpus=num_cpus)

@ray.remote
def cleanall(name):
    """Run the full preprocessing pipeline on one feather file (Ray task).

    Reads ``name``, drops the ``level_0`` and ``index`` columns, applies the
    cleaning steps in a fixed order (printing the step number before each
    one), drops ``Tweet_Id``, renames the columns positionally from
    ``columnlist`` and writes the result to ``'processed3' + name[7:]``.

    ``name[7:]`` removes the first seven characters of the path, which is the
    length of the ``./split`` prefix used by the driver glob in this file.

    NOTE(review): the progress message prints ``'processed'`` while the file
    is written under ``'processed3'``. Also, ``columnlist`` still contains
    'Timestamp' and 256 username names, whereas this pipeline drops
    'Timestamp' and hash_encode_username requests 512 components - confirm
    that the positional rename matches the frame.

    Args:
        name (str): Path of the input ``.ftr`` file.
    """
    tail = name[7:]
    print(name)
    print('processed' + tail)

    frame = pd.read_feather(name).drop('level_0', axis=1).drop('index', axis=1)

    # Steps run strictly in this order; each receives the previous result.
    pipeline = (
        hash_encode_username,
        clean_timestamp,
        create_entitylist,
        split_sentiment,
        lambda d: clean_general(d, "Mentions"),
        lambda d: count_general(d, 'Mentions'),
        lambda d: clean_general(d, "Hashtags"),
        lambda d: count_general(d, 'Hashtags'),
        clean_url,
        count_url,
        split_timestamp_to_cols,
    )
    for step_number, step in enumerate(pipeline, start=1):
        print(str(step_number))
        frame = step(frame)

    frame = frame.drop('Tweet_Id', axis=1)
    frame.columns = columnlist
    frame.to_feather('processed3' + tail)
# Load the 'glove-twitter-200' vectors through gensim's downloader API.
corpus = api.load('glove-twitter-200')
# Place the model in Ray's object store once; create_entitylist retrieves it
# inside each task with ray.get(stored_model).
stored_model=ray.put(corpus)



2021-07-29 03:58:43,711	INFO services.py:1272 -- View the Ray dashboard at http://127.0.0.1:8265


In [None]:
import glob  # used below but not imported anywhere else in the visible cells

# Process the split files in random order, one Ray task per file.
files = glob.glob('./split/*.ftr')
random.shuffle(files)

# Keep the ObjectRefs and wait on them: this blocks until every task has
# finished and re-raises any exception a task hit, instead of discarding
# failures silently.
pending = [cleanall.remote(file) for file in files]
_ = ray.get(pending)