# <center> Data management: Conversion and integration of csv files on flights, airports and airlines into json files </center>

In [28]:
import datetime
import json
import os

import jsonschema
import numpy as np  # for checking NaN values
import pandas as pd  # for csv reader
from jsonschema import validate, RefResolver
from jsonschema.validators import validator_for

## Create validator for json files

In [59]:
# define basic function to validate json data
def validateJson(validator, jsonData):
    """Tell whether ``jsonData`` passes validation by ``validator``.

    Parameters
    ----------
    validator : object exposing ``validate(instance)``
        Its ``validate`` method is called with ``jsonData``.
    jsonData : object
        The data to check.

    Returns
    -------
    bool
        ``False`` when ``validate`` raises
        ``jsonschema.exceptions.ValidationError``, ``True`` when it returns
        normally. Any other exception propagates to the caller.
    """
    try:
        validator.validate(jsonData)
    except jsonschema.exceptions.ValidationError:
        outcome = False
    else:
        outcome = True
    return outcome

In [60]:
# load all json schema
def _load_schema(path):
    """Read the JSON schema stored at ``path`` and return it as a dictionary."""
    # the context manager closes the file as soon as it has been parsed
    with open(path) as schema_file:
        return json.load(schema_file)


# All three schemas live in the Json-schema folder next to the notebook; the
# airport path used to start with '/' and therefore pointed at the filesystem root.
airline = _load_schema('./Json-schema/JsonAirlineSchema.json')
airport = _load_schema('./Json-schema/JsonAirportSchema.json')
flight = _load_schema('./Json-schema/JsonFlightSchema.json')

In [63]:
# resolve reference for JsonFlightSchema
# Register every loaded schema under its '$id' (falling back to the file name
# when the schema declares none) so references can be looked up in this store.
schema_store = {
  airline.get('$id','JsonAirlineSchema.json') : airline,
  airport.get('$id','JsonAirportSchema.json') : airport,
  flight.get('$id','JsonFlightSchema.json') : flight,
}

# create resolver instance
resolver = RefResolver.from_schema(airline, store=schema_store)

# pick the validator class matching the base schema
Validator = validator_for(airline)

# Create the validator for the flight schema. The previous code passed an
# undefined name (`schema`), which raised NameError on a fresh kernel.
flight_validator = Validator(flight, resolver=resolver)

## Creation of json files on flights 

### Data loading and preparation 

In [75]:
def load_csv(filepath):
  """Load a flights CSV file and parse its HHMM time columns.

  Parameters
  ----------
  filepath : str or path-like
      Location of the CSV file to read.

  Returns
  -------
  pandas.DataFrame
      The first 32 columns of the file. CRS_DEP_TIME, CRS_ARR_TIME, DEP_TIME
      and ARR_TIME are converted to datetimes using the '%H%M' format.

  Raises
  ------
  ValueError
      If a scheduled (CRS_*) time cannot be parsed with '%H%M'.
  """
  # read the file that was asked for (it used to be hard-coded to './2018_1.csv')
  data = pd.read_csv(filepath, usecols=list(range(0, 32)),
                    # reading times as strings to simplify casting
                    dtype={'CRS_DEP_TIME': str, 'DEP_TIME': str,
                            'CRS_ARR_TIME': str, 'ARR_TIME': str})

  # scheduled departure/arrival times are expected to contain no NaN, so a
  # value that cannot be parsed raises instead of being silently dropped
  for column in ('CRS_DEP_TIME', 'CRS_ARR_TIME'):
    data[column] = pd.to_datetime(data[column], format='%H%M')
  # actual departure/arrival times may be missing: NaN and unparsable values
  # are converted to NaT
  for column in ('DEP_TIME', 'ARR_TIME'):
    data[column] = pd.to_datetime(data[column], errors='coerce', format='%H%M')

  return data

In [83]:
# Example file to check the type of data to be processed:
# load one CSV through load_csv and display its first rows so the parsed
# columns can be inspected.
temp = load_csv('./2018_1.csv')
temp.head()

