# Data pipeline from PySpark (Databricks) to Snowflake

# I - Loading data

## 1 - Loading airlines data

In [0]:
#libraries
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as long
import  pyspark.sql.functions as F
from pyspark.sql import types as t

In [0]:
airlines = spark.read.csv('dbfs:/FileStore/tables/airlines/airlines.csv',schema="IATA_CODE STRING,AIRLINE STRING")
airlines.show(5,False)

In [0]:
airlines.printSchema()


In [0]:
airlines.describe().show()

In [0]:
def process_airlines_data(spark, input_data, output_data, snowflake_options, snowflake_table, mode='append', header=True):
  """
  Read the airlines CSV, drop duplicate rows and write the result to Snowflake.

  Args:
    spark: active SparkSession used to read the CSV.
    input_data: path of the airlines CSV file.
    output_data: unused; kept so existing callers keep working.
    snowflake_options: Snowflake connector options passed to `.options()`.
    snowflake_table: name of the target Snowflake table.
    mode: Spark save mode for the write. Defaults to 'append' (the previous
      behaviour); with 'append' every run inserts the rows again.
    header: whether the first line of the CSV is a header row. Defaults to True
      so the header line is not loaded as an airline record.
  """
  airlines_schema = R([
    Fld("IATA_CODE", Str()),
    Fld("AIRLINE", Str()),
  ])

  # With an explicit schema Spark assigns columns by position; header=True only
  # makes it skip the first line.
  airlines = spark.read.csv(input_data, schema=airlines_schema, header=header)
  airlines = airlines.drop_duplicates()

  # loading to snowflake
  (airlines.write.format("snowflake")
    .options(**snowflake_options)
    .option("dbtable", snowflake_table)
    .mode(mode)
    .options(header=True)
    .save())

## 2 - Loading airports data

In [0]:
airports = spark.read.csv('dbfs:/FileStore/tables/airlines/airports.csv', inferSchema=True, header=True)
airports.show(5,False)
airports.printSchema()

In [0]:
airports.describe().show()

In [0]:
def process_airports_data(spark, input_data, output_data, snowflake_options, snowflake_table, mode='append', header=True):
  """
  Read the airports CSV, drop duplicate rows and write the result to Snowflake.

  Args:
    spark: active SparkSession used to read the CSV.
    input_data: path of the airports CSV file.
    output_data: unused; kept so existing callers keep working.
    snowflake_options: Snowflake connector options passed to `.options()`.
    snowflake_table: name of the target Snowflake table.
    mode: Spark save mode for the write. Defaults to 'append' (the previous
      behaviour); with 'append' every run inserts the rows again.
    header: whether the first line of the CSV is a header row. Defaults to True
      so the header line is not loaded as an airport record (its LATITUDE and
      LONGITUDE text would otherwise become nulls in a bogus row).
  """
  airports_schema = R([
    Fld("IATA_CODE", Str()),
    Fld("AIRPORT", Str()),
    Fld("CITY", Str()),
    Fld("STATE", Str()),
    Fld("COUNTRY", Str()),
    Fld("LATITUDE", Dbl()),
    Fld("LONGITUDE", Dbl()),
  ])

  airports = spark.read.csv(input_data, schema=airports_schema, header=header)
  airports = airports.drop_duplicates()

  # loading to snowflake
  (airports.write.format("snowflake")
    .options(**snowflake_options)
    .option("dbtable", snowflake_table)
    .mode(mode)
    .options(header=True)
    .save())

## 3 - Loading flights data

In [0]:
flights = spark.read.csv("dbfs:/FileStore/tables/flights2/", inferSchema=True, header=True)
flights.show(5)


In [0]:
flights.printSchema()

