# Enterprise Fleet Analytics Pipeline

This project focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](logistics_project.png)

Download the data from the Google Drive folder below and upload it into the catalog:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

## **1. Data Munging**

#### 1. Visually/manually open the file and capture a couple of data patterns (Manual Exploratory Data Analysis)

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW dfjson
USING json
OPTIONS (
  path "/Volumes/workspace/default/logistics_project_data/logistics_shipment_detail_3000.json",
  multiLine "true"
);

select * from dfjson limit 10

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW dfls1
USING csv
OPTIONS (
  path "/Volumes/workspace/default/logistics_project_data/logistics_source1",
  header "true",
  inferSchema "true"
);
select * from dfls1 limit 10;

In [0]:
CREATE OR REPLACE TEMP VIEW dfls2
USING CSV
OPTIONS (
  path "/Volumes/workspace/default/logistics_project_data/logistics_source2",
  header "true",
  inferSchema "true"
);

select * from dfls2 limit 10;

#### Observations (Manual Exploratory Data Analysis)
- The data is structured and comma-separated (CSV).
- It has a header row, no comment lines, and no footer.
- The total number of columns in a row is the number of separators + 1.

#### Data Quality
- Null columns and null rows are present.
- Duplicate rows and duplicate keys are present.
- Format issues are present: age and shipment_id are not always in number format (e.g. `ten`).
- No uniformity issues (such as `Artist` vs `artist`) were found manually.
- Some rows have more or fewer columns than expected. Sample rows:
  - `5000006,John,Mathews,ten,Supervisor,Additionalcolumn` (one extra column, non-numeric age)
  - `5000004,Suresh,,52,Loader` (empty last_name)
- The data type of each column still has to be identified.
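
The "separators + 1" rule above can be checked with a few lines of plain Python (used here instead of Spark SQL so the sketch runs anywhere). The expected width of 5 and the helper name `classify_row` are assumptions for illustration; the two sample rows are the ones listed above. The split is naive, so it assumes no quoted field contains a comma.

```python
EXPECTED_COLUMNS = 5  # assumption: shipment_id, first_name, last_name, age, role


def classify_row(line: str, expected: int = EXPECTED_COLUMNS) -> str:
    """Classify a raw CSV line by its column count (separators + 1)."""
    col_count = line.count(",") + 1
    if col_count < expected:
        return "fewer"
    if col_count > expected:
        return "more"
    return "ok"


rows = [
    "5000006,John,Mathews,ten,Supervisor,Additionalcolumn",  # one extra column
    "5000004,Suresh,,52,Loader",  # empty last_name, but still 5 columns
]
for row in rows:
    print(classify_row(row), "<-", row)
```

An empty field still counts as a column, which is why the second row is not flagged by the width check and needs a separate null check.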

#### 2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, datatypes, columns, etc.
3. Analyse the duplicate record count and the summary of the dataframe.
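
As a rough illustration of step 3, the sketch below separates the two kinds of duplicates noted earlier: fully identical rows and repeated keys. It is plain Python rather than Spark SQL, and the sample rows and function names are hypothetical, not taken from the source files.

```python
from collections import Counter

# Hypothetical sample rows: (shipment_id, first_name, last_name, age, role)
rows = [
    ("5000001", "Asha", "Rao", "34", "Driver"),
    ("5000001", "Asha", "Rao", "34", "Driver"),    # exact duplicate row
    ("5000002", "Ravi", None, "41", "Loader"),
    ("5000002", "Ravi", "Kumar", "41", "Loader"),  # same key, different row
]


def duplicate_row_count(rows):
    """Number of rows beyond the first occurrence of each identical row."""
    return len(rows) - len(set(rows))


def duplicate_keys(rows, key_index=0):
    """Key values (shipment_id by default) that occur on more than one row."""
    counts = Counter(row[key_index] for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


print(duplicate_row_count(rows))
print(duplicate_keys(rows))
```

Comparing whole tuples treats a missing value (`None`) as an ordinary value, so rows containing nulls are only counted as duplicates when they really repeat.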

In [0]:
-- Apply inferSchema and toDF to create a DF and analyse the actual data.
CREATE OR REPLACE TEMP VIEW dfls1
USING csv
OPTIONS (
  path "/Volumes/workspace/default/logistics_project_data/logistics_source1",
  header "true",
  inferSchema "true"
);

-- SQL has no toDF; the equivalent is SELECT with AS (alias) to rename columns.
-- Creating the view only defines the renamed columns; it does not display any rows.
CREATE OR REPLACE TEMP VIEW dfls1_FINAL AS
SELECT
  shipment_id AS id,
  first_name  AS firstname,
  last_name   AS lastname,
  age,
  role        AS profession
FROM dfls1;

-- Query the view separately to display the data.
SELECT * FROM dfls1_FINAL;

In [0]:
-- Apply inferSchema and toDF to create a DF and analyse the actual data.
CREATE OR REPLACE TEMP VIEW dfls2 
USING csv
OPTIONS (
  path "/Volumes/workspace/default/logistics_project_data/logistics_source2",
  header "true",
  inferSchema "true"
);
SELECT
  shipment_id AS id,
  first_name  AS firstname,
  last_name   AS lastname,
  age,
  role        AS profession
FROM dfls2 LIMIT 10;
-- SELECT * FROM dfls2 LIMIT 10;
DESCRIBE dfls2;

In [0]:
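-- Analyse the duplicate records count and summary of the dataframe.
-- Note: COUNT(DISTINCT c1, c2, ...) skips rows where any listed column is NULL,
-- so rows containing NULLs are also included in this difference.
CREATE OR REPLACE TEMP VIEW dfls1_duplicates AS
SELECT
  COUNT(*) - COUNT(DISTINCT shipment_id, first_name, last_name, age, role) AS cnt  -- 69 - 60
FROM dfls1;

SELECT *
FROM dfls1_duplicates;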

In [0]:
-- List the rows whose shipment_id occurs more than once (duplicate keys).
CREATE OR REPLACE TEMP VIEW dfls1_duplicates AS
SELECT *, COUNT(*) OVER (PARTITION BY shipment_id) AS cnt
FROM dfls1; -- raw source view

-- Show only duplicates
SELECT *
FROM dfls1_duplicates
WHERE cnt > 1;


In [0]:
SELECT
  COUNT(*) AS total_rows,
  COUNT(DISTINCT shipment_id) AS unique_shipments,
  COUNT(DISTINCT first_name) AS unique_firstnames,
  COUNT(DISTINCT last_name) AS unique_lastnames,
  COUNT(DISTINCT role) AS unique_professions,
  MIN(try_cast(age AS INT)) AS min_age,
  MAX(try_cast(age AS INT)) AS max_age,
  AVG(try_cast(age AS INT)) AS avg_age
FROM dfls1;

### a. Passive Data Munging (File: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both logistics_source1 and logistics_source2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:
3. fewer columns than expected
4. more columns than expected
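
The first three checks can be sketched in plain Python before writing them as queries. Everything below is illustrative: the sample records, the helper names, and the definition of "integer" as a non-empty string of ASCII digits are assumptions. Missing values are skipped rather than flagged, since the checks are about format rather than nulls.

```python
def is_integer_text(value) -> bool:
    """True when value is a non-empty string made of ASCII digits only."""
    return isinstance(value, str) and value.isascii() and value.isdigit()


def shared_ids(source1, source2):
    """Shipment IDs present in both sources (missing IDs ignored)."""
    ids1 = {r["shipment_id"] for r in source1 if r["shipment_id"] is not None}
    ids2 = {r["shipment_id"] for r in source2 if r["shipment_id"] is not None}
    return sorted(ids1 & ids2)


def invalid_records(rows):
    """Records whose shipment_id or age is present but not an integer."""
    return [
        r for r in rows
        if (r["shipment_id"] is not None and not is_integer_text(r["shipment_id"]))
        or (r["age"] is not None and not is_integer_text(r["age"]))
    ]


# Hypothetical sample records for the two sources
source1 = [
    {"shipment_id": "5000001", "age": "34"},
    {"shipment_id": "5000006", "age": "ten"},  # age is not an integer
    {"shipment_id": "ABC12", "age": "29"},     # shipment_id is non-numeric
]
source2 = [
    {"shipment_id": "5000001", "age": "35"},
    {"shipment_id": "6000001", "age": None},   # missing age is not flagged here
]

print(shared_ids(source1, source2))
print(invalid_records(source1))
```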

In [0]:
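-- Flag records where age or shipment_id is present but does not consist of digits only.
CREATE OR REPLACE TEMP VIEW df_nonnumberic AS
SELECT *
FROM dfls1
WHERE NOT (CAST(age AS STRING) RLIKE '^[0-9]+$')
   OR NOT (CAST(shipment_id AS STRING) RLIKE '^[0-9]+$');

SELECT * FROM df_nonnumberic;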

In [0]:
-- USING csv splits each line into columns and applies the header and schema.
-- USING text does no split and applies no header or schema: each line is one `value` column.
-- Analogy: USING text reads a sentence as a whole; USING csv breaks the sentence into words.
CREATE OR REPLACE TEMP VIEW dfls1_columns
USING text
OPTIONS (
  path "/Volumes/workspace/default/logistics_project_data/logistics_source1"
);

-- Count the columns on each raw line (separators + 1).
CREATE OR REPLACE TEMP VIEW df1_analyzed AS
SELECT
  value,
  size(split(value, ',')) AS col_count
FROM dfls1_columns;

-- Rows with fewer or more than the expected 5 columns.
SELECT *
FROM df1_analyzed
WHERE col_count <> 5;


In [0]:
-- USING csv splits each line into columns and applies the header and schema.
-- USING text does no split and applies no header or schema: each line is one `value` column.
-- Analogy: USING text reads a sentence as a whole; USING csv breaks the sentence into words.
CREATE OR REPLACE TEMP VIEW dfls2_columns
USING text
OPTIONS (
  path "/Volumes/workspace/default/logistics_project_data/logistics_source2"
);

-- Count the columns on each raw line (separators + 1).
CREATE OR REPLACE TEMP VIEW df2_analyzed AS
SELECT
  value,
  size(split(value, ',')) AS col_count
FROM dfls2_columns;

-- Rows with fewer or more than the expected 7 columns.
SELECT *
FROM df2_analyzed
WHERE col_count <> 7;

### **b. Active Data Munging** (File: logistics_source1 and logistics_source2)


##### 1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add data_source column with values as: system1, system2 in the respective dataframes
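
The alignment described in these steps can be pictured with a small plain-Python sketch: every record is projected onto the canonical column list, columns a source does not provide are filled with `None`, and the source tag is attached. The function names and sample records are hypothetical; only the column list and the `system1`/`system2` tags come from the steps above.

```python
CANONICAL_COLUMNS = [
    "shipment_id", "first_name", "last_name", "age",
    "role", "hub_location", "vehicle_type", "data_source",
]


def to_canonical(record: dict, source: str) -> dict:
    """Project a record onto the canonical schema, filling gaps with None."""
    aligned = {col: record.get(col) for col in CANONICAL_COLUMNS}
    aligned["data_source"] = source
    return aligned


def combine(system1_rows, system2_rows):
    """Append both sources without removing duplicates (like UNION ALL)."""
    return (
        [to_canonical(r, "system1") for r in system1_rows]
        + [to_canonical(r, "system2") for r in system2_rows]
    )


# Hypothetical sample records
system1_rows = [{"shipment_id": "5000001", "first_name": "Asha",
                 "last_name": "Rao", "age": "34", "role": "Driver"}]
system2_rows = [{"shipment_id": "6000001", "first_name": "Ravi",
                 "last_name": "Kumar", "age": "41", "role": "Loader",
                 "hub_location": "Chennai", "vehicle_type": "truck"}]

for row in combine(system1_rows, system2_rows):
    print(row)
```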

In [0]:
CREATE OR REPLACE TEMP VIEW df_canonical AS
SELECT
  shipment_id,
  first_name,
  last_name,
  age,
  role,
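  CAST(NULL AS STRING) AS hub_location,  -- not present in source 1; typed NULL keeps the column a string
  CAST(NULL AS STRING) AS vehicle_type,  -- not present in source 1
  'system1'            AS data_source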
FROM dfls1;

SELECT * FROM df_canonical;


In [0]:
CREATE OR REPLACE TEMP VIEW df_canonical2 AS
SELECT
  shipment_id,
  first_name,
  last_name,
  age,
  role,
  hub_location,
  vehicle_type,
  'system2'  AS data_source
FROM dfls2;

SELECT * FROM df_canonical2;


In [0]:
CREATE OR REPLACE TEMP VIEW dfunion AS
SELECT
  try_cast(shipment_id AS INT) AS shipment_id,
  first_name,
  last_name,
  try_cast(age AS INT) AS age,
  role,
  hub_location,
  vehicle_type,
  data_source
FROM df_canonical
UNION ALL
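SELECT
  try_cast(shipment_id AS INT) AS shipment_id,  -- cast as in the first branch so both sides share a type
  first_name,
  last_name,
  -- keep age only when it consists of digits; anything else becomes NULL
  try_cast(
    CASE WHEN CAST(age AS STRING) RLIKE '^[0-9]+$' THEN CAST(age AS STRING) END
    AS INT
  ) AS age,
  role,
  hub_location,
  vehicle_type,
  data_source
FROM df_canonical2;

SELECT * FROM dfunion;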

##### 2. Cleansing, Scrubbing
Cleansing (removal of unwanted records)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is NULL: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
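
To make the seven rules concrete, here is a minimal plain-Python sketch that applies them to a list of records. The function names, the sample records, and the exact-match (case-sensitive) handling of `truck` and `bike` are assumptions; the rule numbers in the comments refer to the list above.

```python
VEHICLE_MAP = {"truck": "LMV", "bike": "TwoWheeler"}  # rule 7, exact match assumed


def cleanse(rows):
    """Rules 1-3: drop records that fail the mandatory, name, or join-key checks."""
    kept = []
    for r in rows:
        if r.get("shipment_id") is None or r.get("role") is None:
            continue  # rules 1 and 3
        if r.get("first_name") is None and r.get("last_name") is None:
            continue  # rule 2
        kept.append(r)
    return kept


def scrub(rows):
    """Rules 4-7: default and normalize age and vehicle_type."""
    tidy = []
    for r in rows:
        out = dict(r)
        age = out.get("age")
        # rules 4 and 6: NULL, "" and "ten" all become -1
        out["age"] = -1 if age in (None, "", "ten") else age
        vehicle = out.get("vehicle_type")
        # rules 5 and 7: default NULL first, then map inconsistent labels
        out["vehicle_type"] = "UNKNOWN" if vehicle is None else VEHICLE_MAP.get(vehicle, vehicle)
        tidy.append(out)
    return tidy


# Hypothetical sample records
rows = [
    {"shipment_id": 1, "first_name": "A", "last_name": None, "age": "ten", "role": "Driver", "vehicle_type": "truck"},
    {"shipment_id": None, "first_name": "B", "last_name": "C", "age": "30", "role": "Loader", "vehicle_type": "bike"},
    {"shipment_id": 3, "first_name": None, "last_name": None, "age": "40", "role": "Loader", "vehicle_type": None},
    {"shipment_id": 4, "first_name": "D", "last_name": "E", "age": None, "role": "Supervisor", "vehicle_type": None},
]

result = scrub(cleanse(rows))
for row in result:
    print(row)
```

Cleansing runs before scrubbing so that the defaults in rules 4 and 5 are only applied to records that survive the drop rules.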

In [0]:
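-- Redefine dfunion with only the columns used by the cleansing and scrubbing rules;
-- hub_location and data_source are not carried over here.
CREATE OR REPLACE TEMP VIEW dfunion AS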

SELECT
  shipment_id,
  try_cast(age AS INT) AS age,
  first_name,
  last_name,
  role,
  vehicle_type
FROM df_canonical

UNION ALL

SELECT
  shipment_id,
  try_cast(age AS INT) AS age,
  first_name,
  last_name,
  role,
  vehicle_type
FROM df_canonical2;


In [0]:
-- Rules 1 and 3: shipment_id (the join key) and role must not be NULL.
CREATE OR REPLACE TEMP VIEW df_cleansing AS
SELECT *
FROM dfunion
WHERE shipment_id IS NOT NULL
  AND role IS NOT NULL;
