In [1]:
import ray
import json
import time
import glob
from src.seanmod import *

## Experiment 1: Simple Test of the Ray Library:



In [11]:
#Only call once! 
ray.init(num_cpus=2)

@ray.remote
def f(x):
    i=0;
    for i in range(x,300000001):
        i = i+1
    return i

@ray.remote
def g(x):
    return pd.DataFrame({"A":[1,2],"B":[3,4]})

@ray.remote
def h(x):
    return x

    
futures = [f.remote(i) for i in range(10)]
print(ray.get(futures))

ray.shutdown()

2019-10-11 14:05:44,203	INFO resource_spec.py:205 -- Starting Ray with 25.68 GiB memory available for workers and up to 12.84 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '192.168.0.185',
 'redis_address': '192.168.0.185:48509',
 'object_store_address': '/tmp/ray/session_2019-10-11_14-05-44_200435_6340/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-10-11_14-05-44_200435_6340/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2019-10-11_14-05-44_200435_6340'}

[300000001, 300000001, 300000001, 300000001, 300000001, 300000001, 300000001, 300000001, 300000001, 300000001]


### Note the Following:

- Only call ray.init() once per session.
    - can configure memory, number of physical cores, object space with this method.

#### Simplest Run Pattern to run: 

1) Call ray.init(...with args)

2) Encapsulate your entire block of code in a function, parameterized.

3) For inputs, create a list of tuples, pass to function block.

4) Iterate over list, and call function.remote(< input >). A reference address will be returned as the function is computing.

5) use the ray.get() function on given IDs, to get results.

6) Call ray.shutdown() to clean up idle processes, when work is done.


### Experiment 2:

How long does it take to load our three test files (450mb), and write them to disk?




In [10]:
#Simple Timeit usage https://stackoverflow.com/questions/7370801/measure-time-elapsed-in-python

@ray.remote
def get3frames(fileName):

    #Lets read in the tweets, and convert them to a DataFrame
    topLevelFields = ["id","created_at", "text","quote_count","reply_count","retweet_count",
                      "favorite_count","user_id","user_screen_name"]
    
    userFields = ["id", "created_at", "name", "screen_name", "verified", "followers_count", 
                  "friends_count", "favourites_count", "statuses_count", 
                 "profile_background_color","profile_text_color"]

    userCheckDict = {"test": 1}

    #make an empty DF.
    tweetDF = pd.DataFrame(columns=topLevelFields + ["hashtag_list"])
    userDF = pd.DataFrame(columns=userFields)  #list(map(lambda x: "user_" + x , userFields)))

    baseDir = "./data/"

    def parsetweet(jsonObj):
        retList = []
        for label in topLevelFields[:len(topLevelFields) - 2]: #Last two fields in user sub-obj.
            retList.append(jsonObj[label])

        hashTagString = ""
        if jsonObj["entities"]["hashtags"]:
            for item in jsonObj["entities"]["hashtags"]:
                hashTagString = hashTagString + "," + item["text"]
            hashTagString = hashTagString[1:]
        
        retList.append(hashTagString)
        
        retList.append(jsonObj["user"]["id"])
        retList.append(jsonObj["user"]["screen_name"])
        return retList       

    def parseuser(jsonObj):
        #first, extract id from jsonObj for user. Is it in our table of users?
        #To check (quickly) for a users presence, we need a hash table.
        #In python, dictionaries serve the same purpose
        retList = []
        if jsonObj["user"]["id"] not in userCheckDict:
            userCheckDict[jsonObj["user"]["id"]] = 1
            for label in userFields:
                retList.append(jsonObj["user"][label])
 
        return retList
        
    with open(baseDir + fileName + ".json", "r") as tweets_file:
        for i,line in enumerate(tweets_file):
            try:
                tweet = json.loads(line)
                tweetDF.loc[tweetDF.shape[0]] = parsetweet(tweet)
                retList = parseuser(tweet)
                if retList: #Not empty: 
                    userDF.loc[userDF.shape[0]] = retList
            except ValueError as e:
                print('Handling run-time error:', e)            
        tweets_file.close()
    tweetDF.to_csv("./data/stage1/" + fileName + "_tweets.csv")
    userDF.to_csv("./data/stage1/" + fileName + "_users.csv")
    return 
    

