# Data Wrangling with PySpark

Data wrangling is the practice of converting data from a "raw" form into a user-ready form for descriptive analytics and provide feed for other horizons of analytics such as predictive analytics. 

#### *PySpark comes from Python and Spark --> PySpark = [Py]thon + Spark . It is is a python api for working with Apache Spark

# Module 1: Importing data in as PySpark - with read.csv and read_excel

In [1]:
#if not already installed, use below:
!pip install pyspark[sql]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark[sql]
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=2364b21a84da145a8f89c556a55f327a923188be675456411d859e5b177d0737
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
import pyspark.sql
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate() 
#the above line will return existing session if one was created before and was not closed

In [4]:
# read csv, all columns will be of type string
df1 = spark.read.option('header','true').csv('Sample_Fraud_Detection.csv')
df1.show(2)

+-------+----------------+----------------+--------------+-------------+------+-------+---+-----------+-----+---------------+----------+-----------------+--------------------+
|user_id|     signup_time|   purchase_time|purchase_value|    device_id|source|browser|sex| ip_address|class|       category|       dob|             name|               email|
+-------+----------------+----------------+--------------+-------------+------+-------+---+-----------+-----+---------------+----------+-----------------+--------------------+
|  22058|24-02-2015 22:55|18-04-2015 02:47|         65278|QVPSPJUOCKZAR|   SEO| Chrome|  M|732758368.8|    0|home_essentials|22-02-1976|  Aaron P Maashoh|aaron.maashoh@yah...|
| 333320|07-06-2015 20:39|08-06-2015 01:38|         96399|EOGFQPIZPYXFZ|   Ads| Chrome|  F|350311387.9|    0|       apparels|02-01-1962|Rick D Rothackerj|rick_rothackerj@y...|
+-------+----------------+----------------+--------------+-------------+------+-------+---+-----------+-----+-----------

In [5]:
df = spark.read.csv('Sample_Fraud_Detection.csv', inferSchema=True, header=True)
df.show(2)

+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----+---------------+----------+-----------------+--------------------+
|user_id|     signup_time|   purchase_time|purchase_value|    device_id|source|browser|sex|   ip_address|class|       category|       dob|             name|               email|
+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----+---------------+----------+-----------------+--------------------+
|  22058|24-02-2015 22:55|18-04-2015 02:47|         65278|QVPSPJUOCKZAR|   SEO| Chrome|  M|7.327583688E8|    0|home_essentials|22-02-1976|  Aaron P Maashoh|aaron.maashoh@yah...|
| 333320|07-06-2015 20:39|08-06-2015 01:38|         96399|EOGFQPIZPYXFZ|   Ads| Chrome|  F|3.503113879E8|    0|       apparels|02-01-1962|Rick D Rothackerj|rick_rothackerj@y...|
+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----

# Module 2: View the top and bottom rows of the DataFrame with DataFrame.head(n) and DataFrame.tail(n) OR show() options

In [6]:
#Top n rows
df.head(2) #The code here is exactly as same as that of pandas, but output in in row-by-row format

[Row(user_id=22058, signup_time='24-02-2015 22:55', purchase_time='18-04-2015 02:47', purchase_value=65278, device_id='QVPSPJUOCKZAR', source='SEO', browser='Chrome', sex='M', ip_address=732758368.8, class=0, category='home_essentials', dob='22-02-1976', name='Aaron P Maashoh', email='aaron.maashoh@yahoo.com'),
 Row(user_id=333320, signup_time='07-06-2015 20:39', purchase_time='08-06-2015 01:38', purchase_value=96399, device_id='EOGFQPIZPYXFZ', source='Ads', browser='Chrome', sex='F', ip_address=350311387.9, class=0, category='apparels', dob='02-01-1962', name='Rick D Rothackerj', email='rick_rothackerj@yahoo.com')]

In [7]:
#Top 5 rows
df.tail(2) #The code here is exactly as same as that of pandas

[Row(user_id=117191, signup_time='28-02-2015 17:14', purchase_time='14-04-2015 15:39', purchase_value=20193, device_id='GDHCTKIKPHENW', source='Direct', browser='IE', sex='M', ip_address=1411318028.0, class=0, category='apparels', dob='17-11-1979', name='Julia D Edwardsj', email='julia.edwardsj@sutletgroup.com'),
 Row(user_id=65732, signup_time='11-01-2015 11:10', purchase_time='22-01-2015 04:16', purchase_value=26225, device_id='VSMNAOFPSEQOL', source='Ads', browser='IE', sex='M', ip_address=3765208398.0, class=0, category='apparels', dob='23-03-1988', name='Conor V Humphriesk', email='conor.humphriesk@gmail.com')]

In [8]:
#Top 2 rows, in a better way
df.show(2)

+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----+---------------+----------+-----------------+--------------------+
|user_id|     signup_time|   purchase_time|purchase_value|    device_id|source|browser|sex|   ip_address|class|       category|       dob|             name|               email|
+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----+---------------+----------+-----------------+--------------------+
|  22058|24-02-2015 22:55|18-04-2015 02:47|         65278|QVPSPJUOCKZAR|   SEO| Chrome|  M|7.327583688E8|    0|home_essentials|22-02-1976|  Aaron P Maashoh|aaron.maashoh@yah...|
| 333320|07-06-2015 20:39|08-06-2015 01:38|         96399|EOGFQPIZPYXFZ|   Ads| Chrome|  F|3.503113879E8|    0|       apparels|02-01-1962|Rick D Rothackerj|rick_rothackerj@y...|
+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----

In [9]:
# top 3 rows and selected columns
df.select(['device_id','browser']).show(3)

+-------------+-------+
|    device_id|browser|
+-------------+-------+
|QVPSPJUOCKZAR| Chrome|
|EOGFQPIZPYXFZ| Chrome|
|YSSKYOSJHPPLJ|  Opera|
+-------------+-------+
only showing top 3 rows



# Module 3: Useful fuctions to get basic information about the data
### This is equivalent to Proc Contents of SAS, which gives out the Meta-data of the dataset
#### .info()- will also give information about  missing values

In [10]:
df.dtypes #The code here is exactly as same as that of pandas

[('user_id', 'int'),
 ('signup_time', 'string'),
 ('purchase_time', 'string'),
 ('purchase_value', 'int'),
 ('device_id', 'string'),
 ('source', 'string'),
 ('browser', 'string'),
 ('sex', 'string'),
 ('ip_address', 'double'),
 ('class', 'int'),
 ('category', 'string'),
 ('dob', 'string'),
 ('name', 'string'),
 ('email', 'string')]

In [11]:
df.schema

StructType([StructField('user_id', IntegerType(), True), StructField('signup_time', StringType(), True), StructField('purchase_time', StringType(), True), StructField('purchase_value', IntegerType(), True), StructField('device_id', StringType(), True), StructField('source', StringType(), True), StructField('browser', StringType(), True), StructField('sex', StringType(), True), StructField('ip_address', DoubleType(), True), StructField('class', IntegerType(), True), StructField('category', StringType(), True), StructField('dob', StringType(), True), StructField('name', StringType(), True), StructField('email', StringType(), True)])

##### .columns - Getting all column names

In [12]:
df.columns #The code here is exactly as same as that of pandas

['user_id',
 'signup_time',
 'purchase_time',
 'purchase_value',
 'device_id',
 'source',
 'browser',
 'sex',
 'ip_address',
 'class',
 'category',
 'dob',
 'name',
 'email']

In [13]:
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- signup_time: string (nullable = true)
 |-- purchase_time: string (nullable = true)
 |-- purchase_value: integer (nullable = true)
 |-- device_id: string (nullable = true)
 |-- source: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- ip_address: double (nullable = true)
 |-- class: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)



