<a href="https://colab.research.google.com/github/monicman/dataengineering/blob/master/Capstone_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 62kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 50.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=a831a50f539a821227ae269a8cbe3bcac82eb519949d810ad890e2e357bb0780
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [0]:
import pandas as pd
from pyspark.sql.functions import isnull, when, count, col
from pyspark.sql import SparkSession
from pyspark import SparkFiles
import urllib
from urllib.request import urlopen   
import io
import requests
from zipfile import ZipFile
import os
import numpy as np

In [0]:
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

def ingest_files(url,filename):
  spark.sparkContext.addFile(url)
  if url[-3:] == 'csv':
    return spark.read.csv(SparkFiles.get(filename), header=True)
  elif url[-3:] == 'zip':
    url = urlopen(url)
    output = open(filename, 'wb')    # note the flag:  "wb"        
    output.write(url.read())
    output.close()
    df = pd.read_table(filename)
    df.iloc[:, 3:6] = df.iloc[:, 3:6].apply(pd.to_datetime, errors='coerce', format='%d/%m/%Y')
    myschema = StructType([StructField('REGISTER_NAME', StringType(), True),
                         StructField('BN_NAME', StringType(), True),
                         StructField('BN_STATUS', StringType(), True),
                         StructField('BN_REG_DT', DateType(), True),
                         StructField('BN_CANCEL_DT', DateType(), True),
                         StructField('BN_RENEW_DT', DateType(), True),
                         StructField('BN_STATE_NUM', StringType(), True),
                         StructField('BN_STATE_OF_REG', StringType(), True),
                         StructField('BN_ABN', DoubleType(), True)])
    return spark.createDataFrame(df, schema=myschema)

In [0]:
ipgod101 = 'https://data.gov.au/data/dataset/a4210de2-9cbb-4d43-848d-46138fefd271/resource/e5cbeafc-5fb3-4dfd-bd22-afe81b6ab1e1/download/ipgod101.csv'
ipgod102 = 'https://data.gov.au/data/dataset/a4210de2-9cbb-4d43-848d-46138fefd271/resource/846990df-db42-4ad7-bbd6-567fd37a2797/download/ipgod102.csv'  
ABN = 'https://data.gov.au/data/dataset/bc515135-4bb6-4d50-957a-3713709a76d3/resource/839cc783-876f-47a2-a70c-0fe606977517/download/business_names_201909.zip'

staging_ipgod101 = ingest_files(ipgod101,'ipgod101.csv')
staging_ipgod102 = ingest_files(ipgod102,'ipgod102.csv')
staging_abr = ingest_files(ABN,'zipFile.zip')

In [0]:
print("Data quality check: Count of all values for table ipgod101")
staging_ipgod101.select([count(c).alias(c) for c in staging_ipgod101.columns]).show()
print("Data quality check: Count of null values for table ipgod101")
staging_ipgod101.select([count(when(isnull(c), c)).alias(c) for c in staging_ipgod101.columns]).show()

Data quality check: Count of all values for table ipgod101
+------------------+----------------+------------+---------------+----------+-------+-------+----------+------------------+-----------+----------------------+
|australian_appl_no|application_year|sealing_year|applicant_count|australian|foreign| entity|non_entity|patent_status_type|patent_type|primary_ipc_mark_value|
+------------------+----------------+------------+---------------+----------+-------+-------+----------+------------------+-----------+----------------------+
|           1403904|         1403904|      505382|        1370763|   1403904|1403904|1403904|   1403904|           1044354|    1403881|                908730|
+------------------+----------------+------------+---------------+----------+-------+-------+----------+------------------+-----------+----------------------+

Data quality check: Count of null values for table ipgod101
+------------------+----------------+------------+---------------+----------+-------+

In [0]:
print("Data quality check: Count of all values for table ipgod102")
staging_ipgod102.select([count(c).alias(c) for c in staging_ipgod102.columns]).show()
print("Data quality check: Count of null values for table ipgod102")
staging_ipgod102.select([count(when(isnull(c), c)).alias(c) for c in staging_ipgod102.columns]).show()

Data quality check: Count of all values for table ipgod102
+------------------+-------+-------+----------+-------+-------+---------+---------+------+--------+------+------+--------+--------+--------+--------+--------+---------+------+------+
|australian_appl_no| ipa_id|country|australian| entity|   name|cleanname|corp_desg| state|postcode|   lat|   lon|sa2_code|sa2_name|lga_code|lga_name|gcc_name|elect_div|   abn|   acn|
+------------------+-------+-------+----------+-------+-------+---------+---------+------+--------+------+------+--------+--------+--------+--------+--------+---------+------+------+
|           1464871|1274404|1464871|   1464871|1464871|1464871|  1464870|   920480|333711|  325940|325577|325577|  325577|  325577|  325577|  325577|  325577|   325577|146054|120913|
+------------------+-------+-------+----------+-------+-------+---------+---------+------+--------+------+------+--------+--------+--------+--------+--------+---------+------+------+

