<h1 style="margin: auto; font-weight: bold; padding: 30px 30px 0px 30px; color:#FFF;" align="center">Cleaning and Exploring Big Data using PySpark</h1>
<p style="width: 100%; text-align: center; margin: 0px; padding: 0px 0px 30px 0px; font-size: 24px; color:#FFF;" align="center">| Spark - PySpark practice |</p>


## 1. Libraries

In [1]:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, round, asc, desc

import pyspark.sql.functions as F
from pyspark.sql.functions import (sum, avg, max, min, 
                                   mean, count, round,
                                  udf, concat)

from pyspark.sql.types import StringType

## 2. Set up SparkSession

In [2]:
spark = (
    SparkSession
    .builder
    .master("local[1]")
    .appName("PoC")
    .getOrCreate()
)

sc = spark.sparkContext

spark

## 2. Initial testing

In [3]:
rdd_test = sc.parallelize(range(1000))

In [4]:
rdd_test.takeSample(False, 5)

[355, 551, 914, 991, 206]

In [5]:
print(type(rdd_test))

<class 'pyspark.rdd.PipelinedRDD'>


In [6]:
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)

In [7]:
print(type(rdd))

<class 'pyspark.rdd.RDD'>


## 3. Using createDataframe()

In [8]:
data = [
    ("James", "", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "", "2000-05-19", "M", 4000),
    ("Robert", "", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
    ("Jen", "Mary", "Brown", "1980-02-17", "F", -1)
]

In [9]:
COLUMNS = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]

df = spark.createDataFrame(data=data, schema=COLUMNS)

In [10]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [11]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [12]:
df.columns

['firstname', 'middlename', 'lastname', 'dob', 'gender', 'salary']

In [13]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


let's transform the dataframe to RDD

In [14]:
rdd_df = df.rdd

In [15]:
print(type(rdd_df))

<class 'pyspark.rdd.RDD'>


In [16]:
rdd_df.collect()

[Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000),
 Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000),
 Row(firstname='Robert', middlename='', lastname='Williams', dob='1978-09-05', gender='M', salary=4000),
 Row(firstname='Maria', middlename='Anne', lastname='Jones', dob='1967-12-01', gender='F', salary=4000),
 Row(firstname='Jen', middlename='Mary', lastname='Brown', dob='1980-02-17', gender='F', salary=-1)]

## 4. Reading csv

In [17]:
df_diabetes = spark.read.csv("diabetes.csv", header=True)

In [18]:
print((df_diabetes.count(), len(df_diabetes.columns)))

(768, 9)


In [19]:
df_diabetes.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [20]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



### 4.1 Transforming Spark dataframe to pandas

In [21]:
df_diabetes_pd = df_diabetes.toPandas()

In [22]:
type(df_diabetes_pd)

pandas.core.frame.DataFrame

In [23]:
df_diabetes_pd.value_counts("Outcome")

Outcome
0    500
1    268
dtype: int64

In [24]:
df_diabetes_pd.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


## 5. Manipulating DataFrame

In [25]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [26]:
type(df_diabetes)

pyspark.sql.dataframe.DataFrame

### 5.1. Adding columns into the Dataframe

In [27]:
df_diabetes = df_diabetes.withColumn("Age in days", df_diabetes.Age*365)

In [28]:
df_diabetes = df_diabetes.withColumn("Age in days II", col("Age")*365)

In [29]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+--------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age in days II|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+--------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0|       18250.0|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0|       11315.0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0|       11680.0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0|        7665.0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    12045.0|

#### 5.1.1. Rename column Name

In [30]:
df_diabetes = df_diabetes.withColumnRenamed("Age in days II", "Age II")

In [31]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days| Age II|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0|18250.0|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0|11315.0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0|11680.0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0| 7665.0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    12045.0|12045.0|
+-----------+-------+-------------+-----

### 5.2. Casting  a column

Let's cast the age columns in days from float to int

In [32]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days| Age II|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0|18250.0|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0|11315.0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0|11680.0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0| 7665.0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    12045.0|12045.0|
+-----------+-------+-------------+-----

In [33]:
df_diabetes = df_diabetes.withColumn("Age II", col("Age II").cast("Integer"))

In [34]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0| 18250|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0| 11315|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0| 11680|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0|  7665|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    12045.0| 12045|
+-----------+-------+-------------+-------------

