#### The following notebook performs data collection, data preparation and feature engineering on the presences.json file which list all the presences registered in the Unicam buildings. 

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('younicam-AI').getOrCreate()

spark

## 1 - File preparation

#### Import the data related to the registered presences which is composed of: 
 - _id: the unique id given by MongoDB
 - aula: the room
 - polo: the building
 - sede: the city
 - posto: the seat
 - inDate: the datetime for the room access
 - outDate: the datetime for the room exit
 - date: the datetime for the last modification made on the record

In [2]:
presencesDF = spark.read.json("../data/raw/presences.json", multiLine=True)

presencesDF.toPandas()

Unnamed: 0,_id,aula,date,inDate,outDate,polo,posto,sede
0,5fa8ef7d1bd2a03f4641a15e,1,2020-11-09T07:27:57.078Z,2020-11-09T07:27:57.078Z,2020-11-09T12:05:00.362Z,1,1,1
1,5fa8efa51bd2a03f4641a15f,1,2020-11-09T07:28:37.074Z,2020-11-09T07:28:37.074Z,2020-11-09T12:05:00.363Z,1,2,1
2,5fa8f0751bd2a03f4641a160,1,2020-11-09T07:32:05.879Z,2020-11-09T07:32:05.878Z,2020-11-09T12:05:00.364Z,1,3,1
3,5fa8f0811bd2a03f4641a161,1,2020-11-09T07:32:17.390Z,2020-11-09T07:32:17.390Z,2020-11-09T07:32:20.897Z,1,4,1
4,5fa8f0891bd2a03f4641a162,1,2020-11-09T07:32:25.980Z,2020-11-09T07:32:25.980Z,2020-11-09T07:32:36.245Z,1,5,1
...,...,...,...,...,...,...,...,...
9836,5fd9dcb7ff3b76b96dd7987b,17,2020-12-16T10:08:55.168Z,2020-12-16T10:08:55.168Z,,11,13,1
9837,5fd9ddcfff3b76b96dd7988b,19,2020-12-16T10:13:35.299Z,2020-12-16T10:13:35.299Z,,5,12,1
9838,5fd9ddf1ff3b76b96dd7988e,1,2020-12-16T10:14:09.471Z,2020-12-16T10:14:09.471Z,,1,46,1
9839,5fd9de6cff3b76b96dd79891,13,2020-12-16T10:16:12.267Z,2020-12-16T10:16:12.267Z,,9,15,4


#### Perform some operations to check the state of the data and change the names to improve readibility

In [3]:
presencesDF.dtypes

[('_id', 'string'),
 ('aula', 'string'),
 ('date', 'string'),
 ('inDate', 'string'),
 ('outDate', 'string'),
 ('polo', 'string'),
 ('posto', 'string'),
 ('sede', 'string')]

In [4]:
presencesDF.count()

9841

## 2 - Data cleaning and preparation
#### Start be renaming some columns to improve readibility

In [5]:
presencesDF = presencesDF.withColumnRenamed("aula", "room")
presencesDF = presencesDF.withColumnRenamed("polo", "building")
presencesDF = presencesDF.withColumnRenamed("sede", "city")
presencesDF = presencesDF.withColumnRenamed("posto", "seat")

presencesDF.columns

['_id', 'room', 'date', 'inDate', 'outDate', 'building', 'seat', 'city']

#### Let's look for null values inside each column

In [6]:
from pyspark.sql.functions import isnull, when, count, col

presencesDF.select([count(when(isnull(c), c)).alias(c) for c in presencesDF.columns]).toPandas()

Unnamed: 0,_id,room,date,inDate,outDate,building,seat,city
0,0,0,0,0,223,0,0,0


#### As you can see there are some null values inside the outDate column. This can happen because at the moment of data extraction there were some "active" presences that, of course cannot have the outDate. 
#### Those null values needs to be removed.

In [7]:
presencesDF = presencesDF.replace('?', None).dropna(how='any')

presencesDF.count()

9618

#### Drop the date column because it stores the date of the last modification made on the record and it is redundant since the last update made on the record is perfomed at the exit time that saved in the outDate field.

In [8]:
presencesDF = presencesDF.drop("date")

