### Data Engineering Capstone Project

#### Project Summary
   This project is to analyse the time spend in house holds so the government organizations can better understand the popular activities among the people of different states and utilize the people accordingly

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
This project specifically used to analyse the time spent by each person in the houses of 6 states and identify which activities performed by them. With this analysis the government organization can get various KPI's such that what the popular activity in each states,How much time spent on each activity, what factors effecting the activities between states. For ex: Lets take an activity as "Fetching Of Water". In the state of Tamilnadu the time taken is 5 min where as in the state of Haryana it's 1 hour. So with this data the Government Organisations can asses why it is taking time in Haryand and take the necessary action to reduce the time. \

The end solution for this project is to get the key activities that is happening in each house holds and how it is effecting the livelihood of life among various states.With this the Government organisations can take necessary steps to give better life to the people by reducing the complexity of certain activities like "Fetching of Water" by planning some canals/wells in the nearby areas.

Tools used are as follows :

        1. Amazon s3 for storage
        2. Amazon EMR with Spark for analysis
        3. The data will be read from s3 storage from CSV files converted into Parquest file and again saved to S3 for further analysis
        4. PowerBI for Dashboard with various metrics

#### Describe and Gather Data 
As the part of this project the data which will be explored is from https://www.kaggle.com/arjunprasadsarkhel/indian-time-use-survey?select=TimeUse-India.csv.

This data has information regarding various activities performed in the Indian households on different states/districts.There are 3 parts of this data as following:

        1. Acitivity Codes - which describes the Acitivity
        2. Different Indian states.
        3. The collection of data regarding who are the households and what are the activities being performed and also informations like
            which state they belong,duration etc. 
            
The TimeUse-India has **3.3M** rows that will be analysed via Spark


In [5]:
from pyspark.sql.types import *

'''Read the activity CSV from the s3 storage and save it in a Dataframe'''
activity_code_schema = StructType([
    StructField("activity_code",IntegerType()),
    StructField("activity",StringType())    
]) 

activity_df= spark.read.csv("s3a://indiantimedataset/DataSet/AcitivityCodes.csv",schema=activity_code_schema,header=True)
activity_df.printSchema()
activity_df.show(5)

In [None]:

+-------------+--------------------+
|activity_code|            activity|
+-------------+--------------------+
|          111| Ploughing, prepa...|
|          112| Sewing. planting...|
|          113| Application of m...|
|          114|            Weeding |
|          115| Supervision of w...|
+-------------+--------------------+

In [None]:
'''Read the state code CSV and save it in the Dataframe'''

state_code_schema = StructType([
    StructField('state',StringType()),
    StructField('state_code',IntegerType())
])
state_df= spark.read.csv("s3a://indiantimedataset/DataSet/StateCodes.csv",header=True,schema=state_code_schema)
state_df.printSchema()
state_df.show(5)

In [None]:
+----------------+----------+
|           state|state_code|
+----------------+----------+
|        Haryana |         1|
| Madhya Pradesh |         2|
|        Gujarat |         3|
|          Orissa|         4|
|      Tamil Nadu|         5|
+----------------+----------+

In [None]:
from pyspark.sql import functions as F

'''Read the Time Used CSV CSV and save it in the Dataframe'''

activities_schema = StructType([
    StructField('_c0',StringType()),
     StructField('Key_membno',StringType()),
     StructField('Key_hhold',StringType()),
     StructField('Rec_id',StringType()),
     StructField('Schedule_ID',StringType()),
     StructField('Schedule_No',IntegerType()),
     StructField('sector',IntegerType()),
     StructField('subround',IntegerType()),
     StructField('subsample',IntegerType()),
     StructField('State',IntegerType()),
     StructField('District',IntegerType()),
     StructField('district_class',IntegerType()),
     StructField('Tehsil_Town',IntegerType()),
     StructField('stratum',IntegerType()),
     StructField('Vill_Blk',IntegerType()),
     StructField('sub_blk',IntegerType()),
     StructField('stage2_stratum',IntegerType()),
     StructField('Hholdno',IntegerType()),
     StructField('membno',IntegerType()),
    StructField('age',IntegerType()),
    StructField("type_of_day",IntegerType()),
    StructField("hour_serial_no",IntegerType()),
    StructField("activity_serial_no",IntegerType()),
    StructField("hour_from",IntegerType()),
    StructField("hour_to",IntegerType()),
     StructField("multiple_activity",IntegerType()),
     StructField("actual_time_spent_minutes",IntegerType()),
     StructField("within_outside_household",IntegerType()),
    StructField("activity",IntegerType()),   
    
])

