In [2]:
# https://github.com/areibman/pyspark_exercises?tab=readme-ov-file
# Taken exmples from the above repository

# Ex - GroupBy

### Introduction:

GroupBy can be summarized as Split-Apply-Combine.

Special thanks to: https://github.com/justmarkham for sharing the dataset and materials.

Check out this [Diagram](http://i.imgur.com/yjNkiwL.png)  
### Step 1. Import the necessary libraries

In [6]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("groupby").getOrCreate()
spark

24/12/13 14:52:39 WARN Utils: Your hostname, Prabhakaras-Mac-mini.local resolves to a loopback address: 127.0.0.1; using 192.168.0.62 instead (on interface en1)
24/12/13 14:52:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/13 14:52:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Step 2. Import the dataset from this [address](https://raw.githubusercontent.com/justmarkham/DAT8/master/data/drinks.csv). 

### Step 3. Assign it to a variable called drinks.

In [10]:
from pyspark import SparkFiles

In [20]:
#url = "https://raw.githubusercontent.com/justmarkham/DAT8/master/data/drinks.csv"
drinks_data_file = "/Users/prabhakarapelluru/prabhakara/data_science/code/spark-python-prabha-2024/spark-python-prabha-2024/group_by_examples/drinks.csv"
drinks = spark.read.csv(path=drinks_data_file,header=True,inferSchema=True,sep=',')
drinks.show(5)

+-----------+-------------+---------------+-------------+----------------------------+---------+
|    country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|
+-----------+-------------+---------------+-------------+----------------------------+---------+
|Afghanistan|            0|              0|            0|                         0.0|       AS|
|    Albania|           89|            132|           54|                         4.9|       EU|
|    Algeria|           25|              0|           14|                         0.7|       AF|
|    Andorra|          245|            138|          312|                        12.4|       EU|
|     Angola|          217|             57|           45|                         5.9|       AF|
+-----------+-------------+---------------+-------------+----------------------------+---------+
only showing top 5 rows



### Step 4. Which continent drinks more beer on average?

In [23]:
beer_continent = drinks.groupBy("continent").agg({"beer_servings":"avg"})
beer_continent.orderBy("avg(beer_servings)", ascending=0).head(1)

[Row(continent='EU', avg(beer_servings)=193.77777777777777)]

### Step 5. For each continent print the statistics for wine consumption.

In [26]:
from pyspark.sql.functions import *

In [28]:
continent = drinks.select("continent").distinct().collect()
# print(type(drinks.select("continent").distinct().collect()))
continents = sorted([x[0] for x in continent])
print(continents)

['AF', 'AS', 'EU', 'NA', 'OC', 'SA']


In [30]:
for each in continents:
    print(each)
    drinks.filter(drinks.continent.isin(each)).describe().select("summary","wine_servings").show()

AF
+-------+------------------+
|summary|     wine_servings|
+-------+------------------+
|  count|                53|
|   mean|16.264150943396228|
| stddev| 38.84641897335842|
|    min|                 0|
|    max|               233|
+-------+------------------+

AS
+-------+------------------+
|summary|     wine_servings|
+-------+------------------+
|  count|                44|
|   mean| 9.068181818181818|
| stddev|21.667033931944484|
|    min|                 0|
|    max|               123|
+-------+------------------+

EU
+-------+------------------+
|summary|     wine_servings|
+-------+------------------+
|  count|                45|
|   mean|142.22222222222223|
| stddev| 97.42173756146497|
|    min|                 0|
|    max|               370|
+-------+------------------+

NA
+-------+------------------+
|summary|     wine_servings|
+-------+------------------+
|  count|                23|
|   mean| 24.52173913043478|
| stddev|28.266378301658847|
|    min|                 1|

### Step 6. Print the mean alcohol consumption per continent for every column

In [33]:
drinks.groupBy("continent").mean().show()

+---------+------------------+--------------------+------------------+---------------------------------+
|continent|avg(beer_servings)|avg(spirit_servings)|avg(wine_servings)|avg(total_litres_of_pure_alcohol)|
+---------+------------------+--------------------+------------------+---------------------------------+
|       NA|145.43478260869566|   165.7391304347826| 24.52173913043478|                5.995652173913044|
|       SA|175.08333333333334|              114.75|62.416666666666664|                6.308333333333334|
|       AS| 37.04545454545455|   60.84090909090909| 9.068181818181818|               2.1704545454545454|
|       OC|           89.6875|             58.4375|            35.625|               3.3812500000000005|
|       EU|193.77777777777777|  132.55555555555554|142.22222222222223|                8.617777777777777|
|       AF|61.471698113207545|  16.339622641509433|16.264150943396228|                 3.00754716981132|
+---------+------------------+--------------------+----

### Step 7. Print the median alcohol consumption per continent for every column

In [36]:
drinks.columns

['country',
 'beer_servings',
 'spirit_servings',
 'wine_servings',
 'total_litres_of_pure_alcohol',
 'continent']

In [38]:
# df.approxQuantile("x", [0.5], 0.25)
for each in continents:
    print(each)
    print(['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol'])
    print(drinks.filter(drinks.continent.isin(each)).approxQuantile(['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol'], [0.5],0.001))

AF
['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol']
[[32.0], [3.0], [2.0], [2.3]]
AS
['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol']
[[16.0], [16.0], [1.0], [1.0]]
EU
['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol']
[[219.0], [122.0], [128.0], [10.0]]
NA
['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol']
[[143.0], [137.0], [11.0], [6.3]]
OC
['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol']
[[49.0], [35.0], [8.0], [1.5]]
SA
['beer_servings', 'spirit_servings', 'wine_servings', 'total_litres_of_pure_alcohol']
[[162.0], [100.0], [8.0], [6.6]]


### Step 8. Print the mean, min and max values for spirit consumption.
#### This time output a DataFrame

In [41]:
drinks.describe().select("summary","spirit_servings").show()

+-------+-----------------+
|summary|  spirit_servings|
+-------+-----------------+
|  count|              193|
|   mean|80.99481865284974|
| stddev|88.28431210968618|
|    min|                0|
|    max|              438|
+-------+-----------------+