In [19]:
get3frames("fiveline")

Handling run-time error: Extra data: line 1 column 11825 (char 11824)


Now to move onto the actual test with three files!

In [9]:
ray.shutdown()

In [11]:
#Only call once! 
timeStart = time.time()

ray.init(num_cpus=3) #One File per CPU

nameList = ["comments1","comments2","comments3"]    
idLists = [get3frames.remote(name) for name in nameList]

print(ray.get(idLists))

ray.shutdown()

timeEnd = time.time()

print(timeEnd - timeStart)

2019-10-12 09:53:09,612	INFO resource_spec.py:205 -- Starting Ray with 23.93 GiB memory available for workers and up to 11.99 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '192.168.0.185',
 'redis_address': '192.168.0.185:20761',
 'object_store_address': '/tmp/ray/session_2019-10-12_09-53-09_609519_3130/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-10-12_09-53-09_609519_3130/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2019-10-12_09-53-09_609519_3130'}

[None, None, None]
377.4397077560425


### Experiment 3: 

In [12]:
@ray.remote
def getframes(argTuple):   
    index = argTuple[0]    
    fileName = argTuple[1]

    #Lets read in the tweets, and convert them to a DataFrame
    topLevelFields = ["id","created_at", "text","quote_count","reply_count","retweet_count",
                      "favorite_count","user_id","user_screen_name"]

    userFields = ["id", "created_at", "name", "screen_name", "verified", "followers_count", 
                  "friends_count", "favourites_count", "statuses_count", 
                 "profile_background_color","profile_text_color"]

    userCheckDict = {"test": 1}

    #make an empty DF.
    tweetDF = pd.DataFrame(columns=topLevelFields + ["hashtag_list"])
    userDF = pd.DataFrame(columns=userFields)  #list(map(lambda x: "user_" + x , userFields)))

    baseDir = "./data/"

    def parsetweet(jsonObj):
        retList = []
        for label in topLevelFields[:len(topLevelFields) - 2]: 
            retList.append(jsonObj[label])

        hashTagString = ""
        if jsonObj["entities"]["hashtags"]:
            for item in jsonObj["entities"]["hashtags"]:
                hashTagString = hashTagString + "," + item["text"]
            hashTagString = hashTagString[1:]
            
        #Finally, we need to add foreign keys to the user table.
        retList.append(jsonObj["user"]["id"])
        retList.append(jsonObj["user"]["screen_name"])
        
        retList.append(hashTagString)
        return retList       

    def parseuser(jsonObj):
        #first, extract id from jsonObj for user. Is it in our table of users?
        #To check (quickly) for a users presence, we need a hash table.
        #In python, dictionaries serve the same purpose
        retList = []
        if jsonObj["user"]["id"] not in userCheckDict:
            userCheckDict[jsonObj["user"]["id"]] = 1
            for label in userFields:
                retList.append(jsonObj["user"][label])

        return retList
        
    with open(fileName, "r") as tweets_file:
        for i,line in enumerate(tweets_file):
            try:
                tweet = json.loads(line)
                tweetDF.loc[tweetDF.shape[0]] = parsetweet(tweet)
                retList = parseuser(tweet)
                if retList: #Not empty: 
                    userDF.loc[userDF.shape[0]] = retList
            except ValueError as e:
                print('Handling run-time error:', e)            
        tweets_file.close()
    tweetDF.to_csv(baseDir + "stage1/tweets/" + "tweets" + str(index) + ".csv")
    userDF.to_csv(baseDir + "stage1/users/" + "users" + str(index) + ".csv")
    return 

