# Data Processing using PySpark
## Ivan Muhammad Siegfried

In [1]:
# Import required packages and build an app named data_processing
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing').getOrCreate()

In [2]:
# Import pyspark mathematical functions and nall data types
import pyspark.sql.functions as fx
from pyspark.sql.types import *

In [3]:
# Read data from csv
## inferSchema = True meaning that the data types will be all strings.
df=spark.read.csv("customer_data.csv",header=True,inferSchema=True)

In [4]:
df.count()

2000

In [5]:
len(df.columns)

7

In [6]:
df.printSchema()

root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)



In [10]:
df.show(n=2, truncate=False, vertical=True)

-RECORD 0----------------------------------------
 Customer_subtype   | Lower class large families 
 Number_of_houses   | 1                          
 Avg_size_household | 3                          
 Avg_age            | 30-40 years                
 Customer_main_type | Family with grown ups      
 Avg_Salary         | 44905                      
 label              | 0                          
 salary_category    | upper                      
-RECORD 1----------------------------------------
 Customer_subtype   | Mixed small town dwellers  
 Number_of_houses   | 1                          
 Avg_size_household | 2                          
 Avg_age            | 30-40 years                
 Customer_main_type | Family with grown ups      
 Avg_Salary         | 37575                      
 label              | 0                          
 salary_category    | upper                      
only showing top 2 rows



## Data Filtering

In [13]:
df.filter(df['Avg_Salary'] > 1000000).filter(df['Number_of_houses']==2).show(truncate=False, vertical=True)

-RECORD 0----------------------------------
 Customer_subtype   | High status seniors  
 Number_of_houses   | 2                    
 Avg_size_household | 3                    
 Avg_age            | 40-50 years          
 Customer_main_type | Successful hedonists 
 Avg_Salary         | 6138618              
 label              | 0                    
 salary_category    | upper                
-RECORD 1----------------------------------
 Customer_subtype   | High status seniors  
 Number_of_houses   | 2                    
 Avg_size_household | 3                    
 Avg_age            | 40-50 years          
 Customer_main_type | Successful hedonists 
 Avg_Salary         | 23723162             
 label              | 0                    
 salary_category    | upper                



## Data Grouping

In [15]:
df.groupBy('Customer_main_type').count().show()

+--------------------+-----+
|  Customer_main_type|count|
+--------------------+-----+
|             Farmers|   93|
|       Career Loners|   15|
|Retired and Relig...|  202|
|Successful hedonists|  194|
|         Living well|  178|
|      Average Family|  308|
|    Cruising Seniors|   60|
|Conservative fami...|  236|
|      Driven Growers|  172|
|Family with grown...|  542|
+--------------------+-----+



In [22]:
for col in df.columns:
    if col!='Avg_Salary':
        print("Aggregation for ",col,"Columns")
        df.groupBy(col).count().orderBy('count',ascending=False).show()

Aggregation for  Customer_subtype Columns
+--------------------+-----+
|    Customer_subtype|count|
+--------------------+-----+
|Lower class large...|  288|
|Traditional families|  129|
|Middle class fami...|  122|
|Large religious f...|  107|
|Modern, complete ...|   93|
|Couples with teen...|   83|
|    Young and rising|   78|
| High status seniors|   76|
|Low income catholics|   72|
|       Mixed seniors|   71|
|    Village families|   68|
|        Mixed rurals|   67|
|       Stable family|   62|
|Young all america...|   62|
|Large family, emp...|   56|
| Young, low educated|   56|
|     Family starters|   55|
|High Income, expe...|   52|
|Mixed small town ...|   47|
|Religious elderly...|   47|
+--------------------+-----+
only showing top 20 rows

Aggregation for  Number_of_houses Columns
+----------------+-----+
|Number_of_houses|count|
+----------------+-----+
|               1| 1808|
|               2|  178|
|               3|   12|
|              10|    1|
|               5| 

## Data Aggregation 

In [None]:
df.groupBy('Customer_main_type').agg(F.max('Avg_Salary').alias('max_salary')).orderBy('max_salary',descending=True)