In [35]:
df_diabetes.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)
 |-- Age in days: double (nullable = true)
 |-- Age II: integer (nullable = true)



### 5.3. Selecting data with select and getItem

In [36]:
df_temp = spark.createDataFrame([([1, 2], {"key": "value"})], ["l", "d"])

In [37]:
df_temp.show()

+------+--------------+
|     l|             d|
+------+--------------+
|[1, 2]|{key -> value}|
+------+--------------+



In [38]:
df_temp.select(df_temp.l.getItem(0), df_temp.d.getItem("key")).show()

+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+



In [39]:
df_temp = spark.createDataFrame([
                ([1, 2], {"key": "value"}),
                ([3, 4, 5], {"name": "math"})
            ], ["l", "d"])

In [40]:
df_temp.show()

+---------+--------------+
|        l|             d|
+---------+--------------+
|   [1, 2]|{key -> value}|
|[3, 4, 5]|{name -> math}|
+---------+--------------+



In [41]:
df_temp.select(df_temp.l.getItem(0), df_temp.d.getItem("key")).show()

+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
|   3|  null|
+----+------+



In [42]:
df_temp.select(df_temp.l.getItem(2), df_temp.d.getItem("name")).show()

+----+-------+
|l[2]|d[name]|
+----+-------+
|null|   null|
|   5|   math|
+----+-------+



### 5.4. Adding a constant value through lit()

In [43]:
df_diabetes.withColumn("Patient country", lit("France"))

DataFrame[Pregnancies: string, Glucose: string, BloodPressure: string, SkinThickness: string, Insulin: string, BMI: string, DiabetesPedigreeFunction: string, Age: string, Outcome: string, Age in days: double, Age II: int, Patient country: string]

In [44]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0| 18250|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0| 11315|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0| 11680|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0|  7665|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    12045.0| 12045|
+-----------+-------+-------------+-------------

In [45]:
df_diabetes = df_diabetes.withColumn("Patient country", lit("France"))

In [46]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+---------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|Patient country|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+---------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0| 18250|         France|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0| 11315|         France|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0| 11680|         France|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0|  7665|         France|
|          0|    137|           40|           35|    16

#### 5.4.1. Going beyond with __**lit()**__ and using __**when()**__

In this case, we are going to same that if the patient is under 30 the patient is from South America, otherwise, the patient is from Europe.

To do that, we are going to use **when**

In [47]:
df_diabetes = df_diabetes.withColumn("Patient continent", when(col("Age")<30, lit("South America")).otherwise(lit("Europe")))

In [48]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+---------------+-----------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|Patient country|Patient continent|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+---------------+-----------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0| 18250|         France|           Europe|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0| 11315|         France|           Europe|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0| 11680|         France|           Europe|
|          1|     89|           66|           23|     94|28.1|                   0

### 5.5. Drop column

In [49]:
df_diabetes = df_diabetes.drop("Patient country")

In [50]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+-----------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|Patient continent|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+-----------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0| 18250|           Europe|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0| 11315|           Europe|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0| 11680|           Europe|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0|  7665|    South America|
|          0|    137|           40|      

### 5.6. Selecting columns based on conditions

In [51]:
df_diabetes.select(col("Glucose"), col("BloodPressure"), col("Age"))\
            .filter((col("Age")>30) & (col("Age")<35))\
            .show(5)

+-------+-------------+---+
|Glucose|BloodPressure|Age|
+-------+-------------+---+
|     85|           66| 31|
|    183|           64| 32|
|    137|           40| 33|
|    168|           74| 34|
|    100|            0| 32|
+-------+-------------+---+
only showing top 5 rows



#### 5.6.1. Selecting columns based on conditions using alias

In [52]:
df_diabetes.select(col("Glucose"), col("BloodPressure").alias("Blood_presure"), col("Age"))\
            .filter((col("Age")>30) & (col("Age")<35) & (col("Blood_presure")>74))\
            .show(5)

+-------+-------------+---+
|Glucose|Blood_presure|Age|
+-------+-------------+---+
|    118|           84| 31|
|    100|           88| 31|
|    123|           80| 34|
|    122|           90| 31|
|    131|           88| 32|
+-------+-------------+---+
only showing top 5 rows



