In [1]:
from tqdm import tqdm
from matplotlib import pyplot as plt
import time
import numpy as np
import sys
import h5py

import networkx as nx
import sqlite3 as s3
from datetime import datetime, timedelta

#from multiprocessing import Process, Pool, Queue
import multiprocessing as mp

In [2]:
# Open the SQLite database file 'CTROT' (path is relative to the working directory).
conn = s3.connect('CTROT')
# Cursor used by the date-range queries in the "Presets" section.
c = conn.cursor()

## Presets

In [None]:
# Date handling
# created_at is selected explicitly so that the value parsed below is the date
# column regardless of the order in which the table's columns are declared.

# Get the earliest date in the data
c.execute("SELECT created_at FROM retweets ORDER BY date(created_at) ASC LIMIT 1")
start_date = datetime.strptime(c.fetchall()[0][0], "%Y-%m-%d 00:00:00")

# Get the final date in the data
c.execute("SELECT created_at FROM retweets ORDER BY date(created_at) DESC LIMIT 1")
end_date = datetime.strptime(c.fetchall()[0][0], "%Y-%m-%d 00:00:00")

# Number of whole days between the first and the last record
total_days = (end_date - start_date).days

In [3]:
# Hard-coded date range: the same three variables that the database query
# cell computes, written out as constants.
start_date = datetime(2009, 10, 21)
end_date = datetime(2017, 10, 24)
# Whole days between the two dates (evaluates to 2925).
total_days = (end_date - start_date).days

In [4]:
# Show the configured range and its length, one value per line.
print("%s\n%s\n%s" % (start_date, end_date, total_days))

2009-10-21 00:00:00
2017-10-24 00:00:00
2925


In [5]:
# Release the notebook-level connection; createNetwork.__call__ opens its own
# connection for every task it runs.
conn.close()

## Parallel processing k-values with queue

In [6]:
class Networker(mp.Process):
    """Worker process: runs queued tasks and forwards their records to the writer queue.

    Tasks are zero-argument callables taken from ``task_queue``; each must
    return an iterable of records whose element ``[1]`` is a date string.
    A ``None`` task is the poison pill that makes the worker exit.

    Parameters
    ----------
    task_queue : queue offering ``get()`` and ``task_done()``
        Source of tasks (and of the ``None`` poison pill).
    write_queue : queue offering ``put()``
        Destination of every record produced by a task.
    final_date : str, optional
        Date string that triggers the writer's stop marker: the first time a
        record carrying this date is forwarded, ``None`` is put on
        ``write_queue`` right after it (at most once per worker). Defaults to
        the value that used to be hard-coded here.

    NOTE(review): the stop marker is sent as soon as *this* worker forwards a
    record with ``final_date``, without regard to records that this or other
    workers still have to produce; confirm that whoever reads ``write_queue``
    may really stop at that point.
    NOTE(review): the driver cell builds its dates as
    ``start_date + 0 .. total_days - 1``, which appears to end one day before
    the default ``final_date`` - verify that the marker is ever sent.
    """

    def __init__(self, task_queue, write_queue, final_date='2017-10-24 00:00:00'):
        mp.Process.__init__(self)
        self.task_queue = task_queue
        self.write_queue = write_queue
        self.final_date = final_date

    def run(self):
        """Consume tasks until the poison pill arrives."""
        proc_name = self.name
        stop_sent = False
        while True:
            task = self.task_queue.get()
            if task is None:
                # Poison pill: acknowledge it and leave the loop.
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            try:
                for record in task():
                    self.write_queue.put(record)
                    if record[1] == self.final_date and not stop_sent:
                        self.write_queue.put(None)
                        stop_sent = True
            finally:
                # Every get() is matched by a task_done(), even when the task
                # raises, so the queue's unfinished-task count stays accurate.
                self.task_queue.task_done()
        return
    