In [0]:
def process_flights_data(spark, input_data, snowflake_options, snowflake_table, mode='append', header=True):
  """
  Read the flights CSV data, drop duplicate rows and write the result to Snowflake.

  Args:
    spark: active SparkSession used to read the CSV.
    input_data: path of the flights CSV file(s) (a file or a directory).
    snowflake_options: Snowflake connector options passed to `.options()`.
    snowflake_table: name of the target Snowflake table.
    mode: Spark save mode for the write. Defaults to 'append' (the previous
      behaviour); with 'append' every run inserts the rows again.
    header: whether the first line of each CSV file is a header row. Defaults to
      True so header lines are not loaded as (mostly null) flight records.
  """
  flights_schema = R([
    Fld("YEAR", Int()),
    Fld("MONTH", Int()),
    Fld("DAY", Int()),
    Fld("DAY_OF_WEEK", Int()),
    Fld("AIRLINE", Str()),
    Fld("FLIGHT_NUMBER", Int()),
    Fld("TAIL_NUMBER", Str()),
    Fld("ORIGIN_AIRPORT", Str()),
    Fld("DESTINATION_AIRPORT", Str()),
    Fld("SCHEDULED_DEPARTURE", Int()),
    Fld("DEPARTURE_TIME", Int()),
    Fld("DEPARTURE_DELAY", Int()),
    Fld("TAXI_OUT", Int()),
    Fld("WHEELS_OFF", Int()),
    Fld("SCHEDULED_TIME", Int()),
    Fld("ELAPSED_TIME", Int()),
    Fld("AIR_TIME", Int()),
    Fld("DISTANCE", Int()),
    Fld("WHEELS_ON", Int()),
    Fld("TAXI_IN", Int()),
    Fld("SCHEDULED_ARRIVAL", Int()),
    Fld("ARRIVAL_TIME", Int()),
    Fld("ARRIVAL_DELAY", Int()),
    Fld("DIVERTED", Int()),
    Fld("CANCELLED", Int()),
    Fld("CANCELLATION_REASON", Str()),
    Fld("AIR_SYSTEM_DELAY", Int()),
    Fld("SECURITY_DELAY", Int()),
    Fld("AIRLINE_DELAY", Int()),
    Fld("LATE_AIRCRAFT_DELAY", Int()),
    Fld("WEATHER_DELAY", Int()),
  ])

  flights = spark.read.csv(input_data, schema=flights_schema, header=header)
  flights = flights.drop_duplicates()

  # loading to snowflake
  (flights.write.format("snowflake")
    .options(**snowflake_options)
    .option("dbtable", snowflake_table)
    .mode(mode)
    .options(header=True)
    .save())

## 4 - Loading calendar table

In [0]:
def process_calendar(fights, snowflake_options, snowflake_table, mode='append'):
  """
  Build a calendar table from a flights DataFrame and write it to Snowflake.

  Selects the distinct (YEAR, MONTH, DAY, DAY_OF_WEEK) combinations and adds
  MONTH_NAME (from MONTH) and DAY_NAME (from DAY_OF_WEEK).

  Args:
    fights: flights DataFrame containing YEAR, MONTH, DAY and DAY_OF_WEEK
      columns. The parameter keeps its original (misspelled) name so existing
      keyword callers keep working.
    snowflake_options: Snowflake connector options passed to `.options()`.
    snowflake_table: name of the target Snowflake table.
    mode: Spark save mode for the write. Defaults to 'append' (the previous
      behaviour).
  """
  month_lst = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
  # NOTE(review): assumes DAY_OF_WEEK uses 1 = Monday ... 7 = Sunday — confirm
  # against the source data.
  days = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')

  def _name_udf(names):
    # Map a 1-based number to names[number - 1]. The modulo means the last
    # value (12 for months, 7 for days) maps to the final name. Nulls pass
    # through instead of raising inside the Python worker.
    size = len(names)

    def lookup(x):
      if x is None:
        return None
      return names[int(x % size) - 1]

    return udf(lookup, Str())

  month_name = _name_udf(month_lst)
  day_name = _name_udf(days)

  # Use the DataFrame that was passed in, not a notebook-level global.
  time_table = fights.select("YEAR", "MONTH", "DAY", "DAY_OF_WEEK").drop_duplicates()

  time_table = time_table.withColumn("MONTH_NAME", month_name(col("MONTH")))
  # DAY is the day of the month; the weekday name must come from DAY_OF_WEEK.
  time_table = time_table.withColumn("DAY_NAME", day_name(col("DAY_OF_WEEK")))

  # loading to snowflake
  (time_table.write.format("snowflake")
    .options(**snowflake_options)
    .option("dbtable", snowflake_table)
    .mode(mode)
    .options(header=True)
    .save())


In [0]:
# NOTE(review): SNOWFLAKE_OPTIONS is defined in the later "Defining Snowflake
# connection options" cell, so on a fresh kernel this call raises NameError
# unless that cell runs first. TODO: move this cell below the connection options.
process_calendar(flights, SNOWFLAKE_OPTIONS, snowflake_table='CALENDAR')

## Preparing Data for Modeling

In [0]:
import pandas as pd

### Handling duplicates
# Compare total vs. distinct row counts for each source DataFrame. The distinct
# counts become the `source_count` column of data_check; `destination_count` is
# filled in later from Snowflake.

#airline count
all_airlines = airlines.count()
distinct_airlines = airlines.distinct().count()
distinct_arlines = distinct_airlines  # alias for the previous (misspelled) name

#flights count
all_flights = flights.count()
distinct_flights = flights.distinct().count()

#airports count
all_airport = airports.count()
distinct_airport = airports.distinct().count()

print(f'all airlines ={all_airlines}, distinct airlines={distinct_airlines}')
print(f'all flights ={all_flights},  distinct flights={distinct_flights}')
print(f'all airport ={all_airport}, distinct airport={distinct_airport}')

# The dataset labels ('airlines', 'flights', 'airport') are the keys used when
# the destination counts are filled in, so they must stay as they are.
data = [
  ['airlines', distinct_airlines, 0],
  ['flights', distinct_flights, 0],
  ['airport', distinct_airport, 0],
]

