In [None]:
import os
from datetime import datetime
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,desc,row_number,col,year,month,hour,dayofmonth,dayofweek,to_timestamp,size,isnan,lit,date_format,to_timestamp,struct
from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField, FloatType, ArrayType, DoubleType

In [None]:
# Scratch arithmetic: the difference of the two literals, multiplied by 90 and
# divided by 3,600,000 (= 1000*3600).
# NOTE(review): presumably the literals are epoch-millisecond timestamps and this
# extrapolates one measured duration to 90 runs, expressed in hours - confirm,
# or delete this cell if it is leftover scratch work.
90*(1586053727192-1586052435239)/(1000*3600)

In [None]:
# Reuse `spark` when the name is already defined in this session; otherwise build
# a session. Evaluating the bare name raises NameError when it is undefined, which
# is the signal used to create one.
try:
    spark
except NameError:
    # getOrCreate() returns an existing active session if there is one.
    spark=SparkSession.builder.appName("").getOrCreate()

In [4]:
# --- Run configuration -------------------------------------------------------
source='cuebiq'
country='MX'
n_chunks=1#30
# Hour-of-day bounds: hours >= cutoff_night or <= cutoff_morning are treated as
# night-time when the per-location aggregates are built further down.
cutoff_night=22
cutoff_morning=8
start_date='2020-01-01'
# The end date follows the wall clock, so the set of directories grows from run to run.
end_date=datetime.today().strftime('%Y-%m-%d')

# One directory name per calendar day, formatted as YYYYMMDD followed by '00'.
directories=[day.strftime('%Y%m%d')+'00' for day in pd.date_range(start_date,end_date)]

# Hadoop FileSystem handle obtained through the JVM gateway of the running session.
hadoop_fs=spark._jvm.org.apache.hadoop.fs
fs=hadoop_fs.FileSystem.get(spark._jsc.hadoopConfiguration())

# On the PRINCE cluster only the first directory (and, below, the first
# n_chunks files) is processed.
is_prince=os.getenv('CLUSTER')=='PRINCE'
if is_prince:
    path_to_data='/scratch/spf248/covid/data'
    directories=directories[:1]
else:
    path_to_data='/user/spf248/covid/data'

# --- Discover the input files ------------------------------------------------
paths=[]
for directory in directories:

    path_to_directory=os.path.join(path_to_data,source,'s3',country,directory)
    hadoop_path=hadoop_fs.Path(path_to_directory)

    # Days without a directory are skipped silently.
    if not fs.exists(hadoop_path):
        continue

    list_status=fs.listStatus(hadoop_path)
    print(directory)

    # Strip the filesystem scheme/authority so only the plain path remains.
    listed=[status.getPath().toString().replace('hdfs://dumbo','').replace('file:','')
            for status in list_status]

    # Filter just the newly listed files; the running total printed below is the
    # cumulative number of '.csv.gz' files found so far.
    paths.extend(path for path in listed if '.csv.gz' in path)
    print('# Files:', len(paths))

# Sort once after the scan instead of re-sorting the growing list per directory.
paths.sort()

if is_prince:
    paths=paths[:n_chunks]

print('# Chunks:', n_chunks)
# Split the file list into n_chunks nearly equal groups; each group is loaded separately.
paths_chunks=np.array_split(paths, n_chunks)

# Column types for the headerless, tab-separated input files, one field per raw
# column. load_data renames _c0.._c8 to timestamp, cuebiq_id, device_type,
# latitude, longitude, accuracy, time_zone_offset, classification_type and
# transformation_type.
#
# _c0 and _c6 are added together and cast to a timestamp (epoch seconds), so they
# are read as 64-bit doubles: a 32-bit float has a 24-bit significand and can only
# represent multiples of 128 near 1.58e9 (epoch seconds in 2020), which would
# quantise every ping time to ~2-minute steps.
schema= StructType([
StructField("_c0", DoubleType(), False),
StructField("_c1", StringType(), False),
StructField("_c2", FloatType(), False),
StructField("_c3", FloatType(), False),
StructField("_c4", FloatType(), False),
StructField("_c5", FloatType(), False),
StructField("_c6", DoubleType(), False),
StructField("_c7", StringType(), False),
StructField("_c8", StringType(), False),])

