# Practice Lab M04 (Version 3)
## Data Wrangling and Exploratory Data Analysis

Practice Lab M04 focuses on data wrangling and exploratory data analysis (EDA) with big data technologies such as PySpark.

## To do

- Use PySpark to perform data wrangling.
- Conduct exploratory data analysis with visualization.


## Task 1 Data Wrangling on PySpark
### Task 1.1 Reading parquet file
We first read the given data source **magic.parquet**. Parquet is a binary, columnar storage format designed for efficient storage (very common in cloud services), so the file cannot be opened as plain text the way a CSV can. Our goal in this task is to read this parquet file into a PySpark dataframe.

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Comment this out if the data visualisations don't work on your side
%matplotlib inline

plt.style.use('bmh')

In [None]:
!pip install wget
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
# input your code here: read the parquet file into `df` using pyspark
# print the pyspark dataframe schema
df.show(5)

### Task 1.2 Pyspark operation
With the data loaded into a PySpark dataframe, let's perform some wrangling operations.

In [None]:
# filter the data on column fWidth where fWidth > 50

In [None]:
# filter the data on column fWidth where fWidth > 50 and fWidth < 70, then print the summary statistics of 'fSize' (using describe())

In [None]:
# filter the data on column fWidth where fWidth > 60, then print the summary statistics of 'fSize' (using describe())

### Task 1.3 Normalization and Standardization
After reading and filtering the data, we sometimes need to normalize or standardize it.
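For reference, the two rescalings are usually defined as below, where $x$ is a value of the column, $x_{\min}$ and $x_{\max}$ are the column minimum and maximum, and $\mu$ and $\sigma$ are its mean and standard deviation. This is the common textbook form and is assumed here to be what the lab intends; note that Spark's `stddev` function computes the sample standard deviation.

$$
x_{\text{scaled}} = \frac{x - x_{\min}}{x_{\max} - x_{\min}}, \qquad
x_{\text{standardized}} = \frac{x - \mu}{\sigma}
$$

Min-max scaling maps the column onto the interval $[0, 1]$, while standardization yields a column with mean 0 and standard deviation 1.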

In [None]:
# write code to normalize fWidth with a min-max scaler and create a new column 'scaled_fWidth'

In [None]:
# How about we make a custom function to scale columns of our choice?
# write code for normalizing multiple columns at the same time
def min_max_scaler(df, cols_to_scale):
  # Takes a dataframe and a list of columns to min-max scale. Returns a dataframe.
  for col in cols_to_scale:
    # Compute the column's max and min and collect them to the driver
    max_val = df.agg({col: 'max'}).collect()[0][0]
    min_val = df.agg({col: 'min'}).collect()[0][0]
    new_column_name = 'scaled_' + col
    # Create a new column holding the scaled values
    df = df.withColumn(new_column_name,
                      (df[col] - min_val) / (max_val - min_val))
  return df

In [None]:
# try the function you defined above

In [None]:
# write code to standardize fWidth and create a new column 'standardized_fWidth'; save the resulting dataframe as 'df_stand'



In [None]:
# Check that the mean is close to 0


In [None]:
# And that the stddev is close to 1
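The two checks can be illustrated with a small pandas analogue. This is only a sketch on hypothetical toy values, not the PySpark solution the lab asks for; it shows why the mean and standard deviation come out as 0 and 1 after standardization.

```python
import pandas as pd

# Hypothetical toy values standing in for the fWidth column
toy = pd.DataFrame({"fWidth": [10.0, 20.0, 30.0, 40.0, 50.0]})

mean = toy["fWidth"].mean()
std = toy["fWidth"].std()  # sample standard deviation (ddof=1)

# Subtract the mean and divide by the standard deviation
toy["standardized_fWidth"] = (toy["fWidth"] - mean) / std

check_mean = toy["standardized_fWidth"].mean()
check_std = toy["standardized_fWidth"].std()
print(check_mean, check_std)
```

On real data the results are only close to 0 and 1 rather than exact, because of floating-point rounding.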


### Task 1.4 Pyspark dataframe join
Now we join the dataframes. In particular, we want to join the original dataframe with 'df_stand'.
First, we need an ID column to join on.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Use a window function to create a row number on both the original dataframe and 'df_stand'
windSpec = Window.orderBy(lit(1))


In [None]:
# Join the two dataframes on the row number and save the final joined dataframe as 'dataframe_pd' (converted to pandas, since the cells below use the pandas API)



## Task 2 Exploratory Data Analysis
### Task 2.1 Visualization for checking distribution
Now we plot a histogram to check the distribution of 'fLength' in 'dataframe_pd'.

In [None]:
# Keep only the columns with at least 30% non-missing values (count() does not include NaN values)
df2 = dataframe_pd[[column for column in dataframe_pd if dataframe_pd[column].count() / len(dataframe_pd) >= 0.3]]
del df2['row_count']
print("List of dropped columns:", end=" ")
for c in dataframe_pd.columns:
    if c not in df2.columns:
        print(c, end=", ")
print('\n')
dataframe_pd = df2

Now let's take a look at how fLength is distributed.

In [None]:
# Use sns.distplot to plot the histogram for fLength (distplot is deprecated since seaborn 0.11; sns.histplot is the replacement)
print(dataframe_pd['fLength'].describe())


Use the pandas dataframe hist() function to plot the distributions of all the numerical columns.
To do so, let's first list the data types in our dataset and keep only the numerical ones:

In [None]:
print(list(set(dataframe_pd.dtypes.tolist())))
df_num = dataframe_pd.select_dtypes(include = ['float64', 'int64'])
df_num.head()

In [None]:
df_num.hist(figsize=(16, 20), bins=50, xlabelsize=8, ylabelsize=8); # the trailing ; suppresses matplotlib's verbose output

### Task 2.2 Boxplot
Now we draw box plots to check the distribution across the discretized columns.

In [None]:
# Discretize 'fWidth', 'fLength', 'fAlpha' and create new columns (the bins are given in the notebook)
columns = ['fLength','fWidth','fAlpha']
for col in columns:
  new_column_name = col+'_bin'
  bins = [0, 25, 50, 75, 100]
  labels = [1,2,3,4]
  dataframe_pd[new_column_name] = pd.cut(dataframe_pd[col], bins=bins, labels=labels)
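To see how `pd.cut` assigns values with these bins, here is a small sketch on hypothetical toy values. Intervals are right-closed by default, so the bins are (0, 25], (25, 50], (50, 75], and (75, 100]; values outside that range, including exactly 0, become NaN.

```python
import pandas as pd

# Hypothetical toy values chosen to hit the bin edges
toy = pd.Series([0.0, 10.0, 25.0, 25.1, 99.9, 100.0, 150.0])

# Same bins and labels as in the lab cell
binned = pd.cut(toy, bins=[0, 25, 50, 75, 100], labels=[1, 2, 3, 4])
print(binned.tolist())
```

If zeros should fall into bin 1, `pd.cut` accepts `include_lowest=True`; whether that is wanted here depends on the data, so it is left as an option.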

In [None]:
# Use sns.countplot to plot the counts for the discretized 'fLength'

In [None]:
# Draw a sns.boxplot with x on the discretized 'fLength' and y on 'fConc' (numerical format)

### Task 2.3 (Advanced) Heatmap visualization
We want to group by the discretized 'fWidth' and 'fLength' to obtain the count for each combination. However, we want to exclude the rows whose 'fLength' falls in bin 1.

In [None]:
# Create the group by with pandas
type_grouped = dataframe_pd[dataframe_pd['fLength_bin']>1].groupby(['fLength_bin', 'fWidth_bin']).size()
print(type_grouped)
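A grouped count like this is a Series with a two-level index, whereas a heatmap needs a two-dimensional table. A minimal sketch of the reshaping step, on hypothetical toy bin labels (plain integers rather than the categorical columns of the lab), might look like this:

```python
import pandas as pd

# Hypothetical toy bin labels, with fLength bin 1 already excluded
toy = pd.DataFrame({
    "fLength_bin": [2, 2, 3, 3, 3, 4],
    "fWidth_bin":  [1, 2, 1, 1, 2, 2],
})

# Count rows per (fLength_bin, fWidth_bin) combination
counts = toy.groupby(["fLength_bin", "fWidth_bin"]).size()

# Pivot the inner index level into columns; missing combinations become 0
table = counts.unstack(fill_value=0)
print(table)
```

The resulting `table` has one row per 'fLength' bin and one column per 'fWidth' bin, and can be passed to `sns.heatmap(table, annot=True, fmt="d")`.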

In [None]:
# Draw the counts as a heatmap using sns.heatmap()