##### count() - Getting count of rows (obserations)

In [14]:
df.count()

999

#####  Getting count of columns

In [15]:
len(df.columns)

14

##### .describe()- will give basic statistics of the data in the comprehensive format
####  it is quite similar to Proc Summary/ Proc Means functionality of SAS

In [16]:
#Default is the descriptive statistics of only the numeric columns, vertical = True option makes it better to read
print(df.describe().show(truncate = True, vertical=True))

-RECORD 0------------------------------
 summary        | count                
 user_id        | 999                  
 signup_time    | 999                  
 purchase_time  | 999                  
 purchase_value | 999                  
 device_id      | 999                  
 source         | 999                  
 browser        | 999                  
 sex            | 974                  
 ip_address     | 999                  
 class          | 999                  
 category       | 999                  
 dob            | 970                  
 name           | 999                  
 email          | 999                  
-RECORD 1------------------------------
 summary        | mean                 
 user_id        | 196343.71971971972   
 signup_time    | null                 
 purchase_time  | null                 
 purchase_value | 50992.49149149149    
 device_id      | null                 
 source         | null                 
 browser        | null                 


In [17]:
# Descriptive statistics of only the one of the columns
print(df.select(['class']).describe().show())

+-------+-------------------+
|summary|              class|
+-------+-------------------+
|  count|                999|
|   mean| 0.1011011011011011|
| stddev|0.30161354453884753|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

None


In [18]:
# Descriptive statistics of only the one of the columns
print(df.select(['browser']).describe().show())

+-------+-------+
|summary|browser|
+-------+-------+
|  count|    999|
|   mean|   null|
| stddev|   null|
|    min| Chrome|
|    max| Safari|
+-------+-------+

None


In [19]:
df.groupBy("browser", "sex").count().show()

+-------+----+-----+
|browser| sex|count|
+-------+----+-----+
|  Opera|   M|   11|
| Safari|   F|   68|
|FireFox|null|    5|
|  Opera|   F|    8|
| Chrome|   M|  218|
|FireFox|   M|   89|
| Chrome|   F|  167|
|     IE|null|    6|
| Chrome|null|   12|
|     IE|   F|  110|
|     IE|   M|  144|
|FireFox|   F|   61|
| Safari|   M|   98|
|  Opera|null|    2|
+-------+----+-----+



# Module 4 : Subsetting data Vertically and Horizontally (selecting subset of a DataFrame)

##### Selecting only few columns from a DataFrame

In [20]:
sample1 = df.select(['sex','browser'])
sample1.show(3)

+---+-------+
|sex|browser|
+---+-------+
|  M| Chrome|
|  F| Chrome|
|  M|  Opera|
+---+-------+
only showing top 3 rows



#### How to drop variables - in case we need to

In [21]:
df_x = df.drop("sex","browser")
df_x.show(2)

+-------+----------------+----------------+--------------+-------------+------+-------------+-----+---------------+----------+-----------------+--------------------+
|user_id|     signup_time|   purchase_time|purchase_value|    device_id|source|   ip_address|class|       category|       dob|             name|               email|
+-------+----------------+----------------+--------------+-------------+------+-------------+-----+---------------+----------+-----------------+--------------------+
|  22058|24-02-2015 22:55|18-04-2015 02:47|         65278|QVPSPJUOCKZAR|   SEO|7.327583688E8|    0|home_essentials|22-02-1976|  Aaron P Maashoh|aaron.maashoh@yah...|
| 333320|07-06-2015 20:39|08-06-2015 01:38|         96399|EOGFQPIZPYXFZ|   Ads|3.503113879E8|    0|       apparels|02-01-1962|Rick D Rothackerj|rick_rothackerj@y...|
+-------+----------------+----------------+--------------+-------------+------+-------------+-----+---------------+----------+-----------------+--------------------+
only

##### rename() : renaming the columns in a Pandas DataFrame

In [22]:
df_new = df.withColumnRenamed('sex','gender') \
         .withColumnRenamed('dob','birth_date')\
         .withColumnRenamed('email','email_id')
df_new.show(1,vertical = False, truncate = False )

+-------+----------------+----------------+--------------+-------------+------+-------+------+-------------+-----+---------------+----------+---------------+-----------------------+
|user_id|signup_time     |purchase_time   |purchase_value|device_id    |source|browser|gender|ip_address   |class|category       |birth_date|name           |email_id               |
+-------+----------------+----------------+--------------+-------------+------+-------+------+-------------+-----+---------------+----------+---------------+-----------------------+
|22058  |24-02-2015 22:55|18-04-2015 02:47|65278         |QVPSPJUOCKZAR|SEO   |Chrome |M     |7.327583688E8|0    |home_essentials|22-02-1976|Aaron P Maashoh|aaron.maashoh@yahoo.com|
+-------+----------------+----------------+--------------+-------------+------+-------+------+-------------+-----+---------------+----------+---------------+-----------------------+
only showing top 1 row



# Module 5: Conditional filter and LOGICAL operators similar to where clause in SAS/SQL

##### Extract rows with a single condition using operators

In [23]:
above = df[df.purchase_value > 90000]   #The code here is exactly as same as that of pandas
above.count()

96

##### Extract rows with multiple conditions using  operators " | ". It filters data similar to OR operater in where caluse of SAS/SQL.

In [24]:
chrm_ie = df[(df.browser == "Chrome") | (df.browser == "IE")]    #The code here is exactly as same as that of pandas
chrm_ie.count()

657

##### .isin() : multiple "OR" conditions over a single column

In [25]:
chrm_ie = df[df.browser.isin(["Chrome", "IE"])]       #The code here is exactly as same as that of pandas
chrm_ie.count()

657

In [None]:
#### use "&" symbol in PySpark as an “AND” operator of SQL. df [(condition1) & (condition2)]

In [26]:
above_chrm = df[(df.purchase_value > 90000) & (df.browser != "Chrome")]      #The code here is exactly as same as that of pandas
above_chrm.count()

65

##### use "between" to filter a numerical column between a range of values

In [27]:
btw = df[df['purchase_value'].between(80000, 90000)]       #The code here is exactly as same as that of pandas
btw.count()

106

In [28]:
#Another way to write this
btw = df.filter(df.purchase_value.between(80000, 90000))
btw.count()

106

In [29]:
#One more way to write this
btw = df.where(df.purchase_value.between(80000, 90000))
btw.count()

106

### How to use contains condition in where clause

In [30]:
data_subset = df.filter(df.source.contains('SE'))
data_subset.count()

399

# Unlike pandas, in PySpark we can use Like %% operators, YAY!!!

In [31]:
data_subset = df.filter(df.source.like("%SE%")) 
data_subset.count()

399

# Module 6 : Derive a new column(s)
SELECT *, purchase_value/82.52 as convt
FROM tips;

In [32]:
df = df.withColumn('Value_in_USD',df['purchase_value']/82.52)
df.select(['purchase_value','Value_in_USD']).show(2)

+--------------+-----------------+
|purchase_value|     Value_in_USD|
+--------------+-----------------+
|         65278|791.0567135239942|
|         96399|1168.189529810955|
+--------------+-----------------+
only showing top 2 rows



In [33]:
from pyspark.sql.functions import format_number

In [34]:
df.select(['purchase_value'
                           ,format_number('Value_in_USD', 2).alias('Value_in_USD')]).show(2)

+--------------+------------+
|purchase_value|Value_in_USD|
+--------------+------------+
|         65278|      791.06|
|         96399|    1,168.19|
+--------------+------------+
only showing top 2 rows



#### Add column with incremental values from 1 onwards with range(start, stop, step)

In [35]:
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn('counter', monotonically_increasing_id())
df.show(2, truncate = False)

