In [13]:
#import pyspark

!pip install pyspark




[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [14]:
import pyspark
print(pyspark.__version__)

3.5.3


****
**col:** References a column by name in a DataFrame.

**mean / avg:** Calculates the average of a specified column.

**mode:** PySpark lacks a built-in mode function; it can be computed by grouping and sorting values by count.

**udf:** Creates a User Defined Function for custom transformations on columns.

**corr:** Calculates the correlation between two columns.

**hour:** Extracts the hour from a timestamp column.

**dayofweek:** Extracts the day of the week from a date column.

**month:** Extracts the month from a date column.

**substring:** Extracts a part of a string from a specified column.

**concat:** Concatenates multiple columns into a single column.

**count:** Counts the number of non-null entries in a column or group.

**max:** Finds the maximum value of a specified column.
****

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, mode, avg, udf, corr, hour, substring, concat, dayofweek, count, month, max,desc
from pyspark.sql.types import StringType, FloatType, DoubleType

In [16]:
spark = SparkSession.builder \
    .appName("FlightData") \
    .getOrCreate()

In [17]:
df = spark.read.csv('flights.csv',inferSchema=True, header=True, sep=",")

In [18]:
df.show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [19]:
#check for null values
null_columns = []
for column in df.columns:
    null_count = df.filter(col(column).isNull()).count()
    if null_count != 0:
        print(f"Column '{column}' has {null_count} null values")
        null_columns.append(column)

print("Null Coloumn :", null_columns)


Column 'dep_time' has 8255 null values
Column 'dep_delay' has 8255 null values
Column 'arr_time' has 8713 null values
Column 'arr_delay' has 9430 null values
Column 'tailnum' has 2512 null values
Column 'air_time' has 9430 null values
Null Coloumn : ['dep_time', 'dep_delay', 'arr_time', 'arr_delay', 'tailnum', 'air_time']


****Working with Null column****

In [20]:
for col in null_columns:
    col_type = df.schema[col].dataType
    print(f"Coloumn '{col}' is of type '{col_type}'")

Coloumn 'dep_time' is of type 'DoubleType()'
Coloumn 'dep_delay' is of type 'DoubleType()'
Coloumn 'arr_time' is of type 'DoubleType()'
Coloumn 'arr_delay' is of type 'DoubleType()'
Coloumn 'tailnum' is of type 'StringType()'
Coloumn 'air_time' is of type 'DoubleType()'


In [21]:

for col_name in null_columns:
    col_type = df.schema[col_name].dataType
    print(f"Column '{col_name}' is of type '{col_type}'")
    
    # For string type columns, fill nulls with the mode value
    
    
    if isinstance(col_type, StringType):
        mode_val = df.groupBy(col_name).count().orderBy(desc("count")).first()[0]
        print(f"Mode for column '{col_name}' is '{mode_val}'")
        df = df.fillna({col_name: str(mode_val)}) 
    
    # For double type columns, fill nulls with the mean value
    
    
    elif isinstance(col_type, DoubleType):
        mean_val = df.agg(avg(col_name)).collect()[0][0]
        
        # Fill null with mean value for double columns
        df = df.fillna({col_name: float(mean_val)})  
        print(f"The mean value for column '{col_name}' is {mean_val}")


Column 'dep_time' is of type 'DoubleType()'
The mean value for column 'dep_time' is 1349.1099473093045
Column 'dep_delay' is of type 'DoubleType()'
The mean value for column 'dep_delay' is 12.639070257304708
Column 'arr_time' is of type 'DoubleType()'
The mean value for column 'arr_time' is 1502.0549985825894
Column 'arr_delay' is of type 'DoubleType()'
The mean value for column 'arr_delay' is 6.89537675731489
Column 'tailnum' is of type 'StringType()'
Mode for column 'tailnum' is 'None'
Column 'air_time' is of type 'DoubleType()'
The mean value for column 'air_time' is 150.68646019807787
