In [None]:
%%bash 
pip3 install dask[complete] toolz cloudpickle

In [17]:
from __future__ import division
import scipy.stats as stats
import pandas as pd
import dask.dataframe as dd
import numpy as np
import time
import sys
import csv
import os

In [47]:
start = time.time()

directory = "CDR_Data_IC/SET1TSV/"
ant_pos_loc = "CDR_Data_IC/ANT_POS.TSV"

ant_pos = pd.read_csv(ant_pos_loc, delim_whitespace=True, header=None, names = ["CT", "Latitude", "Longitude"])
ant_pos.set_index('CT', inplace=True)
ids = list(ant_pos.index.unique())

# The CTs which have the same geo-location.
same_geo = ant_pos[ant_pos.duplicated(["Latitude","Longitude"], keep=False)].sort_values("Latitude")
print(same_geo)

elapsed = time.time() - start
print(elapsed / 60)

      Latitude  Longitude
CT                       
331  -6.450498   6.886308
645  -6.450498   6.886308
822  -5.056944   7.725550
493  -5.056944   7.725550
232  -4.204861   6.658500
233  -4.204861   6.658500
998  -4.013608   5.321946
741  -4.013608   5.321946
743  -4.012956   5.322000
737  -4.012956   5.322000
735  -4.012956   5.322000
732  -4.005167   5.327250
1234 -4.005167   5.327250
275  -3.988471   5.341687
1235 -3.988471   5.341687
899  -3.985378   5.297343
900  -3.985378   5.297343
740  -3.103167   5.721778
400  -3.103167   5.721778
0.0034125526746114094


In [48]:
# Determine the CTs which have been provided but are not present in the CDR data spanning 2 months.
start = time.time()

data = dd.read_csv(directory + "*.TSV", delim_whitespace=True, header=None, names=["Date", "Time", "Outgoing", "Terminating", "Num_Calls", "Total_Duration"], usecols=["Date", "Outgoing", "Terminating"])
present_ids = np.unique(data[["Outgoing", "Terminating"]].values)
missing_data = set(ids) - set(present_ids)
print(missing_data)

elapsed = time.time() - start
print(elapsed / 60)

{1221, 934, 777, 1130, 301, 1231, 976, 1201, 1232, 691, 340, 1236, 1046, 1238, 1215}
1.7505241791407267


In [107]:
# THIS CAN POSSIBLY BE MADE A FUNCTION. Params - folder to TSV files
# PARSE ALL CDR DATA FOR IVORY COAST.
start = time.time()
filename = "CDR_Data_IC/SET1TSV/*.TSV"

# Currently not reading the date, time and duration of calls.
file = dd.read_csv(filename, delim_whitespace=True, header=None, names=["Date", "Time", "Outgoing", "Terminating", "Num_Calls", "Total_Duration"], usecols=["Outgoing", "Terminating", "Num_Calls"])

# Remove any row that has an unknown CT.
known_outgoing = file['Outgoing'] >= 0
known_terminating = file['Terminating'] >= 0
file = file[known_outgoing & known_terminating]
print(type(file))

# Initial Grouping to get all existing CT pairs.
grouped_1 = file.groupby(by=['Outgoing', 'Terminating'])
print(type(grouped_1))

# Sum up Total Number of Calls per (Outgoing, Terminating) pair.
all_pairs = grouped_1.sum().compute()
all_pairs.reset_index(inplace=True)
all_pairs.index.names = ['Index']
print("Total number of calls between CT pairs\n")
print(all_pairs.head())
print(type(all_pairs))

# GROUP BY OUTGOING AND THEN SUM NUM_CALLS & NUM OCCURENCES
grouped_2 = all_pairs.groupby(by=['Outgoing'])

complete_outgoing = grouped_2.sum()
complete_outgoing.reset_index(inplace=True)
complete_outgoing.set_index("Outgoing", inplace=True)

keep_columns=['Num_Calls']
complete_outgoing = complete_outgoing[keep_columns]
print("Complete Outgoing CT Table\n")
print(complete_outgoing.head())
print(type(complete_outgoing))

