In [1]:
# This scheduler performs sample transfers of small files,
# then finds the local maxima of the throughput curve.

# Pre-requisite:

# Libraries:

In [2]:
import datetime
import itertools
import logging
import math
import os
import random
import subprocess
import sys, time
import threading

import pandas as pd

# Variables:


In [3]:
# Tunable settings: change them only if you understand their effect.

# Script mode. Only "experiment" is handled here: it defines the experiment
# settings below, which the experiment() function and the run cell use.
mode = "experiment"

if mode == "experiment":
    # ---- settings you are expected to adjust ----
    exp_name = "exp_test"
    # 'I' = individual transfer (small or large files on their own),
    # 'C' = small and large file transfers running concurrently.
    transfer_type = ['I', 'C']
    # How many file-size groups are transferred as contending requests.
    number_of_groups = 2
    small_folder, large_folder = 'sample_1_0', 'sample_3_0'
    # Uniformly spaced values sampled for the p / cc parameters.
    param_samples = [5, 10, 15, 20, 25]
    fixed_pp = 32
    # How many times a request is transferred back to back
    # (5 = the same request is transferred 5 consecutive times).
    times = 5
    
    
    

# node information:
source = 'evenstar.cse.buffalo.edu'
destination = 'didclab-ws10.cse.buffalo.edu'
port = 50000
source_path = "/home/zulkar/dataset/first_one/"
destination_path = "/home/zulkar/received/"

# Network information:
bandwidth = 1  # Gbps
rtt = 0.2   # ms
tcp_bs = 25600  # value passed to globus-url-copy's -tcp-bs option
# Dataset information:
# group_range maps a group name to a file-size range (e.g. small, medium, large).
# Each value is a list:
#      first element  : lowest file size
#      second element : highest file size
#      third element  : number of files
#      fourth element : number of sample folders in the group
group_range = {'1':[1,40,100,10],'2':[100,600,50,4],'3':[900,10000,10,10]}
ind_low  = 0
ind_high = 1
ind_n = 2
ind_samples = 3


#################### Derived variables : Don't Set those ! ################
# source_path already ends with '/', so strip it before adding exactly one;
# the original produced "file:///home/.../first_one//".
full_source = "file://" + source_path.rstrip("/") + "/"
full_destination = 'ftp://' + destination + ":" + str(port) + destination_path

# Bandwidth-delay product in bytes: (Gbps * 1024) * rtt[ms] * 1000 / 8.
# NOTE: mixes 1024- and 1000-based unit conversions.
buffer_size = int((bandwidth*1024*rtt*1000)/8)


######################################################################

# Echo the effective configuration so the cell output records it.
_config_rows = (
    ("source node: ", source),
    ("destination node: ", destination),
    ("control port: ", port, " This port is same to both source and destination, just to make life easy"),
    ("dataset path in source: ", source_path),
    ("destination path: ", destination_path),
    ("link bandwidth: ", bandwidth),
    ("rtt: ", rtt),
    ("buffer size: ", buffer_size),
)
for _row in _config_rows:
    print(*_row)

source node:  evenstar.cse.buffalo.edu
destination node:  didclab-ws10.cse.buffalo.edu
control port:  50000  This port is same to both source and destination, just to make life easy
dataset path in source:  /home/zulkar/dataset/first_one/
destination path:  /home/zulkar/received/
link bandwidth:  1
rtt:  0.2
buffer size:  25600


# Methods:

In [4]:
def generate_param_combinations(params, stype='single', allow_repeats=False):
    """ Generate the (p, cc) parameter settings to try.

        Attributes : params        - sample values for the parameters, e.g. [5, 10, 15].
                     stype         - 'single' or 'concurrent'.
                     allow_repeats - by default (False) only ordered pairs of
                                     *distinct* items are produced
                                     (itertools.permutations). So a setting
                                     with p == cc is never generated, and in
                                     'concurrent' mode the small and large
                                     transfers never share a setting. Pass True
                                     to use the full Cartesian product instead.

        returns - list [(p, cc), ...] when stype == 'single'.
        returns - list [((small_p, small_cc), (large_p, large_cc)), ...] when stype == 'concurrent'.
        raises  - ValueError for any other stype (the original returned None silently).
        """
    if allow_repeats:
        def pair_up(values):
            return list(itertools.product(values, repeat=2))
    else:
        def pair_up(values):
            return list(itertools.permutations(values, 2))

    single_list = pair_up(params)  # param combinations for single transfers.
    if stype == 'single':
        return single_list
    if stype == 'concurrent':
        # Pair up whole settings: one for the small and one for the large transfer.
        return pair_up(single_list)
    raise ValueError("stype must be 'single' or 'concurrent', got %r" % (stype,))

In [5]:
# Preview: the small-transfer p value of the first concurrent setting.
lo = generate_param_combinations(stype='concurrent', params=param_samples)
lo[0][0][0]

5

In [6]:
def get_folder_size(folder):
    """ Return the size of ``folder`` in bytes, including its sub-folders.

        The total is os.path.getsize() of the folder entry itself, plus the
        size of every regular file in it, plus (recursively) the size of every
        sub-folder. Entries that are neither files nor directories (e.g.
        broken symlinks) are skipped.

        NOTE: os.path.isfile/isdir follow symlinks, so a symlink cycle would
        recurse without bound.
    """
    total_size = os.path.getsize(folder)  # size of the directory entry itself
    for item in os.listdir(folder):
        itempath = os.path.join(folder, item)
        if os.path.isfile(itempath):
            total_size += os.path.getsize(itempath)
        elif os.path.isdir(itempath):
            # Recurse under this function's real name; the original called the
            # undefined `getFolderSize`, which raised NameError on any sub-folder.
            total_size += get_folder_size(itempath)
    return total_size # bytes


# Keep the camelCase name available for code that still calls it.
getFolderSize = get_folder_size

In [7]:
def get_file_info(folder):
    """Compute the number of files and the total size of a folder.

    input : folder - a full, valid directory path
    output: total_files, total_size
            total_files counts every file os.walk finds (sub-folders included);
            total_size is get_folder_size(folder), in bytes.
    Both values are also printed.
    """
    # Reuse get_folder_size instead of a copy of its loop; the copy called the
    # undefined name `getFolderSize` for sub-folders.
    total_size = get_folder_size(folder)
    print("Total size of directory: ", total_size, "Bytes")
    total_files = sum(len(files) for _root, _dirs, files in os.walk(folder))
    print("Total files: ", total_files)
    return total_files, total_size # bytes
    
    

In [None]:
# Inspect one sample folder: number of files, total size and average file size.
n, s = get_file_info(source_path+'sample_1_0/')
# The original printed s/(1024*1024), i.e. the *total* size, under the label
# "Average file size"; divide by the file count (guarding an empty folder).
avg_file_mb = s / n / (1024*1024) if n else 0.0
print("Average file size in dataset :", avg_file_mb, "Mbytes")

In [None]:
# Show the configured source dataset directory.
source_path

In [None]:
# List the folders in the source dataset directory.
# Pure Python instead of IPython's `ls` automagic. The original listed
# ".../first_one/sample", which does not match the sample_<group>_<index>
# folder names built elsewhere in this notebook.
sorted(os.listdir(source_path))