class createNetwork(object):
    """Callable task: build the graph for a window of days and return its core numbers.

    Parameters
    ----------
    start_date : datetime or str
        First day of the window. A string (byte or unicode) is parsed with
        the format ``"%Y-%m-%d 00:00:00"``; any other value is stored as is.
    days : int
        Number of consecutive days, starting at ``start_date``, whose rows
        are merged into one graph.
    """

    def __init__(self, start_date, days):
        self.start_date = start_date
        self.days = days
        # type(u'') is `unicode` on Python 2 and `str` on Python 3, so both
        # kinds of text are parsed.
        if isinstance(start_date, (str, type(u''))):
            self.start_date = datetime.strptime(start_date, "%Y-%m-%d 00:00:00")

    def __call__(self):
        """Query the window, build the graph and compute core numbers.

        Returns
        -------
        list of tuple
            One ``(node, window_start, core_number)`` tuple per graph node,
            where ``window_start`` is ``start_date`` formatted as
            ``"%Y-%m-%d 00:00:00"``.
        """
        g = nx.Graph()
        print(self.start_date)
        sys.stdout.flush()

        conn = s3.connect('CTROT')
        try:
            c = conn.cursor()
            for offset in range(self.days):
                day = (self.start_date + timedelta(offset)).strftime("%Y-%m-%d 00:00:00")
                tries = 0
                while True:
                    try:
                        # Bound parameter instead of string concatenation.
                        c.execute("SELECT * FROM retweets WHERE created_at=?", (day,))
                        break
                    except s3.OperationalError as inst:
                        # Operational errors (e.g. a locked database) are
                        # retried after a short pause; any other exception
                        # propagates instead of being retried forever.
                        time.sleep(0.1)
                        tries += 1
                        print("Attempted %i selections... %s" % (tries, inst))
                        sys.stdout.flush()
                # Columns 1 and 2 of each row are the two endpoints of an edge
                # (presumably retweeting / retweeted user - confirm schema).
                for row in c.fetchall():
                    g.add_edge(row[1], row[2])
        finally:
            # Close the connection even if querying or graph building fails.
            conn.close()

        # Self-loops are removed before the core numbers are computed. The
        # edges are copied into a list first so the graph is not modified
        # while it is being iterated; the module-level function is used when
        # the installed NetworkX provides it, the Graph method otherwise.
        if hasattr(nx, 'selfloop_edges'):
            loops = list(nx.selfloop_edges(g))
        else:
            loops = list(g.selfloop_edges())
        g.remove_edges_from(loops)

        # Then calculate k-values, tagged with the start of the window
        core = nx.core_number(g)
        window_start = self.start_date.strftime("%Y-%m-%d 00:00:00")
        k_values = [(user, window_start, core[user]) for user in core]

        # Disabled: normalization against a degree-preserving rewiring.
        # g_norm = nx.Graph(g)
        # if nx.number_of_nodes(g_norm)>4:
        #     g_norm = nx.double_edge_swap(g_norm, nswap=2*nx.number_of_nodes(g_norm), max_tries=float("inf"))
        # k_values_doc = nx.core_number(g_norm)
        # k_values_norm = [(user,self.start_date.strftime("%Y-%m-%d"),k_values_doc[user]) for user in k_values_doc]

        return k_values
    
class Writer(mp.Process):
    """Process that stores ``(user, date, value)`` records in an HDF5 file.

    Parameters
    ----------
    write_queue : queue offering ``get()`` and ``task_done()``
        Source of records; ``None`` is the stop marker.
    result_queue : queue offering ``put()``
        Receives the string ``"Done."`` after every stored record.
    folder : str
        Prefix prepended to ``'data.hdf5'`` to form the output path (it is
        concatenated, so it needs its own trailing separator).

    ``run`` also reads two module-level names: ``t`` (list of date strings
    whose positions are the dataset indices) and ``total_days`` (length of
    every dataset).
    """

    def __init__(self, write_queue, result_queue, folder):
        mp.Process.__init__(self)
        self.write_queue = write_queue
        self.result_queue = result_queue
        self.folder = folder

    def run(self):
        """Write records until the stop marker arrives, then close the file."""
        proc_name = self.name

        # Position of each date string in `t`, computed once instead of
        # scanning the list for every record. setdefault keeps the first
        # position, which is what list.index would return.
        date_index = {}
        for position, stamp in enumerate(t):
            date_index.setdefault(stamp, position)

        # Mode 'w' creates the file, replacing an existing one.
        with h5py.File(self.folder+'data.hdf5','w') as f:
            while True:
                next_write = self.write_queue.get()
                if next_write is None:
                    print('%s: Exiting' % proc_name)
                    self.write_queue.task_done()
                    break
                # Record layout: user, date, value.
                # The dataset name is the string form of the user, and the
                # existence check uses that same string so it matches the
                # name the dataset is created under.
                key = str(next_write[0])
                if key not in f:
                    f.create_dataset(key, (total_days,), dtype='i')
                f[key][date_index[next_write[1]]] = next_write[2]
                self.write_queue.task_done()
                # NOTE(review): one message is queued per stored record; make
                # sure whoever owns result_queue drains it.
                self.result_queue.put("Done.")
        return

