# Challenge(1):
----------------------------------
## Subscriber data analysis using PySpark

* The goal of this task is to analyze subscriber data from a telecommunication source system named DPI.

## Input:
-------------------------------------
* A highly compressed tar + gzip archive with the extension .tgz. The data schema is shown below:

<img src="img/ch1_1.jpg">
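
The notebook reads an already extracted `.udr` file, so the `.tgz` archive has to be unpacked first. A minimal sketch of that step using only the standard library; `extract_tgz` and its arguments are illustrative names, not part of the original notebook:

```python
import tarfile
from pathlib import Path


def extract_tgz(archive_path, dest_dir):
    """Extract a .tgz (tar + gzip) archive and return the extracted file paths."""
    root = Path(dest_dir)
    root.mkdir(parents=True, exist_ok=True)
    root = root.resolve()
    with tarfile.open(archive_path, mode="r:gz") as archive:
        # Keep regular files only, and skip members that would land outside dest_dir.
        members = [
            m for m in archive.getmembers()
            if m.isfile() and root in (root / m.name).resolve().parents
        ]
        archive.extractall(path=root, members=members)
    return sorted(root / m.name for m in members)
```

The returned paths can then be passed to `spark.read` as the input file path.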

## Output:
------------------------------------
1. New **CSV** file with the following columns:
    * Account_Number : maps to the subscriber id in the input file
    * MostUsedServiceName_1 : the 1st most used service by the subscriber
    * MostUsedServiceName_2 : the 2nd most used service by the subscriber
    * Event_Type : hardcoded to 'Mobile'
    * IMEI : maps to IMEI in the input file
    * MAC_Address : hardcoded to N/A
    * Access_Point : hardcoded to N/A
    * Total_Bytes : Sum(TotalBytes) for the subscriberID
    * Total_Count : count of transactions
    * Total_Time : max(EndTime) - min(StartTime) for the subscriberID
    * Insertion_Date : Current_Date (yyyy-mm-dd)

2. **dpiDuplicate** file, which contains the duplicate transactions in the input file

3. **dpiRejection** file, which contains the rows with a bad subscriber_id **(I assumed a valid subscriber_id is a number)**

## Task Description:
--------------------------------------


* Load the required Spark APIs and utilities

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
import pyspark.sql.functions as func
from collections import Counter

* Define utilities and helper functions
    * see(s): function to print a value
    * see(s, v): function to print a key and value
    * is_int(num): check whether a value can be parsed as an integer
    * extract_result(services): return the 1st and 2nd most used services from a list already sorted by usage

In [2]:
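def see(s, v=None):
    """
    Function to print a label to the console, followed by a value if one is given
    """
    # A single definition with an optional value: a second `def see` would
    # silently replace the first one, so see(s) alone would stop working.
    print("---- %s -----" % s)
    if v is not None:
        print(v)
    
def is_int(num):
    """
    Function to check if a value is an integer or not 
    """
    if num is None:
        return False
    try:
        int(num)
        return True
    except (ValueError, TypeError):
        # TypeError covers values int() cannot accept at all (e.g. a list)
        return False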
    

def extract_result(services):
    """
    Function to extract the 1st and  the 2nd most used services
    """
    services = list(services)
    if len(services) == 0:
        return ['','']
    elif len(services) == 1:
        return [services[0],'']
    else:
        return [services[0],services[1]]
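
`Counter` is imported at the top but not used by the helpers above. As a minimal sketch, this is how it could return the i-th most common item of a list; `get_most_common` here is an illustrative helper and is not called anywhere in the pipeline below:

```python
from collections import Counter


def get_most_common(items, ith):
    """Return the ith most common item (1-based), or '' if there are fewer distinct items."""
    ranked = Counter(items).most_common()  # (item, count) pairs, highest count first
    if ith < 1 or ith > len(ranked):
        return ''
    return ranked[ith - 1][0]


services = ["http", "dns", "http", "video", "http", "dns"]
first = get_most_common(services, 1)   # "http" (3 occurrences)
second = get_most_common(services, 2)  # "dns" (2 occurrences)
```

This only works when all services of one subscriber fit in memory as a list, which is the assumption discussed in the Notes section.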

* Register the helper function as a user-defined function (UDF) in Spark

In [3]:
is_int_col = func.udf(is_int, BooleanType())

* Define the path of the input file
* Define the path of the output directory

In [4]:
inputFilePath = "Data/MobileProtocol.20170327T221500.27784.udr"
ouputDir = ""

* Start the Spark session

In [5]:
spark = SparkSession.builder.appName("challenge1").master("local").getOrCreate()

* Read the input file, which is in comma-separated values (CSV) format

In [6]:
mobile_df = spark.read.format("csv").option("header", "true").load(inputFilePath)

* In this step I exclude the bad rows, i.e. those with an invalid subscriber_id (I assumed a valid subscriber_id is a unique number for the subscriber), and store them in the **mobile_df_excluded** dataframe

In [7]:
mobile_df_excluded = mobile_df.filter(~is_int_col(mobile_df['SubscriberId']))

* Then I filter the dataframe so that it contains only the rows with a valid subscriber_id
* Then I remove the duplicate rows so that each transaction is counted once

In [8]:
subscriber_mobile_df = mobile_df.filter(is_int_col(mobile_df['SubscriberId']))
unique_subscriber_mobile_df = subscriber_mobile_df.dropDuplicates()
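
To see which values end up in the rejection file, the `is_int` check can be tried on a few hand-made ids without Spark. The sample values below are made up for illustration, and `is_int` is repeated so the snippet runs on its own:

```python
def is_int(value):
    """Same rule as the notebook helper: True only if int(value) succeeds."""
    if value is None:
        return False
    try:
        int(value)
        return True
    except (ValueError, TypeError):
        return False


sample_ids = ["1001", " 42 ", "007", "12a", "1.5", "", None]

# Mirrors the two filters: valid rows continue, the rest go to dpiRejection.
valid = [s for s in sample_ids if is_int(s)]
rejected = [s for s in sample_ids if not is_int(s)]
```

Note that `int()` accepts padded values such as `" 42 "` and `"007"`, so they count as valid under this rule; a stricter definition of a subscriber id would need an explicit pattern check.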

* This step asks for the duplicate rows to be written to a separate file. To do that I tried to show the power of the dataframe API, which makes it easy to convert a dataframe into an RDD and vice versa.

* The idea is to form a paired RDD where the key is the entire row and the value is 1, reduce this RDD by key to count how many times each row occurs, keep the rows whose count is greater than 1, and save them into an RDD to be written later.

In [9]:
duplicated_subscriber_mobile_rdd = subscriber_mobile_df.rdd.map(lambda line: (line,1)).reduceByKey(lambda v1,v2: v1+v2)
duplicated_subscriber_mobile_rdd = duplicated_subscriber_mobile_rdd.filter(lambda line: line[1] != 1)
duplicated_subscriber_mobile_rdd = duplicated_subscriber_mobile_rdd.map(lambda line: line[0])
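
The same counting logic can be checked on a small in-memory sample without a cluster. This is only a sketch of the idea in plain Python; the rows are made-up tuples standing in for Spark `Row` objects:

```python
from collections import Counter

rows = [
    ("1001", "http", 120),
    ("1001", "http", 120),   # exact duplicate of the first row
    ("1001", "dns", 30),
    ("1002", "video", 900),
    ("1002", "video", 900),  # duplicate
    ("1002", "video", 900),  # duplicate again
]

# Equivalent of map(row -> (row, 1)) followed by reduceByKey(add).
row_counts = Counter(rows)

# Equivalent of filter(count != 1) followed by map(pair -> row).
duplicated_rows = [row for row, count in row_counts.items() if count > 1]
```

Each duplicated row appears once in the result, no matter how many times it was repeated in the input, which matches what the RDD version writes to the duplicates file.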

* In this step I found the dataframe API easier to use and better performing for summarizing the subscriber data and aggregating the following columns:
    * Total_Bytes: The total bytes used by the subscriber
    * Total_Count: The total count of transactions for the subscriber
    * Insertion_Date: The current date
    * IMEI: The first IMEI found for the subscriber
    * Total_Time_In_Mins: The total time in minutes for the subscriber's transactions

In [10]:
mobile_subscriber_summary_df = unique_subscriber_mobile_df.groupBy("SubscriberId").agg(
    func.sum("TotalBytes").alias("Total_Bytes"),
    func.count("TotalBytes").alias("Total_Count"),
    func.current_date().alias("Insertion_Date"),
    func.first('IMEI').alias('IMEI'),
    # Span from the earliest StartTime to the latest EndTime, converted from seconds to minutes
    ((func.unix_timestamp(func.max("EndTime")) - func.unix_timestamp(func.min("StartTime"))) / 60).alias('Total_Time_In_Mins'))
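
A worked example of the Total_Time_In_Mins arithmetic in plain Python. It assumes the timestamps use the `yyyy-MM-dd HH:mm:ss` layout, which is the default pattern of `unix_timestamp`; the actual format of StartTime and EndTime in the input file is not shown here, and the sample values are made up:

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed layout, see the note above


def total_time_in_mins(transactions):
    """Minutes between the earliest start and the latest end of (start, end) pairs."""
    starts = [datetime.strptime(start, TS_FORMAT) for start, _ in transactions]
    ends = [datetime.strptime(end, TS_FORMAT) for _, end in transactions]
    return (max(ends) - min(starts)).total_seconds() / 60


transactions = [
    ("2017-03-27 22:00:00", "2017-03-27 22:05:00"),
    ("2017-03-27 22:10:00", "2017-03-27 22:15:30"),
]
minutes = total_time_in_mins(transactions)  # 22:00:00 to 22:15:30
```

The value is the overall span, so the idle gap between 22:05:00 and 22:10:00 is included; it is not the sum of the individual transaction durations.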

* Now I construct the dataframe of each subscriber's most frequently used services; for more details please read the Notes section

In [11]:
subscriber_services = unique_subscriber_mobile_df.rdd.map(lambda line: ((line[7], line[19]), 1)).reduceByKey(lambda v1,v2: v1+v2)
subscriber_services = subscriber_services.map(lambda line:(line[1],line[0])).sortByKey(False)
subscriber_services = subscriber_services.map(lambda line: line[1])
subscriber_services = subscriber_services.groupByKey().map(lambda line: (line[0], extract_result(line[1])[0],extract_result(line[1])[1]))
subscriber_services = subscriber_services.toDF(["SubscriberId", "MostUsedServiceName_1", "MostUsedServiceName_2"])

* Then I join the subscriber services dataframe with the summary dataframe

In [12]:
mobile_subscriber_summary_df = mobile_subscriber_summary_df.join(subscriber_services,['SubscriberId'])

* In this step I add the hardcoded columns **("Event_Type", "MAC_Address", "Access_Point")**

In [None]:
mobile_subscriber_summary_df = mobile_subscriber_summary_df.withColumn("Event_Type", func.lit("Mobile"))
mobile_subscriber_summary_df = mobile_subscriber_summary_df.withColumn("MAC_Address", func.lit("N/A"))
mobile_subscriber_summary_df = mobile_subscriber_summary_df.withColumn("Access_Point", func.lit("N/A"))

* Write the subscriber summary data to the output file 

In [None]:
mobile_subscriber_summary_df.write.csv(ouputDir+"CH1_Job1")

* Write the excluded rows with an invalid subscriber id to the output file

In [None]:
mobile_df_excluded.rdd.saveAsTextFile(ouputDir+"dpiRejection1")

* Write the duplicate subscriber transaction rows to the output file

In [None]:
duplicated_subscriber_mobile_rdd.saveAsTextFile(ouputDir+"dpiDuplicate1")

## Notes & Enhancements
--------------------------------------------
* As mentioned, I assumed that the services of a subscriber_id are not too many to be collected as a list, but another way that came to my mind is to:
    * Make a new paired RDD where the key is (subscriber_id, service_name) and the value is 1, reduce this RDD by key to count how often each key occurs, then sort the RDD in descending order of count.
    * After grouping the resulting RDD by key we get, for each subscriber, a list of unique service names sorted in descending order.
    * Map the grouped RDD and select the first and second elements if available; if not, return empty strings instead.
    * Then join the resulting paired RDD **subscriber_services** with **mobile_subscriber_summary_df** on the subscriber_id.


In [None]:
subscriber_services = unique_subscriber_mobile_df.rdd.map(lambda line: ((line[7], line[19]), 1)).reduceByKey(lambda v1,v2: v1+v2)
subscriber_services = subscriber_services.map(lambda line:(line[1],line[0])).sortByKey(False)
subscriber_services = subscriber_services.map(lambda line: line[1])
subscriber_services = subscriber_services.groupByKey().map(lambda line: (line[0], extract_result(line[1])[0],extract_result(line[1])[1]))
subscriber_services = subscriber_services.toDF(["SubscriberId", "MostUsedServiceName_1", "MostUsedServiceName_2"])
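
One caveat with the steps above: they depend on the order produced by `sortByKey` still holding inside each group after `groupByKey`, and as far as I know Spark does not guarantee the order of values within a group. A sketch of the same idea in plain Python that ranks inside each group instead; `top_two_services` and the sample data are illustrative only:

```python
from collections import Counter, defaultdict


def top_two_services(transactions):
    """Map each subscriber id to its two most used services ('' when missing)."""
    per_subscriber = defaultdict(Counter)
    for subscriber_id, service_name in transactions:
        per_subscriber[subscriber_id][service_name] += 1

    result = {}
    for subscriber_id, counts in per_subscriber.items():
        # Rank inside the group, so no ordering has to survive a shuffle.
        ranked = [name for name, _ in counts.most_common(2)]
        ranked += [''] * (2 - len(ranked))
        result[subscriber_id] = tuple(ranked)
    return result


transactions = [
    ("1001", "http"), ("1001", "dns"), ("1001", "http"),
    ("1001", "video"), ("1001", "dns"), ("1001", "http"),
    ("1002", "video"),
]
top = top_two_services(transactions)
```

In Spark the same "rank within the group" idea could be applied by sorting each grouped list by count before calling `extract_result`, at the cost of carrying the counts along with the service names.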

* In this task I used PySpark dataframes because of their better performance; I also tried switching between dataframe and RDD and back.

* One final item to mention: there are still a lot of items to optimize :( such as the first point above and making fuller use of dataframe performance. I will work on them separately; there is still room for enhancement.