In [46]:
# Imports
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col,row_number,lead,min, max,split, concat_ws,sum
from pyspark.sql.types import *
import findspark
from pyspark.sql.functions import col, isnan, when, count, date_format, mean
import pandas as pd


In [2]:
# Inicializa o spark
findspark.init()

In [3]:
# Criando o Spark Context
sc = SparkContext(appName="Projeto_Transporte")

In [4]:
# Cria a sessão
spark = SparkSession.builder.getOrCreate()

In [5]:
# Carrega como dataframe do Spark
df = pd.read_excel("./dados/TFL Bus Safety.xlsx")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 23158 entries, 0 to 23157
Data columns (total 12 columns):
 #   Column                     Non-Null Count  Dtype         
---  ------                     --------------  -----         
 0   Year                       23158 non-null  int64         
 1   Date Of Incident           23158 non-null  datetime64[ns]
 2   Route                      23158 non-null  object        
 3   Operator                   23158 non-null  object        
 4   Group Name                 23158 non-null  object        
 5   Bus Garage                 23158 non-null  object        
 6   Borough                    23158 non-null  object        
 7   Injury Result Description  23158 non-null  object        
 8   Incident Event Type        23158 non-null  object        
 9   Victim Category            23158 non-null  object        
 10  Victims Sex                23158 non-null  object        
 11  Victims Age                23158 non-null  object        
dtypes: d

In [6]:
df[df.Borough == 'None']

Unnamed: 0,Year,Date Of Incident,Route,Operator,Group Name,Bus Garage,Borough,Injury Result Description,Incident Event Type,Victim Category,Victims Sex,Victims Age


In [7]:
df.columns = ['Year', 'Date_Incident', 'Route', 'Operator', 'Group_Name',
              'Bus_Garage', 'Borough', 'Injury_Result_Description',
              'Incident_Event_Type', 'Victim_Category', 'Victims_Sex', 'Victims_Age']

In [8]:
#cria o esquema dos metadados
df_schema = StructType([StructField("Year", IntegerType(), True), StructField("Date_Incident", TimestampType(), True),
                        StructField("Route", StringType(), True), StructField("Operator", StringType(), True),
                        StructField("Group_Name", StringType(), True), StructField("Bus_Garage", StringType(), True),
                        StructField("Borough", StringType(), True),
                        StructField("Injury_Result_Description", StringType(), True),
                        StructField("Incident_Event_Type", StringType(), True),
                        StructField("Victim_Category", StringType(), True),
                        StructField("Victims_Sex", StringType(), True), StructField("Victims_Age", StringType(), True)])

In [9]:
df_spark = spark.createDataFrame(df, schema=df_schema)

In [10]:
# Tipo do objeto
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [11]:
# Visualiza os dados
df_spark.show()

+----+-------------------+-----+--------------+-------------+--------------------+--------------------+-------------------------+-------------------+---------------+-----------+-----------+
|Year|      Date_Incident|Route|      Operator|   Group_Name|          Bus_Garage|             Borough|Injury_Result_Description|Incident_Event_Type|Victim_Category|Victims_Sex|Victims_Age|
+----+-------------------+-----+--------------+-------------+--------------------+--------------------+-------------------------+-------------------+---------------+-----------+-----------+
|2015|2015-01-01 00:00:00|    1|London General|     Go-Ahead|Garage Not Available|           Southwark|     Injuries treated ...|   Onboard Injuries|      Passenger|       Male|      Child|
|2015|2015-01-01 00:00:00|    4|     Metroline|    Metroline|Garage Not Available|           Islington|     Injuries treated ...|   Onboard Injuries|      Passenger|       Male|    Unknown|
|2015|2015-01-01 00:00:00|    5|   East London|   

