<a href="https://colab.research.google.com/github/mayureshpawashe/ad_spark/blob/main/ad_spark_day2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Data Aggregation and Joins
Day 2

In [5]:
import urllib.request
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Spark Training').getOrCreate()
url = "https://raw.githubusercontent.com/datasets/population/main/data/population.csv"
file_path = "/tmp/population.csv"
urllib.request.urlretrieve(url, file_path)
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show(5)


+------------+------------+----+-------+
|Country Name|Country Code|Year|  Value|
+------------+------------+----+-------+
|       Aruba|         ABW|1960|54922.0|
|       Aruba|         ABW|1961|55578.0|
|       Aruba|         ABW|1962|56320.0|
|       Aruba|         ABW|1963|57002.0|
|       Aruba|         ABW|1964|57619.0|
+------------+------------+----+-------+
only showing top 5 rows



In [6]:
# Count of Years
df.select('Year').distinct().groupBy().count().show()

+-----+
|count|
+-----+
|   64|
+-----+



##Spark Filter Data

In [8]:
from pyspark.sql.functions import col
df.filter((col('Country Name') == 'India') & (col('Year').isin('2015','2016'))).show()

+------------+------------+----+-------------+
|Country Name|Country Code|Year|        Value|
+------------+------------+----+-------------+
|       India|         IND|2015|1.328024498E9|
|       India|         IND|2016|1.343944296E9|
+------------+------------+----+-------------+



In [9]:
df.select('Country Name').orderBy('Country Name', ascending=False).distinct().show(truncate=False)

+--------------------------------------------+
|Country Name                                |
+--------------------------------------------+
|South Asia                                  |
|Chad                                        |
|Lower middle income                         |
|Paraguay                                    |
|Low & middle income                         |
|Heavily indebted poor countries (HIPC)      |
|World                                       |
|Congo, Dem. Rep.                            |
|Senegal                                     |
|Cabo Verde                                  |
|Sweden                                      |
|East Asia & Pacific (IDA & IBRD countries)  |
|Kiribati                                    |
|Least developed countries: UN classification|
|Guyana                                      |
|Eritrea                                     |
|Philippines                                 |
|Pacific island small states                 |
|Djibouti    

In [10]:
# Count of Years
df.select('Year').distinct().groupBy().count().show()

+-----+
|count|
+-----+
|   64|
+-----+



##Group and Aggregate Data
Find the Average Value per Country

In [12]:
from pyspark.sql.functions import avg

df.groupBy("Country Name").agg(avg("Value").alias("Avg Value")).show()

+--------------------+-------------------+
|        Country Name|          Avg Value|
+--------------------+-------------------+
|          South Asia|  1.2133345478125E9|
|                Chad|       8040221.1875|
| Lower middle income| 1.83420376521875E9|
|            Paraguay|     4215777.890625|
| Low & middle income|4.258080837546875E9|
|Heavily indebted ...|     4.3425936975E8|
|               World|  5.4587022159375E9|
|    Congo, Dem. Rep.|  4.5710188265625E7|
|             Senegal|     8870310.578125|
|          Cabo Verde|        391508.9375|
|              Sweden|     8753814.609375|
|East Asia & Pacif...|   1.567344636625E9|
|            Kiribati|        81998.15625|
|Least developed c...|     5.9735165275E8|
|              Guyana|        741505.5625|
|             Eritrea|     2083260.234375|
|         Philippines|  6.7997814796875E7|
|Pacific island sm...|      1792425.40625|
|            Djibouti|        573017.6875|
|               Tonga|       96705.234375|
+----------

Find the Maximum Value per Country

In [13]:
from pyspark.sql.functions import max

df.groupBy("Country Name").agg(max("Value").alias("Max Value")).show()


