# Data Processing using PySpark

In [1]:
# imports: stdlib first, then PySpark
import os

from pyspark.sql import SparkSession

In [2]:
# Create the SparkSession, or reuse the one already running in this kernel
spark = (
    SparkSession.builder
    .appName('data_processing')
    .getOrCreate()
)

In [3]:
# Load the CSV dataset: the first row supplies the column names and
# inferSchema asks Spark to derive column types instead of reading strings
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('sample_data.csv')
)

In [4]:
# Column names, in schema order
df.schema.fieldNames()

['ratings', 'age', 'experience', 'family', 'Mobile']

In [5]:
# Number of columns = number of fields in the schema
len(df.schema.fields)

5

In [6]:
# Total number of rows (records) in the DataFrame
df.count()

33

In [7]:
# (rows, columns), analogous to a pandas DataFrame's .shape
print((df.count(), len(df.schema.fields)))

(33, 5)


In [8]:
# Print each column's name, type and nullability as a tree
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: double (nullable = true)
 |-- Mobile: string (nullable = true)



In [9]:
# First few rows of the DataFrame
df.show(n=3)

+-------+---+----------+------+-------+
|ratings|age|experience|family| Mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|   3.0|   Vivo|
|      3| 27|      13.0|   3.0|  Apple|
|      4| 22|       2.5|   0.0|Samsung|
+-------+---+----------+------+-------+
only showing top 3 rows



In [10]:
# Project just two columns and preview them
df.select(df.age, df.Mobile).show(n=3)

+---+-------+
|age| Mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
+---+-------+
only showing top 3 rows



In [11]:
# Summary statistics per column: count, mean, stddev, min and max
# (mean/stddev come back null for the string column Mobile)
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|Mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.9090909090909092|  null|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|2.0095651949432427|  null|
|    min|                 1|                22|               2.5|               0.0| Apple|
|    max|                 5|                42|              23.0|               5.5|  Vivo|
+-------+------------------+------------------+------------------+------------------+------+



In [12]:
from pyspark.sql.types import StringType,DoubleType

In [13]:
# Add a double-typed copy of the integer age column ('double' names DoubleType)
df.withColumn('age_double', df.age.cast('double')).show(n=10, truncate=False)

+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|Mobile |age_double|
+-------+---+----------+------+-------+----------+
|3      |32 |9.0       |3.0   |Vivo   |32.0      |
|3      |27 |13.0      |3.0   |Apple  |27.0      |
|4      |22 |2.5       |0.0   |Samsung|22.0      |
|4      |37 |16.5      |4.0   |Apple  |37.0      |
|5      |27 |9.0       |1.0   |MI     |27.0      |
|4      |27 |9.0       |0.0   |Oppo   |27.0      |
|5      |37 |23.0      |5.5   |Vivo   |37.0      |
|5      |37 |23.0      |5.5   |Samsung|37.0      |
|3      |22 |2.5       |0.0   |Apple  |22.0      |
|3      |27 |6.0       |0.0   |MI     |27.0      |
+-------+---+----------+------+-------+----------+
only showing top 10 rows



In [15]:
# Derive a new column from an existing one: age ten years from now
df.withColumn('age_after_10_yrs', df.age + 10).show(n=10, truncate=False)

+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|Mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3      |32 |9.0       |3.0   |Vivo   |42              |
|3      |27 |13.0      |3.0   |Apple  |37              |
|4      |22 |2.5       |0.0   |Samsung|32              |
|4      |37 |16.5      |4.0   |Apple  |47              |
|5      |27 |9.0       |1.0   |MI     |37              |
|4      |27 |9.0       |0.0   |Oppo   |37              |
|5      |37 |23.0      |5.5   |Vivo   |47              |
|5      |37 |23.0      |5.5   |Samsung|47              |
|3      |22 |2.5       |0.0   |Apple  |32              |
|3      |27 |6.0       |0.0   |MI     |37              |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows



In [16]:
# Keep only rows whose Mobile brand is Vivo (where() is an alias of filter())
df.where(df.Mobile == 'Vivo').show()

+-------+---+----------+------+------+
|ratings|age|experience|family|Mobile|
+-------+---+----------+------+------+
|      3| 32|       9.0|   3.0|  Vivo|
|      5| 37|      23.0|   5.5|  Vivo|
|      4| 37|       6.0|   0.0|  Vivo|
|      5| 37|      13.0|   1.0|  Vivo|
|      4| 37|       6.0|   0.0|  Vivo|
+-------+---+----------+------+------+



