# Data Wrangling with Spark
Welcome to assignment-2-part-2 for the BDA course, where you will use Spark together with pandas to 
perform common data processing tasks that are often part of a larger data science project. Please use the ```simulated_cdrs``` dataset.

By the end of the assignment, you will have:
- learned how to set up a data science project
- seen how your design choices affect efficiency when dealing with large datasets
- seen that how you sample data depends a lot on the nature of the data
- manipulated Spark dataframes and used Spark user-defined functions
- switched between Spark dataframes (which are distributed) and pandas dataframes (which aren't) as needed


For this assignment, please use the [pyspark API documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html) to get details about specific functions when you use them.

## Python setup

In [65]:
# import other packages you need for this assignment
from datetime import datetime
import numpy as np
from pathlib import Path
import seaborn as sns

## 1-Sampling data
When working with large datasets, you often need to sample the data so that you can 
build and test your awesome algorithms on a smaller dataset without having to wait 10 minutes or hours, depending on what you are working with. However, sampling is not as straightforward as you might think: when the data is large and the sampling process is complicated, you can even get stuck in the sampling stage. In this exercise, you will explore three strategies that achieve the same goal of sampling data, and you will compare their efficiency.

### 1-First sampling approach: isin()
Given a list of users (their ```user_id```) that we have sampled from somewhere (in this case a pandas dataframe),
we can apply the function ```isin()``` to the Spark dataframe (i.e. the large input dataframe)
to keep only the users in the list.
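
A minimal sketch of this approach, assuming the column is named ```user_id``` as in the text. The helper name `filter_with_isin` is hypothetical and not part of the assignment skeleton.

```python
def filter_with_isin(sdf, user_ids):
    """Keep only the rows of a Spark DataFrame whose user_id is in user_ids.

    sdf      -- large Spark DataFrame with a `user_id` column (assumed name)
    user_ids -- plain Python list of sampled user IDs, for example taken
                from a pandas column with .tolist()
    """
    # Column.isin() builds a boolean expression and filter() keeps the rows
    # for which that expression is true.
    return sdf.filter(sdf["user_id"].isin(user_ids))
```

Because the list values become part of the filter expression itself, this form tends to suit short lists of IDs; how it scales to long lists is something the timing comparison in this exercise is meant to show.
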
### 2-Second sampling approach: join
Once we have a list of user IDs whose data we need, we can convert it into a Spark dataframe 
and then join it with the large input dataframe. This will give us the required data, assuming we use the correct join type. However, joins can be very slow if you have a lot of data because a join is a shuffle operation.
### 3-Third sampling approach: broadcast join
When joining, if one of the datasets is small enough to fit into the memory of each worker, we can turn a ShuffleHashJoin or SortMergeJoin into a BroadcastHashJoin. In a broadcast join, the smaller DataFrame is broadcast to all worker nodes. What is broadcasting?
> Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster in-order to access or use by the tasks. Instead of sending this data along with every task, PySpark distributes broadcast variables to the workers using efficient broadcast algorithms to reduce communication costs.

When joining, we can use the ```BROADCAST``` hint, which tells Spark to broadcast the smaller DataFrame when joining it with the bigger one. See how to use hints [here](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.hint.html).
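
The two join-based approaches differ only in whether the hint is applied. The following is a hedged sketch: the helper name `join_sample` and the `use_broadcast` flag are hypothetical, and the join column is assumed to be `user_id`.

```python
def join_sample(sdf, users_sdf, use_broadcast=False):
    """Keep the rows of sdf whose user_id appears in users_sdf.

    sdf       -- large Spark DataFrame with a `user_id` column (assumed name)
    users_sdf -- small Spark DataFrame holding the sampled `user_id` values,
                 e.g. created with spark.createDataFrame(<pandas dataframe>)
    """
    if use_broadcast:
        # DataFrame.hint("broadcast") asks Spark to ship the small DataFrame
        # to every worker instead of shuffling both sides of the join.
        users_sdf = users_sdf.hint("broadcast")
    # An inner join drops every row of sdf without a matching user_id.
    return sdf.join(users_sdf, on="user_id", how="inner")
```

A hint is a request rather than a guarantee, so calling `explain()` on the result is one way to check which join strategy Spark actually chose.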

In [64]:
def sample_users(sdf, min_events, num_users, method, output_csv):
    """
    Sample users with a minimum number of events
    Arguments:
    sdf -- spark dataframe to work on
    min_events -- minimum number of events
    num_users -- number of users to sample
    method -- sampling method: "isin", "join-broadcast", or a regular join otherwise
    output_csv -- full path to save data for sampled users
    """
    # =====================================
    #       MARKING COMMENT 
    # A total of 13 points in this function
    # =====================================
    
    # Use groupBy on appropriate column to get number of events
    # per user (~ 1 line) and convert the resulting dataframe to 
    # a pandas dataframe
    pdf = None
    
    # Keep users with min_events as required
    # Also, keep only num_users (~2 lines)
    pdf = None
    pdf2 = None
    
    # ===================================
    # RETRIEVE DATA FOR THE SAMPLED USERS
    # ===================================
    # using the isin() method as follows:
    # 1. Get a list of sampled users from pdf2 above
    # 2. Use isin() method to sample data
    # 3. Save data to CSV file.
    # replace pass with your code (~3 lines)
    if method == "isin":
        pass
    elif method == "join-broadcast":
        # In order to use join to select the data, do the following:
        # 1. convert the pandas dataframe to spark
        # 2. join the two dataframes making sure you select the correct join type
        # this is broadcast join, so use hint as described in docs
        # 3. Save the data
        # replace pass with your code (~3-4 lines)
        # If you have issues with conversion from pandas to spark, use a schema
        pass
    else:
        # this is regular join without broadcast
        pass

In [None]:
def compare_running_times(input_csv_file, output_dir):
    """
    Run the sample_users function above using three
    different approaches and report the one with the fastest
    run time
    Arguments:
    input_csv_file -- full path to the input CSV file
    output_dir --  directory (as a Path object) to save the sampled files
    """
    
    # create sparksession object and load the dataframe
    spark = None
    df = None
    
    methods = ['isin', "join-broadcast", 'join']
    
    # dictionary to hold time taken results
    time_taken = None
    
    # Loop through the methods and do the following:
    # 1. Create 
    for m in methods:
        # Use joinpath() method on output_dir to create outcsv for this method
        # the CSV file should have name: sample_method.csv
        output_csv = None
        # use datetime.now() method to record the start time
        start = None
        # Call the sample_users() function (~1 line)
        
        # Record finish time 
        end = None
        
        # Calculate duration in minutes
        # Use total_seconds() function on the time difference
        duration = None
        
        # Add the duration to the results dictionary
        # with method as key (~1 line)
        
    
    # Get the best and worst running times
    # Please use the dictionary to get the maximum and minimum
    # values as required
    best = None
    worst = None
    
    print('Best approach is {} which took {} minutes'.format(best, int(time_taken[best])))
    print('Worst approach is {} which took {} minutes'.format(worst, int(time_taken[worst])))
    
    spark.stop()
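
The timing pattern used in `compare_running_times()` can be sketched without Spark. This is only an illustration under stated assumptions: the helper names `time_methods` and `best_and_worst` and the `run` callback are hypothetical, and the durations in the example are made up rather than measured.

```python
from datetime import datetime
from pathlib import Path


def time_methods(methods, run, output_dir):
    """Time run(method, output_csv) for each method; return minutes per method."""
    time_taken = {}
    for m in methods:
        # Builds a path such as <output_dir>/sample_isin.csv
        output_csv = output_dir.joinpath("sample_{}.csv".format(m))
        start = datetime.now()
        run(m, output_csv)
        end = datetime.now()
        # total_seconds() returns a float; dividing by 60 gives minutes
        time_taken[m] = (end - start).total_seconds() / 60
    return time_taken


def best_and_worst(time_taken):
    """Return (fastest, slowest) method names from a {method: minutes} dict."""
    best = min(time_taken, key=time_taken.get)
    worst = max(time_taken, key=time_taken.get)
    return best, worst


# Made-up durations in minutes, for illustration only
print(best_and_worst({"isin": 2.0, "join-broadcast": 1.2, "join": 5.5}))
# ('join-broadcast', 'join')
```

Passing `key=time_taken.get` makes `min()` and `max()` compare the stored durations while returning the dictionary keys, which is what the final print statements need.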

## 2-Data exploration using spark
In this part of the assignment, you will perform basic operations on a Spark dataframe, such 
as adding new columns, dropping columns and more. Once you have summarized the data and it is small enough, 
you can convert it to a pandas dataframe and do some analysis with pandas. Please complete the function below by 
following these steps.

### 1-Preprocess the data
In this step, you will add new columns, drop columns and rename some columns. You will use a ```user defined function (udf)``` to add a date and a datetime column to be used later in the analysis. This is needed because the original ```cdr_datetime``` is a string and is not time aware in Python. For this, please read about how to [define and use udf in spark](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.udf.html#pyspark.sql.functions.udf).
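
The conversion logic that such a udf wraps is ordinary Python. The sketch below uses the format string from the assignment skeleton; the helper names `to_datetime` and `to_date`, the new column name `datetime` and the sample value are assumptions made for illustration.

```python
from datetime import datetime

DATE_FORMAT = "%Y%m%d%H%M%S"  # same pattern as date_format in the skeleton


def to_datetime(s):
    """Parse a string such as '20180701123045' into a Python datetime."""
    return datetime.strptime(s, DATE_FORMAT)


def to_date(s):
    """Parse the same string but keep only the calendar date."""
    return datetime.strptime(s, DATE_FORMAT).date()


# With PySpark installed, the functions could be wrapped as udfs like this
# (left as comments so that the snippet runs without Spark):
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import TimestampType, DateType
#   add_datetime = udf(to_datetime, TimestampType())
#   add_date = udf(to_date, DateType())
#   sdf = sdf.withColumn("datetime", add_datetime(sdf["cdr_datetime"]))

print(to_datetime("20180701123045"))  # 2018-07-01 12:30:45
```

Testing the plain functions first, before wrapping them, makes format mistakes much easier to spot than debugging them inside a Spark job.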

### 2-Get number of days
One important thing we would like to know is how many days of data we have. 
You will use the date column created above to get the number of days. During this process, 
note that you will be working with Spark's ```Row``` data type.
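
As a rough illustration of this step, the sketch below uses plain tuples in place of the ```Row``` objects that `collect()` would return. The helper name `count_days` is hypothetical and the dates are made up.

```python
from datetime import date


def count_days(dates_rows):
    """Days between the earliest and latest date in a list of one-field rows."""
    # row[0] works for Spark Row objects and for plain tuples alike
    sorted_dates = sorted(row[0] for row in dates_rows)
    diff = sorted_dates[-1] - sorted_dates[0]  # a datetime.timedelta
    return diff.days  # add 1 if both the first and the last day should count


# Plain tuples stand in for Row objects here
rows = [(date(2018, 7, 3),), (date(2018, 7, 1),), (date(2018, 7, 2),)]
print(count_days(rows))  # 2
```

Note that `days` is an attribute of the `timedelta` object, not a method, so it is read without parentheses.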

### 3-Get a distribution of calls by hour
Also important to know is how many calls are made in each hour. To this end, 
you will use an aggregate function to generate the counts and output the results as a heatmap for easier visualization.
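
The aggregation itself can be illustrated in plain Python. This sketch only mirrors what a group-by on weekday and hour computes; in the assignment, Spark performs the grouping and the heatmap is drawn from the pivoted pandas dataframe. The helper name `calls_by_weekday_hour`, the short day names and the timestamps are assumptions.

```python
from collections import Counter
from datetime import datetime

# Python's weekday() numbers days from Monday = 0 to Sunday = 6
day_dict = {0: "Mon", 1: "Tue", 2: "Wed", 3: "Thu", 4: "Fri", 5: "Sat", 6: "Sun"}


def calls_by_weekday_hour(timestamps):
    """Count events per (weekday name, hour) pair."""
    return Counter((day_dict[ts.weekday()], ts.hour) for ts in timestamps)


# Made-up timestamps for illustration
events = [
    datetime(2018, 7, 2, 9, 15),  # Monday, hour 9
    datetime(2018, 7, 2, 9, 40),  # Monday, hour 9
    datetime(2018, 7, 3, 18, 5),  # Tuesday, hour 18
]
counts = calls_by_weekday_hour(events)
print(counts[("Mon", 9)], counts[("Tue", 18)])  # 2 1
```
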
### 4-User attributes
Finally, you will get one user attribute, which is the number of calls per day. Once you have this, you will report the mean and median number of calls per day for users. 
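
One possible reading of this statistic is sketched below in plain Python: count the calls for every (user, day) pair, then take the mean and median of those counts. The helper name `calls_per_day_stats` and the sample events are made up, and the assignment may intend a slightly different aggregation.

```python
from collections import Counter
from statistics import mean, median


def calls_per_day_stats(events):
    """Return (mean, median) of calls per user per day.

    events -- iterable of (user_id, date) pairs, one pair per call
    """
    per_user_day = Counter(events)  # (user_id, date) -> number of calls
    counts = list(per_user_day.values())
    return mean(counts), median(counts)


# Made-up events: u1 makes 3 calls on one day and 1 on the next, u2 makes 2
events = [
    ("u1", "2018-07-01"), ("u1", "2018-07-01"), ("u1", "2018-07-01"),
    ("u1", "2018-07-02"),
    ("u2", "2018-07-01"), ("u2", "2018-07-01"),
]
print(calls_per_day_stats(events))  # (2, 2)
```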

In [59]:
def explore_data_with_spark(sdf=None, output_plot_file=None, output_heatmap=None):
    """
    For quick examination of user activity, let's generate
    user call count and do a simple plot.
    """
    # ======================================
    # DO QUICK PRE-PROCESSING
    # ======================================
    # 1. rename the following columns:
    # "cdr datetime" => "cdr_datetime", "last calling cellid" => "cell_id",
    # "call duration" => "call_duration"
    df2 = None
    
    # drop cdr type column
    df3 = None
    
    # 2. add datetime and date by converting from string cdr_datetime
    # Use Spark UDF to add date and datetime
    date_format = '%Y%m%d%H%M%S'
    
    # define udf to convert from string to Python datetime
    # To convert from string to datetime, use function  datetime.strptime()
    # Use spark TimestampType() data type as the output
    add_datetime = None
    
    # For date, same instructions as above but use spark DateType()
    add_date = None

    # Now add new columns for datetime and 'date' by calling 
    # the udfs above
    df4 = None
    df5 = None
    
    # ======================================
    # NUMBER OF DAYS IN THE DATA
    # ======================================
    # Get how many days are in the data by doing the following
    # 1. Get unique dates by using distinct() on the spark dataframe
    # 2. You can either use collect() on the result or convert to pandas dataframe
    # 3. Use sorting to get the first (earliest) and last date 
    # These instructions assume you are using collect() which will give us
    # spark Row objects. Otherwise, if you end up using pandas, please 
    # write your 
    dates_rows = None
    # use sorted() and list comprehension to sort the list of dates
    sorted_dates = None
    diff = None
    # You can use the days attribute of the diff object
    num_days = None

    # ======================================
    # AGGREGATE TO GET CALL COUNT BY HOUR
    # ======================================
    # define  udf to add hour column using attribute hour
    # on a datetime
    add_hr = None
    
    # define udf to add weekday as a number 
    # using python function weekday() on a date object
    add_wkday = None
    
    # create dict with nums (starting from 0) as keys and 
    # days as values
    day_dict = None
    
    # add hour column to the spark dataframe
    dfHr = None
    
    # add wkday column
    dfHr2 = None
    
    # group by week day and hour to get
    # total events by day and hour and convert result
    # to pandas dataframe
    dfWkDay = None
    
    # use pandas map() function on column wkday
    # to get the wkday name and add that as a column
    dfWkDay['wkday_name'] = None
    
    dfWkDay.drop(labels=['wkday'], axis=1, inplace=True)
    dfWkDayPivot = dfWkDay.pivot(index='wkday_name', columns='hr', values='count')
    
    # Create a heatmap with sns.heatmap() and the dfWkDayPivot dataframe
    ax = None
    
    # Save output to file (~ 1 line)
    
    
    # ==============================================
    # GET USER ATTRIBUTES: CALLS PER DAY
    # ============================================
    # group user and count number of events
    # convert resulting spark dataframe to pandas
    dfGroup = None

    # create a distribution plot of user call count using
    # seaborn
    ax = None

    # save plot as png file(~ 1 line)

    # report average number calls per day for each user
    # by grouping by user_id and date
    # and convert result to pandas dataframe
    dfGroupDay = None
    
    
    # get mean and median calls per day based on
    # grouped dataframe above
    mean = None
    median = None

    # return mean, median, num_days

## 3-Put it all together.
From the ```sample_users()``` function, you should have sample data for a subset of users 
which you can use with the ```explore_data_with_spark()``` function. Note that you can make the number of users very small at the beginning as you test the function.

In [61]:
# ==============================
# SAMPLING USER DATA
# ==============================
# setup input file and output directory
input_cdrs = None

# Convert the output dir str into a Path object
# using Path() function
outdir = None

# Call the compare_running_times() function (~ 1 line)


# ==============================
# DATA EXPLORATION
# ==============================
# create spark session
spark = None

# please use the sample users from previous function to reduce computation time
# you can start experimenting with very few user data
# load a spark dataframe
df = None

# provide full paths to save distplot and heatmap
out_heatmap = None
out_distplot = None

# Call the explore_data_with_spark() function
mean, median, num_days = None

### Questions based on the ```sample_users()``` function
1. Why are we sampling the data the way we are doing? Why can't we just randomly sample 
from the input dataframe ```sdf``` without worrying about the user IDs and grouping users?
2. What other spark function(s) (on a dataframe) could we have explored to achieve the same goal?

## Congratulations on completing this part of the assignment!
Here are some notes on how the assignment will be marked. Full marks will be 
achieved based on the following criteria:
- Code is able to run and saves all the outputs as required
- The saved outputs are as expected: heatmap and distribution plot
- The returned values for ```mean, median, num_days``` are accurate

Also, please answer the questions above in the space below them. Your answers will be marked.