In [47]:
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = pd.read_csv('data/healthcare-dataset-stroke-data.csv')

df = spark.createDataFrame(data)
df.show()

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21| NaN|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
| 1665|Female|79.0|           1|            0|         

In [48]:
df.printSchema()
df.select('age').describe().show(5)
# df.collect 
# DataFrame.collect() collects the distributed data to the driver side as the local data in Python.
# Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side
# because it collects all the data from executors to the driver side.
# We can use df.take(n) or df.tail
# DataFrame.toPandas() can also throw an out-of-memory error

root
 |-- id: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: long (nullable = true)
 |-- heart_disease: long (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: long (nullable = true)

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|              5110|
|   mean|43.226614481409015|
| stddev| 22.61264672311348|
|    min|              0.08|
|    max|              82.0|
+-------+------------------+



In [49]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

test_df = df.select('gender')
test_df = test_df.withColumn('upper_gender', upper(test_df.gender))
test_df.show()
test_df.filter(test_df.upper_gender == 'MALE').show()

+------+------------+
|gender|upper_gender|
+------+------------+
|  Male|        MALE|
|Female|      FEMALE|
|  Male|        MALE|
|Female|      FEMALE|
|Female|      FEMALE|
|  Male|        MALE|
|  Male|        MALE|
|Female|      FEMALE|
|Female|      FEMALE|
|Female|      FEMALE|
|Female|      FEMALE|
|Female|      FEMALE|
|Female|      FEMALE|
|  Male|        MALE|
|Female|      FEMALE|
|Female|      FEMALE|
|  Male|        MALE|
|  Male|        MALE|
|Female|      FEMALE|
|  Male|        MALE|
+------+------------+
only showing top 20 rows

+------+------------+
|gender|upper_gender|
+------+------------+
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Male|        MALE|
|  Mal

In [50]:
smoke_status_summary = df.groupby('smoking_status').count()
type(smoke_status_summary)

pyspark.sql.dataframe.DataFrame

In [51]:
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.age)).show()

+--------------------+
|pandas_plus_one(age)|
+--------------------+
|                  68|
|                  62|
|                  81|
|                  50|
|                  80|
|                  82|
|                  75|
|                  70|
|                  60|
|                  79|
|                  82|
|                  62|
|                  55|
|                  79|
|                  80|
|                  51|
|                  65|
|                  76|
|                  61|
|                  58|
+--------------------+
only showing top 20 rows



In [55]:
smoke_status_summary.show()

+---------------+-----+
| smoking_status|count|
+---------------+-----+
|         smokes|  789|
|        Unknown| 1544|
|   never smoked| 1892|
|formerly smoked|  885|
+---------------+-----+



In [61]:
# Can use DataFrame and SQL interchangeably
df.createOrReplaceTempView('tableA')
spark.sql('SELECT count(*) from tableA')

# Can register UDFs and invoke them in SQL
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(age) FROM tableA")

# These SQL expressions can directly be mixed and used as PySpark columns.
from pyspark.sql.functions import expr

df.selectExpr('add_one(age)').show()
df.select(expr('count(*)') > 0).show()

+------------+
|add_one(age)|
+------------+
|          68|
|          62|
|          81|
|          50|
|          80|
|          82|
|          75|
|          70|
|          60|
|          79|
|          82|
|          62|
|          55|
|          79|
|          80|
|          51|
|          65|
|          76|
|          61|
|          58|
+------------+
only showing top 20 rows

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+

