# PHONE_DATA CLEANING 

data from  https://www.cs.rutgers.edu/~dz220/data.html, collected in 2013-10-22

## IMPORTS

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

from math import *
from collections import Counter

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1595,application_1602681287149_27001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## READ FILE

In [2]:
spark = SparkSession.builder.getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# récupérer les données 
data = spark.read.csv("/user/ngwh3132/WORK/cancan_project/phone_data.csv", header='False', sep=',')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
data.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

38218717

In [5]:
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

#### Rename columns

In [6]:
data = data.withColumnRenamed('_c0', 'id').withColumnRenamed('_c1', 'ts').withColumnRenamed('_c2', 'lat').withColumnRenamed('_c3', 'long').distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
data.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------+-----------+----------+
|        id|      ts|        lat|      long|
+----------+--------+-----------+----------+
|0055555664|20:44:56| 114.053436|  22.62122|
|0055555805|01:39:00|114.0589583|22.5465278|
|0055555820|16:40:06|113.8494444|22.7838194|
|0055555897|08:38:51|114.0648611|22.5518056|
|0055555908|20:00:21|113.8494444|22.7838194|
|0055555926|09:32:46|     114.05|22.5629167|
|0055555932|05:59:48|   114.0575|22.5470139|
|0055555934|20:37:32|114.0520833|22.5388889|
|0055555963|11:37:37|114.0500694|22.5410417|
|0055555963|22:01:10|114.0579167| 22.530625|
|0055555996|16:25:39|113.8494444|22.7838194|
|0055556086|21:50:26| 114.062017| 22.639292|
|0055556091|20:55:56|114.0594444|22.6859028|
|0055556091|23:31:39|114.0594444|22.6859028|
|0055556100|21:00:59|  114.05875|22.5359722|
|0055556100|21:29:16|  114.05875|22.5359722|
|0055556100|22:46:33|114.0634722|22.5370139|
|0055556101|13:43:28|114.0591667| 22.561875|
|0055556101|14:32:11|114.0591667| 22.561875|
|005555610

#### Count null data in each column

In [8]:
df_count_nan = data.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in data.columns]).toPandas().T
df_count_nan

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

      0
id    0
ts    0
lat   0
long  0

In [9]:
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)

#### Convert date from string to timestamp type

