Task 1: Big Data Analysis
Data Analytics Internship at CodTech

Author: Rutuja Salve

Internship Period: 6 Weeks (20th March 2025 to 05th May 2025)

Domain: Data Analytics

Task: Big Data Analysis on Airline Dataset
Use the Airline Delay Dataset to perform big data analysis using tools like PySpark or Dask. The goal is to demonstrate scalability and derive meaningful insights from a large dataset.
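
PySpark and Dask scale by splitting the data into partitions, computing a partial result for each partition, and then combining the partial results. The sketch below imitates that pattern with the standard library only. It computes the share of delayed flights per carrier (DEP_DELAY > 15, the rule used later in this notebook) from a CSV that is read in fixed-size chunks. The chunk size and helper names are hypothetical; in practice Dask or Spark choose and process the partitions for you.

```python
import csv
import io
from collections import defaultdict
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size rows (a stand-in for partitions)."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


def partial_counts(chunk):
    """Map step: count (delayed, total) flights per carrier in one chunk."""
    counts = defaultdict(lambda: [0, 0])
    for row in chunk:
        if row["DEP_DELAY"] == "":
            continue  # skip missing delays, like dropna(subset=["DEP_DELAY"])
        delayed = float(row["DEP_DELAY"]) > 15
        counts[row["OP_CARRIER"]][0] += int(delayed)
        counts[row["OP_CARRIER"]][1] += 1
    return counts


def delay_rate_by_carrier(csv_file, chunk_size=2):
    """Reduce step: merge the per-chunk counts and return delay rates."""
    totals = defaultdict(lambda: [0, 0])
    reader = csv.DictReader(csv_file)
    for chunk in iter_chunks(reader, chunk_size):
        for carrier, (delayed, total) in partial_counts(chunk).items():
            totals[carrier][0] += delayed
            totals[carrier][1] += total
    return {c: d / t for c, (d, t) in totals.items()}


# Five rows taken from the head() output shown further down in this notebook.
sample = io.StringIO(
    "OP_CARRIER,ORIGIN,DEST,DEP_DELAY\n"
    "UA,EWR,DEN,-5.0\n"
    "UA,LAS,SFO,-8.0\n"
    "UA,SNA,DEN,-5.0\n"
    "UA,RSW,ORD,6.0\n"
    "UA,ORD,ALB,20.0\n"
)
print(delay_rate_by_carrier(sample))  # {'UA': 0.2}
```

Because each chunk only produces two counters per carrier, the memory needed for the reduce step depends on the number of carriers, not on the number of rows.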

The dataset contains information about individual flights, such as the airline, origin, destination, scheduled and actual departure times, and the departure and arrival delays in minutes. Whether a flight counts as delayed is derived from the departure delay during data cleaning.
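
The scheduled and actual departure times (CRS_DEP_TIME, DEP_TIME) appear in the outputs below as hhmm numbers, so 1517 means 15:17, and subtracting them directly only works within the same hour. A minimal sketch, assuming that hhmm encoding, converts them to minutes after midnight and recomputes the departure delay. The rule for flights that cross midnight (treat any gap larger than 12 hours as a day boundary) is an assumption, not something the dataset documents.

```python
def hhmm_to_minutes(hhmm):
    """Convert an hhmm value such as 1517 (or 1517.0) to minutes after midnight."""
    hours, minutes = divmod(int(hhmm), 100)
    return (hours % 24) * 60 + minutes  # treats 2400 as midnight (0)


def departure_delay(scheduled_hhmm, actual_hhmm):
    """Actual minus scheduled departure, in minutes.

    Assumption: a difference of more than 12 hours in either direction means
    the flight crossed midnight, so the result is shifted by one day.
    """
    diff = hhmm_to_minutes(actual_hhmm) - hhmm_to_minutes(scheduled_hhmm)
    if diff < -720:
        diff += 1440
    elif diff > 720:
        diff -= 1440
    return diff


# (CRS_DEP_TIME, DEP_TIME) pairs from the head() output; DEP_DELAY there is -5, -8, -5, 6, 20.
pairs = [(1517, 1512), (1115, 1107), (1335, 1330), (1546, 1552), (630, 650)]
print([departure_delay(s, a) for s, a in pairs])  # [-5, -8, -5, 6, 20]
```

The recomputed values match the DEP_DELAY column for these rows, which is a quick sanity check that the times really are hhmm-encoded.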

## Load the Dataset

PySpark Example:

In [2]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Airline Delay Prediction") \
    .getOrCreate()

# Load the dataset
df = spark.read.csv("/content/Airline_Dataset.csv", header=True, inferSchema=True)

# Show the schema and first few rows
df.printSchema()
df.show(5)

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CARRIER_DELAY: double (nullable = true)
 |-- WEATHER_DELAY: doub

Dask Example:

In [3]:
import dask.dataframe as dd

# Dask infers dtypes from a sample at the start of the file, so pin the columns
# whose type may differ in later partitions
dtypes = {
    'CANCELLATION_CODE': 'object',  # letter codes; blank (NaN) for most flights
    'DEP_DELAY': 'float64',         # minutes; may be missing
    'ARR_DELAY': 'float64',         # minutes; may be missing
    'CRS_ARR_TIME': 'float64',      # hhmm; float64 tolerates missing values (NaN)
    'CRS_DEP_TIME': 'float64',      # hhmm; float64 tolerates missing values (NaN)
}

# Load the dataset with specified dtypes
df = dd.read_csv("/content/Airline_Dataset.csv", dtype=dtypes)

# Show the first few rows
print(df.head())

      FL_DATE OP_CARRIER  OP_CARRIER_FL_NUM ORIGIN DEST  CRS_DEP_TIME  \
0  2018-01-01         UA               2429    EWR  DEN        1517.0   
1  2018-01-01         UA               2427    LAS  SFO        1115.0   
2  2018-01-01         UA               2426    SNA  DEN        1335.0   
3  2018-01-01         UA               2425    RSW  ORD        1546.0   
4  2018-01-01         UA               2424    ORD  ALB         630.0   

   DEP_TIME  DEP_DELAY  TAXI_OUT  WHEELS_OFF  ...  CRS_ELAPSED_TIME  \
0    1512.0       -5.0      15.0      1527.0  ...             268.0   
1    1107.0       -8.0      11.0      1118.0  ...              99.0   
2    1330.0       -5.0      15.0      1345.0  ...             134.0   
3    1552.0        6.0      19.0      1611.0  ...             190.0   
4     650.0       20.0      13.0       703.0  ...             112.0   

   ACTUAL_ELAPSED_TIME  AIR_TIME  DISTANCE  CARRIER_DELAY  WEATHER_DELAY  \
0                250.0     225.0    1605.0            NaN 
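
Dask's `read_csv` infers each column's dtype from a sample at the start of the file and assumes the remaining partitions match. A column that is empty in the sampled rows therefore looks like float64 (all NaN) and fails later when a partition contains text, which is why the cell above pins CANCELLATION_CODE to object. The toy sketch below imitates that failure mode in plain Python; the inference rules are simplified and the column values are hypothetical, so it does not reproduce Dask's actual implementation.

```python
def infer_dtype(values):
    """Simplified dtype inference: blank strings count as missing (NaN)."""
    non_missing = [v for v in values if v != ""]
    if not non_missing:
        return "float64"  # an all-missing column looks like floats full of NaN
    try:
        for v in non_missing:
            int(v)
        # integers with gaps need float64, because NaN cannot live in int64
        return "int64" if len(non_missing) == len(values) else "float64"
    except ValueError:
        pass
    try:
        for v in non_missing:
            float(v)
        return "float64"
    except ValueError:
        return "object"


# Hypothetical CANCELLATION_CODE column: blank for most flights, a letter for a cancelled one.
column = [""] * 1000 + ["B"]
sample_dtype = infer_dtype(column[:100])  # what a sample-based reader would see
full_dtype = infer_dtype(column)
print(sample_dtype, full_dtype)  # float64 object
```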

## Perform Data Cleaning

Handle missing values.

Drop unnecessary columns.

Create a target variable for prediction (e.g., Delayed = 1 if DEP_DELAY > 15 minutes, else 0).
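
The labelling rule is easy to check on a handful of values before running it over the full dataset. A minimal plain-Python sketch, using the five DEP_DELAY values from the head() output above; the threshold is strict, so a delay of exactly 15 minutes is labelled 0.

```python
DELAY_THRESHOLD_MINUTES = 15  # a flight counts as delayed when DEP_DELAY > 15


def delayed_label(dep_delay):
    """Return 1 if the departure delay exceeds the threshold, else 0.

    Missing delays (None) are rejected; the notebook drops those rows first.
    """
    if dep_delay is None:
        raise ValueError("DEP_DELAY is missing; drop the row before labelling")
    return 1 if dep_delay > DELAY_THRESHOLD_MINUTES else 0


# DEP_DELAY values from the first five rows of the dataset.
delays = [-5.0, -8.0, -5.0, 6.0, 20.0]
print([delayed_label(d) for d in delays])  # [0, 0, 0, 0, 1]
```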

Dask Example:

In [13]:
# Drop rows with missing values in 'DEP_DELAY'
df = df.dropna(subset=["DEP_DELAY"])

# Create the target variable 'Delayed' (1 if the departure delay exceeds 15 minutes, else 0);
# a vectorised comparison avoids a row-by-row apply
df['Delayed'] = (df['DEP_DELAY'] > 15).astype('int64')

# Drop unnecessary columns
df = df.drop(columns=["CANCELLATION_CODE", "DIVERTED", "ARR_DELAY", "ARR_TIME", "FL_DATE"])

# Show the cleaned dataset (head() computes only the first partition)
print(df.head())

  OP_CARRIER  OP_CARRIER_FL_NUM ORIGIN DEST  CRS_DEP_TIME  DEP_TIME  \
0         UA               2429    EWR  DEN        1517.0    1512.0   
1         UA               2427    LAS  SFO        1115.0    1107.0   
2         UA               2426    SNA  DEN        1335.0    1330.0   
3         UA               2425    RSW  ORD        1546.0    1552.0   
4         UA               2424    ORD  ALB         630.0     650.0   

   DEP_DELAY  TAXI_OUT  WHEELS_OFF  WHEELS_ON  ...  ACTUAL_ELAPSED_TIME  \
0       -5.0      15.0      1527.0     1712.0  ...                250.0   
1       -8.0      11.0      1118.0     1223.0  ...                 83.0   
2       -5.0      15.0      1345.0     1631.0  ...                126.0   
3        6.0      19.0      1611.0     1748.0  ...                182.0   
4       20.0      13.0       703.0      926.0  ...                106.0   

   AIR_TIME  DISTANCE  CARRIER_DELAY  WEATHER_DELAY  NAS_DELAY  \
0     225.0    1605.0            NaN            NaN     

## Feature Engineering
Encode categorical variables (e.g., OP_CARRIER, ORIGIN, DEST).

Normalize numerical features (e.g., DISTANCE, CRS_DEP_TIME).
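
The notebook cell that follows prepares the label but does not yet encode or scale any column. A minimal plain-Python sketch of both steps, using values from the head() output: one-hot encoding for ORIGIN and min-max scaling for CRS_ELAPSED_TIME (chosen only because all five of its values are visible; the same scaling applies to DISTANCE, and hhmm columns such as CRS_DEP_TIME are better converted to minutes first). In Dask these steps would more typically use `df.categorize()` with `dd.get_dummies`, or the transformers in `dask_ml.preprocessing`; treat this as an illustration of the arithmetic, not the notebook's method.

```python
def one_hot(values):
    """Map each category to a 0/1 vector; columns follow sorted category order."""
    categories = sorted(set(values))
    index = {c: i for i, c in enumerate(categories)}
    vectors = []
    for v in values:
        row = [0] * len(categories)
        row[index[v]] = 1
        vectors.append(row)
    return categories, vectors


def min_max_scale(values):
    """Rescale numbers linearly to [0, 1]; a constant column maps to all zeros."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


origins = ["EWR", "LAS", "SNA", "RSW", "ORD"]          # ORIGIN, first five rows
crs_elapsed_time = [268.0, 99.0, 134.0, 190.0, 112.0]  # CRS_ELAPSED_TIME, same rows

categories, encoded = one_hot(origins)
print(categories)  # ['EWR', 'LAS', 'ORD', 'RSW', 'SNA']
print(encoded[0])  # [1, 0, 0, 0, 0]

scaled = min_max_scale(crs_elapsed_time)
print([round(s, 3) for s in scaled])  # [1.0, 0.0, 0.207, 0.538, 0.077]
```

On the full dataset the minimum and maximum (or the category list) must come from the training split only and then be reused on the test split, otherwise test information leaks into the features.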

In [5]:
# Drop rows with missing values in 'DEP_DELAY'
df = df.dropna(subset=["DEP_DELAY"])

# Create the target variable 'Delayed' (1 if the departure delay exceeds 15 minutes, else 0)
df['Delayed'] = (df['DEP_DELAY'] > 15).astype('int64')

# The column drop from the cleaning step is skipped here so that all original columns
# (including FL_DATE) stay available; OP_CARRIER, ORIGIN and DEST are kept either way
# df = df.drop(columns=["CANCELLATION_CODE", "DIVERTED", "ARR_DELAY", "ARR_TIME", "FL_DATE"])

# Show the dataset (head() computes only the first partition)
print(df.head())

      FL_DATE OP_CARRIER  OP_CARRIER_FL_NUM ORIGIN DEST  CRS_DEP_TIME  \
0  2018-01-01         UA               2429    EWR  DEN        1517.0   
1  2018-01-01         UA               2427    LAS  SFO        1115.0   
2  2018-01-01         UA               2426    SNA  DEN        1335.0   
3  2018-01-01         UA               2425    RSW  ORD        1546.0   
4  2018-01-01         UA               2424    ORD  ALB         630.0   

   DEP_TIME  DEP_DELAY  TAXI_OUT  WHEELS_OFF  ...  ACTUAL_ELAPSED_TIME  \
0    1512.0       -5.0      15.0      1527.0  ...                250.0   
1    1107.0       -8.0      11.0      1118.0  ...                 83.0   
2    1330.0       -5.0      15.0      1345.0  ...                126.0   
3    1552.0        6.0      19.0      1611.0  ...                182.0   
4     650.0       20.0      13.0       703.0  ...                106.0   

   AIR_TIME  DISTANCE  CARRIER_DELAY  WEATHER_DELAY  NAS_DELAY SECURITY_DELAY  \
0     225.0    1605.0            Na

## Split the Dataset
Split the data into training and testing sets.
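
Dask's `random_split` assigns rows to the outputs at random within each partition, so an 80/20 request yields roughly, not exactly, 80% and 20% of the rows. The sketch below reproduces that idea in plain Python with a seeded generator; the function name and the per-row draw are an illustration of the technique, not Dask's code.

```python
import random


def random_split(rows, fractions, seed=None):
    """Assign each row independently to one of len(fractions) splits.

    Because every row gets its own random draw, split sizes are only
    approximately proportional to `fractions`.
    """
    if abs(sum(fractions) - 1.0) > 1e-9:
        raise ValueError("fractions must sum to 1")
    rng = random.Random(seed)
    splits = [[] for _ in fractions]
    for row in rows:
        u = rng.random()
        cumulative = 0.0
        for i, frac in enumerate(fractions):
            cumulative += frac
            # the last split also catches any floating-point shortfall
            if u < cumulative or i == len(fractions) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))  # close to 8000 and 2000
```

Fixing the seed makes the split reproducible between runs, which matters when comparing models trained on the same data.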

In [9]:
train_data, test_data = df.random_split([0.8, 0.2], random_state=42)  # approximate 80/20 row split; the model cells below re-split with dask_ml's train_test_split

## Build and Train the Model
Use a machine learning algorithm (e.g., Logistic Regression, Random Forest).
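
Logistic regression models the probability of a delay as σ(w·x + b) and fits w and b by minimising the average log-loss. The plain-Python sketch below uses batch gradient descent on standardised features; the learning rate and epoch count are arbitrary choices, and dask_ml's `LogisticRegression` has its own solvers and regularisation settings rather than this loop. The toy data is the five DEP_DELAY values from the head() output with the Delayed labels derived from them.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def standardise(X):
    """Scale each feature column to zero mean and unit variance."""
    n, d = len(X), len(X[0])
    means = [sum(row[j] for row in X) / n for j in range(d)]
    stds = [math.sqrt(sum((row[j] - means[j]) ** 2 for row in X) / n) or 1.0 for j in range(d)]
    return [[(row[j] - means[j]) / stds[j] for j in range(d)] for row in X]


def fit_logistic(X, y, lr=0.5, epochs=5000):
    """Batch gradient descent on the average log-loss; returns (weights, bias)."""
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(epochs):
        grad_w, grad_b = [0.0] * d, 0.0
        for row, target in zip(X, y):
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, row)) + b) - target
            for j in range(d):
                grad_w[j] += err * row[j] / n
            grad_b += err / n
        w = [wj - lr * gj for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b
    return w, b


def predict(X, w, b, threshold=0.5):
    """Return 0/1 predictions from the fitted weights."""
    return [int(sigmoid(sum(wj * xj for wj, xj in zip(w, row)) + b) >= threshold) for row in X]


X = standardise([[-5.0], [-8.0], [-5.0], [6.0], [20.0]])  # DEP_DELAY only
y = [0, 0, 0, 0, 1]                                        # Delayed = DEP_DELAY > 15
w, b = fit_logistic(X, y)
print(predict(X, w, b))  # [0, 0, 0, 0, 1]
```

Standardising first keeps the gradient steps well scaled when features such as DISTANCE and hhmm times differ by orders of magnitude.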

In [13]:
!pip install dask-ml
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split

# df is the cleaned Dask DataFrame (with the 'Delayed' column) from the cells above

# Select features and target, then split with Dask-ML
# Note: DEP_DELAY directly defines 'Delayed', so using it as a feature leaks the label
X = df[['CRS_DEP_TIME', 'DEP_DELAY', 'DISTANCE']]
y = df['Delayed']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert the Dask DataFrame/Series objects to Dask Arrays;
# lengths=True computes the chunk sizes so that X and y rows line up
X_train = X_train.to_dask_array(lengths=True)
X_test = X_test.to_dask_array(lengths=True)
y_train = y_train.to_dask_array(lengths=True)
y_test = y_test.to_dask_array(lengths=True)

# Build and train the model using Dask-ML
lr = LogisticRegression()
model = lr.fit(X_train, y_train)

# Predict on the test set (evaluation follows in the next section)
predictions = model.predict(X_test)

## Evaluate the Model
Calculate metrics like accuracy, precision, recall, and F1-score.
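
For a binary label such as Delayed, all four metrics come from the confusion-matrix counts: accuracy = (TP + TN) / N, precision = TP / (TP + FP), recall = TP / (TP + FN), and F1 is the harmonic mean of precision and recall. A plain-Python sketch follows; returning 0.0 when a denominator is zero is a convention chosen here (scikit-learn, for example, makes this configurable through its `zero_division` parameter). The labels in the usage line are hypothetical.

```python
def binary_metrics(y_true, y_pred):
    """Accuracy, precision, recall and F1 for 0/1 labels (positive class = 1)."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    tn = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 0)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)

    def ratio(num, den):
        return num / den if den else 0.0  # convention: 0.0 when undefined

    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)
    return {
        "accuracy": ratio(tp + tn, len(y_true)),
        "precision": precision,
        "recall": recall,
        "f1": ratio(2 * precision * recall, precision + recall),
    }


# Hypothetical labels and predictions: 2 TP, 2 TN, 1 FP, 1 FN.
metrics = binary_metrics([1, 0, 1, 1, 0, 0], [1, 0, 0, 1, 1, 0])
print({k: round(v, 3) for k, v in metrics.items()})
# {'accuracy': 0.667, 'precision': 0.667, 'recall': 0.667, 'f1': 0.667}
```

With an imbalanced label (few delayed flights), precision and recall on the positive class are usually more informative than accuracy alone.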

In [15]:
!pip install dask-ml
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from dask_ml.metrics import accuracy_score

# df is the cleaned Dask DataFrame (with the 'Delayed' column) from the cells above

# Select features and target, then split with Dask-ML
# Note: DEP_DELAY directly defines 'Delayed', so using it as a feature leaks the label
X = df[['CRS_DEP_TIME', 'DEP_DELAY', 'DISTANCE']]
y = df['Delayed']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert to Dask Arrays with known chunk sizes
X_train = X_train.to_dask_array(lengths=True)
X_test = X_test.to_dask_array(lengths=True)
y_train = y_train.to_dask_array(lengths=True)
y_test = y_test.to_dask_array(lengths=True)

# Build and train the model using Dask-ML
lr = LogisticRegression()
model = lr.fit(X_train, y_train)

# Predict on the test set
predictions = model.predict(X_test)

# Accuracy: fraction of test rows whose predicted label matches the true label
accuracy = accuracy_score(y_test, predictions)
print("Accuracy:", accuracy)

Accuracy: 1.0
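
An accuracy of 1.0 is explained by target leakage rather than by a strong model: DEP_DELAY is one of the three features, and Delayed is defined as DEP_DELAY > 15, so the label is a deterministic function of an input. One way to avoid this is to keep only columns that are known before departure. Which columns count as post-departure is an assumption based on the field names (CRS_* columns hold scheduled values), and both helpers below are hypothetical.

```python
# Columns seen in this notebook's outputs that, by assumption, are only known once
# the flight has departed (or later), so they can leak the Delayed label.
POST_DEPARTURE_COLUMNS = {
    "DEP_TIME", "DEP_DELAY", "TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", "TAXI_IN",
    "ARR_TIME", "ARR_DELAY", "CANCELLED", "CANCELLATION_CODE", "DIVERTED",
    "ACTUAL_ELAPSED_TIME", "AIR_TIME", "CARRIER_DELAY", "WEATHER_DELAY",
    "NAS_DELAY", "SECURITY_DELAY",
}


def leakage_free_features(candidate_columns, target="Delayed"):
    """Drop the target and any post-departure columns from a feature list."""
    return [c for c in candidate_columns if c != target and c not in POST_DEPARTURE_COLUMNS]


def label_is_determined_by(feature_values, labels, rule):
    """True if applying `rule` to one feature reproduces every label exactly."""
    return all(rule(v) == lab for v, lab in zip(feature_values, labels))


# The features used above include DEP_DELAY, which defines the label.
print(leakage_free_features(["CRS_DEP_TIME", "DEP_DELAY", "DISTANCE"]))  # ['CRS_DEP_TIME', 'DISTANCE']

delays = [-5.0, -8.0, -5.0, 6.0, 20.0]  # DEP_DELAY from the head() output
labels = [0, 0, 0, 0, 1]                # Delayed for the same rows
print(label_is_determined_by(delays, labels, lambda d: int(d > 15)))  # True
```

Retraining on the reduced feature list would give a realistic (and almost certainly lower) accuracy; no such result is reported in this notebook.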
