In [2]:
import ast
import csv
import json

import numpy
import pandas as pd
from pandas.io.json import json_normalize
from scipy.spatial.distance import cosine

# from pyspark import SparkContext
# from pyspark import SparkConf
# from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
# SQLContext is instantiated when the raw data is loaded, so import it explicitly
# instead of relying on the launching shell to have put it in scope.
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf #user defined function
from pyspark.sql.functions import lit, col
from pyspark.sql.types import *

In [2]:
# Not needed for Spark on EC2, only for local setup
# sc = SparkContext("local", "Region Network")

In [23]:
# loading the raw data
# df = pd.read_csv('../../data/CDR/hash/sample.csv') 
# df.columns = ['index','time','source','dest','call']
# Wrap the already-running SparkContext so DataFrames and SQL queries are available.
sqlCtx = SQLContext(sc)
# Every input line is one tab-separated record; break it into its list of string fields.
rdd = sc.textFile("s3n://census-cdr/mi-to-mi/*").map(lambda line: line.split('\t'))
# Attach column names to the four fields; no type conversion is applied at this point.
cs_df = sqlCtx.createDataFrame(rdd, schema=['time', 'source', 'dest', 'call'])


In [24]:
# Preview the first rows of the raw call records to sanity-check the load.
cs_df.show()

time          source dest call                
1383297600000 1      1    1.445982643495844E-4
1383300000000 1      1    2.893335821627406...
1383306000000 1      1    2.170344499879484...
1383306600000 1      1    6.92190810703531E-5 
1383308400000 1      1    7.22991321747922E-5 
1383309000000 1      1    2.107372943154984E-4
1383315000000 1      1    7.395236931786679E-5
1383317400000 1      1    6.92190810703531E-5 
1383331200000 1      1    6.92190810703531E-5 
1383339600000 1      1    7.22991321747922E-5 
1383342000000 1      1    7.551623674280319E-5
1383342600000 1      1    6.92190810703531E-5 
1383265800000 1      10   2.814243229598746E-5
1383288000000 1      10   3.907339743793248E-5
1383296400000 1      10   1.028247752913052E-4
1383297000000 1      10   1.309672075872926...
1383298800000 1      10   1.551390040413683E-4
1383299400000 1      10   1.092125409370522...
1383305400000 1      10   1.569513613199081...
1383307800000 1      10   1.040236849748308...


In [4]:
# loading the raw data
# df = pd.read_csv('../../data/CDR/hash/sample.csv') 
# df.columns = ['index','time','source','dest','call']
# sqlCtx = SQLContext(sc)
# cs_df = sqlCtx.createDataFrame(df)

In [5]:
# NOTE(review): in the visible notebook `df` is only assigned inside commented-out
# pd.read_csv lines, so this cell raises NameError on a fresh kernel unless that
# pandas load is restored first.
df.head()

Unnamed: 0,index,time,source,dest,call
0,11340721,1383327600000,7853,1005,0.000323
1,11340722,1383327600000,7853,1012,0.000116
2,11340723,1383327600000,7853,1103,0.000576
3,11340724,1383327600000,7853,1117,0.000427
4,11340725,1383292200000,7853,1131,0.000671


In [6]:

# loading the region-cell data
# The region id becomes the index (the column is kept too) so rows can be looked up
# by region label. Ordering goes through sort_index() rather than
# sort_values(['region']): 'region' is both the index name and a column label here,
# which newer pandas versions reject in sort_values as ambiguous.
table = pd.read_csv('../../data/CDR/hash/intersect.csv', header = None)
table.columns = ['region', 'proportions']
table = table.set_index('region', drop=False).sort_index()

# loading the cell-proportion data
# Same layout as above, keyed by cell id instead of region id.
prop_table = pd.read_csv('../../data/CDR/hash/cell_intersect.csv', header = None)
prop_table.columns = ['cell', 'proportions']
prop_table = prop_table.set_index('cell', drop=False).sort_index()


In [18]:
# Inspect the region table: one row per region, with the proportions stored as the
# string form of a dict (it is parsed with ast.literal_eval where it is used).
table.head()

Unnamed: 0_level_0,region,proportions
region,Unnamed: 1_level_1,Unnamed: 2_level_1
1,1,"{5255: 0.012533869596943227, 5256: 0.810203385..."
2,2,"{5760: 1.0, 5761: 0.8881973502712377, 5762: 0...."
3,3,"{5761: 0.10963649144088197, 5762: 0.0166565829..."
4,4,"{6656: 0.08041696535588132, 6657: 0.1769243460..."
5,5,"{5761: 0.0021661582885276975, 5762: 0.51612498..."


In [None]:


def get_cells_per_region(table, region_id):
    """
        Return the keys of the proportions mapping stored for one region.

        table: DataFrame indexed by region id whose "proportions" column holds the
            string form of a dict literal.
        region_id: index label of the region to look up.

        Returns the keys view of the parsed dict (callers treat the keys as cell ids).
        Raises KeyError when region_id is not an index label of table.
    """
    # Label-based scalar lookup. A positional .iloc[region_id] is not equivalent:
    # it addresses row number region_id, not the row labelled region_id, and runs
    # past the end of the table for the highest region when labels start at 1.
    ids = ast.literal_eval(table.at[region_id, "proportions"])
    return ids.keys()

def _or_predicate(column, keys):
    """Build a SQL condition that holds when `column` equals any value in `keys`."""
    if not keys:
        # Nothing to match: select no rows instead of emitting a dangling WHERE,
        # which would be rejected as invalid SQL.
        return "1 = 0"
    return " OR ".join(column + " = " + str(key) for key in keys)


