In [1]:
import sys, os
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, Catalog
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import Row
from subprocess import check_output

SPARK_DRIVER_HOST = check_output(["hostname", "-i"]).decode(encoding="utf-8").strip()
spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.master', 'spark://spark:7077'),
    ('spark.app.name', 'myApp'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.eventLog.enabled', 'false'),
    ('spark.logConf', 'false'),
    ('spark.driver.bindAddress', '0.0.0.0'),
    ('spark.driver.host', SPARK_DRIVER_HOST),
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.704,org.apache.spark:spark-hadoop-cloud_2.12:3.3.0'),
    ("spark.hadoop.fs.s3a.endpoint", 'http://minio:9000'),
    ('spark.hadoop.fs.s3a.access.key', 'minio-root-user'),
    ('spark.hadoop.fs.s3a.secret.key', 'minio-root-password'),
    ('spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled', True),
    ("spark.hadoop.fs.s3a.fast.upload", True),
    ("spark.hadoop.fs.s3a.path.style.access", True),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
])
 
spark_sess          = SparkSession.builder.config(conf=spark_conf).getOrCreate()
spark_ctxt          = spark_sess.sparkContext
spark_reader        = spark_sess.read
spark_streamReader  = spark_sess.readStream
spark_ctxt.setLogLevel("WARN")

heartDF = spark_sess.read.option("header",True).csv('s3a://heart/heart.csv')

heartDF.show(truncate=False)

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.spark#spark-hadoop-cloud_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-866e00c4-00d3-470d-bf11-97f3ce7b3d06;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.704 in central
	found org.apache.spark#spark-hadoop-cloud_2.12;3.3.0 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bund

22/10/04 21:49:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/04 21:49:55 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex|cp |trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope|ca |thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|63 |1  |3  |145     |233 |1  |0      |150    |0    |2.3    |0    |0  |1   |1     |
|37 |1  |2  |130     |250 |0  |1      |187    |0    |3.5    |0    |0  |2   |1     |
|41 |0  |1  |130     |204 |0  |0      |172    |0    |1.4    |2    |0  |2   |1     |
|56 |1  |1  |120     |236 |0  |1      |178    |0    |0.8    |2    |0  |2   |1     |
|57 |0  |0  |120     |354 |0  |1      |163    |1    |0.6    |2    |0  |2   |1     |
|57 |1  |0  |140     |192 |0  |1      |148    |0    |0.4    |1    |0  |1   |1     |
|56 |0  |1  |140     |294 |0  |0      |153    |0    |1.3    |1    |0  |2   |1     |
|44 |1  |1  |120     |263 |0  |1      |173    |0    |0      |2    |0  |3   |1     |
|52 |1  |2  |172     |199 |1  |1      |162    |0    |0.5    |2    |0  |3   |

## Challenge of the Day

. Calculate the average blood pressure of your data
. Choose column in heart.csv and determine the average. 
. Send a broadcast variable with a dictionary of cholesterol (`chol`) levels
  . Normal: Less than 200 mg/dL
  . Borderline high: 200 to 239 mg/dL
  . High: At or above 240 mg/dL
. Run the data by creating a new column called `chol_category` (`'N'`, `'B'`, `'H'`)
. Ensure that the computation is correct, by `show`

In [2]:
heartDF.select(F.avg('chol')).collect()

[Row(avg(chol)=246.26402640264027)]

In [3]:
heartDF.select(F.avg('trestbps')).collect()

[Row(avg(trestbps)=131.62376237623764)]

In [4]:
cholcat = {200:"N", 239:"B", 240:"H"}
broadcastCC = spark_ctxt.broadcast(cholcat)

NewHeartdf = heartDF.withColumn(
    'chol_cat',
    F.when((F.col("chol") <= 200), "N")\
    .when(F.col('chol').between(200,239), "B")\
    .when((F.col("chol") > 239), "H")\
    .otherwise(0)
)
NewHeartdf.show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+--------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|chol_cat|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+--------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|       B|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|       H|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|       B|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|       B|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|       H|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|       N|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|       H|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|      0|    2|  0

In [5]:
def my_func(chol):
    return cholcat[chol]


result = heartDF.map(lambda chol: my_func(chol)).collect()

result.show()


