<a href="https://colab.research.google.com/github/jimmarczyk/data_cleaning_scala/blob/master/FirstAmerican_JimMarczyk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [69]:
from google.colab import drive
drive.mount('/content/drive')
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [70]:
df_raw = spark.read.csv('/data-engineer-interview-data.csv', header=True)
df_raw.show()

+--------------+------------+------------+----------------------+--------------------+------------+
|DocumentNumber|DocumentDate|DocumentType|RefersToDocumentNumber|RefersToDocumentYear|     Remarks|
+--------------+------------+------------+----------------------+--------------------+------------+
|            27|   8/16/2021|           C|                  null|                null|        null|
|             1|   2/15/2021|           T|                     8|                2020|2020 0008-00|
|            67|   10/9/2020|           A|                  null|                null|        null|
|           157|   10/9/2020|           A|                  null|                null|        null|
|           189|   10/9/2020|           J|                  null|                null|        null|
|           250|   10/9/2020|           R|                  2016|                  98|        null|
|            87|    8/7/2020|           R|                  1992|                  97|        null|


In [71]:
df_raw.count()

146

In [72]:
df_raw.dtypes

[('DocumentNumber', 'string'),
 ('DocumentDate', 'string'),
 ('DocumentType', 'string'),
 ('RefersToDocumentNumber', 'string'),
 ('RefersToDocumentYear', 'string'),
 ('Remarks', 'string')]

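By default Spark reads every CSV column as a string, as the dtypes above show. Spark can infer the types with inferSchema=True, at the cost of an extra pass over the data, but it is best practice to define the types explicitly.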

In [73]:
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

In [74]:
Schema=StructType([
    StructField('DocumentNumber',IntegerType(),nullable=True),
    StructField('DocumentDate',DateType(),nullable=True),
    StructField('DocumentType',StringType(),nullable=True),
    StructField('RefersToDocumentNumber',IntegerType(),nullable=True),
    StructField('RefersToDocumentYear',IntegerType(),nullable=True),
    StructField('Remarks',StringType(),nullable=True)
])
# dateFormat tells Spark how to parse dates such as 8/16/2021 into DateType (the default pattern is yyyy-MM-dd)
df_raw = spark.read.option("header",True).option("dateFormat","M/d/yyyy").schema(Schema).csv('/data-engineer-interview-data.csv')
df_raw.printSchema()

root
 |-- DocumentNumber: integer (nullable = true)
 |-- DocumentDate: date (nullable = true)
 |-- DocumentType: string (nullable = true)
 |-- RefersToDocumentNumber: integer (nullable = true)
 |-- RefersToDocumentYear: integer (nullable = true)
 |-- Remarks: string (nullable = true)



Enforcing the schema is a fundamental step in cleaning the data.

In [75]:
!pip install quinn
import quinn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [76]:
quinn.validate_schema(df_raw, Schema)

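No schema violations. (Because df_raw was read with this same Schema, the check confirms the column names and types; it does not inspect the values.)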

In [77]:
df_raw.count()

146

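Dropping malformed rows (mode DROPMALFORMED) discards corrupt records. Note that the read below does not pass the schema, so every column is a string and only structurally broken rows (for example, a row with the wrong number of fields) can be dropped; type violations are caught only when the schema is supplied as well.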

In [78]:
df_cleanded=spark.read.option("header",True).option("mode",'DROPMALFORMED').csv('/data-engineer-interview-data.csv')

No malformed rows were dropped: the count below is still 146.

In [79]:
df_cleanded.count()

146

In [80]:
df_cleanded.show()

+--------------+------------+------------+----------------------+--------------------+------------+
|DocumentNumber|DocumentDate|DocumentType|RefersToDocumentNumber|RefersToDocumentYear|     Remarks|
+--------------+------------+------------+----------------------+--------------------+------------+
|            27|   8/16/2021|           C|                  null|                null|        null|
|             1|   2/15/2021|           T|                     8|                2020|2020 0008-00|
|            67|   10/9/2020|           A|                  null|                null|        null|
|           157|   10/9/2020|           A|                  null|                null|        null|
|           189|   10/9/2020|           J|                  null|                null|        null|
|           250|   10/9/2020|           R|                  2016|                  98|        null|
|            87|    8/7/2020|           R|                  1992|                  97|        null|


Log the rows whose DocumentType is 'R' to remove_log.csv, then remove them.

Note: in case of append issues, try this: df.write.mode("append").csv("pathToFile")

In [81]:
from pyspark.sql.functions import col
# Keep a handle on the R rows, then append them to the removal log (write() itself returns None)
df_rdocs = df_cleanded.filter(col('DocumentType') == 'R')
df_rdocs.write.csv("remove_log.csv", mode="append", header=True)

In [82]:
df_candidate = df_cleanded.filter(col('DocumentType') != 'R')

In [83]:
df_candidate.show()

+--------------+------------+------------+----------------------+--------------------+------------+
|DocumentNumber|DocumentDate|DocumentType|RefersToDocumentNumber|RefersToDocumentYear|     Remarks|
+--------------+------------+------------+----------------------+--------------------+------------+
|            27|   8/16/2021|           C|                  null|                null|        null|
|             1|   2/15/2021|           T|                     8|                2020|2020 0008-00|
|            67|   10/9/2020|           A|                  null|                null|        null|
|           157|   10/9/2020|           A|                  null|                null|        null|
|           189|   10/9/2020|           J|                  null|                null|        null|
|             8|    7/3/2020|           B|                  null|                null|        null|
|             2|    1/5/2020|           C|                  null|                null|        null|


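Remove the J documents that are referred to by a T document (the T row's RefersToDocumentNumber equals the J row's DocumentNumber), logging them to remove_log.csv first.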

In [84]:
df_candidate.createOrReplaceTempView("df1")

In [85]:
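# Collect the J documents that a T document refers to, then append them to the removal log.
# The WHERE clause requires a match on both sides, so an INNER JOIN is equivalent to the original
# FULL OUTER JOIN; DISTINCT avoids logging a J row twice if several T rows point to it.
df_remove = spark.sql('''
SELECT DISTINCT d1.DocumentNumber, d1.DocumentDate, d1.DocumentType,
       d1.RefersToDocumentNumber, d1.RefersToDocumentYear, d1.Remarks
FROM df1 AS d1
INNER JOIN df1 AS d2
  ON d1.DocumentNumber = d2.RefersToDocumentNumber
WHERE d1.DocumentType = 'J'
  AND d2.DocumentType = 'T'
''')
df_remove.write.mode("append").csv("remove_log.csv", header=True)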


In [86]:
# Drop the J document logged above; its DocumentNumber (10300) is hard-coded here
df_final = df_candidate.filter(col('DocumentNumber') != '10300')

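In some rows the values in RefersToDocumentNumber and RefersToDocumentYear appear to have been swapped: for example, the R documents 250 and 87 shown earlier have RefersToDocumentNumber 2016 and 1992 but RefersToDocumentYear 98 and 97, so the numbers look like years and vice versa. To do: ask the Business/Data Analyst.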

I will continue with the requirements as written.

In [87]:
df_final.show()

+--------------+------------+------------+----------------------+--------------------+------------+
|DocumentNumber|DocumentDate|DocumentType|RefersToDocumentNumber|RefersToDocumentYear|     Remarks|
+--------------+------------+------------+----------------------+--------------------+------------+
|            27|   8/16/2021|           C|                  null|                null|        null|
|             1|   2/15/2021|           T|                     8|                2020|2020 0008-00|
|            67|   10/9/2020|           A|                  null|                null|        null|
|           157|   10/9/2020|           A|                  null|                null|        null|
|           189|   10/9/2020|           J|                  null|                null|        null|
|             8|    7/3/2020|           B|                  null|                null|        null|
|             2|    1/5/2020|           C|                  null|                null|        null|


Replace nulls with '-1'. Document numbers and years are positive, so -1 can never be mistaken for a real value.

In [88]:
df_final = df_final.na.fill('-1')  # keep the result: na.fill returns a new DataFrame
df_final.show()

+--------------+------------+------------+----------------------+--------------------+------------+
|DocumentNumber|DocumentDate|DocumentType|RefersToDocumentNumber|RefersToDocumentYear|     Remarks|
+--------------+------------+------------+----------------------+--------------------+------------+
|            27|   8/16/2021|           C|                    -1|                  -1|          -1|
|             1|   2/15/2021|           T|                     8|                2020|2020 0008-00|
|            67|   10/9/2020|           A|                    -1|                  -1|          -1|
|           157|   10/9/2020|           A|                    -1|                  -1|          -1|
|           189|   10/9/2020|           J|                    -1|                  -1|          -1|
|             8|    7/3/2020|           B|                    -1|                  -1|          -1|
|             2|    1/5/2020|           C|                    -1|                  -1|          -1|


In [89]:
df_final.write.mode("append").csv("final_table.csv", header=True)