In [12]:
# Visualiza os metadados (schema)
df_spark.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Date_Incident: timestamp (nullable = true)
 |-- Route: string (nullable = true)
 |-- Operator: string (nullable = true)
 |-- Group_Name: string (nullable = true)
 |-- Bus_Garage: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Injury_Result_Description: string (nullable = true)
 |-- Incident_Event_Type: string (nullable = true)
 |-- Victim_Category: string (nullable = true)
 |-- Victims_Sex: string (nullable = true)
 |-- Victims_Age: string (nullable = true)



In [13]:
# Verifica o número de linhas
df_spark.count()

23158

In [14]:
df_spark.describe()

DataFrame[summary: string, Year: string, Route: string, Operator: string, Group_Name: string, Bus_Garage: string, Borough: string, Injury_Result_Description: string, Incident_Event_Type: string, Victim_Category: string, Victims_Sex: string, Victims_Age: string]

In [15]:
#Remove o datetime para verificar valores ausente. O datetime dá erro
columns = df_spark.columns
columns.pop(1)

'Date_Incident'

In [16]:
df_spark.select([count(when(col(c).contains('None') | col(c).contains('NULL') | \
                            (col(c) == '') | col(c).isNull() | isnan(c), c
                            )).alias(c) for c in columns]).show()

+----+-----+--------+----------+----------+-------+-------------------------+-------------------+---------------+-----------+-----------+
|Year|Route|Operator|Group_Name|Bus_Garage|Borough|Injury_Result_Description|Incident_Event_Type|Victim_Category|Victims_Sex|Victims_Age|
+----+-----+--------+----------+----------+-------+-------------------------+-------------------+---------------+-----------+-----------+
|   0|    0|       0|         0|         0|    561|                        0|                  0|              0|          0|          0|
+----+-----+--------+----------+----------+-------+-------------------------+-------------------+---------------+-----------+-----------+



In [17]:
#Imprimir os valores None identificados pelo Spark
df_borough= df_spark.filter(col("Borough").contains('None'))
df_borough.select("Borough").show()

+-------------------+
|            Borough|
+-------------------+
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
|None London Borough|
+-------------------+
only showing top 20 rows



In [18]:
#Visualizar valores unicos da coluna Borough
df_spark.select('Borough').distinct().show(35)

+--------------------+
|             Borough|
+--------------------+
|          Wandsworth|
|             Croydon|
|              Bexley|
|             Lambeth|
|              Camden|
|Hammersmith & Fulham|
|           Greenwich|
|              Newham|
|       Tower Hamlets|
|  Barking & Dagenham|
|            Hounslow|
|              Barnet|
|              Harrow|
|           Islington|
|               Brent|
|            Haringey|
|             Bromley|
|              Merton|
|         Westminster|
|           Southwark|
|             Hackney|
| None London Borough|
|Richmond Upon Thames|
|             Enfield|
|              Ealing|
|              Sutton|
|       Not specified|
|Kingston upon Thames|
|Kensington & Chelsea|
|            Havering|
|          Hillingdon|
|      Waltham Forest|
|           Redbridge|
|      City of London|
|            Lewisham|
+--------------------+



In [19]:
# Remove valores NA
df_spark = df_spark.dropna('any')

In [20]:
df_spark.count()

23158

In [21]:
#Vamos extrair a coluna Mês
df_spark = df_spark.withColumn("Month", split(col("Date_Incident"),"-").getItem(1))

In [22]:
df_spark=df_spark.withColumn("Month",col("Month").cast("integer"))

In [23]:
df_spark.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Date_Incident: timestamp (nullable = true)
 |-- Route: string (nullable = true)
 |-- Operator: string (nullable = true)
 |-- Group_Name: string (nullable = true)
 |-- Bus_Garage: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Injury_Result_Description: string (nullable = true)
 |-- Incident_Event_Type: string (nullable = true)
 |-- Victim_Category: string (nullable = true)
 |-- Victims_Sex: string (nullable = true)
 |-- Victims_Age: string (nullable = true)
 |-- Month: integer (nullable = true)



In [24]:
df_spark=df_spark.withColumn("Month_Year",date_format("Date_Incident",'MM-yyyy'))

