In [155]:
import json
import time
import numpy as np
import pyspark.sql.functions as F
from utils import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, IntegerType
from collections import defaultdict

In [156]:
# Widen the classic-notebook container to the full browser width.
# `display` and `HTML` are imported from the public `IPython.display` module;
# importing `display` from the private `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [157]:
# Location of the tab-delimited cell/area lookup CSV, appended to CURRENT_DIR.
CELL_AREA    = '/data/cell_area.csv'
# Directory containing one parquet dataset per date, appended to CURRENT_DIR.
PARQUETS_DIR = '/data/parquets'
# Root directory; the two paths above are joined to it by plain string concatenation.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
CURRENT_DIR  = '/home/hellscream/Documents/backendSpark'

In [158]:
# Wall-clock reference; the "time elapsed" cell later subtracts it from time.time().
time_start = time.time()

## DataFrame Schema

## Create Spark Session

In [159]:
class SparkSessionBase():
    """Shared Spark session plus lookup tables loaded from cell_area.csv.

    Attributes set by the constructor:
        spark: the SparkSession (application name 'Mobility').
        dic_full_df: Spark DataFrame read from the cell/area CSV.
        dic_full: the same data as a dict keyed by pandas row index.
        dic_tow_cell: maps the CSV's 'id' column to 'area_correlator'.
        dic_cell_latlon: maps 'area_correlator' to [latitude, longitude].
    """

    def __init__(self):
        session_builder = SparkSession.builder.appName('Mobility')
        self.spark = session_builder.getOrCreate()

        # Map cell_area.csv (tab-delimited, first line is the header)
        csv_reader = self.spark.read.format('csv').options(header='true', delimiter='\t')
        self.dic_full_df = csv_reader.load(CURRENT_DIR + CELL_AREA).select('*')

        # Collect the whole lookup table on the driver: {row_index: {column: value}}
        self.dic_full = self.dic_full_df.toPandas().to_dict('index')

        # Both lookups are filled in row order, so a later row wins on duplicate keys.
        records = self.dic_full.values()
        self.dic_tow_cell = {
            record['id']: record['area_correlator'] for record in records
        }
        self.dic_cell_latlon = {
            record['area_correlator']: [record['latitude'], record['longitude']]
            for record in records
        }

## Mobility Matrix

