<a href="https://colab.research.google.com/github/rjarun8/Spark/blob/main/PySpark_Exercises.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting Up Spark

In [1]:
!git clone https://github.com/Apress/learn-pyspark.git

Cloning into 'learn-pyspark'...
remote: Enumerating objects: 44, done.[K
remote: Counting objects: 100% (44/44), done.[K
remote: Compressing objects: 100% (33/33), done.[K
remote: Total 44 (delta 11), reused 44 (delta 11), pack-reused 0[K
Unpacking objects: 100% (44/44), done.


In [2]:
#Spark env seyup

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.estointernet.in/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf '/content/spark-3.1.2-bin-hadoop3.2.tgz'
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"


import findspark
findspark.init()
print('spark location -->', findspark.find())

spark location --> /content/spark-3.1.2-bin-hadoop3.2


In [3]:
from pyspark.sql import SparkSession


In [None]:
SparkSession.builder.appName('data_processing').getOrCreate()

In [5]:
spark = SparkSession.builder.appName('data_preprocessing').getOrCreate()
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf,PandasUDFType

In [None]:
#Creating Dataframe
schema = StructType().add("user_id","string").add("country","string").add("browser","string").add("os","string").add("age","integer")

In [None]:
df = spark.createDataFrame([('A201','India','Chrome','WIN',33),('A202','UK','Safari','MAC',27),
                            ('A203','US','Firefox','UBUNTU',2)],schema=schema)

In [None]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- os: string (nullable = true)
 |-- age: integer (nullable = true)



In [None]:
df.show()

+-------+-------+-------+------+---+
|user_id|country|browser|    os|age|
+-------+-------+-------+------+---+
|   A201|  India| Chrome|   WIN| 33|
|   A202|     UK| Safari|   MAC| 27|
|   A203|     US|Firefox|UBUNTU|  2|
+-------+-------+-------+------+---+



In [None]:
df_na = spark.createDataFrame([("A203",None,"Chrome","WIN",33),("A201",'China',None,"MacOS",35),("A205",'UK',"Mozilla","Linux",25)],
                              schema=schema)

In [None]:
df_na.show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   os|age|
+-------+-------+-------+-----+---+
|   A203|   null| Chrome|  WIN| 33|
|   A201|  China|   null|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [None]:
df_na.fillna('0').show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   os|age|
+-------+-------+-------+-----+---+
|   A203|      0| Chrome|  WIN| 33|
|   A201|  China|      0|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [None]:
df_na.fillna({'country':'USA',"browser":"Safari"}).show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   os|age|
+-------+-------+-------+-----+---+
|   A203|    USA| Chrome|  WIN| 33|
|   A201|  China| Safari|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [None]:
df_na.na.drop().show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   os|age|
+-------+-------+-------+-----+---+
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [None]:
df = spark.read.csv(r'/content/learn-pyspark/chap_2/customer_data.csv',header=True,inferSchema=True)

In [None]:
df.count()

2000

In [None]:
df.show(5)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     19504|    0|
|  Large family farms|               1|                 4|30-40 years|             Farmers|     34943|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 5 

In [None]:
df.printSchema()

root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)



In [None]:
df.summary().show()

+-------+--------------------+------------------+------------------+-----------+--------------------+-----------------+------------------+
|summary|    Customer_subtype|  Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|       Avg_Salary|             label|
+-------+--------------------+------------------+------------------+-----------+--------------------+-----------------+------------------+
|  count|                2000|              2000|              2000|       2000|                2000|             2000|              2000|
|   mean|                null|            1.1075|            2.6895|       null|                null|     1616908.0835|            0.0605|
| stddev|                null|0.3873225521186316|0.7914562220841646|       null|                null|6822647.757312146|0.2384705099001677|
|    min|Affluent senior a...|                 1|                 1|20-30 years|      Average Family|             1361|                 0|
|    25%|                nu

# Subset of a Dataframe
    Select

    Filter

    Where

In [None]:
df.select(['Customer_subtype','Avg_Salary']).show(3)

+--------------------+----------+
|    Customer_subtype|Avg_Salary|
+--------------------+----------+
|Lower class large...|     44905|
|Mixed small town ...|     37575|
|Mixed small town ...|     27915|
+--------------------+----------+
only showing top 3 rows



In [None]:
df.filter(df['Avg_Salary']>100000).count()

181

In [None]:
df.filter(df['Avg_Salary']>100000).show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
| High status seniors|               1|                 3|40-50 years|Successful hedonists|   4670288|    0|
|Affluent young fa...|               1|                 3|30-40 years|      Average Family|    762769|    1|
| High status seniors|               1|                 3|50-60 years|Successful hedonists|   9561873|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 3 rows



In [None]:
df.where((df['Avg_Salary'] > 100000) & (df['Number_of_houses']>2)).show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    596723|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    944444|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    788477|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 3 rows



# Aggregations
Any kind of aggregation can be broken simply into three stages, in the following order:

    Split

    Apply
      mean, max, min, sum

    Combine

In [None]:
# Print a frequency table (value -> row count, most frequent first) for every
# column except the continuous Avg_Salary.
# The loop variable is not named `col` on purpose: this notebook later imports
# pyspark.sql.functions.col, and re-running this cell afterwards would rebind
# that name to a plain string and break `col('rank')`.
for column_name in df.columns:
  if column_name != 'Avg_Salary':
    print(f"Aggregation for {column_name}")
    df.groupBy(column_name).count().orderBy('count',ascending=False).show(truncate=False)

Aggregation for Customer_subtype
+------------------------------------------+-----+
|Customer_subtype                          |count|
+------------------------------------------+-----+
|Lower class large families                |288  |
|Traditional families                      |129  |
|Middle class families                     |122  |
|Large religious families                  |107  |
|Modern, complete families                 |93   |
|Couples with teens 'Married with children'|83   |
|Young and rising                          |78   |
|High status seniors                       |76   |
|Low income catholics                      |72   |
|Mixed seniors                             |71   |
|Village families                          |68   |
|Mixed rurals                              |67   |
|Stable family                             |62   |
|Young all american family                 |62   |
|Young, low educated                       |56   |
|Large family, employed child              |56   

In [None]:
df.groupBy('Customer_main_type').agg(F.mean('Avg_Salary')).show(3)

+--------------------+------------------+
|  Customer_main_type|   avg(Avg_Salary)|
+--------------------+------------------+
|             Farmers|30209.333333333332|
|       Career Loners|           32272.6|
|Retired and Relig...| 27338.80693069307|
+--------------------+------------------+
only showing top 3 rows



In [None]:
df.sort("Avg_Salary",ascending=False).show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
| High status seniors|               1|                 2|60-70 years|Successful hedonists|  48919896|    0|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48177970|    0|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48069548|    1|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 3 rows



In [None]:
df.groupBy('Customer_subtype').agg(F.mean('Avg_Salary').alias('mean_salary')).orderBy('mean_salary',ascending=False).show(3)

+--------------------+--------------------+
|    Customer_subtype|         mean_salary|
+--------------------+--------------------+
| High status seniors| 2.507677857894737E7|
|High Income, expe...|2.3839817807692308E7|
|Affluent young fa...|   662068.7777777778|
+--------------------+--------------------+
only showing top 3 rows



Collect
      
    Collect List

    Collect Set

In [None]:
df.groupby('Customer_subtype').agg(F.collect_set("Number_of_houses")).show(3)

+--------------------+-----------------------------+
|    Customer_subtype|collect_set(Number_of_houses)|
+--------------------+-----------------------------+
|Large family, emp...|                       [1, 2]|
|Religious elderly...|                       [1, 2]|
|Large religious f...|                       [1, 2]|
+--------------------+-----------------------------+
only showing top 3 rows



In [None]:
df.groupby('Customer_subtype').agg(F.collect_list("Number_of_houses")).show(3)

+--------------------+------------------------------+
|    Customer_subtype|collect_list(Number_of_houses)|
+--------------------+------------------------------+
|Large family, emp...|          [2, 1, 2, 1, 2, 1...|
|Religious elderly...|          [1, 1, 1, 1, 1, 1...|
|Large religious f...|          [2, 1, 1, 2, 1, 1...|
+--------------------+------------------------------+
only showing top 3 rows



In [None]:
df = df.withColumn('Constant',F.lit('finance'))
df.show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|Constant|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0| finance|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0| finance|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0| finance|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+
only showing top 3 rows



In [None]:
'''
User-Defined Functions (UDFs)

'''

from pyspark.sql.functions import udf
df.groupby("Avg_age").count().show(3)

+-----------+-----+
|    Avg_age|count|
+-----------+-----+
|70-80 years|    8|
|50-60 years|  373|
|30-40 years|  496|
+-----------+-----+
only showing top 3 rows



In [None]:
def age_category(age):
    """Map an ``Avg_age`` bracket label to a coarse age category.

    Args:
        age: Bracket label such as "30-40 years", or None for a null cell.

    Returns:
        "Young" for "20-30 years", "Mid Aged" for "30-40 years", "Old" for
        "40-50 years" and "50-60 years", and "Very Old" for any other label.
        None is returned for a None input so that a missing age stays null
        instead of being reported as "Very Old".
    """
    # A null cell reaches a Python UDF as None; keep it null.
    if age is None:
        return None
    if age == "20-30 years":
        return "Young"
    if age == "30-40 years":
        return "Mid Aged"
    if age in ("40-50 years", "50-60 years"):
        return "Old"
    # Every remaining bracket label is treated as the oldest group.
    return "Very Old"

In [None]:
age_udf = udf(age_category,StringType())
df=df.withColumn('age_category',age_udf(df['Avg_age']))
df.show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|Constant|age_category|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0| finance|    Mid Aged|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0| finance|    Mid Aged|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0| finance|    Mid Aged|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+
only showing top 3 rows



In [None]:
min_sal=1361
max_sal=48919896
from pyspark.sql.functions import pandas_udf,PandasUDFType

def scaled_salary(salary):
    """Min-max scale ``salary`` using the module-level ``min_sal`` and ``max_sal``.

    Returns ``(salary - min_sal) / (max_sal - min_sal)``; ``min_sal`` maps to 0
    and ``max_sal`` maps to 1.
    """
    offset_from_min = salary - min_sal
    salary_range = max_sal - min_sal
    return offset_from_min / salary_range

In [None]:
scaling_udf = pandas_udf(scaled_salary,DoubleType())
df.withColumn("Scaled_salary",scaling_udf(df['Avg_Salary'])).show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+--------------------+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|Constant|age_category|       Scaled_salary|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+--------------------+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0| finance|    Mid Aged|8.901329526732557E-4|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0| finance|    Mid Aged| 7.40291997705982E-4|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0| finance|    Mid Aged| 5.42820834679534E-4|
+--------------------+----------------+------------------+-----------+------------------

In [None]:
'''Joins'''
region_data = spark.createDataFrame([('Family with grown ups','PN'),
                     ('Driven Growers','GJ'),('Conservative families','DD'),('Cruising Seniors','DL'),('Average Family ','MN'),
                     ('Living well','KA'),('Successful hedonists','JH'),('Retired and Religious','AX'),
                     ('Career Loners','HY'),('Farmers','JH')],
                     schema = StructType().add("Customer_main_type","string").add("Region Code","string"))

region_data.show()

+--------------------+-----------+
|  Customer_main_type|Region Code|
+--------------------+-----------+
|Family with grown...|         PN|
|      Driven Growers|         GJ|
|Conservative fami...|         DD|
|    Cruising Seniors|         DL|
|     Average Family |         MN|
|         Living well|         KA|
|Successful hedonists|         JH|
|Retired and Relig...|         AX|
|       Career Loners|         HY|
|             Farmers|         JH|
+--------------------+-----------+



In [None]:
# Inner join (the default, spelled out here): customers whose Customer_main_type
# has no matching row in region_data are dropped from new_df.
new_df = df.join(region_data,on='Customer_main_type',how='inner')
# Use the column's declared name. "Region COde" only resolved because Spark's
# analyzer is case-insensitive by default, and it leaked the typo into the output header.
new_df.groupby("Region Code").count().show(3)

+-----------+-----+
|Region COde|count|
+-----------+-----+
|         JH|  287|
|         HY|   15|
|         DD|  236|
+-----------+-----+
only showing top 3 rows



In [None]:
df.groupBy('Customer_main_type').pivot('Avg_age').sum('Avg_Salary').fillna(0).show(5)

+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|  Customer_main_type|20-30 years|30-40 years|40-50 years|50-60 years|60-70 years|70-80 years|
+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|             Farmers|          0|     462027|    2031235|     316206|          0|          0|
|       Career Loners|     143998|     176639|      25701|     105193|      32558|          0|
|Retired and Relig...|     126350|     336631|    2975266|    1687711|     335357|      61124|
|Successful hedonists|      42261|  171278764| 1223362814| 1563071675|  200340129|      15518|
|         Living well|     460528|    2965303|    1795405|     331304|          0|          0|
+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
only showing top 5 rows



# Window Functions


    Aggregations

    Ranking

    Analytics

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col,row_number
# Global window: no partitionBy, rows ordered by Avg_Salary from highest to lowest.
# Spark has to move all rows into a single partition to number them, which is
# fine for this small dataset but does not scale.
win = Window.orderBy(df['Avg_Salary'].desc())
# withColumn already names the new column, so no extra .alias() is needed.
df = df.withColumn('rank', row_number().over(win))
df.show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|Constant|age_category|rank|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
| High status seniors|               1|                 2|60-70 years|Successful hedonists|  48919896|    0| finance|    Very Old|   1|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48177970|    0| finance|         Old|   2|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48069548|    1| finance|         Old|   3|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
only showing top 3 rows



In [None]:
# Per-subtype window: numbering restarts within each Customer_subtype,
# with rows ordered by Avg_Salary from highest to lowest.
win_1 = Window.partitionBy("Customer_subtype").orderBy(df['Avg_Salary'].desc())
# Replaces the existing 'rank' column (the global numbering) with the per-subtype one.
df=df.withColumn('rank',row_number().over(win_1))

In [None]:
df.show(5)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|Constant|age_category|rank|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|Large family, emp...|               2|                 3|30-40 years|Family with grown...|     49418|    0| finance|    Mid Aged|   1|
|Large family, emp...|               1|                 4|40-50 years|Family with grown...|     48390|    0| finance|         Old|   2|
|Large family, emp...|               1|                 3|40-50 years|Family with grown...|     48272|    0| finance|         Old|   3|
|Large family, emp...|               1|                 2|40-50 years|Family with grown...|     47684|    0| finance|         Old|   4|
|Large family, emp...|               1|         

In [None]:
df.groupBy('rank').count().orderBy('rank').show(3)

+----+-----+
|rank|count|
+----+-----+
|   1|   39|
|   2|   37|
|   3|   36|
+----+-----+
only showing top 3 rows



In [None]:
df.filter(col('rank')<4).show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|Constant|age_category|rank|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|Large family, emp...|               2|                 3|30-40 years|Family with grown...|     49418|    0| finance|    Mid Aged|   1|
|Large family, emp...|               1|                 4|40-50 years|Family with grown...|     48390|    0| finance|         Old|   2|
|Large family, emp...|               1|                 3|40-50 years|Family with grown...|     48272|    0| finance|         Old|   3|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
only showing top 3 rows



# Spark Structured Streaming

Spark Streaming (the DStream API) came first; Spark Structured Streaming, built on DataFrames, is the current approach.

In [6]:
# NOTE(review): getOrCreate() returns the already-active session when one exists, so
# `spark_stream` is presumably the same session as `spark` created earlier and this
# appName may not take effect — confirm. Later cells use `spark` and `spark_stream` interchangeably.
spark_stream = SparkSession.builder.appName('StructuredStreaming').getOrCreate()

In [45]:
# First batch of events, appended as CSV files to the folder the stream reads from.
# The DataFrame and the write are separate statements because write.csv() returns
# None; chaining them left df_1 bound to None instead of the data.
df_1 = spark_stream.createDataFrame(
    [("XN203",'FB',300,30),("XN201",'Twitter',10,19),("XN202",'Insta',500,45)],
    ["user_id","app","time_in_secs","age"])
df_1.write.csv('/content/csv_folder',mode='append')

In [8]:
schema=StructType().add("user_id","string").add("app","string").add("time_in_secs", "integer").add("age", "integer")

In [10]:
data = spark.readStream.option("sep",",").schema(schema).csv('/content/csv_folder')

In [12]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- time_in_secs: integer (nullable = true)
 |-- age: integer (nullable = true)



In [13]:
app_count = data.groupBy('app').count()

In [14]:
query = (app_count.writeStream.queryName('count_query').outputMode('complete').format('memory').start())

In [17]:
spark.sql("select * from count_query").toPandas().head(3)

Unnamed: 0,app,count
0,Insta,1
1,FB,1
2,Twitter,1


In [19]:
fb_data = data.filter(data['app']=='FB')

In [20]:
fb_avg_time = fb_data.groupby('user_id').agg(F.avg('time_in_secs'))

In [21]:
fb_query = (fb_avg_time.writeStream.queryName('fb_query').outputMode('complete').format('memory').start())

In [23]:
spark.sql("select * from fb_query").toPandas().head(3)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0


In [33]:
# Second batch of events (all FB), appended to the watched folder so the running
# queries pick it up. write.csv() returns None, so the DataFrame is bound first
# and written in a separate statement.
df_3 = spark.createDataFrame(
    [("XN20",'FB',100,30),("XN2044",'FB',10,19),("XN2102",'FB',2000,45)],
    ["user_id","app","time_in_secs","age"])
df_3.write.csv("/content/csv_folder",mode='append')

In [34]:
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,150.0
1,XN20,100.0
2,XN201,10.0
3,XN2044,10.0
4,XN202,2000.0


In [35]:
app_df = data.groupBy('app').agg(F.sum('time_in_secs').alias('total_time')).orderBy('total_time',ascending=False)

In [36]:
# Results go to the in-memory table `app_wise_query`. Complete output mode is needed
# because app_df sorts an aggregated stream; spelled in lowercase like the other queries.
app_query = (app_df.writeStream.queryName('app_wise_query').outputMode('complete').format('memory').start())


In [37]:
spark.sql("select * from app_wise_query").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,10850
1,Insta,500
2,Twitter,10


In [40]:
# Third batch of events, appended to the watched folder.
# Uses the same absolute path the stream reads from ('/content/csv_folder'); the
# previous relative "csv_folder" only pointed there when the working directory was /content.
# write.csv() returns None, so the DataFrame is bound first and written separately.
df_5 = spark.createDataFrame(
    [("XN203",'FB',1000,30),("XN201",'Insta',400,19),("XN202",'Twitter',900,45)],
    ["user_id","app","time_in_secs","age"])
df_5.write.csv("/content/csv_folder",mode='append')

In [41]:
spark.sql("select * from app_wise_query").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,11350
1,Insta,530
2,Twitter,110


In [42]:
app_df=spark.createDataFrame([('FB','FACEBOOK'),('Insta','INSTAGRAM'),('Twitter','TWITTER')],["app", "full_name"])

In [43]:
app_df.show()

+-------+---------+
|    app|full_name|
+-------+---------+
|     FB| FACEBOOK|
|  Insta|INSTAGRAM|
|Twitter|  TWITTER|
+-------+---------+



In [47]:
app_stream_df = data.join(app_df,'app')
join_query = (app_stream_df.writeStream.queryName('JoinQuery').outputMode('append').format('memory').start())

In [48]:
spark.sql("select * from JoinQuery").toPandas().head(3)

Unnamed: 0,app,user_id,time_in_secs,age,full_name
0,FB,XN20,100,30,FACEBOOK
1,FB,XN20,100,30,FACEBOOK
2,FB,XN203,300,30,FACEBOOK