In [17]:
# Filter to Vivo rows, then keep only three columns
df.where(df.Mobile == 'Vivo').select('age', 'ratings', 'Mobile').show()

+---+-------+------+
|age|ratings|Mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+



In [18]:
# Two conditions applied as successive filters (equivalent to AND-ing them)
df.where(df.Mobile == 'Vivo').where(df.experience > 10).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|Mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|   5.5|  Vivo|
|      5| 37|      13.0|   1.0|  Vivo|
+-------+---+----------+------+------+



In [19]:
# Same two conditions in a single filter; each comparison needs its own
# parentheses because Python's & binds tighter than == and >
df.where(
    (df.Mobile == 'Vivo') & (df.experience > 10)
).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|Mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|   5.5|  Vivo|
|      5| 37|      13.0|   1.0|  Vivo|
+-------+---+----------+------+------+



In [20]:
# Unique values of the Mobile column
df.select(df.Mobile).distinct().show()

+-------+
| Mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+



In [21]:
# How many unique Mobile values there are
df.select(df.Mobile).distinct().count()

5

In [22]:
# Number of rows per Mobile brand
df.groupBy(df.Mobile).count().show(n=5, truncate=False)

+-------+-----+
|Mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Samsung|6    |
|Vivo   |5    |
|Apple  |7    |
+-------+-----+



In [23]:
# Value counts: rows per brand, most frequent first (sort() is an alias of orderBy())
df.groupBy('Mobile').count().sort('count', ascending=False).show(n=5, truncate=False)

+-------+-----+
|Mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Apple  |7    |
|Samsung|6    |
|Vivo   |5    |
+-------+-----+



In [24]:
# Per-brand mean of every numeric column (GroupedData.avg() is an alias of mean())
df.groupBy('Mobile').avg().show(n=5, truncate=False)

+-------+------------------+------------------+------------------+------------------+
|Mobile |avg(ratings)      |avg(age)          |avg(experience)   |avg(family)       |
+-------+------------------+------------------+------------------+------------------+
|MI     |3.5               |30.125            |10.1875           |1.5               |
|Oppo   |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.9166666666666667|
|Vivo   |4.2               |36.0              |11.4              |1.9               |
|Apple  |3.4285714285714284|30.571428571428573|11.0              |2.857142857142857 |
+-------+------------------+------------------+------------------+------------------+



In [25]:
# Per-brand sum of every numeric column
df.groupBy(df.Mobile).sum().show(n=5, truncate=False)

+-------+------------+--------+---------------+-----------+
|Mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|MI     |28          |241     |81.5           |12.0       |
|Oppo   |20          |199     |72.5           |10.0       |
|Samsung|25          |172     |52.0           |11.5       |
|Vivo   |21          |180     |57.0           |9.5        |
|Apple  |24          |214     |77.0           |20.0       |
+-------+------------+--------+---------------+-----------+



In [26]:
# Per-brand maximum of every numeric column
df.groupBy(df.Mobile).max().show(n=5, truncate=False)

+-------+------------+--------+---------------+-----------+
|Mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI     |5           |42      |23.0           |5.5        |
|Oppo   |4           |42      |23.0           |2.0        |
|Samsung|5           |37      |23.0           |5.5        |
|Vivo   |5           |37      |23.0           |5.5        |
|Apple  |4           |37      |16.5           |5.5        |
+-------+------------+--------+---------------+-----------+



In [27]:
# Per-brand minimum of every numeric column
df.groupBy(df.Mobile).min().show(n=5, truncate=False)

+-------+------------+--------+---------------+-----------+
|Mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|MI     |1           |27      |2.5            |0.0        |
|Oppo   |2           |22      |6.0            |0.0        |
|Samsung|2           |22      |2.5            |0.0        |
|Vivo   |3           |32      |6.0            |0.0        |
|Apple  |3           |22      |2.5            |0.0        |
+-------+------------+--------+---------------+-----------+



In [29]:
# Aggregation with an explicit {column: function} mapping.
# Use the schema's exact column name ('experience'): 'Experience' only resolved
# because Spark defaults to case-insensitive column resolution
# (spark.sql.caseSensitive=false) and fails when that setting is enabled.
df.groupBy('Mobile').agg({'experience': 'sum'}).show(5, False)

+-------+---------------+
|Mobile |sum(Experience)|
+-------+---------------+
|MI     |81.5           |
|Oppo   |72.5           |
|Samsung|52.0           |
|Vivo   |57.0           |
|Apple  |77.0           |
+-------+---------------+