activities_df = spark.read.csv("s3a://indiantimedataset/DataSet/time_use_partial.csv",header=True,schema=activities_schema)
activities_df = activities_df.select('Key_membno','Key_hhold','State','District','district_class','age','type_of_day','hour_from','hour_to',
                                  'multiple_activity','actual_time_spent_minutes','within_outside_household','activity')

activities_df.printSchema()
activities_df.show(5)


In [11]:
#write to parquet
activities_df.write.partitionBy('state','age','activity').parquet("s3a://indiantimedataset/output/time_use_analysis_part.parquet")

### Step 2: Explore and Assess the Data

#### Cleaning Steps
Since the analysis is heavily dependent on the key data such as Age,Activity and State these should checked to have proper values as following.

    1. Age should not be equal to 0 and between 0 and 100
    2. State cannot be 0
    3. activity cannot be 0

In [None]:
#data cleaning
activities_df = activities_df.filter((activities_df['age'] != 0 & (activities_df['age']>0 &activities_df['age']<100)) & (activities_df['state'] != 0) | (activities_df['state'] != None) 
                                     & (activities_df['activity'] != 0))

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Attached document DataModel.pdf


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
#Data analysis

'''Create a table view for all the three DataFrames to perform query on them and to do the required analyzes. This will be helpful to 
 extract the required data ouf it and store it a Fact Table'''

time_use_df= spark.read.parquet("s3a://indiantimedataset/output/time_use_analysis.parquet");

time_use_df.createOrReplaceTempView("timeuse")
state_df.createOrReplaceTempView("state")
activity_df.createOrReplaceTempView("activitycode")

#get actual state name and activity name along with code
time_use_data = spark.sql("""select *,s.State state_name,a.Activity as activity_name from timeuse t inner join activitycode a on t.activity = a.activity_code \
                             inner join state s on s.state_code = t.State""")


time_use_data.spark.parquet("s3a://indiantimedataset/output/time_use_analysis_data.parquet")
time_use_data.show(5)
time_use_data.createOrReplaceTempView("time_use_data")

In [None]:
+----------+-----------+--------+--------------+-----------+---------+-------+-----------------+-------------------------+------------------------+-----+---+--------+-------------+--------------------+----------+----------+----------+--------------------+
|Key_membno|  Key_hhold|District|district_class|type_of_day|hour_from|hour_to|multiple_activity|actual_time_spent_minutes|within_outside_household|state|age|activity|activity_code|            activity|     state|state_code|state_name|       activity_name|
+----------+-----------+--------+--------------+-----------+---------+-------+-----------------+-------------------------+------------------------+-----+---+--------+-------------+--------------------+----------+----------+----------+--------------------+
| 1.422E+12|14220001102|       2|             1|          1|        4|      5|                2|                       60|                       1|    6| 40|     911|          911|  Night sleep/ess...| Meghalaya|         6| Meghalaya|  Night sleep/ess...|
| 1.422E+12|14220001102|       2|             1|          1|        5|      6|                2|                       60|                       1|    6| 40|     911|          911|  Night sleep/ess...| Meghalaya|         6| Meghalaya|  Night sleep/ess...|
| 1.422E+12|14220001102|       2|             1|          2|        4|      5|                2|                       60|                       1|    6| 40|     911|          911|  Night sleep/ess...| Meghalaya|         6| Meghalaya|  Night sleep/ess...|
| 1.422E+12|14220001102|       2|             1|          2|        5|      6|                2|                       30|                       1|    6| 40|     911|          911|  Night sleep/ess...| Meghalaya|         6| Meghalaya|  Night sleep/ess...|
| 1.422E+12|14220001102|       2|             1|          1|        3|      4|                2|                       60|                       1|    6| 40|     911|          911|  Night sleep/ess...| Meghalaya|         6| Meghalaya|  Night sleep/ess...|
+----------+-----------+--------+--------------+-----------+---------+-------+-----------------+-------------------------+------------------------+-----+---+--------+-------------+--------------------+----------+----------+----------+--------------------+

