# Home - work - other inference 
This notebook contains the detail information to infer home, work and others stations per user.

This is a rule-based approach with the following rules: 
- Home: Most common first transaction of the day
before noon.
- Work: Most common transaction such that difference
between consecutive transaction in the same day is
greater than 6 h.
- Other: Transactions in any other station that are not
classified as home or work. 

We use row_number() algorith to rank 1,2,3,4 (no ties if two stations have the same number of transactions) 

In [1]:
import pyspark as ps
from pyspark.sql.functions import *
sc = ps.SparkContext(appName="home_work")

In [2]:
from os import path
import time 
import random 
import pandas as pd 
import numpy as np 
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
import glob
import geopandas as gpd
warnings.filterwarnings('ignore')

from pyspark.sql.functions import *
from pyspark.sql import * #This enables the SparkSession object
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import pow, col, sqrt

spark = SparkSession.builder\
        .master("local") \
        .appName("home_work") \
        .getOrCreate()

## Process 
- map new ids 
- filter frequent users (I would need to create a frequent/non-frequen users file again with the new IDs)[before march 8]
- filter out observation after march 8. We will work only with observations between Jan-01-2020, and March-8-2020. 
- Merge with clean stations as well. 
- Find ranks per ID 
- Create a file with the following columns: ID, Home_station, Work_station


In [3]:
def time_format(df, date_col = 'fechatransaccion', time_col = "horatransaccion"):
    ''' Returns dataframe with date and time as a timespamt format
    Input: 
     - df: PySpark dataframe: Raw transactions
     - date_col: str. Column name for date 
     - time_col: str. Column name for time
     
     Return: 
     PySpark dataframe. 
     Time col:
     - timestamp: time in 'to_timestamp' format '''
    # add time
    df = df.withColumn("timestamp",
        to_timestamp(col(time_col),"HH:mm:ss"))\
        .withColumn("hour", hour(col("timestamp")))\
        .withColumn("minute", minute(col("timestamp")))\
        .withColumn("second", second(col("timestamp")))\
        .withColumn('time', col('hour') + col('minute')/60 + col('second')/3600)
    
    # add date
    df = df.withColumn('date', to_date(unix_timestamp(col(date_col), 'yyyyMMdd').cast("timestamp")))
    return df

In [4]:
def pre_proccess(df):
    ''' Preproccess transactions data '''
    
    df_ = df.transform(lambda df: time_format(df))\
                  .select(['card_id', 'date', 'time', 'station_name'])\
                  .orderBy(['card_id', 'date', 'time'])
    return df_

In [5]:
def home_location(df):
    '''Estimates the home location of a card_id'''
    
    w = Window().partitionBy(col("card_id")).orderBy(col("date") ) 
    
    df = df.select("*", lag(df.date).over(w).alias("lag_date"))\
            .withColumn('first_day', ~(col('date') == col('lag_date')))\
            .na.fill(value = True)\
            .withColumn('first_day_noon', (col('first_day')) & (col('time')<12))\
            .filter(col('first_day_noon'))\
            .groupBy(['card_id', 'station_name'])\
            .agg({'date':'count', 'time': 'mean'})\
            .withColumn('rank',row_number().over(Window.partitionBy("card_id").orderBy(desc("count(date)"))))\
            .filter(col('rank')==1)
    
    final_df = df.select(col('card_id'),
                         col('station_name').alias('home_station'), 
                         col('count(date)').alias('transacciones_h'), 
                         col('avg(time)').alias('time_h'))
    

    return final_df

In [6]:
def work_location(df):
    w = Window().partitionBy(col("card_id"), col('date')).orderBy(col("time") ) 
    
    df = df.select("*", lag(df.time).over(w).alias("lag_time")).dropna(subset = ('lag_time'))\
            .withColumn('time_difference', col('time') - col('lag_time'))\
            .filter(col('time_difference')>6)\
            .groupBy(['card_id', 'station_name'])\
            .agg({'date':'count', 'time': 'mean'})\
            .withColumn('rank',row_number().over(Window.partitionBy("card_id").orderBy(desc("count(date)"))))\
            .filter(col('rank')==1)
    
    final_df = df.select(col('card_id'),
                        col('station_name').alias('work_station'), 
                        col('count(date)').alias('transacciones_w'), 
                        col('avg(time)').alias('time_w'))
    
    return final_df

In [7]:
def home_work_inference(transactions):
    """ Infer home and work stations for a given tranvel sequence"""
    df_p = transactions
    home = home_location(df_p)#.show()
    work = work_location(df_p)
    
    return home.join(work, on = 'card_id', how = 'outer')

In [8]:
output_path = '../data/output/'
input_path = '../data/input/'

transactions = spark.read.csv(output_path + 'tables/transactions_frequent_users.csv',
                              header =True, sep = ',')

transactions = pre_proccess(transactions)
hw_location = home_work_inference(transactions).cache()
# hw_location.to_csv(output_path + 'tables/hw_#location_v2.csv', index = False)

In [9]:
%%time
hw_location.show()

+--------+--------------------+---------------+------------------+--------------------+---------------+------------------+
| card_id|        home_station|transacciones_h|            time_h|        work_station|transacciones_w|            time_w|
+--------+--------------------+---------------+------------------+--------------------+---------------+------------------+
|10000172|(04000) Cabecera ...|            135| 8.935946502057607|     (04108) El Polo|             38| 18.43856725146199|
|10000454|    (07005) ALQUERIA|             87|  8.76148148148148|      (02302) Virrey|             62|17.322526881720428|
|10000472|    (09002) Consuelo|            139| 8.235349720223823|  (04103) Las Ferias|             43|17.980458656330754|
|10000591|    (02502) Terminal|             88| 8.217159090909092|      (02200) Alcalá|             61|17.789339708561016|
|10000720|(50003) Corral Mo...|             34| 6.635457516339869|(02000) Cabecera ...|             16| 17.15161458333333|
|10000723|   (03

In [10]:
%%time
hw_location_final = hw_location.toPandas()
hw_location_final.to_csv(output_path + 'tables/hw_location.csv', index = False)

CPU times: user 18.2 s, sys: 753 ms, total: 18.9 s
Wall time: 1min 20s