In [30]:
# UDF
from pyspark.sql.functions import udf


In [31]:
# Plain Python function that the next cell wraps as a Spark UDF
def price_range(brand):
    """Return the price-tier label for a mobile brand name.

    'Samsung' and 'Apple' -> 'High Price'; 'MI' -> 'Mid Price';
    anything else (other brands, different casing, None) -> 'Low Price'.
    """
    if brand in ('Samsung', 'Apple'):
        return 'High Price'
    return 'Mid Price' if brand == 'MI' else 'Low Price'

In [32]:
# Wrap price_range as a row-at-a-time Python UDF returning strings ...
brand_udf = udf(price_range, StringType())
# ... and use it to add a derived 'price_range' column
df.withColumn('price_range', brand_udf(df.Mobile)).show(n=10, truncate=False)

+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|Mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3.0   |Vivo   |Low Price  |
|3      |27 |13.0      |3.0   |Apple  |High Price |
|4      |22 |2.5       |0.0   |Samsung|High Price |
|4      |37 |16.5      |4.0   |Apple  |High Price |
|5      |27 |9.0       |1.0   |MI     |Mid Price  |
|4      |27 |9.0       |0.0   |Oppo   |Low Price  |
|5      |37 |23.0      |5.5   |Vivo   |Low Price  |
|5      |37 |23.0      |5.5   |Samsung|High Price |
|3      |22 |2.5       |0.0   |Apple  |High Price |
|3      |27 |6.0       |0.0   |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows



In [33]:
# Lambda-based UDF: ages up to 30 are "young", older ones "senior".
# age is nullable in the schema; a null age now maps to null instead of
# raising TypeError (None <= 30) inside the Python worker and failing the task.
age_udf = udf(
    lambda age: None if age is None else ("young" if age <= 30 else "senior"),
    StringType(),
)
# apply udf on dataframe
df.withColumn("age_group", age_udf(df.age)).show(10, False)

+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|Mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3.0   |Vivo   |senior   |
|3      |27 |13.0      |3.0   |Apple  |young    |
|4      |22 |2.5       |0.0   |Samsung|young    |
|4      |37 |16.5      |4.0   |Apple  |senior   |
|5      |27 |9.0       |1.0   |MI     |young    |
|4      |27 |9.0       |0.0   |Oppo   |young    |
|5      |37 |23.0      |5.5   |Vivo   |senior   |
|5      |37 |23.0      |5.5   |Samsung|senior   |
|3      |22 |2.5       |0.0   |Apple  |young    |
|3      |27 |6.0       |0.0   |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows



In [38]:
#pandas udf
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [39]:
# Plain Python function used below as a vectorised pandas UDF
def z_score(rating, mean=3.5, std=1.1):
    """Standardise ``rating`` as ``(rating - mean) / std``.

    Works element-wise on scalars and on the pandas Series a pandas UDF is fed.
    The defaults 3.5 and 1.1 appear to be rounded versions of the ratings mean
    (3.5758) and stddev (1.1189) printed by ``df.describe()``; pass the exact
    statistics to get a true z-score.

    Args:
        rating: numeric scalar or pandas Series of ratings.
        mean: centre subtracted from every rating.
        std: scale that each centred rating is divided by; must be non-zero.

    Returns:
        The standardised value(s), same shape as ``rating``.

    Raises:
        ValueError: if ``std`` is zero.
    """
    if std == 0:
        raise ValueError("std must be non-zero")
    return (rating - mean) / std

In [40]:
# Register z_score as a scalar pandas UDF that returns doubles ...
z_udf = pandas_udf(z_score, DoubleType())
# ... and apply it to the ratings column as a new 'z_score' column
df.withColumn('z_score', z_udf(df.ratings)).show(n=10, truncate=False)

+-------+---+----------+------+-------+--------------------+
|ratings|age|experience|family|Mobile |z_score             |
+-------+---+----------+------+-------+--------------------+
|3      |32 |9.0       |3.0   |Vivo   |-0.45454545454545453|
|3      |27 |13.0      |3.0   |Apple  |-0.45454545454545453|
|4      |22 |2.5       |0.0   |Samsung|0.45454545454545453 |
|4      |37 |16.5      |4.0   |Apple  |0.45454545454545453 |
|5      |27 |9.0       |1.0   |MI     |1.3636363636363635  |
|4      |27 |9.0       |0.0   |Oppo   |0.45454545454545453 |
|5      |37 |23.0      |5.5   |Vivo   |1.3636363636363635  |
|5      |37 |23.0      |5.5   |Samsung|1.3636363636363635  |
|3      |22 |2.5       |0.0   |Apple  |-0.45454545454545453|
|3      |27 |6.0       |0.0   |MI     |-0.45454545454545453|
+-------+---+----------+------+-------+--------------------+
only showing top 10 rows



