In [1]:
from pyspark.sql import SparkSession
import os
from traffic.core import Traffic
import pandas as pd
import h5py
from tqdm.auto import tqdm
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType,  IntegerType, StringType
from joblib import load as load_scaler
from elephas.ml_model import ElephasEstimator
from src.processing_utils.preprocessing import preprocess_traffic, generate_aux_columns, seconds_till_arrival
import random
from src.models.fnn import build_sequential

In [37]:
os.environ['PYSPARK_PYTHON'] = 'C:/Users/dario/anaconda3/envs/Lufthansa-Arrival-Time-Prediction/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/dario/anaconda3/envs/Lufthansa-Arrival-Time-Prediction/python.exe'

spark = SparkSession.builder \
    .appName("LoadData") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .master("local[*]") \
    .getOrCreate()


columns = [
    "distance",
    "altitude",
    "geoaltitude",
    "vertical_rate",
    "groundspeed",
    "holiday",
    'sec_sin',
    'sec_cos',
    'day_sin',
    'day_cos',
    'bearing_sin',
    'bearing_cos',
    'track_sin',
    'track_cos',
    'latitude_rad',
    'longitude_rad',
    'weekday_0',
    'weekday_1',
    'weekday_2',
    'weekday_3',
    'weekday_4',
    'weekday_5',
    'weekday_6',
    'seconds_till_arrival',
    "day",
]
COLS_TO_SCALE = ["distance", "altitude", "geoaltitude", "vertical_rate", "groundspeed"]
PATH_SCALER = os.path.join("..", "trained_models", "std_scaler_reg_new.bin")
scaler = load_scaler(PATH_SCALER)
def load_data_batch_spark(file_batch):

    schema = StructType([
        StructField("distance", FloatType(), True),
        StructField("altitude", FloatType(), True),
        StructField("geoaltitude", FloatType(), True),
        StructField("vertical_rate", FloatType(), True),
        StructField("groundspeed", FloatType(), True),
        StructField("holiday", IntegerType(), True),
        StructField('sec_sin',FloatType(), True),
        StructField('sec_cos',FloatType(), True),
        StructField('day_sin',FloatType(), True),
        StructField('day_cos',FloatType(), True),
        StructField('bearing_sin',FloatType(), True),
        StructField('bearing_cos',FloatType(), True),
        StructField('track_sin',FloatType(), True),
        StructField('track_cos',FloatType(), True),
        StructField('latitude_rad',FloatType(), True),
        StructField('longitude_rad',FloatType(), True),
        StructField('weekday_0',IntegerType(), True),
        StructField('weekday_1', IntegerType(), True),
        StructField('weekday_2', IntegerType(), True),
        StructField('weekday_3', IntegerType(), True),
        StructField('weekday_4', IntegerType(), True),
        StructField('weekday_5', IntegerType(), True),
        StructField('weekday_6', IntegerType(), True),
        StructField('seconds_till_arrival', FloatType(), True),
        StructField('day', StringType(), True),
    ])
    print(len(schema), len(columns))
    # Placeholder for the final DataFrame
    all_flights = None
    first_month = True
    for file in file_batch:

        with h5py.File(file, 'r') as f:
            first_day = True

            for key in tqdm(list(f.keys())[:5], desc=file):
                # Retrieve the data using your existing method
                new_flights = Traffic.from_file(file, key=key,
                                                parse_dates=["day", "firstseen", "hour", "last_position",
                                                             "lastseen", "timestamp"]).data
                if first_day:
                    df_flights = new_flights.copy()
                    first_day = False
                else:
                    df_flights = pd.concat([df_flights, new_flights])

            df_flights = preprocess_traffic(df_flights, remove_noise=True)
            df_flights = generate_aux_columns(df_flights)
            df_flights["day"] = df_flights.firstseen.astype(str)
            df_flights = df_flights.dropna()
            df_flights["seconds_till_arrival"] = seconds_till_arrival(df_flights)
            df_flights[COLS_TO_SCALE] =scaler.transform(df_flights[COLS_TO_SCALE])


        if first_month:
            all_flights = spark.createDataFrame(df_flights[columns], schema=schema)
            first_month = False
        else:
            add = spark.createDataFrame(df_flights[columns], schema=schema)
            all_flights = all_flights.union(add)

    return all_flights


In [38]:
PATH_DATA = os.path.join("..", "data", "raw")
files = [os.path.join(PATH_DATA,file) for file in os.listdir(PATH_DATA)]
files

