In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import when

In [10]:
# Explicit column layout for the flight CSVs; every field is declared nullable.
# NOTE(review): with header=True and a user-supplied schema, Spark (enforceSchema
# defaults to true) assigns these fields by column POSITION and ignores the header
# names, so this order must match the files' column order — confirm against the CSVs.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Year", IntegerType()),
        ("Month", IntegerType()),
        ("DayofMonth", IntegerType()),
        ("DayOfWeek", IntegerType()),
        ("DepTime", DoubleType()),
        ("CRSDepTime", IntegerType()),
        ("ArrTime", DoubleType()),
        ("CRSArrTime", IntegerType()),
        ("UniqueCarrier", StringType()),
        ("FlightNum", IntegerType()),
        ("ActualElapsedTime", DoubleType()),
        ("CRSElapsedTime", IntegerType()),
        ("ArrDelay", DoubleType()),
        ("DepDelay", DoubleType()),
        ("Origin", StringType()),
        ("Dest", StringType()),
        ("Distance", DoubleType()),
        ("Cancelled", IntegerType()),
        ("Diverted", IntegerType()),
    )
])

In [11]:
# Create (or reuse) a Spark session running locally with one worker thread per
# logical core ("local[*]").
spark = (
    SparkSession.builder
    .appName('my_app')
    .config('spark.master', 'local[*]')
    .getOrCreate()
)

# Keep notebook output readable: only ERROR-level Spark log messages are shown.
spark.sparkContext.setLogLevel("ERROR")

# Use the pre-Spark-3 ("LEGACY") datetime parsing rules for backward compatibility.
# NOTE(review): no visible cell parses dates/times; confirm this setting is still needed.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


def _read_flights(path, unused_cols):
    """Read one flight CSV with the notebook-wide ``schema`` and drop incomplete rows.

    Parameters
    ----------
    path : str
        Location of a CSV file whose first line is a header.
    unused_cols : sequence of str
        Column names to drop. ``DataFrame.drop`` ignores names that are not in
        the frame, and with the explicit schema defined earlier in this notebook
        none of these columns exist, so this is a safeguard rather than real work.

    Returns
    -------
    pyspark.sql.DataFrame
        Lazily evaluated frame with every row containing at least one null removed.
    """
    return (
        spark.read.option("header", True)
        .schema(schema)
        .csv(path)
        .drop(*unused_cols)
        # NOTE(review): how='any' discards every row with a null in ANY column. If
        # cancelled/diverted flights carry null times or delays, they are all removed
        # here and the Cancelled/Diverted columns become constant — confirm intended.
        .dropna(how='any')
    )


def load_data(paths=("df_91.csv.gz", "df_01.csv.gz")):
    """Load the flight datasets, returning one cleaned DataFrame per path.

    Parameters
    ----------
    paths : iterable of str, optional
        CSV files to read, in order. Defaults to the ``df_91`` and ``df_01``
        extracts, so ``df_91, df_01 = load_data()`` keeps working.

    Returns
    -------
    tuple of pyspark.sql.DataFrame
        One frame per entry of ``paths``, in the same order.
    """
    # Columns that are not part of the explicit schema; kept so the drop still
    # applies if the schema is ever widened to include them.
    unused_cols = ['TailNum', 'AirTime', 'TaxiIn', 'TaxiOut', 'CancellationCode',
                   'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay',
                   'LateAircraftDelay']
    return tuple(_read_flights(path, unused_cols) for path in paths)

In [12]:
# Load both flight extracts (already schema-typed and stripped of rows with nulls).
[df_91, df_01] = load_data()


In [35]:
def remove_all_null_columns(
    df_91,
    df_01,
    cols_91=('_c0', 'TailNum', 'AirTime', 'TaxiIn', 'TaxiOut', 'CancellationCode',
             'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay',
             'LateAircraftDelay'),
    cols_01=('CancellationCode', 'CarrierDelay', 'WeatherDelay', 'NASDelay',
             'SecurityDelay', 'LateAircraftDelay'),
):
    """Drop a fixed set of columns from each frame, then drop rows containing nulls.

    Despite the name, columns are NOT inspected for nulls: each frame gets its
    own hard-coded drop list (now overridable via ``cols_91`` / ``cols_01``).
    Names not present in a frame are ignored by ``DataFrame.drop``.

    Parameters
    ----------
    df_91, df_01 : pyspark.sql.DataFrame
        The two flight frames to clean.
    cols_91, cols_01 : sequence of str, optional
        Columns to drop from ``df_91`` and ``df_01`` respectively. The defaults
        reproduce the original per-dataset lists.

    Returns
    -------
    tuple
        ``(cleaned_df_91, cleaned_df_01)``.
    """
    def _clean(frame, cols):
        # Drop the listed columns first so their nulls don't cause rows to be discarded.
        return frame.drop(*cols).dropna(how='any')

    return _clean(df_91, cols_91), _clean(df_01, cols_01)

In [4]:
# Apply the per-dataset column drops. The NameError recorded below is stale: this
# cell was run (In [4]) before the cell defining the function (In [35]).
# NOTE(review): load_data already applies dropna(how='any'), so this step looks
# redundant under the current schema — confirm before keeping it.
df_91, df_01 = remove_all_null_columns(df_91=df_91, df_01=df_01)

NameError: name 'remove_all_null_columns' is not defined

In [13]:
# Show df_01's column names, Spark types and nullability as a tree.
df_01.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- Diverted: integer (nullable = true)



In [41]:
# Add a binary arrival-delay label to df_91: "1" when ArrDelay exceeds 1
# (presumably minutes), otherwise "0". withColumn replaces an existing "status"
# column, so re-running this cell is safe.
# NOTE(review): the label is a string and the threshold is a magic number; cast
# to a numeric type before using it as a Spark ML label, and confirm the threshold.
df_91 = df_91.withColumn(
    "status",
    when(df_91.ArrDelay > 1, "1").otherwise("0"),
)

In [43]:
# Preview the first 20 rows of the labelled frame (long cell values truncated).
df_91.show(n=20, truncate=True)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+--------+--------+------+----+--------+---------+--------+------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|Diverted|status|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+--------+--------+------+----+--------+---------+--------+------+
|1991|    1|         1|        2| 1709.0|      1705| 1944.0|      1944|           US|      112|            155.0|           159|     0.0|     4.0|   TPA| SYR|  1104.0|        0|       0|     0|
|1991|    1|         2|        3| 1704.0|      1705| 1946.0|      1944|           US|      112|            162.0|           159|     2.0|    -1.0|   TPA| SYR|  1104.0|        0|       0|     1|
|1991|    1|         3|       