Unnamed: 0,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,OP_CARRIER,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_NM,DEST,DEST_CITY_NAME,DEST_STATE_NM,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_GROUP,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_GROUP,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
0,2018,1,14,7,9E,SYR,"Syracuse, NY",New York,DTW,"Detroit, MI",Michigan,1900-01-01 05:35:00,NaT,,,1900-01-01 07:35:00,NaT,,,1.0,B,0.0,120.0,,,374.0,2,,,,,
1,2018,1,3,3,9E,SYR,"Syracuse, NY",New York,LGA,"New York, NY",New York,1900-01-01 13:58:00,1900-01-01 13:48:00,-10.0,-1.0,1900-01-01 15:19:00,1900-01-01 15:06:00,-13.0,-1.0,0.0,,0.0,81.0,78.0,42.0,198.0,1,,,,,
2,2018,1,6,6,9E,SYR,"Syracuse, NY",New York,LGA,"New York, NY",New York,1900-01-01 13:58:00,1900-01-01 14:10:00,12.0,0.0,1900-01-01 15:19:00,1900-01-01 15:43:00,24.0,1.0,0.0,,0.0,81.0,93.0,45.0,198.0,1,12.0,0.0,12.0,0.0,0.0
3,2018,1,7,7,9E,SYR,"Syracuse, NY",New York,LGA,"New York, NY",New York,1900-01-01 13:58:00,1900-01-01 13:47:00,-11.0,-1.0,1900-01-01 15:19:00,1900-01-01 14:55:00,-24.0,-2.0,0.0,,0.0,81.0,68.0,38.0,198.0,1,,,,,
4,2018,1,8,1,9E,SYR,"Syracuse, NY",New York,LGA,"New York, NY",New York,1900-01-01 13:58:00,1900-01-01 13:50:00,-8.0,-1.0,1900-01-01 15:19:00,1900-01-01 15:09:00,-10.0,-1.0,0.0,,0.0,81.0,79.0,39.0,198.0,1,,,,,


In [80]:
# free memory: drop the reference to the preview DataFrame loaded for inspection
del temp

### Functions to create a dictionary (key-value) for columns that contain null values 

In [73]:
# function that return True if a value is NaN or NaT in case of Date types
def is_NaN(val):
  """Return True when the scalar ``val`` represents a missing value.

  Missing values are NaN, NaT, None and the empty string.

  Parameters
  ----------
  val : scalar
      A single cell value (string, number, timestamp, ...).

  Returns
  -------
  bool
      True if ``val`` is missing, False otherwise.
  """
  # A string is never null for pandas, so the only "missing" string is the
  # empty one. The previous if/elif chain let '' fall through to the generic
  # null check and reported it as a valid value.
  if isinstance(val, str):
    return val == ''
  # NaN / NaT / None
  return bool(pd.isnull(val))

In [74]:
# Optional numeric attributes of a flight as (output key, column name) pairs,
# listed in the order in which they are written to the flight dictionary.
_OPTIONAL_INT_FIELDS = (
  ('crs_elapsed_time', 'CRS_ELAPSED_TIME'),
  ('actual_elapsed_time', 'ACTUAL_ELAPSED_TIME'),
  ('air_time', 'AIR_TIME'),
  ('carrier_delay', 'CARRIER_DELAY'),
  ('weather_delay', 'WEATHER_DELAY'),
  ('nas_delay', 'NAS_DELAY'),
  ('security_delay', 'SECURITY_DELAY'),
  ('late_aircraft_delay', 'LATE_AIRCRAFT_DELAY'),
)


def _put_time(target, key, value):
  """Store ``value`` formatted as "HH:MM" under ``key`` unless it is missing."""
  if not is_NaN(value):
    target[key] = value.strftime("%H:%M")


def _put_int(target, key, value):
  """Store ``value`` cast to int under ``key`` unless it is missing."""
  # the cast is safe because missing values are filtered out first
  if not is_NaN(value):
    target[key] = int(value)


