In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 46.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=970b42ffa5fa39e927fcf191df8df8d6c405335da04f47103256979c1cb13c53
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
import sys
from pyspark.sql import SparkSession, functions, types
from pyspark.context import SparkContext
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import *
from pyspark.sql.functions import isnan, when, count, col
#ML Algorithms
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LinearSVC

In [None]:
# Mount Google Drive inside the Colab runtime and put the project folder on the import path.
import sys
from google.colab import drive

drive.mount('/content/gdrive')

path_prefix = "/content/gdrive/MyDrive/cadorsdata"
# Index 1 leaves sys.path[0] untouched while ranking this folder ahead of the remaining entries.
sys.path.insert(1, path_prefix)

Mounted at /content/gdrive


In [None]:
# Create (or reuse) the Spark session shared by every later cell.
spark = (
    SparkSession.builder
    .appName('ETL temp')
    .getOrCreate()
)
# assert spark.version >= '3.0' # make sure we have Spark 3.0+
sc = spark.sparkContext
# Set the log level on the same context object that `sc` refers to.
sc.setLogLevel('WARN')

In [None]:
# Load the aircraft and weather records from the JSON files on the mounted Drive.
aircraft_df = spark.read.json('/content/gdrive/MyDrive/cadorsdata_aircraft/*')
weather_df = spark.read.json('/content/gdrive/MyDrive/cadorsdata_weather/*')

# Row counts before and after removing exact duplicate rows.
# dropDuplicates() is lazy: printing the DataFrame it returns shows only its schema,
# so count() is called on it to obtain the de-duplicated size.
print(aircraft_df.count())
print(aircraft_df.dropDuplicates().count())
print(weather_df.count())
print(weather_df.dropDuplicates().count())

303018
DataFrame[CADORS Number: string, Fatalities:: bigint, GR: string, Injuries:: bigint, Occurrence Category: array<string>, Occurrence Date:: string, Occurrence Event Information: array<string>, Occurrence Location:: string, Occurrence Type:: string, Province:: string, aircraft_category: string, amateur_built: string, damage: string, flight_rule: string, gear_type: string, hour: bigint, label: bigint, latitude: double, longitude: double, make: string, month: bigint, operator_type: string, phase_of_flight: string, pos: bigint, registration_mark: string, year: bigint, year_built: string]
279769
DataFrame[CADORS Number: string, prcp: string, pres: string, snow: string, tavg: string, tmax: string, tmin: string, tsun: string, wdir: string, wpgt: string, wspd: string]


In [None]:
# Keep only the occurrences whose type is 'Accident'.
aircraft_df_acci = aircraft_df.filter(col('Occurrence Type:') == 'Accident').cache()

# The weather measurements are read from JSON as strings. Cast them to integers here so that
# this cell produces numeric columns on a fresh top-to-bottom run, without relying on a
# later cell having been executed first.
weather_numeric_cols = ['prcp', 'pres', 'snow', 'tavg', 'tmax', 'tmin', 'tsun', 'wdir', 'wpgt', 'wspd']
weather_typed = weather_df
for name in weather_numeric_cols:
    weather_typed = weather_typed.withColumn(name, col(name).cast(IntegerType()))

# Weather rows that belong to an accident; dropDuplicates() collapses the repeats created
# when one CADORS Number appears on several accident rows.
weather_df_acci = (
    weather_typed
    .join(aircraft_df_acci.select(['CADORS Number']), ['CADORS Number'])
    .dropDuplicates()
    .cache()
)

# Row counts before and after removing exact duplicates. dropDuplicates() is lazy, so
# count() is needed to get a number instead of the DataFrame's schema repr.
print(aircraft_df_acci.count())
print(aircraft_df_acci.dropDuplicates().count())
print(weather_df_acci.count())
print(weather_df_acci.dropDuplicates().count())

7846
DataFrame[CADORS Number: string, Fatalities:: bigint, GR: string, Injuries:: bigint, Occurrence Category: array<string>, Occurrence Date:: string, Occurrence Event Information: array<string>, Occurrence Location:: string, Occurrence Type:: string, Province:: string, aircraft_category: string, amateur_built: string, damage: string, flight_rule: string, gear_type: string, hour: bigint, label: bigint, latitude: double, longitude: double, make: string, month: bigint, operator_type: string, phase_of_flight: string, pos: bigint, registration_mark: string, year: bigint, year_built: string]
7678
DataFrame[CADORS Number: string, prcp: int, pres: int, snow: int, tavg: int, tmax: int, tmin: int, tsun: int, wdir: int, wpgt: int, wspd: int]