In [160]:
class Mobility(SparkSessionBase):
    """Origin/destination mobility matrix for a single day of parquet data.

    Typical use: call `get_mobility_at_time_interval` once for the origin
    interval and once for the destination interval, then hand both results
    to `build`.
    """

    def __init__(self, date, time_start_lower, time_start_high, time_end_lower,
        time_end_high, time_sleep_lower='01:00', time_sleep_high='04:00'):
        """Create the Spark session / lookup tables and remember `date`.

        Args:
            date: Name of the parquet dataset under PARQUETS_DIR. It is
                concatenated into file paths, so it must be a string
                (e.g. '2021-03-01').
            time_start_lower, time_start_high, time_end_lower, time_end_high,
            time_sleep_lower, time_sleep_high: Accepted for interface
                compatibility; currently not used.
        """
        super().__init__()
        self.date = date

    def get_mobility_at_time_interval(self, time_start, time_end):
        """Return, per user code, the normalized tower visit shares in an interval.

        Args:
            time_start: Lower bound of the interval; multiplied by 3600 before
                use (presumably hours converted to seconds — confirm against
                the units of the parquet `times` column).
            time_end: Upper bound of the interval, same unit as `time_start`.

        Returns:
            Spark DataFrame with columns `code` and `towers-count`, where
            `towers-count` is a list of [tower, share] pairs (both rendered as
            strings because of the UDF return type).
        """
        time_start *= 3600
        time_end *= 3600

        # load correspondent parquet
        df = self.spark.read.parquet(CURRENT_DIR + PARQUETS_DIR + '/' + self.date)

        # get_range comes from utils; its result is used as the (start, length)
        # arguments of F.slice below.
        get_range_udf = F.udf(lambda elems, a, b : get_range(elems, a, b), ArrayType(IntegerType()))

        df = (
            df.withColumn('range', get_range_udf(df.times, F.lit(time_start), F.lit(time_end)))
              .select(
                  df.code,
                  F.slice(df.cell_ids, F.col('range')[0], F.col('range')[1]).alias('towers'),
                  F.slice(df.times, F.col('range')[0], F.col('range')[1]).alias('times'),
              )
              .where(F.size(F.col('towers')) > 0)
        )

        # Fixed: the original read `self.cell_area`, which is never defined.
        # The id -> area_correlator lookup built by SparkSessionBase is
        # `dic_tow_cell`. Bind it to a local so the lambda does not capture
        # `self` (and with it the SparkSession).
        mapp = self.dic_tow_cell
        df = df.rdd.flatMap(lambda x : mapp_tow_cell(x, mapp)).toDF(['code', 'towers', 'times'])

        # A staticmethod looked up on the instance is a plain function, so the
        # UDF closure stays free of `self`.
        normalize_counts = self.count_occurrences_and_normalize
        count_occurrences_udf = F.udf(normalize_counts, ArrayType(ArrayType(StringType())))

        df = df.select('code', count_occurrences_udf(F.col('towers')).alias('towers-count'))

        return df

    @staticmethod
    def count_occurrences_and_normalize(elems):
        """Count how often each element occurs and convert counts to shares.

        Args:
            elems: Iterable of hashable items (tower identifiers).

        Returns:
            List of [item, share] pairs in first-seen order, where share is
            count / total rounded to 4 decimals. Empty input yields [].
        """
        counts = {}
        for elem in elems:
            counts[elem] = counts.get(elem, 0) + 1
        total = float(sum(counts.values()))
        return [[elem, round(count / total, 4)] for elem, count in counts.items()]

    @staticmethod
    def flat_origin_destination_product(row):
        """Yield (start_cell, end_cell, weight) for one user's two intervals.

        Args:
            row: Row whose first field holds two lists of [cell, share] pairs;
                index 0 is treated as the origin and index 1 as the destination.

        Yields:
            Tuples (cell_start, cell_end, share_start * share_end).
        """
        origin_shares, destination_shares = row[0][0], row[0][1]
        for cell_start, val_1 in origin_shares:
            for cell_end, val_2 in destination_shares:
                yield (cell_start, cell_end, float(val_1) * float(val_2))

    def build(self, users_cells_start, users_cells_end, output_path=None):
        """Build the origin/destination matrix and write it as JSON.

        Args:
            users_cells_start: DataFrame from `get_mobility_at_time_interval`
                for the origin interval.
            users_cells_end: Same, for the destination interval.
            output_path: Destination JSON file. Defaults to
                '/home/hellscream/Documents/mobility_spark/data/<date>.json'.
        """
        if output_path is None:
            output_path = '/home/hellscream/Documents/mobility_spark/data/' + self.date + '.json'

        # Fixed: the original body read `user_cells_start` / `user_cells_end`
        # (no trailing "s" on "users"), i.e. notebook globals, not the parameters.
        union_user_cells = users_cells_start.union(users_cells_end)

        # Keep only users that contributed exactly two rows.
        # NOTE(review): collect_list gives no ordering guarantee after the
        # shuffle, so the origin entry is not guaranteed to come first, and a
        # user with two rows in one input and none in the other also passes
        # the count == 2 filter — confirm this is acceptable.
        union_user_cells = (
            union_user_cells.groupBy('code')
                            .agg(F.collect_list('towers-count').alias('cells'),
                                 F.count('code').alias('count'))
                            .filter(F.col('count') == 2)
        )

        flatten_pairs = self.flat_origin_destination_product
        rdd = union_user_cells.select('cells').rdd.flatMap(flatten_pairs)
        df = rdd.toDF(['start', 'end', 'value'])
        df = df.groupBy('start').pivot('end').agg(F.sum('value'))

        matriz_pandas = df.toPandas()
        matriz_pandas.to_json(output_path, orient='index')