+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----+---------------+----------+-----------------+-------------------------+-----------------+-------+
|user_id|signup_time     |purchase_time   |purchase_value|device_id    |source|browser|sex|ip_address   |class|category       |dob       |name             |email                    |Value_in_USD     |counter|
+-------+----------------+----------------+--------------+-------------+------+-------+---+-------------+-----+---------------+----------+-----------------+-------------------------+-----------------+-------+
|22058  |24-02-2015 22:55|18-04-2015 02:47|65278         |QVPSPJUOCKZAR|SEO   |Chrome |M  |7.327583688E8|0    |home_essentials|22-02-1976|Aaron P Maashoh  |aaron.maashoh@yahoo.com  |791.0567135239942|0      |
|333320 |07-06-2015 20:39|08-06-2015 01:38|96399         |EOGFQPIZPYXFZ|Ads   |Chrome |F  |3.503113879E8|0    |apparels       |02-01-1962|Rick D Rothackerj|rick_rot

# Module 6.1 :Text functions

#### use length function -- to finds the length of a character string

In [36]:
#from pyspark.sql.functions import length
#or better 
from pyspark.sql.functions import *
df = df.withColumn('email_length',length('email'))
df.select(['email','email_length']).show(2)

+--------------------+------------+
|               email|email_length|
+--------------------+------------+
|aaron.maashoh@yah...|          23|
|rick_rothackerj@y...|          25|
+--------------------+------------+
only showing top 2 rows



#### How to change the case of a string

In [37]:
df = df.withColumn('email_all_caps',upper ('email'))
df = df.withColumn('email_all_lower',lower('email_all_caps'))
df = df.withColumn('email_propcase',initcap('email_all_caps'))

df.select(['email','email_all_caps','email_all_lower','email_propcase']).show(2, truncate = False)

+-------------------------+-------------------------+-------------------------+-------------------------+
|email                    |email_all_caps           |email_all_lower          |email_propcase           |
+-------------------------+-------------------------+-------------------------+-------------------------+
|aaron.maashoh@yahoo.com  |AARON.MAASHOH@YAHOO.COM  |aaron.maashoh@yahoo.com  |Aaron.maashoh@yahoo.com  |
|rick_rothackerj@yahoo.com|RICK_ROTHACKERJ@YAHOO.COM|rick_rothackerj@yahoo.com|Rick_rothackerj@yahoo.com|
+-------------------------+-------------------------+-------------------------+-------------------------+
only showing top 2 rows



#### instr  and locate functions - finds the first occurenece position of a character in a column of strings

In [38]:
df = df.withColumn('Position_of_@',instr('email_propcase','@'))
df = df.withColumn('Position_of_a',instr('email_propcase','a'))
df = df.withColumn('Position_of_A_CAPS',instr('email_propcase','A'))

df.select(['email_propcase','Position_of_@','Position_of_a','Position_of_A_CAPS']).show(2,truncate = False)

+-------------------------+-------------+-------------+------------------+
|email_propcase           |Position_of_@|Position_of_a|Position_of_A_CAPS|
+-------------------------+-------------+-------------+------------------+
|Aaron.maashoh@yahoo.com  |14           |2            |1                 |
|Rick_rothackerj@yahoo.com|16           |10           |0                 |
+-------------------------+-------------+-------------+------------------+
only showing top 2 rows



In [39]:
df = df.withColumn('Location_of_@',locate('@','email_propcase'))
df = df.withColumn('Location_of_a',locate('a','email_propcase'))
df = df.withColumn('Location_of_A_CAPS',locate('A','email_propcase'))
df.select(['email_propcase','Location_of_@','Location_of_a','Location_of_A_CAPS']).show(2,truncate = False)

+-------------------------+-------------+-------------+------------------+
|email_propcase           |Location_of_@|Location_of_a|Location_of_A_CAPS|
+-------------------------+-------------+-------------+------------------+
|Aaron.maashoh@yahoo.com  |14           |2            |1                 |
|Rick_rothackerj@yahoo.com|16           |10           |0                 |
+-------------------------+-------------+-------------+------------------+
only showing top 2 rows



#### How to split a string on the basis of a delimiter, example portion in email before and after @

In [40]:
df = df.withColumn('email_part1',substring_index(df.email, '@', 1))
df = df.withColumn('email_part2',substring_index(df.email, '@', -1))
df.select(['email','email_part1','email_part2']).show(2,truncate = False)

+-------------------------+---------------+-----------+
|email                    |email_part1    |email_part2|
+-------------------------+---------------+-----------+
|aaron.maashoh@yahoo.com  |aaron.maashoh  |yahoo.com  |
|rick_rothackerj@yahoo.com|rick_rothackerj|yahoo.com  |
+-------------------------+---------------+-----------+
only showing top 2 rows



#### Another way is using a split function

In [41]:
df.select('email',split(df.email,"@").getItem(0).alias("email_part1")
                 ,split(df.email,"@").getItem(1).alias("email_part2")).show(5,truncate = False)

+--------------------------+---------------+-----------+
|email                     |email_part1    |email_part2|
+--------------------------+---------------+-----------+
|aaron.maashoh@yahoo.com   |aaron.maashoh  |yahoo.com  |
|rick_rothackerj@yahoo.com |rick_rothackerj|yahoo.com  |
|harriet.mcleodd@gmail.com |harriet.mcleodd|gmail.com  |
|sinead.carews@gmail.com   |sinead.carews  |gmail.com  |
|laurence.frosty@tntech.edu|laurence.frosty|tntech.edu |
+--------------------------+---------------+-----------+
only showing top 5 rows



#### let's now cponcatename the strings

In [42]:
df.select('email_part1','email_part2',concat(df.email_part1,lit("@"),df.email_part2).alias("FULL_EMAIL")).show(2,truncate = False)

+---------------+-----------+-------------------------+
|email_part1    |email_part2|FULL_EMAIL               |
+---------------+-----------+-------------------------+
|aaron.maashoh  |yahoo.com  |aaron.maashoh@yahoo.com  |
|rick_rothackerj|yahoo.com  |rick_rothackerj@yahoo.com|
+---------------+-----------+-------------------------+
only showing top 2 rows



### In the above example, we also used function "lit", which is used to create a column with one static value in a column, lit is abbreviation of literal. 
#### Following method is close of "CATX" function in SAS

In [43]:
df.select('email_part1','email_part2',concat_ws('@',df.email_part1,df.email_part2).alias("FULL_EMAIL")).show(2,truncate = False)

+---------------+-----------+-------------------------+
|email_part1    |email_part2|FULL_EMAIL               |
+---------------+-----------+-------------------------+
|aaron.maashoh  |yahoo.com  |aaron.maashoh@yahoo.com  |
|rick_rothackerj|yahoo.com  |rick_rothackerj@yahoo.com|
+---------------+-----------+-------------------------+
only showing top 2 rows



#### Using a substr function in Pyspark

In [44]:
df = df.withColumn('email_first_3',df.email.substr(1,3))
df.select(['email','email_first_3']).show(2,truncate = False)

+-------------------------+-------------+
|email                    |email_first_3|
+-------------------------+-------------+
|aaron.maashoh@yahoo.com  |aar          |
|rick_rothackerj@yahoo.com|ric          |
+-------------------------+-------------+
only showing top 2 rows



## Another method to use these functions is with a 'select' statement, which is quite simple and SQL style

In [45]:
df1 = df.select('email',substring(df.email, 1, 3).alias('email_first_3'))
df1.show(5,truncate = False)