In [None]:
# Replace each weather measurement column with its integer cast, one column at a time.
for weather_col in ('prcp', 'snow', 'pres', 'tavg', 'tmax', 'tmin', 'tsun', 'wdir', 'wpgt', 'wspd'):
    weather_df = weather_df.withColumn(weather_col, weather_df[weather_col].cast(IntegerType()))

CLASSIFICATION FOR ACCIDENTS ONLY

In [None]:
def merge(x, sep=''):
  """Collapse a list of strings into a single, order-independent string.

  The non-null items are sorted and then joined with ``sep``.

  Args:
    x: list of strings, or None (a Python UDF receives None for a null column value).
    sep: separator placed between items. The default '' keeps the original
      concatenation, which is ambiguous: ['ab', 'c'] and ['a', 'bc'] both give 'abc'.
      Pass a non-empty separator to keep the items distinguishable.

  Returns:
    'NA' when ``x`` is None, empty, or contains only None items; otherwise the
    sorted items joined by ``sep``.
  """
  if x is None:
    return 'NA'
  # None items cannot be ordered against strings, so drop them before sorting.
  parts = sorted(item for item in x if item is not None)
  if not parts:
    return 'NA'
  return sep.join(parts)
  
def con(x):
    """Map a missing value to the placeholder 'NA'.

    Args:
        x: a string, or None.

    Returns:
        'NA' when ``x`` is None or the empty string; otherwise ``x`` unchanged.
    """
    if x is None or x == '':
        return 'NA'
    return x

In [None]:
# Wrap the plain-Python helpers as Spark UDFs that return strings.
transform_udf = udf(merge, StringType())
makeNA_udf = udf(con, StringType())

#picking features from df( Main Data)
occurrence_cols = ['CADORS Number', 'Occurrence Category', 'hour', 'GR', 'Occurrence Event Information', 'month']
featureset1 = aircraft_df_acci.select(occurrence_cols).dropDuplicates()

# First pass: collapse each array column into one string stored in '<column>_merged'.
for array_col in ('Occurrence Event Information', 'Occurrence Category'):
    featureset1 = featureset1.withColumn(array_col + '_merged', transform_udf(col(array_col)))

# Second pass: run the merged strings through the 'NA' placeholder helper.
for array_col in ('Occurrence Event Information', 'Occurrence Category'):
    merged_name = array_col + '_merged'
    featureset1 = featureset1.withColumn(merged_name, makeNA_udf(col(merged_name)))

print("FEATURES 1\n")
print(featureset1.count())
print(featureset1.head(5))


FEATURES 1