### Testing Mobility

In [1]:
# Smoke test for Mobility on the 2021-03-01 dataset. The four None arguments
# are the interval parameters, which Mobility.__init__ does not use.
mobility_instance = Mobility('2021-03-01', None, None, None, None)

# Interval bounds are multiplied by 3600 inside get_mobility_at_time_interval.
user_cells_start = mobility_instance.get_mobility_at_time_interval(6, 10)
user_cells_end   = mobility_instance.get_mobility_at_time_interval(16, 20)

# Combines both intervals and writes the resulting matrix to a JSON file.
mobility_instance.build(user_cells_start, user_cells_end)

NameError: name 'Mobility' is not defined

In [161]:
# Report the minutes elapsed since `time_start` was recorded.
time_end = time.time()
print('time elapsed:', (time_end - time_start) / 60, sep='')

time elapsed:0.09680790106455485


## User Mobility

In [180]:
class UserMobility(SparkSessionBase):
    """Per-user mobility indicators for a single day of parquet data."""

    def __init__(self, date):
        """Create the Spark session / lookup tables and remember `date`.

        Args:
            date: Name of the parquet dataset under PARQUETS_DIR; it is
                concatenated into the file path, so it must be a string.
        """
        super().__init__()

        self.date = date

        # Planned per-user state (not implemented yet):
        #   self.imsi_mobility = {}      # amount of km and cell_changes
        #   self.users_cells_start = {}  # count tower night to set home

    @staticmethod
    def km_displacement(elems):
        """Sum the distances between consecutive coordinates.

        Args:
            elems: Sequence of coordinates accepted by
                `distance_in_km_between_coordinates` (from utils).

        Returns:
            Total of the pairwise distances; 0 for fewer than two elements.
        """
        return sum(
            distance_in_km_between_coordinates(elems[i], elems[i + 1])
            for i in range(len(elems) - 1)
        )

    @staticmethod
    def map_area_correlator_to_coord(towers, mapp):
        """Look up every entry of `towers` in `mapp`, preserving order.

        Raises:
            KeyError: if an entry of `towers` is missing from `mapp`.
        """
        return [mapp[tower] for tower in towers]

    def get_users_mobility(self, start_time=25200, end_time=72000, sleep_start_time=3600, sleep_end_time=14400):
        """Compute the weighted "sleeping zone" towers for every user.

        Args:
            start_time, end_time: Bounds of the first window passed to
                `between_ab_OR_dc` (presumably seconds since midnight — confirm).
            sleep_start_time, sleep_end_time: Bounds of the second window; also
                passed to `accumulate_count`.

        Returns:
            Spark DataFrame with columns `code`, `towers`, `weight`, `coords`.
            `weight` and `coords` come from UDFs declared without a return
            type, so Spark stores them with the default UDF return type.
        """
        df = self.spark.read.parquet(CURRENT_DIR + PARQUETS_DIR + '/' + self.date)

        df = df.rdd.flatMap(lambda rows : between_ab_OR_dc(rows, (start_time, end_time), (sleep_start_time, sleep_end_time))).toDF(['code', 'cell_ids', 'times'])
        df = df.where(F.size(df.times) > 0)

        # NOTE(review): distinct_cells_df, km_displacement_df and
        # distinct_towers_df are built but never returned; only
        # users_cells_start leaves this method.

        # imsi -> cell changes & amount of distinct cell
        distinct_cells_df = df.withColumn('distinct_cells', F.array_distinct(F.col('cell_ids'))).select('code', 'distinct_cells', F.size(F.col('distinct_cells')).alias('amount'))

        # Bind the lookups to locals so the lambdas do not capture `self`.
        mapp = self.dic_tow_cell
        df = df.rdd.flatMap(lambda x : mapp_tow_cell(x, mapp)).toDF(['code', 'towers', 'times'])

        # imsi -> km displacement
        mapp = self.dic_cell_latlon
        km_displacement_df = df.rdd.flatMap(lambda x : mapp_cell_latlon(x, mapp)).toDF(['code', 'cells', 'lat_lon'])
        km_displacement_udf = F.udf(lambda coordinates : amount_km(coordinates), StringType())
        km_displacement_df = km_displacement_df.withColumn('km_displacement', km_displacement_udf(F.col('lat_lon'))).select('code', 'km_displacement')

        # imsi -> amount of distinct towers
        distinct_towers_df = df.withColumn('distinct_towers', F.array_distinct(F.col('towers'))).select('code', 'distinct_towers', F.size(F.col('distinct_towers')).alias('amount'))

        # process for establish sleeping zone
        users_cells_start = df.rdd.flatMap(lambda x : accumulate_count(x, sleep_start_time, sleep_end_time)).toDF(['code', 'towers', 'count'])
        users_cells_start = users_cells_start.where(F.size('towers') > 0)

        # Use the class's own helper instead of relying on a same-named global.
        # A staticmethod looked up on the instance is a plain function.
        to_coords = self.map_area_correlator_to_coord
        cell_latlon = self.dic_cell_latlon
        map_area_correlator_to_coord_udf = F.udf(lambda towers : to_coords(towers, cell_latlon))
        users_cells_start = users_cells_start.withColumn('coords', map_area_correlator_to_coord_udf(F.col('towers')))

        def normalize(values):
            # Compute the total once instead of once per element.
            total = sum(values)
            return [val / total for val in values]

        normalize_udf = F.udf(normalize)
        users_cells_start = users_cells_start.withColumn('weight', normalize_udf(F.col('count'))).select('code', 'towers', 'weight', 'coords')

        return users_cells_start