In [None]:
def experiment():
    """ Perform data transfer experiments to measure the effect of concurrent transfers.

    The data is split into two file-size groups: small (``small_folder``) and
    large (``large_folder``). For every ((small_p, small_cc), (large_p, large_cc))
    setting produced by ``generate_param_combinations(param_samples,
    stype='concurrent')``, this function:

      1. transfers the small folder on its own  -> one record with type 'I',
      2. transfers the large folder on its own  -> one record with type 'I',
      3. transfers both folders at the same time -> two records with type 'C'.

    All records of one setting share the same ``transfer_id``.
    Assumption (from the original description): the path sees no significant
    background traffic during the 1-2 hours the sweep takes.

    Reads module-level configuration: param_samples, fixed_pp, small_folder,
    large_folder, source_path, destination_path, destination, port, exp_name,
    bandwidth, rtt, tcp_bs and source.

    Returns:
        list of dicts, one per transfer, with keys exp_id, transfer_id, type,
        file_size, num_files, bandwidth, rtt, tcp_bs, p, cc, pp, throughput,
        time, source and destination. (The original returned None and
        discarded the logs.)
    """
    from multiprocessing.pool import ThreadPool

    # All parameter settings to try. The original referenced the undefined
    # name `params` instead of the configured `param_samples`.
    all_permuts = generate_param_combinations(param_samples, stype='concurrent')

    # Local source folders and remote destination folders (all end with '/').
    small_path_src = source_path + small_folder + '/'
    large_path_src = source_path + large_folder + '/'
    small_path_dst = destination_path + small_folder + '/'
    large_path_dst = destination_path + large_folder + '/'

    # Full URLs. The paths already end with '/', so nothing more is appended
    # (the original produced a doubled '//').
    full_small_src = "file://" + small_path_src
    full_large_src = "file://" + large_path_src
    full_small_dest = 'ftp://' + destination + ":" + str(port) + small_path_dst
    full_large_dest = 'ftp://' + destination + ":" + str(port) + large_path_dst

    # The source folders do not change during the sweep, so measure them once.
    # (The original re-measured them on every iteration via the undefined
    # names `small_path` / `large_path`.)
    small_n, small_s = get_file_info(small_path_src)
    large_n, large_s = get_file_info(large_path_src)

    # list of logs where each log entry is a dictionary.
    logs = []

    def log_gen(exp_name = 'default', transfer_id= 0,etype = 'I', file_size = 0,
                num_files = 0, bandwidth = 0, rtt = 0, tcp_bs = 0, p = 0, cc = 0,
                pp = 0, throughput = 0, time = 'default', source='default', destination='default' ):
        """Build one log record from the given fields.

        Uses its own ``file_size``/``num_files`` arguments; the original
        ignored them and read ``s``/``n`` from the enclosing scope instead.
        """
        return {
            'exp_id': exp_name,
            'transfer_id': transfer_id,
            'type': etype,
            'file_size': file_size,
            'num_files': num_files,
            'bandwidth': bandwidth,
            'rtt': rtt,
            'tcp_bs': tcp_bs,
            'p': p,
            'cc': cc,
            'pp': pp,
            'throughput': throughput,
            'time': time,
            'source': source,
            'destination': destination,
        }

    def _record(transfer_id, etype, num_files, file_size, settings, result):
        """Append a log entry for one finished transfer.

        ``settings`` holds the p/cc/pp keyword arguments that were passed to
        perform_transfer; ``result`` is its (time, throughput) return value.
        """
        xfer_time, xfer_th = result
        logs.append(log_gen(exp_name=exp_name, transfer_id=transfer_id, etype=etype,
                            file_size=file_size, num_files=num_files,
                            bandwidth=bandwidth, rtt=rtt, tcp_bs=tcp_bs,
                            p=settings['p'], cc=settings['cc'], pp=settings['pp'],
                            throughput=xfer_th, time=xfer_time,
                            source=source, destination=destination))

    # Two worker threads are enough: at most two transfers run at once. The
    # context manager shuts the pool down (the original never closed it).
    with ThreadPool(processes=2) as pool:
        for idx, ((small_p, small_cc), (large_p, large_cc)) in enumerate(all_permuts):
            small_settings = dict(p=small_p, cc=small_cc, pp=fixed_pp, fast=True, tcp_bs=tcp_bs)
            large_settings = dict(p=large_p, cc=large_cc, pp=fixed_pp, fast=True, tcp_bs=tcp_bs)

            # 1) small transfer on its own:
            _record(idx, 'I', small_n, small_s, small_settings,
                    perform_transfer(full_small_src, full_small_dest, **small_settings))

            # 2) large transfer on its own:
            _record(idx, 'I', large_n, large_s, large_settings,
                    perform_transfer(full_large_src, full_large_dest, **large_settings))

            # 3) small and large transfers concurrently. Submit both before
            # waiting on either so that they overlap. (The original submitted
            # perform_transfer(source, destination) five times with host names
            # instead of URLs and threw the results away.)
            small_async = pool.apply_async(perform_transfer, (full_small_src, full_small_dest), small_settings)
            large_async = pool.apply_async(perform_transfer, (full_large_src, full_large_dest), large_settings)
            _record(idx, 'C', small_n, small_s, small_settings, small_async.get())
            _record(idx, 'C', large_n, large_s, large_settings, large_async.get())

    return logs