In [25]:
df_spark.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Date_Incident: timestamp (nullable = true)
 |-- Route: string (nullable = true)
 |-- Operator: string (nullable = true)
 |-- Group_Name: string (nullable = true)
 |-- Bus_Garage: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Injury_Result_Description: string (nullable = true)
 |-- Incident_Event_Type: string (nullable = true)
 |-- Victim_Category: string (nullable = true)
 |-- Victims_Sex: string (nullable = true)
 |-- Victims_Age: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Month_Year: string (nullable = true)



In [26]:
df_spark.select("Month_Year").distinct().show()

+----------+
|Month_Year|
+----------+
|   01-2015|
|   04-2015|
|   05-2015|
|   06-2015|
|   02-2015|
|   09-2015|
|   08-2015|
|   03-2015|
|   11-2015|
|   10-2015|
|   07-2015|
|   12-2015|
|   04-2016|
|   10-2016|
|   12-2016|
|   11-2016|
|   01-2016|
|   05-2016|
|   08-2016|
|   06-2016|
+----------+
only showing top 20 rows



### Pergunta 1 - Qual a quantidade de incidentes por gênero?

In [27]:
df_spark.groupby('Victims_Sex').agg(count(col("Victims_Sex")).alias('Total')).show()

+-----------+-----+
|Victims_Sex|Total|
+-----------+-----+
|     Female|11847|
|    Unknown| 3602|
|       Male| 7709|
+-----------+-----+



### Pergunta 2 - Qual faixa etária esteve mais envolvida nos incidentes?

In [28]:
df_spark.groupby('Victims_Age').agg(count(col("Victims_Age")).alias('Total')).orderBy('Total', ascending = False).show(1)

+-----------+-----+
|Victims_Age|Total|
+-----------+-----+
|      Adult|10754|
+-----------+-----+
only showing top 1 row



### Pergunta 3 - Qual o percentual de incidentes por tipo de evento (Incident Event Type)?

In [29]:
df_spark_perc = df_spark.groupby('Incident_Event_Type').agg(count(col("Incident_Event_Type")).alias('Total'))
df_spark_perc = df_spark_perc.withColumn("Percentual",(df_spark_perc['Total']/df_spark.count())*100)
df_spark_perc.show()

+--------------------+-----+--------------------+
| Incident_Event_Type|Total|          Percentual|
+--------------------+-----+--------------------+
|Vandalism Hooliga...|   73|   0.315225839882546|
|             Assault|  590|   2.547715692201399|
|    Onboard Injuries| 6563|  28.340098454097934|
|  Collision Incident| 4166|   17.98946368425598|
|Safety Critical F...|   66| 0.28499870455134296|
|     Personal Injury| 4596|   19.84627342602988|
|Activity Incident...|  114|  0.4922704896795924|
|                Fire|    6|0.025908973141031175|
|      Slip Trip Fall| 6981|  30.145090249589774|
|             Robbery|    3|0.012954486570515587|
+--------------------+-----+--------------------+



### Pergunta 4 - Como foi a evolução de incidentes por mês ao longo do tempo?

In [35]:
df_spark_evo_incidente =  df_spark.groupby('Year','Month').agg(count(col("Incident_Event_Type")).alias('Total'))
df_spark_evo_incidente.show()

+----+-----+-----+
|Year|Month|Total|
+----+-----+-----+
|2015|    2|  371|
|2015|    4|  470|
|2015|    6|  564|
|2015|    1|  399|
|2015|    5|  472|
|2015|    8|  446|
|2015|   11|  495|
|2015|    9|  487|
|2015|   10|  470|
|2015|    3|  460|
|2015|    7|  558|
|2015|   12|  523|
|2016|   11|  612|
|2016|   10|  551|
|2016|   12|  471|
|2016|    1|  466|
|2016|    4|  504|
|2016|    7|  507|
|2016|    5|  494|
|2016|    2|  484|
+----+-----+-----+
only showing top 20 rows