AttributeError: 'DataFrame' object has no attribute 'map'

In [6]:
heartDF = spark_sess.read.option("header",True).option("inferSchema", True).csv('s3a://heart/heart.csv')
brodcast_category = {200 :'N', 239:'B', 240:'H'}
broadcastCats = spark_ctxt.broadcast(brodcast_category)

def determineCategory(num):
    for key, value in broadcastCats.value.items():
        if num < key:
            return value
        
determineCategoryUDF = F.udf(lambda c: determineCategory(c),T.StringType())  
resultFrame = heartDF.withColumn('chol_cal', determineCategoryUDF(heartDF.chol))
resultFrame.show()

                                                                                

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+--------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|chol_cal|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+--------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|       B|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|    null|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|       B|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|       B|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|    null|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|       N|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|    null|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|    0.0|    2|  0

                                                                                

In [7]:
broadcast_category = { 240: 'H', 239: 'B', 200 : 'N'}
broadcastCats = spark_ctxt.broadcast(broadcast_category)

def determineCategory(num):
    for key, value in broadcastCats.value.items():
        #print(key)
        if num >= key:
            return value
        else:
            return 'N'

result = determineCategory(141)     
result

'N'

In [8]:
broadcast_category = { 240: 'H', 239: 'B', 200 : 'N'}
broadcastCats = spark_ctxt.broadcast(broadcast_category)

def determineCategory(num):
    answer = 'H'
    for key, value in broadcastCats.value.items():
        if num <= key:
            answer = value
    return answer  
            
result = determineCategory(220)     
result

'B'

In [9]:
determineCategoryUDF = F.udf(lambda c: determineCategory(c),T.StringType())  
resultFrame = heartDF.withColumn('chol_cal', determineCategoryUDF(heartDF.chol))
resultFrame.show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+--------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|chol_cal|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+--------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|       B|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|       H|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|       B|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|       B|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|       H|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|       N|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|       H|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|    0.0|    2|  0

In [10]:
result.repartition(5)

AttributeError: 'str' object has no attribute 'repartition'

In [12]:
resultFrame.repartition(5).rdd.glom().collect()

[[Row(age=35, sex=1, cp=1, trestbps=122, chol=192, fbs=0, restecg=1, thalach=174, exang=0, oldpeak=0.0, slope=2, ca=0, thal=2, target=1, chol_cal='N'),
  Row(age=63, sex=1, cp=0, trestbps=130, chol=254, fbs=0, restecg=0, thalach=147, exang=0, oldpeak=1.4, slope=1, ca=1, thal=3, target=0, chol_cal='H'),
  Row(age=56, sex=1, cp=3, trestbps=120, chol=193, fbs=0, restecg=0, thalach=162, exang=0, oldpeak=1.9, slope=1, ca=0, thal=3, target=1, chol_cal='N'),
  Row(age=62, sex=0, cp=0, trestbps=160, chol=164, fbs=0, restecg=0, thalach=145, exang=0, oldpeak=6.2, slope=0, ca=3, thal=3, target=0, chol_cal='N'),
  Row(age=65, sex=0, cp=2, trestbps=155, chol=269, fbs=0, restecg=1, thalach=148, exang=0, oldpeak=0.8, slope=2, ca=0, thal=2, target=1, chol_cal='H'),
  Row(age=63, sex=0, cp=1, trestbps=140, chol=195, fbs=0, restecg=1, thalach=179, exang=0, oldpeak=0.0, slope=2, ca=2, thal=2, target=1, chol_cal='N'),
  Row(age=43, sex=1, cp=0, trestbps=150, chol=247, fbs=0, restecg=1, thalach=171, exang=

In [16]:
high = spark_ctxt.accumulator(0)
borderline = spark_ctxt.accumulator(0)
normal = spark_ctxt.accumulator(0)

In [17]:
resultFrame.where(F.col('chol_cal') == 'H').rdd.foreach(lambda m: high.add(1))
resultFrame.where(F.col('chol_cal') == 'B').rdd.foreach(lambda m: borderline.add(1))
resultFrame.where(F.col('chol_cal') == 'N').rdd.foreach(lambda m: normal.add(1))

In [18]:
high.value

155

In [19]:
borderline.value

97

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 60190)
Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.r