In [1]:
# Install pyspark package
%pip install pyspark

from pyspark.sql import SparkSession # type: ignore
from pyspark.sql.functions import count # type: ignore

# Build a SparkSession using SparkSession APIs.
# If one does not exist, create a new one.
spark = (SparkSession
         .builder
         .appName("PythonMnMCount")
         .config("spark.driver.bindAddress", "127.0.0.1")
         .getOrCreate())

#Get the M&M dataset
mnm_file = "data/mnm_dataset.csv"

# Read the file into a Spark DataFrame using the CSV
# by infering the schema and specifying that the
# file contains a header.
mnm_df = (spark.read.format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .load(mnm_file))

Note: you may need to restart the kernel to use updated packages.


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/19 19:19:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Tally of rows per (State, Color) across the whole dataset, largest first:
#   - keep just the State, Color and Count columns,
#   - group by State and Color,
#   - count("Count") counts the non-null Count values in each group, i.e. the
#     number of rows per group -- it does NOT add up the Count column,
#   - sort by that tally in descending order.
count_mnm_df = (
    mnm_df
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .agg(count("Count").alias("Total"))
    .orderBy("Total", ascending=False)
)

# The transformations above are lazy. show() is an action that executes the
# query; count() is a second, separate action that executes it again.
count_mnm_df.show(n=60, truncate=False)
total_rows = count_mnm_df.count()
print(f"Total Rows = {total_rows}")

+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA   |Yellow|1807 |
|WA   |Green |1779 |
|OR   |Orange|1743 |
|TX   |Green |1737 |
|TX   |Red   |1725 |
|CA   |Green |1723 |
|CO   |Yellow|1721 |
|CA   |Brown |1718 |
|CO   |Green |1713 |
|NV   |Orange|1712 |
|TX   |Yellow|1703 |
|NV   |Green |1698 |
|AZ   |Brown |1698 |
|WY   |Green |1695 |
|CO   |Blue  |1695 |
|NM   |Red   |1690 |
|AZ   |Orange|1689 |
|NM   |Yellow|1688 |
|NM   |Brown |1687 |
|UT   |Orange|1684 |
|NM   |Green |1682 |
|UT   |Red   |1680 |
|AZ   |Green |1676 |
|NV   |Yellow|1675 |
|NV   |Blue  |1673 |
|WA   |Red   |1671 |
|WY   |Red   |1670 |
|WA   |Brown |1669 |
|NM   |Orange|1665 |
|WY   |Blue  |1664 |
|WA   |Yellow|1663 |
|WA   |Orange|1658 |
|NV   |Brown |1657 |
|CA   |Orange|1657 |
|CA   |Red   |1656 |
|CO   |Brown |1656 |
|UT   |Blue  |1655 |
|AZ   |Yellow|1654 |
|TX   |Orange|1652 |
|AZ   |Red   |1648 |
|OR   |Blue  |1646 |
|UT   |Yellow|1645 |
|OR   |Red   |1645 |
|CO   |Orange|1642 |
|TX   |Brown 

In [3]:
# California only: keep the State, Color and Count columns, restrict to rows
# where State is "CA", tally rows per (State, Color), and sort the tallies in
# descending order. Note that count("Count") counts the non-null Count values
# in each group (i.e. rows per group); it does not sum the Count column.
ca_count_mnm_df = (
    mnm_df
    .select("State", "Color", "Count")
    .filter(mnm_df["State"] == "CA")
    .groupBy("State", "Color")
    .agg(count("Count").alias("Total"))
    .orderBy("Total", ascending=False)
)

# Nothing has executed yet; show() is the action that runs the query above.
ca_count_mnm_df.show(n=10, truncate=False)

+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA   |Yellow|1807 |
|CA   |Green |1723 |
|CA   |Brown |1718 |
|CA   |Orange|1657 |
|CA   |Red   |1656 |
|CA   |Blue  |1603 |
+-----+------+-----+



In [4]:
# Stop the SparkSession (and its underlying SparkContext), releasing the
# resources it holds. After this, `mnm_df` and the DataFrames derived from it
# can no longer be evaluated; re-run the first cell to start a fresh session
# before running the analysis cells again.
spark.stop()