### Pergunta 5 - Quando o incidente foi “Collision Incident” em qual mês houve o maior número de incidentes envolvendo pessoas do sexo feminino?

In [39]:
df_spark.filter((col('Incident_Event_Type') == 'Collision Incident') & (col('Victims_Sex') == 'Female')).groupby('Month').agg(count(col("Victims_Sex")).alias('Total')).orderBy('Total', ascending = False).show(1)

+-----+-----+
|Month|Total|
+-----+-----+
|    9|  158|
+-----+-----+
only showing top 1 row



### Pergunta 6 -Qual foi a média de incidentes por mês envolvendo crianças (Child)?

In [49]:
df_spark.filter(col('Victims_Age') == 'Child').groupby('Year', 'Month').agg(count('Victims_Age').alias('Total')).groupby('Month').agg(
    {'Total':'mean'}).show()

+-----+------------------+
|Month|        avg(Total)|
+-----+------------------+
|   12|35.666666666666664|
|    1|              31.5|
|    6|             61.25|
|    3|              47.5|
|    5|              51.5|
|    9|             56.25|
|    4|              56.5|
|    8|              52.0|
|    7|              58.0|
|   10|              50.0|
|   11|              40.0|
|    2|              36.5|
+-----+------------------+



### Pergunta 7 - Considerando a descrição de incidente como “Injuries treated on scene”, qual o total de incidentes de pessoas do sexo masculino e sexo feminino?

In [52]:
df_spark.filter((col('Injury_Result_Description')=='Injuries treated on scene')&(col('Victims_Sex') != 'Unknown')).count()

14448

### Pergunta 8 - No ano de 2017 em qual mês houve mais incidentes com idosos (Elderly)?

In [53]:
df_spark.filter((col('Year') == 2017)&(col('Victims_Age') == 'Elderly')).groupby('Month').agg(count('Victims_Age').alias('Total')).orderBy('Total', ascending = False).show(1)

+-----+-----+
|Month|Total|
+-----+-----+
|    7|   81|
+-----+-----+
only showing top 1 row



### Pergunta 9 - Considerando o Operador qual a distribuição de incidentes ao longo do tempo?

In [60]:
df_spark.select('Operator', 'Year','Month').groupby( 'Year','Month', 'Operator').agg(count('Operator').alias('Total')).show()

+----+-----+--------------------+-----+
|Year|Month|            Operator|Total|
+----+-----+--------------------+-----+
|2015|    4|      London Central|   30|
|2015|    5|            Metrobus|   13|
|2015|    1|      Metroline West|   25|
|2015|    1|             Selkent|   27|
|2015|    1|       Tower Transit|    7|
|2015|    2|         East London|   29|
|2015|    6|      London General|   32|
|2015|    5| Arriva London North|   64|
|2015|    5|    London Sovereign|    6|
|2015|    6| Arriva London South|   49|
|2015|    4| Blue Triangle Buses|    2|
|2015|    1|         East London|    4|
|2015|    6|        Abellio West|    2|
|2015|    1|Arriva Kent Thame...|   14|
|2015|    4|            C T Plus|    3|
|2015|    1| Arriva London North|   57|
|2015|    6|    London Sovereign|    6|
|2015|    5|        Abellio West|    3|
|2015|    6|      Metroline West|   29|
|2015|    6|         East London|  101|
+----+-----+--------------------+-----+
only showing top 20 rows



### Pergunta 10 - Qual o tipo de incidente mais comum com ciclistas?

In [61]:
df_spark.filter(col('Victim_Category')== 'Cyclist').groupby('Incident_Event_Type').agg(count('Victim_Category').alias('Total')).orderBy('Total', ascending=False).show(1)

+-------------------+-----+
|Incident_Event_Type|Total|
+-------------------+-----+
| Collision Incident|  256|
+-------------------+-----+
only showing top 1 row