7678
[Row(CADORS Number='2000C0204', Occurrence Category=['Abnormal runway contact'], hour=18, GR='Alberta', Occurrence Event Information=[], month=3, Occurrence Event Information_merged='NA', Occurrence Category_merged='Abnormal runway contact'), Row(CADORS Number='2000C0770', Occurrence Category=['Abnormal runway contact'], hour=14, GR='Alberta', Occurrence Event Information=[], month=7, Occurrence Event Information_merged='NA', Occurrence Category_merged='Abnormal runway contact'), Row(CADORS Number='2000O0271', Occurrence Category=['Other', 'Runway excursion'], hour=21, GR='Ontario', Occurrence Event Information=[], month=4, Occurrence Event Information_merged='NA', Occurrence Category_merged='OtherRunway excursion'), Row(CADORS Number='2000O0466', Occurrence Category=['Other'], hour=14, GR='Ontario', Occurrence Event Information=[], month=6, Occurrence Event Information_merged='NA', Occurrence Category_merged='Other'), Row(CADORS Number='2003O1133', Occurrence Category

In [None]:
#features from aircraft data
# aircraft_information1=aircraft_information1.join(df.select('CADORS Number'),['CADORS Number'])
aircraft_feature_cols = ['flight_rule', 'year_built', 'amateur_built', 'gear_type', 'phase_of_flight', 'damage', 'operator_type']

# One row per CADORS Number; each attribute becomes the list of its values across that number's rows.
collected = [collect_list(name).alias(name) for name in aircraft_feature_cols]
featureset2 = aircraft_df_acci.groupBy('CADORS Number').agg(*collected)

# First pass: collapse each collected list into one string stored in '<column>_merged'.
for feature_col in aircraft_feature_cols:
    featureset2 = featureset2.withColumn(feature_col + '_merged', transform_udf(col(feature_col)))

# Second pass: run the merged strings through the 'NA' placeholder helper.
for feature_col in aircraft_feature_cols:
    merged_name = feature_col + '_merged'
    featureset2 = featureset2.withColumn(merged_name, makeNA_udf(col(merged_name)))

print("FEATURES 2\n")
print(featureset2.count())
print(featureset2.head(5))


FEATURES 2

7678
[Row(CADORS Number='1993C0151', flight_rule=['NA'], year_built=['1956'], amateur_built=['No'], gear_type=['NA'], phase_of_flight=['Unknown'], damage=['Unknown'], operator_type=['Commercial'], flight_rule_merged='NA', year_built_merged='1956', amateur_built_merged='No', gear_type_merged='NA', phase_of_flight_merged='Unknown', damage_merged='Unknown', operator_type_merged='Commercial'), Row(CADORS Number='1993O0043', flight_rule=['NA'], year_built=['1976'], amateur_built=['No'], gear_type=['Land'], phase_of_flight=['Roll Out'], damage=['Substantial'], operator_type=['Commercial'], flight_rule_merged='NA', year_built_merged='1976', amateur_built_merged='No', gear_type_merged='Land', phase_of_flight_merged='Roll Out', damage_merged='Substantial', operator_type_merged='Commercial'), Row(CADORS Number='1993O0077', flight_rule=['NA'], year_built=['1991'], amateur_built=['No'], gear_type=['Land'], phase_of_flight=['Landing'], damage=['Minor'], operator_type=['Commercial'], fli

In [None]:
# #features from weather data
# Keep the prcp and snow columns per CADORS Number, de-duplicate, then drop any row with a null.
featureset3 = (
    weather_df_acci
    .select(['CADORS Number', 'prcp', 'snow'])
    .dropDuplicates()
    .dropna(how='any')
)
print("FEATURES 3\n")
print(featureset3.count())
print(featureset3.head(5))

FEATURES 3

5888
[Row(CADORS Number='2000Q0399', prcp=1, snow=0), Row(CADORS Number='1995O0436', prcp=0, snow=0), Row(CADORS Number='2012O1389', prcp=0, snow=0), Row(CADORS Number='1995O0367', prcp=0, snow=0), Row(CADORS Number='2006C0787', prcp=0, snow=0)]


In [None]:
## Final data for ML
# Inner joins keep only the CADORS Numbers present in all three feature sets.
ff = featureset1.join(featureset2, ['CADORS Number'], how='inner')
ff = ff.join(featureset3, ['CADORS Number'], how='inner')
print(ff.count())
# show() prints the table itself and returns None, so it is not wrapped in print()
# (that would emit a stray "None" line after the table).
ff.show()
ff.printSchema()

5888
+-------------+--------------------+----+----------------+----------------------------+-----+-----------------------------------+--------------------------+-----------+----------+-------------+---------+---------------+-------------+-------------+------------------+-----------------+--------------------+----------------+----------------------+-------------+--------------------+----+----+
|CADORS Number| Occurrence Category|hour|              GR|Occurrence Event Information|month|Occurrence Event Information_merged|Occurrence Category_merged|flight_rule|year_built|amateur_built|gear_type|phase_of_flight|       damage|operator_type|flight_rule_merged|year_built_merged|amateur_built_merged|gear_type_merged|phase_of_flight_merged|damage_merged|operator_type_merged|prcp|snow|
+-------------+--------------------+----+----------------+----------------------------+-----+-----------------------------------+--------------------------+-----------+----------+-------------+---------+----------

In [None]:
######################### ML DATA ####################################
# aircraft_df can carry several rows for one CADORS Number (the accident subset has more
# rows than distinct numbers), so the label lookup is de-duplicated first. Without this the
# join repeats feature rows, and copies of one occurrence can land in both train and validation.
label_lookup = aircraft_df.select(['CADORS Number', 'label']).dropDuplicates()
finalMLData = ff.join(label_lookup, ['CADORS Number'])

# Number of positive examples.
# The recorded run printed 771, before the label lookup was de-duplicated.
print(finalMLData.filter(finalMLData['label'] == 1).count())

# A fixed seed lets the 75/25 split be reproduced on a re-run.
train, validation = finalMLData.randomSplit([0.75, 0.25], seed=42)
train = train.cache()
validation = validation.cache()

771


LOGISTIC REGRESSION

In [None]:
# ################################# Pipeline ############################################
# String-valued columns that are index-encoded; each indexed copy is named '<column>1'.
categorical_cols = ['Occurrence Category_merged', 'GR', 'Occurrence Event Information_merged', 'flight_rule_merged', 'year_built_merged', 'amateur_built_merged', 'gear_type_merged', 'phase_of_flight_merged', 'damage_merged', 'operator_type_merged']
indexed_cols = [name + '1' for name in categorical_cols]

# handleInvalid="keep" assigns an extra index to values not seen during fit instead of failing.
str_indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols, handleInvalid="keep")

# Each feature is listed exactly once. The previous list repeated 'phase_of_flight_merged1'
# and 'operator_type_merged1', which put two identical copies of those columns in the vector.
# NOTE(review): the indexed categories go into a linear model as plain numbers;
# one-hot encoding may be more appropriate - confirm.
assembler = VectorAssembler(inputCols=['hour', 'month'] + indexed_cols + ['prcp', 'snow'], outputCol="features")
################### ML Model ####################################
classifier = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[str_indexer, assembler, classifier])
# pipeline = Pipeline(stages=[str_indexer])
model = pipeline.fit(train)

In [None]:
# Score the validation split with the fitted logistic-regression pipeline.
pred = model.transform(validation)
# show() prints the rows itself and returns None; wrapping it in print() adds a stray "None".
pred.show(20)

# Only the columns the evaluator needs. The name `eval` is kept for compatibility with
# any other cell that may reference it, although it shadows Python's built-in eval().
eval = pred.select(pred.label, pred.rawPrediction, pred.prediction)
# Constructed with default settings, as in the original cell.
evaluator = BinaryClassificationEvaluator()
score = evaluator.evaluate(eval)
print(score)

In [None]:
# pred.write.json("/content/gdrive/MyDrive/cadors_LR/")

0.5750645301867185


SVM

In [None]:
# ################################# Pipeline ############################################
# String-valued columns that are index-encoded; each indexed copy is named '<column>1'.
svm_categorical_cols = ['Occurrence Category_merged', 'GR', 'Occurrence Event Information_merged', 'flight_rule_merged', 'year_built_merged', 'amateur_built_merged', 'gear_type_merged', 'phase_of_flight_merged', 'damage_merged', 'operator_type_merged']
svm_indexed_cols = [name + '1' for name in svm_categorical_cols]

# handleInvalid="keep" assigns an extra index to values not seen during fit instead of failing.
str_indexer1 = StringIndexer(inputCols=svm_categorical_cols, outputCols=svm_indexed_cols, handleInvalid="keep")

# Each feature is listed exactly once. The previous list repeated 'phase_of_flight_merged1'
# and 'operator_type_merged1', which put two identical copies of those columns in the vector.
assembler1 = VectorAssembler(inputCols=['hour', 'month', 'GR1', 'Occurrence Category_merged1', 'Occurrence Event Information_merged1', 'flight_rule_merged1', 'year_built_merged1', 'amateur_built_merged1', 'gear_type_merged1', 'phase_of_flight_merged1', 'damage_merged1', 'operator_type_merged1', 'prcp', 'snow'], outputCol="features")
################### ML Model ####################################
classifier1 = LinearSVC(featuresCol="features", labelCol="label")
pipeline1 = Pipeline(stages=[str_indexer1, assembler1, classifier1])
# pipeline = Pipeline(stages=[str_indexer])
model1 = pipeline1.fit(train)

In [None]:
# Score the validation split with the fitted linear-SVM pipeline.
pred1 = model1.transform(validation)
# show() prints the rows itself and returns None; wrapping it in print() adds a stray "None".
pred1.show(20)

# Only the columns the evaluator needs.
eval1 = pred1.select(pred1.label, pred1.rawPrediction, pred1.prediction)
# Constructed with default settings, as in the original cell.
evaluator1 = BinaryClassificationEvaluator()
score1 = evaluator1.evaluate(eval1)
print(score1)

+-------------+--------------------+----+--------------------+----------------------------+-----+-----------------------------------+--------------------------+-----------+----------+-------------+----------------+-------------------+--------------------+--------------------+------------------+-----------------+--------------------+----------------+----------------------+----------------+--------------------+----+----+-----+---------------------------+---+------------------------------------+-------------------+------------------+---------------------+-----------------+-----------------------+--------------+---------------------+--------------------+--------------------+----------+
|CADORS Number| Occurrence Category|hour|                  GR|Occurrence Event Information|month|Occurrence Event Information_merged|Occurrence Category_merged|flight_rule|year_built|amateur_built|       gear_type|    phase_of_flight|              damage|       operator_type|flight_rule_merged|year_built_mer

In [None]:
# target_df = predictions.select([col('`{}`'.format(c)).cast(StringType()).alias(c) for c in predictions.columns])
# Read the saved SVC output back from JSON and re-export it as CSV with a header row.
temp = spark.read.json('/content/gdrive/MyDrive/cadors_SVC/*')
temp.write.csv('/content/gdrive/MyDrive/ccadors_SVC_sample', header=True)
# target_df.write.json("/content/gdrive/MyDrive/cadors_SVC/")

In [None]:
# Load the saved logistic-regression output; the row count is shown as the cell's output.
predictions = spark.read.format('json').load('/content/gdrive/MyDrive/cadors_LR/')
predictions.count()

1574

In [None]:
# Build the all-string view of `predictions` here so this cell runs on a fresh kernel:
# otherwise `target_df` is first assigned by a cell further down the notebook.
# Backticks quote each column name so it is read as a single column reference.
target_df = predictions.select([col('`{}`'.format(c)).cast(StringType()).alias(c) for c in predictions.columns])

# Keep only the rows whose merged occurrence category is not the 'NA' placeholder.
temp = target_df.filter(col('Occurrence Category_merged') != 'NA')


In [None]:
# Display the rows predicted as class 1, first from the filtered view and then from the full set.
for frame in (temp, predictions):
    frame.filter(col('prediction') == 1).show()

+-------------+---+---+-------------------+--------------------------+---------------------------+----------------------------+-----------------------------------+------------------------------------+-------------+--------------------+---------------------+------+-------------+--------------+--------+-----------+------------------+-------------------+---------+----------------+-----------------+----+-----+-----+-------------+--------------------+---------------------+---------------+----------------------+-----------------------+----+----------+-----------+-------------+----+----------+-----------------+------------------+
|CADORS Number| GR|GR1|Occurrence Category|Occurrence Category_merged|Occurrence Category_merged1|Occurrence Event Information|Occurrence Event Information_merged|Occurrence Event Information_merged1|amateur_built|amateur_built_merged|amateur_built_merged1|damage|damage_merged|damage_merged1|features|flight_rule|flight_rule_merged|flight_rule_merged1|gear_type|gear_t

In [None]:
# Cast every column of `predictions` to a string, keeping the original column names.
# Backticks quote each name so it is read as a single column reference.
string_columns = []
for column_name in predictions.columns:
    quoted = '`{}`'.format(column_name)
    string_columns.append(col(quoted).cast(StringType()).alias(column_name))
target_df = predictions.select(string_columns)
# target_df = predictions.sample(withReplacement=False, fraction=0.10).select([col('`{}`'.format(c)).cast(StringType()).alias(c) for c in predictions.columns])

In [None]:
# Export the filtered rows as CSV with a header row.
temp.write.csv('/content/gdrive/MyDrive/cadorsdata_LR_sample', header=True)

In [None]:
# Total rows, then one count per (prediction, label) combination.
print(predictions.count())
confusion_cells = [
    ('True Positive: ', 1, 1),
    ('True Negative: ', 0, 0),
    ('False Positive: ', 1, 0),
    ('False Negative: ', 0, 1),
]
for caption, predicted, actual in confusion_cells:
    matches = predictions.filter((col('prediction') == predicted) & (col('label') == actual))
    print(caption, matches.count())

1574
True Positive:  2
True Negative:  1373
False Positive:  9
False Negative:  190
