In [4]:
import sys
sys.path.append('..')

from utils.spark_session import get_spark
from utils.validation import *
from utils.transformation import *

from pyspark.sql.functions import when
from pyspark.sql import functions as F

## Transformation ##
### Starting Spark Session ###

In [5]:
# Obtain the Spark session through the project helper imported from utils.spark_session.
spark = get_spark()
# Bare expression: the notebook renders the session object as this cell's output.
spark

### 1. Reading Bronze Data ###

In [6]:
# Load each bronze-layer table from its parquet folder into its own DataFrame.
category_df = spark.read.parquet('../data/bronze_ingested/category')
products_df = spark.read.parquet('../data/bronze_ingested/products')
sales_df = spark.read.parquet('../data/bronze_ingested/sales')
stores_df = spark.read.parquet('../data/bronze_ingested/store')
warranty_df = spark.read.parquet('../data/bronze_ingested/warranty')

### 2. Data Standardization and Profiling
#### 2.1: Standardizing

In [7]:
# Every table goes through the same steps: lower-case the column names, then
# cast the listed columns to their target types. The two numeric columns
# (price, quantity) go through alphaNum_to_num first — presumably it strips
# non-numeric characters before casting; confirm in utils.transformation.

# ------------ Category standardization ------------
category_df = rename_columns_lower(category_df)
category_df = cast_columns(
    category_df,
    {
        'category_id': 'string',
        'category_name': 'string',
    },
)

# ------------ Product standardization ------------
products_df = rename_columns_lower(products_df)
products_df = alphaNum_to_num(products_df, 'price', dtype='float')
products_df = cast_columns(
    products_df,
    {
        'product_id': 'string',
        'product_name': 'string',
        'category_id': 'string',
        'launch_date': 'date',
    },
)

# ------------ Sales standardization ------------
sales_df = rename_columns_lower(sales_df)
sales_df = alphaNum_to_num(sales_df, 'quantity', dtype='int')
sales_df = cast_columns(
    sales_df,
    {
        'sale_id': 'string',
        'sale_date': 'date',
        'store_id': 'string',
        'product_id': 'string',
    },
)

# ------------ Stores standardization ------------
stores_df = rename_columns_lower(stores_df)
stores_df = cast_columns(
    stores_df,
    {
        'store_id': 'string',
        'store_name': 'string',
        'city': 'string',
        'country': 'string',
    },
)

# ------------ Warranty standardization ------------
warranty_df = rename_columns_lower(warranty_df)
warranty_df = cast_columns(
    warranty_df,
    {
        'claim_id': 'string',
        'claim_date': 'date',
        'sale_id': 'string',
        'repair_status': 'string',
    },
)

#### 2.2: Profiling to Identify Inconsistencies

In [8]:
# Profile each standardized table before any cleaning.
# Argument order, as used in the calls below: DataFrame, report label, primary-key
# column, then (optionally) the foreign-key columns, the frames they reference and
# the referenced key columns — NOTE(review): inferred from call sites; confirm
# against utils.validation.profile_table.
#
# Column names are written in lower case because rename_columns_lower() was applied
# to every frame in 2.1. The earlier mixed-case names ('Product_ID', 'Category_ID',
# 'Store_ID') only resolved if Spark matched names case-insensitively, leaked into
# the report labels, and disagreed with the final validation cell in section 4.
profiles = [
    profile_table(category_df, 'categories', 'category_id'),
    profile_table(
        products_df, 'products', 'product_id',
        ['category_id'], [category_df], ['category_id'],
    ),
    profile_table(stores_df, 'stores', 'store_id'),
    profile_table(
        sales_df, 'sales', 'sale_id',
        ['store_id', 'product_id'], [stores_df, products_df], ['store_id', 'product_id'],
    ),
    profile_table(
        warranty_df, 'warranty', 'claim_id',
        ['sale_id'], [sales_df], ['sale_id'],
    ),
]


### 3. Cleaning

#### 3.1 Reading the profiles created in 2.2 to assess the necessary cleaning operations

In [9]:
# Print the data-quality report for every profile collected in `profiles`,
# so the cleaning steps that follow can be chosen from the reported counts.
profile_table_reader(profiles)


Data Quality Report — CATEGORIES
----------------------------------------
  - category_id: 1
----------------------------------------

Data Quality Report — PRODUCTS
----------------------------------------
  - product_id: 4
  - product_name: 5
  - category_id: 3
  - launch_date: 4
  - price: 3
  - Category_ID: 13
----------------------------------------

Data Quality Report — STORES
----------------------------------------
  - store_id: 2
  - store_name: 2
  - country: 6
----------------------------------------

Data Quality Report — SALES
----------------------------------------
  - sale_id: 52719
  - sale_date: 52996
  - store_id: 52838
  - product_id: 53162
  - quantity: 53354
  - store_id: 79806
  - product_id: 98499
----------------------------------------

Data Quality Report — WARRANTY
----------------------------------------
  - claim_id: 1503
  - claim_date: 1581
  - sale_id: 1504
  - repair_status: 1526
  - sale_id: 2971
----------------------------------------


#### To do:

* De-duplicate Rows Where Necessary
* De-duplicate Primary Keys
* Handle Null Values
* Drop Invalid Foreign Keys

In [10]:
# De-duplicate on the primary key, then discard rows that have no category_id.
category_df = (
    deduplicate(category_df, ['category_id'])
    .filter(col('category_id').isNotNull())
)

In [11]:
# De-duplicate on the primary key, then discard rows that have no product_id.
products_df = deduplicate(products_df, ['product_id'])
products_df = products_df.filter(col('product_id').isNotNull())

# Missing prices are filled with the median of the prices that remain after the
# key clean-up above; collect()[0][0] pulls the single aggregated value to the driver.
median_price = products_df.agg({'price': 'median'}).collect()[0][0]
products_df = fill_missing(products_df, {
    # Use the same 'unknown' placeholder as the stores and warranty tables.
    # It was previously misspelled 'unkown', which put a second, different
    # sentinel value into the silver data.
    'product_name': 'unknown',
    'price': median_price,
})

# launch_date is left null; a 0/1 flag column records which rows were missing it.
products_df = products_df.withColumn(
    'missing_launch_date_flag',
    when(col('launch_date').isNull(), 1).otherwise(0),
)

In [12]:
# De-duplicate on the primary key, then discard rows that have no store_id.
stores_df = (
    deduplicate(stores_df, ['store_id'])
    .filter(col('store_id').isNotNull())
)

# Both descriptive text columns get the same placeholder when missing.
stores_df = fill_missing(
    stores_df,
    dict.fromkeys(['store_name', 'country'], 'unknown'),
)

In [13]:
# A sale row is only usable when its own key and both foreign keys are present.
has_all_keys = (
    col('sale_id').isNotNull()
    & col('product_id').isNotNull()
    & col('store_id').isNotNull()
)
sales_df = sales_df.filter(has_all_keys)

# Record which rows lacked quantity / sale_date as 0/1 flags *before* quantity
# is filled below, so the information is not lost.
missing_value_flags = {
    'quantity_missing_flag': when(col('quantity').isNull(), 1).otherwise(0),
    'sale_date_missing_flag': when(col('sale_date').isNull(), 1).otherwise(0),
}
sales_df = sales_df.withColumns(missing_value_flags)

# NOTE(review): de-duplication uses (sale_id, product_id) while profiling treats
# sale_id alone as the primary key — confirm this composite key is intended.
sales_df = deduplicate(sales_df, subset_cols=['sale_id', 'product_id'])

# Missing quantities become 0; sale_date is left null.
sales_df = fill_missing(sales_df, {'quantity': 0})

In [14]:
# A claim must carry its own id and the id of the sale it refers to.
claim_keys_present = col('claim_id').isNotNull() & col('sale_id').isNotNull()
warranty_df = warranty_df.filter(claim_keys_present)

# De-duplicate on the primary key.
warranty_df = deduplicate(warranty_df, ['claim_id'])

# claim_date is left null; a 0/1 flag column records which rows were missing it.
claim_date_is_missing = when(col('claim_date').isNull(), 1).otherwise(0)
warranty_df = warranty_df.withColumn('missing_claim_date_flag', claim_date_is_missing)

# Missing repair statuses get the shared placeholder.
warranty_df = fill_missing(warranty_df, {'repair_status': 'unknown'})


#### 3.2 Referential Integrity Enforcement

In [15]:
# Check products.category_id against category.category_id. Only the first element
# of the helper's result is kept as the new products frame — presumably the frame
# with unmatched rows removed because drop=True; confirm in utils.validation.
products_fk_result = dup_FK_check(
    products_df, 'category_id',
    category_df, 'category_id',
    drop=True,
)
products_df = products_fk_result[0]

In [16]:
# Check the sales foreign keys one after the other: first product_id against the
# cleaned products, then store_id against the cleaned stores. Each step keeps only
# the first element of the helper's result (presumably the frame with unmatched
# rows dropped — confirm in utils.validation).
sales_product_fk_result = dup_FK_check(
    sales_df, 'product_id',
    products_df, 'product_id',
    drop=True,
)
sales_df = sales_product_fk_result[0]

sales_store_fk_result = dup_FK_check(
    sales_df, 'store_id',
    stores_df, 'store_id',
    drop=True,
)
sales_df = sales_store_fk_result[0]

In [17]:
# Check warranty.sale_id against the already-filtered sales frame. Only the first
# element of the helper's result is kept (presumably the frame with unmatched
# rows dropped — confirm in utils.validation).
warranty_fk_result = dup_FK_check(
    warranty_df, 'sale_id',
    sales_df, 'sale_id',
    drop=True,
)
warranty_df = warranty_fk_result[0]

### 4. Final Validation

In [18]:
# Re-profile the cleaned tables with the same key / foreign-key definitions used
# before cleaning, then print the report to confirm what is still outstanding.
c_profiles = [
    profile_table(category_df, 'categories', 'category_id'),
    profile_table(
        products_df, 'products', 'product_id',
        ['category_id'], [category_df], ['category_id'],
    ),
    profile_table(stores_df, 'stores', 'store_id'),
    profile_table(
        sales_df, 'sales', 'sale_id',
        ['store_id', 'product_id'], [stores_df, products_df], ['store_id', 'product_id'],
    ),
    profile_table(
        warranty_df, 'warranty', 'claim_id',
        ['sale_id'], [sales_df], ['sale_id'],
    ),
]

profile_table_reader(c_profiles)


Data Quality Report — CATEGORIES
----------------------------------------
                ALL CLEAR               
----------------------------------------

Data Quality Report — PRODUCTS
----------------------------------------
  - launch_date: 3
----------------------------------------

Data Quality Report — STORES
----------------------------------------
                ALL CLEAR               
----------------------------------------

Data Quality Report — SALES
----------------------------------------
  - sale_date: 35596
----------------------------------------

Data Quality Report — WARRANTY
----------------------------------------
  - claim_date: 926
----------------------------------------


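#### Validation Passed. Ready to Upload to Silver
Note: `launch_date`, `sale_date`, and `claim_date` are intentionally allowed to remain null; the rows affected are marked by the missing-value flag columns added in section 3.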

### 5. Uploading to Silver

In [19]:
# Persist every cleaned table to its silver-layer folder, replacing any
# previous output at that path. Order matches the original cell.
silver_targets = [
    (category_df, '../data/silver_cleaned/category'),
    (products_df, '../data/silver_cleaned/products'),
    (sales_df, '../data/silver_cleaned/sales'),
    (stores_df, '../data/silver_cleaned/store'),
    (warranty_df, '../data/silver_cleaned/warranty'),
]

for cleaned_df, target_path in silver_targets:
    cleaned_df.write.mode('overwrite').parquet(target_path)

### DONE!