In [1]:
import pandas as pd
import re
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types as t
from datetime import datetime, timedelta


from sql_queries import *

In [2]:
import findspark
findspark.init()

import pyspark
import random

# Smoke test: estimate pi with a Monte Carlo run to confirm the local Spark install works.
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000

def inside(p):
    """Return True when a uniform random point in the unit square lies inside the unit circle.

    `p` (the RDD element) is ignored; each call draws fresh random coordinates.
    """
    x, y = random.random(), random.random()
    return x*x + y*y < 1

try:
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()

    pi = 4 * count / num_samples
    print(pi)
finally:
    # Always release the context, even if the job fails: a later cell starts its own
    # Spark session, and Spark permits only one active SparkContext per JVM.
    sc.stop()

3.14162228


In [3]:
def parsell(string):
    """
    Convert a hemisphere-suffixed coordinate such as '28.0N' or '94.8W' to a signed float.

    The suffix check is case-insensitive: values ending in 'w' or 's' come back negative,
    everything else positive. Characters other than digits and '.' are replaced by spaces
    and the remainder is stripped before being parsed with float().
    """
    cleaned = string.strip().lower()
    is_negative = cleaned.endswith(('w', 's'))

    magnitude = float(re.sub(r"[^0-9.]", " ", cleaned).strip())
    return -magnitude if is_negative else magnitude

def parquet_wr(parquet_filename, df, spark_session=None):
    """
    Write `df` to a timestamped parquet directory and read it back.

    The output path is ``<parquet_filename>_<YYYY-mm-dd-HH-MM-SS-ffffff>``, so each call
    normally targets a fresh directory (mode "overwrite" is used regardless).

    Parameters
    ----------
    parquet_filename : str
        Path prefix for the parquet output.
    df : pyspark.sql.DataFrame
        Data to persist.
    spark_session : pyspark.sql.SparkSession, optional
        Session used for the read-back; defaults to the notebook-level `spark`.

    Returns
    -------
    pyspark.sql.DataFrame
        The data as re-read from the written parquet files. (Previously the re-read
        frame was bound to a local and discarded, and the function returned None.)
    """
    start_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
    output_path = parquet_filename + "_" + start_time

    # Write dataframe to parquet file:
    df.write.mode("overwrite").parquet(output_path)

    # Resolve the session only after writing, matching the original order of effects.
    if spark_session is None:
        spark_session = spark

    # Read parquet file back to a Spark dataframe and hand it to the caller.
    return spark_session.read.parquet(output_path)

In [4]:
# Pipeline start banner. results_all collects the result dict of each data-quality check.
print("START ETL pipeline process")
results_all = []

START ETL pipeline process


In [5]:
def define_paths():
    """
    Return the pipeline's location settings as a dict.

    Keys: "input_data" and "output_data" (local directories), "data_storage"
    (storage format name) and "hurdat" (URL of the HURDAT2 source page).
    """
    return {
        "input_data": 'data/',
        "output_data": 'data/output_data/',
        "data_storage": 'parquet',
        "hurdat": 'https://www.aoml.noaa.gov/hrd/hurdat/hurdat2.html',
    }

In [6]:
# Resolve every input/output location once; later cells read paths from `path_d`.
path_d = define_paths()

In [7]:
def create_spark_session(use_s3=False):
    """
    Create (or reuse) the Spark session used by the pipeline.

    The session uses a local master and the application name 'appName'. It is built with
    SparkSession.builder.getOrCreate(), so re-running this cell reuses the active session
    instead of failing because a second SparkContext cannot be created.

    Parameters
    ----------
    use_s3 : bool, default False
        Also request the org.apache.hadoop:hadoop-aws:2.7.0 package (presumably for S3
        access). This replaces a branch that a hard-coded flag made unreachable.

    Returns
    -------
    pyspark.sql.SparkSession
    """
    print("Create Spark session")

    # Make the local Spark installation importable before the session starts.
    import findspark
    findspark.init()

    builder = SparkSession.builder.appName('appName').master('local')
    if use_s3:
        builder = builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")

    return builder.getOrCreate()