In [13]:
#Only call once! 
timeStart = time.time()

ray.init(num_cpus=8) #One File per CPU

startDir = "./data/tweets/"
enumList = list(enumerate(glob.glob(startDir+"*")))

idLists = [getframes.remote(name) for name in enumList]

print(ray.get(idLists))

ray.shutdown()

timeEnd = time.time()

print(timeEnd - timeStart)

2019-10-12 10:06:05,132	INFO resource_spec.py:205 -- Starting Ray with 23.93 GiB memory available for workers and up to 11.97 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '192.168.0.185',
 'redis_address': '192.168.0.185:51504',
 'object_store_address': '/tmp/ray/session_2019-10-12_10-06-05_129273_3130/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-10-12_10-06-05_129273_3130/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2019-10-12_10-06-05_129273_3130'}

[2m[36m(pid=5459)[0m Handling run-time error: Expecting value: line 1 column 1 (char 0)
[None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]
1188.776025056839


#### Clean Up Work: Lets join all of our dataframes together:


In [17]:
tweetPath = "./data/stage1/tweets/"
userPath = "./data/stage1/users/"

enumTweets = glob.glob(tweetPath+"*")
enumUsers = glob.glob(userPath+"*")

In [21]:
list(map(lambda x: int(x.replace("./data/stage1/tweets/tweets","").replace(".csv","")),enumTweets))
#enumUsers

[4, 13, 2, 17, 0, 19, 12, 7, 11, 15, 18, 8, 9, 10, 3, 14, 5, 1, 6, 16]

In [None]:
#initialize our Dataframe

    

In [67]:
def concatfile(listPaths,subdir):
    fileMap={}
    for path in listPaths:
        index = int(path.replace(subdir,"").replace(".csv",""))
        fileMap[index] = path
    
    return fileMap

def mergefiles(pathFilesDir,lengthCols,extra):
    listFiles = glob.glob(pathFilesDir+"*")
    #print(listFiles)
    #print(pathFilesDir+extra)
    fileMap = concatfile(listFiles,pathFilesDir+extra)
    frames = [] 
    
    frames.append(pd.read_csv(fileMap[0],header=0,usecols=list(range(1,lengthCols))))
    for i in range(1,len(listFiles)):
        frames.append(pd.read_csv(fileMap[0],header=0,usecols=list(range(1,lengthCols))))
    final = pd.concat(frames)
    final.reset_index(drop=True, inplace=True)
    return final    

    

In [60]:
pathFilesDir="./data/stage1/tweets/"
concatfile(glob.glob(pathFilesDir+"*"),pathFilesDir + "tweets")

{4: './data/stage1/tweets/tweets4.csv',
 13: './data/stage1/tweets/tweets13.csv',
 2: './data/stage1/tweets/tweets2.csv',
 17: './data/stage1/tweets/tweets17.csv',
 0: './data/stage1/tweets/tweets0.csv',
 19: './data/stage1/tweets/tweets19.csv',
 12: './data/stage1/tweets/tweets12.csv',
 7: './data/stage1/tweets/tweets7.csv',
 11: './data/stage1/tweets/tweets11.csv',
 15: './data/stage1/tweets/tweets15.csv',
 18: './data/stage1/tweets/tweets18.csv',
 8: './data/stage1/tweets/tweets8.csv',
 9: './data/stage1/tweets/tweets9.csv',
 10: './data/stage1/tweets/tweets10.csv',
 3: './data/stage1/tweets/tweets3.csv',
 14: './data/stage1/tweets/tweets14.csv',
 5: './data/stage1/tweets/tweets5.csv',
 1: './data/stage1/tweets/tweets1.csv',
 6: './data/stage1/tweets/tweets6.csv',
 16: './data/stage1/tweets/tweets16.csv'}

In [70]:
fullTweetsDF = mergefiles("./data/stage1/tweets/",11,"tweets")
fullTweetsDF.to_csv("./data/stage2/tweets400k.csv")
fullTweetsDF.shape
fullTweetsDF.head()


(400000, 10)

Unnamed: 0,id,created_at,text,quote_count,reply_count,retweet_count,favorite_count,user_id,user_screen_name,hashtag_list
0,1173228641628622851,Sun Sep 15 13:34:40 +0000 2019,RT @ProudCanadianN1: You need to invite the @p...,0,0,0,0,1100072322977939459,PPC_Retweets,
1,1173228642316341248,Sun Sep 15 13:34:40 +0000 2019,RT @EyesOfBlue06: I'm in Ontario.. destroyed ...,0,0,0,0,703601953,cwhoward24,"TrudeauMustGo,LiberalsMustGo,Scheer4PM"
2,1173228643272683521,Sun Sep 15 13:34:40 +0000 2019,RT @CBCPolitics: Scheer will stand by candidat...,0,0,0,0,62314888,neilpk70,
3,1173228643302010886,Sun Sep 15 13:34:40 +0000 2019,RT @acoyne: Contrary to the old Watergate prov...,0,0,0,0,35162290,SeeClickFlash,
4,1173228644107313153,Sun Sep 15 13:34:40 +0000 2019,RT @AndersonBooz: Because @AndrewScheer lies t...,0,0,0,0,1707977161,M_Simmonds_,


In [72]:
fullUsersDF = mergefiles("./data/stage1/users/",12,"users")
fullUsersDF.to_csv("./data/stage2/users400k.csv")
fullUsersDF.shape
fullUsersDF.head()

(144320, 11)

Unnamed: 0,id,created_at,name,screen_name,verified,followers_count,friends_count,favourites_count,statuses_count,profile_background_color,profile_text_color
0,1100072322977939459,Mon Feb 25 16:37:34 +0000 2019,PPC Retweets 🇨🇦,PPC_Retweets,False,1344,3283,5222,60718,F5F8FA,333333
1,703601953,Wed Jul 18 18:58:23 +0000 2012,Chad Howard,cwhoward24,False,111,170,2057,1007,C0DEED,333333
2,62314888,Sun Aug 02 18:13:02 +0000 2009,Count Floyd 🇨🇦,neilpk70,False,619,720,160580,57884,C0DEED,333333
3,35162290,Sat Apr 25 06:09:36 +0000 2009,Mark O'Henly,SeeClickFlash,False,1449,659,17231,32665,352726,3E4415
4,1707977161,Wed Aug 28 18:26:29 +0000 2013,Matt Simmonds,M_Simmonds_,False,232,620,2327,3303,C0DEED,333333


In [52]:
fullUsersDF.head()

Unnamed: 0,id,created_at,text,quote_count,reply_count,retweet_count,favorite_count,user_id,user_screen_name,hashtag_list
0,1173228641628622851,Sun Sep 15 13:34:40 +0000 2019,RT @ProudCanadianN1: You need to invite the @p...,0,0,0,0,1100072322977939459,PPC_Retweets,
1,1173228642316341248,Sun Sep 15 13:34:40 +0000 2019,RT @EyesOfBlue06: I'm in Ontario.. destroyed ...,0,0,0,0,703601953,cwhoward24,"TrudeauMustGo,LiberalsMustGo,Scheer4PM"
2,1173228643272683521,Sun Sep 15 13:34:40 +0000 2019,RT @CBCPolitics: Scheer will stand by candidat...,0,0,0,0,62314888,neilpk70,
3,1173228643302010886,Sun Sep 15 13:34:40 +0000 2019,RT @acoyne: Contrary to the old Watergate prov...,0,0,0,0,35162290,SeeClickFlash,
4,1173228644107313153,Sun Sep 15 13:34:40 +0000 2019,RT @AndersonBooz: Because @AndrewScheer lies t...,0,0,0,0,1707977161,M_Simmonds_,


In [35]:
list(range(1,11))

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]