#### 5.6.2. Selecting value from a specific cel

In [53]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+-----------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|Patient continent|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+-----------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0| 18250|           Europe|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0| 11315|           Europe|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0| 11680|           Europe|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0|  7665|    South America|
|          0|    137|           40|      

### 5.7. Groupby

In [54]:
df_diabetes.groupBy("Patient continent")\
            .count()\
            .show()

+-----------------+-----+
|Patient continent|count|
+-----------------+-----+
|           Europe|  372|
|    South America|  396|
+-----------------+-----+



In [55]:
df_diabetes.groupBy("Patient continent", "Outcome")\
            .count()\
            .sort(asc("Patient continent"))\
            .show()

+-----------------+-------+-----+
|Patient continent|Outcome|count|
+-----------------+-------+-----+
|           Europe|      0|  188|
|           Europe|      1|  184|
|    South America|      1|   84|
|    South America|      0|  312|
+-----------------+-------+-----+



In [56]:
df_diabetes.groupBy("Patient continent", "Outcome")\
            .count()\
            .sort(desc("Patient continent"), asc("Outcome"))\
            .show()

+-----------------+-------+-----+
|Patient continent|Outcome|count|
+-----------------+-------+-----+
|    South America|      0|  312|
|    South America|      1|   84|
|           Europe|      0|  188|
|           Europe|      1|  184|
+-----------------+-------+-----+



In [57]:
df_diabetes.groupBy("Patient continent", "Age")\
            .count()\
            .show()

+-----------------+---+-----+
|Patient continent|Age|count|
+-----------------+---+-----+
|           Europe| 46|   13|
|           Europe| 66|    4|
|           Europe| 37|   19|
|           Europe| 65|    3|
|           Europe| 60|    5|
|    South America| 27|   32|
|           Europe| 62|    4|
|    South America| 28|   35|
|           Europe| 41|   22|
|           Europe| 58|    7|
|           Europe| 45|   15|
|           Europe| 72|    1|
|           Europe| 42|   18|
|    South America| 25|   48|
|           Europe| 38|   16|
|           Europe| 48|    5|
|           Europe| 44|    8|
|           Europe| 47|    6|
|           Europe| 68|    1|
|           Europe| 49|    5|
+-----------------+---+-----+
only showing top 20 rows



In [58]:
df_diabetes.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)
 |-- Age in days: double (nullable = true)
 |-- Age II: integer (nullable = true)
 |-- Patient continent: string (nullable = false)



In [59]:
df_diabetes = df_diabetes.withColumn("Age", col("Age").cast("Integer"))

In [60]:
df_diabetes.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+-----------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|Patient continent|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------+------+-----------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|    18250.0| 18250|           Europe|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    11315.0| 11315|           Europe|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|    11680.0| 11680|           Europe|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     7665.0|  7665|    South America|
|          0|    137|           40|      

#### 5.7.1. Avg per group

In [61]:
%%time

df_diabetes.groupBy("Patient continent")\
            .avg("Age").select(col("Patient continent"), round(col("avg(Age)"), 2).alias("Avg Age"))\
            .show()

+-----------------+-------+
|Patient continent|Avg Age|
+-----------------+-------+
|           Europe|  42.74|
|    South America|  24.31|
+-----------------+-------+

CPU times: user 494 µs, sys: 10.5 ms, total: 11 ms
Wall time: 221 ms


In [62]:
%%time

df_temp = df_diabetes.groupBy("Patient continent")\
                        .agg({ "Age" : "avg" })\
                        .select(col("Patient continent"), round(col("avg(Age)"), 2).alias("Avg Age"))\
                        .show()

+-----------------+-------+
|Patient continent|Avg Age|
+-----------------+-------+
|           Europe|  42.74|
|    South America|  24.31|
+-----------------+-------+

CPU times: user 9.74 ms, sys: 0 ns, total: 9.74 ms
Wall time: 131 ms


In [63]:
%%time

df_temp = df_diabetes.groupBy("Patient continent")\
                        .agg({ "Age" : "avg" })\
                        .withColumnRenamed("avg(Age)", "Avg Age")\
                        .show()

+-----------------+------------------+
|Patient continent|           Avg Age|
+-----------------+------------------+
|           Europe|42.744623655913976|
|    South America| 24.31313131313131|
+-----------------+------------------+