In [42]:
# Function of two inputs, wrapped as a two-column pandas UDF in the next cell
def prod(rating, exp):
    """Return ``rating * exp``, the product of a rating and an experience value.

    Applied element-wise when both arguments are pandas Series, as they are
    when Spark invokes it through pandas_udf.
    """
    return rating * exp

In [43]:
# Register prod as a pandas UDF returning doubles; each column argument is
# passed to prod as its own input
prod_udf = pandas_udf(prod, DoubleType())
df.withColumn('prod', prod_udf(df.ratings, df.experience)).show(n=10, truncate=False)

+-------+---+----------+------+-------+-----+
|ratings|age|experience|family|Mobile |prod |
+-------+---+----------+------+-------+-----+
|3      |32 |9.0       |3.0   |Vivo   |27.0 |
|3      |27 |13.0      |3.0   |Apple  |39.0 |
|4      |22 |2.5       |0.0   |Samsung|10.0 |
|4      |37 |16.5      |4.0   |Apple  |66.0 |
|5      |27 |9.0       |1.0   |MI     |45.0 |
|4      |27 |9.0       |0.0   |Oppo   |36.0 |
|5      |37 |23.0      |5.5   |Vivo   |115.0|
|5      |37 |23.0      |5.5   |Samsung|115.0|
|3      |22 |2.5       |0.0   |Apple  |7.5  |
|3      |27 |6.0       |0.0   |MI     |18.0 |
+-------+---+----------+------+-------+-----+
only showing top 10 rows



In [45]:
# Row count before removing duplicate rows (compare with the count after de-duplication)
df.count()

33

In [46]:
# Remove fully identical rows; distinct() is equivalent to dropDuplicates()
# with no column subset.
# NOTE: this rebinds df, so every later cell works on the de-duplicated data.
df = df.distinct()

In [47]:
# Row count after de-duplication (should not exceed the count before)
df.count()

26

In [48]:
# New DataFrame without the Mobile column (df itself is left unchanged)
df_new = df.drop(df.Mobile)

In [49]:
# Preview the remaining columns
df_new.show(n=10)

+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      5| 27|       9.0|   1.0|
|      5| 27|       6.0|   0.0|
|      3| 42|      23.0|   5.5|
|      1| 37|      23.0|   5.5|
|      3| 37|      16.5|   5.5|
|      5| 27|       2.5|   0.0|
|      4| 27|       6.0|   1.0|
|      3| 27|       6.0|   0.0|
|      5| 22|       2.5|   0.0|
|      4| 37|       9.0|   2.0|
+-------+---+----------+------+
only showing top 10 rows



In [None]:
# Saving the DataFrame as CSV

In [None]:
# Current working directory of the kernel (relative paths resolve against it).
# os.getcwd() works in any Python kernel or exported script; a bare `pwd`
# only works through IPython automagic.
os.getcwd()

In [None]:
# Target directory for the CSV output (Spark writes a directory, not one file).
# Defaults to the original /home/jovyan/work location; set the OUTPUT_DIR
# environment variable to write elsewhere without editing the notebook.
write_uri = os.environ.get('OUTPUT_DIR', '/home/jovyan/work').rstrip('/') + '/df_csv'

In [None]:
# Save as CSV with a header row. coalesce(1) merges the data into one
# partition, so Spark writes a single part file *inside* the write_uri
# directory (not a single file named write_uri).
# mode("overwrite") replaces a previous run's output: the default save mode
# raises if the path already exists, which breaks Restart & Run All.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .format("csv")
    .option("header", "true")
    .save(write_uri)
)

In [None]:
# Saving the DataFrame as Parquet

In [None]:
# Target directory for the Parquet output.
# Defaults to the original /home/jovyan/work location; set the OUTPUT_DIR
# environment variable to write elsewhere without editing the notebook.
parquet_uri = os.environ.get('OUTPUT_DIR', '/home/jovyan/work').rstrip('/') + '/df_parquet'

In [None]:
# Save the data in Parquet format (a directory of part files).
# mode('overwrite') replaces a previous run's output: the default save mode
# raises if the path already exists, which breaks Restart & Run All.
df.write.mode('overwrite').format('parquet').save(parquet_uri)