## PySpark DataFrame
1. Accessing PySpark from Jupyter Notebook
    * Jupyter Notebook usually runs the default Python installation, which may not be able to import PySpark
    * `findspark` can locate Spark as long as the `SPARK_HOME` environment variable is defined
    * after `import findspark`, call `findspark.init()` to add PySpark to `sys.path`; `pyspark` can then be imported
2. Create a `SparkSession` with an application name
3. Read a CSV file
    * without `inferSchema=True`, every column is read as a string
    * `show()` prints the DataFrame as a readable table
4. Deal with columns
    * get all columns
    * select a subset of columns
    * add/drop/rename columns
5. Describe data set with summary
6. Handling missing values
7. Filter operation
8. Groupby and aggregate functions 

In [1]:
#### 1. Accessing PySpark from Jupyter Notebook
import findspark
findspark.init()

In [2]:
#### 2. Create a SparkSession with an application name
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pyspark_practice_dataframe').getOrCreate()
spark

In [12]:
#### 3. Read csv
pyspark_df = spark.read.csv('./Data/customer.csv', header=True)  # every column is read as string
pyspark_df.printSchema()  # printSchema() prints directly and returns None
pyspark_df = spark.read.csv('./Data/customer.csv', header=True, inferSchema=True)  # extra pass over the data to infer types
pyspark_df.printSchema()
print(f'dataframe type: {type(pyspark_df)}')
pyspark_df.show()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- location: string (nullable = true)

dataframe type: <class 'pyspark.sql.dataframe.DataFrame'>
+------+---+------+--------+
|  name|age|salary|location|
+------+---+------+--------+
|   amy| 22| 28000|  taipei|
| jacky| 20| 25000|taichung|
| meggy| 30| 35000|  tainan|
|  adam| 33| 34000|  taipei|
|sophie| 40| 60000|  taipei|
|claire| 25| 30000|  tainan|
|sherry| 27| 51000|    null|
| phlip| 31|  null|taichung|
| fanny| 29|  null|    null|
+------+---+------+--------+



In [4]:
#### 4. Deal with columns (select/head/type)
print(f'columns: {pyspark_df.columns}')
print()
print('head of df:', pyspark_df.head(3), sep='\n')
print()
print('type of selected columns:', type(pyspark_df.select(['name','age'])))
print()
print('dtypes of all columns:', pyspark_df.dtypes)
print()
pyspark_df.select(['name','age']).show()

columns: ['name', 'age', 'salary', 'location']

head of df:
[Row(name='amy', age=22, salary=28000, location='taipei'), Row(name='jacky', age=20, salary=25000, location='taichung'), Row(name='meggy', age=30, salary=35000, location='tainan')]

type of selected columns: <class 'pyspark.sql.dataframe.DataFrame'>

dtypes of all columns: [('name', 'string'), ('age', 'int'), ('salary', 'int'), ('location', 'string')]

+------+---+
|  name|age|
+------+---+
|   amy| 22|
| jacky| 20|
| meggy| 30|
|  adam| 33|
|sophie| 40|
|claire| 25|
|sherry| 27|
| phlip| 31|
| fanny| 29|
+------+---+



In [5]:
#### 4. Deal with columns (add/drop/rename)
pyspark_df = pyspark_df.withColumn('raised_salary', pyspark_df['salary']*1.05) # add new column: raised_salary
pyspark_df.show()
pyspark_df = pyspark_df.drop('raised_salary') # drop column: raised_salary
pyspark_df.show()
pyspark_df = pyspark_df.withColumnRenamed('salary', 'new_salary') # rename column: salary --> new_salary
pyspark_df.show()

+------+---+------+--------+-------------+
|  name|age|salary|location|raised_salary|
+------+---+------+--------+-------------+
|   amy| 22| 28000|  taipei|      29400.0|
| jacky| 20| 25000|taichung|      26250.0|
| meggy| 30| 35000|  tainan|      36750.0|
|  adam| 33| 34000|  taipei|      35700.0|
|sophie| 40| 60000|  taipei|      63000.0|
|claire| 25| 30000|  tainan|      31500.0|
|sherry| 27| 51000|    null|      53550.0|
| phlip| 31|  null|taichung|         null|
| fanny| 29|  null|    null|         null|
+------+---+------+--------+-------------+

+------+---+------+--------+
|  name|age|salary|location|
+------+---+------+--------+
|   amy| 22| 28000|  taipei|
| jacky| 20| 25000|taichung|
| meggy| 30| 35000|  tainan|
|  adam| 33| 34000|  taipei|
|sophie| 40| 60000|  taipei|
|claire| 25| 30000|  tainan|
|sherry| 27| 51000|    null|
| phlip| 31|  null|taichung|
| fanny| 29|  null|    null|
+------+---+------+--------+

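+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
|   amy| 22|     28000|  taipei|
| jacky| 20|     25000|taichung|
| meggy| 30|     35000|  tainan|
|  adam| 33|     34000|  taipei|
|sophie| 40|     60000|  taipei|
|claire| 25|     30000|  tainan|
|sherry| 27|     51000|    null|
| phlip| 31|      null|taichung|
| fanny| 29|      null|    null|
+------+---+----------+--------+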

In [6]:
#### 5. Describe data set with summary
pyspark_df.describe().show()

+-------+------+------------------+------------------+--------+
|summary|  name|               age|        new_salary|location|
+-------+------+------------------+------------------+--------+
|  count|     9|                 9|                 7|       7|
|   mean|  null|28.555555555555557| 37571.42857142857|    null|
| stddev|  null| 6.023103666530884|12972.498382567419|    null|
|    min|  adam|                20|             25000|taichung|
|    max|sophie|                40|             60000|  taipei|
+-------+------+------------------+------------------+--------+



In [7]:
#### 6. Handling missing values (drop)
pyspark_df.na.drop().show()  # how='any' (default): drop rows that contain at least one null
pyspark_df.na.drop(how='all').show()  # drop only rows in which every value is null
pyspark_df.na.drop(how='any', thresh=3).show()  # keep rows with at least 3 non-null values; thresh overrides how
pyspark_df.na.drop(how='any', subset=['new_salary']).show()  # drop rows whose new_salary is null

+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
|   amy| 22|     28000|  taipei|
| jacky| 20|     25000|taichung|
| meggy| 30|     35000|  tainan|
|  adam| 33|     34000|  taipei|
|sophie| 40|     60000|  taipei|
|claire| 25|     30000|  tainan|
+------+---+----------+--------+

+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
|   amy| 22|     28000|  taipei|
| jacky| 20|     25000|taichung|
| meggy| 30|     35000|  tainan|
|  adam| 33|     34000|  taipei|
|sophie| 40|     60000|  taipei|
|claire| 25|     30000|  tainan|
|sherry| 27|     51000|    null|
| phlip| 31|      null|taichung|
| fanny| 29|      null|    null|
+------+---+----------+--------+

+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
|   amy| 22|     28000|  taipei|
| jacky| 20|     25000|taichung|
| meggy| 30|     35000|  tainan|
|  adam| 33|     34000|  taipei|
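|sophie| 40|     60000|  taipei|
|claire| 25|     30000|  tainan|
|sherry| 27|     51000|    null|
| phlip| 31|      null|taichung|
+------+---+----------+--------+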

In [8]:
#### 6. Handling missing values (fill with a specific value)
pyspark_df.na.fill(-999999, ['new_salary']).show()  # replace nulls in new_salary with a sentinel value

+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
|   amy| 22|     28000|  taipei|
| jacky| 20|     25000|taichung|
| meggy| 30|     35000|  tainan|
|  adam| 33|     34000|  taipei|
|sophie| 40|     60000|  taipei|
|claire| 25|     30000|  tainan|
|sherry| 27|     51000|    null|
| phlip| 31|   -999999|taichung|
| fanny| 29|   -999999|    null|
+------+---+----------+--------+



In [9]:
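#### 6. Handling missing values (fill with mean/median)
from pyspark.ml.feature import Imputer

# Imputer is an Estimator: fit() computes the column means, transform() fills the nulls.
# setStrategy('median') fills with the median instead.
imputer = Imputer(inputCols=['new_salary', 'age'],
                  outputCols=[f'filled_{c}' for c in ['new_salary', 'age']]).setStrategy('mean')
imputer.fit(pyspark_df).transform(pyspark_df).show()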

+------+---+----------+--------+-----------------+----------+
|  name|age|new_salary|location|filled_new_salary|filled_age|
+------+---+----------+--------+-----------------+----------+
|   amy| 22|     28000|  taipei|            28000|        22|
| jacky| 20|     25000|taichung|            25000|        20|
| meggy| 30|     35000|  tainan|            35000|        30|
|  adam| 33|     34000|  taipei|            34000|        33|
|sophie| 40|     60000|  taipei|            60000|        40|
|claire| 25|     30000|  tainan|            30000|        25|
|sherry| 27|     51000|    null|            51000|        27|
| phlip| 31|      null|taichung|            37571|        31|
| fanny| 29|      null|    null|            37571|        29|
+------+---+----------+--------+-----------------+----------+



In [10]:
#### 7. Filter operation
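pyspark_df.filter(pyspark_df['new_salary']>50000).show()  # single condition
pyspark_df.filter((pyspark_df['new_salary']>50000) & (pyspark_df['location']=='taipei')).show()  # combine with &; wrap each condition in parentheses
pyspark_df.filter(~(pyspark_df['location']=='taipei')).show()  # negate with ~; rows with a null location are excluded as well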

+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
|sophie| 40|     60000|  taipei|
|sherry| 27|     51000|    null|
+------+---+----------+--------+

+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
|sophie| 40|     60000|  taipei|
+------+---+----------+--------+

+------+---+----------+--------+
|  name|age|new_salary|location|
+------+---+----------+--------+
| jacky| 20|     25000|taichung|
| meggy| 30|     35000|  tainan|
|claire| 25|     30000|  tainan|
| phlip| 31|      null|taichung|
+------+---+----------+--------+



In [11]:
#### 8. Groupby and aggregate functions
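pyspark_df.groupBy('location').sum().show()  # sum of every numeric column per location (nulls are ignored)
pyspark_df.groupBy('location').max().show()  # max of every numeric column per location
pyspark_df.groupBy('location').agg({'new_salary':'sum','age':'max'}).show()  # dict form: one aggregate per column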

+--------+--------+---------------+
|location|sum(age)|sum(new_salary)|
+--------+--------+---------------+
|  taipei|      95|         122000|
|  tainan|      55|          65000|
|    null|      56|          51000|
|taichung|      51|          25000|
+--------+--------+---------------+

+--------+--------+---------------+
|location|max(age)|max(new_salary)|
+--------+--------+---------------+
|  taipei|      40|          60000|
|  tainan|      30|          35000|
|    null|      29|          51000|
|taichung|      31|          25000|
+--------+--------+---------------+

+--------+---------------+--------+
|location|sum(new_salary)|max(age)|
+--------+---------------+--------+
|  taipei|         122000|      40|
|  tainan|          65000|      30|
|    null|          51000|      29|
|taichung|          25000|      31|
+--------+---------------+--------+

