# PROJECT: DATA LAKE
## ETL PIPELINE FOR SPARKIFY DATABASE - DATA LAKE ON S3

## PART 2: VALIDATION OF THE SPARKIFY DATABASE

### by Tran Nguyen

Run data quality checks to verify that the ETL pipeline loaded all records into the tables. For each table, count the rows, count the distinct key values, count the null keys, and show a few sample rows.
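The validation below only prints its counts and leaves the judgment to the reader. A minimal sketch of how those counts could become explicit pass/fail checks is shown here. `check_table_counts` is a hypothetical helper, not part of the original pipeline, and it assumes every checked key column should be unique. That assumption may not hold for every table (for example, `songplays` is checked on `start_time`, which can legitimately repeat), so treat each flag as a prompt to investigate.

```python
from typing import List


def check_table_counts(table: str, n_rows: int, n_unique: int, n_null: int) -> List[str]:
    """Hypothetical helper: turn printed counts into a list of problems.

    n_rows   -- total number of rows in the table
    n_unique -- COUNT(DISTINCT key), which ignores NULL keys
    n_null   -- number of rows whose key IS NULL
    An empty list means every check passed.
    """
    problems = []
    if n_rows == 0:
        problems.append(f"{table}: table is empty")
    if n_null > 0:
        problems.append(f"{table}: {n_null} null id(s)")
    # Non-null rows that do not contribute a new distinct key are duplicates.
    duplicates = (n_rows - n_null) - n_unique
    if duplicates > 0:
        problems.append(f"{table}: {duplicates} duplicate id(s)")
    return problems


# Counts taken from the recorded notebook output below.
print(check_table_counts("songs", 14896, 14896, 0))
print(check_table_counts("artists", 10025, 9553, 0))
```

With the recorded counts, `songs` passes while `artists` is flagged: 10025 rows but only 9553 distinct `artist_id` values.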

## 1. SET UP SPARK SESSION

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime
from pyspark.sql.functions import col, max as max_
from time import time

import os
import configparser

### 1.1. LOAD AWS CREDENTIALS

In [2]:
### Read config file
config = configparser.ConfigParser()
config.read('credentials.cfg')
### Retrieve values with config['SECTION']['KEY'] and export them as environment variables
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']
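The cell above expects `credentials.cfg` to contain an `[AWS]` section with two keys. One pitfall is that `ConfigParser.read()` silently skips a file it cannot open and returns the list of files it did parse. A missing file therefore only surfaces later as a `KeyError`. The sketch below shows the layout inferred from the keys the cell reads, with placeholder values, and a hypothetical `load_aws_credentials` helper that fails early if the file was not read.

```python
import configparser

# Assumed layout of credentials.cfg, inferred from the keys the notebook reads.
# The values are placeholders, not real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""


def load_aws_credentials(path: str) -> dict:
    """Hypothetical helper: read the [AWS] section, failing loudly if the file is missing."""
    config = configparser.ConfigParser()
    # read() returns the list of files it successfully parsed; empty means nothing was loaded.
    if not config.read(path):
        raise FileNotFoundError(f"could not read config file: {path}")
    aws = config["AWS"]
    return {
        "AWS_ACCESS_KEY_ID": aws["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": aws["AWS_SECRET_ACCESS_KEY"],
    }
```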

### 1.2. CREATE SPARK SESSION WITH HADOOP-AWS PACKAGE

In [3]:
spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()
### The org.apache.hadoop:hadoop-aws:2.7.0 package provides the S3A connector, which lets Spark read from and write to AWS S3.
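The tables in this notebook are read from a local `output` directory. If they are instead read from an `s3a://` path, the S3A connector can also receive credentials explicitly as Spark properties. This is an alternative to the environment variables set earlier. Spark copies every property prefixed with `spark.hadoop.` into the Hadoop configuration, where S3A looks for `fs.s3a.access.key` and `fs.s3a.secret.key`. The sketch below only builds the settings dictionary. `s3a_spark_conf` is a hypothetical helper. The `hadoop-aws` version should generally match the Hadoop version your Spark build ships with.

```python
def s3a_spark_conf(access_key: str, secret_key: str,
                   hadoop_aws_version: str = "2.7.0") -> dict:
    """Hypothetical helper: Spark settings that load hadoop-aws and pass S3A keys.

    Keys prefixed with "spark.hadoop." are forwarded to the Hadoop configuration,
    where the S3A filesystem reads fs.s3a.access.key / fs.s3a.secret.key.
    """
    return {
        "spark.jars.packages": f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}",
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }


# Possible usage with PySpark (not executed here):
# builder = SparkSession.builder
# for key, value in s3a_spark_conf(key_id, secret).items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```

Keeping the settings in a plain dictionary lets the same values be reused, for example with `spark-submit --conf`, without starting a session.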

## 2. PERFORM QUALITY CHECKS

In [37]:
### Function for quality check
def validate_table_insertion(table, id_col):
    """
    Read a table's parquet files into a DataFrame, register it as a temporary
    SQL view, and check it with SQL queries: row count, number of distinct ids,
    number of null ids, and a few sample rows.
    Uses the global `input_path` defined in the next cell.
    """
    ### load table
    table_path = os.path.join(input_path, table)
    parquetFile = spark.read.parquet(table_path)
    ### print row count
    print('\033[1m' + f"Validate insertion from table {table}:" + '\033[0m')
    print("Dimension of the table:", parquetFile.count())

    ### register the DataFrame as a temporary SQL view
    parquetFile.createOrReplaceTempView(table)

    ### count unique ids (COUNT(DISTINCT ...) ignores NULLs)
    unique = spark.sql(f"SELECT COUNT(DISTINCT {id_col}) AS count FROM {table}")
    print("Number of unique id: ", unique.select('count').collect()[0]['count'])

    ### count null ids; `= NULL` never matches in SQL, so use IS NULL
    null_id = spark.sql(f"SELECT COUNT(*) AS count FROM {table} WHERE {id_col} IS NULL")
    print("Number of null id: ", null_id.select('count').collect()[0]['count'])

    ### print some examples
    print("Some examples from the table:")
    examples = spark.sql(f"SELECT * FROM {table} LIMIT 5")
    examples.show()
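Null ids must be counted with `IS NULL` rather than `= NULL`. In SQL, comparing any value to NULL with `=` yields NULL rather than TRUE, so a `WHERE id = NULL` filter matches no rows and always reports 0. `COUNT(DISTINCT ...)` also skips NULLs, so the unique-id count never includes them. The sketch below demonstrates both behaviours with Python's built-in `sqlite3` and a tiny made-up table. It uses SQLite only because it needs no cluster. Spark SQL follows the same NULL comparison semantics.

```python
import sqlite3

# Tiny in-memory table with two real ids and two missing ones (illustrative data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE artists (artist_id TEXT)")
conn.executemany(
    "INSERT INTO artists (artist_id) VALUES (?)",
    [("AR1",), ("AR2",), (None,), (None,)],
)

# `artist_id = NULL` evaluates to NULL (not TRUE) for every row, so WHERE keeps nothing.
eq_null = conn.execute("SELECT COUNT(*) FROM artists WHERE artist_id = NULL").fetchone()[0]
# `artist_id IS NULL` is TRUE exactly for the missing values.
is_null = conn.execute("SELECT COUNT(*) FROM artists WHERE artist_id IS NULL").fetchone()[0]
# COUNT(DISTINCT ...) ignores NULLs, so only AR1 and AR2 are counted.
distinct = conn.execute("SELECT COUNT(DISTINCT artist_id) FROM artists").fetchone()[0]
conn.close()

print(eq_null, is_null, distinct)  # 0 2 2
```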

In [38]:
input_path = 'output'  # directory that holds the parquet output of every table
tables = ['songs', 'artists', 'users', 'time', 'songplays']
ids = ["song_id", "artist_id", "user_id", "start_time", "start_time"]  # key column checked in each table

### Run the checks on every table
for table, id_col in zip(tables, ids):
    validate_table_insertion(table, id_col)

[1mValidate insertion from table songs:[0m
Dimension of the table: 14896
Number of unique id:  14896
Number of null id:  0
Some examples from the table:
+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SODUMDU12AC468A22B|We're Skrewed (Ot...| 249.5473|   0|ARHOSMU1242078130D|
|SOTCIHX12A8C13DDD2|Finally_ as that ...|483.34322|2006|ARYOIZG1187FB41E30|
|SOPSXLI12A6D4FA418|Practical Cats - ...|251.03628|   0|ARR79V31187FB5B96E|
|SOUFBFK12A8C13D668|String Quartets O...|348.60363|   0|ARAILTA11F4C840A06|
|SOROAMT12A8C13C6D0|Me gustan mas los...|101.85098|2008|ARWUDTF1187B9AA096|
+------------------+--------------------+---------+----+------------------+

[1mValidate insertion from table artists:[0m
Dimension of the table: 10025
Number of unique id:  9553
Number of null id:  0
Some examples from the table:
+---