In [8]:
if __name__ == '__main__':
    # Runs the whole pipeline and blocks until the HDF5 file has been closed:
    # the Networker processes compute core numbers per date window, the
    # single Writer process stores them.
    test_date_num = total_days
    # Start date of every window as a string. Writer.run looks up the global
    # `t` to turn a record's date into a dataset index, so the name is kept.
    t = [(start_date + timedelta(x)).strftime("%Y-%m-%d 00:00:00")
         for x in range(test_date_num)]
    # Number of consecutive days merged into each window's graph.
    window_days = 7

    #File structure
    folder = 'k_timelines/'

    # Establish communication queues
    dates = mp.JoinableQueue()
    writes = mp.JoinableQueue()
    results = mp.Queue()

    # Start consumers
    num_networkers = mp.cpu_count()
    print('Creating %d network creators' % num_networkers)
    networkers = [Networker(dates, writes) for i in range(num_networkers)]
    for w in networkers:
        w.daemon = True
        w.start()
    writer = Writer(writes, results, folder)
    writer.start()

    # Enqueue jobs
    for date in t:
        dates.put(createNetwork(date, window_days))

    # Add a poison pill for each consumer
    for i in range(num_networkers):
        dates.put(None)

    def _drain_results():
        """Discard the writer's per-record messages.

        This cell is the only reader of `results`, so a non-empty queue
        means get() returns without blocking.
        """
        while not results.empty():
            results.get()

    # Wait for the workers. A worker only exits after its queued records have
    # been handed to the pipe, so once all of them are gone every record is
    # ahead of the stop marker sent below.
    for w in networkers:
        while w.is_alive():
            _drain_results()
            w.join(1.0)
            if not writer.is_alive():
                # Nothing reads `writes` any more; stop instead of hanging.
                for other in networkers:
                    other.terminate()
                raise RuntimeError(
                    "writer process stopped before the workers finished "
                    "(exit code %r)" % writer.exitcode)

    # Tell the writer to finish, and keep emptying `results` while it shuts
    # down: a process cannot exit while data it queued is still unread.
    writes.put(None)
    while writer.is_alive():
        _drain_results()
        writer.join(1.0)
    _drain_results()

Creating 8 network creators
2009-10-24 00:00:00
2009-10-23 00:00:00
2009-10-25 00:00:00
2009-10-22 00:00:00
2009-10-26 00:00:00
2009-10-27 00:00:00
2009-10-21 00:00:00
2009-10-28 00:00:00
2009-10-29 00:00:00
2009-10-30 00:00:00
2009-10-31 00:00:00
2009-11-05 00:00:00
2009-11-01 00:00:00
2009-11-03 00:00:00
2009-11-04 00:00:00
2009-11-02 00:00:00
2009-11-07 00:00:00
2009-11-06 00:00:00
2009-11-08 00:00:00
2009-11-09 00:00:00
2009-11-10 00:00:00
2009-11-11 00:00:00
2009-11-12 00:00:00
2009-11-15 00:00:00
2009-11-13 00:00:00
2009-11-17 00:00:00
2009-11-14 00:00:00
2009-11-19 00:00:00
2009-11-18 00:00:00
2009-11-20 00:00:00
2009-11-21 00:00:00
2009-11-16 00:00:00
2009-11-22 00:00:00
2009-11-25 00:00:00
2009-11-28 00:00:00
2009-11-26 00:00:00
2009-11-23 00:00:00
2009-11-24 00:00:00
2009-11-27 00:00:00
2009-11-30 00:00:00
2009-11-29 00:00:00
2009-12-02 00:00:00
2009-12-01 00:00:00
2009-12-06 00:00:00
2009-12-03 00:00:00
2009-12-04 00:00:00
2009-12-05 00:00:00
2009-12-07 00:00:00
2009-12-09 0

