In [1]:
# Report the Spark version of the SparkContext (`sc` is expected to be supplied by the kernel).
print(sc.version)

2.4.8


In [2]:
import os
import shutil

import pandas as pd
# import sh
from pyspark.sql import functions as F
from pyspark.sql.functions import *
#from pyspark.sql import functions as Fn
from pyspark.sql.types import *


In [3]:
# Let pandas display up to 100 characters per cell before truncating.
# The fully qualified option name is used: abbreviated names are resolved by
# pattern matching and can become ambiguous if pandas adds similar options.
pd.set_option("display.max_colwidth", 100)

Set up COS functions for GCS

In [5]:
from google.cloud import storage

In [7]:
# Source bucket: the open class bucket that every student can read from.
bucket_read = "big-data-bucket-uci-class"

# Destination bucket for results: each student must replace this value with
# the name of their own bucket.
bucket_write = "big-data-bucket-uci-class"

In [8]:
# List the contents of the lung dataset folder in the class bucket.
!hadoop fs -ls 'gs://big-data-bucket-uci-class/lung'

Found 1 items
-rwx------   3 root root      35057 2022-06-04 15:37 gs://big-data-bucket-uci-class/lung/LungCapData.txt


In [9]:
# Build the dataset location from `bucket_read` so that pointing the notebook at
# a different source bucket only requires changing the bucket configuration cell.
path = "gs://{}/lung/LungCapData.txt".format(bucket_read)

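Spark DataFrames were introduced in Spark 1.3; the `spark` session (SparkSession) used below as the entry point has been available since Spark 2.0.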

In [11]:
# Read the comma-separated file: the first line supplies the column names and
# the column types are inferred from the data.
lung_df = (
    spark.read
    .options(header='true', inferSchema='true', sep=',', quote='"')
    .csv(path)
)

# Mark the DataFrame for caching; as the last expression it is also echoed.
lung_df.cache()

DataFrame[LungCap: double, Age: double, Height: double, Smoke: string, Gender: string, Caesarean: string]

In [12]:
# Confirm that the loaded object is a Spark DataFrame.
type(lung_df)

pyspark.sql.dataframe.DataFrame

In [13]:
# Display the first record; first() returns a single Row object.
lung_df.first()

Row(LungCap=6.475, Age=6.0, Height=62.1, Smoke='no', Gender='male', Caesarean='no')

In [14]:
# Release the cache; the call returns a DataFrame, which is echoed below.
lung_df.unpersist()

DataFrame[LungCap: double, Age: double, Height: double, Smoke: string, Gender: string, Caesarean: string]

In [15]:
# Cache the DataFrame again after the unpersist demonstration above.
lung_df.cache()

DataFrame[LungCap: double, Age: double, Height: double, Smoke: string, Gender: string, Caesarean: string]

In [16]:
# Fetch the first 5 rows; head(5) returns a Python list of Row objects.
lung_df.head(5)

[Row(LungCap=6.475, Age=6.0, Height=62.1, Smoke='no', Gender='male', Caesarean='no'),
 Row(LungCap=10.125, Age=18.0, Height=74.7, Smoke='yes', Gender='female', Caesarean='no'),
 Row(LungCap=9.55, Age=16.0, Height=69.7, Smoke='no', Gender='female', Caesarean='yes'),
 Row(LungCap=11.125, Age=14.0, Height=71.0, Smoke='no', Gender='male', Caesarean='no'),
 Row(LungCap=4.8, Age=5.0, Height=56.9, Smoke='no', Gender='male', Caesarean='no')]

In [20]:
# Use show() to print the first 5 rows as a formatted text table.
lung_df.show(5)

+-------+----+------+-----+------+---------+
|LungCap| Age|Height|Smoke|Gender|Caesarean|
+-------+----+------+-----+------+---------+
|  6.475| 6.0|  62.1|   no|  male|       no|
| 10.125|18.0|  74.7|  yes|female|       no|
|   9.55|16.0|  69.7|   no|female|      yes|
| 11.125|14.0|  71.0|   no|  male|       no|
|    4.8| 5.0|  56.9|   no|  male|       no|
+-------+----+------+-----+------+---------+
only showing top 5 rows



For a better formatting of data enable repl.eagerEval

In [21]:
# Turn on eager evaluation so a DataFrame left as a cell's last expression is
# rendered as a table instead of only its schema summary.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [22]:
# With eager evaluation enabled, the 5-row DataFrame is rendered without calling show().
lung_df.limit(5)

LungCap,Age,Height,Smoke,Gender,Caesarean
6.475,6.0,62.1,no,male,no
10.125,18.0,74.7,yes,female,no
9.55,16.0,69.7,no,female,yes
11.125,14.0,71.0,no,male,no
4.8,5.0,56.9,no,male,no


Print Schema is helpful for nested JSON formats.

In [24]:
# Print each column's name, type and nullability as a tree.
lung_df.printSchema()

root
 |-- LungCap: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Smoke: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Caesarean: string (nullable = true)



Warning: calling describe() without ".show()" can give misleading results.

In [25]:
# describe() returns a DataFrame of summary statistics; it is rendered here as the
# cell's last expression (eager evaluation was enabled earlier in the notebook).
lung_df.describe()

summary,LungCap,Age,Height,Smoke,Gender,Caesarean
count,725.0,725.0,725.0,725,725,725
mean,7.863147586206895,12.326896551724138,64.83627586206904,,,
stddev,2.6620082486787164,4.004749514042408,7.202144012503063,,,
min,0.507,3.0,45.3,no,female,no
max,14.675,19.0,81.8,yes,male,yes


In [27]:
# Explicitly print the summary statistics (count, mean, stddev, min, max) as a text table.
lung_df.describe().show()

+-------+------------------+------------------+-----------------+-----+------+---------+
|summary|           LungCap|               Age|           Height|Smoke|Gender|Caesarean|
+-------+------------------+------------------+-----------------+-----+------+---------+
|  count|               725|               725|              725|  725|   725|      725|
|   mean| 7.863147586206895|12.326896551724138|64.83627586206904| null|  null|     null|
| stddev|2.6620082486787164| 4.004749514042408|7.202144012503063| null|  null|     null|
|    min|             0.507|               3.0|             45.3|   no|female|       no|
|    max|            14.675|              19.0|             81.8|  yes|  male|      yes|
+-------+------------------+------------------+-----------------+-----+------+---------+



Spark DF Variable Casting

In [29]:
# Replace the double-typed Age column with an integer-typed version of itself.
lung_df = lung_df.withColumn("Age", lung_df["Age"].cast("int"))

In [30]:
# Verify the cast: Age should now be reported as integer.
lung_df.printSchema()

root
 |-- LungCap: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Height: double (nullable = true)
 |-- Smoke: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Caesarean: string (nullable = true)



In [31]:
# Append demonstration columns: one random number per row plus three constant
# (literal) columns.  The random column is seeded so that re-running the
# notebook on the same data layout reproduces the same values.
lung_df_mod = (
    lung_df
    .withColumn("Random", rand(seed=42))
    .withColumn("Zero", lit(0))
    .withColumn("Country", lit('Italy'))
    .withColumn("City", lit('Rome'))
)

In [32]:
# Preview the first 5 rows, including the four added columns.
lung_df_mod.show(5)

+-------+---+------+-----+------+---------+--------------------+----+-------+----+
|LungCap|Age|Height|Smoke|Gender|Caesarean|              Random|Zero|Country|City|
+-------+---+------+-----+------+---------+--------------------+----+-------+----+
|  6.475|  6|  62.1|   no|  male|       no|  0.9247807853300706|   0|  Italy|Rome|
| 10.125| 18|  74.7|  yes|female|       no|  0.7634121438873512|   0|  Italy|Rome|
|   9.55| 16|  69.7|   no|female|      yes| 0.44689831575447114|   0|  Italy|Rome|
| 11.125| 14|  71.0|   no|  male|       no|  0.7764535190257928|   0|  Italy|Rome|
|    4.8|  5|  56.9|   no|  male|       no|0.036606339531124354|   0|  Italy|Rome|
+-------+---+------+-----+------+---------+--------------------+----+-------+----+
only showing top 5 rows



Feature Engineering: numerical transformation

In [38]:
# Derive numeric features from LungCap: scaled by 10, exponential, natural log,
# square root and square.
lung_df_feat = (
    lung_df
    .withColumn("LungCap_10", lung_df["LungCap"] * 10)
    .withColumn("LungCap_Exp", exp("LungCap"))
    .withColumn("LungCap_Log", log("LungCap"))
    .withColumn("LungCap_Sqrt", sqrt("LungCap"))
    .withColumn("LungCap_Sq", lung_df["LungCap"] ** 2)
)

lung_df_feat.show()

+-------+---+------+-----+------+---------+----------+------------------+------------------+------------------+------------------+
|LungCap|Age|Height|Smoke|Gender|Caesarean|LungCap_10|       LungCap_Exp|       LungCap_Log|      LungCap_Sqrt|        LungCap_Sq|
+-------+---+------+-----+------+---------+----------+------------------+------------------+------------------+------------------+
|  6.475|  6|  62.1|   no|  male|       no|     64.75| 648.7192276108779|1.8679486075856013| 2.544602129999894|         41.925625|
| 10.125| 18|  74.7|  yes|female|       no|    101.25|24959.255641914595| 2.315007612992603| 3.181980515339464|        102.515625|
|   9.55| 16|  69.7|   no|female|      yes|      95.5| 14044.69467150283| 2.256541154492639|3.0903074280724887| 91.20250000000001|
| 11.125| 14|  71.0|   no|  male|       no|    111.25| 67846.29106328034| 2.409194828052304|3.3354160160315836|        123.765625|
|    4.8|  5|  56.9|   no|  male|       no|      48.0|121.51041751873485|1.56861591

Selecting and reordering columns

In [41]:
# Keep only the derived LungCap features and preview 5 rows.
(
    lung_df_feat
    .select("LungCap_10", "LungCap_Exp", "LungCap_Log", "LungCap_Sqrt", "LungCap_Sq")
    .show(5)
)

+----------+------------------+------------------+------------------+-----------------+
|LungCap_10|       LungCap_Exp|       LungCap_Log|      LungCap_Sqrt|       LungCap_Sq|
+----------+------------------+------------------+------------------+-----------------+
|     64.75| 648.7192276108779|1.8679486075856013| 2.544602129999894|        41.925625|
|    101.25|24959.255641914595| 2.315007612992603| 3.181980515339464|       102.515625|
|      95.5| 14044.69467150283| 2.256541154492639|3.0903074280724887|91.20250000000001|
|    111.25| 67846.29106328034| 2.409194828052304|3.3354160160315836|       123.765625|
|      48.0|121.51041751873485|1.5686159179138452|2.1908902300206643|            23.04|
+----------+------------------+------------------+------------------+-----------------+
only showing top 5 rows



In [42]:
# Select a subset of columns in a new order (Age and Height first), then preview 5 rows.
(
    lung_df_feat
    .select(
        "Age", "Height", "LungCap",
        "LungCap_10", "LungCap_Exp", "LungCap_Log", "LungCap_Sqrt", "LungCap_Sq",
    )
    .show(5)
)

+---+------+-------+----------+------------------+------------------+------------------+-----------------+
|Age|Height|LungCap|LungCap_10|       LungCap_Exp|       LungCap_Log|      LungCap_Sqrt|       LungCap_Sq|
+---+------+-------+----------+------------------+------------------+------------------+-----------------+
|  6|  62.1|  6.475|     64.75| 648.7192276108779|1.8679486075856013| 2.544602129999894|        41.925625|
| 18|  74.7| 10.125|    101.25|24959.255641914595| 2.315007612992603| 3.181980515339464|       102.515625|
| 16|  69.7|   9.55|      95.5| 14044.69467150283| 2.256541154492639|3.0903074280724887|91.20250000000001|
| 14|  71.0| 11.125|    111.25| 67846.29106328034| 2.409194828052304|3.3354160160315836|       123.765625|
|  5|  56.9|    4.8|      48.0|121.51041751873485|1.5686159179138452|2.1908902300206643|            23.04|
+---+------+-------+----------+------------------+------------------+------------------+-----------------+
only showing top 5 rows



Starting from Spark 2.1 you can use DROP method

In [43]:
# Remove the three string columns and preview what is left.
(
    lung_df_feat
    .drop("Smoke", "Gender", "Caesarean")
    .show(5)
)

+-------+---+------+----------+------------------+------------------+------------------+-----------------+
|LungCap|Age|Height|LungCap_10|       LungCap_Exp|       LungCap_Log|      LungCap_Sqrt|       LungCap_Sq|
+-------+---+------+----------+------------------+------------------+------------------+-----------------+
|  6.475|  6|  62.1|     64.75| 648.7192276108779|1.8679486075856013| 2.544602129999894|        41.925625|
| 10.125| 18|  74.7|    101.25|24959.255641914595| 2.315007612992603| 3.181980515339464|       102.515625|
|   9.55| 16|  69.7|      95.5| 14044.69467150283| 2.256541154492639|3.0903074280724887|91.20250000000001|
| 11.125| 14|  71.0|    111.25| 67846.29106328034| 2.409194828052304|3.3354160160315836|       123.765625|
|    4.8|  5|  56.9|      48.0|121.51041751873485|1.5686159179138452|2.1908902300206643|            23.04|
+-------+---+------+----------+------------------+------------------+------------------+-----------------+
only showing top 5 rows



Conditions and if-then: single condition - Binary Feature of 0 or 1.

In [49]:
# Encode the yes/no string columns as 1/0 flags: 'yes' maps to 1, anything else to 0.
lung_df_cond = (
    lung_df
    .withColumn("Smoker_Binary", when(col("Smoke") == 'yes', 1).otherwise(0))
    .withColumn("Caesarean_Binary", when(col("Caesarean") == 'yes', 1).otherwise(0))
)

lung_df_cond.show(5)

+-------+---+------+-----+------+---------+-------------+----------------+
|LungCap|Age|Height|Smoke|Gender|Caesarean|Smoker_Binary|Caesarean_Binary|
+-------+---+------+-----+------+---------+-------------+----------------+
|  6.475|  6|  62.1|   no|  male|       no|            0|               0|
| 10.125| 18|  74.7|  yes|female|       no|            1|               0|
|   9.55| 16|  69.7|   no|female|      yes|            0|               1|
| 11.125| 14|  71.0|   no|  male|       no|            0|               0|
|    4.8|  5|  56.9|   no|  male|       no|            0|               0|
+-------+---+------+-----+------+---------+-------------+----------------+
only showing top 5 rows



In [56]:
# Bucket Age into an education level.  The when() clauses are evaluated in order,
# so each row receives the label of the first threshold its Age falls below;
# ages 18 and above fall through to "College".
education_level = (
    when(col("Age") < 5, "Preschool")
    .when(col("Age") < 9, "Elementary School")
    .when(col("Age") < 14, "Middle School")
    .when(col("Age") < 18, "High School")
    .otherwise("College")
)
lung_df_cond = lung_df.withColumn("Education", education_level)

In [57]:
# Preview the new Education column next to Age and Height.
lung_df_cond.select("Age", "Education", "Height").show(5)

+---+-----------------+------+
|Age|        Education|Height|
+---+-----------------+------+
|  6|Elementary School|  62.1|
| 18|          College|  74.7|
| 16|      High School|  69.7|
| 14|      High School|  71.0|
|  5|Elementary School|  56.9|
+---+-----------------+------+
only showing top 5 rows



Sorting a Spark Data Frame

In [70]:
# Sort by Education and then Height, both in descending order, and show 40 rows.
# Alternative kept for reference:
# lung_df_cond.orderBy("Age",ascending=False).select(["Age","Education","Height"]).show(20)
lung_df_cond.orderBy(desc("Education"), desc("Height")).show(40)


+-------+---+------+-----+------+---------+-------------+
|LungCap|Age|Height|Smoke|Gender|Caesarean|    Education|
+-------+---+------+-----+------+---------+-------------+
|  5.875|  3|  55.9|   no|  male|       no|    Preschool|
|    3.4|  4|  55.6|   no|  male|       no|    Preschool|
|  2.875|  4|  55.4|   no|  male|       no|    Preschool|
|  3.675|  3|  54.2|   no|  male|      yes|    Preschool|
|   4.65|  4|  53.7|   no|female|       no|    Preschool|
|  4.075|  3|  53.6|   no|  male|      yes|    Preschool|
|  5.475|  3|  52.9|   no|  male|       no|    Preschool|
|  3.225|  4|  52.8|   no|female|       no|    Preschool|
|    4.7|  3|  52.7|   no|  male|       no|    Preschool|
|   3.25|  3|  52.0|   no|  male|       no|    Preschool|
|  1.675|  3|  51.9|   no|  male|       no|    Preschool|
|  1.175|  3|  51.9|   no|  male|       no|    Preschool|
|  2.375|  4|  51.7|   no|female|      yes|    Preschool|
|  0.507|  3|  51.6|   no|female|      yes|    Preschool|
|    2.0|  3| 

In [None]:
# You can chain filtering, sorting, selecting, etc. in a single expression, or put each step on a line of its own.
# Spark only builds a DAG of these transformations; nothing executes until you call an action such as .show().

Spark Filtering

In [83]:
# Keep only the rows with Age below 10, using the SQL-expression string form of filter().
lung_df_filt = lung_df_cond.filter("Age < 10")
lung_df_filt.orderBy('Age', ascending=False).show(20)
# NOTE(review): the original note said a bare `lung_df_filt.show(20)` "doesn't work";
# this is unverified -- confirm, then remove this note.

# Alternate way to filter: pass a Column expression instead of a SQL string.
lung_df_filt = lung_df_cond.filter(lung_df_cond.Age < 10)
lung_df_filt.orderBy('Age', ascending=False).show(20)


+-------+---+------+-----+------+---------+-------------+
|LungCap|Age|Height|Smoke|Gender|Caesarean|    Education|
+-------+---+------+-----+------+---------+-------------+
|    8.0|  9|  66.4|   no|  male|       no|Middle School|
|  9.325|  9|  66.5|   no|  male|       no|Middle School|
|  7.275|  9|  63.7|   no|  male|       no|Middle School|
|  8.775|  9|  63.6|   no|  male|       no|Middle School|
|  7.625|  9|  60.0|   no|female|       no|Middle School|
|   6.95|  9|  61.4|   no|female|       no|Middle School|
|  5.225|  9|  53.7|   no|female|       no|Middle School|
|   6.95|  9|  63.9|   no|  male|      yes|Middle School|
|  7.975|  9|  61.9|   no|  male|       no|Middle School|
|  6.725|  9|  56.1|   no|  male|       no|Middle School|
|  8.775|  9|  59.2|   no|female|       no|Middle School|
|  3.425|  9|  51.0|   no|  male|       no|Middle School|
|  4.725|  9|  59.3|   no|female|       no|Middle School|
|  9.025|  9|  65.6|   no|  male|       no|Middle School|
|  6.225|  9| 

Group By Operations with Functions applied

In [102]:
# Total lung capacity per gender.  The aggregate is referenced through the `F`
# alias so it is clearly Spark's sum, not Python's built-in of the same name.
lung_df.groupby('Gender').agg(F.sum('LungCap')).show()

+------+------------------+
|Gender|      sum(LungCap)|
+------+------------------+
|female|          2651.257|
|  male|3049.5249999999987|
+------+------------------+



In [94]:
# Average height per gender, using the explicit `F` namespace for the aggregate.
lung_df.groupby('Gender').agg(F.mean('Height')).show()

+------+------------------+
|Gender|       avg(Height)|
+------+------------------+
|female|63.805586592178805|
|  male|   65.841689373297|
+------+------------------+



In [95]:
# Sum Height and LungCap per gender using the grouped-data shortcut method.
(
    lung_df
    .groupby("Gender")
    .sum("Height", "LungCap")
    .show()
)

+------+------------------+------------------+
|Gender|       sum(Height)|      sum(LungCap)|
+------+------------------+------------------+
|female|22842.400000000012|          2651.257|
|  male|           24163.9|3049.5249999999987|
+------+------------------+------------------+



In [96]:
# Shortest and tallest height per gender, ordered by gender.  `F.min`/`F.max`
# make it explicit that these are Spark aggregates, not Python's built-ins.
(
    lung_df.groupby('Gender')
    .agg(F.min('Height'), F.max('Height'))
    .orderBy('Gender')
    .show()
)

+------+-----------+-----------+
|Gender|min(Height)|max(Height)|
+------+-----------+-----------+
|female|       45.3|       79.8|
|  male|       47.8|       81.8|
+------+-----------+-----------+



In [98]:
# Total lung capacity and number of records per gender, via the explicit `F` namespace.
lung_df.groupby('Gender').agg(F.sum('LungCap'), F.count('*')).show()

+------+------------------+--------+
|Gender|      sum(LungCap)|count(1)|
+------+------------------+--------+
|female|          2651.257|     358|
|  male|3049.5249999999987|     367|
+------+------------------+--------+



In [101]:
# Number of distinct Smoke values observed for each gender.
lung_df.groupby('Gender').agg(F.countDistinct('Smoke')).show()

+------+---------------------+
|Gender|count(DISTINCT Smoke)|
+------+---------------------+
|female|                    2|
|  male|                    2|
+------+---------------------+



In [104]:
# Several aggregates per gender in one pass: min/avg/max/sum of Height,
# min/avg/max of Age, and the record count; genders listed in descending order.
# Functions are taken from the `F` namespace so Spark's min/max/sum are not
# confused with Python's built-ins.
(
    lung_df.groupby('Gender')
    .agg(
        F.min('Height'), F.avg('Height'), F.max('Height'), F.sum('Height'),
        F.min('Age'), F.avg('Age'), F.max('Age'),
        F.count('*'),
    )
    .orderBy('Gender', ascending=False)
    .show()
)

+------+-----------+------------------+-----------+------------------+--------+------------------+--------+--------+
|Gender|min(Height)|       avg(Height)|max(Height)|       sum(Height)|min(Age)|          avg(Age)|max(Age)|count(1)|
+------+-----------+------------------+-----------+------------------+--------+------------------+--------+--------+
|  male|       47.8|   65.841689373297|       81.8|           24163.9|       3|12.207084468664851|      19|     367|
|female|       45.3|63.805586592178805|       79.8|22842.400000000012|       3|12.449720670391061|      19|     358|
+------+-----------+------------------+-----------+------------------+--------+------------------+--------+--------+



How to rename columns for better readability
When continuing a line with "\", do not leave a space after the backslash; otherwise an error will occur.
Use aliases to give the aggregated columns more readable names.

In [120]:
# Same per-gender aggregates as before, but each result column is given a
# readable name with .alias().  Aggregates come from the `F` namespace so they
# are not confused with Python's built-in min/max/sum.  The backslashes continue
# the method chain; none are needed inside the parentheses of agg().
lung_df.groupby('Gender')\
    .agg(
        F.min('Height').alias('Height_Min'),
        F.avg('Height').alias('Height_Avg'),
        F.max('Height').alias('Height_Max'),
        F.sum('Height').alias('Height_Sum'),
        F.min('Age').alias('Age_Min'),
        F.avg('Age').alias('Age_Avg'),
        F.max('Age').alias('Age_Max'),
        F.count('*').alias('Record_Cnt'),
    )\
    .orderBy('Gender', ascending=False).show()

+------+----------+------------------+----------+------------------+-------+------------------+-------+----------+
|Gender|Height_Min|        Height_Avg|Height_Max|        Height_Sum|Age_Min|           Age_Avg|Age_Max|Record_Cnt|
+------+----------+------------------+----------+------------------+-------+------------------+-------+----------+
|  male|      47.8|   65.841689373297|      81.8|           24163.9|      3|12.207084468664851|     19|       367|
|female|      45.3|63.805586592178805|      79.8|22842.400000000012|      3|12.449720670391061|     19|       358|
+------+----------+------------------+----------+------------------+-------+------------------+-------+----------+



The next section covers sampling the data. Make time to work through it!