In [None]:
a, b = 1, 2
(a, b)

In [None]:
from multiprocessing.pool import ThreadPool

# NOTE(review): `foo` is only defined in later cells, so this cell fails on a
# fresh kernel (Restart & Run All) unless one of those cells ran first.
# The context manager shuts the pool down; the original leaked it.
with ThreadPool(processes=4) as pool:
    results = []
    for i in range(5):
        async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
        results.append(async_result)
    # do some other stuff in the main thread here
    # Keep every return value; the original loop overwrote all but the last.
    return_vals = [r.get() for r in results]
return_val = return_vals[-1]

In [None]:
def perform_transfer(source, destination, p = 1, cc=1, pp=1, fast=True, tcp_bs=25600, local_path=None):
    """Run one globus-url-copy transfer from ``source`` to ``destination`` and measure it.

    Parameters:
        source      : source URL, e.g. "file:///data/sample_1_0/".
        destination : destination URL, e.g. "ftp://host:50000/received/sample_1_0/".
        p, cc, pp   : values passed to globus-url-copy's -p, -cc and -pp options.
        fast        : when True, -fast is added to the command (the original
                      accepted this flag but never used it).
        tcp_bs      : value passed to -tcp-bs.
        local_path  : local folder whose size counts as the bytes transferred.
                      Defaults to ``source`` with a leading "file://" removed.

    Returns:
        (time_required, throughput): wall-clock seconds of the copy, and the
        throughput in Mbit/s, computed as bytes * 8 / (seconds * 1024 * 1024).

    Raises:
        FileNotFoundError if globus-url-copy is not on PATH.
    """
    # Build the command as an argument list so no shell re-parses the URLs.
    cmd = ["globus-url-copy",
           "-tcp-bs", str(tcp_bs),
           "-p", str(p),
           "-cc", str(cc),
           # NOTE(review): confirm the installed globus-url-copy accepts a value
           # after -pp; this is kept exactly as in the original command.
           "-pp", str(pp)]
    if fast:
        cmd.append("-fast")
    # The original ignored `source`/`destination` and always copied the global
    # full_source -> full_destination.
    cmd += [source, destination]
    globus_cmd = " ".join(cmd)

    print("started data transfer: ")
    print("globus command:",globus_cmd)

    transfer_start_time = time.perf_counter()
    completed = subprocess.run(cmd)
    time_required = time.perf_counter() - transfer_start_time  # in seconds
    if completed.returncode != 0:
        # The os.system status was silently ignored before. Report failures
        # without aborting a long parameter sweep.
        print("WARNING: globus-url-copy exited with status", completed.returncode)

    if local_path is None:
        local_path = source[len("file://"):] if source.startswith("file://") else source
    # The original called getFolderSize(sample_path); both names were undefined.
    bytes_xfered = get_folder_size(local_path)

    # bytes * 8 -> bits; divided by 2**20 -> Mbit/s (the old comment said Gbit/s).
    if time_required > 0:
        throughput = (bytes_xfered * 8) / (time_required * 1024 * 1024)
    else:
        throughput = 0.0

    print("transfer finished !")
    print("bytes transferred: ", bytes_xfered)
    print("time needed for transfer: ", time_required, "s.")
    print("throughput: ", throughput)

    return time_required, throughput

In [None]:
def foo(bar, baz):
    """Print the current timestamp and return ``'foo' + baz`` (``bar`` is unused)."""
    print(datetime.datetime.now())
    return 'foo' + baz