2010-12-04 00:00:00
2010-12-05 00:00:00
2010-12-07 00:00:00
2010-12-06 00:00:00
2010-12-08 00:00:00
2010-12-10 00:00:00
2010-12-09 00:00:00
2010-12-12 00:00:00
2010-12-11 00:00:00
2010-12-13 00:00:00
2010-12-14 00:00:00
2010-12-15 00:00:00
2010-12-16 00:00:00
2010-12-17 00:00:00
2010-12-18 00:00:00
2010-12-19 00:00:00
2010-12-22 00:00:00
2010-12-23 00:00:00
2010-12-20 00:00:00
2010-12-24 00:00:00
2010-12-25 00:00:00
2010-12-21 00:00:00
2010-12-26 00:00:00
2010-12-27 00:00:00
2010-12-28 00:00:00
2010-12-29 00:00:00
2010-12-30 00:00:00
2011-01-01 00:00:00
2010-12-31 00:00:00
2011-01-02 00:00:00
2011-01-04 00:00:00
2011-01-03 00:00:00
2011-01-05 00:00:00
2011-01-06 00:00:00
2011-01-08 00:00:00
2011-01-07 00:00:00
2011-01-09 00:00:00
2011-01-10 00:00:00
2011-01-11 00:00:00
2011-01-12 00:00:00
2011-01-14 00:00:00
2011-01-13 00:00:00
2011-01-15 00:00:00
2011-01-16 00:00:00
2011-01-18 00:00:00
2011-01-17 00:00:00
2011-01-19 00:00:00
2011-01-20 00:00:00
2011-01-21 00:00:00
2011-01-22 00:00:00


2012-01-18 00:00:00
2012-01-19 00:00:00
2012-01-20 00:00:00
2012-01-21 00:00:00
2012-01-22 00:00:00
2012-01-23 00:00:00
2012-01-24 00:00:00
2012-01-25 00:00:00
2012-01-27 00:00:00
2012-01-26 00:00:00
2012-01-28 00:00:00
2012-01-29 00:00:00
2012-01-30 00:00:00
2012-01-31 00:00:00
2012-02-01 00:00:00
2012-02-02 00:00:00
2012-02-03 00:00:00
2012-02-04 00:00:00
2012-02-05 00:00:00
2012-02-06 00:00:00
2012-02-07 00:00:00
2012-02-08 00:00:00
2012-02-09 00:00:00
2012-02-10 00:00:00
2012-02-11 00:00:00
2012-02-13 00:00:00
2012-02-12 00:00:00
2012-02-14 00:00:00
2012-02-16 00:00:00
2012-02-15 00:00:00
2012-02-17 00:00:00
2012-02-18 00:00:00
2012-02-19 00:00:00
2012-02-20 00:00:00
2012-02-21 00:00:00
2012-02-22 00:00:00
2012-02-23 00:00:00
2012-02-25 00:00:00
2012-02-24 00:00:00
2012-02-26 00:00:00
2012-02-27 00:00:00
2012-02-28 00:00:00
2012-02-29 00:00:00
2012-03-01 00:00:00
2012-03-02 00:00:00
2012-03-03 00:00:00
2012-03-04 00:00:00
2012-03-05 00:00:00
2012-03-06 00:00:00
2012-03-07 00:00:00


Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe


2013-01-09 00:00:00
2013-01-10 00:00:00
2013-01-11 00:00:00
2013-01-12 00:00:00
2013-01-13 00:00:00
2013-01-14 00:00:00
2013-01-15 00:00:00
2013-01-16 00:00:00
2013-01-17 00:00:00
2013-01-18 00:00:00
2013-01-19 00:00:00
2013-01-20 00:00:00
2013-01-21 00:00:00


## Testing h5py file

In [None]:
# Open the file produced by the Writer process (folder 'k_timelines/' plus
# 'data.hdf5') read-only. The handle stays open for the plotting cell.
f = h5py.File("k_timelines/data.hdf5",'r')

In [None]:
# Read the list of names to inspect, one per line, dropping the line breaks.
# The entries are later used as dataset names in the HDF5 file.
with open("userlist_z.txt",'r') as fa:
    egolist = [line.rstrip("\n") for line in fa]

In [None]:
# Dataset names present in the file, collected once rather than being
# listed again for every entry of egolist.
available = set(f.keys())

# One figure per listed name that has a dataset; names without one are skipped.
for thing in tqdm(egolist):
    if thing in available:
        plt.plot(f[thing][:])
        # Label the figure so consecutive plots can be told apart.
        plt.title(thing)
        plt.show()