data_check = pd.DataFrame(data, columns=['dataset', 'source_count', 'destination_count'])
display(data_check)


dataset,source_count,destination_count
airlines,15,0
flights,3920766,0
airport,322,0


In [0]:
flight_column_count = len(flights.columns)  # number of columns in the flights DataFrame
print(f' The flight dataset has {flight_column_count} columns')

In [0]:
# spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

In [0]:
import os
# Fully qualified Spark data source name of the Snowflake connector; used when
# reading tables back from Snowflake. Other cells pass the short format name
# "snowflake" instead. NOTE(review): presumably equivalent on Databricks — confirm.
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

### Defining Snowflake connection options

In [0]:
import os
import warnings

# Snowflake connector options, read from environment variables.
# Credentials (user, password) deliberately have NO fallback: secrets must never
# be committed to the notebook. NOTE(review): a password was previously
# hard-coded in this cell and should be treated as leaked and rotated.
_missing_snowflake_env = [
  name for name in ("SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD") if not os.environ.get(name)
]
if _missing_snowflake_env:
  warnings.warn(
    "Snowflake credentials not configured; set environment variable(s): "
    + ", ".join(_missing_snowflake_env)
  )

SNOWFLAKE_OPTIONS = {
  'sfURL': os.environ.get("SNOWFLAKE_URL", "ofa61631.us-east-1.snowflakecomputing.com"),
  'sfAccount': os.environ.get("SNOWFLAKE_ACCOUNT", "ofa61631"),
  'sfUser': os.environ.get("SNOWFLAKE_USER"),
  'sfPassword': os.environ.get("SNOWFLAKE_PASSWORD"),
  'sfDatabase': os.environ.get("SNOWFLAKE_DATABASE", "INTERVIEW_WH"),
  'sfSchema': os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"),
  'sfWarehouse': os.environ.get("SNOWFLAKE_WAREHOUSE", "DATASCIENCE_WH"),
  # NOTE(review): ACCOUNTADMIN is Snowflake's most privileged role; a dedicated
  # least-privilege loader role would be safer as the default.
  'sfRole': os.environ.get("SNOWFLAKE_ROLE", "accountadmin"),
}

## Loading Data

In [0]:
data_frame = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**SNOWFLAKE_OPTIONS).option('dbtable', "airports").load()

In [0]:
data_frame

In [0]:
# NOTE(review): connectivity smoke test. It writes the 5-row DataFrame from
# spark.range(5) into a table named INTERVIEW_WH, which is also the default
# sfDatabase name. It looks like a leftover check; TODO remove it so Run All
# does not create a stray table.
(
  spark.range(5)
  .write
  .format("snowflake")
  .options(**SNOWFLAKE_OPTIONS)
  .option("dbtable", "INTERVIEW_WH")
  .save()
)

In [0]:
from pyspark.sql import SparkSession

# Get (or create) the active session. The app name must use plain ASCII quotes;
# typographic quotes are a SyntaxError in Python.
spark = SparkSession.builder.appName('Pyspark_snowflake').getOrCreate()
# Enable Snowflake query pushdown for the session via the connector's JVM utility.
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

In [0]:
airports.write.format("snowflake").options(**SNOWFLAKE_OPTIONS).option("dbtable", "AIRPORTS").mode('append').options(header=True).save()

In [0]:
airlines.write.format("snowflake").options(**SNOWFLAKE_OPTIONS).option("dbtable", "AIRLINES").mode('append').options(header=True).save()

In [0]:
flights.write.format("snowflake").options(**SNOWFLAKE_OPTIONS).option("dbtable", "flights").mode('append').options(header=True).save()

In [0]:
# Count the rows that landed in Snowflake and store them next to the source
# counts in data_check. Table names are left unqualified so they resolve
# against SNOWFLAKE_OPTIONS['sfSchema']. The writes use that same schema via
# unqualified dbtable names, so hard-coding `public.` would read the wrong
# tables whenever SNOWFLAKE_SCHEMA is set.
def _snowflake_row_count(table_name):
  """Return COUNT(*) of `table_name` in the Snowflake schema from SNOWFLAKE_OPTIONS."""
  count_df = (
    spark.read.format("snowflake")
    .options(**SNOWFLAKE_OPTIONS)
    .option("query", f"SELECT COUNT(*) as cnt FROM {table_name}")
    .load()
  )
  return count_df.collect()[0][0]


# (data_check label, Snowflake table) pairs; labels must match data_check.dataset.
for dataset_label, table_name in [('airlines', 'AIRLINES'), ('airport', 'AIRPORTS'), ('flights', 'FLIGHTS')]:
  data_check.loc[data_check.dataset == dataset_label, 'destination_count'] = _snowflake_row_count(table_name)

display(data_check)

dataset,source_count,destination_count
airlines,15,15
flights,3920766,3920766
airport,322,322