In [None]:
import threading

def worker(num):
    """Thread worker: print which worker number is running."""
    print ('Worker: %s' % num)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
# Join the workers so the cell finishes only after every thread has run
# (the original never joined them).
for t in threads:
    t.join()

    

In [None]:
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
                    )

def worker():
    """Log 'Starting', sleep 2 s, then log 'Exiting'."""
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

def my_service():
    """Log 'Starting', sleep 3 s, then log 'Exiting'."""
    logging.debug('Starting')
    time.sleep(3)
    logging.debug('Exiting')

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name

w.start()
w2.start()
t.start()

# Join so the cell completes only after all three threads have logged
# 'Exiting' (the original returned while they were still running).
for thread in (w, w2, t):
    thread.join()

In [None]:
def foo(bar, baz):
    """Print a greeting for ``bar`` and return ``'foo' + baz``.

    NOTE: this re-defines the earlier ``foo`` (which printed a timestamp instead).
    """
    print('hello {0}'.format(bar))
    return 'foo' + baz

from multiprocessing.pool import ThreadPool

# The context manager shuts the pool down; the original never closed it.
with ThreadPool(processes=1) as pool:
    results = []
    for i in range(5):
        async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
        results.append(async_result)
    # do some other stuff in the main thread here
    # Keep every return value; the original loop overwrote all but the last.
    return_vals = [r.get() for r in results]
return_val = return_vals[-1]

In [None]:
import itertools

stuff = [5, 10, 15, 20, 25]

# Ordered (x, y) pairs of distinct values, built twice as in the original.
list1 = list(itertools.permutations(stuff, 2))
list2 = list(itertools.permutations(stuff, 2))
# Ordered pairs of distinct (x, y) pairs.
list3 = list(itertools.permutations(list1, 2))
len(list3)

In [None]:
if mode == "experiment":
    # The original line was `do_experiment():` (a SyntaxError because of the
    # trailing ':', and an undefined name); the function defined above is
    # `experiment`. Its log records are collected into a DataFrame.
    exp_log = pd.DataFrame(experiment())

In [None]:
# Perform the initial transfers of the small files.
# Group '1' holds the small files:


In [None]:
# Initialize the bookkeeping structure for the samples to be transferred.
# It is a three-level-deep dictionary:
# level one   : maps group_id  -> per-sample dict
# level two   : maps sample_id -> sample fields
# level three : maps each sample field to its value
# sample_info = { <group_id> : { <sample_id> : { 'path': <path_val>,
#                                         'size': <sample size in bytes>,
#                                         'transferred': <transferred or not, boolean>,
#                                         'throughput' : <throughput value>,
#                                         'transfer-time' : <val>,
#                                         'p' : <p_value>,
#                                         'cc' : <cc_value>,
#                                         'pp' : <pp_value>,
#                                         'fast' : <fast value>,
#                                         'tcp-bs': <tcp buffer size>}}}
# NOTE(review): 'traffic-intensity' (0 to 1) was listed in the original schema
# but is not populated here.

sample_info = {}
for group_id, value in group_range.items():
    level_two_dict = {}
    for sample in range(value[ind_samples]):
        # Sample folders are named sample_<group>_<index>.
        full_sample_path = source_path + 'sample_' + group_id + '_' + str(sample)
        level_two_dict[str(sample)] = {
            'path': full_sample_path,
            # The original called the undefined name getFolderSize.
            'size': get_folder_size(full_sample_path),
            'transferred': False,
            'p': 0,
            'cc': 0,
            'pp': 0,
            # NOTE(review): a string, whereas perform_transfer takes a bool `fast`.
            'fast': 'false',
            # Same value as the configured TCP buffer size (25600 by default).
            'tcp-bs': tcp_bs,
            'throughput': 0,
            'transfer-time': 0,
        }
    # Add the level-two dict to the level-one dict (sample_info).
    sample_info[group_id] = level_two_dict
#sample_info

In [None]:
# Inspect the bookkeeping record of sample '2' in group '1'.
sample_info['1']['2']

