***Advanced use cases of PySpark***

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DataType

In [2]:
# Build (or reuse) the Spark session for this notebook, running with the local[*] master.
spark = (
    SparkSession.builder
    .appName('sparkAdvanced')
    .master('local[*]')
    .getOrCreate()
)
spark  # echo the session object as the cell output

In [3]:
# Explicit schema for the dataset, written as (column name, Spark type) pairs.
# Every column is declared nullable.
_schema_columns = [
    ('Age', IntegerType()),
    ('Gender', StringType()),
    ('Occupation', StringType()),
    ('SleepHours', FloatType()),
    ('PhysicalActivity', FloatType()),
    ('CaffeineIntake', IntegerType()),
    ('AlcoholConsumption', IntegerType()),
    ('Smoking', StringType()),
    ('FamilyHistory', StringType()),
    ('StressLevel', IntegerType()),
    ('HeartRate', IntegerType()),
    ('BreathingRate', IntegerType()),
    ('SweatingLevel', IntegerType()),
    ('Dizziness', StringType()),
    ('Medication', StringType()),
    ('TherapySessions', IntegerType()),
    ('DietQuality', IntegerType()),
    ('AnxietyLevel', IntegerType()),
]

schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in _schema_columns
])

In [4]:
# Read the CSV with the explicit schema; the file's first line is treated as a header.
main_df = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('socialanxiety/enhanced_anxiety_dataset.csv')
)
main_df.show(2)

+---+------+----------+----------+----------------+--------------+------------------+-------+-------------+-----------+---------+-------------+-------------+---------+----------+---------------+-----------+------------+
|Age|Gender|Occupation|SleepHours|PhysicalActivity|CaffeineIntake|AlcoholConsumption|Smoking|FamilyHistory|StressLevel|HeartRate|BreathingRate|SweatingLevel|Dizziness|Medication|TherapySessions|DietQuality|AnxietyLevel|
+---+------+----------+----------+----------------+--------------+------------------+-------+-------------+-----------+---------+-------------+-------------+---------+----------+---------------+-----------+------------+
| 29|Female|    Artist|       6.0|             2.7|           181|                10|    Yes|           No|         10|      114|           14|            4|       No|       Yes|              3|          7|           5|
| 46| Other|     Nurse|       6.2|             5.7|           200|                 8|    Yes|          Yes|          1| 

In [5]:
# printSchema() prints the schema tree and returns None, so the echoed tuple
# is (None, <list of column names>).
(
    main_df.printSchema(),
    main_df.columns,
)

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- SleepHours: float (nullable = true)
 |-- PhysicalActivity: float (nullable = true)
 |-- CaffeineIntake: integer (nullable = true)
 |-- AlcoholConsumption: integer (nullable = true)
 |-- Smoking: string (nullable = true)
 |-- FamilyHistory: string (nullable = true)
 |-- StressLevel: integer (nullable = true)
 |-- HeartRate: integer (nullable = true)
 |-- BreathingRate: integer (nullable = true)
 |-- SweatingLevel: integer (nullable = true)
 |-- Dizziness: string (nullable = true)
 |-- Medication: string (nullable = true)
 |-- TherapySessions: integer (nullable = true)
 |-- DietQuality: integer (nullable = true)
 |-- AnxietyLevel: integer (nullable = true)



(None,
 ['Age',
  'Gender',
  'Occupation',
  'SleepHours',
  'PhysicalActivity',
  'CaffeineIntake',
  'AlcoholConsumption',
  'Smoking',
  'FamilyHistory',
  'StressLevel',
  'HeartRate',
  'BreathingRate',
  'SweatingLevel',
  'Dizziness',
  'Medication',
  'TherapySessions',
  'DietQuality',
  'AnxietyLevel'])

In [6]:
# Work with a 12-column subset of main_df for the rest of the notebook.
selected_columns = [
    'Age', 'Gender', 'SleepHours', 'PhysicalActivity',
    'AlcoholConsumption', 'Smoking', 'FamilyHistory', 'StressLevel',
    'HeartRate', 'SweatingLevel', 'DietQuality', 'AnxietyLevel',
]
df = main_df.select(*selected_columns)
df.show(5)