Data quality check: Count

In [0]:
print("Data quality check: Count of all values for table abr")
staging_abr.select([count(c).alias(c) for c in staging_abr.columns]).show()
print("Data quality check: Count of null values for table abr")
staging_abr.select([count(when(isnull(c), c)).alias(c) for c in staging_abr.columns]).show()

Data quality check: Count of all values for table abr
+-------------+-------+---------+---------+------------+-----------+------------+---------------+-------+
|REGISTER_NAME|BN_NAME|BN_STATUS|BN_REG_DT|BN_CANCEL_DT|BN_RENEW_DT|BN_STATE_NUM|BN_STATE_OF_REG| BN_ABN|
+-------------+-------+---------+---------+------------+-----------+------------+---------------+-------+
|      2644907|2644907|  2644907|  2644907|      346039|    2644907|     2644907|        2644907|2644907|
+-------------+-------+---------+---------+------------+-----------+------------+---------------+-------+

Data quality check: Count of null values for table abr
+-------------+-------+---------+---------+------------+-----------+------------+---------------+------+
|REGISTER_NAME|BN_NAME|BN_STATUS|BN_REG_DT|BN_CANCEL_DT|BN_RENEW_DT|BN_STATE_NUM|BN_STATE_OF_REG|BN_ABN|
+-------------+-------+---------+---------+------------+-----------+------------+---------------+------+
|            0|      0|        0|        0|  

In [0]:
staging_ipgod101.createOrReplaceTempView('staging_ipgod101')
staging_ipgod102.createOrReplaceTempView('staging_ipgod102')
staging_abr.createOrReplaceTempView('staging_abr')


application =  spark.sql(""" SELECT DISTINCT 
                                      staging_ipgod101.australian_appl_no,
                                      staging_ipgod101.patent_status_type,
                                      staging_ipgod101.patent_type,
                                      staging_ipgod101.primary_ipc_mark_value,
                                      staging_ipgod102.ipa_id,
                                      staging_ipgod102.lat,
                                      staging_ipgod102.lon
                                      from staging_ipgod101 inner join staging_ipgod102
                                      on staging_ipgod101.australian_appl_no = staging_ipgod102.australian_appl_no
                                      where staging_ipgod102.australian = 'True' and staging_ipgod102.entity = 'True'
                                      order by staging_ipgod101.australian_appl_no,staging_ipgod102.ipa_id
                                      """)

region = spark.sql("""SELECT DISTINCT
                                       lat,
                                       lon,
                                       state,
                                       sa2_code,
                                       sa2_name,
                                       lga_code,
                                       lga_name,
                                       gcc_name,
                                       elect_div
                                   FROM staging_ipgod102
                                   where australian = 'True' and entity = 'True' and lat is not NULL and lon is not NULL
                                   order by lat,lon
                                   """) 

business = spark.sql("""SELECT    DISTINCT
                                  staging_ipgod102.ipa_id,
                                  staging_ipgod102.abn,
                                  staging_ipgod102.cleanname,
                                  staging_abr.BN_STATUS as business_registration_status,
                                  staging_abr.BN_REG_DT as business_registration_date,
                                  staging_abr.BN_REG_DT as business_cancel_date,
                                  staging_abr.BN_RENEW_DT as business_renew_date
                                  from staging_ipgod102 INNER JOIN staging_abr 
                                  on staging_ipgod102.abn  = staging_abr.BN_ABN

                                  """)

In [0]:
business.head(5)

[Row(ipa_id='83891.0', abn='85054355618.0', cleanname='GOLDEN CIRCLE', business_registration_status='Registered', business_registration_date=datetime.date(2010, 11, 24), business_cancel_date=datetime.date(2010, 11, 24), business_renew_date=datetime.date(2013, 11, 24)),
 Row(ipa_id='143465.0', abn='78002894224.0', cleanname='LANDIS & GYR', business_registration_status='Registered', business_registration_date=datetime.date(2008, 1, 14), business_cancel_date=datetime.date(2008, 1, 14), business_renew_date=datetime.date(2017, 1, 14)),
 Row(ipa_id='91737.0', abn='64126087509.0', cleanname='CLEARWATER FILTER SYSTEMS AUSTRALIA', business_registration_status='Registered', business_registration_date=datetime.date(2015, 6, 9), business_cancel_date=datetime.date(2015, 6, 9), business_renew_date=datetime.date(2021, 7, 1)),
 Row(ipa_id='509027.0', abn='31631230405.0', cleanname='COLORCORP', business_registration_status='Registered', business_registration_date=datetime.date(2019, 4, 8), business_can