['..\\data\\raw\\Frankfurt_LH_2201.h5',
 '..\\data\\raw\\Frankfurt_LH_2202.h5',
 '..\\data\\raw\\Frankfurt_LH_2203.h5',
 '..\\data\\raw\\Frankfurt_LH_2204.h5',
 '..\\data\\raw\\Frankfurt_LH_2205.h5',
 '..\\data\\raw\\Frankfurt_LH_2206.h5',
 '..\\data\\raw\\Frankfurt_LH_2207.h5',
 '..\\data\\raw\\Frankfurt_LH_2208.h5',
 '..\\data\\raw\\Frankfurt_LH_2209.h5',
 '..\\data\\raw\\Frankfurt_LH_2210.h5',
 '..\\data\\raw\\Frankfurt_LH_2211.h5',
 '..\\data\\raw\\Frankfurt_LH_2212.h5']

In [39]:
spark_df = load_data_batch_spark(files)

25 25


..\data\raw\Frankfurt_LH_2201.h5:   0%|          | 0/5 [00:00<?, ?it/s]

  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2202.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2203.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2204.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2205.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2206.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2207.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2208.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2209.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2210.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2211.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


..\data\raw\Frankfurt_LH_2212.h5:   0%|          | 0/5 [00:00<?, ?it/s]

here


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


In [43]:
spark_df.write.csv("../data/processed/test_df")


Py4JJavaError: An error occurred while calling o1484.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:851)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$26(FileFormatWriter.scala:277)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:277)
	... 42 more


In [44]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Input, LeakyReLU, Dropout, Dense, Activation
from tensorflow.keras.optimizers import Adam



def build_sequential(
        lr: float = 0.001,
        input_dims: tuple = (23,),
        output_dims: int = 1,
        layer_sizes: tuple = (1024, 512, 256),
        dropout_rate: float = 0.2,
        activation: str = "softplus",
        loss: str = "MAE",
):
    model = Sequential()
    model.add(Input(shape=input_dims))
    for size in layer_sizes:
        model.add(Dense(size))
        model.add(LeakyReLU(alpha=0.05))
        model.add(Dropout(dropout_rate))
    model.add(Dense(output_dims, activation=activation))
    model.compile(optimizer=Adam(learning_rate=lr), loss=loss)
    return model

model = build_sequential()


In [45]:
flight_ids = spark_df.select("day").distinct().rdd.flatMap(lambda x: x).collect()

In [46]:
random.shuffle(flight_ids)

In [47]:
ind_last = int(len(flight_ids)*0.95)
flights_train = flight_ids[:ind_last]
flights_test = flight_ids[ind_last:]
train_df = spark_df.filter(spark_df.day.isin(flights_train))
test_df = spark_df.filter(spark_df.day.isin(flights_test))

In [48]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import rand
train_df = train_df.orderBy(rand())
train_df = train_df.drop("day")
feature_columns = [col_name for col_name in train_df.columns if col_name != 'seconds_till_arrival']
vec_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_df = vec_assembler.transform(train_df)

In [49]:
df_transform_fin = train_df.select('features','seconds_till_arrival')

In [52]:
from keras import optimizers
# Set and Serialize Optimizer
optimizer_conf = Adam(learning_rate=0.0001)
opt_conf = optimizers.serialize(optimizer_conf)

# Initialize SparkML Estimator and Get Settings
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")
estimator.setLabelCol("seconds_till_arrival")
estimator.set_keras_model_config(model.to_json())
estimator.set_num_workers(3)
estimator.set_epochs(2)
estimator.set_batch_size(128)
estimator.set_verbosity(1)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("mae")
estimator.set_categorical_labels(False)


ElephasEstimator_f7df5f77f5a0

In [None]:
fitted_model = estimator.fit(df_transform_fin)


>>> Fit model


In [17]:
df_test = test_df.toPandas()

In [18]:
X = df_test.drop(columns=["seconds_till_arrival", "day"])
y = df_test.seconds_till_arrival

In [19]:
model = build_sequential(input_dims=(23,))
model.set_weights(fitted_model.weights)

In [20]:
model.evaluate(X,y)



547.7393188476562

In [25]:
df_test