+--------------------------+-------------+
|email                     |email_first_3|
+--------------------------+-------------+
|aaron.maashoh@yahoo.com   |aar          |
|rick_rothackerj@yahoo.com |ric          |
|harriet.mcleodd@gmail.com |har          |
|sinead.carews@gmail.com   |sin          |
|laurence.frosty@tntech.edu|lau          |
+--------------------------+-------------+
only showing top 5 rows



#### Translate function works same as it works in SAS

In [46]:
df1 = df.select('email',translate(df.email, 'a', 'X').alias('email_translated'))
df1.show(5,truncate = False)

+--------------------------+--------------------------+
|email                     |email_translated          |
+--------------------------+--------------------------+
|aaron.maashoh@yahoo.com   |XXron.mXXshoh@yXhoo.com   |
|rick_rothackerj@yahoo.com |rick_rothXckerj@yXhoo.com |
|harriet.mcleodd@gmail.com |hXrriet.mcleodd@gmXil.com |
|sinead.carews@gmail.com   |sineXd.cXrews@gmXil.com   |
|laurence.frosty@tntech.edu|lXurence.frosty@tntech.edu|
+--------------------------+--------------------------+
only showing top 5 rows



#### Following is the SAS Tranwrd equivalent, but with regular expression it becomes much more versatile.

In [47]:
df1 = df.select('email',regexp_replace(df.email, ('@\w+\.\w+'), '@some_domain').alias('regex_replaced'))
df1.show(5,truncate = False)

+--------------------------+---------------------------+
|email                     |regex_replaced             |
+--------------------------+---------------------------+
|aaron.maashoh@yahoo.com   |aaron.maashoh@some_domain  |
|rick_rothackerj@yahoo.com |rick_rothackerj@some_domain|
|harriet.mcleodd@gmail.com |harriet.mcleodd@some_domain|
|sinead.carews@gmail.com   |sinead.carews@some_domain  |
|laurence.frosty@tntech.edu|laurence.frosty@some_domain|
+--------------------------+---------------------------+
only showing top 5 rows



In [48]:
import pyspark.pandas as ps
# We can convert a pyspark data to Pandas dataframe, and vice versa as well
PYARROW_IGNORE_TIMEZONE = 1
result_pdf = df1.toPandas()



In [49]:
result_pdf.head(5)

Unnamed: 0,email,regex_replaced
0,aaron.maashoh@yahoo.com,aaron.maashoh@some_domain
1,rick_rothackerj@yahoo.com,rick_rothackerj@some_domain
2,harriet.mcleodd@gmail.com,harriet.mcleodd@some_domain
3,sinead.carews@gmail.com,sinead.carews@some_domain
4,laurence.frosty@tntech.edu,laurence.frosty@some_domain


# Module 7 : Regular Expressions (popular by name "regex")

In [50]:
import pandas as pd
data = {'Customer_id': ['cus_01', 'cus_02', 'cus_03', 'cus_04'], 
        'email': ['name.surname@gmail.com','anonymous123@yahoo.co.ukk','anonymous123@.com','a..@gmail.com']
        } 
pd_df = pd.DataFrame(data)
pd_df

Unnamed: 0,Customer_id,email
0,cus_01,name.surname@gmail.com
1,cus_02,anonymous123@yahoo.co.ukk
2,cus_03,anonymous123@.com
3,cus_04,a..@gmail.com


In [51]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df_py = spark.createDataFrame(pd_df)

In [52]:
df_py.show(truncate= False)

+-----------+-------------------------+
|Customer_id|email                    |
+-----------+-------------------------+
|cus_01     |name.surname@gmail.com   |
|cus_02     |anonymous123@yahoo.co.ukk|
|cus_03     |anonymous123@.com        |
|cus_04     |a..@gmail.com            |
+-----------+-------------------------+



In [53]:
df_py.select('Customer_id', 'email', regexp_extract(df_py.email, '(\w+[._-]*\w*)', 1).alias('first_portion')).show(truncate= False)

+-----------+-------------------------+-------------+
|Customer_id|email                    |first_portion|
+-----------+-------------------------+-------------+
|cus_01     |name.surname@gmail.com   |name.surname |
|cus_02     |anonymous123@yahoo.co.ukk|anonymous123 |
|cus_03     |anonymous123@.com        |anonymous123 |
|cus_04     |a..@gmail.com            |a..          |
+-----------+-------------------------+-------------+



In [54]:
df_py.select('Customer_id', 'email', regexp_extract(df_py.email, '@(\w+)',1).alias('second_portion')).show(truncate= False)

+-----------+-------------------------+--------------+
|Customer_id|email                    |second_portion|
+-----------+-------------------------+--------------+
|cus_01     |name.surname@gmail.com   |gmail         |
|cus_02     |anonymous123@yahoo.co.ukk|yahoo         |
|cus_03     |anonymous123@.com        |              |
|cus_04     |a..@gmail.com            |gmail         |
+-----------+-------------------------+--------------+



# Module 8 : Handling the Date Time

In [55]:
df_dt = df.select ('signup_time','purchase_time',current_date().alias("todays_date"),current_timestamp().alias("whats_time_now"))

In [56]:
df_dt.dtypes

[('signup_time', 'string'),
 ('purchase_time', 'string'),
 ('todays_date', 'date'),
 ('whats_time_now', 'timestamp')]

In [57]:
df_dt.show(4,truncate= False)

+----------------+----------------+-----------+--------------------------+
|signup_time     |purchase_time   |todays_date|whats_time_now            |
+----------------+----------------+-----------+--------------------------+
|24-02-2015 22:55|18-04-2015 02:47|2023-05-09 |2023-05-09 07:49:19.719587|
|07-06-2015 20:39|08-06-2015 01:38|2023-05-09 |2023-05-09 07:49:19.719587|
|01-01-2015 18:52|01-01-2015 18:52|2023-05-09 |2023-05-09 07:49:19.719587|
|28-04-2015 21:13|04-05-2015 13:54|2023-05-09 |2023-05-09 07:49:19.719587|
+----------------+----------------+-----------+--------------------------+
only showing top 4 rows



In [58]:
df_dt1 = df_dt.select('*', to_timestamp('signup_time','M/d/y H:m').alias('converted_date'))

In [None]:
df_dt1.show(2,truncate = False)

+---------------+--------------+-----------+-----------------------+-------------------+
|signup_time    |purchase_time |todays_date|whats_time_now         |converted_date     |
+---------------+--------------+-----------+-----------------------+-------------------+
|2/24/2015 22:55|4/18/2015 2:47|2023-03-24 |2023-03-24 13:07:21.187|2015-02-24 22:55:00|
|6/7/2015 20:39 |6/8/2015 1:38 |2023-03-24 |2023-03-24 13:07:21.187|2015-06-07 20:39:00|
+---------------+--------------+-----------+-----------------------+-------------------+
only showing top 2 rows



### get date, time, date elements, time elements from date

In [59]:
df_dt1.select('converted_date'
              , to_date('converted_date').alias('datepart') 
              , year('converted_date').alias('year') 
              , month('converted_date').alias('month') 
              , dayofmonth('converted_date').alias('day') 
              , dayofweek('converted_date').alias("dayofweek")
              , hour('converted_date').alias('hour') 
              , minute('converted_date').alias('min') 
              , second('converted_date').alias('min') 
             ).show(2)

+--------------+--------+----+-----+----+---------+----+----+----+
|converted_date|datepart|year|month| day|dayofweek|hour| min| min|
+--------------+--------+----+-----+----+---------+----+----+----+
|          null|    null|null| null|null|     null|null|null|null|
|          null|    null|null| null|null|     null|null|null|null|
+--------------+--------+----+-----+----+---------+----+----+----+
only showing top 2 rows



### We can move the date - intnx funcationality in SAS or Trunc in Oracle SQL?