+---+------+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
|Age|Gender|SleepHours|PhysicalActivity|AlcoholConsumption|Smoking|FamilyHistory|StressLevel|HeartRate|SweatingLevel|DietQuality|AnxietyLevel|
+---+------+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
| 29|Female|       6.0|             2.7|                10|    Yes|           No|         10|      114|            4|          7|           5|
| 46| Other|       6.2|             5.7|                 8|    Yes|          Yes|          1|       62|            2|          8|           3|
| 64|  Male|       5.0|             3.7|                 4|     No|          Yes|          1|       91|            3|          1|           1|
| 20|Female|       5.8|             2.8|                 6|    Yes|           No|          4|       86|            3|          1|           2|

*Note:* We can create a DataFrame from user-supplied data by using **spark.createDataFrame(input_data, schema)**

***df.collect()***
- We can retrieve data from a Spark RDD/DataFrame using collect().
- It returns the data as a list of Row objects.
- NB: collect() should be used cautiously with large datasets because it can cause OutOfMemoryError on the driver. Alternatives like take() or show() are preferred for sampling or previewing large DataFrames.
- docs: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.collect.html

In [7]:
# collect() returns every record of the DataFrame, which may cause a memory error,
# so cap the DataFrame at five rows before collecting.
preview_df = df.limit(5)
preview_df.collect()