#### 4.2 Data Quality Checks
Since the data will be saved into the S3 storage in Parquet format with analysed data the following Quality checks are applicable:

 1. The data should not be empty
 2. All the state column should have state name value
 3. All the Activities will have proper activity name
 


In [None]:
time_use_analysis_df =  spark.read.parquet("s3a://indiantimedataset/output/time_use_analysis_data.parquet");

if time_use_analysis_df.count() > 0:
    raise ValueError("Data is empty")

time_use_analysis_check_state = time_use_analysis_df.filter(time_use_analysis_df['state'] == 0)
    
if time_use_analysis_check_state.count() > 0:
    raise ValueError("State value is missing for some records")
    
time_use_analysis_check_activity = time_use_analysis_df.filter(time_use_analysis_df['activity'] == 0)

if time_use_analysis_check_activity.count() > 0:
    raise ValueError("Activity is missing for some rows")

In [None]:
Sample output when data is empty :
    
An error was encountered:
Data is empty
Traceback (most recent call last):
ValueError: Data is empty

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [None]:
The technologies used are as follows :
    
    1. AWS S3 for storage
    2. EMR with Spark for analysis
    3. PowerBI for dashboard
    
AWS S3 is the popular and cheap storage compared to Datawarehouse like Redshift, where it can contain any amount of data and need not to be worried about capacity. 
So for this analysis this should enough to do the analysis. Also Spark has inbuilt functions to read data from S3. Spark uses Parallelization and RDD to store the data
and it is popular tool to analyze the data in 100x speed thatn Hadoop. EMR is the offering from AWS where it gives VM's with preconfigured Spark, so that we can use it
to run analysis. PowerBI is a popular tool to create dash with various kind of charts for the data that will the Visual representation of the kPIs.

The Data should be updated once in every 3 month so that the Government Organisation can ascertain the improvement activities that are happening in the required
state/districts. 

If the data is increased by 100X :
    
Spark will still be used for Data Manipulation process. Since spark automatically partition the data on the cores available and it is 
10x faster than Hadoop also doing in-memomry processing the data will still be manipulated using spark.

The Apache Airflow can be used to schedule the tasks at 7 am daily run and the data can be processed by splitting across year and month.

 dag = DAG(
        dag_id="generated_dag_id",
        start_date=(2021,07,15,07,00,00)
        schedule_interval="@daily"
    )

In order to support 100+ concurrent users rather than storage,Datawarehouse like Redshift would be better option to store since it can support data upto petabytes and 
having auto scalable options. The replication can also been turned on to support the users across regions and to avoid any data loss.

    

In [None]:
Sample Queries and Results :

In [None]:
#get unique activities on each state
unique_activity_per_state = spark.sql("""select state_name,activity_name from time_use_data group by state_name,activity_name""")
unique_activity_per_state.show(5)

In [None]:
+----------+--------------------+
|state_name|       activity_name|
+----------+--------------------+
| Meghalaya| Ploughing, prepa...|
| Meghalaya|  Assisting depen...|
| Meghalaya|  Attendance at p...|
| Meghalaya| Collection of ra...|
| Meghalaya|  Night sleep/ess...|
+----------+--------------------+

In [None]:
#get the popular activity
popular_activity =spark.sql("""select activity_name,count(activity_name) activity_count from time_use_data group by activity_name order by activity_count desc limit 1""")
popular_activity.show(1)

In [None]:
+--------------------+--------------+
|       activity_name|activity_count|
+--------------------+--------------+
|  Night sleep/ess...|           425|
+--------------------+--------------+