## Data Manipulation

In [8]:
xx

#udf: user defined function
from pyspark.sql.functions import udf,rank, col,row_number
def salary_category(salary):
    if salary<15000:
        return('lower')
    else:
        return('upper')

#create age category udf 
salary_udf=udf(salary_category,StringType())
#create the bucket column by applying udf
df=df.withColumn('salary_category',salary_udf(df['Avg_Salary']))
df.show(n=3, truncate=False, vertical=True)

-RECORD 0----------------------------------------
 Customer_subtype   | Lower class large families 
 Number_of_houses   | 1                          
 Avg_size_household | 3                          
 Avg_age            | 30-40 years                
 Customer_main_type | Family with grown ups      
 Avg_Salary         | 44905                      
 label              | 0                          
 salary_category    | upper                      
-RECORD 1----------------------------------------
 Customer_subtype   | Mixed small town dwellers  
 Number_of_houses   | 1                          
 Avg_size_household | 2                          
 Avg_age            | 30-40 years                
 Customer_main_type | Family with grown ups      
 Avg_Salary         | 37575                      
 label              | 0                          
 salary_category    | upper                      
-RECORD 2----------------------------------------
 Customer_subtype   | Mixed small town dwellers  


## Data Summary

In [23]:
df.select('Avg_Salary').summary().show()

+-------+-----------------+
|summary|       Avg_Salary|
+-------+-----------------+
|  count|             2000|
|   mean|     1616908.0835|
| stddev|6822647.757312146|
|    min|             1361|
|    25%|            20315|
|    50%|            31421|
|    75%|            42949|
|    max|         48919896|
+-------+-----------------+



## Join Data 

In [24]:
new_region = spark.createDataFrame([('Family with grown ups','FGU'),
                                    ('Driven Growers','DG'),
                                    ('Conservative families','CF'),
                                    ('Cruising Seniors','CS'),
                                    ('Average Family ','AF'),
                                    ('Living well','LW'),
                                    ('Successful hedonists','SH'),
                                    ('Retired and Religious','RR'),
                                   ('Career Loners','CL'),('Farmers','FM')],schema=StructType().add("Customer_main_type","string").add("Region Code","string")) # Columns Name

In [25]:
new_region.show()

+--------------------+-----------+
|  Customer_main_type|Region Code|
+--------------------+-----------+
|Family with grown...|        FGU|
|      Driven Growers|         DG|
|Conservative fami...|         CF|
|    Cruising Seniors|         CS|
|     Average Family |         AF|
|         Living well|         LW|
|Successful hedonists|         SH|
|Retired and Relig...|         RR|
|       Career Loners|         CL|
|             Farmers|         FM|
+--------------------+-----------+



In [26]:
new_df=df.join(new_region,on='Customer_main_type')

In [31]:
new_df.groupby("Region Code").count().orderBy('count',ascending=False).show()

+-----------+-----+
|Region Code|count|
+-----------+-----+
|        FGU|  542|
|         CF|  236|
|         RR|  202|
|         SH|  194|
|         LW|  178|
|         DG|  172|
|         FM|   93|
|         CS|   60|
|         CL|   15|
+-----------+-----+



In [35]:
df.groupBy('Customer_subtype').pivot('Avg_age').sum('Avg_salary').fillna(0).show()

+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|    Customer_subtype|20-30 years|30-40 years|40-50 years|50-60 years|60-70 years|70-80 years|
+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|Large family, emp...|          0|     739471|    1002442|      98687|          0|          0|
|Religious elderly...|          0|      34406|     454491|     764425|     133027|      49059|
|Large religious f...|          0|    1092108|    1471683|     462562|     146432|          0|
|Modern, complete ...|          0|    1000307|    1791359|     144902|          0|          0|
|    Village families|          0|     108090|    1427712|     670762|          0|          0|
|Young all america...|          0|     996923|     750607|      85395|          0|          0|
|Young urban have-...|          0|      47272|          0|      55732|          0|          0|
|Young seniors in ...|          0|      43302|    