In [3]:
#Criação da sessão PySpark e leitura do dataset 'bus-breakdown-and-delays.csv'

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Projeto_Engenharia_Dados_PySpark').getOrCreate()

file = '/content/bus-breakdown-and-delays.csv'

df = spark.read.csv(file, header = True, inferSchema = True)

#df.show(3) Github não permite visualizar o DataFrame total, ocorre a quebra de linhas.
df.printSchema()



root
 |-- School_Year: string (nullable = true)
 |-- Busbreakdown_ID: integer (nullable = true)
 |-- Run_Type: string (nullable = true)
 |-- Bus_No: string (nullable = true)
 |-- Route_Number: string (nullable = true)
 |-- Reason: string (nullable = true)
 |-- Schools_Serviced: string (nullable = true)
 |-- Occurred_On: timestamp (nullable = true)
 |-- Created_On: timestamp (nullable = true)
 |-- Boro: string (nullable = true)
 |-- Bus_Company_Name: string (nullable = true)
 |-- How_Long_Delayed: string (nullable = true)
 |-- Number_Of_Students_On_The_Bus: integer (nullable = true)
 |-- Has_Contractor_Notified_Schools: string (nullable = true)
 |-- Has_Contractor_Notified_Parents: string (nullable = true)
 |-- Have_You_Alerted_OPT: string (nullable = true)
 |-- Informed_On: timestamp (nullable = true)
 |-- Incident_Number: string (nullable = true)
 |-- Last_Updated_On: timestamp (nullable = true)
 |-- Breakdown_or_Running_Late: string (nullable = true)
 |-- School_Age_or_PreK: string (

In [None]:
#Organizando os dados que serão utilizados na análise

#Running Late e Breakdown por empresa
df_filtrado = df.groupBy('Bus_Company_Name', 'Breakdown_or_Running_Late').count()
df_filtrado = df_filtrado.groupBy('Bus_Company_Name').pivot('Breakdown_or_Running_Late', ['Breakdown', 'Running Late']).sum('count')

#Has_Contractor_Notified_Schools (Yer ou No) por empresa
df_filtrado_02 =  df.groupBy('Bus_Company_Name', 'Has_Contractor_Notified_Schools').count()
df_filtrado_02 = df_filtrado_02.groupBy('Bus_Company_Name').pivot('Has_Contractor_Notified_Schools', ['Yes', 'No']).sum('count')

#Has_Contractor_Notified_Parents (Yer ou No) por empresa
df_filtrado_03 = df.groupBy('Bus_Company_Name', 'Has_Contractor_Notified_Parents').count()
df_filtrado_03 = df_filtrado_03.groupBy('Bus_Company_Name').pivot('Has_Contractor_Notified_Parents', ['Yes', 'No']).sum('count')

#Have_You_Alerted_OPT (Yer ou No) por empresa
df_filtrado_04 = df.groupBy('Bus_Company_Name', 'Have_You_Alerted_OPT').count()
df_filtrado_04 = df_filtrado_04.groupBy('Bus_Company_Name').pivot('Have_You_Alerted_OPT', ['Yes', 'No']).sum('count')

#Total de ocorrências de cada empresa
df_total = df.groupBy('Bus_Company_Name').count()
df_total.show() #Neste caso as colunas não extrapolam o limite da tela


+--------------------+-----+
|    Bus_Company_Name|count|
+--------------------+-----+
|         SNT BUS INC| 9647|
|Y & M TRANSIT COR...|   95|
|TWENTY FIRST AV T...|  888|
|MUTUAL BUS CORP. ...|  422|
|DON THOMAS BUSES,...| 3770|
|              ADDIES|   24|
|ALINA SERVICES CORP.|  570|
|B & F SKILLED INC...| 3950|
|LORINDA ENTERPRIS...| 2348|
|CAREFUL BUS SERVI...|  102|
|CONSOLIDATED BUS ...| 2173|
|RELIANT TRANS, IN...| 5743|
|GRANDPA`S BUS CO....| 3829|
|          SMART PICK|  288|
|ALLIED TRANSIT CORP.| 7435|
|PRIDE TRANSPORTAT...| 2417|
|       VAN TRANS LLC|  128|
|         FIRST STEPS|  309|
|CAREFUL BUS SERVI...|  806|
|VINNY`S BUS SERVI...|   20|
+--------------------+-----+
only showing top 20 rows



In [None]:
#Organizando todos os dados filtrados em um único DataSet

df_bus_dados = df_filtrado.join(df_filtrado_02, 'Bus_Company_Name').withColumnRenamed('Yes', 'Notified_School_Yes') \
  .withColumnRenamed('No', 'Notified_School_No')
df_bus_dados = df_bus_dados.join(df_filtrado_03, 'Bus_Company_Name').withColumnRenamed('Yes', 'Notified_Parents_Yes') \
  .withColumnRenamed('No', 'Notified_Parents_No')
df_bus_dados = df_bus_dados.join(df_filtrado_04, 'Bus_Company_Name').withColumnRenamed('Yes', 'Alert_OPT_Yes') \
  .withColumnRenamed('No', 'Alert_OPT_No')
df_bus_dados = df_bus_dados.join(df_total, 'Bus_Company_Name').withColumnRenamed('count', 'Total_Ocorrencias')
#df_bus_dados.show(truncate = False) Github não permite visualizar o DataFrame total, ocorre a quebra de linhas.

+---------------------------+---------+------------+-------------------+------------------+--------------------+-------------------+-------------+------------+-----------------+
|Bus_Company_Name           |Breakdown|Running Late|Notified_School_Yes|Notified_School_No|Notified_Parents_Yes|Notified_Parents_No|Alert_OPT_Yes|Alert_OPT_No|Total_Ocorrencias|
+---------------------------+---------+------------+-------------------+------------------+--------------------+-------------------+-------------+------------+-----------------+
|SNT BUS INC                |1324     |8323        |8845               |802               |1774                |7873               |2232         |7415        |9647             |
|Y & M TRANSIT CORP (B2321) |8        |87          |89                 |6                 |85                  |10                 |2            |93          |95               |
|TWENTY FIRST AV TRANSP (B  |134      |754         |576                |312               |736                

In [None]:
#Substituindo 'NULL' por 0
df_bus_dados = df_bus_dados.fillna(0)
df_bus_dados.show() #Github não permite visualizar o DataFrame total, ocorre a quebra de linhas.

+--------------------+---------+------------+-------------------+------------------+--------------------+-------------------+-------------+------------+-----------------+
|    Bus_Company_Name|Breakdown|Running Late|Notified_School_Yes|Notified_School_No|Notified_Parents_Yes|Notified_Parents_No|Alert_OPT_Yes|Alert_OPT_No|Total_Ocorrencias|
+--------------------+---------+------------+-------------------+------------------+--------------------+-------------------+-------------+------------+-----------------+
|         SNT BUS INC|     1324|        8323|               8845|               802|                1774|               7873|         2232|        7415|             9647|
|Y & M TRANSIT COR...|        8|          87|                 89|                 6|                  85|                 10|            2|          93|               95|
|TWENTY FIRST AV T...|      134|         754|                576|               312|                 736|                152|          133|      

In [None]:
#Salvando os dados tratados e filtrados em um arquivo parquet.

df_bus_dados.write.parquet('bus_dados_df_parquet')