In [8]:
# Create the Spark session for the pipeline; `spark` is used by every later processing cell
# (and read as a global by parquet_wr).
spark = create_spark_session()

Create Spark session


In [9]:
"""
def parse_input_files(path_d, path, extension):
    # Get (from directory) the files matching extension
    all_files = []
    for root, dirs, files in os.walk(path):
        files = glob.glob(os.path.join(root, extension))
        for f in files :
            all_files.append(os.path.abspath(f))

    return all_files
"""

'\ndef parse_input_files(path_d, path, extension):\n    # Get (from directory) the files matching extension\n    all_files = []\n    for root, dirs, files in os.walk(path):\n        files = glob.glob(os.path.join(root, extension))\n        for f in files :\n            all_files.append(os.path.abspath(f))\n\n    return all_files\n'

In [10]:
# NOTE(review): leftover from an i94 pipeline, kept as a string literal, so it never runs.
# `PATHS` and `reorder_paths` are not defined in any cell of this notebook, and the
# indentation inside the string would not parse as code. TODO delete.
"""
input_files = parse_input_files(PATHS, PATHS["i94_data"], "*.sas7bdat")
    input_files_reordered = reorder_paths(input_files)
    PATHS["i94_files"] = input_files_reordered
    print(f"i94_files: {PATHS['i94_files']}")
"""

'\ninput_files = parse_input_files(PATHS, PATHS["i94_data"], "*.sas7bdat")\n    input_files_reordered = reorder_paths(input_files)\n    PATHS["i94_files"] = input_files_reordered\n    print(f"i94_files: {PATHS[\'i94_files\']}")\n'

In [11]:
def _attach_storm_headers(hurdat_df):
    """
    Add StormIdentifier / StormName / StormSamples columns to `hurdat_df` and return it.

    A header row (IsStormHdr True) holds the storm identifier in Date, the name in Time and
    the sample count in RecordIdentifier. Those three values are copied onto the header row
    and onto every following row up to the next header. Rows before the first header get
    empty strings. This is a vectorized replacement for a row-by-row itertuples loop.
    """
    is_hdr = hurdat_df['IsStormHdr']
    # 0 before the first header; k on the k-th header row and on the rows that follow it.
    storm_no = is_hdr.cumsum()
    header_rows = hurdat_df.loc[is_hdr]
    header_keys = pd.RangeIndex(1, len(header_rows) + 1)

    for target, source in (('StormIdentifier', 'Date'),
                           ('StormName', 'Time'),
                           ('StormSamples', 'RecordIdentifier')):
        lookup = pd.Series(header_rows[source].to_numpy(), index=header_keys)
        hurdat_df[target] = storm_no.map(lookup).where(storm_no > 0, '')
    return hurdat_df


def _all_string_schema(column_names):
    """Build a Spark schema with one non-nullable StringType field per name, in order."""
    return t.StructType([t.StructField(name, t.StringType(), False) for name in column_names])