CPU times: user 6.05 ms, sys: 2.09 ms, total: 8.14 ms
Wall time: 144 ms


In [64]:
print(type(df_temp))

<class 'NoneType'>


##### 5.7.1.1. Importing the functions directly or through an alias

In [65]:
%%time

# import pyspark.sql.functions as F

df_diabetes.groupBy("Patient continent")\
            .agg(F.avg("Age"), F.max("Age").alias("Max Age"))\
            .withColumnRenamed("avg(Age)", "Avg Age")\
            .show()

+-----------------+------------------+-------+
|Patient continent|           Avg Age|Max Age|
+-----------------+------------------+-------+
|           Europe|42.744623655913976|     81|
|    South America| 24.31313131313131|     29|
+-----------------+------------------+-------+

CPU times: user 3.94 ms, sys: 2.15 ms, total: 6.09 ms
Wall time: 183 ms


In [66]:
%%time

from pyspark.sql.functions import sum, avg, max, min, mean, count, round

df_diabetes.groupBy("Patient continent")\
            .agg(round(avg("Age"), 2), 
                 max("Age").alias("Max Age"))\
            .withColumnRenamed("avg(Age)", "Avg Age")\
            .show()

+-----------------+------------------+-------+
|Patient continent|round(avg(Age), 2)|Max Age|
+-----------------+------------------+-------+
|           Europe|             42.74|     81|
|    South America|             24.31|     29|
+-----------------+------------------+-------+

CPU times: user 8.05 ms, sys: 9.25 ms, total: 17.3 ms
Wall time: 146 ms


##### 5.7.2.1. Using filter indistinctly

In [67]:
%%time

AGES = [30, 31, 33, 40]

df_diabetes\
    .where(col("Age").isin(AGES))\
    .groupby("Outcome")\
    .agg(round(avg("Age"), 2).alias("Avg Age"))\
    .show(5)

+-------+-------+
|Outcome|Avg Age|
+-------+-------+
|      0|  32.55|
|      1|  32.94|
+-------+-------+

CPU times: user 13 ms, sys: 150 µs, total: 13.1 ms
Wall time: 180 ms


In [68]:
%%time

AGES = [30, 31, 33, 40]

df_diabetes\
    .where(col("Age").isin(AGES))\
    .groupby("Outcome")\
    .avg("Age").select(col("Outcome"), round(col("avg(Age)"), 2).alias("Avg Age"))\
    .show(5)

+-------+-------+
|Outcome|Avg Age|
+-------+-------+
|      0|  32.55|
|      1|  32.94|
+-------+-------+

CPU times: user 8.22 ms, sys: 4.46 ms, total: 12.7 ms
Wall time: 142 ms


### 5.8. Counting null in the dataset

In [77]:
from pyspark.sql.functions import col,isnan, when, count
df_diabetes.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_diabetes.columns]
   ).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+-----------+------+-----------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|Age in days|Age II|Patient continent|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+-----------+------+-----------------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|          0|     0|                0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+-----------+------+-----------------+



## 6. Working with UDF

In [69]:
columns_udf = ["Seqno","Name"]
data_udf = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

In [70]:
df_udf = spark.createDataFrame(data=data_udf, schema=columns_udf)

df_udf.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



### 6.1. Using UDF with PySpark DataFrame select()

In [71]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