In [60]:
df_dt1.select('converted_date'
              , trunc(add_months('converted_date',-1),'Month').alias('PMB')
              , (trunc('converted_date','Month')-1).alias('PME') 
              , trunc((trunc('converted_date','year')-1),'year').alias('PYB')
              , (trunc('converted_date','year')-1).alias('PYE')
              , trunc((trunc('converted_date','quarter')-1),'quarter').alias('PQB')
              , (trunc('converted_date','quarter')-1).alias('PQE')

             ).show(2)

+--------------+----+----+----+----+----+----+
|converted_date| PMB| PME| PYB| PYE| PQB| PQE|
+--------------+----+----+----+----+----+----+
|          null|null|null|null|null|null|null|
|          null|null|null|null|null|null|null|
+--------------+----+----+----+----+----+----+
only showing top 2 rows



### Calculate gap between two dates

In [61]:
df_dt1.select('converted_date'
             , datediff(current_date(),'converted_date').alias('date_difference')  #simply give number of days in between (A-B)
             , months_between(current_date(),'converted_date').alias('months_between')  #exact months, not just days/30
             ).show(2)

+--------------+---------------+--------------+
|converted_date|date_difference|months_between|
+--------------+---------------+--------------+
|          null|           null|          null|
|          null|           null|          null|
+--------------+---------------+--------------+
only showing top 2 rows



### Let's convert a date to charcter and back to date

In [62]:
df_dt2 = df_dt1.select('converted_date'
 
              , concat(year('converted_date') ,lit('-'),  month('converted_date') ,lit('-'), dayofmonth('converted_date')
             ).alias('char_date'))

In [63]:
df_dt2.show(2)

+--------------+---------+
|converted_date|char_date|
+--------------+---------+
|          null|     null|
|          null|     null|
+--------------+---------+
only showing top 2 rows



In [64]:
df_dt2.dtypes

[('converted_date', 'timestamp'), ('char_date', 'string')]

In [65]:
df_dt2 = df_dt2.select('*'
                        ,to_date('char_date').alias('back_to_date'))

In [66]:
df_dt2.show(2)

+--------------+---------+------------+
|converted_date|char_date|back_to_date|
+--------------+---------+------------+
|          null|     null|        null|
|          null|     null|        null|
+--------------+---------+------------+
only showing top 2 rows



In [67]:
df_dt2.dtypes

[('converted_date', 'timestamp'),
 ('char_date', 'string'),
 ('back_to_date', 'date')]

# Module 9 : Arithmetic Operations on PySpark DataFrame

In [68]:
sample_1 = df.select('user_id','browser','sex','class')

sample_1 = sample_1.withColumn('class_plus_10',col('class') + 10)
sample_1 = sample_1.withColumn('class_inverse',1 - col('class') )
sample_1 = sample_1.withColumn('class_multiplied_2',col('class') * 10)
sample_1 = sample_1.withColumn('class_divided',col('class') / 2)
sample_1 = sample_1.withColumn('mod_remainder',(col('class')+3) % 2)
sample_1 = sample_1.select('*',pow('class_plus_10',2).alias('powered'))

sample_1.show(5)

+-------+-------+---+-----+-------------+-------------+------------------+-------------+-------------+-------+
|user_id|browser|sex|class|class_plus_10|class_inverse|class_multiplied_2|class_divided|mod_remainder|powered|
+-------+-------+---+-----+-------------+-------------+------------------+-------------+-------------+-------+
|  22058| Chrome|  M|    0|           10|            1|                 0|          0.0|            1|  100.0|
| 333320| Chrome|  F|    0|           10|            1|                 0|          0.0|            1|  100.0|
|   1359|  Opera|  M|    1|           11|            0|                10|          0.5|            0|  121.0|
| 150084| Safari|  M|    0|           10|            1|                 0|          0.0|            1|  100.0|
| 221365| Safari|  M|    0|           10|            1|                 0|          0.0|            1|  100.0|
+-------+-------+---+-----+-------------+-------------+------------------+-------------+-------------+-------+
o

# Module 10 : GroupBy() Method for PySpark DataFrame

In [69]:
sample_1 = df.select('user_id','browser','sex','class')
sample_1.show(5)

+-------+-------+---+-----+
|user_id|browser|sex|class|
+-------+-------+---+-----+
|  22058| Chrome|  M|    0|
| 333320| Chrome|  F|    0|
|   1359|  Opera|  M|    1|
| 150084| Safari|  M|    0|
| 221365| Safari|  M|    0|
+-------+-------+---+-----+
only showing top 5 rows



In [70]:
summary_data  = sample_1.groupBy('browser','sex').sum('class')
summary_data.show()

+-------+----+----------+
|browser| sex|sum(class)|
+-------+----+----------+
|  Opera|   M|         2|
| Safari|   F|         4|
|FireFox|null|         1|
|  Opera|   F|         2|
| Chrome|   M|        23|
|FireFox|   M|        11|
| Chrome|   F|        11|
|     IE|null|         1|
| Chrome|null|         1|
|     IE|   F|        12|
|     IE|   M|        17|
|FireFox|   F|         3|
| Safari|   M|        13|
|  Opera|null|         0|
+-------+----+----------+



In [71]:
# here is a better way:
from pyspark.sql.functions import sum,avg,max,min,mean,count
summary_data  = sample_1.groupBy('browser','sex').agg(sum('class').alias('sum_class') \
                                                      , mean('class').alias('avg_class') \
                                                      , min('class').alias('min_class') \
                                                      , max('class').alias('max_class') \
                                                      , count('class').alias('count_class')
                                                      , countDistinct('class').alias('count_distinct')
                                                     )
summary_data.show()

+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|           avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|  Opera|   M|        2| 0.18181818181818182|        0|        1|         11|             2|
| Safari|   F|        4|0.058823529411764705|        0|        1|         68|             2|
|FireFox|null|        1|                 0.2|        0|        1|          5|             2|
|  Opera|   F|        2|                0.25|        0|        1|          8|             2|
| Chrome|   M|       23| 0.10550458715596331|        0|        1|        218|             2|
|FireFox|   M|       11| 0.12359550561797752|        0|        1|         89|             2|
| Chrome|   F|       11|  0.0658682634730539|        0|        1|        167|             2|
|     IE|null|        1| 0.16666666666666666|        0|        1|     

# Module 11: Another way to use where condition, first method was explained in Module 5

In [72]:
summary_data.filter("sex == 'M'").show()

+-------+---+---------+-------------------+---------+---------+-----------+--------------+
|browser|sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+---+---------+-------------------+---------+---------+-----------+--------------+
|  Opera|  M|        2|0.18181818181818182|        0|        1|         11|             2|
| Chrome|  M|       23|0.10550458715596331|        0|        1|        218|             2|
|FireFox|  M|       11|0.12359550561797752|        0|        1|         89|             2|
|     IE|  M|       17|0.11805555555555555|        0|        1|        144|             2|
| Safari|  M|       13| 0.1326530612244898|        0|        1|         98|             2|
+-------+---+---------+-------------------+---------+---------+-----------+--------------+



In [73]:
summary_data.filter((summary_data.sex == "M") & (summary_data.browser == "Chrome") ).show()

+-------+---+---------+-------------------+---------+---------+-----------+--------------+
|browser|sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+---+---------+-------------------+---------+---------+-----------+--------------+
| Chrome|  M|       23|0.10550458715596331|        0|        1|        218|             2|
+-------+---+---------+-------------------+---------+---------+-----------+--------------+