[Row(Age=29, Gender='Female', SleepHours=6.0, PhysicalActivity=2.700000047683716, AlcoholConsumption=10, Smoking='Yes', FamilyHistory='No', StressLevel=10, HeartRate=114, SweatingLevel=4, DietQuality=7, AnxietyLevel=5),
 Row(Age=46, Gender='Other', SleepHours=6.199999809265137, PhysicalActivity=5.699999809265137, AlcoholConsumption=8, Smoking='Yes', FamilyHistory='Yes', StressLevel=1, HeartRate=62, SweatingLevel=2, DietQuality=8, AnxietyLevel=3),
 Row(Age=64, Gender='Male', SleepHours=5.0, PhysicalActivity=3.700000047683716, AlcoholConsumption=4, Smoking='No', FamilyHistory='Yes', StressLevel=1, HeartRate=91, SweatingLevel=3, DietQuality=1, AnxietyLevel=1),
 Row(Age=20, Gender='Female', SleepHours=5.800000190734863, PhysicalActivity=2.799999952316284, AlcoholConsumption=6, Smoking='Yes', FamilyHistory='No', StressLevel=4, HeartRate=86, SweatingLevel=3, DietQuality=1, AnxietyLevel=2),
 Row(Age=49, Gender='Female', SleepHours=8.199999809265137, PhysicalActivity=2.299999952316284, Alcohol

***RDD (Resilient Distributed Dataset)***
- A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel.

In [8]:
# Access the DataFrame's RDD; echoing it shows the RDD's repr, not its rows.
df.rdd

MapPartitionsRDD[15] at javaToPython at NativeMethodAccessorImpl.java:0

***df.take(n)***
- Returns the first *n* rows of a DataFrame as a list of Row objects.
- docs: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.take.html
- df.head(n) behaves the same way.
- df.tail(n) returns the last n rows.


In [9]:
# Compare the three row-fetching calls side by side: take(1) and head(1) give
# the first row, tail(1) gives the last row, each as a one-element list.
(
    df.take(1),
    df.head(1),
    df.tail(1),
)

([Row(Age=29, Gender='Female', SleepHours=6.0, PhysicalActivity=2.700000047683716, AlcoholConsumption=10, Smoking='Yes', FamilyHistory='No', StressLevel=10, HeartRate=114, SweatingLevel=4, DietQuality=7, AnxietyLevel=5)],
 [Row(Age=29, Gender='Female', SleepHours=6.0, PhysicalActivity=2.700000047683716, AlcoholConsumption=10, Smoking='Yes', FamilyHistory='No', StressLevel=10, HeartRate=114, SweatingLevel=4, DietQuality=7, AnxietyLevel=5)],
 [Row(Age=56, Gender='Other', SleepHours=6.099999904632568, PhysicalActivity=1.100000023841858, AlcoholConsumption=11, Smoking='No', FamilyHistory='No', StressLevel=1, HeartRate=66, SweatingLevel=3, DietQuality=8, AnxietyLevel=2)])

***Selecting Multiple columns in a dataframe:***
- df.select([col1, col2, ...])
- docs: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.select.html

In [10]:
# Project four columns; select() accepts the names as separate arguments as well as a list.
df.select('Age', 'Gender', 'AlcoholConsumption', 'Smoking').show()

+---+------+------------------+-------+
|Age|Gender|AlcoholConsumption|Smoking|
+---+------+------------------+-------+
| 29|Female|                10|    Yes|
| 46| Other|                 8|    Yes|
| 64|  Male|                 4|     No|
| 20|Female|                 6|    Yes|
| 49|Female|                 4|    Yes|
| 53|  Male|                 2|     No|
| 20|  Male|                14|    Yes|
| 54|Female|                15|     No|
| 51| Other|                 2|     No|
| 59|Female|                15|    Yes|
| 30|  Male|                 4|     No|
| 38| Other|                14|    Yes|
| 45| Other|                 2|     No|
| 31|Female|                 8|     No|
| 31| Other|                 8|    Yes|
| 44|Female|                16|     No|
| 56| Other|                11|     No|
| 61| Other|                 3|    Yes|
| 57|Female|                 5|     No|
| 29|Female|                 9|    Yes|
+---+------+------------------+-------+
only showing top 20 rows



In [11]:
# Filter rows where Age is at least 30, Gender is Female and AlcoholConsumption
# is at least 10 (both numeric comparisons are inclusive: >=).
is_30_or_older = df['Age'] >= 30
is_female = df['Gender'] == 'Female'
drinks_10_or_more = df['AlcoholConsumption'] >= 10

df.where(is_30_or_older & is_female & drinks_10_or_more).show()

+---+------+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
|Age|Gender|SleepHours|PhysicalActivity|AlcoholConsumption|Smoking|FamilyHistory|StressLevel|HeartRate|SweatingLevel|DietQuality|AnxietyLevel|
+---+------+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
| 54|Female|       6.3|             5.5|                15|     No|           No|          5|      113|            1|          7|           4|
| 59|Female|       5.1|             4.8|                15|    Yes|           No|          5|       95|            5|          1|           4|
| 44|Female|       7.7|             2.0|                16|     No|          Yes|          3|       92|            3|          3|           2|
| 34|Female|       4.5|             3.0|                11|     No|           No|          8|       82|            2|         10|           7|

In [12]:
# Non-smokers with DietQuality of at least 6 (inclusive) and AnxietyLevel above 6
# (exclusive); keep Age, Gender, Smoking, DietQuality and AnxietyLevel, sorted by
# Age ascending.
non_smoker = df.Smoking == 'No'
diet_at_least_6 = df.DietQuality >= 6
anxiety_above_6 = df.AnxietyLevel > 6

(
    df.where(non_smoker & diet_at_least_6 & anxiety_above_6)
    .select('Age', 'Gender', 'Smoking', 'DietQuality', 'AnxietyLevel')
    .orderBy(df.Age.asc())
    .show()
)

+---+------+-------+-----------+------------+
|Age|Gender|Smoking|DietQuality|AnxietyLevel|
+---+------+-------+-----------+------------+
| 20| Other|     No|         10|           7|
| 21|Female|     No|          9|           7|
| 21|Female|     No|          9|           7|
| 22| Other|     No|         10|           7|
| 24|Female|     No|          9|           7|
| 29|Female|     No|          9|           7|
| 30|Female|     No|          9|           7|
| 32| Other|     No|          8|           7|
| 32|Female|     No|         10|           7|
| 33|Female|     No|          9|           7|
| 34|Female|     No|         10|           7|
| 38|  Male|     No|          7|           7|
| 39|Female|     No|          7|           7|
| 40| Other|     No|         10|           7|
| 40| Other|     No|         10|           8|
| 42|Female|     No|          6|           7|
| 43|Female|     No|         10|           7|
| 46|Female|     No|          7|           7|
| 46| Other|     No|          7|  

In [13]:
# Add a numeric GenderMap column: 1 for Male, 0 for Female and -1 for everything
# else (Other, any unexpected label, and NULL).
from pyspark.sql.functions import when, col
# A plain Python `if`/`else` cannot express this: comparing a Column yields a
# Column expression, so the branching is written with when()/otherwise().
gender_map = (
    when(df.Gender == 'Male', 1)
    .when(df.Gender == 'Female', 0)
    # The fallback used to be 1, which gave NULL/unknown genders the same code
    # as Male; -1 matches the mapping described above.
    .otherwise(-1)
)

df.withColumn('GenderMap', gender_map).select(
    ['Age', 'Gender', 'Smoking', 'DietQuality', 'AnxietyLevel', 'GenderMap']
).show()

+---+------+-------+-----------+------------+---------+
|Age|Gender|Smoking|DietQuality|AnxietyLevel|GenderMap|
+---+------+-------+-----------+------------+---------+
| 29|Female|    Yes|          7|           5|        0|
| 46| Other|    Yes|          8|           3|       -1|
| 64|  Male|     No|          1|           1|        1|
| 20|Female|    Yes|          1|           2|        0|
| 49|Female|    Yes|          3|           1|        0|
| 53|  Male|     No|          5|           4|        1|
| 20|  Male|    Yes|          2|           4|        1|
| 54|Female|     No|          7|           4|        0|
| 51| Other|     No|          8|           3|       -1|
| 59|Female|    Yes|          1|           4|        0|
| 30|  Male|     No|          9|           2|        1|
| 38| Other|    Yes|          3|           2|       -1|
| 45| Other|     No|         10|           3|       -1|
| 31|Female|     No|          4|           4|        0|
| 31| Other|    Yes|         10|           4|   

In [14]:
# Dropping columns: drop() returns a new DataFrame without Age and Gender.
without_age_gender = df.drop('Age', 'Gender')
without_age_gender.show(2)

+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
|SleepHours|PhysicalActivity|AlcoholConsumption|Smoking|FamilyHistory|StressLevel|HeartRate|SweatingLevel|DietQuality|AnxietyLevel|
+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
|       6.0|             2.7|                10|    Yes|           No|         10|      114|            4|          7|           5|
|       6.2|             5.7|                 8|    Yes|          Yes|          1|       62|            2|          8|           3|
+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
only showing top 2 rows



In [15]:
# Renaming a column: Gender becomes GENDER in the returned DataFrame.
renamed_df = df.withColumnRenamed('Gender', 'GENDER')
renamed_df.show(2)

+---+------+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
|Age|GENDER|SleepHours|PhysicalActivity|AlcoholConsumption|Smoking|FamilyHistory|StressLevel|HeartRate|SweatingLevel|DietQuality|AnxietyLevel|
+---+------+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
| 29|Female|       6.0|             2.7|                10|    Yes|           No|         10|      114|            4|          7|           5|
| 46| Other|       6.2|             5.7|                 8|    Yes|          Yes|          1|       62|            2|          8|           3|
+---+------+----------+----------------+------------------+-------+-------------+-----------+---------+-------------+-----------+------------+
only showing top 2 rows



In [16]:
# # PySpark user-defined functions (UDFs): map Gender to a symbol ♂, ♀
# this will work with Spark
# NOTE(review): the cell is left commented out — confirm it runs in this environment before enabling it.
# from pyspark.sql.functions import udf

# @udf('string')
# def make_symbol(gender):
#     if gender == 'Male':
#         return '♂'
#     elif gender == 'Female':  # the dataset spells it 'Female'; lowercase 'female' would never match
#         return '♀'
#     else:
#         return gender

# df.select('Age', 'Gender', make_symbol(df.Gender).alias('GenderIcon')).show()

In [19]:
# Save the DataFrame's RDD as text files under ./RDD/anxiety.
# Left commented out: the recorded run failed with FileAlreadyExistsException because
# the output directory already existed — delete the directory or pick a new path
# before re-running.
# NOTE(review): the original note said "on windows it gives error"; the recorded
# traceback points at the existing output directory instead — confirm.
# df.rdd.saveAsTextFile(path='./RDD/anxiety', compressionCodecClass=None)

Py4JJavaError: An error occurred while calling o160.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/D:/codes/pyspark/RDD/anxiety already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1623)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1623)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1609)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1609)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
