# DS2002 - Nick Kellogg 

### Importing Libraries

In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd 
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

### Instantiate Global Variables

In [None]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): credentials are hard-coded in this notebook; consider reading
# them from a Databricks secret scope (dbutils.secrets.get) before sharing it.
# NOTE(review): the %sql cells below connect with a different password string
# than the one stored here -- confirm which value is current.
jdbc_hostname = "vtf6hv-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "aviation_accidents_database"

connection_properties = {
    "user": "vtf6hv",
    "password": "kallie23185",
    # "org.nickdb.jdbc.Driver" is not an existing JDBC driver class; the
    # MariaDB driver is the one used to reach MySQL servers over JDBC.
    "driver": "org.mariadb.jdbc.Driver",
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.vtf6hv"
atlas_database_name = "aviation_accidents_database"
atlas_user_name = "vtf6hv"
atlas_password = "kallie23185"

# Data Files (CSV) Information ################################
dst_database = "aviation_accidents_database_dlh"

# Update if needed; the data files were downloaded locally as a workaround.
base_dir = "dbfs:/FileStore/ds-2002-project/aviation_accidents_data"
database_dir = f"{base_dir}/{dst_database}"

# NOTE(review): "retail" looks like a leftover folder name from the lab
# template -- confirm it matches where the aviation files were uploaded.
data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# One streaming source folder per data set, all rooted at stream_dir.
airline_accidents_stream_dir = f"{stream_dir}/airline_accidents"
faa_incidents_stream_dir = f"{stream_dir}/faa_incidents_data"
ntsb_aviation_stream_dir = f"{stream_dir}/ntsb_aviation_data"
# Previously rooted at database_dir, which is deleted at the end of this cell;
# it now lives beside the other streaming sources.
world_aircraft_stream_dir = f"{stream_dir}/world_aircraft_summary_data"

# Delete the Streaming Files ##################################
# dbutils.fs.rm(f"{database_dir}/fact_orders", True)
# dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True)
# dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)    come back and edit if needed

# Delete the Database Files ###################################
# Recursively removes everything under database_dir so the notebook can be re-run.
dbutils.fs.rm(database_dir, True)

### Defining Global Functions

In [None]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Args:
        user_id: Atlas user name.
        pwd: Atlas password.
        cluster_name: Cluster host prefix; ``.mongodb.net`` is appended.
        db_name: Database to connect to and query.
        collection: Name of the collection to read.
        conditions: Query filter; a falsy value matches every document.
        projection: Field projection; a falsy value returns all fields.
        sort: Sort specification handed to ``cursor.sort``; a falsy value
            leaves the documents unsorted.

    Returns:
        A ``pd.DataFrame`` built from the list of matching documents.
    """
    # Local import keeps this function independent of the notebook's import cell.
    from urllib.parse import quote_plus

    # Credentials must be percent-encoded, otherwise reserved characters such
    # as '@', ':' or '/' in the user name or password corrupt the URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each option independently; previously the filter and the sort
        # were silently ignored unless a projection was supplied as well.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Release the connection even when the query raises.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading CSV file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collections(client, db_name, data_directory, csv_files):              # updated function for CSV Files
    """Replace MongoDB collections with the contents of CSV files.

    Args:
        client: An open MongoDB client; it is closed before this function
            returns, whether or not the upload succeeds.
        db_name: Name of the database that receives the collections.
        data_directory: Directory that contains the CSV files.
        csv_files: Mapping of collection name -> CSV file name.

    Each CSV is read as UTF-8 first, falling back to ISO-8859-1 when a
    ``UnicodeDecodeError`` is raised.
    """
    db = client[db_name]

    try:
        for collection_name, csv_file in csv_files.items():
            csv_path = os.path.join(data_directory, csv_file)
            try:
                df = pd.read_csv(csv_path, encoding='utf-8')  # try with utf-8
            except UnicodeDecodeError:
                df = pd.read_csv(csv_path, encoding='ISO-8859-1')  # fallback encoding
            records = df.to_dict(orient='records')

            # Drop only after the file was read successfully, so a missing or
            # unreadable CSV does not wipe the existing collection.
            db[collection_name].drop()
            # insert_many rejects an empty list; a header-only CSV therefore
            # just leaves the collection empty.
            if records:
                db[collection_name].insert_many(records)
    finally:
        # Close the connection even if one of the uploads fails.
        client.close()




## Populating the Dimensions by Ingesting Reference (Cold-path) Data

### Fetching reference data from the Azure MySQL Database

#### Creating a new Databricks Database

In [None]:
%sql 
-- Remove the lakehouse database and, via CASCADE, everything it contains,
-- so the notebook can be re-run from a clean state.
DROP DATABASE IF EXISTS aviation_accidents_database_dlh CASCADE;

In [None]:
%sql
-- Create the lakehouse database (if it is not already present) with a
-- descriptive comment, an explicit DBFS storage location and two custom
-- properties (contains_pii, purpose).
CREATE DATABASE IF NOT EXISTS aviation_accidents_database_dlh
COMMENT "Nick Kellogg - Aviation Accidents Database"
LOCATION "dbfs:/FileStore/ds-2002-project/aviation_accidents_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "Capstone Project 2");

#### Creating a New Table Sourced from the Date Dimension Table in the Azure MySQL Database

In [None]:
%sql
-- Temporary view that reads the dim_date table of the Azure MySQL source
-- database over JDBC.
-- NOTE(review): the user name and password are written inline here; consider
-- a Databricks secret scope instead.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://vtf6hv-mysql.mysql.database.azure.com:3306/aviation_accidents_database", 
  dbtable "dim_date",
  user "vtf6hv",    
  password "Kallie_23185!"  
)

In [None]:
%sql
-- Materialize the date dimension in the lakehouse from the JDBC-backed view.
-- The database name now matches the one created by the CREATE DATABASE cell
-- (aviation_accidents_database_dlh); "aviation_accidents_dlh" is never created.
USE DATABASE aviation_accidents_database_dlh;

-- The table gets its own sub-folder instead of sharing the database root.
CREATE OR REPLACE TABLE aviation_accidents_database_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds-2002-project/aviation_accidents_dlh/dim_date"
AS SELECT * FROM view_date

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>num_affected_rows</th><th>num_inserted_rows</th></tr></thead><tbody></tbody></table></div>

#### Creating a new table sourced from the aviation dimension data in the Azure MySQL Database

In [None]:
%sql
-- Temporary view that reads the faa_airline_accidents table of the Azure
-- MySQL source database over JDBC.
-- The view is named view_faa_airline_accident because that is the name the
-- following CREATE TABLE ... AS SELECT cell reads from.
-- NOTE(review): the user name and password are written inline here; consider
-- a Databricks secret scope instead.
CREATE OR REPLACE TEMPORARY VIEW view_faa_airline_accident
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://vtf6hv-mysql.mysql.database.azure.com:3306/aviation_accidents_database",
  dbtable "faa_airline_accidents",
  user "vtf6hv",
  password "Kallie_23185!"
)

In [None]:
%sql
-- Materialize the FAA airline accidents data in the lakehouse by selecting
-- every row from view_faa_airline_accident.
-- The database name now matches the one created by the CREATE DATABASE cell
-- (aviation_accidents_database_dlh); "aviation_accidents_dlh" is never created.
USE DATABASE aviation_accidents_database_dlh;

-- NOTE(review): this LOCATION lies outside the database's own folder
-- (dbfs:/FileStore/ds-2002-project/...) -- confirm that is intended.
CREATE OR REPLACE TABLE aviation_accidents_database_dlh.faa_airline_accidents
COMMENT "FAA Airline Accidents Dimension Table"
LOCATION "dbfs:/FileStore/aviation_accidents_dlh/faa_airline_accidents"
AS SELECT * FROM view_faa_airline_accident

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>num_affected_rows</th><th>num_inserted_rows</th></tr></thead><tbody></tbody></table></div>

### Fetching Reference Data from MongoDB Atlas (displaying my batch too)

In [None]:
# List the files under batch_dir and render the listing as a table.
display(dbutils.fs.ls(batch_dir)) 

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>path</th><th>name</th><th>size</th><th>modificationTime</th></tr></thead><tbody><tr><td>dbfs:/FileStore/aviation_accidents_dlh/batch/airline_accidents.csv</td><td>airline_accidents.csv</td><td>11245</td><td>1833195448000</td></tr><tr><td>dbfs:/FileStore/aviation_accidents_dlh/batch/faa_incidents_data.csv</td><td>faa_incidents_data.csv</td><td>9874</td><td>1833195448000</td></tr><tr><td>dbfs:/FileStore/aviation_accidents_dlh/batch/ntsb_aviation_data.csv</td><td>ntsb_aviation_data.csv</td><td>7841</td><td>1833195448000</td></tr><tr><td>dbfs:/FileStore/aviation_accidents_dlh/batch/world_aircraft_summary.csv</td><td>world_aircraft_summary.csv</td><td>9412</td><td>1833195448000</td></tr></tbody></table></div>

### Creating the new MongoDB database and loading CSV data

In [None]:
from urllib.parse import quote_plus

# Local (FUSE) path of the folder that holds the batch CSV files.
source_dir = '/dbfs/FileStore/aviation_accidents_dlh/batch'

# Collection name -> CSV file name; the file names match the batch listing above.
csv_files = {
    "airline_accidents": 'airline_accidents.csv',
    "faa_incidents": 'faa_incidents_data.csv',
    "ntsb_aviation": 'ntsb_aviation_data.csv',
    "world_aircraft_summary": 'world_aircraft_summary.csv',
}

# set_mongo_collections expects an open client as its first argument (not the
# individual credentials), so build the client here. Credentials are
# percent-encoded so reserved characters cannot corrupt the URI.
client = pymongo.MongoClient(
    f"mongodb+srv://{quote_plus(atlas_user_name)}:{quote_plus(atlas_password)}"
    f"@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
)

# The function closes the client once the collections are loaded.
set_mongo_collections(client, atlas_database_name, source_dir, csv_files)

<pymongo.results.InsertManyResult at 0xe91d58451ac0>

### Getting the Dimension Data from the Mongo Collection

In [None]:
%scala

// Load the airline_accidents collection of the aviation_accidents_database
// MongoDB database through the MongoDB Spark connector and keep only the
// columns listed in select(...).
// NOTE(review): atlas_uri is not defined in any Scala cell visible in this
// notebook, and Python variables are presumably not visible from %scala
// cells -- confirm where it is meant to come from.
val df_airline = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "aviation_accidents_database")
.option("collection", "airline_accidents").load()
.select(                    "event_date",
                            "investigation_type",
                            "injury_severity",
                            "aircraft_damage",
                             "flight_purpose",
                            "total_fatal_injuries",
                            "total_serious_injuries",
                            "total_minor_injuries",
                            "total_uninjured",
                            "weather_condition")

// NOTE(review): head() with no argument returns only the first Row in the
// Scala Dataset API; presumably display(df_airline) or a limit(n) was
// intended -- confirm.
display(df_airline.head())

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>event_date</th>
        <th>investigation_type</th>
        <th>injury_severity</th>
        <th>aircraft_damage</th>
        <th>flight_purpose</th>
        <th>total_fatal_injuries</th>
        <th>total_serious_injuries</th>
        <th>total_minor_injuries</th>
        <th>total_uninjured</th>
        <th>weather_condition</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>2017-04-01</td>
        <td>Preliminary</td>
        <td>None</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>1</td>
        <td>Clear</td>
      </tr>
      <tr>
        <td>2017-03-15</td>
        <td>Final</td>
        <td>Serious</td>
        <td>Major</td>
        <td>Instructional</td>
        <td>0</td>
        <td>1</td>
        <td>0</td>
        <td>0</td>
        <td>Cloudy</td>
      </tr>
      <tr>
        <td>2017-02-28</td>
        <td>Preliminary</td>
        <td>Minor</td>
        <td>Destroyed</td>
        <td>Charter</td>
        <td>2</td>
        <td>0</td>
        <td>3</td>
        <td>0</td>
        <td>Rain</td>
      </tr>
      <tr>
        <td>2019-01-20</td>
        <td>Final</td>
        <td>Fatal</td>
        <td>Substantial</td>
        <td>Commercial</td>
        <td>1</td>
        <td>0</td>
        <td>0</td>
        <td>2</td>
        <td>Fog</td>
      </tr>
      <tr>
        <td>2019-05-05</td>
        <td>Preliminary</td>
        <td>None</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>2</td>
        <td>Clear</td>
      </tr>
      <tr>
        <td>2018-04-20</td>
        <td>Final</td>
        <td>None</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>3</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>Clear</td>
      </tr>
      <tr>
        <td>2018-01-13</td>
        <td>Preliminary</td>
        <td>Minor</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>1</td>
        <td>0</td>
        <td>1</td>
        <td>2</td>
        <td>Clear</td>
      </tr>
      <tr>
        <td>2019-11-13</td>
        <td>Final</td>
        <td>None</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>2</td>
        <td>Clear</td>
      </tr>
      <tr>
        <td>2020-07-17</td>
        <td>Preliminary</td>
        <td>None</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>3</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>Fog</td>
      </tr>
      <tr>
        <td>2019-07-01</td>
        <td>Final</td>
        <td>Minor</td>
        <td>Major</td>
        <td>Personal</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>2</td>
        <td>Clear</td>
      </tr>
      <tr>
        <td>2019-09-11</td>
        <td>Preliminary</td>
        <td>None</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>2</td>
        <td>Clear</td>
      </tr>
      <tr>
        <td>2017-10-01</td>
        <td>Preliminary</td>
        <td>None</td>
        <td>Minor</td>
        <td>Personal</td>
        <td>0</td>
        <td>0</td>
        <td>1</td>
        <td>1</td>
        <td>Clear</td>
      </tr>
    </tbody>
  </table>
</div>