+--------------------+-------------+
|        Country Name|    Max Value|
+--------------------+-------------+
|          South Asia|1.951539835E9|
|                Chad|  1.9319064E7|
| Lower middle income| 3.07977878E9|
|            Paraguay|    6844146.0|
| Low & middle income|6.633109634E9|
|Heavily indebted ...| 9.17304254E8|
|               World|8.061876001E9|
|    Congo, Dem. Rep.| 1.05789731E8|
|             Senegal|  1.8077573E7|
|          Cabo Verde|     522331.0|
|              Sweden|  1.0536632E7|
|East Asia & Pacif...|2.111139152E9|
|            Kiribati|     132530.0|
|Least developed c...|1.161055545E9|
|              Guyana|     826353.0|
|             Eritrea|    3470390.0|
|         Philippines| 1.14891199E8|
|Pacific island sm...|    2689224.0|
|            Djibouti|    1152944.0|
|               Tonga|     107570.0|
+--------------------+-------------+
only showing top 20 rows



##Sort Data

In [14]:
#Sort Countries by Value in Descending Order
df.orderBy(df["Value"].desc()).show()

+----------------+------------+----+--------------+
|    Country Name|Country Code|Year|         Value|
+----------------+------------+----+--------------+
|           World|         WLD|2023| 8.061876001E9|
|           World|         WLD|2022|7.9899815195E9|
|           World|         WLD|2021| 7.921184346E9|
|           World|         WLD|2020|7.8561387895E9|
|           World|         WLD|2019| 7.776892015E9|
|           World|         WLD|2018|7.6964948475E9|
|           World|         WLD|2017|7.6141135515E9|
|           World|         WLD|2016|7.5285233335E9|
|           World|         WLD|2015|7.4414717205E9|
|           World|         WLD|2014|7.3539109255E9|
|           World|         WLD|2013| 7.265314967E9|
|           World|         WLD|2012| 7.175816385E9|
|           World|         WLD|2011|7.0862016635E9|
|           World|         WLD|2010| 7.000671233E9|
|           World|         WLD|2009| 6.916129027E9|
|IDA & IBRD total|         IBT|2023| 6.858957145E9|
|           

##Sort by Year in Ascending Order

In [18]:
df.orderBy(df["Year"].asc()).show()

+--------------------+------------+----+-----------+
|        Country Name|Country Code|Year|      Value|
+--------------------+------------+----+-----------+
|      American Samoa|         ASM|1960|    20133.0|
|   Brunei Darussalam|         BRN|1960|    88347.0|
| Antigua and Barbuda|         ATG|1960|    55603.0|
|             Armenia|         ARM|1960|  1863705.0|
|           Australia|         AUS|1960|1.0276477E7|
|Africa Western an...|         AFW|1960|9.7630925E7|
|             Austria|         AUT|1960|  7047539.0|
|Africa Eastern an...|         AFE|1960|1.3007208E8|
|          Azerbaijan|         AZE|1960|  3894500.0|
|             Andorra|         AND|1960|     9510.0|
|             Burundi|         BDI|1960|  2764258.0|
|United Arab Emirates|         ARE|1960|   131334.0|
|             Belgium|         BEL|1960|  9153489.0|
|            Bulgaria|         BGR|1960|  7867374.0|
|             Bahrain|         BHR|1960|   165477.0|
|             Albania|         ALB|1960|  1608

##Joins
create two sample dataframes


In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("JoinExample").getOrCreate()

# Define sales dataset
sales_data = [
    (1, 101, "Laptop", 2, 1200),
    (2, 102, "Phone", 1, 800),
    (3, 103, "Tablet", 3, 500),
    (4, 104, "Monitor", 2, 300),
    (5, 105, "Keyboard", 5, 100),
    (6, 106, "Mouse", 4, 50),
    (7, 107, "Speaker", 1, 200),
    (8, 108, "Camera", 2, 500),
    (9, 109, "Headset", 3, 150),
    (10, 110, "Charger", 6, 50)
]

sales_columns = ["sale_id", "customer_id", "product", "quantity", "price"]
df_sales = spark.createDataFrame(sales_data, sales_columns)

# Define customers dataset
customers_data = [
    (101, "Alice", "USA", 25, "F"),
    (102, "Bob", "UK", 30, "M"),
    (103, "Charlie", "India", 35, "M"),
    (104, "David", "Canada", 40, "M"),
    (105, "Eva", "Germany", 29, "F"),
    (111, "Frank", "USA", 50, "M"),
    (112, "Grace", "UK", 27, "F"),
    (113, "Henry", "India", 22, "M"),
    (114, "Ivy", "Canada", 45, "F"),
    (115, "Jack", "Germany", 38, "M")
]