presencesDF.columns

['_id', 'room', 'inDate', 'outDate', 'building', 'seat', 'city']

#### Cast inDate and outDate into timestamp in order to extrapolate day, month, hour and minutes either for the entrance datetime and the exit datetime. 
#### Then the columns _id, posto, inDate and outDate_ are deleted since they are not needed for the analysis.

In [9]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute

presencesDF = presencesDF.withColumn("inDate", presencesDF["inDate"].cast("timestamp"))

presencesDF = presencesDF.withColumn("outDate", presencesDF["outDate"].cast("timestamp"))

presencesDF = presencesDF.withColumn("day", dayofmonth(presencesDF["inDate"]))
presencesDF = presencesDF.withColumn("month", month(presencesDF["inDate"]))
presencesDF = presencesDF.withColumn("inHour", hour(presencesDF["inDate"]))
presencesDF = presencesDF.withColumn("inMinute", minute(presencesDF["inDate"]))
presencesDF = presencesDF.withColumn("outHour", hour(presencesDF["outDate"]))
presencesDF = presencesDF.withColumn("outMinute", minute(presencesDF["outDate"]))

presencesDF = presencesDF.drop("_id", "posto", "inDate", "outDate")

presencesDF.dtypes

[('room', 'string'),
 ('building', 'string'),
 ('seat', 'string'),
 ('city', 'string'),
 ('day', 'int'),
 ('month', 'int'),
 ('inHour', 'int'),
 ('inMinute', 'int'),
 ('outHour', 'int'),
 ('outMinute', 'int')]

#### Cast the column room, building and city to integer because the machine learning model works only with integer values

In [10]:
from pyspark.sql.types import IntegerType

presencesDF = presencesDF.withColumn("room", presencesDF["room"].cast(IntegerType()))
presencesDF = presencesDF.withColumn("building", presencesDF["building"].cast(IntegerType()))
presencesDF = presencesDF.withColumn("city", presencesDF["city"].cast(IntegerType()))

presencesDF.dtypes

[('room', 'int'),
 ('building', 'int'),
 ('seat', 'string'),
 ('city', 'int'),
 ('day', 'int'),
 ('month', 'int'),
 ('inHour', 'int'),
 ('inMinute', 'int'),
 ('outHour', 'int'),
 ('outMinute', 'int')]

#### Store the processed data in dedicated folder.

In [11]:
import os

NEW_DATA_DIR = "../data/processed/"

try:
    os.mkdir(NEW_DATA_DIR)
except:
    pass

presencesDF.toPandas().to_json(NEW_DATA_DIR + "presences.json")

## 3 - Feature engineering

#### Since we need the number of people for a given room in a certain day and in a certain hour, we have to count the number of presences considering room, building, city, day, month, hour. 
#### In order to get the number of people present in a time interval, we can explode a sequence of hours (e.g. for a record with inHour: 8 and outHour 13, the sequence of hours will be: [8,9,10,11,12,13]), group by the hour (and other columns) and get the aggregate count for each group.

In [13]:
import pyspark.sql.functions as F

finalDF = presencesDF.withColumn(
    'hour',
    F.explode(F.sequence('inHour', 'outHour'))
).groupBy(
    'room', 'building', 'city', 'day', 'month', 'hour'
).count()

finalDF = finalDF.withColumnRenamed("count", "target")

finalDF.toPandas()

Unnamed: 0,room,building,city,day,month,hour,target
0,5,3,1,9,11,8,4
1,16,2,1,10,11,13,1
2,11,6,1,10,11,15,1
3,22,4,2,13,11,11,7
4,26,14,2,13,11,11,1
...,...,...,...,...,...,...,...
4562,1,1,1,1,12,9,42
4563,9,4,2,2,12,17,16
4564,1,1,1,3,12,8,47
4565,7,7,1,14,12,17,9


#### The generated column _count_ is our target features and all the others are the input values.
#### The dataframe is ready for the ML model so we save it in a dedicated folder.

In [14]:
NEW_DATA_DIR = "../data/engineered/"

try:
    os.mkdir(NEW_DATA_DIR)
except:
    pass

finalDF.toPandas().to_json(NEW_DATA_DIR + "presences.json")