### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Import PySpark SQL Data Types

In [0]:
from pyspark.sql.types import *

### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Snowflake Connection Options
Use the Databricks secrets utility (`dbutils.secrets`) to fetch the Snowflake credentials, then build the connector options shared by every Snowflake read and write below:

In [0]:
# Fetch credentials from the "data-warehouse" secret scope instead of hard-coding them.
# NOTE(review): the key names below look like template placeholders — confirm they
# match the actual keys stored in the scope.
user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

# Snowflake connector settings; unpacked with **options into each read/write cell.
options = {
    "sfUrl": "sqa35008.us-east-1.snowflakecomputing.com",
    "sfUser": user,
    "sfPassword": password,
    "sfDatabase": "USER_NW",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "INTERVIEW_WH",
}

### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Read from CSV files
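Read each CSV file with `DataFrameReader.csv`, treating the first line as a header row.

For the airlines and airports files, define the schema manually as a `StructType` of column names and data types. Supplying the schema up front fixes the column types and avoids the extra pass over the data that schema inference would require. The flights files are read without a predefined schema, and their numeric columns are cast to integers afterwards.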

In [0]:
airlinesCsvPath = "/airlines.csv"

# Both airline columns are nullable strings.
airlinesDefinedSchema = StructType(
  [StructField(fieldName, StringType(), True) for fieldName in ("IATA_CODE", "AIRLINE")]
)

# header=True skips the file's header line; names and types come from the schema above.
airlinesDF = spark.read.csv(airlinesCsvPath, schema=airlinesDefinedSchema, header=True)

In [0]:
airportsCsvPath = "/airports.csv"

# (column name, Spark type) pairs; every field is nullable.
# The coordinates are doubles and everything else is a string.
airportsDefinedSchema = StructType([
  StructField(fieldName, fieldType, True)
  for fieldName, fieldType in [
    ("IATA_CODE", StringType()),
    ("AIRPORT", StringType()),
    ("CITY", StringType()),
    ("STATE", StringType()),
    ("COUNTRY", StringType()),
    ("LATITUDE", DoubleType()),
    ("LONGITUDE", DoubleType()),
  ]
])

# header=True skips the file's header line; names and types come from the schema above.
airportsDF = spark.read.csv(airportsCsvPath, schema=airportsDefinedSchema, header=True)

In [0]:
flightsCsvPath = "/flights*.csv"

# Columns stored as integers in Snowflake. Every other column keeps the string type
# it is read with.
# NOTE(review): this list is maintained by hand, not derived from the file header —
# confirm it covers every numeric column downstream consumers need.
flightsIntColumns = [
  "YEAR", "MONTH", "DAY", "DAY_OF_WEEK",
  "DEPARTURE_DELAY", "TAXI_OUT", "SCHEDULED_TIME", "ELAPSED_TIME",
  "AIR_TIME", "DISTANCE", "WHEELS_ON", "TAXI_IN", "SCHEDULED_ARRIVAL",
  "DIVERTED", "CANCELLED",
  "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY",
  "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY",
]

# No schema is supplied, so every column is read as a string.
# The former `.option("infer_schema", True)` was a misspelling of `inferSchema`.
# Spark ignores unknown reader options, so it never had any effect and has been removed.
flightsRawDF = (spark.read
                .option("header", True)
                .csv(flightsCsvPath)
)

# Fail loudly, with a clear message, if the header lacks a column we intend to cast.
missingFlightsColumns = [c for c in flightsIntColumns if c not in flightsRawDF.columns]
if missingFlightsColumns:
  raise ValueError(f"flights CSV is missing expected columns: {missingFlightsColumns}")

# Cast everything in a single select rather than 20 chained withColumn calls.
# Each withColumn adds a projection to the plan, and Spark recommends select for
# changing many columns at once. Iterating over the original columns keeps their order.
# na.fill('') then replaces nulls with '' in the string columns only; a string fill
# value leaves the integer columns (and their nulls) untouched.
flightsDF = flightsRawDF.select([
  flightsRawDF[c].cast("int").alias(c) if c in flightsIntColumns else flightsRawDF[c]
  for c in flightsRawDF.columns
]).na.fill('')

-sandbox
### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Write Data to Snowflake Tables

In [0]:
# Replace the Snowflake AIRLINES table with the contents of airlinesDF.
(airlinesDF.write
  .format("snowflake")
  .options(**options)
  .option("dbtable", "AIRLINES")
  .mode("overwrite")
  .save())

In [0]:
# Replace the Snowflake AIRPORTS table with the contents of airportsDF.
(airportsDF.write
  .format("snowflake")
  .options(**options)
  .option("dbtable", "AIRPORTS")
  .mode("overwrite")
  .save())

In [0]:
# Replace the Snowflake FLIGHTS table with the contents of flightsDF.
(flightsDF.write
  .format("snowflake")
  .options(**options)
  .option("dbtable", "FLIGHTS")
  .mode("overwrite")
  .save())

-sandbox
### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Read Data from Snowflake

In [0]:
# Read the AIRLINES table back from Snowflake via the connector's `query` option.
airlinestableDF = (spark.read
                   .format("snowflake")
                   .options(**options)
                   .option("query", "SELECT * FROM AIRLINES")
                   .load())

In [0]:
# Read the AIRPORTS table back from Snowflake via the connector's `query` option.
airportstableDF = (spark.read
                   .format("snowflake")
                   .options(**options)
                   .option("query", "SELECT * FROM AIRPORTS")
                   .load())

In [0]:
# Read the FLIGHTS table back from Snowflake via the connector's `query` option.
flightstableDF = (spark.read
                  .format("snowflake")
                  .options(**options)
                  .option("query", "SELECT * FROM FLIGHTS")
                  .load())

-sandbox
### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Data Quality Check: Source vs. Snowflake Row Counts

In [0]:
# Row-count parity between the CSV-sourced DataFrame and the Snowflake AIRLINES table.
# count() is a Spark action, so each side is counted once and reused in the failure message.
airlinesSourceCount = airlinesDF.count()
airlinesTableCount = airlinestableDF.count()
assert airlinesSourceCount == airlinesTableCount, \
  f"AIRLINES row count mismatch: source={airlinesSourceCount}, snowflake={airlinesTableCount}"

In [0]:
# Row-count parity between the CSV-sourced DataFrame and the Snowflake AIRPORTS table.
# count() is a Spark action, so each side is counted once and reused in the failure message.
airportsSourceCount = airportsDF.count()
airportsTableCount = airportstableDF.count()
assert airportsSourceCount == airportsTableCount, \
  f"AIRPORTS row count mismatch: source={airportsSourceCount}, snowflake={airportsTableCount}"

In [0]:
# Row-count parity between the CSV-sourced DataFrame and the Snowflake FLIGHTS table.
# count() is a Spark action, so each side is counted once and reused in the failure message.
flightsSourceCount = flightsDF.count()
flightsTableCount = flightstableDF.count()
assert flightsSourceCount == flightsTableCount, \
  f"FLIGHTS row count mismatch: source={flightsSourceCount}, snowflake={flightsTableCount}"