2020010100
# Files: 2000
2020010200
# Files: 4000
2020010300
# Files: 6000
2020010400
# Files: 8000
2020010500
# Files: 10000
2020010600
# Files: 12000
2020010700
# Files: 14000
2020010800
# Files: 16000
2020010900
# Files: 18000
2020011000
# Files: 20000
2020011100
# Files: 22000
2020011200
# Files: 24000
2020011300
# Files: 26000
2020011400
# Files: 28000
2020011500
# Files: 30000
2020011600
# Files: 32000
2020011700
# Files: 34000
2020011800
# Files: 36000
2020011900
# Files: 38000
2020012000
# Files: 40000
2020012100
# Files: 42000
2020012200
# Files: 44000
2020012300
# Files: 46000
2020012400
# Files: 48000
2020012500
# Files: 50000
2020012600
# Files: 52000
2020012700
# Files: 54000
2020012800
# Files: 56000
2020012900
# Files: 58000
2020013000
# Files: 60000
2020013100
# Files: 62000
2020020100
# Files: 64000
2020020200
# Files: 66000
2020020300
# Files: 68000
2020020400
# Files: 70000
2020020500
# Files: 72000
2020020600
# Files: 74000
2020020700
# Files: 76000
2020020800
# Fil

In [5]:
def load_data(paths_chunk):
    """Read one chunk of gzipped, tab-separated ping files and derive time columns.

    Args:
        paths_chunk: iterable of file paths; it is materialised with ``list()``
            and handed to ``spark.read.csv`` together with the module-level
            ``schema``.

    Returns:
        A Spark DataFrame with the columns ``cuebiq_id``, ``device_type``,
        ``time``, ``date``, ``dayofweek``, ``hour``, ``point`` and
        ``classification_type``.
    """
    reader=(spark.read
            .option('compression', 'gzip')
            .option('header', 'false')
            .option("multiLine", "true")
            .option('escape','"')
            .option("encoding", "UTF-8")
            .option("delimiter", "\t")
            .schema(schema))
    df=reader.csv(list(paths_chunk))

    # The files carry no header: assign names positionally.
    column_names=[
    'timestamp',
    'cuebiq_id',
    'device_type',
    'latitude',
    'longitude',
    'accuracy',
    'time_zone_offset',
    'classification_type',
    'transformation_type']
    df=df.toDF(*column_names)

    # Shift the numeric timestamp by the per-row offset before casting it to a
    # timestamp (a numeric value is interpreted as seconds since the epoch).
    # NOTE(review): date/hour/day-of-week below are rendered in the Spark session
    # time zone; this presumably assumes the session runs in UTC so that the
    # shifted value reads as local wall-clock time - confirm.
    df=df.withColumn("time",to_timestamp(df["timestamp"]+df["time_zone_offset"]))
    df=df.withColumn("date", date_format(col("time"), "yyyy-MM-dd"))

    # ISO day number as a string ('1' = Monday ... '7' = Sunday), i.e. what the
    # legacy date_format pattern "u" produced. That pattern letter is rejected by
    # Spark 3.0+ under the default parser policy, so the value is derived from
    # dayofweek() (1 = Sunday ... 7 = Saturday) instead, which works on both
    # Spark 2.x and 3.x.
    iso_day=(dayofweek(col("time"))+5)%7+1
    df=df.withColumn('dayofweek',iso_day.cast('string'))

    df=df.withColumn('hour',hour("time"))
    # Longitude/latitude are packed into one struct so a location can be used as
    # a single grouping key.
    df=df.withColumn('point', struct('longitude','latitude'))

    return df.select('cuebiq_id','device_type','time','date','dayofweek','hour','point','classification_type')

In [6]:
# Running unions of the per-chunk aggregates; None until the first chunk is seen.
device_type=None
daily_pings=None
daily_personal=None
hourly_personal=None

for paths_chunk in paths_chunks:

    df=load_data(paths_chunk)
    # No cache() here: nothing in this loop triggers a Spark action, so a
    # cache()/unpersist() pair around these lazy transformations would be
    # registered and dropped before anything is ever computed.

    # Rows classified as a personal area feed both location-level aggregates.
    personal=df.filter(df['classification_type']=='PERSONAL_AREA')

    # One (cuebiq_id, device_type) row per ping; de-duplicated after the union.
    chunk_device_type=df.select('cuebiq_id','device_type')

    # Pings per user and day.
    chunk_daily_pings=df.groupby(
    'cuebiq_id','date').count().withColumnRenamed('count','n_pings')

    # Pings per user, personal-area location and day.
    chunk_daily_personal=personal.groupby(
    'cuebiq_id','point','date').count().withColumnRenamed('count','n_pings')

    # Pings per user, personal-area location and hour, restricted to day numbers
    # 1-5 of the 'dayofweek' column.
    chunk_hourly_personal=personal.filter(df['dayofweek']<=5).groupby(
    'cuebiq_id','point','hour').count().withColumnRenamed('count','n_pings')

    if device_type is None:
        device_type=chunk_device_type
        daily_pings=chunk_daily_pings
        daily_personal=chunk_daily_personal
        hourly_personal=chunk_hourly_personal
    else:
        device_type=device_type.unionByName(chunk_device_type)
        daily_pings=daily_pings.unionByName(chunk_daily_pings)
        daily_personal=daily_personal.unionByName(chunk_daily_personal)
        hourly_personal=hourly_personal.unionByName(chunk_hourly_personal)

    # Keep a single row per user (which duplicate survives is up to Spark).
    device_type=device_type.drop_duplicates(subset=['cuebiq_id'])