In [72]:
""" Converting function to UDF """
#convertUDF = udf(lambda z: convertCase(z), StringType())
""" Converting function to UDF 
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z)) 

In [73]:
df_udf.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



In [75]:
print(type(df_udf))

<class 'pyspark.sql.dataframe.DataFrame'>


### 6.2. Using UDF with PySpark DataFrame withColumn()

In [78]:
def upperCase(str):
    return str.upper()

In [79]:
upperCaseUDF = udf(lambda z:upperCase(z), StringType()) 

In [80]:
df_udf_temp = df_udf.withColumn("Cureated Name", concat(col("Name"), lit(" hola")))

In [81]:
df_udf_temp.show(5)

+-----+------------+-----------------+
|Seqno|        Name|    Cureated Name|
+-----+------------+-----------------+
|    1|  john jones|  john jones hola|
|    2|tracey smith|tracey smith hola|
|    3| amy sanders| amy sanders hola|
+-----+------------+-----------------+



In [82]:
from pyspark.sql.functions import col,isnan, when, count
df_udf_temp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_udf_temp.columns]
   ).show()

+-----+----+-------------+
|Seqno|Name|Cureated Name|
+-----+----+-------------+
|    0|   0|            0|
+-----+----+-------------+



In [83]:
print(type(df_udf_temp))

<class 'pyspark.sql.dataframe.DataFrame'>


In [84]:
df_udf_temp = df_udf_temp.withColumn("Cureated Name", upperCaseUDF(col("Name")))

In [85]:
df_udf_temp.show(truncate=False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+



In [86]:
print(type(df_udf_temp))

<class 'pyspark.sql.dataframe.DataFrame'>


In [87]:
df_udf_temp.printSchema()

root
 |-- Seqno: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Cureated Name: string (nullable = true)



### 6.3. Creating UDF with annotation

In [88]:
@udf(returnType=StringType())
def lowerCase(str):
    return str.lower()

In [89]:
df_udf_temp = df_udf_temp.withColumn("lower Cureated Name", lowerCase(col("Cureated Name")))

In [90]:
df_udf_temp.show()

+-----+------------+-------------+-------------------+
|Seqno|        Name|Cureated Name|lower Cureated Name|
+-----+------------+-------------+-------------------+
|    1|  john jones|   JOHN JONES|         john jones|
|    2|tracey smith| TRACEY SMITH|       tracey smith|
|    3| amy sanders|  AMY SANDERS|        amy sanders|
+-----+------------+-------------+-------------------+



In [91]:
df_udf_temp.printSchema()

root
 |-- Seqno: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Cureated Name: string (nullable = true)
 |-- lower Cureated Name: string (nullable = true)



## 7. Working with map()

In [92]:
data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

In [93]:
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [97]:
# Refering columns by index.
rdd2=df.rdd.map(lambda x: (x[0]+", "+x[1],x[2],x[3]*2))  

In [98]:
df2=rdd2.toDF(["name","gender","new_salary"]   )

In [99]:
df2.show()

+----------------+------+----------+
|            name|gender|new_salary|
+----------------+------+----------+
|    James, Smith|     M|        60|
|      Anna, Rose|     F|        82|
|Robert, Williams|     M|       124|
+----------------+------+----------+



In [100]:
rdd2=df.rdd.map(lambda x: 
    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    ) 

In [104]:
df2=rdd2.toDF(["name","gender","new_salary"])

In [105]:
df2.show()

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [108]:
# By Calling function
def func1(x):
    firstName=x.firstname
    lastName=x.lastname
    name=firstName+", XX "+lastName
    gender=x.gender.lower()
    salary=x.salary*2
    return (name,gender,salary)

rdd2=df.rdd.map(lambda x: func1(x))

In [109]:
df2=rdd2.toDF(["name","gender","new_salary"])
df2.show()

+-------------------+------+----------+
|               name|gender|new_salary|
+-------------------+------+----------+
|    James, XX Smith|     m|        60|
|      Anna, XX Rose|     f|        82|
|Robert, XX Williams|     m|       124|
+-------------------+------+----------+



In [None]:
%%time

df_diabetes.groupBy("Patient continent")\
            .avg("Age").select(col("Patient continent"), round(col("avg(Age)"), 2).alias("Avg Age"))\
            .show()

psdf.spark.explain()

We are going to select the `second` records from the column called Age

In [None]:
 %%time

from pyspark.sql.functions import udf

# Use udf to define a row-at-a-time udf
@udf('double')
# Input/output are both a single double value
def plus_one(v):
      return v + 1

df_diabetes_v2 = df_diabetes.withColumn('v2', plus_one(col("Age II")))

In [None]:
df_diabetes_v2.printSchema()

In [None]:
df_diabetes_v2.show(5)

In [None]:
 %%time
    
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Use pandas_udf to define a Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# Input/output are both a pandas.Series of doubles

def pandas_plus_one(v):
    return v + 1

df_diabetes_v3 = df_diabetes.withColumn('v2', pandas_plus_one(col("Age II")))

In [None]:
from datetime import date
 
def age(birthdate):
    today = date.today()
    age = today.year - birthdate.year - ((today.month, today.day) < (birthdate.month, birthdate.day))
    return age