customers_columns = ["customer_id", "name", "country", "age", "gender"]
df_customers = spark.createDataFrame(customers_data, customers_columns)


df_sales.show(10)
df_customers.show(10)


+-------+-----------+--------+--------+-----+
|sale_id|customer_id| product|quantity|price|
+-------+-----------+--------+--------+-----+
|      1|        101|  Laptop|       2| 1200|
|      2|        102|   Phone|       1|  800|
|      3|        103|  Tablet|       3|  500|
|      4|        104| Monitor|       2|  300|
|      5|        105|Keyboard|       5|  100|
|      6|        106|   Mouse|       4|   50|
|      7|        107| Speaker|       1|  200|
|      8|        108|  Camera|       2|  500|
|      9|        109| Headset|       3|  150|
|     10|        110| Charger|       6|   50|
+-------+-----------+--------+--------+-----+

+-----------+-------+-------+---+------+
|customer_id|   name|country|age|gender|
+-----------+-------+-------+---+------+
|        101|  Alice|    USA| 25|     F|
|        102|    Bob|     UK| 30|     M|
|        103|Charlie|  India| 35|     M|
|        104|  David| Canada| 40|     M|
|        105|    Eva|Germany| 29|     F|
|        111|  Frank|    US

##let's perform Inner Join, Left Join, Right Join, and Full Outer Join using **customer_id** as the key.

### Inner Join (Only Matching Records)

In [20]:
df_inner = df_sales.join(df_customers, "customer_id", "inner")
df_inner.show(20)

+-----------+-------+--------+--------+-----+-------+-------+---+------+
|customer_id|sale_id| product|quantity|price|   name|country|age|gender|
+-----------+-------+--------+--------+-----+-------+-------+---+------+
|        101|      1|  Laptop|       2| 1200|  Alice|    USA| 25|     F|
|        102|      2|   Phone|       1|  800|    Bob|     UK| 30|     M|
|        103|      3|  Tablet|       3|  500|Charlie|  India| 35|     M|
|        104|      4| Monitor|       2|  300|  David| Canada| 40|     M|
|        105|      5|Keyboard|       5|  100|    Eva|Germany| 29|     F|
+-----------+-------+--------+--------+-----+-------+-------+---+------+



###Left Join (All from Left, Matching from Right)

In [21]:
df_left = df_sales.join(df_customers, "customer_id", "left")
df_left.show(20)

+-----------+-------+--------+--------+-----+-------+-------+----+------+
|customer_id|sale_id| product|quantity|price|   name|country| age|gender|
+-----------+-------+--------+--------+-----+-------+-------+----+------+
|        103|      3|  Tablet|       3|  500|Charlie|  India|  35|     M|
|        104|      4| Monitor|       2|  300|  David| Canada|  40|     M|
|        105|      5|Keyboard|       5|  100|    Eva|Germany|  29|     F|
|        101|      1|  Laptop|       2| 1200|  Alice|    USA|  25|     F|
|        102|      2|   Phone|       1|  800|    Bob|     UK|  30|     M|
|        110|     10| Charger|       6|   50|   NULL|   NULL|NULL|  NULL|
|        107|      7| Speaker|       1|  200|   NULL|   NULL|NULL|  NULL|
|        106|      6|   Mouse|       4|   50|   NULL|   NULL|NULL|  NULL|
|        108|      8|  Camera|       2|  500|   NULL|   NULL|NULL|  NULL|
|        109|      9| Headset|       3|  150|   NULL|   NULL|NULL|  NULL|
+-----------+-------+--------+--------

###Right Join (All from Right, Matching from Left)

In [22]:
df_right = df_sales.join(df_customers, "customer_id", "right")
df_right.show(20)