In [74]:
summary_data.filter((summary_data.sex == "M") | (summary_data.browser == "Chrome") ).show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|  Opera|   M|        2|0.18181818181818182|        0|        1|         11|             2|
| Chrome|   M|       23|0.10550458715596331|        0|        1|        218|             2|
|FireFox|   M|       11|0.12359550561797752|        0|        1|         89|             2|
| Chrome|   F|       11| 0.0658682634730539|        0|        1|        167|             2|
| Chrome|null|        1|0.08333333333333333|        0|        1|         12|             2|
|     IE|   M|       17|0.11805555555555555|        0|        1|        144|             2|
| Safari|   M|       13| 0.1326530612244898|        0|        1|         98|             2|
+-------+----+---------+-------------------+---------+---------+-----------+----

In [75]:
summary_data.filter((summary_data.sex != "M")).show()

+-------+---+---------+--------------------+---------+---------+-----------+--------------+
|browser|sex|sum_class|           avg_class|min_class|max_class|count_class|count_distinct|
+-------+---+---------+--------------------+---------+---------+-----------+--------------+
| Safari|  F|        4|0.058823529411764705|        0|        1|         68|             2|
|  Opera|  F|        2|                0.25|        0|        1|          8|             2|
| Chrome|  F|       11|  0.0658682634730539|        0|        1|        167|             2|
|     IE|  F|       12| 0.10909090909090909|        0|        1|        110|             2|
|FireFox|  F|        3| 0.04918032786885246|        0|        1|         61|             2|
+-------+---+---------+--------------------+---------+---------+-----------+--------------+



In [76]:
summary_data.filter( "sex is NULL AND browser is not NULL").show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|FireFox|null|        1|                0.2|        0|        1|          5|             2|
|     IE|null|        1|0.16666666666666666|        0|        1|          6|             2|
| Chrome|null|        1|0.08333333333333333|        0|        1|         12|             2|
|  Opera|null|        0|                0.0|        0|        0|          2|             1|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+



In [77]:
summary_data.filter( "sex is NULL OR browser is not NULL").show()

+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|           avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|  Opera|   M|        2| 0.18181818181818182|        0|        1|         11|             2|
| Safari|   F|        4|0.058823529411764705|        0|        1|         68|             2|
|FireFox|null|        1|                 0.2|        0|        1|          5|             2|
|  Opera|   F|        2|                0.25|        0|        1|          8|             2|
| Chrome|   M|       23| 0.10550458715596331|        0|        1|        218|             2|
|FireFox|   M|       11| 0.12359550561797752|        0|        1|         89|             2|
| Chrome|   F|       11|  0.0658682634730539|        0|        1|        167|             2|
|     IE|null|        1| 0.16666666666666666|        0|        1|     

In [78]:
list_1 = ["Chrome","FireFox"]
summary_data.filter(summary_data.browser.isin(list_1)).show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|FireFox|null|        1|                0.2|        0|        1|          5|             2|
| Chrome|   M|       23|0.10550458715596331|        0|        1|        218|             2|
|FireFox|   M|       11|0.12359550561797752|        0|        1|         89|             2|
| Chrome|   F|       11| 0.0658682634730539|        0|        1|        167|             2|
| Chrome|null|        1|0.08333333333333333|        0|        1|         12|             2|
|FireFox|   F|        3|0.04918032786885246|        0|        1|         61|             2|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+



In [79]:
list_1 = ["Chrome","FireFox"]
summary_data.filter(summary_data.browser.isin(list_1) == False).show()

+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|           avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|  Opera|   M|        2| 0.18181818181818182|        0|        1|         11|             2|
| Safari|   F|        4|0.058823529411764705|        0|        1|         68|             2|
|  Opera|   F|        2|                0.25|        0|        1|          8|             2|
|     IE|null|        1| 0.16666666666666666|        0|        1|          6|             2|
|     IE|   F|       12| 0.10909090909090909|        0|        1|        110|             2|
|     IE|   M|       17| 0.11805555555555555|        0|        1|        144|             2|
| Safari|   M|       13|  0.1326530612244898|        0|        1|         98|             2|
|  Opera|null|        0|                 0.0|        0|        0|     

In [80]:
list_1 = ["Chrome","FireFox"]
summary_data.filter(~summary_data.browser.isin(list_1)).show()

+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|           avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+--------------------+---------+---------+-----------+--------------+
|  Opera|   M|        2| 0.18181818181818182|        0|        1|         11|             2|
| Safari|   F|        4|0.058823529411764705|        0|        1|         68|             2|
|  Opera|   F|        2|                0.25|        0|        1|          8|             2|
|     IE|null|        1| 0.16666666666666666|        0|        1|          6|             2|
|     IE|   F|       12| 0.10909090909090909|        0|        1|        110|             2|
|     IE|   M|       17| 0.11805555555555555|        0|        1|        144|             2|
| Safari|   M|       13|  0.1326530612244898|        0|        1|         98|             2|
|  Opera|null|        0|                 0.0|        0|        0|     

In [81]:
summary_data.filter(summary_data.browser.startswith("O")).show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|  Opera|   M|        2|0.18181818181818182|        0|        1|         11|             2|
|  Opera|   F|        2|               0.25|        0|        1|          8|             2|
|  Opera|null|        0|                0.0|        0|        0|          2|             1|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+



In [82]:
summary_data.filter(summary_data.browser.endswith("e")).show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
| Chrome|   M|       23|0.10550458715596331|        0|        1|        218|             2|
| Chrome|   F|       11| 0.0658682634730539|        0|        1|        167|             2|
| Chrome|null|        1|0.08333333333333333|        0|        1|         12|             2|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+



In [83]:
summary_data.filter(summary_data.browser.contains("hr")).show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
| Chrome|   M|       23|0.10550458715596331|        0|        1|        218|             2|
| Chrome|   F|       11| 0.0658682634730539|        0|        1|        167|             2|
| Chrome|null|        1|0.08333333333333333|        0|        1|         12|             2|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+



In [84]:
summary_data.filter(summary_data.browser.like("%hr%")).show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
| Chrome|   M|       23|0.10550458715596331|        0|        1|        218|             2|
| Chrome|   F|       11| 0.0658682634730539|        0|        1|        167|             2|
| Chrome|null|        1|0.08333333333333333|        0|        1|         12|             2|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+



In [85]:
summary_data.filter(summary_data.browser.rlike("\w+Fox")).show()

+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|browser| sex|sum_class|          avg_class|min_class|max_class|count_class|count_distinct|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+
|FireFox|null|        1|                0.2|        0|        1|          5|             2|
|FireFox|   M|       11|0.12359550561797752|        0|        1|         89|             2|
|FireFox|   F|        3|0.04918032786885246|        0|        1|         61|             2|
+-------+----+---------+-------------------+---------+---------+-----------+--------------+



# Module 12 : WHEN function in PySpark

In [86]:
summary_data = summary_data.withColumn('gender', when(summary_data.sex == "M", "Male")
                               .when(summary_data.sex == "F", "Female")
                               .when(summary_data.sex.isNull(), "Other")
                               .otherwise("check") )
summary_data.show()

+-------+----+---------+--------------------+---------+---------+-----------+--------------+------+
|browser| sex|sum_class|           avg_class|min_class|max_class|count_class|count_distinct|gender|
+-------+----+---------+--------------------+---------+---------+-----------+--------------+------+
|  Opera|   M|        2| 0.18181818181818182|        0|        1|         11|             2|  Male|
| Safari|   F|        4|0.058823529411764705|        0|        1|         68|             2|Female|
|FireFox|null|        1|                 0.2|        0|        1|          5|             2| Other|
|  Opera|   F|        2|                0.25|        0|        1|          8|             2|Female|
| Chrome|   M|       23| 0.10550458715596331|        0|        1|        218|             2|  Male|
|FireFox|   M|       11| 0.12359550561797752|        0|        1|         89|             2|  Male|
| Chrome|   F|       11|  0.0658682634730539|        0|        1|        167|             2|Female|


