## With the low-level RDD API (unstructured)

In [None]:
#|output: false
from pyspark import SparkConf, SparkContext

# getOrCreate() returns the already-running context if this cell is re-run,
# instead of failing with "Cannot run multiple SparkContexts at once".
conf = SparkConf().setMaster('local').setAppName('AvgAges')
sc = SparkContext.getOrCreate(conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/22 22:28:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Distribute a small list of (name, age) pairs as an RDD.
# 'Brooke' appears twice, so the per-name average below is non-trivial.
dataRDD = sc.parallelize([
    ('Brooke', 20),
    ('Denny', 31),
    ('Jules', 30),
    ('TD', 35),
    ('Brooke', 25),
])

In [None]:
dataRDD.first() # Return the first element of the RDD

('Brooke', 20)

In [None]:
dataRDD.collect() # Returns every element to the driver as a list -- only use when the dataset is small

[('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)]

In [None]:
dataRDD.take(2) # use `take(n)` to get the first n elements as a list

[('Brooke', 20), ('Denny', 31)]

In [None]:
# Pair each age with a count of 1 -> (name, (age, 1)), so that ages and
# counts can be summed together per name in the next step.
mapedRDD = dataRDD.mapValues(lambda age: (age, 1))
mapedRDD.collect()

[('Brooke', (20, 1)),
 ('Denny', (31, 1)),
 ('Jules', (30, 1)),
 ('TD', (35, 1)),
 ('Brooke', (25, 1))]

In [None]:
# Per name, add up the ages and the counts element-wise: (total_age, count).
reducedRDD = mapedRDD.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
reducedRDD.collect()

[('Brooke', (45, 2)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (35, 1))]

In [None]:
# Average age per name = total_age / count.
# Stored under a new name (instead of overwriting `mapedRDD`) so that
# re-running the reduceByKey cell above still sees the (name, (age, 1)) pairs.
avgRDD = reducedRDD.mapValues(lambda total_and_count: total_and_count[0] / total_and_count[1])
avgRDD.collect()

[('Brooke', 22.5), ('Denny', 31.0), ('Jules', 30.0), ('TD', 35.0)]

In [None]:
#|output: false
# Stop the SparkContext before the DataFrame section below creates its own SparkSession.
sc.stop()

## With high-level DSL operators and the DataFrame API (structured)

> DSL: domain-specific language

In [None]:
#|output: false
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Entry point for the DataFrame API; getOrCreate() reuses an active session if one exists.
spark = (
    SparkSession.builder
    .appName('AvgAges')
    .getOrCreate()
)

In [None]:
# Same (name, age) data as the RDD example, now with named columns.
data_df = spark.createDataFrame(
    [('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)],
    schema=['name', 'age'],
)
data_df.show()

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+



In [None]:
data_df.show(n=1) # Print only the first row

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
+------+---+
only showing top 1 row



In [None]:
data_df.take(2) # Get the first 2 rows as a list of Row objects

[Row(name='Brooke', age=20), Row(name='Denny', age=31)]

In [None]:
data_df.tail(2) # Get the last 2 rows (tail() moves them into the driver, so keep n small)

[Row(name='TD', age=35), Row(name='Brooke', age=25)]

In [None]:
# Average age per name; the result column is named 'avg(age)'.
# groupBy() does not guarantee the order of its output rows, so sort by name
# to make the displayed table reproducible across runs.
avg_df = data_df.groupBy('name').agg(avg('age')).orderBy('name')
avg_df.show()

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+



:::{.callout-note}
            
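This DataFrame version is far more expressive and simpler than the RDD version above: it declares *what* to compute (group by name, average the ages) instead of spelling out *how* with manual `(age, 1)` pairs, per-key sums and a final division.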

:::

In [None]:
#|output: false
# Shut down the SparkSession (and its underlying SparkContext) now that the example is done.
spark.stop()