# Miscellaneous

### Create a Spark session

In [1]:
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import (
    DataType,
    DataTypeSingleton,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Local Spark session that uses every available core on this machine;
# getOrCreate() reuses an already-running session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("FlightDataAggregation")
    .getOrCreate()
)


22/03/28 10:04:03 WARN Utils: Your hostname, Soumyas-MacAir.local resolves to a loopback address: 127.0.0.1; using 10.0.0.43 instead (on interface en0)
22/03/28 10:04:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/28 10:04:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Wrapping Spark options in a named tuple

In [2]:
from collections import namedtuple
# Named container for the reader option keys, so later cells refer to
# sparkOptions.header etc. instead of repeating raw strings.
allOptions = namedtuple("allOptions", ["csvFormat", "header", "inferSchema"])
sparkOptions = allOptions(csvFormat="csv", header="header", inferSchema="inferSchema")


In [3]:
import os
from pathlib import Path

# Root of the course data. Set the SPARK_DATA_DIR environment variable to point
# at your own copy instead of editing this machine-specific default path.
DATA_DIR = Path(os.environ.get("SPARK_DATA_DIR", "/Users/soumya/Documents/POC/Spark-3-data"))
flightData = str(DATA_DIR / "section_4" / "flight-summary.csv")

# header=true takes column names from the first line of the file;
# inferSchema=true lets Spark read the data to choose column types
# rather than treating every column as a string.
flight_summary_df = (
    spark.read.format(sparkOptions.csvFormat)
    .option(sparkOptions.header, "true")
    .option(sparkOptions.inferSchema, "true")
    .load(flightData)
)
flight_summary_df.printSchema()


root
 |-- origin_code: string (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- dest_code: string (nullable = true)
 |-- dest_airport: string (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- dest_state: string (nullable = true)
 |-- count: integer (nullable = true)



### Defining a schema from a string

Each column is written as `name:type:nullable`, and columns are separated by commas.

In [7]:
# One "name:type:nullable" spec per column, joined into a single comma-separated string.
cols = ",".join(
    [
        "firstName:String:false",
        "middlename:String:true",
        "lastName:String:false",
        "zipCode:String:false",
        "sex:String:false",
        "salary:Int:true",
    ]
)
# Positional sample rows; each value lines up with the column at the same
# position in `cols` (firstName, middlename, lastName, zipCode, sex, salary).
simpleData = tuple(
    Row(*values)
    for values in (
        ("Soumya", "", "Kole", "36636", "M", -1),
        ("Foo", "Bar", "xxx", "", "", 9000),
    )
)

def convert_to_bool(bool_string: str) -> bool:
    """Parse the nullable flag of a column spec.

    Args:
        bool_string: flag text such as "true" or " FALSE ".

    Returns:
        True only when the text equals "true" ignoring case and surrounding
        whitespace; every other value (including "yes" or "") yields False.
    """
    return bool_string.upper().strip() == "TRUE"

def match_type(in_type: str) -> DataType:
    """Map a type name from a column spec to a Spark SQL data type.

    The lookup ignores case and surrounding whitespace, so "Int", " INT "
    and "int" all resolve to ``IntegerType()``.

    Args:
        in_type: type name such as "Int", "Double" or "String".

    Returns:
        The matching Spark data type instance. Unrecognised names fall back
        to ``StringType()``.
    """
    # Annotated as DataType: the returned objects are DataType instances;
    # DataTypeSingleton is only the metaclass of their classes.
    type_dict = {
        "INT": IntegerType(),
        "DOUBLE": DoubleType(),
        "STRING": StringType(),
    }
    # Strip before the lookup so stray spaces in a spec (e.g. "salary: Int")
    # do not silently fall back to StringType.
    return type_dict.get(in_type.strip().upper(), StringType())


def infer_type(field: str) -> StructField:
    """Build a StructField from one ``name:type[:nullable]`` column spec.

    Args:
        field: column spec such as "salary:Int:true". Whitespace around each
            part is ignored. The nullable part is optional; when omitted the
            column is nullable, matching StructField's own default.

    Returns:
        A StructField with the parsed name, Spark data type and nullability.

    Raises:
        ValueError: if the spec does not have exactly two or three
            ':'-separated parts.
    """
    parts = [part.strip() for part in field.split(":")]
    if len(parts) == 2:
        col_name, data_type = parts
        nullable = True
    elif len(parts) == 3:
        col_name, data_type, nullable_flag = parts
        nullable = convert_to_bool(nullable_flag)
    else:
        # Fail loudly instead of an opaque IndexError (too few parts) or
        # silently dropping extra parts (too many).
        raise ValueError(
            f"Invalid column spec {field!r}: expected 'name:type' or 'name:type:nullable'"
        )
    return StructField(col_name, match_type(data_type), nullable)
# Parse every column spec in `cols` into a StructField.
tuple_of_struct_fields = tuple(infer_type(col) for col in cols.split(","))
# StructType expects a *list* of fields: StructType.add() appends to `fields`,
# and DataType equality compares attributes, so a tuple-backed schema would
# never equal an otherwise identical list-backed one such as df.schema.
schema = StructType(list(tuple_of_struct_fields))

rdd = spark.sparkContext.parallelize(simpleData)
df = spark.createDataFrame(rdd, schema)
df.printSchema()
df.show()



root
 |-- firstName: string (nullable = false)
 |-- middlename: string (nullable = true)
 |-- lastName: string (nullable = false)
 |-- zipCode: string (nullable = false)
 |-- sex: string (nullable = false)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-------+---+------+
|firstName|middlename|lastName|zipCode|sex|salary|
+---------+----------+--------+-------+---+------+
|   Soumya|          |    Kole|  36636|  M|    -1|
|      Foo|       Bar|     xxx|       |   |  9000|
+---------+----------+--------+-------+---+------+