In [10]:
data = data.withColumn('ts', F.unix_timestamp(F.col('ts'), "HH:mm:ss").cast('timestamp'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Regroup lat & long to one column 'pos'

In [11]:
data = data.select('id', 'ts', F.struct(F.col('lat').cast('float').alias('lat'), F.col('long').cast('float').alias('long')).alias('pos'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Aggregate all data by 'id' and collect all events & positions in lists

In [14]:
data = data.groupby('id').agg(F.collect_set('ts').alias('ts_list'), F.collect_list('pos').alias('pos_list')).withColumn('nb_cdr', F.size(F.col('ts_list')))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: string (nullable = true)
 |-- ts_list: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)
 |-- pos_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lat: float (nullable = true)
 |    |    |-- long: float (nullable = true)
 |-- nb_cdr: integer (nullable = false)

In [17]:
data.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+--------------------+------+
|        id|             ts_list|            pos_list|nb_cdr|
+----------+--------------------+--------------------+------+
|0055555868|[1970-01-01 18:14...|[[113.84944,22.78...|     1|
|0055555984|[1970-01-01 20:45...|[[114.05625,22.53...|    62|
|0055556089|[1970-01-01 13:37...|[[114.06486,22.54...|    24|
|0055556368|[1970-01-01 21:29...|[[114.054794,22.5...|     4|
|0055556383|[1970-01-01 09:11...|[[114.059166,22.5...|     1|
|0055556411|[1970-01-01 13:37...|[[114.032776,22.5...|     1|
|0055556447|[1970-01-01 17:53...|[[114.06789,22.53...|     1|
|0055556511|[1970-01-01 17:35...|[[114.04385,22.53...|     1|
|0055556545|[1970-01-01 22:25...|[[113.82449,22.79...|   190|
|0055556552|[1970-01-01 12:15...|[[114.07076,22.54...|    47|
|0055556678|[1970-01-01 23:53...|[[114.03368,22.61...|    76|
|0055556716|[1970-01-01 21:36...|[[114.01319,22.66...|    39|
|0055557115|[1970-01-01 20:03...|[[114.06479,22.53...|     2|
|0055557

### Define the functions to calculate the metrics we need to our model of classification

1. day_night function

In [18]:
def day_night(user_events):
    """
    Calcule le nombre d'évènements par jour et par nuit 
    retourne le nombre par jour(entre 6h et 21h) et le nombre d'évènement par nuit en prenant une partie de la nuit précédente N-1 (de minuit à 6h) 
    et une partie de la nuit N ( de 21h à minuit)

    """

    sorted_list = sorted(user_events)

    day_ts = range(7, 22)
    day = [x for x in sorted_list if x.hour in day_ts]
       
    d = len(day)
    n = len(sorted_list) - d
     
    return (d,n)

schema1 = StructType((
StructField("nb_event_day", IntegerType(), True),
StructField("nb_event_night", IntegerType(), True)
))
dn_udf = F.udf(day_night, schema1)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2. inter_event_time function

In [19]:
def inter_event_time(user_events):
    """
    Calcule l'inter_event_time 
    retourne la moyenne de tous les inter_time
    Calcule l'entropie des inter_events time  
    retourne l'entropie
  
    """     

    sorted_list = sorted(user_events)
    #seconds = (sorted_list[-1] - sorted_list[0]).total_seconds()
#     days = (sorted_list[-1] - sorted_list[0]).days

#     if (days > 3): # au moins 4 jours sur 7 d'observation
    diff = [(t - s).total_seconds() for s, t in zip(sorted_list, sorted_list[1:]) if (t-s).total_seconds() > 1]

    l1 = len(diff)

    if l1 > 1:# on prend les imsi avec au moins 4 events par jours

        X =  [(elem, diff.count(elem)) for elem in set(diff)]

        x0 = X[0][1]

        entropy = (x0/l1) * log(x0/l1)

        for x in X[1:l1+1]:

             entropy+= (x[1]/l1 ) * log(x[1]/l1)

        entropy = -round((entropy / log(l1)), 2)

        diff_avg = sum(diff) // len(sorted_list)

        return (int(diff_avg), entropy)   
schema2 = StructType((
StructField("ie_avg", IntegerType(), True),
StructField("entropy", FloatType(), True)
))
ie_entropy_udf = F.udf(inter_event_time, schema2)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3. distance_on_unit_sphere function used to calculate the radius of gyration 

In [20]:
def distance_on_unit_sphere(pos1, pos2):
    # approximate radius of earth in km
    R = 6373.0

    lat1 = radians(pos1[0])
    lon1 = radians(pos1[1])
    lat2 = radians(pos2[0])
    lon2 = radians(pos2[1])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * asin(sqrt(a))

    arc = R * c

    return arc

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
def radius_of_gyration(positions):
    """
    Returns the radius of gyration, the *equivalent distance* of the mass from
    the center of gravity, for all visited places. [GON2008]_

    References
    ----------
    .. [GON2008] Gonzalez, M. C., Hidalgo, C. A., & Barabasi, A. L. (2008).
        Understanding individual human mobility patterns. Nature, 453(7196),
        779-782.
    """
    
    positions = [tuple(l) for l in positions]
    d = Counter(p for p in positions
                if p is not None)
    
    sum_weights = sum(d.values())
    positions = list(d.keys())  # Unique positions

    if len(positions) == 0:
        return None

    barycenter = [0, 0]
    for pos, t in d.items():
        barycenter[0] += pos[0] * t
        barycenter[1] += pos[1] * t

    barycenter[0] /= sum_weights
    barycenter[1] /= sum_weights

    r = 0.
    for pos, t in d.items():
                                                                              
        arc = distance_on_unit_sphere(barycenter, pos)
        r += float(t) / sum_weights * (arc ** 2)                                       
        
        
    return round(sqrt(abs(r)), 3)

rg_udf = F.udf(radius_of_gyration, FloatType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

4. the entropy of positions

In [22]:
def entropy_positions(positions):

    c = Counter(p for p in positions if p is not None)
    lenc = len(c.values())
    if lenc == 0:
        return None
    if lenc == 1:
        return 0
    sumc = sum(c.values())
    probas = [p/sumc for p in c.values()]
    e = 0
    for pr in probas:
        e -= pr * log(pr,2)
    e = round(e/log(sumc,2),2)
    return e

pos_entropy_udf = F.udf(entropy_positions, FloatType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5. Apply all the udf functions to the df

In [25]:
new_data = data.select('id', 'ts_list', 'pos_list', 'nb_cdr', dn_udf(
F.col('ts_list')).alias('day_night'), ie_entropy_udf(F.col('ts_list')).alias('ie_entropy'), pos_entropy_udf(F.col('pos_list')).alias('pos_entropy'),
    rg_udf(F.col('pos_list')).alias('rg'))  

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### The results 

In [26]:
new_data.select('id', 'day_night', 'ie_entropy', 'nb_cdr', 'rg', 'pos_entropy').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------+----------+------+-----+-----------+
|        id|day_night|ie_entropy|nb_cdr|   rg|pos_entropy|
+----------+---------+----------+------+-----+-----------+
|0055555868|    [1,0]|      null|     1|  0.0|       null|
|0055555984|   [58,4]|[333,0.96]|    62|0.177|        0.3|
|0055556089|   [24,0]| [70,0.96]|    24|0.781|       0.45|
|0055556368|    [3,1]| [832,1.0]|     4|0.174|        0.5|
|0055556383|    [1,0]|      null|     1|  0.0|       null|
|0055556411|    [1,0]|      null|     1|  0.0|       null|
|0055556447|    [1,0]|      null|     1|  0.0|       null|
|0055556511|    [1,0]|      null|     1|  0.0|       null|
|0055556545| [180,10]|[127,0.83]|   190|0.294|       0.02|
|0055556552|   [47,0]|[657,0.92]|    47|1.111|       0.45|
|0055556678|  [35,41]|[156,0.88]|    76|1.413|       0.35|
|0055556716|   [32,7]| [45,0.93]|    39|  0.0|       null|
|0055557115|    [2,0]|      null|     2|  0.0|       null|
|0055557214|    [6,3]| [999,1.0]|     9|0.257|       0.3

In [27]:
new_data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: string (nullable = true)
 |-- ts_list: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)
 |-- pos_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lat: float (nullable = true)
 |    |    |-- long: float (nullable = true)
 |-- nb_cdr: integer (nullable = false)
 |-- day_night: struct (nullable = true)
 |    |-- nb_event_day: integer (nullable = true)
 |    |-- nb_event_night: integer (nullable = true)
 |-- ie_entropy: struct (nullable = true)
 |    |-- ie_avg: integer (nullable = true)
 |    |-- entropy: float (nullable = true)
 |-- pos_entropy: float (nullable = true)
 |-- rg: float (nullable = true)

### Save the resulted df 

In [29]:
# sauvegarder le résultats
new_data.repartition(1).write.parquet("/user/ngwh3132/WORK/cancan_project/3G_2W/df_phone_data.parquet", mode='overwrite')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# **THE END**