In [None]:
# Get the total dataset size in bytes:
# Total size of the whole dataset directory in bytes (printed in Mbytes).
# The original called the undefined name getFolderSize.
dataset_size = get_folder_size(source_path)
print("Total size of dataset :", dataset_size/(1024*1024), "Mbytes")

In [None]:
# get information of all small samples:
# Select the first small-file sample: group '1', sample '0'.
group_id, sample_id = '1', '0'

# TODO: presumably the next step is finding the local maxima of the
# throughput curve for this sample (the original note was cut off).



In [None]:
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    """Return a + b (the worker used by the Pool examples below)."""
    return a + b

def main():
    """Check that starmap with explicit tuples, starmap with zip/repeat and
    map with functools.partial all give the same results."""
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N


# multiprocessing expects pools to be created (and freeze_support called)
# only from the main module. In a notebook kernel __name__ is '__main__',
# so the example still runs here, but importing this code no longer spawns
# processes. (The stray shebang line was dropped; it has no effect in a cell.)
if __name__ == '__main__':
    freeze_support()
    main()


In [None]:
from multiprocessing import Pool

def doubler(number):
    """Return ``number * 2``."""
    return number * 2

if __name__ == '__main__':
    numbers = [5, 10, 20]
    # The context manager terminates the worker processes after the map;
    # the original pool was never closed.
    with Pool(processes=3) as pool:
        print(pool.map(doubler, numbers))

In [None]:
from multiprocessing import Pool

def doubler(number, delay=4):
    """Return the time this call started, after sleeping ``delay`` seconds.

    Despite its name this re-defines the earlier ``doubler`` as a timing probe;
    ``number`` is ignored. ``delay`` defaults to the original 4 seconds.
    """
    started = datetime.datetime.now()
    time.sleep(delay)
    return started


if __name__ == '__main__':
    # One map call over five inputs lets the five workers sleep at the same
    # time. The original called map() once per loop iteration, which ran the
    # 4 s sleeps back to back. The context manager also shuts the pool down.
    with Pool(processes=5) as pool:
        print(pool.map(doubler, (25,) * 5))
    

In [None]:
from multiprocessing import Process, Lock

def func1():
    """Print start and finish timestamps and return 'hello'.

    The return value is discarded when func1 runs as a Process target.
    """
    n = datetime.datetime.now()
    print ('func1: starting: ',n )
    n = datetime.datetime.now()
    print ('func1: finishing :',n)
    return 'hello'

def func2():
    """Print a start timestamp, sleep 3 s, then print a finish timestamp."""
    n = datetime.datetime.now()
    print ('func2: starting: ', n)
    time.sleep(3)
    n = datetime.datetime.now()
    print ('func2: finishing: ', n)


if __name__ == '__main__':
    # func1 takes no arguments. The original passed args=('hi'), which is the
    # string 'hi' rather than a tuple, so the child called func1('h', 'i') and
    # failed with TypeError.
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()
    print("I am here!")

In [None]:
# `l` was not defined in any cell; create the lock here. Lock is imported
# from multiprocessing in the previous cell.
l = Lock()
# `with` releases the lock even if the body raises, like the original try/finally.
with l:
    # NOTE(review): `i` is whatever value an earlier cell's loop left behind.
    print('hello world', i)

In [None]:
# Working multi-threading pattern: each thread writes its result into a dict
# that the main thread reads after join().

import datetime
from threading import Thread

n1 = datetime.datetime.now()
print(n1)
thread_return1 = {'success': False, 'result': n1}
thread_return2 = {'success': False, 'result': n1}

def task(thread_return, tk):
    """Sleep ``tk`` seconds, then mark ``thread_return`` successful with the wake-up time."""
    time.sleep(tk)
    thread_return.update(success=True, result=datetime.datetime.now())

t1 = Thread(target=task, args=(thread_return1, 10))
t2 = Thread(target=task, args=(thread_return2, 4))

for th in (t1, t2):
    th.start()
for th in (t1, t2):
    th.join()

print(thread_return1)
print(thread_return2)