In [182]:
# Run the user-mobility pipeline for 2021-03-01 with the default time windows
# and preview the result. `set1` is reused by the inspection cells below.
um = UserMobility('2021-03-01')
set1 = um.get_users_mobility()
set1.show()

+-----+--------------------+--------------------+--------------------+
| code|              towers|              weight|              coords|
+-----+--------------------+--------------------+--------------------+
| 1804|            [dhj7jg]|               [1.0]|[[23.0459, -82.35...|
| 3905|              [dhj6]|               [1.0]|[[22.9357, -82.54...|
| 7216|             [dhn4h]|               [1.0]|[[22.8566, -81.34...|
| 9413|             [dhn6b]|               [1.0]|[[23.0226, -81.19...|
|11117|             [d5zup]|               [1.0]|[[21.8348, -78.76...|
|13219|            [dhj7zc]|               [1.0]|[[23.0962, -82.16...|
|14507|             [d5ymu]|               [1.0]|[[22.1443, -81.019]]|
|17019|            [dhj7tk]|               [1.0]|[[23.1401, -82.38...|
|25918|             [dhjgp]|               [1.0]|[[23.0465, -81.58...|
|31116|              [d5ur]|               [1.0]|[[22.4173, -83.735]]|
|35619|              [dhh2]|               [1.0]|[[22.6033, -83.80...|
|36600

In [185]:
# Scratch check of NumPy broadcasting: element-wise `mean + 2.0 * tmpp`.
# NumPy is already imported as `np` in the notebook's first cell, so the
# duplicate mid-notebook import was dropped.
mean = np.array([.4, .3])
tmpp = np.array([10, 15])

mean + tmpp * 2.0

array([20.4, 30.3])

In [117]:
# Print the second field (index 1) of the first row of `set1` and its length.
# NOTE(review): this cell's execution count (117) is lower than that of the
# cell defining `set1` (182), so the saved output may be stale.
cells = set1.rdd.take(1)[0][1]
print((cells, len(cells)))

(['dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg', 'dhj7jg'], 24)


In [121]:
# Same inspection as the previous cell, bound to `tows`.
# NOTE(review): executed as In [121], before the current definition of `set1`
# (In [182]); the saved output may come from an earlier version of `set1`.
tows = set1.rdd.take(1)[0][1]
print((tows, len(tows)))

(['176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146', '176-1146'], 24)