def json_single_flight(fligth_data):
  """Build the dictionary (JSON object) describing one flight.

  Only valid values are added: attributes whose value is missing (NaN/NaT)
  are left out of the result.

  Parameters
  ----------
  fligth_data : object
      One flight, exposing the columns used below as attributes (OP_CARRIER,
      CRS_DEP_TIME, DEP_TIME, ...), e.g. a row produced by
      ``DataFrame.itertuples()``.

  Returns
  -------
  dict
      Always contains 'airline', 'crs_dep_time', 'crs_arr_time', 'cancelled'
      and 'diverted'; every other key is present only when its value is not
      missing.
  """
  time_dic = {}

  # airline's IATA code
  time_dic['airline'] = fligth_data.OP_CARRIER
  # scheduled departure time (expected to never be missing)
  # TODO: also try saving it with .time() and with .to_pydatetime()
  time_dic['crs_dep_time'] = fligth_data.CRS_DEP_TIME.strftime("%H:%M")

  # actual departure time, delay and delay group
  _put_time(time_dic, 'dep_time', fligth_data.DEP_TIME)
  _put_int(time_dic, 'dep_delay', fligth_data.DEP_DELAY)
  _put_int(time_dic, 'dep_delay_group', fligth_data.DEP_DELAY_GROUP)

  # scheduled arrival time (expected to never be missing)
  time_dic['crs_arr_time'] = fligth_data.CRS_ARR_TIME.strftime("%H:%M")

  # actual arrival time, delay and delay group
  _put_time(time_dic, 'arr_time', fligth_data.ARR_TIME)
  _put_int(time_dic, 'arr_delay', fligth_data.ARR_DELAY)
  _put_int(time_dic, 'arr_delay_group', fligth_data.ARR_DELAY_GROUP)

  # check if flight was cancelled (No NaN)
  cancelled = bool(fligth_data.CANCELLED == 1.0)
  time_dic['cancelled'] = cancelled
  # A cancelled flight whose code is missing must not carry a NaN code:
  # json.dump would write it as the bare token NaN, which is not valid JSON.
  if cancelled and not is_NaN(fligth_data.CANCELLATION_CODE):
    time_dic['cancellation_code'] = fligth_data.CANCELLATION_CODE

  # check if flight was diverted
  time_dic['diverted'] = bool(fligth_data.DIVERTED == 1.0)

  # elapsed/air times and the per-cause delays
  for key, column in _OPTIONAL_INT_FIELDS:
    _put_int(time_dic, key, getattr(fligth_data, column))

  return time_dic

### Creating JSON files and uploading to mongoDB 
Each file groups the flights sharing the same year, month, origin airport, destination airport and day of the week <br>
(**NOTE:** year and month are grouped automatically as the files are already divided!) 


In [None]:
# TODO: add a for loop that, for each file in the folder, performs the following steps:
# 1) load (method already implemented)
# 2) group + creation + upload (already implemented)

In [8]:
# Grouping by DAY_OF_MONTH yields roughly 130 thousand files per month, while
# grouping by DAY_OF_WEEK yields roughly 30 thousand.

# Load the month to export in this cell, so that it no longer depends on a
# `data` variable left over in the kernel (no other cell defines it).
data = load_csv('./2018_1.csv')

# grouping data
data_groups = data.groupby(['DAY_OF_WEEK', 'ORIGIN', 'DEST'])

In [72]:
# Create a json file for each group of day-origin-destination
# (Remember: each CSV file contains a specific month and year).

# folder that receives the generated files
OUTPUT_DIR = './json'
# Maximum number of groups to export. 1 keeps the trial run that stops after
# the first group; set it to None to export every group.
MAX_GROUPS = 1
# Set to True to print, for each document, whether it passes flight_validator.
VALIDATE_DOCUMENTS = False

# open(..., 'w') does not create missing folders
os.makedirs(OUTPUT_DIR, exist_ok=True)

i = 0
for _, group_rows in data_groups:
  if MAX_GROUPS is not None and i >= MAX_GROUPS:
    break

  # The rows are named `row`, not `flight`, so the `flight` schema loaded at
  # the top of the notebook is not overwritten by this loop.
  rows = list(group_rows.itertuples())
  first_row = rows[0]

  # Basic information of the group, taken from its first flight.
  # NOTE(review): groups are keyed by DAY_OF_WEEK, so flights of one group may
  # fall on different days of the month; 'day_of_month' only reflects the first
  # flight - confirm that this is intended.
  document = {
    'year': first_row.YEAR,
    'month': first_row.MONTH,
    'day_of_month': first_row.DAY_OF_MONTH,
    'day_of_week': first_row.DAY_OF_WEEK,
    'origin': first_row.ORIGIN,
    'destination': first_row.DEST,
    'distance': int(first_row.DISTANCE),
    'distance_group': first_row.DISTANCE_GROUP,
  }

  # list of all flights for this day of week, origin and destination
  document['flights'] = [json_single_flight(row) for row in rows]

  # The validator checks the Python object itself, not its serialized string.
  if VALIDATE_DOCUMENTS:
    print(validateJson(flight_validator, document))

  # writing json to file
  with open(os.path.join(OUTPUT_DIR, 'result' + str(i) + '.json'), 'w') as fp:
    json.dump(document, fp)

  i = i + 1
  # TODO: upload document to mongo instead of writing file

In [None]:
# zip the json folder so it can be downloaded to check its size (MB)
!zip -r temp.zip ./json/ 