# Module 13 : rank(), dense_rank(), row_number(), percent_rank() in PySpark

In [88]:
data_rank= spark.read.option('header','true').csv('Rank_data.csv')
data_rank.show()

+------+---------+-----+
|  Name|  Subject|Marks|
+------+---------+-----+
|   Sam|  Physics|   76|
|   Sam|Chemistry|   89|
|   Sam|    Maths|   85|
| Robby|  Physics|   89|
| Robby|Chemistry|   91|
| Robby|    Maths|   87|
|Merlin|  Physics|   84|
|Merlin|Chemistry|   95|
|Merlin|    Maths|   90|
+------+---------+-----+



In [89]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec  = Window.partitionBy("Subject").orderBy(col("Marks").desc())
data_rank = data_rank.withColumn('Row_Number',row_number().over(windowSpec))
data_rank.show()

+------+---------+-----+----------+
|  Name|  Subject|Marks|Row_Number|
+------+---------+-----+----------+
|Merlin|Chemistry|   95|         1|
| Robby|Chemistry|   91|         2|
|   Sam|Chemistry|   89|         3|
|Merlin|    Maths|   90|         1|
| Robby|    Maths|   87|         2|
|   Sam|    Maths|   85|         3|
| Robby|  Physics|   89|         1|
|Merlin|  Physics|   84|         2|
|   Sam|  Physics|   76|         3|
+------+---------+-----+----------+



In [90]:
data_rank = data_rank.withColumn('RANK',rank().over(windowSpec))
data_rank.show()

+------+---------+-----+----------+----+
|  Name|  Subject|Marks|Row_Number|RANK|
+------+---------+-----+----------+----+
|Merlin|Chemistry|   95|         1|   1|
| Robby|Chemistry|   91|         2|   2|
|   Sam|Chemistry|   89|         3|   3|
|Merlin|    Maths|   90|         1|   1|
| Robby|    Maths|   87|         2|   2|
|   Sam|    Maths|   85|         3|   3|
| Robby|  Physics|   89|         1|   1|
|Merlin|  Physics|   84|         2|   2|
|   Sam|  Physics|   76|         3|   3|
+------+---------+-----+----------+----+



In [91]:
data_rank = data_rank.withColumn('DENSE_RANK',dense_rank().over(windowSpec))
data_rank.show()

+------+---------+-----+----------+----+----------+
|  Name|  Subject|Marks|Row_Number|RANK|DENSE_RANK|
+------+---------+-----+----------+----+----------+
|Merlin|Chemistry|   95|         1|   1|         1|
| Robby|Chemistry|   91|         2|   2|         2|
|   Sam|Chemistry|   89|         3|   3|         3|
|Merlin|    Maths|   90|         1|   1|         1|
| Robby|    Maths|   87|         2|   2|         2|
|   Sam|    Maths|   85|         3|   3|         3|
| Robby|  Physics|   89|         1|   1|         1|
|Merlin|  Physics|   84|         2|   2|         2|
|   Sam|  Physics|   76|         3|   3|         3|
+------+---------+-----+----------+----+----------+



In [92]:
data_rank = data_rank.withColumn('PERCENT_RANK',percent_rank().over(windowSpec))
data_rank.show()

+------+---------+-----+----------+----+----------+------------+
|  Name|  Subject|Marks|Row_Number|RANK|DENSE_RANK|PERCENT_RANK|
+------+---------+-----+----------+----+----------+------------+
|Merlin|Chemistry|   95|         1|   1|         1|         0.0|
| Robby|Chemistry|   91|         2|   2|         2|         0.5|
|   Sam|Chemistry|   89|         3|   3|         3|         1.0|
|Merlin|    Maths|   90|         1|   1|         1|         0.0|
| Robby|    Maths|   87|         2|   2|         2|         0.5|
|   Sam|    Maths|   85|         3|   3|         3|         1.0|
| Robby|  Physics|   89|         1|   1|         1|         0.0|
|Merlin|  Physics|   84|         2|   2|         2|         0.5|
|   Sam|  Physics|   76|         3|   3|         3|         1.0|
+------+---------+-----+----------+----+----------+------------+



In [93]:
data_rank = data_rank.withColumn('NTILE',ntile(2).over(windowSpec))
data_rank.show()

+------+---------+-----+----------+----+----------+------------+-----+
|  Name|  Subject|Marks|Row_Number|RANK|DENSE_RANK|PERCENT_RANK|NTILE|
+------+---------+-----+----------+----+----------+------------+-----+
|Merlin|Chemistry|   95|         1|   1|         1|         0.0|    1|
| Robby|Chemistry|   91|         2|   2|         2|         0.5|    1|
|   Sam|Chemistry|   89|         3|   3|         3|         1.0|    2|
|Merlin|    Maths|   90|         1|   1|         1|         0.0|    1|
| Robby|    Maths|   87|         2|   2|         2|         0.5|    1|
|   Sam|    Maths|   85|         3|   3|         3|         1.0|    2|
| Robby|  Physics|   89|         1|   1|         1|         0.0|    1|
|Merlin|  Physics|   84|         2|   2|         2|         0.5|    1|
|   Sam|  Physics|   76|         3|   3|         3|         1.0|    2|
+------+---------+-----+----------+----+----------+------------+-----+



In [94]:
data_rank = data_rank.withColumn('CUME_DIST',cume_dist().over(windowSpec))
data_rank.show()

+------+---------+-----+----------+----+----------+------------+-----+------------------+
|  Name|  Subject|Marks|Row_Number|RANK|DENSE_RANK|PERCENT_RANK|NTILE|         CUME_DIST|
+------+---------+-----+----------+----+----------+------------+-----+------------------+
|Merlin|Chemistry|   95|         1|   1|         1|         0.0|    1|0.3333333333333333|
| Robby|Chemistry|   91|         2|   2|         2|         0.5|    1|0.6666666666666666|
|   Sam|Chemistry|   89|         3|   3|         3|         1.0|    2|               1.0|
|Merlin|    Maths|   90|         1|   1|         1|         0.0|    1|0.3333333333333333|
| Robby|    Maths|   87|         2|   2|         2|         0.5|    1|0.6666666666666666|
|   Sam|    Maths|   85|         3|   3|         3|         1.0|    2|               1.0|
| Robby|  Physics|   89|         1|   1|         1|         0.0|    1|0.3333333333333333|
|Merlin|  Physics|   84|         2|   2|         2|         0.5|    1|0.6666666666666666|
|   Sam|  

# Module 14 : Data management using PySpark

### 1. Merging Data

#### Merging data involves combining two or more data sets into a single data set based on common columns or indexes.

In [95]:
import pandas as pd
# create two data frames
df1 = pd.DataFrame({'id': ['1001', '1002', '1003'], 'name':   ['abc', 'mno', 'opq']})
df2 = pd.DataFrame({'id': ['1001', '1002', '1004'], 'salary': [1100.25, 2200.50, 3300.20]})

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df_py1 = spark.createDataFrame(df1)
df_py2 = spark.createDataFrame(df2)

In [96]:
df_py1.show()

+----+----+
|  id|name|
+----+----+
|1001| abc|
|1002| mno|
|1003| opq|
+----+----+



In [97]:
df_py2.show()

+----+-------+
|  id| salary|
+----+-------+
|1001|1100.25|
|1002| 2200.5|
|1004| 3300.2|
+----+-------+



In [98]:
inner_data = df_py1.join(df_py2,df_py1.id ==  df_py2.id,"inner")
inner_data.show()

+----+----+----+-------+
|  id|name|  id| salary|
+----+----+----+-------+
|1001| abc|1001|1100.25|
|1002| mno|1002| 2200.5|
+----+----+----+-------+



