---
# Algoritmos para Big Data

**Handout 1 - Data Processing with Spark**

**2024/25**

This lab class introduces basic PySpark operations for processing data stored in CSV or JSON files.

This notebook should contain the implementation of the tasks presented in the handout.

Hence, the handout and the notebook must be read together.

---
# Task A - Data ingestion

### 1.

**Datasets**

The two files can be downloaded from the following URLs, respectively:

https://bigdata.iscte-iul.eu/datasets/iot_devices.json

https://bigdata.iscte-iul.eu/datasets/fire-calls.csv

### 2.

## Spark setup

In [None]:
# Basic imports
import pyspark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F

In [None]:
# Build SparkSession
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

## Reading data

In [None]:
# Reading data
data_dir = 
file_iot = data_dir + 'iot_devices.json'
file_calls = data_dir + 'sf-fire-calls.csv'

data_iot =  spark.read.json(
data_calls = spark.read.csv(


## Checking data

**Basic structures and data types**

- Columns and rows
- DataFrames
- Datatypes

### 3.

In [None]:
# Checking iot data
data_iot.
data_iot.
data_iot.

In [None]:
# Checking calls data
data_calls.
data_calls.
data_calls.

In [None]:
col_iot = 
col_calls = 
print(len(col_iot), col_iot)
print(len(col_calls), col_calls)

**Expected results**
```raw
15 ['battery_level', 'c02_level', 'cca2', 'cca3', 'cn', 'device_id', 'device_name', 'humidity', 'ip', 'latitude', 'lcd', 'longitude', 'scale', 'temp', 'timestamp']
28 ['CallNumber', 'UnitID', 'IncidentNumber', 'CallType', 'CallDate', 'WatchDate', 'CallFinalDisposition', 'AvailableDtTm', 'Address', 'City', 'Zipcode', 'Battalion', 'StationArea', 'Box', 'OriginalPriority', 'Priority', 'FinalPriority', 'ALSUnit', 'CallTypeGroup', 'NumAlarms', 'UnitType', 'UnitSequenceInCallDispatch', 'FirePreventionDistrict', 'SupervisorDistrict', 'Neighborhood', 'Location', 'RowID', 'Delay']
```

### 4.

In [None]:
# get the first row
first_row = data_iot
first_row

### 5.

In [None]:
# get the first five rows
five_rows = data_iot
five_rows

---
# Task B - Data cleaning

### 1.

## Removing duplicates if any

In [None]:
# Checking first: use dropDuplicates() and count()
print(f'  data_iot: number of rows: {data_iot. }, after dropDuplicates: {data_iot. }')
print(f'data_calls: number of rows: {data_calls. }, after dropDuplicates: {data_calls. }')

In [None]:
# Remove data_iot duplicates
data_iot = data_iot.
data_iot.count()

# No need for data_calls

## Handling missing values

### 2.

In [None]:
# Checking first

# use dropna(how='any'): removes a row if any of its values is NULL
# how='all' removes a row only if all of its values are NULL
print(f"After dropna would be: data_iot = {data_iot.  } rows")
print(f"After dropna would be: data_calls = {data_calls. } rows")


In [None]:
# Let us see in calls where the NULL values are (by columns)
dict_nulls_calls = {col: data_calls.filter(data_calls[col].isNull()).count() for col in data_calls.columns}
dict_nulls_calls

In [None]:
# Let us see in iot where the NULL values are (by columns)
dict_nulls_iot = 
dict_nulls_iot

### 3.

In [None]:
# Removal of columns 'AvailableDtTm', 'OriginalPriority', 'CallTypeGroup'
cols_to_dismiss = 

# Use drop() and unpack the list with *
data_calls = data_calls.

# or the other way around: select only the columns to keep

### 4.

In [None]:
# data_calls.describe().show()

# Checking the number of rows before and after removing NULLs
print(f"after dropping columns data_calls = {data_calls. } rows, after removing NULLs: {data_calls. } rows")


### 5.

In [None]:
# Drop nulls now
data_calls = data_calls.
data_calls.count()

**Note:** We could have filled in the missing values instead of dropping the rows:

`data_calls.na.fill({'column': value})`

In [None]:
# The outcome
# Show one record in vertical mode for better reading
data_calls.

### 6.

In [None]:
# Transforming datatypes
# Create two new columns 'CallDateTS' and 'WatchDateTS' with withColumn and F.to_timestamp()
data_calls = ( data_calls
              
)

---
# Task C - Data transformation

### 1.

In [None]:
# Adding transformed (derived) columns
# Create two new columns 'CallDateMonth' and 'CallDateWeekDay' using F.month() and F.dayofweek() on 'CallDateTS'
data_calls = ( data_calls
              
              
)

data_calls.show(5)

### 2.

In [None]:
# Change name of column NumAlarms to NumberAlarms with withColumnRenamed
data_calls = data_calls.

data_calls.show(5)

### 3.

In [None]:
# Use sort() with F.col().desc() on 'CallNumber'
data_calls = data_calls.sort(

data_calls.show(5)

---
# Task D - Data aggregation

## Basic aggregation

### 1.

In [None]:
# Find out min, max, average, sum, count
# Use agg() either with {'column name': 'function'} or with F.min(), F.max(), F.avg(), F.sum() and F.count()
data_calls.
data_calls.
data_calls.
data_calls.
data_calls.

### 2.

In [None]:
# Use describe()
data_calls.

## Grouped aggregation

### 3.

In [None]:
# use groupBy() on 'CallType' and agg() on 'CallDateMonth', 'CallDateWeekDay' and 'Delay'
# show at most 50 rows with .show(50, truncate=False)
( data_calls.
 
 
)

**Expected result**
```raw
+--------------------------------------------+------------------+------------------+--------------------+
|CallType                                    |avg(CallDateMonth)|avg(Delay)        |avg(CallDateWeekDay)|
+--------------------------------------------+------------------+------------------+--------------------+
|Elevator / Escalator Rescue                 |6.208121827411167 |4.435448395939084 |4.355329949238579   |
|Marine Fire                                 |8.6               |7.9033333400000005|4.4                 |
|Aircraft Emergency                          |11.0              |7.7               |1.0                 |
|Confined Space / Structure Collapse         |7.875             |6.085416649999999 |4.0                 |
|Alarms                                      |6.555663587749149 |3.8584001800838688|4.0523821098687405  |
|Odor (Strange / Unknown)                    |6.303571428571429 |5.943005935714283 |4.151785714285714   |
|Citizen Assist / Service Call               |6.599781897491821 |5.742093786913859 |3.911668484187568   |
|HazMat                                      |6.023255813953488 |6.879844916279069 |4.6976744186046515  |
|Watercraft in Distress                      |8.5               |7.885714400000002 |4.928571428571429   |
|Explosion                                   |7.0               |4.646111146666666 |3.966666666666667   |
|Vehicle Fire                                |6.264285714285714 |3.889761918392857 |4.2178571428571425  |
|Suspicious Package                          |5.75              |11.045833499999999|3.25                |
|Extrication / Entrapped (Machinery, Vehicle)|6.7               |5.035000019999999 |4.0                 |
|Other                                       |6.496359223300971 |6.528357611383497 |3.936893203883495   |
|Outside Fire                                |6.282632146709816 |4.508881697572809 |3.9773462783171523  |
|Traffic Collision                           |6.532761788120024 |4.007960809130437 |4.063380281690141   |
|Assist Police                               |4.363636363636363 |10.645454609090908|4.181818181818182   |
|Gas Leak (Natural and LP Gases)             |6.343832020997375 |4.363604546456693 |3.979002624671916   |
|Water Rescue                                |6.106024096385542 |5.9738955807229   |4.125301204819277   |
|Electrical Hazard                           |6.377777777777778 |4.313481479555555 |3.8266666666666667  |
|High Angle Rescue                           |6.714285714285714 |7.1488095285714275|4.142857142857143   |
|Structure Fire                              |6.566578316294333 |3.5555240030039217|4.0411460305439855  |
|Industrial Accidents                        |6.076923076923077 |4.028205084615384 |3.0                 |
|Medical Incident                            |6.451672010994045 |3.9335766504404934|4.025931605887391   |
|Mutual Aid / Assist Outside Agency          |8.0               |7.35              |5.0                 |
|Fuel Spill                                  |6.585714285714285 |6.202619058571429 |4.4                 |
|Smoke Investigation (Outside)               |6.694267515923567 |4.766348174267515 |3.751592356687898   |
|Train / Rail Incident                       |4.7368421052631575|32.2991227368421  |2.526315789473684   |
+--------------------------------------------+------------------+------------------+--------------------+
```

### 4.

In [None]:
# Use orderBy() on column 'avg(Delay)' and with option ascending=False
( data_calls.
 
 
)

**Expected result**
```raw
+--------------------------------------------+------------------+------------------+--------------------+
|CallType                                    |avg(CallDateMonth)|avg(Delay)        |avg(CallDateWeekDay)|
+--------------------------------------------+------------------+------------------+--------------------+
|Train / Rail Incident                       |4.7368421052631575|32.2991227368421  |2.526315789473684   |
|Suspicious Package                          |5.75              |11.045833499999999|3.25                |
|Assist Police                               |4.363636363636363 |10.645454609090908|4.181818181818182   |
|Marine Fire                                 |8.6               |7.9033333400000005|4.4                 |
|Watercraft in Distress                      |8.5               |7.885714400000002 |4.928571428571429   |
|Aircraft Emergency                          |11.0              |7.7               |1.0                 |
|Mutual Aid / Assist Outside Agency          |8.0               |7.35              |5.0                 |
|High Angle Rescue                           |6.714285714285714 |7.1488095285714275|4.142857142857143   |
|HazMat                                      |6.023255813953488 |6.879844916279069 |4.6976744186046515  |
|Other                                       |6.496359223300971 |6.528357611383497 |3.936893203883495   |
|Fuel Spill                                  |6.585714285714285 |6.202619058571429 |4.4                 |
|Confined Space / Structure Collapse         |7.875             |6.085416649999999 |4.0                 |
|Water Rescue                                |6.106024096385542 |5.9738955807229   |4.125301204819277   |
|Odor (Strange / Unknown)                    |6.303571428571429 |5.943005935714283 |4.151785714285714   |
|Citizen Assist / Service Call               |6.599781897491821 |5.742093786913859 |3.911668484187568   |
|Extrication / Entrapped (Machinery, Vehicle)|6.7               |5.035000019999999 |4.0                 |
|Smoke Investigation (Outside)               |6.694267515923567 |4.766348174267515 |3.751592356687898   |
|Explosion                                   |7.0               |4.646111146666666 |3.966666666666667   |
|Outside Fire                                |6.282632146709816 |4.508881697572809 |3.9773462783171523  |
|Elevator / Escalator Rescue                 |6.208121827411167 |4.435448395939084 |4.355329949238579   |
|Gas Leak (Natural and LP Gases)             |6.343832020997375 |4.363604546456693 |3.979002624671916   |
|Electrical Hazard                           |6.377777777777778 |4.313481479555555 |3.8266666666666667  |
|Industrial Accidents                        |6.076923076923077 |4.028205084615384 |3.0                 |
|Traffic Collision                           |6.532761788120024 |4.007960809130437 |4.063380281690141   |
|Medical Incident                            |6.451672010994045 |3.9335766504404934|4.025931605887391   |
|Vehicle Fire                                |6.264285714285714 |3.889761918392857 |4.2178571428571425  |
|Alarms                                      |6.555663587749149 |3.8584001800838688|4.0523821098687405  |
|Structure Fire                              |6.566578316294333 |3.5555240030039217|4.0411460305439855  |
+--------------------------------------------+------------------+------------------+--------------------+
```

### 5.

In [None]:
# Use groupBy() on 'CallType' and 'Neighborhood'
# Aggregate with count(CallNumber) aliased as CountCalls, avg(Delay) as AvgDelay,
# min(Delay) as MinDelay, and max(Delay) as MaxDelay
# Order the result in descending order by CallType and then by Neighborhood.

( data_calls.
 

 
)



## Graphics

Plotting the outcome of data aggregation makes the results easier to understand.

### 6.

In [None]:
# !pip install plotly 
# https://plotly.com/python/
import plotly
import pandas as pd
import plotly.express as px
plotly.__version__

In [None]:
# Get data aggregation
data_plot = ( data_calls
    .groupBy('CallType')
    .count()
    .orderBy('count', ascending=False)
)

# data_plot.show(truncate=False)

In [None]:
# draw the bar plot
fig = px.bar(data_plot.toPandas(), x='CallType', y='count')
fig.show()