def process_hurdat_data(spark, path_d):
    """
    Load the HURDAT input, split it into storms and tracks, and stage both as parquet.

    Parameters
    ----------
    spark : pyspark.sql.SparkSession
    path_d : dict
        Location settings; reads "hurdat" (source) and "output_data" (parquet prefix).

    Returns
    -------
    tuple(pyspark.sql.DataFrame, pyspark.sql.DataFrame)
        (storms, tracks) as Spark DataFrames with all-string columns.
    """
    print("Processing HURDAT data")

    # Read CSV
    col_names = ['Date','Time','RecordIdentifier','SystemStatus','Latitude','Longitude','MaxSustWind','MaxPressure',
                 'NE34','SE34','SW34','NW34',
                 'NE50','SE50','SW50','NW50',
                 'NE64','SE64','SW64','NW64']
    hurdat_df = pd.read_csv(path_d["hurdat"], skiprows = 2, low_memory=False, names=col_names)

    # Remove the ghost row (index label 0).
    hurdat_df = hurdat_df.drop([0])

    # A storm header row has ALPHA characters in the Date column; track rows are all digits.
    hurdat_df['IsStormHdr'] = ~hurdat_df['Date'].str.isdigit()
    hurdat_df = _attach_storm_headers(hurdat_df)

    # Split into storms and tracks.
    is_hdr = hurdat_df['IsStormHdr']
    hurdat_storms_df = hurdat_df.loc[is_hdr, ['StormIdentifier', 'StormName', 'StormSamples']].copy()
    hurdat_tracks_df = hurdat_df.loc[~is_hdr].copy()

    # Convert hemisphere-suffixed coordinates to signed floats.
    hurdat_tracks_df['Latitude'] = hurdat_tracks_df['Latitude'].map(parsell)
    hurdat_tracks_df['Longitude'] = hurdat_tracks_df['Longitude'].map(parsell)

    # --------------------------------------------------------
    # Read data to Spark
    print("Reading HURDAT to Spark")
    hurdat_storm_schema = _all_string_schema(['StormIdentifier', 'StormName', 'StormSamples'])
    # The schema is applied by position: the trailing StormIdentifier/StormName/StormSamples
    # pandas columns become Identifier/Name/Samples in Spark. The join in
    # process_joined_hurdat_data relies on the `Identifier` name.
    hurdat_track_schema = _all_string_schema(
        col_names + ['IsStormHdr', 'Identifier', 'Name', 'Samples'])

    hurdat_storms_df_spark = spark.createDataFrame(hurdat_storms_df, schema=hurdat_storm_schema)
    hurdat_tracks_df_spark = spark.createDataFrame(hurdat_tracks_df, schema=hurdat_track_schema)

    parquet_wr(path_d["output_data"] + "hurdat_storms_stage.parquet", hurdat_storms_df_spark)
    parquet_wr(path_d["output_data"] + "hurdat_tracks_stage.parquet", hurdat_tracks_df_spark)

    print("HURDAT processing complete")

    return hurdat_storms_df_spark, hurdat_tracks_df_spark

In [12]:
# Stage the raw HURDAT input; process_hurdat_data returns a (storms, tracks) pair.
hurdat_df_spark = process_hurdat_data(spark, path_d)
hurdat_storms_df_spark, hurdat_tracks_df_spark = hurdat_df_spark

Processing HURDAT data
Reading HURDAT to Spark
HURDAT processing complete


In [13]:
"""
# Cleaning the data:
"""

'\n# Cleaning the data:\n'

In [14]:
# Process Dimension tables.

def process_joined_hurdat_data( spark, path_d, hurdat_storms_df_spark, hurdat_tracks_df_spark):
    """
    Join storms to their tracks, build the HURDAT table via SQL and stage it as parquet.

    The joined frame is exposed as the temp view "hurdat_table_DF", and
    `hurdat_table_createquery` is run against it. The resulting schema is printed, and the
    table is written under path_d["output_data"] with parquet_wr.

    Returns
    -------
    pyspark.sql.DataFrame
        The table produced by `hurdat_table_createquery`.
    """
    print("Creating table")

    storms, tracks = hurdat_storms_df_spark, hurdat_tracks_df_spark
    storm_matches_track = storms.StormIdentifier == tracks.Identifier
    joined = storms.join(tracks, storm_matches_track)

    # Create table
    joined.createOrReplaceTempView("hurdat_table_DF")
    hurdat_table = spark.sql(hurdat_table_createquery)

    print("HURDAT schema:")
    hurdat_table.printSchema()

    destination = path_d["output_data"] + "hurdat_table.parquet"
    parquet_wr(destination, hurdat_table)

    print("HURDAT table complete")
    return hurdat_table

In [15]:
# Join storms with their tracks, build the final HURDAT table and stage it as parquet.
hurdat_table = process_joined_hurdat_data( spark, path_d, hurdat_storms_df_spark, hurdat_tracks_df_spark)