In [99]:
left_data = df_py1.join(df_py2,df_py1.id ==  df_py2.id,"left")
left_data.show()

+----+----+----+-------+
|  id|name|  id| salary|
+----+----+----+-------+
|1001| abc|1001|1100.25|
|1002| mno|1002| 2200.5|
|1003| opq|null|   null|
+----+----+----+-------+



In [100]:
left_data = df_py1.join(df_py2,df_py1.id ==  df_py2.id,"full")
left_data.show()

+----+----+----+-------+
|  id|name|  id| salary|
+----+----+----+-------+
|1001| abc|1001|1100.25|
|1002| mno|1002| 2200.5|
|1003| opq|null|   null|
|null|null|1004| 3300.2|
+----+----+----+-------+



### Simpler way is to use SQL Join to write complex joins

#### It has been my experience that joining is most vulnerable data operation, I have seen that most of the data goes wrong while joining multiple data tables.

In [101]:
#First datafrrames should be registed to SQL
df_py1.createOrReplaceTempView("SQL_1")
df_py2.createOrReplaceTempView("SQL_2")

#Now data tables can be joined usng classical SQL queries:
joined_data1 = spark.sql("select coalesce(a.id, b.id) as ID \
        , a.name \
        , b.salary \
        from SQL_1 a \
        full join SQL_2 b \
        on a.id == b.id" \
         )

In [102]:
joined_data1.show()

+----+----+-------+
|  ID|name| salary|
+----+----+-------+
|1001| abc|1100.25|
|1002| mno| 2200.5|
|1003| opq|   null|
|1004|null| 3300.2|
+----+----+-------+



### 2. Appending Data

#### Appending means vertical stacking of data below each other

In [103]:
import pandas as pd
df1 = pd.DataFrame({'col1': [1, 2, 3, 4], 'col2': ['1', '2', '3', '4']})
df2 = pd.DataFrame({'col1': [5, 6, 7], 'col3': [1005, 1006, 1007]})

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df_py1 = spark.createDataFrame(df1)
df_py2 = spark.createDataFrame(df2)

In [104]:
df_py1.show()

+----+----+
|col1|col2|
+----+----+
|   1|   1|
|   2|   2|
|   3|   3|
|   4|   4|
+----+----+



In [105]:
df_py2.show()

+----+----+
|col1|col3|
+----+----+
|   5|1005|
|   6|1006|
|   7|1007|
+----+----+



In [106]:
union_DF = df_py2.union(df_py1)
union_DF.show()

+----+----+
|col1|col3|
+----+----+
|   5|1005|
|   6|1006|
|   7|1007|
|   1|   1|
|   2|   2|
|   3|   3|
|   4|   4|
+----+----+



In [107]:
union_DF.dtypes

[('col1', 'bigint'), ('col3', 'string')]

In [108]:
union_DF = df_py1.union(df_py2)
union_DF.show()

+----+----+
|col1|col2|
+----+----+
|   1|   1|
|   2|   2|
|   3|   3|
|   4|   4|
|   5|1005|
|   6|1006|
|   7|1007|
+----+----+



In [109]:
union_DF.dtypes

[('col1', 'bigint'), ('col2', 'string')]

### Hence, we can assume that it is a SQL vector union, where name of the variable doesn't matter

# Module 15 : Let's revise - how to write SQL Qery in pyspark

In [110]:
# First we need to register DataFrame as a SQL temporary view
sample_1 = df.select('user_id','browser','sex','class')
sample_1.createOrReplaceTempView("sample_sql")

# You can write query in multiple lines, using a \, which defines End of the Line
new_data = spark.sql(
          "SELECT sex \
          , class \
          , class+10 as class_plus_10 \
          , class * 2 as class_multiplied_2 \
          , 1- class  as class_inversed \
          , class/2  as class_divided \
          , pow(class+2,  2)  as class_power \
          ,  (class+3) % 2 as mod_class \
          FROM sample_sql"
          
         )

In [111]:
new_data.show(5)

+---+-----+-------------+------------------+--------------+-------------+-----------+---------+
|sex|class|class_plus_10|class_multiplied_2|class_inversed|class_divided|class_power|mod_class|
+---+-----+-------------+------------------+--------------+-------------+-----------+---------+
|  M|    0|           10|                 0|             1|          0.0|        4.0|        1|
|  F|    0|           10|                 0|             1|          0.0|        4.0|        1|
|  M|    1|           11|                 2|             0|          0.5|        9.0|        0|
|  M|    0|           10|                 0|             1|          0.0|        4.0|        1|
|  M|    0|           10|                 0|             1|          0.0|        4.0|        1|
+---+-----+-------------+------------------+--------------+-------------+-----------+---------+
only showing top 5 rows



In [112]:
new_data = spark.sql(
          "SELECT sex \
          , browser \
          , sum(class) as class_sum \
          FROM sample_sql \
          where sex = 'M' \
          group by sex , browser \
          having class_sum > 13 \
          "
          )
new_data.show()

+---+-------+---------+
|sex|browser|class_sum|
+---+-------+---------+
|  M|     IE|       17|
|  M| Chrome|       23|
+---+-------+---------+



# Module 16 : Miscellaneous

In [113]:
sample_1 = df.select('user_id','browser','sex','class')
sample_1.show(5)

+-------+-------+---+-----+
|user_id|browser|sex|class|
+-------+-------+---+-----+
|  22058| Chrome|  M|    0|
| 333320| Chrome|  F|    0|
|   1359|  Opera|  M|    1|
| 150084| Safari|  M|    0|
| 221365| Safari|  M|    0|
+-------+-------+---+-----+
only showing top 5 rows



In [114]:
transpose = sample_1.groupBy("browser").pivot("sex").sum("class")
transpose.show()

+-------+----+---+---+
|browser|null|  F|  M|
+-------+----+---+---+
|FireFox|   1|  3| 11|
| Safari|null|  4| 13|
|     IE|   1| 12| 17|
| Chrome|   1| 11| 23|
|  Opera|   0|  2|  2|
+-------+----+---+---+



In [115]:
transpose = sample_1.groupBy("browser").pivot("sex",["F","M"]).sum("class")
transpose.show()

+-------+---+---+
|browser|  F|  M|
+-------+---+---+
|FireFox|  3| 11|
| Safari|  4| 13|
|     IE| 12| 17|
| Chrome| 11| 23|
|  Opera|  2|  2|
+-------+---+---+



In [116]:
transpose = sample_1[(df.browser == 'Chrome') | (df.browser == "Safari")].groupBy("browser").pivot("sex",["F","M"]).sum("class")
transpose.show()

+-------+---+---+
|browser|  F|  M|
+-------+---+---+
| Safari|  4| 13|
| Chrome| 11| 23|
+-------+---+---+



In [117]:
transpose = sample_1.groupBy("browser").pivot("sex",["F","M"]).sum("class")
transpose.show()
#Sorting data in descending order of crowser and asceding order of F,
# in this case second one is redundant, but it is to demonstrated that we can use multiple columns at once
transpose = transpose.orderBy(transpose.browser.desc(), transpose.F)
transpose.show()

+-------+---+---+
|browser|  F|  M|
+-------+---+---+
|FireFox|  3| 11|
| Safari|  4| 13|
|     IE| 12| 17|
| Chrome| 11| 23|
|  Opera|  2|  2|
+-------+---+---+

+-------+---+---+
|browser|  F|  M|
+-------+---+---+
| Safari|  4| 13|
|  Opera|  2|  2|
|     IE| 12| 17|
|FireFox|  3| 11|
| Chrome| 11| 23|
+-------+---+---+

