# 01: Starting with Pyspark

### Imports

In [9]:
# Imports
import pyspark
import numpy as np
import os
from pyspark.sql import SparkSession


### Loading Dataframe and Creating Session

In [None]:
# Create a DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
columns = ["id", "name"]

# Creating the spark session
spark = SparkSession.builder.appName("Practice").getOrCreate()

# Creating a spark dataframe
df = spark.createDataFrame(data, columns)

# Showing the dataframe
df.show()

# It works!


+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



---
### Loading and Reading Data

In [11]:
# Loading from csv
df = spark.read.csv('./datasets/airlines_flights_data.csv')

# Notice how the header isn't functioning correctly.
df.show()

+-----+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|  _c0|      _c1|    _c2|        _c3|           _c4|  _c5|          _c6|             _c7|    _c8|     _c9|     _c10| _c11|
+-----+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|index|  airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
|    0| SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|
|    1| SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|
|    2|  AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|
|    3|  Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|
|    4|  Vistara

In [12]:
# Fixing headers as the first row
df = spark.read.option('header','true').csv('./datasets/airlines_flights_data.csv')

# Now it functions correctly
df.show()

+-----+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|index|  airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
+-----+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|    0| SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|
|    1| SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|
|    2|  AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|
|    3|  Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|
|    4|  Vistara| UK-963|      Delhi|       Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5955|
|    5|  Vistara

In [13]:
# Printing Schema.
df.printSchema()

# Notice how some variables are ``string``, even though they should be ``int`` datatype. Let's fix that in the next code cell.

root
 |-- index: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- days_left: string (nullable = true)
 |-- price: string (nullable = true)



### Inferring Schema

In [None]:
# Fixing the schema's datatypes is done via loading with inferschema.
df = spark.read.option('header','true').csv('./datasets/airlines_flights_data.csv', inferSchema=True)

# Notice how it fixes these 
df.printSchema()

# But this can be inefficient, because it meticulously reads the whole file line by line, taking computing power.

root
 |-- index: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)



### Using a pre-determined schema up the schema

In [15]:
# This needs a new import. The datatypes you'll use are places after 'import'.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# Example schema definition (you need to adjust this to your actual CSV columns)
schema = StructType([
    StructField("Year", IntegerType(), True),
    StructField("Month", IntegerType(), True),
    StructField("DayofMonth", IntegerType(), True),
    StructField("DayOfWeek", IntegerType(), True),
    StructField("Airline", StringType(), True),
    StructField("FlightNum", StringType(), True),
    StructField("Origin", StringType(), True),
    StructField("Dest", StringType(), True),
    StructField("DepTime", StringType(), True),  
    StructField("ArrTime", StringType(), True),   
    StructField("DepDelay", DoubleType(), True),
    StructField("ArrDelay", DoubleType(), True),
    StructField("Distance", DoubleType(), True)
])

# Load with predefined schema
df = spark.read.option("header", "true").schema(schema).csv("./datasets/airlines_flights_data.csv")

# Printing Schema
df.printSchema()

# This is faster and more efficient.

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Airline: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- Distance: double (nullable = true)