Unnamed: 0,distance,altitude,geoaltitude,vertical_rate,groundspeed,holiday,sec_sin,sec_cos,day_sin,day_cos,...,longitude_rad,weekday_0,weekday_1,weekday_2,weekday_3,weekday_4,weekday_5,weekday_6,seconds_till_arrival,day
0,-0.479459,-2.106921,-2.088638,0.835945,-2.582180,0,0.991264,0.131896,-0.493776,0.869589,...,0.169794,0,0,0,1,0,0,0,2465.0,2022-12-01 05:29:22+00:00
1,-0.479341,-2.082564,-2.066489,0.759652,-2.482535,0,0.991359,0.131175,-0.493776,0.869589,...,0.169994,0,0,0,1,0,0,0,2455.0,2022-12-01 05:29:22+00:00
2,-0.479102,-2.035880,-2.022192,0.708790,-2.313140,0,0.991540,0.129805,-0.493776,0.869589,...,0.170402,0,0,0,1,0,0,0,2436.0,2022-12-01 05:29:22+00:00
3,-0.478460,-1.926273,-1.919503,1.191978,-1.725240,0,0.991931,0.126776,-0.493776,0.869589,...,0.171522,0,0,0,1,0,0,0,2394.0,2022-12-01 05:29:22+00:00
4,-0.477986,-1.778101,-1.776544,1.446288,-1.466165,0,0.992242,0.124323,-0.493776,0.869589,...,0.172607,0,0,0,1,0,0,0,2360.0,2022-12-01 05:29:22+00:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
15356,-0.632407,-2.102861,-2.098705,-0.283017,-2.671859,0,-0.946156,0.323711,-0.432776,0.901502,...,0.150851,1,0,0,0,0,0,0,81.0,2022-12-05 17:30:40+00:00
15357,-0.632491,-2.104891,-2.098705,-0.283017,-2.671859,0,-0.946109,0.323849,-0.432776,0.901502,...,0.150816,1,0,0,0,0,0,0,79.0,2022-12-05 17:30:40+00:00
15358,-0.632633,-2.110981,-2.104746,-0.232155,-2.671859,0,-0.945991,0.324193,-0.432776,0.901502,...,0.150758,1,0,0,0,0,0,0,74.0,2022-12-05 17:30:40+00:00
15359,-0.632981,-2.119099,-2.112800,-0.283017,-2.681824,0,-0.945802,0.324743,-0.432776,0.901502,...,0.150614,1,0,0,0,0,0,0,66.0,2022-12-05 17:30:40+00:00


In [26]:
df_train = train_df.toPandas()

  Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [27]:
df_train

Unnamed: 0,distance,altitude,geoaltitude,vertical_rate,groundspeed,holiday,sec_sin,sec_cos,day_sin,day_cos,...,longitude_rad,weekday_0,weekday_1,weekday_2,weekday_3,weekday_4,weekday_5,weekday_6,seconds_till_arrival,features
0,-0.354216,0.899144,0.834975,0.022155,0.626362,0,0.535213,-0.844717,-0.493776,0.869589,...,0.179076,0,0,0,1,0,0,0,3237.0,"[-0.3542162775993347, 0.8991439938545227, 0.83..."
1,-0.459042,0.899144,0.804772,0.047585,0.437038,0,-0.876482,0.481435,-0.463550,0.886071,...,0.191889,0,0,0,0,0,1,0,2349.0,"[-0.459041953086853, 0.8991439938545227, 0.804..."
2,-0.281404,0.738793,0.669867,0.276464,0.347358,0,0.954479,-0.298277,-0.432776,0.901502,...,0.011908,1,0,0,0,0,0,0,3716.0,"[-0.2814040780067444, 0.7387934923171997, 0.66..."
3,-0.067310,0.576413,0.494692,0.022155,0.845579,0,0.759413,0.650609,-0.448229,0.893919,...,0.001987,0,0,0,0,0,0,1,5006.0,"[-0.06731020659208298, 0.5764132142066956, 0.4..."
4,-0.136666,0.744883,0.752421,0.403619,0.476895,0,0.994680,0.103010,-0.478734,0.877960,...,-0.057331,0,0,0,0,1,0,0,4843.0,"[-0.13666552305221558, 0.7448827028274536, 0.7..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
328532,-0.124803,0.625127,0.555097,0.123878,0.626362,0,-0.999828,-0.018543,-0.493776,0.869589,...,0.079938,0,0,0,1,0,0,0,4698.0,"[-0.12480297684669495, 0.6251272559165955, 0.5..."
328533,2.525172,0.077094,0.041653,0.607067,1.373693,0,-0.628189,0.778060,-0.463550,0.886071,...,-1.210711,0,0,0,0,0,1,0,24571.0,"[2.5251717567443848, 0.07709381729364395, 0.04..."
328534,-0.175356,0.736764,0.675908,0.022155,0.187927,0,-0.984490,-0.175438,-0.478734,0.877960,...,0.038969,0,0,0,0,1,0,0,4906.0,"[-0.17535614967346191, 0.7367637157440186, 0.6..."
328535,-0.520547,0.623098,0.557111,-1.198531,0.367287,0,-0.476775,-0.879025,-0.448229,0.893919,...,0.100285,0,0,0,0,0,0,1,1395.0,"[-0.5205473303794861, 0.6230975389480591, 0.55..."