+-----------+-------+--------+--------+-----+-------+-------+---+------+
|customer_id|sale_id| product|quantity|price|   name|country|age|gender|
+-----------+-------+--------+--------+-----+-------+-------+---+------+
|        103|      3|  Tablet|       3|  500|Charlie|  India| 35|     M|
|        104|      4| Monitor|       2|  300|  David| Canada| 40|     M|
|        105|      5|Keyboard|       5|  100|    Eva|Germany| 29|     F|
|        101|      1|  Laptop|       2| 1200|  Alice|    USA| 25|     F|
|        102|      2|   Phone|       1|  800|    Bob|     UK| 30|     M|
|        112|   NULL|    NULL|    NULL| NULL|  Grace|     UK| 27|     F|
|        113|   NULL|    NULL|    NULL| NULL|  Henry|  India| 22|     M|
|        114|   NULL|    NULL|    NULL| NULL|    Ivy| Canada| 45|     F|
|        115|   NULL|    NULL|    NULL| NULL|   Jack|Germany| 38|     M|
|        111|   NULL|    NULL|    NULL| NULL|  Frank|    USA| 50|     M|
+-----------+-------+--------+--------+-----+------

###Full Outer Join: Returns all rows from both DataFrames.

In [23]:
df_outer = df_sales.join(df_customers, "customer_id", "outer")
df_outer.show(20)

+-----------+-------+--------+--------+-----+-------+-------+----+------+
|customer_id|sale_id| product|quantity|price|   name|country| age|gender|
+-----------+-------+--------+--------+-----+-------+-------+----+------+
|        101|      1|  Laptop|       2| 1200|  Alice|    USA|  25|     F|
|        102|      2|   Phone|       1|  800|    Bob|     UK|  30|     M|
|        103|      3|  Tablet|       3|  500|Charlie|  India|  35|     M|
|        104|      4| Monitor|       2|  300|  David| Canada|  40|     M|
|        105|      5|Keyboard|       5|  100|    Eva|Germany|  29|     F|
|        106|      6|   Mouse|       4|   50|   NULL|   NULL|NULL|  NULL|
|        107|      7| Speaker|       1|  200|   NULL|   NULL|NULL|  NULL|
|        108|      8|  Camera|       2|  500|   NULL|   NULL|NULL|  NULL|
|        109|      9| Headset|       3|  150|   NULL|   NULL|NULL|  NULL|
|        110|     10| Charger|       6|   50|   NULL|   NULL|NULL|  NULL|
|        111|   NULL|    NULL|    NULL

In [25]:
df1 = df.select('Country Name', 'Country Code').distinct()
df1.count()

265

In [30]:
df2 = df.select(col('Country Code').alias('ctry_cd'), 'Value', 'Year').distinct()
df2.count()

16930

In [31]:
# Combine 2 Dataframes
df1 = df.orderBy('Country Code').limit(10)
df2 = df.orderBy('Country Code', ascending=False).limit(10)
df1.union(df2).show()

+------------+------------+----+---------+
|Country Name|Country Code|Year|    Value|
+------------+------------+----+---------+
|       Aruba|         ABW|1960|  54922.0|
|       Aruba|         ABW|1961|  55578.0|
|       Aruba|         ABW|1962|  56320.0|
|       Aruba|         ABW|1963|  57002.0|
|       Aruba|         ABW|1964|  57619.0|
|       Aruba|         ABW|1965|  58190.0|
|       Aruba|         ABW|1966|  58694.0|
|       Aruba|         ABW|1967|  58990.0|
|       Aruba|         ABW|1968|  59069.0|
|       Aruba|         ABW|1969|  59052.0|
|    Zimbabwe|         ZWE|1960|3809389.0|
|    Zimbabwe|         ZWE|1969|5058181.0|
|    Zimbabwe|         ZWE|1961|3930401.0|
|    Zimbabwe|         ZWE|1962|4055959.0|
|    Zimbabwe|         ZWE|1963|4185877.0|
|    Zimbabwe|         ZWE|1964|4320006.0|
|    Zimbabwe|         ZWE|1965|4458462.0|
|    Zimbabwe|         ZWE|1966|4601217.0|
|    Zimbabwe|         ZWE|1967|4748307.0|
|    Zimbabwe|         ZWE|1968|4900440.0|
+----------