def get_call_data(source, dest):
    """
        Select the call records flowing between the cells of two regions.

        source, dest: region ids, resolved to cell ids through
            get_cells_per_region(table, ...).

        Returns a DataFrame with the rows of cs_df whose `source` is one of the
        source region's cells and whose `dest` is one of the destination region's
        cells. A region without cells yields an empty result.

        Side effects: (re)registers the temp tables "cs_df" and "subset" on sqlCtx.
    """
    source_cells = list(get_cells_per_region(table, source))
    dest_cells = list(get_cells_per_region(table, dest))

    # First narrow down to the records leaving the source region ...
    cs_df.registerTempTable("cs_df")
    subset = sqlCtx.sql("SELECT * from cs_df WHERE " + _or_predicate("source", source_cells))
    subset.registerTempTable("subset")

    # ... then keep only those arriving in the destination region.
    subset2 = sqlCtx.sql("SELECT * from subset WHERE " + _or_predicate("dest", dest_cells))

    return subset2

def calculate_actual_call(s_cell, d_cell, call, s_region, d_region):
    """
        Scale a cell-to-cell call value by the proportions recorded for a region pair.
        Used to create another column on the subset DataFrame that is proportional
        to the regions.

        s_cell, d_cell: index labels of prop_table for the source / destination cell.
        call: call value; converted with float() because the records are parsed
            from text and arrive as strings.
        s_region, d_region: region ids, looked up as str() keys in the parsed
            proportions dicts.

        Returns source proportion * destination proportion * call as a float, or
        0.0 when a region key is missing or the values cannot be multiplied.
        Raises KeyError when s_cell or d_cell is not an index label of prop_table.
    """
    # NOTE(review): the cell labels must have the same type as prop_table's index;
    # cast string cell ids before calling if the index is numeric - confirm.
    source_prop = ast.literal_eval(prop_table.at[s_cell, "proportions"])
    dest_prop = ast.literal_eval(prop_table.at[d_cell, "proportions"])

    try:
        # NOTE(review): lookups use str() keys; if the stored dict literals are keyed
        # by integers every lookup misses and the result is always 0.0 - confirm
        # against the contents of cell_intersect.csv.
        final = source_prop[str(s_region)] * dest_prop[str(d_region)] * float(call)
    except (KeyError, TypeError, ValueError):
        # Missing region key or non-numeric input: contribute nothing to the sum.
        # A float (not int 0) is returned so the value matches a FloatType UDF.
        final = 0.0

    return final

In [None]:

# Loop bounds of the region pairing (regions are numbered from 1).
# NOTE(review): source and destination bounds differ (80 vs 54) - confirm this is
# intentional and not a leftover from a partial run.
N_SOURCE_REGIONS = 80
N_DEST_REGIONS = 54

schema = StructType([
            # epoch milliseconds (e.g. 1383297600000) overflow a 32-bit IntegerType
            StructField("time", LongType(), nullable=False),
            # summing the FloatType UDF column produces a double
            StructField("adjusted_call", DoubleType(), nullable=False),
            StructField("source_region", IntegerType(), nullable=False),
            StructField("dest_region", IntegerType(), nullable=False)
    ])
# unionAll matches columns by position, so every partial result is projected
# into exactly this order before being appended.
output_columns = [field.name for field in schema.fields]

region_network = sqlCtx.createDataFrame([], schema)
udf_calls = udf(calculate_actual_call, FloatType())

for s in range(1, N_SOURCE_REGIONS + 1):
    for d in range(1, N_DEST_REGIONS + 1):
        # get a subset of records for the source and dest
        subdf = get_call_data(s, d)
        subdf = subdf.withColumn("source_region", lit(s))
        subdf = subdf.withColumn("dest_region", lit(d))
        print (s, d)
        # create a column with adjusted call values
        try:
            subdf = subdf.select(
                # the raw columns are strings parsed from text: cast what is used numerically
                col("time").cast("long").alias("time"),
                "source_region",
                "dest_region",
                udf_calls("source", "dest", col("call").cast("double"),
                          "source_region", "dest_region").alias("adjusted_call"))
        except Exception as exc:
            # keep the run going for the remaining pairs, but say what was skipped and why
            print ("skipping region pair %s -> %s: %s" % (s, d, exc))
            continue

        # aggregate per time slot; explicit aliases give the result the schema's column names
        subdf = (subdf.groupBy("time")
                 .agg(F.sum("adjusted_call").alias("adjusted_call"),
                      F.max("source_region").alias("source_region"),
                      F.max("dest_region").alias("dest_region"))
                 .select(*output_columns))
        region_network = region_network.unionAll(subdf)
    
        
# region_network.show()
# region_network.toPandas().to_csv('../../data/CDR/generated/region-network.csv')

In [None]:
# region_network.show()
# region_network.toPandas().to_csv('../../data/CDR/generated/region-network.csv')

def toCSVLine(data):
    """Render one record as a single line: the str() of each value, comma-separated."""
    fields = map(str, data)
    return ','.join(fields)

# Go through the underlying RDD of rows: DataFrame.map is gone from Spark 2.0 on,
# while .rdd.map is available on every DataFrame version and hands the same rows
# to toCSVLine.
lines = region_network.rdd.map(toCSVLine)
lines.saveAsTextFile('s3n://census-cdr/output/region-based.csv')

print ("A output file has been written.")