In [1]:
import pyspark.pandas as ps
from pyspark.sql import SparkSession
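# Import only the functions used below; a wildcard import would shadow Python built-ins such as sum, min and max
from pyspark.sql.functions import avg, col, count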



In [2]:
spark = SparkSession.builder.getOrCreate()




### Reading the diabetes prediction dataset CSV file with the spark.read.csv method.

In [3]:
diabetesPrediction = spark.read.csv('diabetes_prediction_dataset.csv', header=True)
diabetesPrediction

DataFrame[gender: string, age: string, hypertension: string, heart_disease: string, smoking_history: string, bmi: string, HbA1c_level: string, blood_glucose_level: string, diabetes: string]

### As you can see in the output above, calling the dataframe does not show a preview of its contents. This is because PySpark uses lazy evaluation: transformations are only recorded, and computation is triggered only by action methods on the Spark dataframe. Two of the most commonly used action methods are .show() and .collect().

In [4]:
diabetesPrediction.show()

+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|Female|80.0|           0|            1|          never|25.19|        6.6|                140|       0|
|Female|54.0|           0|            0|        No Info|27.32|        6.6|                 80|       0|
|  Male|28.0|           0|            0|          never|27.32|        5.7|                158|       0|
|Female|36.0|           0|            0|        current|23.45|        5.0|                155|       0|
|  Male|76.0|           1|            1|        current|20.14|        4.8|                155|       0|
|Female|20.0|           0|            0|          never|27.32|        6.6|                 85|       0|
|Female|44.0|           0|            0|          never|19.31|  

### As you can see in the output above, the .show() method triggered computation on the Spark dataframe and displays the first 20 rows by default.

### To print the schema of a Spark dataframe, use the .printSchema() method.

In [5]:
diabetesPrediction.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- hypertension: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- bmi: string (nullable = true)
 |-- HbA1c_level: string (nullable = true)
 |-- blood_glucose_level: string (nullable = true)
 |-- diabetes: string (nullable = true)



### To get summary statistics for a Spark dataframe, call .describe() to create a summary statistics dataframe and then .show() to trigger the computation.

In [6]:
diabetesPrediction.describe().show()


+-------+------+-----------------+------------------+------------------+---------------+-----------------+------------------+-------------------+-------------------+
|summary|gender|              age|      hypertension|     heart_disease|smoking_history|              bmi|       HbA1c_level|blood_glucose_level|           diabetes|
+-------+------+-----------------+------------------+------------------+---------------+-----------------+------------------+-------------------+-------------------+
|  count|100000|           100000|            100000|            100000|         100000|           100000|            100000|             100000|             100000|
|   mean|  NULL|41.88585600000013|           0.07485|           0.03942|           NULL|27.32076709999422|5.5275069999983275|          138.05806|              0.085|
| stddev|  NULL|22.51683987161704|0.2631504702289171|0.1945930169980986|           NULL|6.636783416648357|1.0706720918835468|  40.70813604870383|0.27888308976661896|
|   

                                                                                

### The .show() output for the summary statistics dataframe is hard to read. To display it like a regular pandas dataframe, use .pandas_api(), which converts the Spark dataframe to a pandas-on-Spark dataframe. It behaves much like an everyday pandas dataframe, but it is distributed, whereas a pandas dataframe lives on a single machine.

In [7]:
diabetesPrediction.describe().pandas_api()

                                                                                

|   | summary | gender | age | hypertension | heart_disease | smoking_history | bmi | HbA1c_level | blood_glucose_level | diabetes |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 100000 | 100000.0 | 100000.0 | 100000.0 | 100000 | 100000.0 | 100000.0 | 100000.0 | 100000.0 |
| 1 | mean |  | 41.88585600000013 | 0.07485 | 0.03942 |  | 27.32076709999422 | 5.527506999998328 | 138.05806 | 0.085 |
| 2 | stddev |  | 22.51683987161704 | 0.2631504702289171 | 0.1945930169980986 |  | 6.636783416648357 | 1.0706720918835468 | 40.70813604870383 | 0.2788830897666189 |
| 3 | min | Female | 0.08 | 0.0 | 0.0 | No Info | 10.01 | 3.5 | 100.0 | 0.0 |
| 4 | max | Other | 9.0 | 1.0 | 1.0 | not current | 95.69 | 9.0 | 90.0 | 1.0 |
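
### Note that the max row reports 9.0 for age and 90.0 for blood_glucose_level, below values seen elsewhere in the data. A likely explanation is that every column was read as a string, so min and max compare text character by character rather than numerically. The plain Python sketch below reproduces the effect with a few sample values; the list itself is an assumption for illustration.

```python
# A few age values stored as text, as they would be in an all-string dataframe.
ages_as_text = ["80.0", "9.0", "54.0", "0.08"]

# String comparison goes character by character, so "9.0" beats "80.0"
# because "9" sorts after "8".
text_max = max(ages_as_text)

# Converting to numbers first gives the maximum one would expect.
numeric_max = max(float(a) for a in ages_as_text)

print(text_max, numeric_max)
```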


### To group by a column's values and apply an aggregate function, use .groupBy("columnName").agg({"columnName":"aggregationFunction"}).

### Here, we want to calculate the average age for each gender, so we group by the gender column and apply the mean aggregation to the age column.

In [8]:
genderAvgAge = diabetesPrediction.groupBy(diabetesPrediction.gender).agg({'age':'mean'})
genderAvgAge.show()

+------+------------------+
|gender|          avg(age)|
+------+------------------+
|Female|42.463290750102445|
| Other|29.555555555555557|
|  Male| 41.07513878831753|
+------+------------------+



### The average age of females is about 42.5 and the average age of males is about 41.1, while the Other group averages about 29.6.

### Getting the columns of a PySpark dataframe is similar to getting the columns of a pandas dataframe. Use the .columns attribute, which returns a list of column names.

In [9]:
diabetesPrediction.columns

['gender',
 'age',
 'hypertension',
 'heart_disease',
 'smoking_history',
 'bmi',
 'HbA1c_level',
 'blood_glucose_level',
 'diabetes']

### To drop duplicates, we can use the .dropDuplicates() method.

### To count the null values in a column of a Spark dataframe, use the .isNull() method inside .filter() and then call .count().

In [10]:
diabetesPrediction.filter(col("age").isNull()).count()


0

### This shows that there are zero null values in the age column. 

### We can use the .withColumn() method to add a new column to the dataframe.

In [11]:
diabetesPrediction.withColumn('age2',diabetesPrediction.age**2).show()

+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|  age2|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+------+
|Female|80.0|           0|            1|          never|25.19|        6.6|                140|       0|6400.0|
|Female|54.0|           0|            0|        No Info|27.32|        6.6|                 80|       0|2916.0|
|  Male|28.0|           0|            0|          never|27.32|        5.7|                158|       0| 784.0|
|Female|36.0|           0|            0|        current|23.45|        5.0|                155|       0|1296.0|
|  Male|76.0|           1|            1|        current|20.14|        4.8|                155|       0|5776.0|
|Female|20.0|           0|            0|          never|27.32|        6.6|                 85|       0| 400.0|
|

### As you can see, the age2 column is added at the end of the dataframe. Note that .withColumn() returns a new dataframe; diabetesPrediction itself is unchanged.

### To filter rows by conditions on their columns, we can use either the .filter() or the .where() method; the two are equivalent.

In [12]:
diabetesPrediction.filter( (diabetesPrediction.age > 25) & (diabetesPrediction.age<35)).show()


+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|  Male|28.0|           0|            0|          never|27.32|        5.7|                158|       0|
|Female|32.0|           0|            0|          never|27.32|        5.0|                100|       0|
|  Male|30.0|           0|            0|          never|33.76|        6.1|                126|       0|
|Female|26.0|           0|            0|          never|21.22|        6.6|                200|       0|
|Female|34.0|           0|            0|          never|56.43|        6.2|                200|       0|
|Female|29.0|           0|            0|          never|19.95|        5.0|                 90|       0|
|Female|26.0|           0|            0|        No Info|27.32|  

In [13]:
diabetesPrediction.where( (diabetesPrediction.age > 25) & (diabetesPrediction.age<35)).show()

+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|  Male|28.0|           0|            0|          never|27.32|        5.7|                158|       0|
|Female|32.0|           0|            0|          never|27.32|        5.0|                100|       0|
|  Male|30.0|           0|            0|          never|33.76|        6.1|                126|       0|
|Female|26.0|           0|            0|          never|21.22|        6.6|                200|       0|
|Female|34.0|           0|            0|          never|56.43|        6.2|                200|       0|
|Female|29.0|           0|            0|          never|19.95|        5.0|                 90|       0|
|Female|26.0|           0|            0|        No Info|27.32|  

### Here, only rows with an age greater than 25 and less than 35 are shown.

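### We can append the rows of one dataframe to another with the union() method. Because union() matches columns by position rather than by name, use unionByName() when the column order differs, as it does here.

In [14]:
df1 = spark.createDataFrame([("Alice", 1), ("Bob", 2)], ["name", "id"])
df2 = spark.createDataFrame([(3, "Charlie"), (4, "Dave")], ["id", "name"])
# df2 lists its columns in a different order, so match them by name, not by position
union_df = df1.unionByName(df2)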

### We can also use SQL queries to operate on Spark dataframes.

### We can select all rows from a DataFrame by using a placeholder like {table1} in the query and passing the DataFrame using keyword arguments, such as table1=diabetesPrediction.

In [15]:
spark.sql('SELECT * FROM {table1}',table1=diabetesPrediction).show()

+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|Female|80.0|           0|            1|          never|25.19|        6.6|                140|       0|
|Female|54.0|           0|            0|        No Info|27.32|        6.6|                 80|       0|
|  Male|28.0|           0|            0|          never|27.32|        5.7|                158|       0|
|Female|36.0|           0|            0|        current|23.45|        5.0|                155|       0|
|  Male|76.0|           1|            1|        current|20.14|        4.8|                155|       0|
|Female|20.0|           0|            0|          never|27.32|        6.6|                 85|       0|
|Female|44.0|           0|            0|          never|19.31|  

### The following query selects only the age column.

In [16]:
spark.sql("SELECT {table1}.age FROM {table1}",table1=diabetesPrediction).show()

+----+
| age|
+----+
|80.0|
|54.0|
|28.0|
|36.0|
|76.0|
|20.0|
|44.0|
|79.0|
|42.0|
|32.0|
|53.0|
|54.0|
|78.0|
|67.0|
|76.0|
|78.0|
|15.0|
|42.0|
|42.0|
|37.0|
+----+
only showing top 20 rows



### The following query also displays the age column. The difference from the previous query is that it demonstrates how a table alias can be used in PySpark SQL.

In [17]:
spark.sql("SELECT t1.age FROM {table1} t1",table1=diabetesPrediction).show()

+----+
| age|
+----+
|80.0|
|54.0|
|28.0|
|36.0|
|76.0|
|20.0|
|44.0|
|79.0|
|42.0|
|32.0|
|53.0|
|54.0|
|78.0|
|67.0|
|76.0|
|78.0|
|15.0|
|42.0|
|42.0|
|37.0|
+----+
only showing top 20 rows

### Python values can be passed into the query the same way. Here the ageFilter keyword argument replaces the {ageFilter} placeholder, so only ages above 25 are returned.



In [18]:
ageFilter = 25
spark.sql("SELECT t1.age FROM {table1} t1 WHERE t1.age > {ageFilter}",table1=diabetesPrediction,ageFilter=ageFilter).show()

+----+
| age|
+----+
|80.0|
|54.0|
|28.0|
|36.0|
|76.0|
|44.0|
|79.0|
|42.0|
|32.0|
|53.0|
|54.0|
|78.0|
|67.0|
|76.0|
|78.0|
|42.0|
|42.0|
|37.0|
|40.0|
|69.0|
+----+
only showing top 20 rows

### Named parameter markers such as :ageFilter are bound through a dictionary of arguments. Here the threshold is 5.



In [19]:
spark.sql("SELECT t1.age FROM {table1} t1 WHERE t1.age > :ageFilter",{"ageFilter":5},table1=diabetesPrediction).show()

+----+
| age|
+----+
|80.0|
|54.0|
|28.0|
|36.0|
|76.0|
|20.0|
|44.0|
|79.0|
|42.0|
|32.0|
|53.0|
|54.0|
|78.0|
|67.0|
|76.0|
|78.0|
|15.0|
|42.0|
|42.0|
|37.0|
+----+
only showing top 20 rows

### Positional ? markers are bound in order from the args list. Here the two conditions are age > 25 and 70 < age, so the query effectively returns ages above 70.



In [20]:
spark.sql("SELECT t1.age FROM {table1} t1 WHERE t1.age > ? and ? < t1.age",args=[25,70],table1=diabetesPrediction).show()

+----+
| age|
+----+
|80.0|
|76.0|
|79.0|
|78.0|
|76.0|
|78.0|
|72.0|
|76.0|
|73.0|
|77.0|
|74.0|
|76.0|
|80.0|
|79.0|
|74.0|
|80.0|
|80.0|
|80.0|
|75.0|
|77.0|
+----+
only showing top 20 rows

### To cross-tabulate two columns, combine .groupBy() with .pivot(). Here each smoking_history value becomes a column holding the average bmi per gender, and .fillna(0) replaces missing combinations with 0.



In [21]:
diabetesPrediction.groupBy("gender").pivot("smoking_history").agg(avg("bmi")).fillna(0).show()


+------+------------------+-----------------+------------------+------------------+------------------+------------------+
|gender|           No Info|          current|              ever|            former|             never|       not current|
+------+------------------+-----------------+------------------+------------------+------------------+------------------+
|Female| 25.64423857868251|28.36533807829256| 28.76907059874885|29.498684541265742|28.101528706982418|28.285589062101383|
| Other|26.804999999999996|              0.0|             27.32|               0.0| 33.61333333333334|25.480000000000004|
|  Male|24.975211669772523| 28.5138741721861|28.753308781869666|29.748034076016317|28.109054242003985|28.089014251781567|
+------+------------------+-----------------+------------------+------------------+------------------+------------------+

### Passing a list of values to .pivot() restricts the result to those columns.



In [22]:
diabetesPrediction.groupBy("gender").pivot("smoking_history",["No Info","never"]).agg(avg("bmi")).fillna(0).show()


+------+------------------+------------------+
|gender|           No Info|             never|
+------+------------------+------------------+
|Female| 25.64423857868251|28.101528706982418|
| Other|26.804999999999996| 33.61333333333334|
|  Male|24.975211669772523|28.109054242003985|
+------+------------------+------------------+



In [23]:
diabetesPrediction.groupBy("smoking_history").agg(count("*").alias("count")).show()



+---------------+-----+
|smoking_history|count|
+---------------+-----+
|    not current| 6447|
|         former| 9352|
|        No Info|35816|
|        current| 9286|
|          never|35095|
|           ever| 4004|
+---------------+-----+



                                                                                