Creating table
HURDAT schema:
root
 |-- track_id: string (nullable = false)
 |-- storm_id: string (nullable = false)
 |-- storm_name: string (nullable = false)
 |-- sample_count: string (nullable = false)
 |-- date: string (nullable = false)
 |-- time: string (nullable = false)
 |-- system_status: string (nullable = false)
 |-- latitude: string (nullable = false)
 |-- longitude: string (nullable = false)
 |-- max_sust_wind: string (nullable = false)
 |-- max_pressure: string (nullable = false)
 |-- NE34: string (nullable = false)
 |-- SE34: string (nullable = false)
 |-- SW34: string (nullable = false)
 |-- NW34: string (nullable = false)
 |-- NE50: string (nullable = false)
 |-- SE50: string (nullable = false)
 |-- SW50: string (nullable = false)
 |-- NW50: string (nullable = false)
 |-- NE64: string (nullable = false)
 |-- SE64: string (nullable = false)
 |-- SW64: string (nullable = false)
 |-- NW64: string (nullable = false)

HURDAT table complete


In [16]:
"""
# Process Fact table.
#immigrations_table_df = process_immigrations_data(spark, PATHS, i94_df_spark_clean, country_codes_i94_df_spark, airport_codes_i94_df_spark, time_table_df, start_str)
"""

'\n# Process Fact table.\n#immigrations_table_df = process_immigrations_data(spark, PATHS, i94_df_spark_clean, country_codes_i94_df_spark, airport_codes_i94_df_spark, time_table_df, start_str)\n'

In [17]:
def check_data_quality( spark, hurdat_table, null_check_query=None, row_count_query=None):
    """
    Run the data-quality checks on the HURDAT table.

    Registers `hurdat_table` as the temp view "hurdat_table_DF" and runs two SQL queries
    against it. The table is flagged "NOK" when the first query's first value is > 0 or the
    second query's first value is < 1.

    Parameters
    ----------
    spark : pyspark.sql.SparkSession
    hurdat_table : pyspark.sql.DataFrame
    null_check_query, row_count_query : str, optional
        SQL overrides for the two checks. They default to the globals
        `hurdat_table_check1_query` / `hurdat_table_check2_query`, presumably provided by
        the notebook's `from sql_queries import *`.

    Returns
    -------
    dict
        {"hurdat_count": <first value of the row-count query>, "hurdat": "OK" or "NOK"}
    """
    print("Checking HURDAT table...")

    if null_check_query is None:
        null_check_query = hurdat_table_check1_query
    if row_count_query is None:
        row_count_query = hurdat_table_check2_query

    hurdat_table.createOrReplaceTempView("hurdat_table_DF")
    hurdat_table_check1 = spark.sql(null_check_query)
    null_count = hurdat_table_check1.collect()[0][0]
    hurdat_table_check2 = spark.sql(row_count_query)
    # Collect once and reuse: each collect() triggers a separate Spark job.
    row_count = hurdat_table_check2.collect()[0][0]

    failed = null_count > 0 or row_count < 1
    results = {"hurdat_count": row_count,
               "hurdat": "NOK" if failed else "OK"}

    print("NULLS:")
    hurdat_table_check1.show(1)
    print("ROWS:")
    hurdat_table_check2.show(1)
    # --------------------------------------------------------
    print("Checking data quality complete")

    return results

In [18]:
# Run the quality checks and record the outcome alongside any earlier results.
results = check_data_quality( spark, hurdat_table)
results_all += [results]

print(results_all)

Checking HURDAT table...
NULLS:
+--------+
|count(1)|
+--------+
|       0|
+--------+

ROWS:
+--------+
|count(1)|
+--------+
|   51840|
+--------+

Checking data quality complete
[{'hurdat_count': 51840, 'hurdat': 'OK'}]


In [19]:
print("ETL pipeline complete")

ETL pipeline complete