In [7]:
def _total_pings(frame, keys):
    """Collapse `frame` to one row per `keys` combination, summing n_pings."""
    summed=frame.groupby(*keys).agg({'n_pings':'sum'})
    return summed.withColumnRenamed('sum(n_pings)','n_pings')

# All aggregate tables are written side by side under this directory.
aggregates_dir=os.path.join(path_to_data,source,'aggregates',country)

# The same key can occur in several chunks, so the per-chunk counts are summed
# once more before each table is written.
daily_pings=_total_pings(daily_pings,['cuebiq_id','date'])
daily_pings.write.mode("overwrite").parquet(os.path.join(aggregates_dir,'daily_pings'))

daily_personal=_total_pings(daily_personal,['cuebiq_id','point','date'])
daily_personal.write.mode("overwrite").parquet(os.path.join(aggregates_dir,'daily_personal'))

hourly_personal=_total_pings(hourly_personal,['cuebiq_id','point','hour'])
hourly_personal.write.mode("overwrite").parquet(os.path.join(aggregates_dir,'hourly_personal'))

DataFrame[cuebiq_id: string, point: struct<longitude:float,latitude:float>, hour: int, n_pings: bigint]

In [8]:
# Mark daily_personal for caching; it is materialised by the write at the end
# of this cell.
daily_personal.cache()

# Per-user activity: number of (user, date) rows and total number of pings.
users_activity=daily_pings.groupby('cuebiq_id').agg({'date':'count','n_pings':'sum'})
users_activity=users_activity.withColumnRenamed('count(date)','n_days')
users_activity=users_activity.withColumnRenamed('sum(n_pings)','n_pings')

# Number of distinct personal-area points per user.
users_points=daily_personal.drop_duplicates(subset=['cuebiq_id','point'])
users_n_personal=users_points.groupby('cuebiq_id').agg({'point':'count'})
users_n_personal=users_n_personal.withColumnRenamed('count(point)','n_personal')

# Default (inner) joins: only users present in all three tables are kept.
users=users_activity.join(users_n_personal,on='cuebiq_id')
users=users.join(device_type,on='cuebiq_id')

users_output=os.path.join(path_to_data,source,'aggregates',country,'users')
users.write.mode("overwrite").parquet(users_output)

In [9]:
# hourly_personal is read three times below, so mark it for caching.
hourly_personal.cache()

location_keys=['cuebiq_id','point']

# Total pings per (user, personal-area location).
users_personal=daily_personal.groupby(*location_keys).agg(
{'n_pings':'sum'}).withColumnRenamed('sum(n_pings)','n_pings')

# Pings falling in the night window (hour >= cutoff_night or <= cutoff_morning).
# Inner join: locations without any ping in that window are dropped.
is_night=(hourly_personal['hour']>=cutoff_night)|(hourly_personal['hour']<=cutoff_morning)
weeknights=hourly_personal.filter(is_night).groupby(*location_keys).agg(
{'n_pings':'sum'}).withColumnRenamed('sum(n_pings)','n_weeknights')
users_personal=users_personal.join(weeknights,on=location_keys)

# Hour with the highest ping count per location: join the hourly table back onto
# its per-location maximum, then keep one row per location (ties are broken
# arbitrarily by drop_duplicates).
hourly_personal_max=hourly_personal.groupby(*location_keys).agg(
{'n_pings':'max'}).withColumnRenamed('max(n_pings)','n_pings')
most_freq_hour=hourly_personal.join(
hourly_personal_max,on=['cuebiq_id','point','n_pings']).drop_duplicates(
subset=['cuebiq_id','point','n_pings']).drop('n_pings').withColumnRenamed(
'hour','most_freq_hour')
users_personal=users_personal.join(most_freq_hour,on=location_keys)

users_personal.write.mode("overwrite").parquet(os.path.join(path_to_data,source,'aggregates',country,'users_personal'))

# Release the cached daily_personal only now: it feeds the first aggregation in
# this cell, so dropping it before the write would force a recomputation from
# the raw files.
daily_personal.unpersist()