# GROUP BY TERMINATING AND THEN SUM NUM_CALLS & NUM OCCURENCES
grouped_3 = all_pairs.groupby(by=['Terminating'])

complete_terminating = grouped_3.sum()
complete_terminating.reset_index(inplace=True)
complete_terminating.set_index("Terminating", inplace=True)

keep_columns=['Num_Calls']
complete_terminating = complete_terminating[keep_columns]
print("Complete Terminating CT Table\n")
print(complete_terminating.head())
print(type(complete_terminating))

elapsed = time.time() - start
print(elapsed / 60)

<class 'dask.dataframe.core.DataFrame'>
<class 'dask.dataframe.groupby.DataFrameGroupBy'>
Total number of calls between CT pairs

       Outgoing  Terminating  Num_Calls
Index                                  
0             1            1      90981
1             1            4        116
2             1            5        459
3             1            9        677
4             1           10        353
<class 'pandas.core.frame.DataFrame'>
Complete Outgoing CT Table

          Num_Calls
Outgoing           
1            394323
2             85712
3             99138
4            133650
5           1700082
<class 'pandas.core.frame.DataFrame'>
Complete Terminating CT Table

             Num_Calls
Terminating           
1               404465
2                85863
3                93574
4               120608
5              1752138
<class 'pandas.core.frame.DataFrame'>
1.6985318342844644


In [108]:
print(len(complete_outgoing.index))
print(len(complete_terminating.index))  

# FEATURE 1: ACTIVITY. 
# Total Activity of each CT. i.e. number of outgoing calls from the CT + number of calls terminating at CT.
total_activity = complete_outgoing.add(complete_terminating,fill_value=0)
total_activity.index.names = ['CT']
total_activity.rename(columns ={'Num_Calls': 'Total_Volume'}, inplace =True)

# Check to see if CTs with same geo-locations have different activities.
subset = total_activity[total_activity.index.isin(same_geo.index.values)]
print(subset)

1214
1216
      Total_Volume
CT                
232      1351350.0
233       154025.0
275       274368.0
331      4153570.0
400       122313.0
493      1258792.0
645      1004115.0
732         3996.0
735        17307.0
737          555.0
740         2058.0
741        12883.0
743        33949.0
822       368267.0
899        58027.0
900       243017.0
998         5626.0
1234           1.0
1235       38265.0


In [None]:
# # Find out the total activity for each CT.
# start = time.time()
# counter = 0

# activity = pd.DataFrame(columns=["CT", "Total_Activity", "Originating_Activity", "Terminating_Activity"])
# activity["CT"] = ids
# activity.set_index('CT', inplace=True)

# for col in activity.columns:
#     activity[col] = pd.to_numeric(activity[col], errors='coerce').fillna(0).astype(int)

# # Plan: Iterate through the file and read, if any -1 (skip) => incoming (increment), outgoing (increment) and update total
# _,_,filenames =  next(os.walk(directory))
# for name in filenames:
#     if name.endswith('.TSV'):
#         with open(directory + name, 'r') as f:
#             reader = csv.reader(f, delimiter='\t')
#             counter+=1
#             print(counter)
            
# #           This is taking forever. Reads about 22 million rows each.  
#             for row in reader: 
                
#                 outgoing = int(row[1])
#                 incoming = int(row[2])
#                 volume = int(row[3])

#                 if(outgoing != -1 and incoming != -1):
#                     prev_originating = activity.loc[outgoing, 'Originating_Activity']
#                     activity.loc[outgoing, 'Originating_Activity'] = prev_originating + volume
                    
#                     prev_terminatng = activity.loc[incoming, 'Terminating_Activity']
#                     activity.loc[incoming, 'Terminating_Activity'] = prev_terminatng + volume
            
# print(activity.head())
# elapsed = time.time() - start
# print(elapsed)


# THIS TAKES 6MINS
# _,_,filenames =  next(os.walk(directory))
# for name in filenames:
#     if name.endswith('.TSV'):
#         file = dd.read_csv(directory + name, delim_whitespace=True, header=None, names=["Date", "Time", "Outgoing", "Terminating", "Num_Calls", "Total_Duration"])
#         for row in file.itertuples():
#             i = 10
#         print("file")